javax.jms.Message#getJMSReplyTo() 源码实例 Demo

下面列出了 javax.jms.Message#getJMSReplyTo() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: activemq-artemis   文件: RequestReplyExample.java
/**
 * Answers an incoming request with a fixed text reply.
 * <p>
 * The reply is sent to the request's JMSReplyTo destination and carries the
 * request's JMSMessageID as its JMSCorrelationID so the requester can match it.
 * A request that carries no ReplyTo destination cannot be answered; it is
 * reported on standard output and skipped instead of being sent to a
 * {@code null} destination.
 *
 * @param request the received request; it is cast to {@link TextMessage}
 */
@Override
public void onMessage(final Message request) {
   try {
      System.out.println("Received request message: " + ((TextMessage) request).getText());

      // Extract the ReplyTo destination
      Destination replyDestination = request.getJMSReplyTo();
      if (replyDestination == null) {
         // Nothing to reply to: the sender did not set the JMSReplyTo header.
         System.out.println("Request has no ReplyTo destination, no reply sent");
         return;
      }

      System.out.println("Reply to queue: " + replyDestination);

      // Create the reply message
      TextMessage replyMessage = session.createTextMessage("A reply message");

      // Set the CorrelationID, using message id.
      replyMessage.setJMSCorrelationID(request.getJMSMessageID());

      // Send out the reply message
      replyProducer.send(replyDestination, replyMessage);

      System.out.println("Reply sent");
   } catch (JMSException e) {
      e.printStackTrace();
   }
}
 
源代码2 项目: activemq-artemis   文件: TextReverserService.java
/**
 * Answers a text request with a reply containing the reversed text.
 * <p>
 * The reply goes to the request's JMSReplyTo destination through a producer
 * that is created and closed for this single send. A request without a ReplyTo
 * destination is reported on standard error and skipped: a producer created
 * for a {@code null} destination has no default target, so
 * {@code producer.send(reply)} could not deliver the reply and may fail with an
 * unchecked exception that the {@code JMSException} handler would not catch.
 *
 * @param request the received request; it is cast to {@link TextMessage}
 */
@Override
public void onMessage(final Message request) {
   TextMessage textMessage = (TextMessage) request;
   try {
      // retrieve the destination to reply to, and bail out early when there is none
      Destination replyTo = request.getJMSReplyTo();
      if (replyTo == null) {
         System.err.println("Request has no JMSReplyTo destination, no reply sent");
         return;
      }

      // retrieve the request's text
      String text = textMessage.getText();
      // create a reply containing the reversed text
      TextMessage reply = session.createTextMessage(TextReverserService.reverse(text));

      // create a producer to send the reply; it is closed when the block exits
      try (MessageProducer producer = session.createProducer(replyTo)) {
         // send the reply
         producer.send(reply);
      }
   } catch (JMSException e) {
      e.printStackTrace();
   }
}
 
源代码3 项目: activemq-artemis   文件: MessageHeaderTest.java
/**
 * Test that a <code>Destination</code> set by the <code>setJMSReplyTo()</code>
 * method on a sent message corresponds to the <code>Destination</code> returned by
 * the <code>getJMSReplyTo()</code> method.
 */
@Test
public void testJMSReplyTo_1() {
   try {
      Message message = senderSession.createMessage();
      message.setJMSReplyTo(senderQueue);
      sender.send(message);

      Message msg = receiver.receive(TestConfig.TIMEOUT);
      // receive() yields null when the timeout expires; fail clearly instead of with an NPE
      Assert.assertNotNull("No message received within the timeout", msg);
      Destination dest = msg.getJMSReplyTo();
      Assert.assertTrue("JMS ReplyTo header field should be a Queue", dest instanceof Queue);
      Queue replyTo = (Queue) dest;
      // assertEquals takes (message, expected, actual): the sender queue is the expected value
      Assert.assertEquals("JMS ReplyTo header field should be equals to the sender queue", senderQueue.getQueueName(), replyTo.getQueueName());
   } catch (JMSException e) {
      fail(e);
   }
}
 
源代码4 项目: activemq-artemis   文件: MessageHeaderTest.java
/**
 * Test that if the JMS ReplyTo header field has been set to a <code>TemporaryQueue</code>,
 * it is also returned as a <code>TemporaryQueue</code>
 * (and not only as a <code>Queue</code>).
 */
@Test
public void testJMSReplyTo_2() {
   try {
      TemporaryQueue tempQueue = senderSession.createTemporaryQueue();
      Message message = senderSession.createMessage();
      message.setJMSReplyTo(tempQueue);
      sender.send(message);

      Message msg = receiver.receive(TestConfig.TIMEOUT);
      // receive() yields null when the timeout expires; fail clearly instead of with an NPE
      Assert.assertNotNull("No message received within the timeout", msg);
      Destination dest = msg.getJMSReplyTo();
      Assert.assertTrue("JMS ReplyTo header field should be a TemporaryQueue", dest instanceof TemporaryQueue);
      Queue replyTo = (Queue) dest;
      // assertEquals takes (message, expected, actual): the temporary queue is the expected value
      Assert.assertEquals("JMS ReplyTo header field should be equals to the temporary queue", tempQueue.getQueueName(), replyTo.getQueueName());
   } catch (JMSException e) {
      fail(e);
   }
}
 
源代码5 项目: hazelcast-simulator   文件: Server.java
/**
 * Receives the next message from the consumer, decodes the operation it
 * carries and passes it to the processor together with a promise that holds
 * the message's reply-to destination and correlation id.
 * <p>
 * If processing throws while the {@code stop} flag is set, the exception is
 * rethrown; otherwise it is logged at warn level and handed to the promise
 * via {@code answer}.
 *
 * @throws Exception if reading or decoding the message fails, or if processing
 *                   fails while {@code stop} is set
 */
private void handle() throws Exception {
    Message request = consumer.receive();

    // Decode the operation from the "operationType" and "payload" properties.
    int typeId = request.getIntProperty("operationType");
    OperationType operationType = OperationType.fromInt(typeId);
    String json = request.getStringProperty("payload");
    SimulatorOperation operation = OperationCodec.fromJson(json, operationType.getClassType());
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Received operation:" + operation);
    }

    // The promise carries the reply routing information of the request.
    PromiseImpl promise = new PromiseImpl();
    promise.replyTo = request.getJMSReplyTo();
    promise.correlationId = request.getJMSCorrelationID();
    promise.op = operation;

    String sourceAddress = request.getStringProperty("source");
    SimulatorAddress source = SimulatorAddress.fromString(sourceAddress);

    try {
        processor.process(operation, source, promise);
    } catch (Exception failure) {
        if (stop) {
            throw failure;
        }
        LOGGER.warn(failure.getMessage(), failure);
        promise.answer(failure);
    }
}
 
源代码6 项目: qpid-broker-j   文件: Client.java
/**
 * Receives one message from the given destination, verifies it against the
 * description and, when the message names a reply-to destination, sends a
 * reply there.
 * <p>
 * The consumer is created for this single receive and always closed. The
 * receive waits up to {@code RECEIVE_TIMEOUT}; the received value (possibly
 * {@code null}) is printed, verified and finally converted with
 * {@code buildClientMessage}.
 *
 * @param session            session used to create the consumer and to send the reply
 * @param queue              destination to consume from
 * @param messageDescription description the received message is verified against;
 *                           its CORRELATION_ID header is passed on to the reply
 * @return the result of {@code buildClientMessage} for the received message
 * @throws Exception if consuming, verification or replying fails
 */
private ClientMessage receiveMessage(final Session session,
                                     final Destination queue,
                                     final MessageDescription messageDescription) throws Exception
{
    final Message received;
    final MessageConsumer queueConsumer = session.createConsumer(queue);
    try
    {
        received = queueConsumer.receive(RECEIVE_TIMEOUT);
        System.out.println(String.format("Received message: %s", received));
        MessageVerifier.verifyMessage(messageDescription, received);
    }
    finally
    {
        queueConsumer.close();
    }

    // Only reply when a message arrived and it names a reply-to destination.
    final Destination replyTo = (received == null) ? null : received.getJMSReplyTo();
    if (replyTo != null)
    {
        System.out.println(String.format("Received message had replyTo: %s", replyTo));
        sendReply(session,
                  replyTo,
                  messageDescription.getHeader(MessageDescription.MessageHeader.CORRELATION_ID));
    }

    return buildClientMessage(received);
}
 
源代码7 项目: cxf   文件: JMSMessageHeadersType.java
/**
 * Returns the name of the message's JMSReplyTo destination.
 *
 * @param message the message whose reply-to destination is inspected
 * @return the queue name or topic name of the reply-to destination, or
 *         {@code null} when no reply-to is set or it is neither a queue nor a topic
 * @throws JMSException if the reply-to or its name cannot be read
 */
private String getDestName(Message message) throws JMSException {
    final Destination destination = message.getJMSReplyTo();
    if (destination instanceof Queue) {
        return ((Queue)destination).getQueueName();
    }
    if (destination instanceof Topic) {
        return ((Topic)destination).getTopicName();
    }
    // Covers a missing reply-to as well as any other destination type.
    return null;
}
 
/**
 * Sends a text reply to the message's JMSReplyTo destination, if one is set.
 * <p>
 * The reply text is {@code "Reply: "} followed by the JMS message id of the
 * given message. When no reply-to destination is set nothing is sent. A
 * {@link JMSException} is logged, recorded in {@code throwable} and rethrown
 * wrapped in a {@link RuntimeException}.
 *
 * @param message the received message that may carry a reply-to destination
 */
protected void sendReply(Message message)
{
  try {
    if (message.getJMSReplyTo() == null) {
      // No replyTo destination set: there is nowhere to send a reply.
      return;
    }
    replyProducer.send(message.getJMSReplyTo(),
        getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
  } catch (JMSException ex) {
    LOG.error(ex.getLocalizedMessage());
    throwable.set(ex);
    throw new RuntimeException(ex);
  }
}
 
源代码9 项目: activemq-artemis   文件: NewQueueRequestorTest.java
/**
 * Responds to a received message with a fixed text message sent to the
 * request's JMSReplyTo destination. A {@link JMSException} is logged and not
 * propagated.
 *
 * @param m the received request
 */
@Override
public void onMessage(final Message m) {
   try {
      final Destination replyTo = m.getJMSReplyTo();
      final Message response = sess.createTextMessage("This is the response");
      sender.send(replyTo, response);
   } catch (JMSException e) {
      instanceLog.error(e);
   }
}
 
源代码10 项目: qpid-jms   文件: MessageIntegrationTest.java
/**
 * Verifies that when a message received from a Topic carries a 'reply-to'
 * address but no destination type annotation (neither
 * {@link AmqpDestinationHelper#JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL}
 * nor {@link AmqpMessageSupport#LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL}),
 * its JMSReplyTo is classed as the same type as the consumer destination,
 * i.e. a Topic with the transferred reply-to address as its name.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test(timeout = 20000)
public void testReceivedMessageFromTopicWithReplyToWithoutTypeAnnotationResultsInUseOfConsumerDestinationType() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();

        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic consumerTopic = sess.createTopic("myTopic");

        // The transferred message sets reply-to in its properties and has no message annotations.
        String expectedReplyAddress = "myReplyTopicAddress";
        PropertiesDescribedType transferProps = new PropertiesDescribedType();
        transferProps.setReplyTo(expectedReplyAddress);
        transferProps.setMessageId("myMessageIDString");

        DescribedType nullBody = new AmqpValueDescribedType(null);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, transferProps, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer consumer = sess.createConsumer(consumerTopic);
        Message received = consumer.receive(3000);
        peer.waitForAllHandlersToComplete(3000);

        assertNotNull(received);

        // The reply-to must come back as a Topic, matching the consumer destination type.
        Destination replyTo = received.getJMSReplyTo();
        assertNotNull("JMSReplyTo should not be null", replyTo);
        assertTrue("Destination not of expected type: " + replyTo.getClass(), replyTo instanceof Topic);
        assertEquals(expectedReplyAddress, ((Topic)replyTo).getTopicName());

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码11 项目: activemq-artemis   文件: TopicBridgeSpringTest.java
/**
 * Echoes a text request back to its JMSReplyTo destination.
 * <p>
 * The received message object itself is reused as the response: its body is
 * cleared and replaced with {@code "REPLY: "} followed by the original text,
 * then it is sent with {@code requestServerProducer}. A {@link JMSException}
 * is printed and not propagated.
 *
 * @param msg the received request; it is cast to {@link TextMessage}
 */
@Override
public void onMessage(Message msg) {
   try {
      TextMessage request = (TextMessage) msg;
      String responseText = "REPLY: " + request.getText();
      Destination replyDestination = msg.getJMSReplyTo();

      // Overwrite the body of the received message with the response text.
      request.clearBody();
      request.setText(responseText);

      LOG.info("Sending response: " + request);
      requestServerProducer.send(replyDestination, request);
   } catch (JMSException e) {
      e.printStackTrace();
   }
}
 
/**
 * Creates a processor for one request and prepares a producer bound to the
 * request's JMSReplyTo destination.
 *
 * @param sess the session used to create the response producer
 * @param req  the request message; it must carry a reply-to destination
 * @throws Exception if the request has no reply-to destination, or if reading
 *                   the reply-to or creating the producer fails
 */
public EchoRequestProcessor(Session sess, Message req) throws Exception {
   session = sess;
   request = req;
   resp_dest = req.getJMSReplyTo();

   // Without a reply-to destination there is nowhere to send the response.
   if (resp_dest == null) {
      throw new Exception("invalid request: no reply-to destination given");
   }

   msg_prod = sess.createProducer(resp_dest);
}
 
源代码13 项目: nifi   文件: JmsFactory.java
/**
 * Flattens a JMS message's properties and headers into a string attribute map.
 * <p>
 * For every message property two entries are written, both prefixed with
 * {@code ATTRIBUTE_PREFIX}: the property's string value and, under the key
 * with {@code ATTRIBUTE_TYPE_SUFFIX} appended, a name for its type. A
 * {@code null} property value is stored as an empty string with type
 * {@code "Unknown"}. The correlation id, destination, message id, reply-to and
 * type headers are added only when they are non-null; delivery mode,
 * expiration, priority, redelivered flag and timestamp are always added.
 * <p>
 * The destination is stored by queue name or topic name; a destination that is
 * neither a {@link Queue} nor a {@link Topic} is stored via {@code toString()}
 * instead of failing with a {@link ClassCastException}.
 *
 * @param message the message to read properties and headers from
 * @return a new mutable map of attribute names to string values
 * @throws JMSException if a property or header cannot be read
 */
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
    final Map<String, String> attributes = new HashMap<>();

    final Enumeration<?> enumeration = message.getPropertyNames();
    while (enumeration.hasMoreElements()) {
        final String propName = (String) enumeration.nextElement();

        final Object value = message.getObjectProperty(propName);

        if (value == null) {
            attributes.put(ATTRIBUTE_PREFIX + propName, "");
            attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown");
            continue;
        }

        attributes.put(ATTRIBUTE_PREFIX + propName, value.toString());
        attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propertyTypeOf(value));
    }

    if (message.getJMSCorrelationID() != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
    }
    if (message.getJMSDestination() != null) {
        final String destinationName;
        if (message.getJMSDestination() instanceof Queue) {
            destinationName = ((Queue) message.getJMSDestination()).getQueueName();
        } else if (message.getJMSDestination() instanceof Topic) {
            destinationName = ((Topic) message.getJMSDestination()).getTopicName();
        } else {
            // Neither queue nor topic: fall back to the string form, as done for reply-to below.
            destinationName = message.getJMSDestination().toString();
        }
        attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
    }
    if (message.getJMSMessageID() != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
    }
    if (message.getJMSReplyTo() != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString());
    }
    if (message.getJMSType() != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType());
    }

    attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
    return attributes;
}

/**
 * Maps a non-null property value to the matching {@code PROP_TYPE_*} name,
 * using {@code PROP_TYPE_OBJECT} for any type that is not listed.
 */
private static String propertyTypeOf(final Object value) {
    if (value instanceof String) {
        return PROP_TYPE_STRING;
    } else if (value instanceof Double) {
        return PROP_TYPE_DOUBLE;
    } else if (value instanceof Float) {
        return PROP_TYPE_FLOAT;
    } else if (value instanceof Long) {
        return PROP_TYPE_LONG;
    } else if (value instanceof Integer) {
        return PROP_TYPE_INTEGER;
    } else if (value instanceof Short) {
        return PROP_TYPE_SHORT;
    } else if (value instanceof Byte) {
        return PROP_TYPE_BYTE;
    } else if (value instanceof Boolean) {
        return PROP_TYPE_BOOLEAN;
    }
    return PROP_TYPE_OBJECT;
}
 
源代码14 项目: qpid-broker-j   文件: MessageVerifier.java
/**
 * Checks every header listed in the message description against the
 * corresponding JMS header of the received message.
 * <p>
 * Each expected value is compared with the actual header value through
 * {@code verifyEquals}. A {@link JMSException} raised while reading a header
 * is rethrown wrapped in a {@link RuntimeException}.
 *
 * @param messageDescription the expected headers
 * @param message            the received message to inspect
 * @throws VerificationException declared for verification failures reported by {@code verifyEquals}
 */
private static void verifyMessageHeaders(final MessageDescription messageDescription,
                                         final Message message) throws VerificationException
{
    try
    {
        for (Map.Entry<MessageDescription.MessageHeader, Serializable> expected : messageDescription.getHeaders()
                                                                                                    .entrySet())
        {
            final Object actualValue = actualHeaderValue(message, expected.getKey(), expected.getValue());

            verifyEquals(String.format("Unexpected message header '%s'", expected.getKey()),
                         expected.getValue(),
                         actualValue);
        }
    }
    catch (JMSException e)
    {
        throw new RuntimeException("Unexpected exception during message header verification", e);
    }
}

/**
 * Reads the JMS header of the message that corresponds to the given header key.
 * For CORRELATION_ID the value is read as bytes when the expected value is a
 * byte array, and as a string otherwise.
 */
private static Object actualHeaderValue(final Message message,
                                        final MessageDescription.MessageHeader header,
                                        final Serializable expectedValue) throws JMSException
{
    switch (header)
    {
        case DESTINATION:
            return message.getJMSDestination();
        case DELIVERY_MODE:
            return message.getJMSDeliveryMode();
        case MESSAGE_ID:
            return message.getJMSMessageID();
        case TIMESTAMP:
            return message.getJMSTimestamp();
        case CORRELATION_ID:
            if (expectedValue instanceof byte[])
            {
                return message.getJMSCorrelationIDAsBytes();
            }
            return message.getJMSCorrelationID();
        case REPLY_TO:
            return message.getJMSReplyTo();
        case REDELIVERED:
            return message.getJMSRedelivered();
        case TYPE:
            return message.getJMSType();
        case EXPIRATION:
            return message.getJMSExpiration();
        case PRIORITY:
            return message.getJMSPriority();
        default:
            throw new RuntimeException(String.format("unexpected message header '%s'", header));
    }
}
 
源代码15 项目: activemq-artemis   文件: RedeployTempTest.java
/**
 * Sends a message with a temporary reply queue, swaps the broker configuration
 * file for an updated one while the embedded broker is running, and then
 * checks that the message can still be consumed and that a reply sent to the
 * temporary queue is received.
 *
 * @throws Exception if broker start-up, file handling or any JMS call fails
 */
@Test
public void testRedeployAddressQueueOpenWire() throws Exception {
   // Install the initial configuration as broker.xml in the test directory.
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url1 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp.xml");
   URL url2 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp-updated.xml");
   // NOTE(review): the stream returned by openStream() is not closed here.
   Files.copy(url1.openStream(), brokerXML);

   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();

   final ReusableLatch latch = new ReusableLatch(1);

   // The reload manager invokes this callback; it releases the latch awaited below.
   Runnable tick = latch::countDown;

   embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);

   // Producer side: send "hello" to "queue" with a temporary queue as reply-to.
   ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
   Connection connection = connectionFactory.createConnection();
   connection.start();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Destination destination = session.createQueue("queue");
   MessageProducer messageProducer = session.createProducer(destination);

   Destination replyTo = session.createTemporaryQueue();
   Message message = session.createTextMessage("hello");
   message.setJMSReplyTo(replyTo);
   messageProducer.send(message);

   try {
      // NOTE(review): the boolean result of await() is ignored, so a timeout is not detected here.
      latch.await(10, TimeUnit.SECONDS);

      // Replace the configuration and push its modification time forward,
      // then wait for the next tick from the reload manager.
      Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      latch.setCount(1);
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
      latch.await(10, TimeUnit.SECONDS);

      try (Connection connectionConsumer = connectionFactory.createConnection()) {
         connectionConsumer.start();
         // NOTE(review): the session is created from 'connection', not 'connectionConsumer',
         // and 'session' (not 'sessionConsumer') is used for the queue and the reply producer
         // below — confirm whether this is intentional.
         try (Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
            Destination destinationConsumer = session.createQueue("queue");
            MessageConsumer messageConsumer = sessionConsumer.createConsumer(destinationConsumer);

            Message receivedMessage = messageConsumer.receive(1000);
            assertEquals("hello", ((TextMessage) receivedMessage).getText());

            // Reply to the temporary queue named in the received message.
            Destination replyToDest = receivedMessage.getJMSReplyTo();
            Message message1 = sessionConsumer.createTextMessage("hi there");

            session.createProducer(replyToDest).send(message1);
         }
      }

      // The original sender must receive the reply on its temporary queue.
      MessageConsumer messageConsumerProducer = session.createConsumer(replyTo);
      Message message2 = messageConsumerProducer.receive(1000);
      Assert.assertNotNull(message2);
      assertEquals("hi there", ((TextMessage) message2).getText());

   } finally {
      connection.close();
      embeddedActiveMQ.stop();
   }
}
 
源代码16 项目: qpid-jms   文件: MessageIntegrationTest.java
/**
 * Tests that the {@link AmqpMessageSupport#LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL} set on a message to
 * indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
 * Topic. Ensure the consumers destination is not used by consuming from a Queue.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test(timeout = 20000)
public void testReceivedMessageFromQueueWithLegacyReplyToTypeAnnotationForTopic() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        // Annotate the transferred message so that its reply-to is marked as a topic (legacy annotation).
        MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
        msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL.toString(), AmqpMessageSupport.LEGACY_TOPIC_ATTRIBUTE);

        PropertiesDescribedType props = new PropertiesDescribedType();
        String myTopicAddress = "myTopicAddress";
        props.setReplyTo(myTopicAddress);
        props.setMessageId("myMessageIDString");
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(3000);
        testPeer.waitForAllHandlersToComplete(3000);

        assertNotNull(receivedMessage);

        Destination dest = receivedMessage.getJMSReplyTo();
        // Check for null first so a missing reply-to is reported as such, and name the
        // actual type on a mismatch, matching the sibling reply-to test.
        assertNotNull("JMSReplyTo should not be null", dest);
        assertTrue("Destination not of expected type: " + dest.getClass(), dest instanceof Topic);
        assertEquals(myTopicAddress, ((Topic)dest).getTopicName());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码17 项目: olat   文件: JmsSearchProvider.java
/**
 * Dispatches an incoming JMS request to the search or spell-check handler.
 * <p>
 * A message whose age (current time minus its JMS timestamp) is not below
 * {@code receiveTimeout} is discarded with a warning. Otherwise an
 * {@link ObjectMessage} is treated as a search request and a
 * {@link TextMessage} as a spell-check request; each is handed to the task
 * executor together with the message's correlation id and reply-to
 * destination. Other message types are ignored. Exceptions and errors are
 * logged and never propagated.
 *
 * @param message the received JMS message
 */
@Override
public void onMessage(final Message message) {
    if (log.isDebugEnabled()) {
        log.debug("onMessage, message=" + message);
    }
    try {
        final long sentTimestamp = message.getJMSTimestamp();
        final long currentTimestamp = System.currentTimeMillis();
        final long messageAge = currentTimestamp - sentTimestamp;
        // under overload old search messages can pile up; drop those instead of answering late
        if (messageAge >= receiveTimeout) {
            log.warn("JMS message was too old, discard message,  timeout=" + receiveTimeout + "ms , received time=" + messageAge + "ms");
            return;
        }

        final String correlationID = message.getJMSCorrelationID();
        final Destination replyTo = message.getJMSReplyTo();
        if (message instanceof ObjectMessage) {
            final SearchRequest searchRequest = (SearchRequest) ((ObjectMessage) message).getObject();
            taskExecutorService.runTask(new Runnable() {

                @Override
                public void run() {
                    onSearchMessage(searchRequest, correlationID, replyTo);
                }

            });
        } else if (message instanceof TextMessage) {
            final String spellText = ((TextMessage) message).getText();
            taskExecutorService.runTask(new Runnable() {

                @Override
                public void run() {
                    onSpellMessage(spellText, correlationID, replyTo);
                }

            });
        }
    } catch (final JMSException e) {
        // signal search not available
        log.error("error when receiving jms messages", e);
    } catch (final Error err) {
        // OLAT-3973: don't throw exceptions here
        log.warn("Error in onMessage, ", err);
    } catch (final RuntimeException runEx) {
        // OLAT-3973: don't throw exceptions here
        log.warn("RuntimeException in onMessage, ", runEx);
    }
}
 
源代码18 项目: olat   文件: JmsSearchProvider.java
/**
 * Receives a JMS request and schedules the matching handler on the task executor.
 * <p>
 * Only messages younger than {@code receiveTimeout} (measured from their JMS
 * timestamp to now) are processed; older ones are logged and dropped. An
 * {@link ObjectMessage} carries a search request, a {@link TextMessage} carries
 * spell-check text; any other message type is ignored. Nothing is thrown from
 * this method: JMS failures are logged as errors, other failures as warnings.
 *
 * @param message the received JMS message
 */
@Override
public void onMessage(final Message message) {
    if (log.isDebugEnabled()) {
        log.debug("onMessage, message=" + message);
    }
    try {
        final long sentAt = message.getJMSTimestamp();
        final long receivedAt = System.currentTimeMillis();
        final long elapsed = receivedAt - sentAt;
        // in case of overload old search-messages may arrive; only fresh ones are processed
        final boolean fresh = elapsed < receiveTimeout;
        if (fresh) {
            final String correlationID = message.getJMSCorrelationID();
            final Destination replyTo = message.getJMSReplyTo();
            if (message instanceof ObjectMessage) {
                final Object payload = ((ObjectMessage) message).getObject();
                final SearchRequest searchRequest = (SearchRequest) payload;
                final Runnable searchTask = new Runnable() {

                    @Override
                    public void run() {
                        onSearchMessage(searchRequest, correlationID, replyTo);
                    }

                };
                taskExecutorService.runTask(searchTask);
            } else if (message instanceof TextMessage) {
                final String spellText = ((TextMessage) message).getText();
                final Runnable spellTask = new Runnable() {

                    @Override
                    public void run() {
                        onSpellMessage(spellText, correlationID, replyTo);
                    }

                };
                taskExecutorService.runTask(spellTask);
            }
        } else {
            // too old: discard the message without replying
            log.warn("JMS message was too old, discard message,  timeout=" + receiveTimeout + "ms , received time=" + elapsed + "ms");
        }
    } catch (final JMSException e) {
        // signal search not available
        log.error("error when receiving jms messages", e);
    } catch (final Error err) {
        // OLAT-3973: don't throw exceptions here
        log.warn("Error in onMessage, ", err);
    } catch (final RuntimeException runEx) {
        // OLAT-3973: don't throw exceptions here
        log.warn("RuntimeException in onMessage, ", runEx);
    }
}
 
源代码19 项目: qpid-jms   文件: MessageIntegrationTest.java
/**
 * Shared body for tests that receive a message over a connection whose peer
 * advertises destination prefixes as connection properties.
 * <p>
 * The same prefix is advertised for queues and topics. A consumer is created
 * on a topic or queue named {@code destName}; the peer is expected to see the
 * receiver attach with source address {@code destAddress} and responds with a
 * transfer whose 'to' and 'reply-to' are {@code destAddress} and
 * {@code replyAddress}, plus the optional message annotations. The received
 * message's JMSDestination and JMSReplyTo names must equal {@code destName}
 * and {@code replyName}.
 *
 * @param destType             {@code Topic.class} or {@code Queue.class}; anything else fails the test
 * @param destPrefix           prefix advertised by the peer for both queues and topics
 * @param destName             name used to create the consumer destination and expected for JMSDestination
 * @param replyName            name expected for JMSReplyTo
 * @param destAddress          address expected on the receiver attach and set as 'to' on the transfer
 * @param replyAddress         address set as 'reply-to' on the transfer
 * @param annotationName       optional message annotation key, or {@code null}
 * @param annotationValue      value for {@code annotationName}
 * @param replyAnnotationName  optional second message annotation key, or {@code null}
 * @param replyAnnotationValue value for {@code replyAnnotationName}
 * @throws Exception if an error occurs during the test
 */
private void doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
                                                                              String destPrefix,
                                                                              String destName,
                                                                              String replyName,
                                                                              String destAddress,
                                                                              String replyAddress,
                                                                              String annotationName,
                                                                              Object annotationValue,
                                                                              String replyAnnotationName,
                                                                              Object replyAnnotationValue) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Have the test peer provide the destination prefixes as connection properties
        Map<Symbol, Object> connectionProps = new HashMap<>();
        connectionProps.put(QUEUE_PREFIX, destPrefix);
        connectionProps.put(TOPIC_PREFIX, destPrefix);

        Connection conn = testFixture.establishConnecton(peer, null, null, connectionProps);
        conn.start();

        peer.expectBegin();

        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination to consume from
        final boolean topicExpected = destType == Topic.class;
        final boolean queueExpected = destType == Queue.class;
        Destination consumeFrom = null;
        if (topicExpected) {
            consumeFrom = sess.createTopic(destName);
        } else if (queueExpected) {
            consumeFrom = sess.createQueue(destName);
        } else {
            fail("non-temporary destination type set");
        }

        // Message annotations are only attached when at least one annotation name is given
        MessageAnnotationsDescribedType annotations = null;
        if (annotationName != null || replyAnnotationName != null) {
            annotations = new MessageAnnotationsDescribedType();
            if (annotationName != null) {
                annotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
            }

            if (replyAnnotationName != null) {
                annotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
            }
        }

        PropertiesDescribedType transferProps = new PropertiesDescribedType();
        transferProps.setTo(destAddress);
        transferProps.setReplyTo(replyAddress);
        DescribedType nullBody = new AmqpValueDescribedType(null);

        SourceMatcher expectedSource = new SourceMatcher();
        expectedSource.withAddress(equalTo(destAddress));

        peer.expectReceiverAttach(notNullValue(), expectedSource);
        peer.expectLinkFlowRespondWithTransfer(null, annotations, transferProps, null, nullBody);
        peer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer consumer = sess.createConsumer(consumeFrom);
        Message received = consumer.receive(3000);

        peer.waitForAllHandlersToComplete(2000);
        assertNotNull(received);

        Destination jmsDest = received.getJMSDestination();
        Destination jmsReplyTo = received.getJMSReplyTo();

        assertNotNull("Expected JMSDestination but got null", jmsDest);
        assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);

        // Verify destination/replyto names on received message
        String actualDestName = null;
        String actualReplyName = null;
        if (topicExpected) {
            actualDestName = ((Topic) jmsDest).getTopicName();
            actualReplyName = ((Topic) jmsReplyTo).getTopicName();
        } else if (queueExpected) {
            actualDestName = ((Queue) jmsDest).getQueueName();
            actualReplyName = ((Queue) jmsReplyTo).getQueueName();
        }

        assertEquals("Unexpected name for JMSDestination", destName, actualDestName);
        assertEquals("Unexpected name for JMSReplyTo", replyName, actualReplyName);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
 
/**
 * Resolve the destination that the response to the given request is sent to.
 * <p>The JMS Reply-To {@link Destination} of the request takes precedence.
 * If the request carries none, the configured
 * {@link #resolveDefaultResponseDestination default response destination}
 * is used instead. If neither is available, an
 * {@link javax.jms.InvalidDestinationException} is thrown.
 * @param request the original incoming JMS message
 * @param response the outgoing JMS message about to be sent
 * @param session the JMS Session to operate on
 * @return the response destination (never {@code null})
 * @throws JMSException if thrown by JMS API methods
 * @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined
 * @see #setDefaultResponseDestination
 * @see javax.jms.Message#getJMSReplyTo()
 */
protected Destination getResponseDestination(Message request, Message response, Session session)
		throws JMSException {

	// Preferred: the reply-to destination given by the request itself.
	Destination requested = request.getJMSReplyTo();
	if (requested != null) {
		return requested;
	}

	// Fallback: the default response destination, if one is configured.
	Destination fallback = resolveDefaultResponseDestination(session);
	if (fallback != null) {
		return fallback;
	}

	throw new InvalidDestinationException("Cannot determine response destination: " +
			"Request message does not contain reply-to destination, and no default response destination set.");
}