javax.jms.Message#setJMSReplyTo ( )源码实例Demo

下面列出了 javax.jms.Message#setJMSReplyTo() 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Perform a single request/reply round trip over JMS.
 * <p>A fresh {@link javax.jms.TemporaryQueue} is created as the reply
 * destination, the request is sent to the target queue with its
 * {@code JMSReplyTo} header pointing at that temporary queue, and the
 * call then waits for a message to arrive there.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the response Message taken from the temporary queue
 * (may be {@code null}, as indicated by the {@code @Nullable} annotation)
 * @throws JMSException in case of JMS failure
 */
@Nullable
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
	TemporaryQueue replyQueue = null;
	MessageProducer requestProducer = null;
	MessageConsumer replyConsumer = null;
	try {
		replyQueue = session.createTemporaryQueue();
		requestProducer = session.createProducer(queue);
		replyConsumer = session.createConsumer(replyQueue);
		requestMessage.setJMSReplyTo(replyQueue);
		requestProducer.send(requestMessage);

		// A positive timeout bounds the wait; otherwise use the no-argument receive.
		long receiveTimeout = getReceiveTimeout();
		if (receiveTimeout > 0) {
			return replyConsumer.receive(receiveTimeout);
		}
		return replyConsumer.receive();
	}
	finally {
		// Tear down in reverse order of use; the temporary queue goes last.
		JmsUtils.closeMessageConsumer(replyConsumer);
		JmsUtils.closeMessageProducer(requestProducer);
		if (replyQueue != null) {
			replyQueue.delete();
		}
	}
}
 
源代码2 项目: activemq-artemis   文件: MessageHeaderTest.java
/**
 * Test that if the JMS ReplyTo header field has been set as a <code>TemporaryQueue</code>,
 * it is also retrieved as a <code>TemporaryQueue</code>
 * (and not only as a <code>Queue</code>).
 */
@Test
public void testJMSReplyTo_2() {
   try {
      TemporaryQueue tempQueue = senderSession.createTemporaryQueue();
      Message message = senderSession.createMessage();
      message.setJMSReplyTo(tempQueue);
      sender.send(message);

      Message msg = receiver.receive(TestConfig.TIMEOUT);
      // receive(timeout) yields null when nothing arrives in time; report that
      // as a clear assertion failure rather than a NullPointerException below.
      Assert.assertNotNull("No message was received within the timeout", msg);
      Destination dest = msg.getJMSReplyTo();
      Assert.assertTrue("JMS ReplyTo header field should be a TemporaryQueue", dest instanceof TemporaryQueue);
      Queue replyTo = (Queue) dest;
      // JUnit argument order is (message, expected, actual): the queue that was
      // set on the sent message is the expected value.
      Assert.assertEquals("JMS ReplyTo header field should be equals to the temporary queue", tempQueue.getQueueName(), replyTo.getQueueName());
   } catch (JMSException e) {
      fail(e);
   }
}
 
源代码3 项目: pooled-jms   文件: MockJMSProducer.java
/**
 * Applies this producer's configured properties and header overrides to the
 * given message and then delegates the actual send to the owning session.
 *
 * @param destination the destination handed through to the session send
 * @param message the message to decorate and send; must not be null
 * @throws JMSException if the message is null or a JMS call fails
 */
private void doSend(Destination destination, Message message) throws JMSException {
    if (message == null) {
        throw new MessageFormatException("Message must not be null");
    }

    // Copy every configured property onto the outgoing message.
    for (Map.Entry<String, Object> property : messageProperties.entrySet()) {
        final String propertyName = property.getKey();
        final Object propertyValue = property.getValue();
        message.setObjectProperty(propertyName, propertyValue);
    }

    // Header overrides are applied only when they have been configured.
    if (correlationId != null) {
        message.setJMSCorrelationID(correlationId);
    }
    if (correlationIdBytes != null) {
        message.setJMSCorrelationIDAsBytes(correlationIdBytes);
    }
    if (type != null) {
        message.setJMSType(type);
    }
    if (replyTo != null) {
        message.setJMSReplyTo(replyTo);
    }

    session.send(
        producer, destination, message,
        deliveryMode, priority, timeToLive,
        disableMessageId, disableTimestamp, deliveryDelay,
        completionListener);
}
 
源代码4 项目: hazelcast-simulator   文件: CoordinatorClient.java
/**
 * Builds a JMS message describing the pending operation and sends it through
 * the producer that matches the target's address level.
 *
 * @throws JMSException if creating, populating or sending the message fails
 */
private void run() throws JMSException {
    Message outgoing = remoteBroker.session.createMessage();

    // Reply routing is configured only when a request id is present.
    if (requestId != null) {
        outgoing.setJMSReplyTo(remoteBroker.replyQueue);
        outgoing.setJMSCorrelationID(requestId);
    }

    // Routing and payload information travels as message properties.
    outgoing.setStringProperty("source", coordinatorAddress().toString());
    outgoing.setStringProperty("target", target.toString());
    outgoing.setStringProperty("payload", OperationCodec.toJson(op));
    outgoing.setIntProperty("operationType", getOperationType(op).toInt());

    switch (target.getAddressLevel()) {
        case AGENT:
            remoteBroker.agentProducer.send(outgoing);
            break;
        case WORKER:
            remoteBroker.workerProducer.send(outgoing);
            break;
        default:
            // Any other address level has no producer to send through.
            throw new RuntimeException("unhandled target:" + target);
    }
}
 
/**
 * Perform a single request/reply round trip over JMS.
 * <p>A fresh {@link javax.jms.TemporaryQueue} is created as the reply
 * destination, the request is sent to the target queue with its
 * {@code JMSReplyTo} header pointing at that temporary queue, and the
 * call then waits for a message to arrive there.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the response Message taken from the temporary queue
 * @throws JMSException in case of JMS failure
 */
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
	TemporaryQueue replyQueue = null;
	MessageProducer requestProducer = null;
	MessageConsumer replyConsumer = null;
	try {
		replyQueue = session.createTemporaryQueue();
		requestProducer = session.createProducer(queue);
		replyConsumer = session.createConsumer(replyQueue);
		requestMessage.setJMSReplyTo(replyQueue);
		requestProducer.send(requestMessage);

		// A positive timeout bounds the wait; otherwise use the no-argument receive.
		long receiveTimeout = getReceiveTimeout();
		if (receiveTimeout > 0) {
			return replyConsumer.receive(receiveTimeout);
		}
		return replyConsumer.receive();
	}
	finally {
		// Tear down in reverse order of use; the temporary queue goes last.
		JmsUtils.closeMessageConsumer(replyConsumer);
		JmsUtils.closeMessageProducer(requestProducer);
		if (replyQueue != null) {
			replyQueue.delete();
		}
	}
}
 
/**
 * Sends an empty message to the broker statistics destination and checks
 * that a populated {@code MapMessage} comes back on the temporary reply queue.
 *
 * @throws Exception if any JMS operation fails
 */
public void testBrokerStats() throws Exception {
   Session statsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

   // Reply side: a temporary queue with a consumer listening on it.
   Queue replyQueue = statsSession.createTemporaryQueue();
   MessageConsumer replyConsumer = statsSession.createConsumer(replyQueue);

   // Request side: a producer on the statistics destination.
   Queue statsQueue = statsSession.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
   MessageProducer statsProducer = statsSession.createProducer(statsQueue);

   Message statsRequest = statsSession.createMessage();
   statsRequest.setJMSReplyTo(replyQueue);
   statsProducer.send(statsRequest);

   Message received = replyConsumer.receive(10000);
   MapMessage stats = (MapMessage) received;
   assertNotNull(stats);
   assertTrue(stats.getMapNames().hasMoreElements());
   assertTrue(stats.getJMSTimestamp() > 0);
   assertEquals(Message.DEFAULT_PRIORITY, stats.getJMSPriority());
}
 
/**
 * Schedules four messages with different delays, issues a scheduler browse
 * request, and checks that a reply consumer with a selector on the scheduled
 * delay receives exactly the one entry scheduled with a 45 second delay.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testBrowseWithSelector() throws Exception {
   Connection connection = createConnection();

   // Schedule four messages; only the last one uses the 45 second delay.
   scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9));
   scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10));
   scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
   scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45));

   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

   // Scheduler management topic (request side) and temporary topic (reply side).
   Destination managementTopic = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
   Destination replyTopic = session.createTemporaryTopic();

   // The reply consumer filters on the scheduled delay.
   String delaySelector = ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000";
   MessageConsumer replyConsumer = session.createConsumer(replyTopic, delaySelector);

   connection.start();

   // Ask the scheduler to browse its jobs, replying to the temporary topic.
   MessageProducer requestProducer = session.createProducer(managementTopic);
   Message browseRequest = session.createMessage();
   browseRequest.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
   browseRequest.setJMSReplyTo(replyTopic);
   requestProducer.send(browseRequest);

   // The selected entry must arrive...
   Message selected = replyConsumer.receive(5000);
   assertNotNull(selected);
   assertEquals(45000, selected.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));

   // ...and nothing else may follow it.
   Message unexpected = replyConsumer.receive(5000);
   assertNull(unexpected);
}
 
源代码8 项目: activemq-artemis   文件: ActiveMQJMSProducer.java
/**
 * Applies the configured JMS header overrides and properties to the message
 * and sends it to the destination, using the completion listener when one
 * has been configured.
 *
 * @param destination the destination to send to
 * @param message the message to send; must not be null
 * @return this producer
 */
@Override
public JMSProducer send(Destination destination, Message message) {
   if (message == null) {
      throw new MessageFormatRuntimeException("null message");
   }

   try {
      // Header overrides are applied only when they have been configured.
      if (jmsHeaderCorrelationID != null) {
         message.setJMSCorrelationID(jmsHeaderCorrelationID);
      }
      if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0) {
         message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
      }
      if (jmsHeaderReplyTo != null) {
         message.setJMSReplyTo(jmsHeaderReplyTo);
      }
      if (jmsHeaderType != null) {
         message.setJMSType(jmsHeaderType);
      }

      // XXX HORNETQ-1209 "JMS 2.0" can this be a foreign msg?
      // if so, then "SimpleString" properties will trigger an error.
      setProperties(message);

      if (completionListener == null) {
         producer.send(destination, message);
      } else {
         CompletionListener listenerWrapper = new CompletionListenerWrapper(completionListener);
         producer.send(destination, message, listenerWrapper);
      }
   } catch (JMSException jmsFailure) {
      // The checked JMS exception is converted to its runtime counterpart.
      throw JmsExceptionUtils.convertToRuntimeException(jmsFailure);
   }
   return this;
}
 
源代码9 项目: activemq-artemis   文件: MessageTestBase.java
/**
 * Populates the given message with one property of each primitive/String
 * kind plus a fixed correlation id, reply-to destination and type header.
 *
 * @param m the message to populate
 * @throws JMSException if any setter on the message fails
 */
protected void prepareMessage(final Message m) throws JMSException {
   // One property per supported property type.
   m.setBooleanProperty("booleanProperty", true);
   m.setByteProperty("byteProperty", (byte) 3);
   m.setDoubleProperty("doubleProperty", 4.0d);
   m.setFloatProperty("floatProperty", 5.0f);
   m.setIntProperty("intProperty", 6);
   m.setLongProperty("longProperty", 7L);
   m.setShortProperty("shortProperty", (short) 8);
   m.setStringProperty("stringProperty", "this is a String property");

   // Standard JMS headers.
   m.setJMSCorrelationID("this is the correlation ID");
   m.setJMSReplyTo(ActiveMQServerTestCase.topic1);
   m.setJMSType("someArbitraryType");
}
 
源代码10 项目: activemq-artemis   文件: JMSReplyToHeaderTest.java
/**
 * Verifies that a message whose JMSReplyTo header was explicitly set to
 * {@code null} reports a {@code null} reply-to both on the sent instance
 * and on the instance delivered to the consumer.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testJMSDestinationNull() throws Exception {
   Message m = queueProducerSession.createMessage();
   m.setJMSReplyTo(null);

   queueProducer.send(m);

   // Bounded wait: a lost message fails the test instead of hanging the suite.
   Message received = queueConsumer.receive(5000);
   ProxyAssertSupport.assertNotNull(received);

   // The header must be null on the sent message and on the delivered one.
   ProxyAssertSupport.assertNull(m.getJMSReplyTo());
   ProxyAssertSupport.assertNull(received.getJMSReplyTo());
}
 
源代码11 项目: activemq-artemis   文件: JMSUtil.java
/**
 * Creates an empty message whose JMSReplyTo header points at the named
 * queue, sends it to the given destination and returns the sent message.
 * The producer created for the send is closed before returning, whether or
 * not the send succeeded.
 *
 * @param session the session used to create the producer and the message
 * @param destination the destination to send the message to
 * @param replyTo the name of the queue to use as the reply-to destination
 * @return the message that was sent
 * @throws JMSException if creating, sending or closing fails
 */
public static Message sendMessageWithReplyTo(final Session session,
                                             final Destination destination,
                                             final String replyTo) throws JMSException {
   final MessageProducer producer = session.createProducer(destination);
   try {
      final Message message = session.createMessage();
      message.setJMSReplyTo(ActiveMQJMSClient.createQueue(replyTo));
      producer.send(message);
      return message;
   } finally {
      // The producer is only needed for this one send; do not leak it.
      producer.close();
   }
}
 
源代码12 项目: nifi   文件: PutJMS.java
/**
 * Builds the JMS message for the configured message type and applies the
 * timestamp plus the optional reply-to, priority and attribute-derived
 * properties.
 *
 * @param jmsSession the session used to create the message
 * @param context the process context supplying the configured property values
 * @param messageContent the raw content used for stream, text and bytes messages
 * @param flowFile the flow file whose attributes may be copied to JMS properties
 * @param replyToQueue the reply-to destination, or null to leave the header unset
 * @param priority the JMS priority, or null to leave the header unset
 * @return the populated message
 * @throws JMSException if creating or populating the message fails
 */
private Message createMessage(final Session jmsSession, final ProcessContext context, final byte[] messageContent,
        final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
    final String configuredType = context.getProperty(MESSAGE_TYPE).getValue();
    final Message jmsMessage;

    switch (configuredType) {
        case MSG_TYPE_EMPTY:
            jmsMessage = jmsSession.createTextMessage("");
            break;
        case MSG_TYPE_STREAM: {
            final StreamMessage stream = jmsSession.createStreamMessage();
            stream.writeBytes(messageContent);
            jmsMessage = stream;
            break;
        }
        case MSG_TYPE_TEXT:
            jmsMessage = jmsSession.createTextMessage(new String(messageContent, UTF8));
            break;
        case MSG_TYPE_MAP:
            // The map message is created empty; messageContent is not written to it.
            jmsMessage = jmsSession.createMapMessage();
            break;
        case MSG_TYPE_BYTE:
        default: {
            // Bytes is both an explicit choice and the fallback for any other value.
            final BytesMessage bytes = jmsSession.createBytesMessage();
            bytes.writeBytes(messageContent);
            jmsMessage = bytes;
        }
    }

    jmsMessage.setJMSTimestamp(System.currentTimeMillis());

    // Optional headers are set only when a value was supplied.
    if (replyToQueue != null) {
        jmsMessage.setJMSReplyTo(replyToQueue);
    }
    if (priority != null) {
        jmsMessage.setJMSPriority(priority);
    }

    if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean()) {
        copyAttributesToJmsProps(flowFile, jmsMessage);
    }

    return jmsMessage;
}
 
源代码13 项目: localization_nifi   文件: PutJMS.java
/**
 * Builds the JMS message for the configured message type and applies the
 * timestamp plus the optional reply-to, priority and attribute-derived
 * properties.
 *
 * @param jmsSession the session used to create the message
 * @param context the process context supplying the configured property values
 * @param messageContent the raw content used for stream, text and bytes messages
 * @param flowFile the flow file whose attributes may be copied to JMS properties
 * @param replyToQueue the reply-to destination, or null to leave the header unset
 * @param priority the JMS priority, or null to leave the header unset
 * @return the populated message
 * @throws JMSException if creating or populating the message fails
 */
private Message createMessage(final Session jmsSession, final ProcessContext context, final byte[] messageContent,
        final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
    final String configuredType = context.getProperty(MESSAGE_TYPE).getValue();
    final Message jmsMessage;

    switch (configuredType) {
        case MSG_TYPE_EMPTY:
            jmsMessage = jmsSession.createTextMessage("");
            break;
        case MSG_TYPE_STREAM: {
            final StreamMessage stream = jmsSession.createStreamMessage();
            stream.writeBytes(messageContent);
            jmsMessage = stream;
            break;
        }
        case MSG_TYPE_TEXT:
            jmsMessage = jmsSession.createTextMessage(new String(messageContent, UTF8));
            break;
        case MSG_TYPE_MAP:
            // The map message is created empty; messageContent is not written to it.
            jmsMessage = jmsSession.createMapMessage();
            break;
        case MSG_TYPE_BYTE:
        default: {
            // Bytes is both an explicit choice and the fallback for any other value.
            final BytesMessage bytes = jmsSession.createBytesMessage();
            bytes.writeBytes(messageContent);
            jmsMessage = bytes;
        }
    }

    jmsMessage.setJMSTimestamp(System.currentTimeMillis());

    // Optional headers are set only when a value was supplied.
    if (replyToQueue != null) {
        jmsMessage.setJMSReplyTo(replyToQueue);
    }
    if (priority != null) {
        jmsMessage.setJMSPriority(priority);
    }

    if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean()) {
        copyAttributesToJmsProps(flowFile, jmsMessage);
    }

    return jmsMessage;
}
 
源代码14 项目: qpid-jms   文件: MessageIntegrationTest.java
/**
 * Shared implementation for tests that send a message on a connection configured
 * with destination prefixes. The same destination is used both as the producer's
 * target and as the message's JMSReplyTo, and the test peer is told to expect the
 * given address and destination-type annotation value for both.
 *
 * @param destType the kind of destination to create (Topic, Queue, TemporaryTopic or TemporaryQueue)
 * @param destPrefix the prefix applied through the connection URI options and setters
 * @param destName the name passed to createTopic/createQueue for non-temporary destinations
 * @param destAddress the address the peer is expected to see for the target, 'to' and 'reply-to'
 * @param destTypeAnnotationValue the value expected for both destination-type message annotations
 * @throws Exception if an error occurs during the test.
 */
private void doSendMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType,
                                                         String destPrefix,
                                                         String destName,
                                                         String destAddress,
                                                         Byte destTypeAnnotationValue) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // Pick the connection URI options according to the destination type under test.
        Connection connection = null;
        if (destType == Topic.class) {
            connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix);
        } else if (destType == Queue.class) {
            connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix);
        } else {
            // Set both of the non-temporary prefixes; we won't use non-temporary destinations
            // here, but want to ensure the prefixes don't affect anything.
            connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix);
        }

        connection.start();

        // Set the prefix if Topic or Queue dest type.
        if (destType == Topic.class) {
            ((JmsConnection) connection).setTopicPrefix(destPrefix);
        } else if (destType == Queue.class) {
            ((JmsConnection) connection).setQueuePrefix(destPrefix);
        }

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination
        Destination dest = null;
        if (destType == Topic.class) {
            dest = session.createTopic(destName);
        } else if (destType == Queue.class) {
            dest = session.createQueue(destName);
        } else if (destType == TemporaryTopic.class) {
            // For temporary destinations the peer expectation is registered before the create call.
            testPeer.expectTempTopicCreationAttach(destAddress);
            dest = session.createTemporaryTopic();
        } else if (destType == TemporaryQueue.class) {
            testPeer.expectTempQueueCreationAttach(destAddress);
            dest = session.createTemporaryQueue();
        }

        // The sender link is expected to target the given address.
        TargetMatcher targetMatcher = new TargetMatcher();
        targetMatcher.withAddress(equalTo(destAddress));

        testPeer.expectSenderAttach(targetMatcher, false, false);

        MessageProducer producer = session.createProducer(dest);

        // Both the destination-type and reply-to-type annotations, and both the 'to' and
        // 'reply-to' properties, are matched against the same values because dest is used
        // for the producer and for JMSReplyTo below.
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL.toString()), equalTo(destTypeAnnotationValue));
        msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL.toString()), equalTo(destTypeAnnotationValue));
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
        propsMatcher.withTo(equalTo(destAddress));
        propsMatcher.withReplyTo(equalTo(destAddress));

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);

        //TODO: currently we aren't sending any body section, decide if this is allowed
        //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        testPeer.expectTransfer(messageMatcher);

        Message message = session.createMessage();
        message.setJMSReplyTo(dest);

        producer.send(message);

        testPeer.expectClose();
        connection.close();

        // Give the peer up to two seconds to finish processing the scripted expectations.
        testPeer.waitForAllHandlersToComplete(2000);
    }
}
 
源代码15 项目: dubbox   文件: JmsQueueRequestor.java
/**
 * Sends the given message with its JMSReplyTo header set to this requestor's
 * temporary queue and returns whatever the receiver's no-argument
 * {@code receive()} call yields.
 *
 * @param message the request message to send
 * @return the message obtained from the receiver
 * @throws JMSException if setting the header, sending or receiving fails
 */
public Message request(Message message) throws JMSException {
	message.setJMSReplyTo(getTemporaryQueue());
	getSender().send(message);
	return getReceiver().receive();
}
 
源代码16 项目: dubbox   文件: JmsQueueRequestor.java
/**
 * Sends the given message with its JMSReplyTo header set to this requestor's
 * temporary queue and returns whatever the receiver's
 * {@code receive(timeout)} call yields.
 *
 * @param message the request message to send
 * @param timeout the timeout value passed to the receiver's {@code receive} call
 * @return the message obtained from the receiver
 * @throws JMSException if setting the header, sending or receiving fails
 */
public Message request(Message message, long timeout) throws JMSException {
	message.setJMSReplyTo(getTemporaryQueue());
	getSender().send(message);
	return getReceiver().receive(timeout);
}
 
源代码17 项目: qpid-jms   文件: MessageIntegrationTest.java
/**
 * Tests that the {@link AmqpDestinationHelper#JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL} is set as a byte on
 * a sent message to indicate its 'reply-to' address represents a Topic JMSDestination.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test(timeout = 20000)
public void testSentMessageContainsReplyToTypeAnnotationByte() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);

        // Script the peer for the session begin and the sender attach that follow.
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        String replyTopicName = "myReplyTopic";

        // The transfer must carry the reply-to type annotation with the topic type value...
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        Symbol annotationKey = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL;
        msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE));

        // ...and a 'reply-to' property equal to the reply topic's name.
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName));

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);

        testPeer.expectTransfer(messageMatcher);

        // The message is sent to a queue while its reply-to points at a topic.
        Topic replyTopic = session.createTopic(replyTopicName);
        Message message = session.createMessage();
        message.setJMSReplyTo(replyTopic);

        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
        producer.send(message);

        testPeer.expectClose();
        connection.close();

        // Give the peer up to two seconds to finish processing the scripted expectations.
        testPeer.waitForAllHandlersToComplete(2000);
    }
}
 
源代码18 项目: olat   文件: SearchClientJMSProxy.java
/**
 * Sends the given search request and waits for the reply that carries the
 * matching correlation id.
 * <p>
 * A reply queue is acquired for the duration of the call and always released
 * again. Replies whose correlation id does not match (left over from earlier
 * requests) are skipped. The consumer and producer are closed on every path,
 * including when an exception is thrown.
 *
 * @param session the JMS session used to create the consumer and producer
 * @param message the request message; its JMSReplyTo and JMSCorrelationID headers are overwritten
 * @return the matching reply, or {@code null} if none arrived before the receive timeout elapsed
 * @throws JMSException if any JMS operation fails
 */
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    try {
        final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
        try {
            message.setJMSReplyTo(replyQueue);
            final String correlationId = createRandomString();
            message.setJMSCorrelationID(correlationId);

            final MessageProducer producer = session.createProducer(searchQueue_);
            try {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setTimeToLive(timeToLive_);
                if (log.isDebugEnabled()) {
                    log.debug("Sending search request message with correlationId=" + correlationId);
                }
                producer.send(message);
            } finally {
                // Close the producer even when configuring or sending failed.
                producer.close();
            }

            Message returnedMessage = null;
            final long start = System.currentTimeMillis();
            while (returnedMessage == null) {
                final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
                if (diff <= 0) {
                    // timeout
                    log.info("Timeout in search. Remaining time zero or negative.");
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("doSearchRequest: call receive with timeout=" + diff);
                }
                final Message candidate = responseConsumer.receive(diff);
                if (candidate == null) {
                    // timeout case, we're stopping now without a reply...
                    log.info("Timeout in search. Reply was null.");
                    break;
                }
                if (correlationId.equals(candidate.getJMSCorrelationID())) {
                    // we got a valid reply
                    returnedMessage = candidate;
                } else {
                    // We got an old reply from a previous search request. It is never stored
                    // in returnedMessage, so a timeout on the next pass cannot return it.
                    log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
            }
            return returnedMessage;
        } finally {
            // Close the consumer before the reply queue is released below.
            responseConsumer.close();
        }
    } finally {
        releaseTempQueue(replyQueue);
    }
}
 
源代码19 项目: qpid-broker-j   文件: MessagingACLTest.java
/**
 * Runs a request/response exchange between two users under a configured set of
 * ACL rules and expects it to succeed: the requester (USER2) sends a request
 * carrying a temporary reply queue, the responder (USER1) consumes it and
 * publishes a response to that reply queue, and the requester receives it.
 *
 * @throws Exception if any step of the test fails
 */
@Test
public void testRequestResponseSuccess() throws Exception
{
    String queueName = getTestName();
    Queue queue = createQueue(queueName);
    String groupName = "messaging-users";
    createGroupProvider(groupName, USER1, USER2);

    // The rule set differs between legacy and non-legacy clients; for non-legacy
    // clients the last three entries are empty strings.
    configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", groupName),
                 String.format("ACL ALLOW-LOG %s CONSUME QUEUE name=\"%s\"", USER1, queueName),
                 String.format("ACL ALLOW-LOG %s CONSUME QUEUE temporary=true", USER2),
                 String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=true", USER2),
                 isLegacyClient() ?
                         String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"amq.direct\" temporary=true", USER2) :
                         String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"TempQueue*\"", USER1),
                 isLegacyClient() ?
                         String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"%s\"", USER2, queueName) :
                         String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"", USER2, queueName),
                 isLegacyClient() ? String.format("ACL ALLOW-LOG %s CREATE QUEUE name=\"%s\"", USER1, queueName) : "",
                 isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE", USER1) : "",
                 isLegacyClient() ? String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"TempQueue*\"", USER1) : ""
                 );

    // Responder side: USER1 consumes requests from the test queue.
    Connection responderConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
    try
    {
        Session responderSession = responderConnection.createSession(true, Session.SESSION_TRANSACTED);
        MessageConsumer requestConsumer = responderSession.createConsumer(queue);
        responderConnection.start();

        // Requester side: USER2 owns the temporary response queue.
        Connection requesterConnection = getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build();
        try
        {
            Session requesterSession = requesterConnection.createSession(true, Session.SESSION_TRANSACTED);
            Queue responseQueue = requesterSession.createTemporaryQueue();
            MessageConsumer responseConsumer = requesterSession.createConsumer(responseQueue);
            requesterConnection.start();

            Message request = requesterSession.createTextMessage("Request");
            request.setJMSReplyTo(responseQueue);

            // Both sessions are transacted, so each send is followed by an explicit commit.
            requesterSession.createProducer(queue).send(request);
            requesterSession.commit();

            Message receivedRequest = requestConsumer.receive(getReceiveTimeout());
            assertNotNull("Request is not received", receivedRequest);
            assertNotNull("Request should have Reply-To", receivedRequest.getJMSReplyTo());

            // The responder answers to the destination carried in the request's Reply-To header.
            MessageProducer responder = responderSession.createProducer(receivedRequest.getJMSReplyTo());
            responder.send(responderSession.createTextMessage("Response"));
            responderSession.commit();

            Message receivedResponse = responseConsumer.receive(getReceiveTimeout());
            requesterSession.commit();
            assertNotNull("Response is not received", receivedResponse);
            assertEquals("Unexpected response is received", "Response", ((TextMessage) receivedResponse).getText());
        }
        finally
        {
             requesterConnection.close();
        }
    }
    finally
    {
        responderConnection.close();
    }

}
 
/**
 * So we network three brokers together, and send a message with request-reply semantics.
 * The message goes to an echo service listening on broker C. We send a message on a queue
 * to broker A which gets demand forwarded to broker C. The echo service will respond to the
 * temp destination listed in the JMSReplyTo header. That will get demand forwarded back to
 * broker A. When the consumer of the temp dest on broker A closes, that subscription should
 * be removed on broker A. Advisories firing from broker A to broker B should remove that
 * subscription on broker B. Advisories firing from broker B to broker C should remove that
 * subscription on broker C.
 * <p>
 * Two worker threads run the request/reply loop concurrently. Their outcome is
 * checked through the returned futures, so an assertion failure on a worker
 * thread fails the test instead of being silently dropped by the executor.
 *
 * @throws Exception if setup fails, a worker task fails, or the final wait fails
 */
public void testSubscriptionsCleanedUpRace() throws Exception {

   final BrokerItem brokerA = brokers.get(BROKER_A);

   Runnable tester = new Runnable() {

      @Override
      public void run() {
         for (int i = 0; i < NUM_ITER; i++) {

            Connection conn = null;
            try {
               conn = brokerA.createConnection();

               conn.start();

               final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
               Destination destination = sess.createQueue(ECHO_QUEUE_NAME);

               MessageProducer producer = sess.createProducer(destination);

               LOG.info("Starting iter: " + i);
               Destination replyTo = sess.createTemporaryQueue();
               MessageConsumer responseConsumer = sess.createConsumer(replyTo);

               Message message = sess.createTextMessage("Iteration: " + i);
               message.setJMSReplyTo(replyTo);

               producer.send(message);

               TextMessage response = (TextMessage) responseConsumer.receive(CONSUME_TIMEOUT);
               assertNotNull("We should have gotten a response, but didn't for iter: " + i, response);
               assertEquals("We got the wrong response from the echo service", "Iteration: " + i, response.getText());

               // so we close the consumer so that an actual RemoveInfo command gets propagated through the
               // network
               responseConsumer.close();
               conn.close();

            } catch (Exception e) {
               LOG.error("Request/reply iteration " + i + " failed", e);
               fail("Request/reply iteration " + i + " failed: " + e);
            } finally {
               // Failure paths skip the close above; make sure the connection is
               // not leaked. Closing an already closed JMS connection is a no-op.
               if (conn != null) {
                  try {
                     conn.close();
                  } catch (Exception closeFailure) {
                     LOG.info("Ignoring failure while closing connection: " + closeFailure);
                  }
               }
            }

         }
      }
   };

   ExecutorService threadService = Executors.newFixedThreadPool(2);
   java.util.concurrent.Future<?> firstRun = threadService.submit(tester);
   java.util.concurrent.Future<?> secondRun = threadService.submit(tester);

   threadService.shutdown();
   assertTrue("executor done on time", threadService.awaitTermination(30L, TimeUnit.SECONDS));

   // Errors thrown on the pool threads are held by the futures; get() rethrows
   // them (wrapped in an ExecutionException) so worker failures fail the test.
   firstRun.get();
   secondRun.get();

   // for the real test... we should not have any subscriptions left on broker C for the temp dests
   BrokerItem brokerC = brokers.get(BROKER_C);
   RegionBroker regionBroker = (RegionBroker) brokerC.broker.getRegionBroker();
   final AbstractRegion region = (AbstractRegion) regionBroker.getTempQueueRegion();

   assertTrue("There were no lingering temp-queue destinations", Wait.waitFor(new Wait.Condition() {
      @Override
      public boolean isSatisified() throws Exception {
         LOG.info("Lingering temps: " + region.getSubscriptions().size());
         return 0 == region.getSubscriptions().size();
      }
   }));
}