下面列出了 javax.jms.MessageProducer#setDeliveryDelay() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a producer for {@code destination} on {@code session} and copies every
 * setting carried by {@code producerOption} onto it.
 *
 * @param session the session used to create the producer
 * @param destination the destination handed to {@code session.createProducer}
 * @param producerOption source of the delivery delay, delivery mode, timestamp and
 *        message-ID suppression flags, priority and time-to-live
 * @return the newly created producer with all options applied
 * @throws JMSException if creating or configuring the producer fails
 */
public static MessageProducer createMessageProducer(
        Session session,
        Destination destination,
        MessageProducerOption producerOption) throws JMSException {
    final MessageProducer configured = session.createProducer(destination);

    // Delivery behaviour.
    configured.setDeliveryDelay(producerOption.getDeliveryDelay());
    configured.setDeliveryMode(producerOption.getDeliveryMode());

    // Header suppression flags.
    configured.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp());
    configured.setDisableMessageID(producerOption.isDisableMessageId());

    // Quality-of-service defaults.
    configured.setPriority(producerOption.getPriority());
    configured.setTimeToLive(producerOption.getTimeToLive());

    return configured;
}
/**
 * Sends a single text message with a 2000 ms delivery delay to the default queue and
 * checks the pending, pending-durable and scheduled stats before and after the
 * message is consumed.
 */
@Test
public void testScheduledStats() throws Exception {
    // Never updated in this test, so the size handed to the verify helpers is 0.
    // NOTE(review): how the helpers interpret that size is not visible here - confirm.
    AtomicLong publishedMessageSize = new AtomicLong();

    // try-with-resources: the connection is closed even when a verification below
    // throws, instead of only on the success path.
    try (Connection connection = cf.createConnection()) {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
        producer.setDeliveryDelay(2000);
        producer.send(session.createTextMessage("test"));

        // One message is expected to be counted while it is still delayed.
        verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
        verifyScheduledStats(defaultQueueName, 1, publishedMessageSize.get());

        consumeTestQueueMessages(1);

        // After consumption every counter is expected to be back at zero.
        verifyPendingStats(defaultQueueName, 0, 0);
        verifyPendingDurableStats(defaultQueueName, 0, 0);
        verifyScheduledStats(defaultQueueName, 0, 0);
    }
}
/**
 * Sends the given JMS message through the supplied producer.
 * <p>A non-negative {@code deliveryDelay} is applied to the producer first. When
 * explicit QoS is enabled the message is sent with this object's delivery mode,
 * priority and time-to-live; otherwise the plain {@code producer.send(message)}
 * call is used.
 * @param producer the JMS MessageProducer to send with
 * @param message the JMS Message to send
 * @throws JMSException if thrown by JMS API methods
 */
protected void doSend(MessageProducer producer, Message message) throws JMSException {
    // A negative delay means "leave the producer's delay untouched".
    if (this.deliveryDelay >= 0) {
        producer.setDeliveryDelay(this.deliveryDelay);
    }

    if (!isExplicitQosEnabled()) {
        producer.send(message);
        return;
    }

    producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
}
/**
 * Performs the actual send of a JMS message.
 * <p>The producer's delivery delay is set only when {@code deliveryDelay} is zero
 * or positive. The explicit-QoS flag then selects between the four-argument send
 * (delivery mode, priority, time-to-live taken from this object) and the
 * single-argument send.
 * @param producer the JMS MessageProducer to send with
 * @param message the JMS Message to send
 * @throws JMSException if thrown by JMS API methods
 */
protected void doSend(MessageProducer producer, Message message) throws JMSException {
    if (!(this.deliveryDelay < 0)) {
        producer.setDeliveryDelay(this.deliveryDelay);
    }

    if (!isExplicitQosEnabled()) {
        // No explicit QoS: use the plain send.
        producer.send(message);
    }
    else {
        // Explicit QoS: pass this object's settings along with the message.
        producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
    }
}
/**
 * Sends {@code message} through the underlying producer, temporarily applying this
 * object's delivery delay and restoring the producer's previous delay afterwards.
 *
 * @param destination target used only when the delegate has no fixed destination
 * @param message the message to send
 * @param deliveryMode delivery mode passed to the send call
 * @param priority priority passed to the send call
 * @param timeToLive time-to-live passed to the send call
 * @param listener completion listener for an asynchronous send, or {@code null} to
 *        use the send variant without a listener
 * @throws JMSException if thrown by the underlying JMS calls
 */
private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    MessageProducer messageProducer = getMessageProducer();

    // Only one thread can use the producer at a time to allow for dynamic configuration
    // changes to match what's been configured here.
    synchronized (messageProducer) {
        // Read the configured delay and decide on the override exactly once. The
        // original re-read the field in the finally block, so a change to
        // deliveryDelay during the send could skip the restore or restore a stale 0.
        final long requestedDelay = deliveryDelay;
        final boolean overrideDelay = requestedDelay != 0 && session.isJMSVersionSupported(2, 0);

        long oldDelayValue = 0;
        if (overrideDelay) {
            oldDelayValue = messageProducer.getDeliveryDelay();
            messageProducer.setDeliveryDelay(requestedDelay);
        }

        // For the non-shared MessageProducer that is also not an anonymous producer we
        // need to call the send method for an explicit MessageProducer otherwise we
        // would be violating the JMS specification in regards to send calls.
        //
        // In all other cases we create an anonymous producer so we call the send with
        // destination parameter version.
        try {
            if (getDelegate().getDestination() != null) {
                if (listener == null) {
                    messageProducer.send(message, deliveryMode, priority, timeToLive);
                } else {
                    messageProducer.send(message, deliveryMode, priority, timeToLive, listener);
                }
            } else {
                if (listener == null) {
                    messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
                } else {
                    messageProducer.send(destination, message, deliveryMode, priority, timeToLive, listener);
                }
            }
        } finally {
            // Restore exactly when, and only when, the delay was overridden above.
            if (overrideDelay) {
                messageProducer.setDeliveryDelay(oldDelayValue);
            }
        }
    }
}
/**
 * Checks that a producer created with a {@code null} destination starts with
 * {@code Message.DEFAULT_DELIVERY_DELAY} and reports back a delay set on it.
 */
@Test(timeout = 10000)
public void testDeliveryDelayConfiguration() throws Exception {
    final long configuredDelay = 2000;
    MessageProducer underTest = session.createProducer(null);

    assertEquals(Message.DEFAULT_DELIVERY_DELAY, underTest.getDeliveryDelay());

    underTest.setDeliveryDelay(configuredDelay);
    assertEquals(configuredDelay, underTest.getDeliveryDelay());
}
/**
 * Expects {@code send} to throw a {@code JMSException} when a delivery delay is set
 * on the producer but neither the connection nor the sender link was offered the
 * DELAYED-DELIVERY capability.
 */
@Test(timeout = 20000)
public void testSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Deliberately no DELAYED-DELIVERY capability on the connection.
        Connection connection = testFixture.establishConnecton(peer);
        connection.start();

        peer.expectBegin();

        // The sender link desires DELAYED-DELIVERY; the peer offers nothing in return.
        final Symbol[] nothingOffered = null;
        final Matcher<Symbol[]> desiresDelayedDelivery = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        peer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiresDelayedDelivery, nothingOffered);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryDelay(5000);

        // With a delay configured and no remote support reported, the send must fail.
        Message undeliverable = session.createMessage();
        try {
            producer.send(undeliverable);
            fail("Send should fail");
        } catch (JMSException expected) {
            LOG.debug("Caught expected error from failed send.");
        }

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends a message with a delivery delay over a connection that offers
 * DELAYED-DELIVERY and checks that the delivery-time annotation seen by the peer,
 * and the JMSDeliveryTime on the local message, fall in the expected window even
 * though the message was given a bogus JMSDeliveryTime beforehand.
 */
@Test(timeout = 20000)
public void testSendWorksWhenDelayedDeliveryIsSupported() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topicName = "myTopic";

        // The connection advertises DELAYED-DELIVERY support.
        Connection connection = testFixture.establishConnecton(peer, new Symbol[]{ DELAYED_DELIVERY });
        connection.start();

        peer.expectBegin();

        // Expected window: ['now' + delay, 'now' + delay + 5000] to allow for test
        // execution time.
        final int delayMillis = 100000;
        final long now = System.currentTimeMillis();
        final long earliest = now + delayMillis;
        final long latest = earliest + 5000;
        Matcher<Long> withinWindow = both(greaterThanOrEqualTo(earliest)).and(lessThanOrEqualTo(latest));

        // The sender attach is expected to carry no desired capabilities.
        final Symbol[] nothingOffered = null;
        Matcher<Object> noDesiredCapabilities = nullValue();
        peer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, noDesiredCapabilities, nothingOffered);

        // The transfer must be durable and carry a delivery time inside the window.
        MessageHeaderSectionMatcher durableHeader = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher annotations = new MessageAnnotationsSectionMatcher(true);
        annotations.withEntry(AmqpMessageSupport.JMS_DELIVERY_TIME, withinWindow);
        TransferPayloadCompositeMatcher expectedTransfer = new TransferPayloadCompositeMatcher();
        expectedTransfer.setHeadersMatcher(durableHeader);
        expectedTransfer.setMessageAnnotationsMatcher(annotations);
        peer.expectTransfer(expectedTransfer);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);

        // Set a JMSDeliveryTime on the message by mistake; the send must overwrite it.
        Message outgoing = session.createMessage();
        assertEquals("JMSDeliveryTime should not yet be set", 0, outgoing.getJMSDeliveryTime());
        outgoing.setJMSDeliveryTime(1234);
        assertEquals("JMSDeliveryTime should now (erroneously) be set", 1234, outgoing.getJMSDeliveryTime());

        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryDelay(delayMillis);

        // The peer checks the delivery time actually put on the wire.
        producer.send(outgoing);
        peer.waitForAllHandlersToComplete(3000);

        // The local message object must carry the same kind of value.
        MatcherAssert.assertThat("JMSDeliveryTime should now be set in expected range", outgoing.getJMSDeliveryTime(), withinWindow);

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Sends a delayed message when DELAYED-DELIVERY is offered only on the sender link,
 * not on the connection, and expects the transfer to carry a delivery-time
 * annotation.
 */
@Test(timeout = 20000)
public void testSendWorksWhenDelayedDeliveryIsSupportedOnlyLinkCapability() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topicName = "myTopic";

        // Empty capability list: the connection does not advertise DELAYED-DELIVERY.
        Connection connection = testFixture.establishConnecton(peer, new Symbol[]{ });
        connection.start();

        peer.expectBegin();

        // The link both desires and is offered DELAYED-DELIVERY.
        final Symbol[] linkOffered = new Symbol[] { DELAYED_DELIVERY };
        final Matcher<Symbol[]> desiresDelayedDelivery = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        peer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiresDelayedDelivery, linkOffered);

        // The transfer must be durable and carry some (non-null) delivery time.
        MessageHeaderSectionMatcher durableHeader = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher annotations = new MessageAnnotationsSectionMatcher(true);
        annotations.withEntry(AmqpMessageSupport.JMS_DELIVERY_TIME, notNullValue());
        TransferPayloadCompositeMatcher expectedTransfer = new TransferPayloadCompositeMatcher();
        expectedTransfer.setHeadersMatcher(durableHeader);
        expectedTransfer.setMessageAnnotationsMatcher(annotations);
        peer.expectTransfer(expectedTransfer);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryDelay(5000);
        producer.send(session.createMessage());

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Shared body for the "sending sets JMSDeliveryTime" tests.
 * <p>Sends one text message to a queue, optionally with a delivery delay of 123456 ms,
 * and checks the delivery-time annotation on the wire and the JMSDeliveryTime on the
 * sent message.
 *
 * @param deliveryDelay {@code true} to configure a delivery delay on the producer and
 *        expect the delivery-time annotation; {@code false} to send without a delay
 *        and expect the annotation to be absent
 * @throws Exception if the test peer or any JMS call fails
 */
private void doSendingMessageSetsJMSDeliveryTimeTestImpl(boolean deliveryDelay) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // The connection advertises DELAYED-DELIVERY support.
        Connection connection = testFixture.establishConnecton(peer, new Symbol[]{ DELAYED_DELIVERY });

        peer.expectBegin();
        peer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");
        MessageProducer producer = session.createProducer(queue);

        final int delay = deliveryDelay ? 123456 : 0;
        if (deliveryDelay) {
            producer.setDeliveryDelay(delay);
        }

        // Accepted window: from 'now' up to 'now' [+ delay] plus 3000 ms of slack
        // for execution time.
        final long earliest = System.currentTimeMillis();
        final long latest = earliest + delay + 3000;
        Matcher<Long> withinWindow = both(greaterThanOrEqualTo(earliest)).and(lessThanOrEqualTo(latest));

        final Symbol deliveryTimeKey = Symbol.valueOf("x-opt-delivery-time");
        final String text = "myMessage";

        MessageAnnotationsSectionMatcher annotations = new MessageAnnotationsSectionMatcher(true);
        if (deliveryDelay) {
            // Only a delayed send is required to carry the annotation.
            annotations.withEntry(deliveryTimeKey, withinWindow);
        }

        TransferPayloadCompositeMatcher expectedTransfer = new TransferPayloadCompositeMatcher();
        expectedTransfer.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        expectedTransfer.setMessageAnnotationsMatcher(annotations);
        expectedTransfer.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        expectedTransfer.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
        peer.expectTransfer(expectedTransfer);

        Message outgoing = session.createTextMessage(text);
        producer.send(outgoing);

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);

        if (!deliveryDelay) {
            assertFalse("Message should not have delivery time annotation",
                    annotations.keyExistsInReceivedAnnotations(deliveryTimeKey));
        }

        // With or without a delay, the sent message must report a delivery time in the window.
        assertThat(outgoing.getJMSDeliveryTime(), withinWindow);
    }
}
/**
 * Sends a {@code ForeignJmsMessage} with a delivery delay and checks that the
 * delivery-time annotation seen by the peer, and the JMSDeliveryTime written back to
 * the foreign message, fall in the expected window, replacing a bogus value set
 * beforehand.
 */
@Test(timeout = 20000)
public void testSendForeignMessageWithDeliveryDelay() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topicName = "myTopic";

        // The connection advertises DELAYED-DELIVERY support.
        Connection connection = testFixture.establishConnecton(peer, new Symbol[]{ DELAYED_DELIVERY });
        connection.start();

        peer.expectBegin();
        peer.expectSenderAttach();

        // Expected window: ['now' + delay, 'now' + delay + 5000] to allow for test
        // execution time.
        final int delayMillis = 100000;
        final long now = System.currentTimeMillis();
        final long earliest = now + delayMillis;
        final long latest = earliest + 5000;
        Matcher<Long> withinWindow = both(greaterThanOrEqualTo(earliest)).and(lessThanOrEqualTo(latest));

        // The transfer must be durable and carry a delivery time inside the window.
        MessageHeaderSectionMatcher durableHeader = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher annotations = new MessageAnnotationsSectionMatcher(true);
        annotations.withEntry(AmqpMessageSupport.JMS_DELIVERY_TIME, withinWindow);
        TransferPayloadCompositeMatcher expectedTransfer = new TransferPayloadCompositeMatcher();
        expectedTransfer.setHeadersMatcher(durableHeader);
        expectedTransfer.setMessageAnnotationsMatcher(annotations);
        peer.expectTransfer(expectedTransfer);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryDelay(delayMillis);

        // Set a JMSDeliveryTime on the foreign message by mistake; the send must overwrite it.
        ForeignJmsMessage foreignMessage = new ForeignJmsMessage();
        assertEquals("JMSDeliveryTime should not yet be set", 0, foreignMessage.getJMSDeliveryTime());
        foreignMessage.setJMSDeliveryTime(1234);
        assertEquals("JMSDeliveryTime should now (erroneously) be set", 1234, foreignMessage.getJMSDeliveryTime());

        // The peer checks the delivery time actually put on the wire.
        producer.send(foreignMessage);
        peer.waitForAllHandlersToComplete(3000);

        // The foreign message object itself must have been updated too.
        MatcherAssert.assertThat("JMSDeliveryTime should now be set in expected range", foreignMessage.getJMSDeliveryTime(), withinWindow);

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Expects the {@code JMSException} from a delayed send to reach the caller when the
 * peer does not offer DELAYED-DELIVERY, using a connection obtained from
 * {@code establishAnonymousConnecton}.
 */
@Test(timeout = 20000)
public void testPassthroughOfSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String peerUri = createPeerURI(peer);
        LOG.info("Original peer is at: {}", peerUri);

        // Handshake the peer should see: SASL anonymous, open, then two begins.
        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        peer.expectBegin();

        // No DELAYED-DELIVERY capability is added, so the send fails and the test
        // can observe whether the error passes through the failover provider.
        JmsConnection connection = establishAnonymousConnecton(peer);
        connection.start();

        // The sender link desires DELAYED-DELIVERY; the peer offers nothing in return.
        final Symbol[] nothingOffered = null;
        final Matcher<Symbol[]> desiresDelayedDelivery = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        peer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiresDelayedDelivery, nothingOffered);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("myTopic");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryDelay(5000);

        // With a delay configured and no remote support reported, the send must fail.
        Message undeliverable = session.createMessage();
        try {
            producer.send(undeliverable);
            fail("Send should fail");
        } catch (JMSException expected) {
            LOG.debug("Caught expected error from failed send.");
        }

        peer.expectClose();
        connection.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}