下面列出了 javax.jms.MessageProducer#setTimeToLive() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Sends {@code MESSAGE_COUNT} 1024-byte messages with a time-to-live of 1 ms to a queue
 * and expects a message to show up, within two seconds, on the topic returned by
 * {@code AdvisorySupport.getExpiredMessageTopic} for that queue.
 *
 * @throws Exception if any JMS operation fails
 */
public void testMessageExpiredAdvisory() throws Exception {
    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue testQueue = session.createQueue(getClass().getName());

    // A consumer is attached to the queue, but this test never calls receive on it.
    final MessageConsumer queueConsumer = session.createConsumer(testQueue);
    assertNotNull(queueConsumer);

    // Subscribe to the expired-message topic derived from the queue.
    final Topic expiredTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) testQueue);
    final MessageConsumer expiredAdvisories = session.createConsumer(expiredTopic);

    // Start throwing short-lived messages at the queue.
    final MessageProducer sender = session.createProducer(testQueue);
    sender.setTimeToLive(1);
    int sent = 0;
    while (sent < MESSAGE_COUNT) {
        final BytesMessage payload = session.createBytesMessage();
        payload.writeBytes(new byte[1024]);
        sender.send(payload);
        sent++;
    }

    final Message advisory = expiredAdvisories.receive(2000);
    assertNotNull(advisory);
}
/**
 * Creates a {@link MessageProducer} for {@code destination} and applies every setting
 * carried by {@code producerOption} to it.
 *
 * <p>If applying a setting fails, the half-configured producer is closed before the
 * failure propagates: the caller never receives a reference to it and therefore could
 * not close it itself.
 *
 * @param session        session used to create the producer
 * @param destination    destination handed to {@code session.createProducer}
 * @param producerOption source of the delivery delay, delivery mode, timestamp and
 *                       message-id switches, priority and time-to-live
 * @return the fully configured producer
 * @throws JMSException if the producer cannot be created or configured
 */
public static MessageProducer createMessageProducer(
        Session session,
        Destination destination,
        MessageProducerOption producerOption) throws JMSException {
    MessageProducer producer = session.createProducer(destination);
    boolean configured = false;
    try {
        producer.setDeliveryDelay(producerOption.getDeliveryDelay());
        producer.setDeliveryMode(producerOption.getDeliveryMode());
        producer.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp());
        producer.setDisableMessageID(producerOption.isDisableMessageId());
        producer.setPriority(producerOption.getPriority());
        producer.setTimeToLive(producerOption.getTimeToLive());
        configured = true;
        return producer;
    } finally {
        if (!configured) {
            try {
                producer.close();
            } catch (JMSException ignored) {
                // The configuration failure already in flight is the error worth reporting.
            }
        }
    }
}
/**
 * Expects {@code setTimeToLive} on an already closed producer to be rejected with
 * {@link javax.jms.IllegalStateException}.
 *
 * NOTE(review): the method name says "Get" while the body exercises the setter -
 * confirm which accessor this test is meant to cover.
 */
@Test
public void testGetTimeToLiveOnClosedProducer() throws Exception {
    final Connection producerConnection = createConnection();
    final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageProducer closedProducer = producerSession.createProducer(ActiveMQServerTestCase.topic1);
    closedProducer.close();

    boolean rejected;
    try {
        closedProducer.setTimeToLive(100L);
        rejected = false;
    } catch (javax.jms.IllegalStateException expected) {
        // OK - this is the outcome the test is looking for
        rejected = true;
    }
    if (!rejected) {
        ProxyAssertSupport.fail("should throw exception");
    }
}
/**
 * Sends 1000 text messages with a time-to-live of 1 ms on a pre-acknowledge session,
 * sleeps briefly, then starts the connection and expects {@code receiveNoWait()} to
 * return nothing.
 */
@Test
public void testPreCommitAcksWithMessageExpiry() throws Exception {
    conn = cf.createConnection();
    Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
    jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
    MessageProducer producer = session.createProducer(jBossQueue);
    MessageConsumer consumer = session.createConsumer(jBossQueue);

    // Time-to-live is a producer-level setting, so it is configured once
    // instead of being re-applied before every send.
    producer.setTimeToLive(1);
    int noOfMessages = 1000;
    for (int i = 0; i < noOfMessages; i++) {
        TextMessage textMessage = session.createTextMessage("m" + i);
        producer.send(textMessage);
    }

    Thread.sleep(2);
    conn.start();
    Message m = consumer.receiveNoWait();
    Assert.assertNull(m);
    // Asserting that the delivering count is zero would be bogus: messages might still
    // be in the middle of being delivered and expired at this point, which can cause
    // the delivering count to flip to 1.
}
/**
 * Registers a message listener, sends one message with a 2 second time-to-live before
 * the connection is started, waits 4 seconds, starts the connection and asserts that
 * the listener is not invoked within the following second.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing assertion
 * does not leave it open.
 */
@Test
public void testAsyncReceiveWithExpirationChecks() throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
    final CountDownLatch received = new CountDownLatch(1);
    Connection connection = factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(name.getMethodName());
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                received.countDown();
            }
        });
        MessageProducer producer = session.createProducer(destination);
        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
        producer.send(session.createTextMessage("test"));
        // Allow message to expire in the prefetch buffer
        TimeUnit.SECONDS.sleep(4);
        connection.start();
        assertFalse(received.await(1, TimeUnit.SECONDS));
    } finally {
        connection.close();
    }
}
/**
 * Sends message "A" with a time-to-live equal to the receive timeout and message "B"
 * with a time-to-live of 0 in one transaction, sleeps for the time-to-live, and then
 * expects the first message delivered to a new consumer to be "B".
 */
@Test
public void testPassiveTTL() throws Exception
{
    final Queue testQueue = createQueue(getTestName());
    final Connection jmsConnection = getConnection();
    final long ttlMillis = getReceiveTimeout();
    try
    {
        final Session txSession = jmsConnection.createSession(true, Session.SESSION_TRANSACTED);
        final MessageProducer sender = txSession.createProducer(testQueue);

        // "A" carries the time-to-live, "B" is sent with time-to-live 0.
        sender.setTimeToLive(ttlMillis);
        sender.send(txSession.createTextMessage("A"));
        sender.setTimeToLive(0);
        sender.send(txSession.createTextMessage("B"));
        txSession.commit();

        // Consume only after the time-to-live of "A" has elapsed.
        Thread.sleep(ttlMillis);

        final MessageConsumer receiver = txSession.createConsumer(testQueue);
        jmsConnection.start();
        final Message delivered = receiver.receive(getReceiveTimeout());
        assertTrue("TextMessage should be received", delivered instanceof TextMessage);
        final TextMessage deliveredText = (TextMessage) delivered;
        assertEquals("Unexpected message received", "B", deliveredText.getText());
    }
    finally
    {
        jmsConnection.close();
    }
}
/**
 * Sets the producer's time-to-live to 100 and then to 0, checking after each call that
 * {@code getTimeToLive()} reports the value just set.
 */
@Test
public void testSetTimeToLive() throws Exception {
    final Connection producerConnection = createConnection();
    final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageProducer producer = producerSession.createProducer(ActiveMQServerTestCase.topic1);

    final long[] timeToLiveValues = {100L, 0L};
    for (final long timeToLive : timeToLiveValues) {
        producer.setTimeToLive(timeToLive);
        ProxyAssertSupport.assertEquals(timeToLive, producer.getTimeToLive());
    }
}
/**
 * Reports are sent to the clients as json messages.
 *
 * <p>The report is sent as a non-persistent text message, with a time-to-live of
 * {@code defaultReplyTTL}, to {@code replyDestination}. Sending is best effort: any
 * failure is logged and not propagated, and nothing is sent when no reply destination
 * is known. The producer created for the reply is always closed.
 *
 * @param jsonResponse the message to be sent
 */
private void sendJsonResponse(final String jsonResponse) {
    if (replyDestination == null) {
        log.error("sendJsonResponse() : JMSReplyTo destination is null - cannot send reply.");
        return;
    }
    MessageProducer messageProducer = null;
    try {
        messageProducer = session.createProducer(replyDestination);
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        messageProducer.setTimeToLive(defaultReplyTTL);
        // Send response as Json message
        final Message replyMessage = session.createTextMessage(jsonResponse);
        messageProducer.send(replyMessage);
        log.debug("sendJsonResponse() : Report sent.");
    } catch (Throwable e) {
        // Deliberately broad: a failed report must never break the caller.
        log.warn("sendJsonResponse() : Failed to send report: " + e.getMessage(), e);
    } finally {
        if (messageProducer != null) {
            try {
                messageProducer.close();
            } catch (JMSException ignored) {
                // Nothing useful can be done if closing the producer fails.
            }
        }
    }
}
/**
 * Creates the message, configures a new producer from two of its properties, sends it
 * and commits when the session is transacted locally.
 *
 * <p>The boolean property {@code ThunderConstant.ASYNC_ATTRIBUTE_NAME} selects the
 * delivery mode and the long property {@code ThunderConstant.TIMEOUT_ATTRIBUTE_NAME}
 * is used as the producer's time-to-live. The producer is closed on every exit path.
 *
 * @param session        session used to create the message and the producer
 * @param destination    destination the producer is created for
 * @param messageCreator factory for the message to send
 * @throws JMSException if creating, sending or committing fails
 */
@Override
protected void doSend(Session session, Destination destination, MessageCreator messageCreator) throws JMSException {
    MessageProducer sender = null;
    try {
        final Message outbound = messageCreator.createMessage(session);
        final boolean asyncCall = outbound.getBooleanProperty(ThunderConstant.ASYNC_ATTRIBUTE_NAME);
        final long timeToLive = outbound.getLongProperty(ThunderConstant.TIMEOUT_ATTRIBUTE_NAME);

        sender = createProducer(session, destination);
        // DeliveryMode.PERSISTENT: persistent mode, messages pile up on disk
        // DeliveryMode.NON_PERSISTENT: non-persistent mode, messages pile up in memory
        sender.setDeliveryMode(asyncCall ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
        sender.setTimeToLive(timeToLive);
        doSend(sender, outbound);

        final boolean commitLocally = session.getTransacted() && isSessionLocallyTransacted(session);
        if (commitLocally) {
            JmsUtils.commitIfNecessary(session);
        }
    } finally {
        if (sender != null) {
            JmsUtils.closeMessageProducer(sender);
        }
    }
}
/**
 * Sends one message with a 1 ms time-to-live and one with time-to-live 0, waits 250 ms
 * and then expects a consumer to receive only the second message.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing assertion
 * does not leave it open.
 */
@Test
public void testExpiredAndLivingMessages() throws Exception {
    Connection conn = getConnectionFactory().createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer prod = session.createProducer(queue1);

        // send 2 messages: 1 expiring, 1 living
        TextMessage livingMessage = session.createTextMessage("This message will live");
        TextMessage expiringMessage = session.createTextMessage("This message will expire");
        prod.setTimeToLive(1);
        prod.send(expiringMessage);
        prod.setTimeToLive(0);
        prod.send(livingMessage);

        // wait for the expiring message to die
        Thread.sleep(250);

        MessageConsumer cons = session.createConsumer(queue1);
        conn.start();

        // receive living message
        Message receivedMessage = cons.receive(1000);
        ProxyAssertSupport.assertNotNull("did not receive living message", receivedMessage);
        ProxyAssertSupport.assertTrue(receivedMessage instanceof TextMessage);
        ProxyAssertSupport.assertEquals(livingMessage.getText(), ((TextMessage) receivedMessage).getText());

        // we do not receive the expiring message
        ProxyAssertSupport.assertNull(cons.receiveNoWait());
    } finally {
        conn.close();
    }
}
/**
 * Sends ten messages with a 500 ms time-to-live, waits 500 ms, expects nothing to be
 * received, and then checks the plugin callback counts recorded by {@code verifier}
 * as well as the ack reason seen by a dedicated {@code AckPluginVerifier}.
 */
@Test
public void testMessageExpireClient() throws Exception {
    // Records an error if a message is acknowledged for any reason other than EXPIRED.
    final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
    server.registerBrokerPlugin(expiredVerifier);

    conn = cf.createConnection();
    conn.start();
    final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageProducer sender = jmsSession.createProducer(queue);
    sender.setTimeToLive(500);
    final MessageConsumer receiver = jmsSession.createConsumer(queue);

    int sent = 0;
    while (sent < 10) {
        sender.send(jmsSession.createTextMessage("test"));
        sent++;
    }

    Thread.sleep(500);
    assertNull(receiver.receive(500));
    conn.close();

    // Callbacks that must not have fired at all.
    verifier.validatePluginMethodsEquals(0,
        BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
        BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS,
        BEFORE_REMOVE_BINDING, AFTER_REMOVE_BINDING);
    // Callbacks that must have fired at least once.
    verifier.validatePluginMethodsAtLeast(1,
        AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
        BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER,
        BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
        BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE,
        MESSAGE_ACKED,
        BEFORE_SEND, AFTER_SEND,
        BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE,
        BEFORE_DELIVER, AFTER_DELIVER,
        MESSAGE_EXPIRED,
        BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS,
        BEFORE_ADD_BINDING, AFTER_ADD_BINDING);
    // Session callbacks are expected exactly twice.
    verifier.validatePluginMethodsEquals(2,
        BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION,
        BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION);

    assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
}
/**
 * Checks that a producer created with a {@code null} destination starts out with
 * {@code Message.DEFAULT_TIME_TO_LIVE} and reports 1000 after that value has been set.
 */
@Test(timeout = 10000)
public void testTimeToLiveConfiguration() throws Exception {
    final MessageProducer sender = session.createProducer(null);

    final long initialTimeToLive = sender.getTimeToLive();
    assertEquals(Message.DEFAULT_TIME_TO_LIVE, initialTimeToLive);

    sender.setTimeToLive(1000);
    final long updatedTimeToLive = sender.getTimeToLive();
    assertEquals(1000, updatedTimeToLive);
}
/**
 * Opens a connection, optionally sends one text message to {@code TEST.QUEUE} and
 * optionally performs one receive attempt on it, then closes everything again.
 *
 * <p>The connection is closed in a {@code finally} block so that it is not left open
 * when a JMS call fails or the thread is interrupted while sleeping.
 *
 * @param send       whether to send {@code txtMessage}
 * @param receive    whether to create a consumer (before any send) and afterwards call
 *                   {@code receive} once with a 100 ms timeout; the result is discarded
 * @param txtMessage text of the message to send
 * @param expiry     time-to-live in milliseconds for the sent message; values
 *                   {@code <= 0} leave the producer's time-to-live untouched
 * @throws JMSException         if a JMS operation fails
 * @throws InterruptedException if interrupted during the pause after consumer creation
 */
protected void sendAndReceive(boolean send,
                              boolean receive,
                              String txtMessage,
                              long expiry) throws JMSException, InterruptedException {
    Connection connection = createActiveMQConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Queue queue = session.createQueue("TEST.QUEUE");
        MessageConsumer messageConsumer = null;
        if (receive) {
            messageConsumer = session.createConsumer(queue);
            // NOTE(review): presumably gives the consumer registration time to settle
            // before anything is sent - confirm.
            Thread.sleep(1000);
        }
        if (send) {
            MessageProducer messageProducer = session.createProducer(queue);
            if (expiry > 0) {
                messageProducer.setTimeToLive(expiry);
            }
            messageProducer.send(session.createTextMessage(txtMessage));
        }
        if (receive) {
            messageConsumer.receive(100);
            messageConsumer.close();
        }
        session.close();
    } finally {
        connection.close();
    }
}
/**
 * Sends a search request and waits for the reply carrying the matching correlation id.
 *
 * <p>A queue obtained from {@code acquireTempQueue} is set as the request's JMSReplyTo
 * and a random correlation id is assigned. The request is sent non-persistently to
 * {@code searchQueue_} with a time-to-live of {@code timeToLive_}. Replies whose
 * correlation id differs are skipped. Waiting ends once {@code receiveTimeout_}
 * milliseconds, measured from just after the send, have elapsed.
 *
 * <p>The producer and the reply consumer are closed on every exit path, so a failing
 * send or receive cannot leave a consumer attached to the reply queue at the moment it
 * is handed back through {@code releaseTempQueue}.
 *
 * @param session session used for the reply queue, the consumer and the producer
 * @param message request to send; its JMSReplyTo and JMSCorrelationID are overwritten
 * @return the matching reply, or {@code null} if none arrived in time
 * @throws JMSException if a JMS operation fails
 */
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    MessageConsumer responseConsumer = null;
    try {
        responseConsumer = session.createConsumer(replyQueue);
        message.setJMSReplyTo(replyQueue);
        final String correlationId = createRandomString();
        message.setJMSCorrelationID(correlationId);

        final MessageProducer producer = session.createProducer(searchQueue_);
        try {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            producer.setTimeToLive(timeToLive_);
            if (log.isDebugEnabled()) {
                log.debug("Sending search request message with correlationId=" + correlationId);
            }
            producer.send(message);
        } finally {
            producer.close();
        }

        Message returnedMessage = null;
        final long start = System.currentTimeMillis();
        while (true) {
            // Remaining wait budget; receive() is only called with a positive timeout.
            final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
            if (diff <= 0) {
                // timeout
                log.info("Timeout in search. Remaining time zero or negative.");
                break;
            }
            if (log.isDebugEnabled()) {
                log.debug("doSearchRequest: call receive with timeout=" + diff);
            }
            returnedMessage = responseConsumer.receive(diff);
            if (returnedMessage == null) {
                // timeout case, we're stopping now without a reply...
                log.info("Timeout in search. Reply was null.");
                break;
            } else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
                // we got an old reply from a previous search request
                log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                continue;
            } else {
                // we got a valid reply
                break;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
        }
        return returnedMessage;
    } finally {
        try {
            if (responseConsumer != null) {
                responseConsumer.close();
            }
        } finally {
            releaseTempQueue(replyQueue);
        }
    }
}
/**
 * Sends one message with a 1 ms time-to-live, waits 250 ms and expects a consumer
 * created afterwards to receive nothing.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing assertion
 * does not leave it open.
 */
@Test
public void testSimpleExpiration() throws Exception {
    Connection conn = getConnectionFactory().createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer prod = session.createProducer(queue1);
        prod.setTimeToLive(1);
        Message m = session.createTextMessage("This message will die");
        prod.send(m);

        // wait for the message to die
        Thread.sleep(250);

        MessageConsumer cons = session.createConsumer(queue1);
        conn.start();
        ProxyAssertSupport.assertNull(cons.receiveNoWait());
    } finally {
        conn.close();
    }
}
/**
 * Sends 20 messages with a time-to-live of {@code EXPIRATION} over AMQP, restarts the
 * server, and expects all 20 messages to end up on the configured expiry queue and
 * none to be receivable from the original queue.
 *
 * <p>Both connections are opened in try-with-resources blocks; the second one was
 * previously never closed and the first one was not closed when a send failed.
 */
@Test
public void testAmqpJmsReloaded() throws Exception {
    SimpleString queue = RandomUtil.randomSimpleString();
    SimpleString expiry = RandomUtil.randomSimpleString();
    server.createQueue(new QueueConfiguration(queue).setRoutingType(RoutingType.ANYCAST));
    server.createQueue(new QueueConfiguration(expiry));
    server.getAddressSettingsRepository().addMatch(queue.toString(), new AddressSettings().setExpiryAddress(expiry));
    ConnectionFactory cf = new JmsConnectionFactory("amqp://localhost:61616");

    try (Connection connection = cf.createConnection()) {
        Session session = connection.createSession();
        MessageProducer producer = session.createProducer(session.createQueue(queue.toString()));
        producer.setTimeToLive(EXPIRATION);
        for (int i = 0; i < 20; i++) {
            javax.jms.Message message = session.createMessage();
            producer.send(message);
        }
    }

    // Before the restart everything is still on the original queue.
    Wait.assertEquals(20L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100);
    Wait.assertEquals(0L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100);

    server.stop();
    server.start();
    Thread.sleep(EXPIRATION * 2);

    // After the restart and twice the time-to-live, everything has moved to the expiry queue.
    Wait.assertEquals(0L, () -> server.locateQueue(queue).getMessageCount(), 2000, 100);
    Wait.assertEquals(20L, () -> server.locateQueue(expiry).getMessageCount(), 2000, 100);

    try (Connection connection = cf.createConnection()) {
        Session session = connection.createSession();
        MessageConsumer consumer = session.createConsumer(session.createQueue(queue.toString()));
        connection.start();
        for (int i = 0; i < 20; i++) {
            javax.jms.Message message2 = consumer.receiveNoWait();
            Assert.assertNull(message2);
        }
        consumer.close();
    }
}
/**
 * Sends a search request and waits for the reply carrying the matching correlation id.
 *
 * <p>A queue obtained from {@code acquireTempQueue} is set as the request's JMSReplyTo
 * and a random correlation id is assigned. The request is sent non-persistently to
 * {@code searchQueue_} with a time-to-live of {@code timeToLive_}. Replies whose
 * correlation id differs are skipped. Waiting ends once {@code receiveTimeout_}
 * milliseconds, measured from just after the send, have elapsed.
 *
 * <p>The producer and the reply consumer are closed on every exit path, so a failing
 * send or receive cannot leave a consumer attached to the reply queue at the moment it
 * is handed back through {@code releaseTempQueue}.
 *
 * @param session session used for the reply queue, the consumer and the producer
 * @param message request to send; its JMSReplyTo and JMSCorrelationID are overwritten
 * @return the matching reply, or {@code null} if none arrived in time
 * @throws JMSException if a JMS operation fails
 */
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    MessageConsumer responseConsumer = null;
    try {
        responseConsumer = session.createConsumer(replyQueue);
        message.setJMSReplyTo(replyQueue);
        final String correlationId = createRandomString();
        message.setJMSCorrelationID(correlationId);

        final MessageProducer producer = session.createProducer(searchQueue_);
        try {
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            producer.setTimeToLive(timeToLive_);
            if (log.isDebugEnabled()) {
                log.debug("Sending search request message with correlationId=" + correlationId);
            }
            producer.send(message);
        } finally {
            producer.close();
        }

        Message returnedMessage = null;
        final long start = System.currentTimeMillis();
        while (true) {
            // Remaining wait budget; receive() is only called with a positive timeout.
            final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
            if (diff <= 0) {
                // timeout
                log.info("Timeout in search. Remaining time zero or negative.");
                break;
            }
            if (log.isDebugEnabled()) {
                log.debug("doSearchRequest: call receive with timeout=" + diff);
            }
            returnedMessage = responseConsumer.receive(diff);
            if (returnedMessage == null) {
                // timeout case, we're stopping now without a reply...
                log.info("Timeout in search. Reply was null.");
                break;
            } else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
                // we got an old reply from a previous search request
                log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                continue;
            } else {
                // we got a valid reply
                break;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
        }
        return returnedMessage;
    } finally {
        try {
            if (responseConsumer != null) {
                responseConsumer.close();
            }
        } finally {
            releaseTempQueue(replyQueue);
        }
    }
}
/**
 * Sends the same message 100 times with a 1 ms time-to-live and expects a consumer
 * created afterwards to receive nothing from {@code receiveNoWait()}.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing assertion
 * does not leave it open.
 */
@Test
public void testManyExpiredMessagesAtOnce() throws Exception {
    Connection conn = getConnectionFactory().createConnection();
    try {
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer prod = session.createProducer(queue1);
        prod.setTimeToLive(1);
        Message m = session.createTextMessage("This message will die");
        final int messageCount = 100;
        for (int i = 0; i < messageCount; i++) {
            prod.send(m);
        }
        MessageConsumer cons = session.createConsumer(queue1);
        conn.start();
        ProxyAssertSupport.assertNull(cons.receiveNoWait());
    } finally {
        conn.close();
    }
}
/**
 * Creates two durable subscribers on a topic and closes their connection, then sends
 * 100 messages with a 100 ms time-to-live in one transaction and waits for the
 * address's message count to reach zero.
 */
@Test
public void testSendTopicNoSubscription() throws Exception {
    final Topic testTopic = createTopic("test-topic");
    final SimpleString addressName = new SimpleString(testTopic.getTopicName());
    final AddressControl addressControl = ManagementControlHelper.createAddressControl(addressName, mbeanServer);

    // Register both durable subscribers, then disconnect that client.
    final Connection subscriberConnection = cf.createConnection();
    subscriberConnection.setClientID("client1");
    final Session subscriberSession = subscriberConnection.createSession(true, Session.SESSION_TRANSACTED);
    for (final String subscriptionName : new String[] {"client-sub1", "client-sub2"}) {
        subscriberSession.createDurableSubscriber(testTopic, subscriptionName);
    }
    subscriberConnection.close();

    // Publish short-lived messages in a single transaction.
    conn = cf.createConnection();
    final Session senderSession = conn.createSession(true, Session.SESSION_TRANSACTED);
    final MessageProducer sender = senderSession.createProducer(testTopic);
    sender.setTimeToLive(100);
    int sent = 0;
    while (sent < 100) {
        sender.send(senderSession.createTextMessage("txt"));
        sent++;
    }
    senderSession.commit();
    conn.close();

    Wait.assertEquals(0, addressControl::getMessageCount);
}
/**
 * Sends "msg1" with the producer's initial time-to-live and "msg2" with a time-to-live
 * of 100000 ms, then consumes with a selector on the {@code JMSExpiration} value of
 * "msg2" and expects to receive exactly that one message.
 */
@Test
public void testJMSExpirationOnSelector() throws Exception {
    final Connection connection = getConnectionFactory().createConnection();
    try {
        connection.start();
        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer sender = jmsSession.createProducer(queue1);

        final TextMessage first = jmsSession.createTextMessage("msg1");
        sender.send(first);

        sender.setTimeToLive(100000);
        final TextMessage second = jmsSession.createTextMessage("msg2");
        sender.send(second);

        // Select on the expiration stamped onto the second message when it was sent.
        final String expirationSelector = "JMSExpiration = " + second.getJMSExpiration();
        final MessageConsumer selectiveConsumer = jmsSession.createConsumer(queue1, expirationSelector);
        connection.start();

        final TextMessage matched = (TextMessage) selectiveConsumer.receive(10000);
        assertNotNull(matched);
        Assert.assertEquals("msg2", matched.getText());
        assertNull(selectiveConsumer.receiveNoWait());
    } finally {
        connection.close();
    }
}