javax.jms.MessageProducer#setDeliveryDelay ( )源码实例Demo

下面列出了javax.jms.MessageProducer#setDeliveryDelay ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

public static MessageProducer createMessageProducer(
    Session session,
    Destination destination,
    MessageProducerOption producerOption) throws JMSException {

  MessageProducer producer = session.createProducer(destination);
  producer.setDeliveryDelay(producerOption.getDeliveryDelay());
  producer.setDeliveryMode(producerOption.getDeliveryMode());
  producer.setDisableMessageTimestamp(producerOption.isDisableMessageTimestamp());
  producer.setDisableMessageID(producerOption.isDisableMessageId());
  producer.setPriority(producerOption.getPriority());
  producer.setTimeToLive(producerOption.getTimeToLive());

  return producer;

}
 
@Test
public void testScheduledStats() throws Exception {
   AtomicLong publishedMessageSize = new AtomicLong();

   Connection connection = cf.createConnection();
   connection.start();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
   producer.setDeliveryDelay(2000);
   producer.send(session.createTextMessage("test"));

   verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
   verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
   verifyScheduledStats(defaultQueueName, 1, publishedMessageSize.get());

   consumeTestQueueMessages(1);

   verifyPendingStats(defaultQueueName, 0, 0);
   verifyPendingDurableStats(defaultQueueName, 0, 0);
   verifyScheduledStats(defaultQueueName, 0, 0);

   connection.close();
}
 
源代码3 项目: spring-analysis-note   文件: JmsTemplate.java
/**
 * Actually send the given JMS message.
 * @param producer the JMS MessageProducer to send with
 * @param message the JMS Message to send
 * @throws JMSException if thrown by JMS API methods
 */
protected void doSend(MessageProducer producer, Message message) throws JMSException {
	if (this.deliveryDelay >= 0) {
		producer.setDeliveryDelay(this.deliveryDelay);
	}
	if (isExplicitQosEnabled()) {
		producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
	}
	else {
		producer.send(message);
	}
}
 
源代码4 项目: java-technology-stack   文件: JmsTemplate.java
/**
 * Actually send the given JMS message.
 * @param producer the JMS MessageProducer to send with
 * @param message the JMS Message to send
 * @throws JMSException if thrown by JMS API methods
 */
protected void doSend(MessageProducer producer, Message message) throws JMSException {
	if (this.deliveryDelay >= 0) {
		producer.setDeliveryDelay(this.deliveryDelay);
	}
	if (isExplicitQosEnabled()) {
		producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
	}
	else {
		producer.send(message);
	}
}
 
源代码5 项目: pooled-jms   文件: JmsPoolMessageProducer.java
private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    MessageProducer messageProducer = getMessageProducer();

    // Only one thread can use the producer at a time to allow for dynamic configuration
    // changes to match what's been configured here.
    synchronized (messageProducer) {

        long oldDelayValue = 0;
        if (deliveryDelay != 0 && session.isJMSVersionSupported(2, 0)) {
            oldDelayValue = messageProducer.getDeliveryDelay();
            messageProducer.setDeliveryDelay(deliveryDelay);
        }

        // For the non-shared MessageProducer that is also not an anonymous producer we
        // need to call the send method for an explicit MessageProducer otherwise we
        // would be violating the JMS specification in regards to send calls.
        //
        // In all other cases we create an anonymous producer so we call the send with
        // destination parameter version.
        try {
            if (getDelegate().getDestination() != null) {
                if (listener == null) {
                    messageProducer.send(message, deliveryMode, priority, timeToLive);
                } else {
                    messageProducer.send(message, deliveryMode, priority, timeToLive, listener);
                }
            } else {
                if (listener == null) {
                    messageProducer.send(destination, message, deliveryMode, priority, timeToLive);
                } else {
                    messageProducer.send(destination, message, deliveryMode, priority, timeToLive, listener);
                }
            }
        } finally {
            if (deliveryDelay != 0 && session.isJMSVersionSupported(2, 0)) {
                messageProducer.setDeliveryDelay(oldDelayValue);
            }
        }
    }
}
 
源代码6 项目: qpid-jms   文件: JmsMessageProducerTest.java
@Test(timeout = 10000)
public void testDeliveryDelayConfiguration() throws Exception {
    MessageProducer producer = session.createProducer(null);
    assertEquals(Message.DEFAULT_DELIVERY_DELAY, producer.getDeliveryDelay());
    producer.setDeliveryDelay(2000);
    assertEquals(2000, producer.getDeliveryDelay());
}
 
源代码7 项目: qpid-jms   文件: ProducerIntegrationTest.java
@Test(timeout = 20000)
public void testSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

        // DO NOT add capability to indicate server support for DELAYED-DELIVERY

        Connection connection = testFixture.establishConnecton(testPeer);

        connection.start();

        testPeer.expectBegin();

        Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        Symbol[] offeredCapabilities = null;
        testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryDelay(5000);

        // Producer should fail to send when message has delivery delay since remote
        // did not report that it supports that option.
        Message message = session.createMessage();
        try {
            producer.send(message);
            fail("Send should fail");
        } catch (JMSException jmsEx) {
            LOG.debug("Caught expected error from failed send.");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码8 项目: qpid-jms   文件: ProducerIntegrationTest.java
@Test(timeout = 20000)
public void testSendWorksWhenDelayedDeliveryIsSupported() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

        String topicName = "myTopic";

        // add connection capability to indicate server support for DELAYED-DELIVERY
        Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });

        connection.start();

        testPeer.expectBegin();

        int deliveryDelay = 100000;
        long currentTime = System.currentTimeMillis();
        long deliveryTimeLower = currentTime + deliveryDelay;
        long deliveryTimeUpper = deliveryTimeLower + 5000;

        // Create matcher to expect the deliverytime annotation to be set to
        // a value greater than 'now'+deliveryDelay, within a delta for test execution.
        Matcher<Long> inRange = both(greaterThanOrEqualTo(deliveryTimeLower)).and(lessThanOrEqualTo(deliveryTimeUpper));

        Matcher<Object> desiredCapabilitiesMatcher = nullValue();
        Symbol[] offeredCapabilities = null;
        testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        msgAnnotationsMatcher.withEntry(AmqpMessageSupport.JMS_DELIVERY_TIME, inRange);

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);

        testPeer.expectTransfer(messageMatcher);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic dest = session.createTopic(topicName);

        // Create a message, [erroneously] set a JMSDeliveryTime value, expect it to be overwritten
        Message message = session.createMessage();
        assertEquals("JMSDeliveryTime should not yet be set", 0, message.getJMSDeliveryTime());
        message.setJMSDeliveryTime(1234);
        assertEquals("JMSDeliveryTime should now (erroneously) be set", 1234, message.getJMSDeliveryTime());

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryDelay(deliveryDelay);

        // Now send the message, peer will verify the actual delivery time was set as expected
        producer.send(message);

        testPeer.waitForAllHandlersToComplete(3000);

        // Now verify the local message also has the deliveryTime set as expected
        MatcherAssert.assertThat("JMSDeliveryTime should now be set in expected range", message.getJMSDeliveryTime(), inRange);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码9 项目: qpid-jms   文件: ProducerIntegrationTest.java
@Test(timeout = 20000)
public void testSendWorksWhenDelayedDeliveryIsSupportedOnlyLinkCapability() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

        String topicName = "myTopic";

        // DONT add connection capability to indicate support for DELAYED-DELIVERY
        Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ });

        connection.start();

        testPeer.expectBegin();

        Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        Symbol[] offeredCapabilities = new Symbol[] { DELAYED_DELIVERY };
        testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        msgAnnotationsMatcher.withEntry(AmqpMessageSupport.JMS_DELIVERY_TIME, notNullValue());

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);

        testPeer.expectTransfer(messageMatcher);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic dest = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryDelay(5000);
        producer.send(session.createMessage());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码10 项目: qpid-jms   文件: ProducerIntegrationTest.java
private void doSendingMessageSetsJMSDeliveryTimeTestImpl(boolean deliveryDelay) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // add connection capability to indicate server support for DELAYED-DELIVERY
        Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });

        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);

        int delay = 0;
        if (deliveryDelay) {
            delay = 123456;
            producer.setDeliveryDelay(delay);
        }

        // Create matcher to expect the DeliveryTime to be set to a value
        // representing 'now' [+ delivery-delay], within a upper delta for execution time.
        long deliveryTimeLower = System.currentTimeMillis();
        long deliveryTimeUpper = deliveryTimeLower + delay + 3000;
        Matcher<Long> inRange = both(greaterThanOrEqualTo(deliveryTimeLower)).and(lessThanOrEqualTo(deliveryTimeUpper));
        Symbol DELIVERY_TIME = Symbol.valueOf("x-opt-delivery-time");

        String text = "myMessage";
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        if (deliveryDelay) {
            msgAnnotationsMatcher.withEntry(DELIVERY_TIME, inRange);
        }
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
        testPeer.expectTransfer(messageMatcher);

        Message message = session.createTextMessage(text);

        producer.send(message);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);

        if (!deliveryDelay) {
            assertFalse("Message should not have delivery time annotation",
                    msgAnnotationsMatcher.keyExistsInReceivedAnnotations(DELIVERY_TIME));
        }

        assertThat(message.getJMSDeliveryTime(), inRange);
    }
}
 
源代码11 项目: qpid-jms   文件: ForeignMessageIntegrationTest.java
@Test(timeout = 20000)
public void testSendForeignMessageWithDeliveryDelay() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

        String topicName = "myTopic";

        // add connection capability to indicate server support for DELAYED-DELIVERY
        Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });

        connection.start();

        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        int deliveryDelay = 100000;
        long currentTime = System.currentTimeMillis();
        long deliveryTimeLower = currentTime + deliveryDelay;
        long deliveryTimeUpper = deliveryTimeLower + 5000;

        // Create matcher to expect the deliverytime annotation to be set to
        // a value greater than 'now'+deliveryDelay, within a delta for test execution.
        Matcher<Long> inRange = both(greaterThanOrEqualTo(deliveryTimeLower)).and(lessThanOrEqualTo(deliveryTimeUpper));

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        msgAnnotationsMatcher.withEntry(AmqpMessageSupport.JMS_DELIVERY_TIME, inRange);

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);

        testPeer.expectTransfer(messageMatcher);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic dest = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryDelay(deliveryDelay);

        // Create a foreign message, [erroneously] set a JMSDeliveryTime value, expect it to be overwritten
        ForeignJmsMessage foreign = new ForeignJmsMessage();
        assertEquals("JMSDeliveryTime should not yet be set", 0, foreign.getJMSDeliveryTime());
        foreign.setJMSDeliveryTime(1234);
        assertEquals("JMSDeliveryTime should now (erroneously) be set", 1234, foreign.getJMSDeliveryTime());

        // Now send the message, peer will verify the actual delivery time was set as expected
        producer.send(foreign);
        testPeer.waitForAllHandlersToComplete(3000);

        // Now verify the local message also has the deliveryTime set as expected
        MatcherAssert.assertThat("JMSDeliveryTime should now be set in expected range", foreign.getJMSDeliveryTime(), inRange);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码12 项目: qpid-jms   文件: FailoverIntegrationTest.java
@Test(timeout = 20000)
public void testPassthroughOfSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {

        final String testPeerURI = createPeerURI(testPeer);
        LOG.info("Original peer is at: {}", testPeerURI);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();

        // DO NOT add capability to indicate server support for DELAYED-DELIVERY so that
        // send fails and we can see if the error passes through the failover provider
        JmsConnection connection = establishAnonymousConnecton(testPeer);
        connection.start();

        Matcher<Symbol[]> desiredCapabilitiesMatcher = arrayContaining(new Symbol[] { DELAYED_DELIVERY });
        Symbol[] offeredCapabilities = null;
        testPeer.expectSenderAttach(notNullValue(), notNullValue(), false, false, false, false, 0, 1, null, null, desiredCapabilitiesMatcher, offeredCapabilities);

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryDelay(5000);

        // Producer should fail to send when message has delivery delay since remote
        // did not report that it supports that option.
        Message message = session.createMessage();
        try {
            producer.send(message);
            fail("Send should fail");
        } catch (JMSException jmsEx) {
            LOG.debug("Caught expected error from failed send.");
        }

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}