javax.jms.MessageProducer#setTimeToLive() 源码实例 Demo

下面列出了 javax.jms.MessageProducer#setTimeToLive() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: activemq-artemis   文件: AdvisoryTests.java
/**
 * Sends MESSAGE_COUNT 1 KiB bytes messages with a 1 ms time-to-live to a queue and
 * asserts that a message arrives on the queue's expired-message advisory topic
 * within two seconds.
 */
public void testMessageExpiredAdvisory() throws Exception {
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Queue target = session.createQueue(getClass().getName());

   // A consumer is registered on the queue, but this test never receives from it.
   MessageConsumer queueConsumer = session.createConsumer(target);
   assertNotNull(queueConsumer);

   Topic expiredTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) target);
   MessageConsumer advisoryListener = session.createConsumer(expiredTopic);

   // Flood the queue with short-lived messages.
   MessageProducer sender = session.createProducer(target);
   sender.setTimeToLive(1);
   int sent = 0;
   while (sent < MESSAGE_COUNT) {
      BytesMessage payload = session.createBytesMessage();
      payload.writeBytes(new byte[1024]);
      sender.send(payload);
      sent++;
   }

   // Wait up to two seconds for an advisory to show up.
   Message advisory = advisoryListener.receive(2000);
   assertNotNull(advisory);
}
 
/**
 * Creates a producer for {@code destination} and applies every setting carried by
 * {@code producerOption}: delivery delay, delivery mode, timestamp/message-ID
 * disabling, priority and time-to-live.
 *
 * <p>If applying any setting fails, the freshly created producer is closed before
 * the exception propagates, so a failed configuration does not leave an open
 * producer behind on the session.
 *
 * @param session        session used to create the producer
 * @param destination    destination the producer sends to
 * @param producerOption source of the producer settings
 * @return the configured producer; the caller is responsible for closing it
 * @throws JMSException if the producer cannot be created or a setting is rejected
 */
public static MessageProducer createMessageProducer(
    Session session,
    Destination destination,
    MessageProducerOption producerOption) throws JMSException {

  MessageProducer producer = session.createProducer(destination);
  try {
    producer.setDeliveryDelay(producerOption.getDeliveryDelay());
    producer.setDeliveryMode(producerOption.getDeliveryMode());
    producer.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp());
    producer.setDisableMessageID(producerOption.isDisableMessageId());
    producer.setPriority(producerOption.getPriority());
    producer.setTimeToLive(producerOption.getTimeToLive());
  } catch (JMSException | RuntimeException e) {
    // Do not leak the half-configured producer; keep the original failure as the primary one.
    try {
      producer.close();
    } catch (JMSException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }

  return producer;

}
 
源代码3 项目: activemq-artemis   文件: MessageProducerTest.java
/**
 * Checks that calling {@code setTimeToLive} on a producer that has already been
 * closed raises {@link javax.jms.IllegalStateException}.
 */
@Test
public void testGetTimeToLiveOnClosedProducer() throws Exception {
   Connection producerConnection = createConnection();
   Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

   MessageProducer closedProducer = producerSession.createProducer(ActiveMQServerTestCase.topic1);
   closedProducer.close();

   try {
      closedProducer.setTimeToLive(100L);
      // Reaching this line means the closed producer accepted the call.
      ProxyAssertSupport.fail("should throw exception");
   } catch (javax.jms.IllegalStateException expected) {
      // This is the outcome the test is looking for.
   }
}
 
源代码4 项目: activemq-artemis   文件: JmsConsumerTest.java
/**
 * Sends 1000 text messages with a 1 ms time-to-live on a PRE_ACKNOWLEDGE session,
 * waits briefly, starts the connection and asserts that a non-blocking receive
 * returns nothing.
 */
@Test
public void testPreCommitAcksWithMessageExpiry() throws Exception {
   conn = cf.createConnection();
   Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
   jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
   MessageProducer producer = session.createProducer(jBossQueue);
   MessageConsumer consumer = session.createConsumer(jBossQueue);

   // The time-to-live is a producer-level setting, so set it once instead of once per message.
   producer.setTimeToLive(1);

   int noOfMessages = 1000;
   for (int i = 0; i < noOfMessages; i++) {
      TextMessage textMessage = session.createTextMessage("m" + i);
      producer.send(textMessage);
   }

   // Sleep longer than the 1 ms time-to-live before delivery is started.
   Thread.sleep(2);

   conn.start();

   Message m = consumer.receiveNoWait();
   Assert.assertNull(m);

   // Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
   // point
   // which can cause delivering count to flip to 1

}
 
源代码5 项目: activemq-artemis   文件: JmsMessageConsumerTest.java
/**
 * Sends one message with a 2 s time-to-live to a queue that has an asynchronous
 * listener, waits 4 s before starting the connection, and asserts that the
 * listener is not invoked within the following second.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing
 * assertion or an exception does not leave it open.
 */
@Test
public void testAsyncReceiveWithExpirationChecks() throws Exception {
   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);

   final CountDownLatch received = new CountDownLatch(1);

   Connection connection = factory.createConnection();
   try {
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createQueue(name.getMethodName());
      MessageConsumer consumer = session.createConsumer(destination);
      consumer.setMessageListener(new MessageListener() {

         @Override
         public void onMessage(Message message) {
            received.countDown();
         }
      });
      MessageProducer producer = session.createProducer(destination);
      producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));

      producer.send(session.createTextMessage("test"));

      // Wait longer than the 2 s time-to-live before delivery is started.
      TimeUnit.SECONDS.sleep(4);
      connection.start();

      // The latch must still be at 1, i.e. the listener never saw the message.
      assertFalse(received.await(1, TimeUnit.SECONDS));
   } finally {
      connection.close();
   }
}
 
源代码6 项目: qpid-broker-j   文件: TimeToLiveTest.java
/**
 * Sends message "A" with a time-to-live equal to the receive timeout and message "B"
 * with no time-to-live in one transaction, sleeps for the time-to-live, and then
 * asserts that the first message a new consumer receives is "B".
 */
@Test
public void testPassiveTTL() throws Exception
{
    final Queue testQueue = createQueue(getTestName());
    final Connection testConnection = getConnection();
    final long ttl = getReceiveTimeout();
    try
    {
        final Session txSession = testConnection.createSession(true, Session.SESSION_TRANSACTED);
        final MessageProducer sender = txSession.createProducer(testQueue);

        // "A" gets a finite time-to-live.
        sender.setTimeToLive(ttl);
        final TextMessage expiring = txSession.createTextMessage("A");
        sender.send(expiring);

        // "B" is sent with a time-to-live of 0.
        sender.setTimeToLive(0);
        final TextMessage lasting = txSession.createTextMessage("B");
        sender.send(lasting);

        txSession.commit();

        Thread.sleep(ttl);

        // The consumer is only created after the sleep.
        final MessageConsumer receiver = txSession.createConsumer(testQueue);
        testConnection.start();
        final Message first = receiver.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received", first instanceof TextMessage);
        final TextMessage firstText = (TextMessage) first;
        assertEquals("Unexpected message received", "B", firstText.getText());
    }
    finally
    {
        testConnection.close();
    }
}
 
源代码7 项目: activemq-artemis   文件: MessageProducerTest.java
/**
 * Checks that {@code getTimeToLive} reports the value most recently passed to
 * {@code setTimeToLive}, first for 100 and then for 0.
 */
@Test
public void testSetTimeToLive() throws Exception {
   Connection producerConnection = createConnection();
   Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageProducer producer = producerSession.createProducer(ActiveMQServerTestCase.topic1);

   // Same set-then-get round trip for each value, in the original order.
   for (long ttl : new long[] {100L, 0L}) {
      producer.setTimeToLive(ttl);
      ProxyAssertSupport.assertEquals(ttl, producer.getTimeToLive());
   }
}
 
源代码8 项目: c2mon   文件: ClientRequestReportHandler.java
/**
 * Sends a report to the client as a JSON text message on the reply destination.
 *
 * <p>The message is sent non-persistently with the default reply time-to-live.
 * The method is best-effort: if no reply destination is set it logs an error and
 * returns, and any failure while sending is logged as a warning instead of being
 * propagated. The producer created for the reply is always closed.
 *
 * @param jsonResponse the message to be sent
 */
private void sendJsonResponse(final String jsonResponse) {

  if (replyDestination == null) {
    log.error("sendJsonResponse() : JMSReplyTo destination is null - cannot send reply.");
    return;
  }

  MessageProducer messageProducer = null;
  try {
    messageProducer = session.createProducer(replyDestination);
    messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    messageProducer.setTimeToLive(defaultReplyTTL);

    // Send response as Json message
    Message replyMessage = session.createTextMessage(jsonResponse);
    messageProducer.send(replyMessage);

    log.debug("sendJsonResponse() : Report sent.");
  } catch (Throwable e) {
    // Deliberately broad: a failed report must never break the caller.
    log.warn("sendJsonResponse() : Failed to send report :" + e.getMessage(), e);
  } finally {
    if (messageProducer != null) {
      try {
        messageProducer.close();
      } catch (JMSException ignored) {
        // Nothing useful can be done if closing the one-shot producer fails.
      }
    }
  }
}
 
源代码9 项目: Thunder   文件: MQTemplate.java
/**
 * Creates the message, derives the delivery mode and time-to-live from its
 * properties, sends it through a newly created producer, and commits the session
 * when it is transacted and locally transacted. The producer is closed afterwards
 * in all cases.
 *
 * @param session        session used to create the message and the producer
 * @param destination    destination to send to
 * @param messageCreator callback that builds the message
 * @throws JMSException if creating, configuring or sending fails
 */
@Override
protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException {
    MessageProducer sender = null;
    try {
        Message outgoing = messageCreator.createMessage(session);
        boolean persistent = outgoing.getBooleanProperty(ThunderConstant.ASYNC_ATTRIBUTE_NAME);
        long ttlMillis = outgoing.getLongProperty(ThunderConstant.TIMEOUT_ATTRIBUTE_NAME);

        sender = createProducer(session, destination);
        // DeliveryMode.PERSISTENT: persistent mode, messages accumulate on disk
        // DeliveryMode.NON_PERSISTENT: non-persistent mode, messages accumulate in memory
        // Messages flagged as async are sent persistently, all others non-persistently.
        sender.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
        sender.setTimeToLive(ttlMillis);

        doSend(sender, outgoing);

        if (session.getTransacted()) {
            if (isSessionLocallyTransacted(session)) {
                JmsUtils.commitIfNecessary(session);
            }
        }
    } finally {
        if (sender != null) {
            JmsUtils.closeMessageProducer(sender);
        }
    }
}
 
源代码10 项目: activemq-artemis   文件: ExpiredMessageTest.java
/**
 * Sends one message with a 1 ms time-to-live and one with no time-to-live, waits
 * 250 ms, and asserts that a consumer receives only the message without a
 * time-to-live.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing
 * assertion does not leave it open.
 */
@Test
public void testExpiredAndLivingMessages() throws Exception {
   Connection conn = getConnectionFactory().createConnection();
   try {
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageProducer prod = session.createProducer(queue1);

      // send 2 messages: 1 expiring, 1 living
      TextMessage livingMessage = session.createTextMessage("This message will live");
      TextMessage expiringMessage = session.createTextMessage("This message will expire");

      prod.setTimeToLive(1);
      prod.send(expiringMessage);

      prod.setTimeToLive(0);
      prod.send(livingMessage);

      // wait for the expiring message to die
      Thread.sleep(250);

      MessageConsumer cons = session.createConsumer(queue1);
      conn.start();

      // receive living message
      Message receivedMessage = cons.receive(1000);
      ProxyAssertSupport.assertNotNull("did not receive living message", receivedMessage);
      ProxyAssertSupport.assertTrue(receivedMessage instanceof TextMessage);
      ProxyAssertSupport.assertEquals(livingMessage.getText(), ((TextMessage) receivedMessage).getText());

      // we do not receive the expiring message
      ProxyAssertSupport.assertNull(cons.receiveNoWait());
   } finally {
      conn.close();
   }
}
 
源代码11 项目: activemq-artemis   文件: CorePluginTest.java
/**
 * Registers a plugin verifier that expects every acknowledgement reason to be
 * EXPIRED, sends ten messages with a 500 ms time-to-live, checks that none is
 * received after the time-to-live has elapsed, and then validates how often the
 * broker plugin callbacks were invoked.
 */
@Test
public void testMessageExpireClient() throws Exception {
   final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
   server.registerBrokerPlugin(expiredVerifier);

   conn = cf.createConnection();
   conn.start();
   Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

   MessageProducer producer = session.createProducer(queue);
   producer.setTimeToLive(500);
   MessageConsumer consumer = session.createConsumer(queue);

   int remaining = 10;
   while (remaining-- > 0) {
      producer.send(session.createTextMessage("test"));
   }

   // Sleep for the time-to-live, then allow another 500 ms for a (non-)delivery.
   Thread.sleep(500);
   Message leftover = consumer.receive(500);
   assertNull(leftover);

   conn.close();

   // Callbacks that must not have fired at all.
   verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
         BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS, BEFORE_REMOVE_BINDING, AFTER_REMOVE_BINDING);
   // Callbacks that must have fired at least once.
   verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
         BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
         BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
         AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS,
         AFTER_ADD_ADDRESS, BEFORE_ADD_BINDING, AFTER_ADD_BINDING);
   // Session callbacks are expected exactly twice.
   verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
         AFTER_CLOSE_SESSION);

   assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
}
 
源代码12 项目: qpid-jms   文件: JmsMessageProducerTest.java
/**
 * Checks that a producer created with a {@code null} destination starts with
 * {@code Message.DEFAULT_TIME_TO_LIVE} and reports 1000 after it has been set.
 */
@Test(timeout = 10000)
public void testTimeToLiveConfiguration() throws Exception {
    final MessageProducer ttlProducer = session.createProducer(null);

    final long initialTtl = ttlProducer.getTimeToLive();
    assertEquals(Message.DEFAULT_TIME_TO_LIVE, initialTtl);

    ttlProducer.setTimeToLive(1000);
    final long updatedTtl = ttlProducer.getTimeToLive();
    assertEquals(1000, updatedTtl);
}
 
/**
 * Opens a connection and, depending on the flags, creates a consumer on
 * {@code TEST.QUEUE}, sends one text message to it, and attempts a short receive.
 *
 * <p>The connection is closed in a {@code finally} block so that a failure in any
 * step does not leave it open.
 *
 * @param send       whether to send {@code txtMessage} to the queue
 * @param receive    whether to create a consumer (followed by a 1 s pause) and
 *                   later attempt a 100 ms receive; the received message, if any,
 *                   is discarded
 * @param txtMessage text of the message to send
 * @param expiry     time-to-live applied to the producer when greater than zero
 * @throws JMSException         if any JMS operation fails
 * @throws InterruptedException if the pause after creating the consumer is interrupted
 */
protected void sendAndReceive(boolean send,
                              boolean receive,
                              String txtMessage,
                              long expiry) throws JMSException, InterruptedException {
   Connection connection = createActiveMQConnection();
   try {
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      connection.start();
      Queue queue = session.createQueue("TEST.QUEUE");
      MessageConsumer messageConsumer = null;

      if (receive) {
         messageConsumer = session.createConsumer(queue);
         Thread.sleep(1000);
      }
      if (send) {
         MessageProducer messageProducer = session.createProducer(queue);
         if (expiry > 0) {
            messageProducer.setTimeToLive(expiry);
         }
         messageProducer.send(session.createTextMessage(txtMessage));
      }
      if (receive) {
         messageConsumer.receive(100);
         messageConsumer.close();
      }

      session.close();
   } finally {
      connection.close();
   }
}
 
源代码14 项目: olat   文件: SearchClientJMSProxy.java
/**
 * Sends {@code message} to the search queue and waits for the matching reply on a
 * temporary reply queue.
 *
 * <p>The request is tagged with a newly generated correlation id. Replies carrying a
 * different correlation id are skipped, and waiting stops once the receive timeout
 * has elapsed. The producer and the reply consumer are closed in {@code finally}
 * blocks, so neither stays open when an exception is thrown, and the reply queue
 * is released in all cases.
 *
 * @param session session used to create the consumer and the producer
 * @param message request message; its reply-to and correlation id are overwritten
 * @return the reply with the matching correlation id, or {@code null} on timeout
 * @throws JMSException if any JMS operation fails
 */
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    try {
        final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
        try {
            message.setJMSReplyTo(replyQueue);
            final String correlationId = createRandomString();
            message.setJMSCorrelationID(correlationId);

            final MessageProducer producer = session.createProducer(searchQueue_);
            try {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setTimeToLive(timeToLive_);
                if (log.isDebugEnabled()) {
                    log.debug("Sending search request message with correlationId=" + correlationId);
                }
                producer.send(message);
            } finally {
                producer.close();
            }

            Message returnedMessage = null;
            final long start = System.currentTimeMillis();
            while (true) {
                // Remaining budget, so that skipped replies do not extend the overall timeout.
                final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
                if (diff <= 0) {
                    // timeout
                    log.info("Timeout in search. Remaining time zero or negative.");
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("doSearchRequest: call receive with timeout=" + diff);
                }
                returnedMessage = responseConsumer.receive(diff);
                if (returnedMessage == null) {
                    // timeout case, we're stopping now without a reply...
                    log.info("Timeout in search. Reply was null.");
                    break;
                } else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
                    // we got an old reply from a previous search request
                    log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                    continue;
                } else {
                    // we got a valid reply
                    break;
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
            }
            return returnedMessage;
        } finally {
            responseConsumer.close();
        }
    } finally {
        releaseTempQueue(replyQueue);
    }
}
 
源代码15 项目: activemq-artemis   文件: ExpiredMessageTest.java
/**
 * Sends one message with a 1 ms time-to-live, waits 250 ms, and asserts that a
 * consumer created afterwards receives nothing.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing
 * assertion does not leave it open.
 */
@Test
public void testSimpleExpiration() throws Exception {
   Connection conn = getConnectionFactory().createConnection();
   try {
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

      MessageProducer prod = session.createProducer(queue1);
      prod.setTimeToLive(1);

      Message m = session.createTextMessage("This message will die");

      prod.send(m);

      // wait for the message to die
      Thread.sleep(250);

      MessageConsumer cons = session.createConsumer(queue1);

      conn.start();

      ProxyAssertSupport.assertNull(cons.receiveNoWait());
   } finally {
      conn.close();
   }
}
 
源代码16 项目: activemq-artemis   文件: MessageExpirationTest.java
/**
 * Sends 20 messages with a time-to-live of EXPIRATION over AMQP, restarts the
 * server, and asserts that after the restart the messages move from the original
 * queue to the configured expiry queue and can no longer be consumed from the
 * original queue.
 *
 * <p>Both connections are closed in {@code finally} blocks; the second connection
 * in particular is no longer left open at the end of the test.
 */
@Test
public void testAmqpJmsReloaded() throws Exception {
   SimpleString queue = RandomUtil.randomSimpleString();
   SimpleString expiry = RandomUtil.randomSimpleString();

   server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST));
   server.createQueue(new QueueConfiguration(expiry));
   server.getAddressSettingsRepository().addMatch(queue.toString(), new AddressSettings().setExpiryAddress(expiry));

   ConnectionFactory cf = new JmsConnectionFactory("amqp://localhost:61616");
   Connection connection = cf.createConnection();
   try {
      Session session = connection.createSession();
      MessageProducer producer = session.createProducer(session.createQueue(queue.toString()));
      producer.setTimeToLive(EXPIRATION);

      for (int i = 0; i < 20; i++) {
         javax.jms.Message message = session.createMessage();
         producer.send(message);
      }
   } finally {
      connection.close();
   }

   // Before the restart all messages sit on the original queue and none on the expiry queue.
   Wait.assertEquals(20L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100);
   Wait.assertEquals(0L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100);

   server.stop();
   server.start();

   Thread.sleep(EXPIRATION * 2);

   // After the restart and twice the time-to-live the counts are expected to have swapped.
   Wait.assertEquals(0L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100);
   Wait.assertEquals(20L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100);

   connection = cf.createConnection();
   try {
      Session consumerSession = connection.createSession();
      MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(queue.toString()));
      connection.start();

      for (int i = 0; i < 20; i++) {
         javax.jms.Message message2 = consumer.receiveNoWait();
         Assert.assertNull(message2);
      }

      consumer.close();
   } finally {
      connection.close();
   }
}
 
源代码17 项目: olat   文件: SearchClientProxy.java
/**
 * Sends {@code message} to the search queue and waits for the matching reply on a
 * temporary reply queue.
 *
 * <p>The request is tagged with a newly generated correlation id. Replies carrying a
 * different correlation id are skipped, and waiting stops once the receive timeout
 * has elapsed. The producer and the reply consumer are closed in {@code finally}
 * blocks, so neither stays open when an exception is thrown, and the reply queue
 * is released in all cases.
 *
 * @param session session used to create the consumer and the producer
 * @param message request message; its reply-to and correlation id are overwritten
 * @return the reply with the matching correlation id, or {@code null} on timeout
 * @throws JMSException if any JMS operation fails
 */
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    try {
        final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
        try {
            message.setJMSReplyTo(replyQueue);
            final String correlationId = createRandomString();
            message.setJMSCorrelationID(correlationId);

            final MessageProducer producer = session.createProducer(searchQueue_);
            try {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setTimeToLive(timeToLive_);
                if (log.isDebugEnabled()) {
                    log.debug("Sending search request message with correlationId=" + correlationId);
                }
                producer.send(message);
            } finally {
                producer.close();
            }

            Message returnedMessage = null;
            final long start = System.currentTimeMillis();
            while (true) {
                // Remaining budget, so that skipped replies do not extend the overall timeout.
                final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
                if (diff <= 0) {
                    // timeout
                    log.info("Timeout in search. Remaining time zero or negative.");
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("doSearchRequest: call receive with timeout=" + diff);
                }
                returnedMessage = responseConsumer.receive(diff);
                if (returnedMessage == null) {
                    // timeout case, we're stopping now without a reply...
                    log.info("Timeout in search. Repy was null.");
                    break;
                } else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
                    // we got an old reply from a previous search request
                    log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                    continue;
                } else {
                    // we got a valid reply
                    break;
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
            }
            return returnedMessage;
        } finally {
            responseConsumer.close();
        }
    } finally {
        releaseTempQueue(replyQueue);
    }
}
 
源代码18 项目: activemq-artemis   文件: ExpiredMessageTest.java
/**
 * Sends the same text message 100 times with a 1 ms time-to-live and asserts that
 * a consumer created afterwards receives nothing from a non-blocking receive.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing
 * assertion does not leave it open.
 */
@Test
public void testManyExpiredMessagesAtOnce() throws Exception {
   Connection conn = getConnectionFactory().createConnection();
   try {
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

      MessageProducer prod = session.createProducer(queue1);
      prod.setTimeToLive(1);

      Message m = session.createTextMessage("This message will die");

      final int MESSAGE_COUNT = 100;

      for (int i = 0; i < MESSAGE_COUNT; i++) {
         prod.send(m);
      }

      MessageConsumer cons = session.createConsumer(queue1);
      conn.start();

      ProxyAssertSupport.assertNull(cons.receiveNoWait());
   } finally {
      conn.close();
   }
}
 
源代码19 项目: activemq-artemis   文件: ExpiryMessageTest.java
/**
 * Creates two durable subscriptions on a topic and closes their connection, then
 * sends 100 messages with a 100 ms time-to-live in one transaction and waits for
 * the address control of the topic to report a message count of zero.
 */
@Test
public void testSendTopicNoSubscription() throws Exception {
   Topic testTopic = createTopic("test-topic");
   AddressControl addressControl = ManagementControlHelper.createAddressControl(new SimpleString(testTopic.getTopicName()), mbeanServer);

   // First connection: only registers the two durable subscriptions, then goes away.
   Connection subscriberConnection = cf.createConnection();
   subscriberConnection.setClientID("client1");
   Session subscriberSession = subscriberConnection.createSession(true, Session.SESSION_TRANSACTED);
   for (String subscriptionName : new String[] {"client-sub1", "client-sub2"}) {
      subscriberSession.createDurableSubscriber(testTopic, subscriptionName);
   }
   subscriberConnection.close();

   // Second connection: publishes the short-lived messages in a single transaction.
   conn = cf.createConnection();
   Session publisherSession = conn.createSession(true, Session.SESSION_TRANSACTED);
   MessageProducer publisher = publisherSession.createProducer(testTopic);
   publisher.setTimeToLive(100);

   int published = 0;
   while (published < 100) {
      publisher.send(publisherSession.createTextMessage("txt"));
      published++;
   }

   publisherSession.commit();

   conn.close();

   Wait.assertEquals(0, addressControl::getMessageCount);

}
 
源代码20 项目: activemq-artemis   文件: SelectorTest.java
/**
 * Sends one message without a time-to-live and one with a 100000 ms time-to-live,
 * then consumes with a selector on the second message's JMSExpiration value and
 * asserts that only "msg2" is delivered.
 */
@Test
public void testJMSExpirationOnSelector() throws Exception {
   Connection conn = null;

   try {
      conn = getConnectionFactory().createConnection();
      conn.start();

      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageProducer sender = session.createProducer(queue1);

      // First message goes out before any time-to-live is set on the producer.
      TextMessage withoutTtl = session.createTextMessage("msg1");
      sender.send(withoutTtl);

      // Second message is sent after the producer's time-to-live has been set.
      sender.setTimeToLive(100000);
      TextMessage withTtl = session.createTextMessage("msg2");
      sender.send(withTtl);

      // Select on the exact expiration value read back from the second message.
      long expiration = withTtl.getJMSExpiration();
      MessageConsumer selective = session.createConsumer(queue1, "JMSExpiration = " + expiration);

      conn.start();

      Message delivered = selective.receive(10000);
      TextMessage deliveredText = (TextMessage) delivered;

      assertNotNull(deliveredText);
      Assert.assertEquals("msg2", deliveredText.getText());

      // Nothing else may match the selector.
      assertNull(selective.receiveNoWait());

   } finally {
      if (conn != null) {
         conn.close();
      }
   }
}