javax.jms.Message#setJMSCorrelationID ( )源码实例Demo

下面列出了javax.jms.Message#setJMSCorrelationID ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: qpid-jms   文件: JmsProducer.java
private void doSend(Destination destination, Message message) throws JMSException {

        if (message == null) {
            throw new MessageFormatException("Message must not be null");
        }

        for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
            message.setObjectProperty(entry.getKey(), entry.getValue());
        }

        if (correlationId != null) {
            message.setJMSCorrelationID(correlationId);
        }
        if (correlationIdBytes != null) {
            message.setJMSCorrelationIDAsBytes(correlationIdBytes);
        }
        if (type != null) {
            message.setJMSType(type);
        }
        if (replyTo != null) {
            message.setJMSReplyTo(replyTo);
        }

        session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
    }
 
源代码2 项目: pooled-jms   文件: MockJMSProducer.java
private void doSend(Destination destination, Message message) throws JMSException {

        if (message == null) {
            throw new MessageFormatException("Message must not be null");
        }

        for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
            message.setObjectProperty(entry.getKey(), entry.getValue());
        }

        if (correlationId != null) {
            message.setJMSCorrelationID(correlationId);
        }
        if (correlationIdBytes != null) {
            message.setJMSCorrelationIDAsBytes(correlationIdBytes);
        }
        if (type != null) {
            message.setJMSType(type);
        }
        if (replyTo != null) {
            message.setJMSReplyTo(replyTo);
        }

        session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
    }
 
源代码3 项目: hazelcast-simulator   文件: CoordinatorClient.java
private void run() throws JMSException {
    Message message = remoteBroker.session.createMessage();

    if (requestId != null) {
        message.setJMSReplyTo(remoteBroker.replyQueue);
        message.setJMSCorrelationID(requestId);
    }

    message.setStringProperty("source", coordinatorAddress().toString());
    message.setStringProperty("target", target.toString());
    message.setStringProperty("payload", OperationCodec.toJson(op));
    message.setIntProperty("operationType", getOperationType(op).toInt());

    switch (target.getAddressLevel()) {
        case AGENT:
            remoteBroker.agentProducer.send(message);
            break;
        case WORKER:
            remoteBroker.workerProducer.send(message);
            break;
        default:
            throw new RuntimeException("unhandled target:" + target);
    }
}
 
源代码4 项目: activemq-artemis   文件: MessagePropertyTest.java
/**
 * Test that the <code>Message.getPropertyNames()</code> method does not return
 * the name of the JMS standard header fields (e.g. <code>JMSCorrelationID</code>).
 */
@Test
public void testGetPropertyNames() {
   try {
      Message message = senderSession.createMessage();
      message.setJMSCorrelationID("foo");
      Enumeration enumeration = message.getPropertyNames();
      while (enumeration.hasMoreElements()) {
         String propName = (String) enumeration.nextElement();
         boolean valid = !propName.startsWith("JMS") || propName.startsWith("JMSX");
         Assert.assertTrue("sec. 3.5.6 The getPropertyNames method does not return the names of " + "the JMS standard header field [e.g. JMSCorrelationID]: " +
                              propName, valid);
      }
   } catch (JMSException e) {
      fail(e);
   }
}
 
源代码5 项目: olat   文件: JmsSearchProvider.java
private void sendErrorResponse(final String jmsResponseStatus, final String correlationID, final Destination replyTo) {
    Session session = null;
    try {
        session = acquireSession();
        final Message responseMessage = session.createObjectMessage();
        responseMessage.setJMSCorrelationID(correlationID);
        responseMessage.setStringProperty(SearchClientJMSProxy.JMS_RESPONSE_STATUS_PROPERTY_NAME, jmsResponseStatus);
        final MessageProducer producer = session.createProducer(replyTo);
        if (log.isDebugEnabled()) {
            log.debug("onSearchMessage, send ResponseMessage=" + responseMessage + " to replyTo=" + replyTo);
        }
        producer.send(responseMessage);
        producer.close();
        return;

    } catch (final JMSException e) {
        log.error("error when receiving jms messages", e);
        return; // signal search not available
    } finally {
        releaseSession(session);
    }
}
 
源代码6 项目: olat   文件: JmsSearchProvider.java
private void sendErrorResponse(final String jmsResponseStatus, final String correlationID, final Destination replyTo) {
    Session session = null;
    try {
        session = acquireSession();
        final Message responseMessage = session.createObjectMessage();
        responseMessage.setJMSCorrelationID(correlationID);
        responseMessage.setStringProperty(SearchClientProxy.JMS_RESPONSE_STATUS_PROPERTY_NAME, jmsResponseStatus);
        final MessageProducer producer = session.createProducer(replyTo);
        if (log.isDebugEnabled()) {
            log.debug("onSearchMessage, send ResponseMessage=" + responseMessage + " to replyTo=" + replyTo);
        }
        producer.send(responseMessage);
        producer.close();
        return;

    } catch (final JMSException e) {
        log.error("error when receiving jms messages", e);
        return; // signal search not available
    } finally {
        releaseSession(session);
    }
}
 
源代码7 项目: qpid-jms   文件: MessageIntegrationTest.java
private void sentMessageWithCorrelationIdTestImpl(String stringCorrelationId, Object correlationIdForAmqpMessageClass) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);

        //Set matcher to validate the correlation-id
        propsMatcher.withCorrelationId(equalTo(correlationIdForAmqpMessageClass));

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        testPeer.expectTransfer(messageMatcher);

        Message message = session.createTextMessage();
        message.setJMSCorrelationID(stringCorrelationId);

        producer.send(message);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(3000);
    }
}
 
源代码8 项目: tomee   文件: JMSProducerImpl.java
@Override
public JMSProducer send(final Destination destination, final Message message) {
    if (message == null) {
        throw new MessageFormatRuntimeException("null message");
    }

    try {
        if (jmsHeaderCorrelationID != null) {
            message.setJMSCorrelationID(jmsHeaderCorrelationID);
        }
        if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0) {
            message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
        }
        if (jmsHeaderReplyTo != null) {
            message.setJMSReplyTo(jmsHeaderReplyTo);
        }
        if (jmsHeaderType != null) {
            message.setJMSType(jmsHeaderType);
        }

        setProperties(message);
        if (completionListener != null) {
            producer.send(destination, message, completionListener);
        } else {
            producer.send(destination, message);
        }
    } catch (final JMSException e) {
        throw toRuntimeException(e);
    }
    return this;
}
 
源代码9 项目: pooled-jms   文件: JmsPoolJMSProducer.java
private void doSend(Destination destination, Message message) throws JMSException {

        if (message == null) {
            throw new MessageFormatException("Message must not be null");
        }

        for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
            message.setObjectProperty(entry.getKey(), entry.getValue());
        }

        if (correlationId != null) {
            message.setJMSCorrelationID(correlationId);
        }
        if (correlationIdBytes != null) {
            message.setJMSCorrelationIDAsBytes(correlationIdBytes);
        }
        if (type != null) {
            message.setJMSType(type);
        }
        if (replyTo != null) {
            message.setJMSReplyTo(replyTo);
        }

        if (completionListener != null) {
            producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
        } else {
            producer.send(destination, message, deliveryMode, priority, timeToLive);
        }
    }
 
@Test
public void testJMSDestination() throws Exception {
   Message m1 = queueProducerSession.createMessage();

   // Test with correlation id containing a message id
   final String messageID = "ID:812739812378";
   m1.setJMSCorrelationID(messageID);

   queueProducer.send(m1);
   Message m2 = queueConsumer.receive();
   ProxyAssertSupport.assertEquals(messageID, m2.getJMSCorrelationID());

   // Test with correlation id containing an application defined string
   Message m3 = queueProducerSession.createMessage();
   final String appDefinedID = "oiwedjiwjdoiwejdoiwjd";
   m3.setJMSCorrelationID(appDefinedID);

   queueProducer.send(m3);
   Message m4 = queueConsumer.receive();
   ProxyAssertSupport.assertEquals(appDefinedID, m4.getJMSCorrelationID());

   // Test with correlation id containing a byte[]
   Message m5 = queueProducerSession.createMessage();
   final byte[] bytes = new byte[]{-111, 45, 106, 3, -44};
   m5.setJMSCorrelationIDAsBytes(bytes);

   queueProducer.send(m5);
   Message m6 = queueConsumer.receive();
   assertByteArraysEqual(bytes, m6.getJMSCorrelationIDAsBytes());

}
 
源代码11 项目: activemq-artemis   文件: MessageTestBase.java
protected void prepareMessage(final Message m) throws JMSException {
   m.setBooleanProperty("booleanProperty", true);
   m.setByteProperty("byteProperty", (byte) 3);
   m.setDoubleProperty("doubleProperty", 4.0);
   m.setFloatProperty("floatProperty", 5.0f);
   m.setIntProperty("intProperty", 6);
   m.setLongProperty("longProperty", 7);
   m.setShortProperty("shortProperty", (short) 8);
   m.setStringProperty("stringProperty", "this is a String property");

   m.setJMSCorrelationID("this is the correlation ID");
   m.setJMSReplyTo(ActiveMQServerTestCase.topic1);
   m.setJMSType("someArbitraryType");
}
 
源代码12 项目: olat   文件: JmsSearchProvider.java
void onSpellMessage(final String spellText, final String correlationID, final Destination replyTo) {
    Session session = null;
    try {
        final Set<String> spellStrings = this.spellCheck(spellText);
        if (spellStrings != null) {
            final ArrayList<String> spellStringList = new ArrayList<String>(spellStrings);
            session = acquireSession();
            final Message responseMessage = session.createObjectMessage(spellStringList);
            responseMessage.setJMSCorrelationID(correlationID);
            final MessageProducer producer = session.createProducer(replyTo);
            producer.send(responseMessage);
            producer.close();
            return;
        }
        return; // signal search not available
    } catch (final JMSException e) {
        log.error("error when receiving jms messages", e);
        return; // signal search not available
        // do not throw exceptions here throw new OLATRuntimeException();
    } catch (final Throwable th) {
        log.error("error at ClusteredSearchProvider.receive()", th);
        return;// signal search not available
        // do not throw exceptions throw new OLATRuntimeException();
    } finally {
        releaseSession(session);
        DBFactory.getInstance().commitAndCloseSession();
    }
}
 
源代码13 项目: qpid-broker-j   文件: MessageCreator.java
private static void setJmsHeader(final MessageDescription messageDescription,
                                 final Message message)
        throws JMSException
{
    final HashMap<MessageDescription.MessageHeader, Serializable> header =
            messageDescription.getHeaders();

    if (header == null)
    {
        return;
    }

    for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : header.entrySet())
    {
        try
        {
            switch (entry.getKey())
            {
                case DESTINATION:
                    message.setJMSDestination((Destination) entry.getValue());
                    break;
                case DELIVERY_MODE:
                    message.setJMSDeliveryMode((Integer) entry.getValue());
                    break;
                case MESSAGE_ID:
                    message.setJMSMessageID((String) entry.getValue());
                    break;
                case TIMESTAMP:
                    message.setJMSTimestamp((Long) entry.getValue());
                    break;
                case CORRELATION_ID:
                    if (entry.getValue() instanceof byte[])
                    {
                        message.setJMSCorrelationIDAsBytes((byte[]) entry.getValue());
                    }
                    else
                    {
                        message.setJMSCorrelationID((String) entry.getValue());
                    }
                    break;
                case REPLY_TO:
                    throw new RuntimeException("The Test should not set the replyTo header."
                                               + " It should rather use the dedicated method");
                case REDELIVERED:
                    message.setJMSRedelivered((Boolean) entry.getValue());
                    break;
                case TYPE:
                    message.setJMSType((String) entry.getValue());
                    break;
                case EXPIRATION:
                    message.setJMSExpiration((Long) entry.getValue());
                    break;
                case PRIORITY:
                    message.setJMSPriority((Integer) entry.getValue());
                    break;
                default:
                    throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
            }
        }
        catch (ClassCastException e)
        {
            throw new RuntimeException(String.format("Could not set message header '%s' to this value: %s",
                                                     entry.getKey(),
                                                     entry.getValue()), e);
        }
    }
}
 
源代码14 项目: olat   文件: JmsSearchProvider.java
void onSearchMessage(final SearchRequest searchRequest, final String correlationID, final Destination replyTo) {
    if (log.isDebugEnabled()) {
        log.debug("onSearchMessage, correlationID=" + correlationID + " , replyTo=" + replyTo + " , searchRequest=" + searchRequest);
    }
    Session session = null;
    try {
        final Identity identity = baseSecurity.loadIdentityByKey(searchRequest.getIdentityId());

        final SearchResults searchResults = this.doSearch(searchRequest.getQueryString(), searchRequest.getCondQueries(), identity, searchRequest.getRoles(),
                searchRequest.getFirstResult(), searchRequest.getMaxResults(), searchRequest.isDoHighlighting());
        if (log.isDebugEnabled()) {
            log.debug("searchResults: " + searchResults.getLength());
        }
        if (searchResults != null) {
            session = acquireSession();
            final Message responseMessage = session.createObjectMessage(searchResults);
            responseMessage.setJMSCorrelationID(correlationID);
            responseMessage.setStringProperty(SearchClientJMSProxy.JMS_RESPONSE_STATUS_PROPERTY_NAME, SearchClientJMSProxy.JMS_RESPONSE_STATUS_OK);
            final MessageProducer producer = session.createProducer(replyTo);
            if (log.isDebugEnabled()) {
                log.debug("onSearchMessage, send ResponseMessage=" + responseMessage + " to replyTo=" + replyTo);
            }
            producer.send(responseMessage);
            producer.close();
            return;
        } else {
            log.info("onSearchMessage, no searchResults (searchResults=null)");
        }
    } catch (final JMSException e) {
        log.error("error when receiving jms messages", e);
        return; // signal search not available
        // do not throw exceptions here throw new OLATRuntimeException();
    } catch (final ServiceNotAvailableException sex) {
        sendErrorResponse(SearchClientJMSProxy.JMS_RESPONSE_STATUS_SERVICE_NOT_AVAILABLE_EXCEPTION, correlationID, replyTo);
    } catch (final QueryException qex) {
        sendErrorResponse(SearchClientJMSProxy.JMS_RESPONSE_STATUS_QUERY_EXCEPTION, correlationID, replyTo);
    } catch (final Throwable th) {
        log.error("error at ClusteredSearchProvider.receive()", th);
        return;// signal search not available
        // do not throw exceptions throw new OLATRuntimeException();
    } finally {
        releaseSession(session);
        DBFactory.getInstance().commitAndCloseSession();
    }
}
 
@Test
public void headers() throws Exception
{
    final Queue queue = createQueue(getTestName());
    final Destination replyTo = createQueue(getTestName() + "_replyTo");
    final Connection consumerConnection = getConnection();
    try
    {
        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer = consumerSession.createConsumer(queue);

        final String correlationId = "testCorrelationId";
        final String jmsType = "testJmsType";

        final int priority = 1;
        final long timeToLive = 30 * 60 * 1000;
        final Connection producerConnection = getConnection();
        try
        {
            final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = producerSession.createProducer(queue);

            final Message message = producerSession.createMessage();
            message.setJMSCorrelationID(correlationId);
            message.setJMSType(jmsType);
            message.setJMSReplyTo(replyTo);

            long currentTime = System.currentTimeMillis();
            producer.send(message, DeliveryMode.NON_PERSISTENT, priority, timeToLive);

            consumerConnection.start();

            Message receivedMessage = consumer.receive(getReceiveTimeout());
            assertNotNull(receivedMessage);

            assertEquals("JMSCorrelationID mismatch", correlationId, receivedMessage.getJMSCorrelationID());
            assertEquals("JMSType mismatch", message.getJMSType(), receivedMessage.getJMSType());
            assertEquals("JMSReply To mismatch", message.getJMSReplyTo(), receivedMessage.getJMSReplyTo());
            assertTrue("JMSMessageID does not start 'ID:'", receivedMessage.getJMSMessageID().startsWith("ID:"));
            assertEquals("JMSPriority mismatch", priority, receivedMessage.getJMSPriority());
            assertTrue(String.format(
                    "Unexpected JMSExpiration: got '%d', but expected value equals or greater than '%d'",
                    receivedMessage.getJMSExpiration(),
                    currentTime + timeToLive),

                       receivedMessage.getJMSExpiration() >= currentTime + timeToLive
                       && receivedMessage.getJMSExpiration() <= System.currentTimeMillis() + timeToLive);
        }
        finally
        {
            producerConnection.close();
        }
    }
    finally
    {
        consumerConnection.close();
    }
}
 
源代码16 项目: cxf   文件: JMSTestUtil.java
/**
 * @param testcase
 * @param session
 * @param rtd
 * @return
 * @throws JMSException
 */
public static Message buildJMSMessageFromTestCase(TestCaseType testcase, Session session,
                                                  Destination rtd) throws JMSException {
    MessagePropertiesType messageProperties = testcase.getRequestMessage();
    Message jmsMessage = null;
    String messageType = messageProperties.getMessageType();
    if ("text".equals(messageType)) {
        jmsMessage = session.createTextMessage();
        ((TextMessage)jmsMessage).setText("test");
    } else if ("byte".equals(messageType)) {
        jmsMessage = session.createBytesMessage();
    } else if ("stream".equals(messageType)) {
        jmsMessage = session.createStreamMessage();
        ((StreamMessage)jmsMessage).writeString("test");
    } else {
        jmsMessage = session.createBytesMessage();
    }

    jmsMessage.setJMSReplyTo(rtd);

    if (messageProperties.isSetDeliveryMode()) {
        jmsMessage.setJMSDeliveryMode(messageProperties.getDeliveryMode());
    }
    if (messageProperties.isSetExpiration()) {
        jmsMessage.setJMSExpiration(messageProperties.getExpiration());
    }
    if (messageProperties.isSetPriority()) {
        jmsMessage.setJMSPriority(messageProperties.getPriority());
    }
    if (messageProperties.isSetExpiration()) {
        jmsMessage.setJMSPriority(messageProperties.getExpiration());
    }
    if (messageProperties.isSetCorrelationID()) {
        jmsMessage.setJMSCorrelationID(messageProperties.getCorrelationID());
    }

    if (messageProperties.isSetTargetService()
        && !"".equals(messageProperties.getTargetService().trim())) {
        jmsMessage.setStringProperty(JMSSpecConstants.TARGETSERVICE_FIELD, messageProperties
            .getTargetService().trim());
    }

    if (messageProperties.isSetBindingVersion()
        && !"".equals(messageProperties.getBindingVersion().trim())) {
        jmsMessage.setStringProperty(JMSSpecConstants.BINDINGVERSION_FIELD, messageProperties
                                     .getBindingVersion().trim());
    }

    if (messageProperties.isSetContentType()
        && !"".equals(messageProperties.getContentType().trim())) {
        jmsMessage.setStringProperty(JMSSpecConstants.CONTENTTYPE_FIELD, messageProperties
            .getContentType().trim());
    }

    if (messageProperties.isSetSoapAction()
        && !"".equals(messageProperties.getSoapAction().trim())) {
        jmsMessage.setStringProperty(JMSSpecConstants.SOAPACTION_FIELD, messageProperties
            .getSoapAction().trim());
    }

    if (messageProperties.isSetRequestURI()
        && !"".equals(messageProperties.getRequestURI().trim())) {
        jmsMessage.setStringProperty(JMSSpecConstants.REQUESTURI_FIELD, messageProperties
            .getRequestURI().trim());
    }
    return jmsMessage;
}
 
源代码17 项目: mdw   文件: ExternalEventListener.java
public void run() {
    QueueConnection connection = null;
    StandardLogger logger = LoggerUtil.getStandardLogger();
    try {
        String txt = message.getText();
        if (logger.isDebugEnabled()) {
            logger.debug("JMS Listener receives request: " + txt);
        }
        String resp;
        ListenerHelper helper = new ListenerHelper();
        Map<String, String> metaInfo = new HashMap<>();
        metaInfo.put(Listener.METAINFO_PROTOCOL, Listener.METAINFO_PROTOCOL_JMS);
        metaInfo.put(Listener.METAINFO_REQUEST_PATH, getQueueName());
        metaInfo.put(Listener.METAINFO_SERVICE_CLASS, this.getClass().getName());
        metaInfo.put(Listener.METAINFO_REQUEST_ID, message.getJMSMessageID());
        metaInfo.put(Listener.METAINFO_CORRELATION_ID, message.getJMSCorrelationID());
        if (message.getJMSReplyTo() != null)
            metaInfo.put("ReplyTo", message.getJMSReplyTo().toString());

            resp = helper.processRequest(txt, metaInfo);
            Queue respQueue = (Queue) message.getJMSReplyTo();
            String correlId = message.getJMSCorrelationID();
            if (resp != null && respQueue != null) {
                // String msgId = jmsMessage.getJMSMessageID();
                QueueConnectionFactory qcf
                    = JMSServices.getInstance().getQueueConnectionFactory(null);
                connection = qcf.createQueueConnection();

                Message respMsg;
                try (QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)) {
                    try (QueueSender sender = session.createSender(respQueue)) {
                        respMsg = session.createTextMessage(resp);
                        respMsg.setJMSCorrelationID(correlId);
                        sender.send(respMsg);
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("JMS Listener sends response (corr id='" +
                            correlId + "'): " + resp);
                }
            }

    }
    catch (Throwable ex) {
        logger.error(ex.getMessage(), ex);
    }
    finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}
 
源代码18 项目: olat   文件: SearchClientProxy.java
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    try {
        final MessageConsumer responseConsumer = session.createConsumer(replyQueue);

        message.setJMSReplyTo(replyQueue);
        final String correlationId = createRandomString();
        message.setJMSCorrelationID(correlationId);

        final MessageProducer producer = session.createProducer(searchQueue_);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.setTimeToLive(timeToLive_);
        if (log.isDebugEnabled()) {
            log.debug("Sending search request message with correlationId=" + correlationId);
        }
        producer.send(message);
        producer.close();

        Message returnedMessage = null;
        final long start = System.currentTimeMillis();
        while (true) {
            final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
            if (diff <= 0) {
                // timeout
                log.info("Timeout in search. Remaining time zero or negative.");
                break;
            }
            if (log.isDebugEnabled()) {
                log.debug("doSearchRequest: call receive with timeout=" + diff);
            }
            returnedMessage = responseConsumer.receive(diff);
            if (returnedMessage == null) {
                // timeout case, we're stopping now with a reply...
                log.info("Timeout in search. Repy was null.");
                break;
            } else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
                // we got an old reply from a previous search request
                log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                continue;
            } else {
                // we got a valid reply
                break;
            }
        }
        responseConsumer.close();
        if (log.isDebugEnabled()) {
            log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
        }
        return returnedMessage;
    } finally {
        releaseTempQueue(replyQueue);
    }
}
 
/**
 * Post-process the given response message before it will be sent.
 * <p>The default implementation sets the response's correlation id
 * to the request message's correlation id, if any; otherwise to the
 * request message id.
 * @param request the original incoming JMS message
 * @param response the outgoing JMS message about to be sent
 * @throws JMSException if thrown by JMS API methods
 * @see javax.jms.Message#setJMSCorrelationID
 */
protected void postProcessResponse(Message request, Message response) throws JMSException {
	String correlation = request.getJMSCorrelationID();
	if (correlation == null) {
		correlation = request.getJMSMessageID();
	}
	response.setJMSCorrelationID(correlation);
}
 
/**
 * Post-process the given response message before it will be sent.
 * <p>The default implementation sets the response's correlation id
 * to the request message's correlation id, if any; otherwise to the
 * request message id.
 * @param request the original incoming JMS message
 * @param response the outgoing JMS message about to be sent
 * @throws JMSException if thrown by JMS API methods
 * @see javax.jms.Message#setJMSCorrelationID
 */
protected void postProcessResponse(Message request, Message response) throws JMSException {
	String correlation = request.getJMSCorrelationID();
	if (correlation == null) {
		correlation = request.getJMSMessageID();
	}
	response.setJMSCorrelationID(correlation);
}