下面列出了javax.jms.Message#getJMSReplyTo ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void onMessage(final Message request) {
try {
System.out.println("Received request message: " + ((TextMessage) request).getText());
// Extract the ReplyTo destination
Destination replyDestination = request.getJMSReplyTo();
System.out.println("Reply to queue: " + replyDestination);
// Create the reply message
TextMessage replyMessage = session.createTextMessage("A reply message");
// Set the CorrelationID, using message id.
replyMessage.setJMSCorrelationID(request.getJMSMessageID());
// Send out the reply message
replyProducer.send(replyDestination, replyMessage);
System.out.println("Reply sent");
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(final Message request) {
TextMessage textMessage = (TextMessage) request;
try {
// retrieve the request's text
String text = textMessage.getText();
// create a reply containing the reversed text
TextMessage reply = session.createTextMessage(TextReverserService.reverse(text));
// retrieve the destination to reply to
Destination replyTo = request.getJMSReplyTo();
// create a producer to send the reply
try (MessageProducer producer = session.createProducer(replyTo)) {
// send the reply
producer.send(reply);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* Test that a <code>Destination</code> set by the <code>setJMSReplyTo()</code>
* method on a sended message corresponds to the <code>Destination</code> get by
* the </code>getJMSReplyTo()</code> method.
*/
@Test
public void testJMSReplyTo_1() {
try {
Message message = senderSession.createMessage();
message.setJMSReplyTo(senderQueue);
sender.send(message);
Message msg = receiver.receive(TestConfig.TIMEOUT);
Destination dest = msg.getJMSReplyTo();
Assert.assertTrue("JMS ReplyTo header field should be a Queue", dest instanceof Queue);
Queue replyTo = (Queue) dest;
Assert.assertEquals("JMS ReplyTo header field should be equals to the sender queue", replyTo.getQueueName(), senderQueue.getQueueName());
} catch (JMSException e) {
fail(e);
}
}
/**
* Test that if the JMS ReplyTo header field has been set as a <code>TemporaryQueue</code>,
* it will be rightly get also as a <code>TemporaryQueue</code>
* (and not only as a <code>Queue</code>).
*/
@Test
public void testJMSReplyTo_2() {
try {
TemporaryQueue tempQueue = senderSession.createTemporaryQueue();
Message message = senderSession.createMessage();
message.setJMSReplyTo(tempQueue);
sender.send(message);
Message msg = receiver.receive(TestConfig.TIMEOUT);
Destination dest = msg.getJMSReplyTo();
Assert.assertTrue("JMS ReplyTo header field should be a TemporaryQueue", dest instanceof TemporaryQueue);
Queue replyTo = (Queue) dest;
Assert.assertEquals("JMS ReplyTo header field should be equals to the temporary queue", replyTo.getQueueName(), tempQueue.getQueueName());
} catch (JMSException e) {
fail(e);
}
}
private void handle() throws Exception {
Message message = consumer.receive();
OperationType operationType = OperationType.fromInt(message.getIntProperty("operationType"));
String operationData = message.getStringProperty("payload");
SimulatorOperation op = OperationCodec.fromJson(operationData, operationType.getClassType());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Received operation:" + op);
}
PromiseImpl promise = new PromiseImpl();
promise.replyTo = message.getJMSReplyTo();
promise.correlationId = message.getJMSCorrelationID();
promise.op = op;
SimulatorAddress source = SimulatorAddress.fromString(message.getStringProperty("source"));
try {
processor.process(op, source, promise);
} catch (Exception e) {
if (stop) {
throw e;
} else {
LOGGER.warn(e.getMessage(), e);
promise.answer(e);
}
}
}
private ClientMessage receiveMessage(final Session session,
final Destination queue,
final MessageDescription messageDescription) throws Exception
{
final Message message;
MessageConsumer consumer = session.createConsumer(queue);
try
{
message = consumer.receive(RECEIVE_TIMEOUT);
System.out.println(String.format("Received message: %s", message));
MessageVerifier.verifyMessage(messageDescription, message);
}
finally
{
consumer.close();
}
if (message != null && message.getJMSReplyTo() != null)
{
System.out.println(String.format("Received message had replyTo: %s", message.getJMSReplyTo()));
sendReply(session,
message.getJMSReplyTo(),
messageDescription.getHeader(MessageDescription.MessageHeader.CORRELATION_ID));
}
return buildClientMessage(message);
}
private String getDestName(Message message) throws JMSException {
Destination replyTo = message.getJMSReplyTo();
if (replyTo instanceof Queue) {
return ((Queue)replyTo).getQueueName();
} else if (replyTo instanceof Topic) {
return ((Topic)replyTo).getTopicName();
}
return null;
}
/**
* If getJMSReplyTo is set then send message back to reply producer.
*
* @param message
*/
protected void sendReply(Message message)
{
try {
if (message.getJMSReplyTo() != null) { // Send reply only if the replyTo destination is set
replyProducer.send(message.getJMSReplyTo(),
getSession().createTextMessage("Reply: " + message.getJMSMessageID()));
}
} catch (JMSException ex) {
LOG.error(ex.getLocalizedMessage());
throwable.set(ex);
throw new RuntimeException(ex);
}
}
@Override
public void onMessage(final Message m) {
try {
Destination queue = m.getJMSReplyTo();
Message m2 = sess.createTextMessage("This is the response");
sender.send(queue, m2);
} catch (JMSException e) {
instanceLog.error(e);
}
}
/**
* Tests that lack of any destination type annotation value (via either
* {@link AmqpDestinationHelper#JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL}
* or {@link AmqpMessageSupport#LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL}) set
* on a message to indicate type of its 'reply-to' address results in it
* being classed as the same type as the consumer destination.
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testReceivedMessageFromTopicWithReplyToWithoutTypeAnnotationResultsInUseOfConsumerDestinationType() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("myTopic");
String myReplyTopicAddress = "myReplyTopicAddress";
PropertiesDescribedType props = new PropertiesDescribedType();
props.setReplyTo(myReplyTopicAddress);
props.setMessageId("myMessageIDString");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(topic);
Message receivedMessage = messageConsumer.receive(3000);
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
Destination dest = receivedMessage.getJMSReplyTo();
assertNotNull("JMSReplyTo should not be null", dest);
assertTrue("Destination not of expected type: " + dest.getClass(), dest instanceof Topic);
assertEquals(myReplyTopicAddress, ((Topic)dest).getTopicName());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Override
public void onMessage(Message msg) {
try {
TextMessage textMsg = (TextMessage) msg;
String payload = "REPLY: " + textMsg.getText();
Destination replyTo;
replyTo = msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText(payload);
LOG.info("Sending response: " + textMsg);
requestServerProducer.send(replyTo, textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
public EchoRequestProcessor(Session sess, Message req) throws Exception {
this.session = sess;
this.request = req;
this.resp_dest = req.getJMSReplyTo();
if (resp_dest == null) {
throw new Exception("invalid request: no reply-to destination given");
}
this.msg_prod = session.createProducer(this.resp_dest);
}
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
final Map<String, String> attributes = new HashMap<>();
final Enumeration<?> enumeration = message.getPropertyNames();
while (enumeration.hasMoreElements()) {
final String propName = (String) enumeration.nextElement();
final Object value = message.getObjectProperty(propName);
if (value == null) {
attributes.put(ATTRIBUTE_PREFIX + propName, "");
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown");
continue;
}
final String valueString = value.toString();
attributes.put(ATTRIBUTE_PREFIX + propName, valueString);
final String propType;
if (value instanceof String) {
propType = PROP_TYPE_STRING;
} else if (value instanceof Double) {
propType = PROP_TYPE_DOUBLE;
} else if (value instanceof Float) {
propType = PROP_TYPE_FLOAT;
} else if (value instanceof Long) {
propType = PROP_TYPE_LONG;
} else if (value instanceof Integer) {
propType = PROP_TYPE_INTEGER;
} else if (value instanceof Short) {
propType = PROP_TYPE_SHORT;
} else if (value instanceof Byte) {
propType = PROP_TYPE_BYTE;
} else if (value instanceof Boolean) {
propType = PROP_TYPE_BOOLEAN;
} else {
propType = PROP_TYPE_OBJECT;
}
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propType);
}
if (message.getJMSCorrelationID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
}
if (message.getJMSDestination() != null) {
String destinationName;
if (message.getJMSDestination() instanceof Queue) {
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
} else {
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
}
if (message.getJMSMessageID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
}
if (message.getJMSReplyTo() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString());
}
if (message.getJMSType() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType());
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
return attributes;
}
private static void verifyMessageHeaders(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
try
{
for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : messageDescription.getHeaders()
.entrySet())
{
Object actualValue;
switch (entry.getKey())
{
case DESTINATION:
actualValue = message.getJMSDestination();
break;
case DELIVERY_MODE:
actualValue = message.getJMSDeliveryMode();
break;
case MESSAGE_ID:
actualValue = message.getJMSMessageID();
break;
case TIMESTAMP:
actualValue = message.getJMSTimestamp();
break;
case CORRELATION_ID:
if (entry.getValue() instanceof byte[])
{
actualValue = message.getJMSCorrelationIDAsBytes();
}
else
{
actualValue = message.getJMSCorrelationID();
}
break;
case REPLY_TO:
actualValue = message.getJMSReplyTo();
break;
case REDELIVERED:
actualValue = message.getJMSRedelivered();
break;
case TYPE:
actualValue = message.getJMSType();
break;
case EXPIRATION:
actualValue = message.getJMSExpiration();
break;
case PRIORITY:
actualValue = message.getJMSPriority();
break;
default:
throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
}
verifyEquals(String.format("Unexpected message header '%s'", entry.getKey()),
entry.getValue(),
actualValue);
}
}
catch (JMSException e)
{
throw new RuntimeException("Unexpected exception during message header verification", e);
}
}
@Test
public void testRedeployAddressQueueOpenWire() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageProducer messageProducer = session.createProducer(destination);
Destination replyTo = session.createTemporaryQueue();
Message message = session.createTextMessage("hello");
message.setJMSReplyTo(replyTo);
messageProducer.send(message);
try {
latch.await(10, TimeUnit.SECONDS);
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
try (Connection connectionConsumer = connectionFactory.createConnection()) {
connectionConsumer.start();
try (Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination destinationConsumer = session.createQueue("queue");
MessageConsumer messageConsumer = sessionConsumer.createConsumer(destinationConsumer);
Message receivedMessage = messageConsumer.receive(1000);
assertEquals("hello", ((TextMessage) receivedMessage).getText());
Destination replyToDest = receivedMessage.getJMSReplyTo();
Message message1 = sessionConsumer.createTextMessage("hi there");
session.createProducer(replyToDest).send(message1);
}
}
MessageConsumer messageConsumerProducer = session.createConsumer(replyTo);
Message message2 = messageConsumerProducer.receive(1000);
Assert.assertNotNull(message2);
assertEquals("hi there", ((TextMessage) message2).getText());
} finally {
connection.close();
embeddedActiveMQ.stop();
}
}
/**
* Tests that the {@link AmqpMessageSupport#LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL} set on a message to
* indicate its 'reply-to' address represents a Topic results in the JMSReplyTo object being a
* Topic. Ensure the consumers destination is not used by consuming from a Queue.
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testReceivedMessageFromQueueWithLegacyReplyToTypeAnnotationForTopic() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL.toString(), AmqpMessageSupport.LEGACY_TOPIC_ATTRIBUTE);
PropertiesDescribedType props = new PropertiesDescribedType();
String myTopicAddress = "myTopicAddress";
props.setReplyTo(myTopicAddress);
props.setMessageId("myMessageIDString");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(3000);
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
Destination dest = receivedMessage.getJMSReplyTo();
assertTrue(dest instanceof Topic);
assertEquals(myTopicAddress, ((Topic)dest).getTopicName());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Override
public void onMessage(final Message message) {
if (log.isDebugEnabled()) {
log.debug("onMessage, message=" + message);
}
try {
final long sentTimestamp = message.getJMSTimestamp();
final long currentTimestamp = System.currentTimeMillis();
// check if received message is not too old because in case of overload we could have old search-messages
if ((currentTimestamp - sentTimestamp) < receiveTimeout) {
final String correlationID = message.getJMSCorrelationID();
final Destination replyTo = message.getJMSReplyTo();
if (message instanceof ObjectMessage) {
final ObjectMessage objectMessage = (ObjectMessage) message;
final SearchRequest searchRequest = (SearchRequest) objectMessage.getObject();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSearchMessage(searchRequest, correlationID, replyTo);
}
});
} else if (message instanceof TextMessage) {
final TextMessage testMessage = (TextMessage) message;
final String spellText = testMessage.getText();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSpellMessage(spellText, correlationID, replyTo);
}
});
}
} else {
// JMS message is too old, discard it (do nothing)
log.warn("JMS message was too old, discard message, timeout=" + receiveTimeout + "ms , received time=" + (currentTimestamp - sentTimestamp) + "ms");
}
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
} catch (final Error err) {
log.warn("Error in onMessage, ", err);
// OLAT-3973: don't throw exceptions here
} catch (final RuntimeException runEx) {
log.warn("RuntimeException in onMessage, ", runEx);
// OLAT-3973: don't throw exceptions here
}
}
@Override
public void onMessage(final Message message) {
if (log.isDebugEnabled()) {
log.debug("onMessage, message=" + message);
}
try {
final long sentTimestamp = message.getJMSTimestamp();
final long currentTimestamp = System.currentTimeMillis();
// check if received message is not too old because in case of overload we could have old search-messages
if ((currentTimestamp - sentTimestamp) < receiveTimeout) {
final String correlationID = message.getJMSCorrelationID();
final Destination replyTo = message.getJMSReplyTo();
if (message instanceof ObjectMessage) {
final ObjectMessage objectMessage = (ObjectMessage) message;
final SearchRequest searchRequest = (SearchRequest) objectMessage.getObject();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSearchMessage(searchRequest, correlationID, replyTo);
}
});
} else if (message instanceof TextMessage) {
final TextMessage testMessage = (TextMessage) message;
final String spellText = testMessage.getText();
taskExecutorService.runTask(new Runnable() {
@Override
public void run() {
onSpellMessage(spellText, correlationID, replyTo);
}
});
}
} else {
// JMS message is too old, discard it (do nothing)
log.warn("JMS message was too old, discard message, timeout=" + receiveTimeout + "ms , received time=" + (currentTimestamp - sentTimestamp) + "ms");
}
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
} catch (final Error err) {
log.warn("Error in onMessage, ", err);
// OLAT-3973: don't throw exceptions here
} catch (final RuntimeException runEx) {
log.warn("RuntimeException in onMessage, ", runEx);
// OLAT-3973: don't throw exceptions here
}
}
private void doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
String destPrefix,
String destName,
String replyName,
String destAddress,
String replyAddress,
String annotationName,
Object annotationValue,
String replyAnnotationName,
Object replyAnnotationValue) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Have the test peer provide the destination prefixes as connection properties
Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
properties.put(QUEUE_PREFIX, destPrefix);
properties.put(TOPIC_PREFIX, destPrefix);
Connection connection = testFixture.establishConnecton(testPeer, null, null, properties);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination
Destination dest = null;
if (destType == Topic.class) {
dest= session.createTopic(destName);
} else if (destType == Queue.class) {
dest = session.createQueue(destName);
} else {
fail("non-temporary destination type set");
}
MessageAnnotationsDescribedType msgAnnotations = null;
if (annotationName != null || replyAnnotationName != null) {
msgAnnotations = new MessageAnnotationsDescribedType();
if (annotationName != null) {
msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
}
if (replyAnnotationName != null) {
msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
}
}
PropertiesDescribedType props = new PropertiesDescribedType();
props.setTo(destAddress);
props.setReplyTo(replyAddress);
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
SourceMatcher sourceMatcher = new SourceMatcher();
sourceMatcher.withAddress(equalTo(destAddress));
testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(dest);
Message receivedMessage = messageConsumer.receive(3000);
testPeer.waitForAllHandlersToComplete(2000);
assertNotNull(receivedMessage);
Destination jmsDest = receivedMessage.getJMSDestination();
Destination jmsReplyTo = receivedMessage.getJMSReplyTo();
assertNotNull("Expected JMSDestination but got null", jmsDest);
assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);
// Verify destination/replyto names on received message
String recievedName = null;
String recievedReplyName = null;
if (destType == Topic.class) {
recievedName = ((Topic) jmsDest).getTopicName();
recievedReplyName = ((Topic) jmsReplyTo).getTopicName();
} else if (destType == Queue.class) {
recievedName = ((Queue) jmsDest).getQueueName();
recievedReplyName = ((Queue) jmsReplyTo).getQueueName();
}
assertEquals("Unexpected name for JMSDestination", destName, recievedName);
assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
* Determine a response destination for the given message.
* <p>The default implementation first checks the JMS Reply-To
* {@link Destination} of the supplied request; if that is not {@code null}
* it is returned; if it is {@code null}, then the configured
* {@link #resolveDefaultResponseDestination default response destination}
* is returned; if this too is {@code null}, then an
* {@link javax.jms.InvalidDestinationException} is thrown.
* @param request the original incoming JMS message
* @param response the outgoing JMS message about to be sent
* @param session the JMS Session to operate on
* @return the response destination (never {@code null})
* @throws JMSException if thrown by JMS API methods
* @throws javax.jms.InvalidDestinationException if no {@link Destination} can be determined
* @see #setDefaultResponseDestination
* @see javax.jms.Message#getJMSReplyTo()
*/
protected Destination getResponseDestination(Message request, Message response, Session session)
throws JMSException {
Destination replyTo = request.getJMSReplyTo();
if (replyTo == null) {
replyTo = resolveDefaultResponseDestination(session);
if (replyTo == null) {
throw new InvalidDestinationException("Cannot determine response destination: " +
"Request message does not contain reply-to destination, and no default response destination set.");
}
}
return replyTo;
}