下面列出了javax.jms.Message#setJMSCorrelationID ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void doSend(Destination destination, Message message) throws JMSException {
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
message.setObjectProperty(entry.getKey(), entry.getValue());
}
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
}
if (correlationIdBytes != null) {
message.setJMSCorrelationIDAsBytes(correlationIdBytes);
}
if (type != null) {
message.setJMSType(type);
}
if (replyTo != null) {
message.setJMSReplyTo(replyTo);
}
session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
}
private void doSend(Destination destination, Message message) throws JMSException {
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
message.setObjectProperty(entry.getKey(), entry.getValue());
}
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
}
if (correlationIdBytes != null) {
message.setJMSCorrelationIDAsBytes(correlationIdBytes);
}
if (type != null) {
message.setJMSType(type);
}
if (replyTo != null) {
message.setJMSReplyTo(replyTo);
}
session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
}
private void run() throws JMSException {
Message message = remoteBroker.session.createMessage();
if (requestId != null) {
message.setJMSReplyTo(remoteBroker.replyQueue);
message.setJMSCorrelationID(requestId);
}
message.setStringProperty("source", coordinatorAddress().toString());
message.setStringProperty("target", target.toString());
message.setStringProperty("payload", OperationCodec.toJson(op));
message.setIntProperty("operationType", getOperationType(op).toInt());
switch (target.getAddressLevel()) {
case AGENT:
remoteBroker.agentProducer.send(message);
break;
case WORKER:
remoteBroker.workerProducer.send(message);
break;
default:
throw new RuntimeException("unhandled target:" + target);
}
}
/**
* Test that the <code>Message.getPropertyNames()</code> method does not return
* the name of the JMS standard header fields (e.g. <code>JMSCorrelationID</code>).
*/
@Test
public void testGetPropertyNames() {
try {
Message message = senderSession.createMessage();
message.setJMSCorrelationID("foo");
Enumeration enumeration = message.getPropertyNames();
while (enumeration.hasMoreElements()) {
String propName = (String) enumeration.nextElement();
boolean valid = !propName.startsWith("JMS") || propName.startsWith("JMSX");
Assert.assertTrue("sec. 3.5.6 The getPropertyNames method does not return the names of " + "the JMS standard header field [e.g. JMSCorrelationID]: " +
propName, valid);
}
} catch (JMSException e) {
fail(e);
}
}
private void sendErrorResponse(final String jmsResponseStatus, final String correlationID, final Destination replyTo) {
Session session = null;
try {
session = acquireSession();
final Message responseMessage = session.createObjectMessage();
responseMessage.setJMSCorrelationID(correlationID);
responseMessage.setStringProperty(SearchClientJMSProxy.JMS_RESPONSE_STATUS_PROPERTY_NAME, jmsResponseStatus);
final MessageProducer producer = session.createProducer(replyTo);
if (log.isDebugEnabled()) {
log.debug("onSearchMessage, send ResponseMessage=" + responseMessage + " to replyTo=" + replyTo);
}
producer.send(responseMessage);
producer.close();
return;
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
} finally {
releaseSession(session);
}
}
private void sendErrorResponse(final String jmsResponseStatus, final String correlationID, final Destination replyTo) {
Session session = null;
try {
session = acquireSession();
final Message responseMessage = session.createObjectMessage();
responseMessage.setJMSCorrelationID(correlationID);
responseMessage.setStringProperty(SearchClientProxy.JMS_RESPONSE_STATUS_PROPERTY_NAME, jmsResponseStatus);
final MessageProducer producer = session.createProducer(replyTo);
if (log.isDebugEnabled()) {
log.debug("onSearchMessage, send ResponseMessage=" + responseMessage + " to replyTo=" + replyTo);
}
producer.send(responseMessage);
producer.close();
return;
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
} finally {
releaseSession(session);
}
}
private void sentMessageWithCorrelationIdTestImpl(String stringCorrelationId, Object correlationIdForAmqpMessageClass) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
//Set matcher to validate the correlation-id
propsMatcher.withCorrelationId(equalTo(correlationIdForAmqpMessageClass));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
testPeer.expectTransfer(messageMatcher);
Message message = session.createTextMessage();
message.setJMSCorrelationID(stringCorrelationId);
producer.send(message);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Override
public JMSProducer send(final Destination destination, final Message message) {
if (message == null) {
throw new MessageFormatRuntimeException("null message");
}
try {
if (jmsHeaderCorrelationID != null) {
message.setJMSCorrelationID(jmsHeaderCorrelationID);
}
if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0) {
message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
}
if (jmsHeaderReplyTo != null) {
message.setJMSReplyTo(jmsHeaderReplyTo);
}
if (jmsHeaderType != null) {
message.setJMSType(jmsHeaderType);
}
setProperties(message);
if (completionListener != null) {
producer.send(destination, message, completionListener);
} else {
producer.send(destination, message);
}
} catch (final JMSException e) {
throw toRuntimeException(e);
}
return this;
}
private void doSend(Destination destination, Message message) throws JMSException {
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
message.setObjectProperty(entry.getKey(), entry.getValue());
}
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
}
if (correlationIdBytes != null) {
message.setJMSCorrelationIDAsBytes(correlationIdBytes);
}
if (type != null) {
message.setJMSType(type);
}
if (replyTo != null) {
message.setJMSReplyTo(replyTo);
}
if (completionListener != null) {
producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
} else {
producer.send(destination, message, deliveryMode, priority, timeToLive);
}
}
@Test
public void testJMSDestination() throws Exception {
Message m1 = queueProducerSession.createMessage();
// Test with correlation id containing a message id
final String messageID = "ID:812739812378";
m1.setJMSCorrelationID(messageID);
queueProducer.send(m1);
Message m2 = queueConsumer.receive();
ProxyAssertSupport.assertEquals(messageID, m2.getJMSCorrelationID());
// Test with correlation id containing an application defined string
Message m3 = queueProducerSession.createMessage();
final String appDefinedID = "oiwedjiwjdoiwejdoiwjd";
m3.setJMSCorrelationID(appDefinedID);
queueProducer.send(m3);
Message m4 = queueConsumer.receive();
ProxyAssertSupport.assertEquals(appDefinedID, m4.getJMSCorrelationID());
// Test with correlation id containing a byte[]
Message m5 = queueProducerSession.createMessage();
final byte[] bytes = new byte[]{-111, 45, 106, 3, -44};
m5.setJMSCorrelationIDAsBytes(bytes);
queueProducer.send(m5);
Message m6 = queueConsumer.receive();
assertByteArraysEqual(bytes, m6.getJMSCorrelationIDAsBytes());
}
protected void prepareMessage(final Message m) throws JMSException {
m.setBooleanProperty("booleanProperty", true);
m.setByteProperty("byteProperty", (byte) 3);
m.setDoubleProperty("doubleProperty", 4.0);
m.setFloatProperty("floatProperty", 5.0f);
m.setIntProperty("intProperty", 6);
m.setLongProperty("longProperty", 7);
m.setShortProperty("shortProperty", (short) 8);
m.setStringProperty("stringProperty", "this is a String property");
m.setJMSCorrelationID("this is the correlation ID");
m.setJMSReplyTo(ActiveMQServerTestCase.topic1);
m.setJMSType("someArbitraryType");
}
void onSpellMessage(final String spellText, final String correlationID, final Destination replyTo) {
Session session = null;
try {
final Set<String> spellStrings = this.spellCheck(spellText);
if (spellStrings != null) {
final ArrayList<String> spellStringList = new ArrayList<String>(spellStrings);
session = acquireSession();
final Message responseMessage = session.createObjectMessage(spellStringList);
responseMessage.setJMSCorrelationID(correlationID);
final MessageProducer producer = session.createProducer(replyTo);
producer.send(responseMessage);
producer.close();
return;
}
return; // signal search not available
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
// do not throw exceptions here throw new OLATRuntimeException();
} catch (final Throwable th) {
log.error("error at ClusteredSearchProvider.receive()", th);
return;// signal search not available
// do not throw exceptions throw new OLATRuntimeException();
} finally {
releaseSession(session);
DBFactory.getInstance().commitAndCloseSession();
}
}
private static void setJmsHeader(final MessageDescription messageDescription,
final Message message)
throws JMSException
{
final HashMap<MessageDescription.MessageHeader, Serializable> header =
messageDescription.getHeaders();
if (header == null)
{
return;
}
for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : header.entrySet())
{
try
{
switch (entry.getKey())
{
case DESTINATION:
message.setJMSDestination((Destination) entry.getValue());
break;
case DELIVERY_MODE:
message.setJMSDeliveryMode((Integer) entry.getValue());
break;
case MESSAGE_ID:
message.setJMSMessageID((String) entry.getValue());
break;
case TIMESTAMP:
message.setJMSTimestamp((Long) entry.getValue());
break;
case CORRELATION_ID:
if (entry.getValue() instanceof byte[])
{
message.setJMSCorrelationIDAsBytes((byte[]) entry.getValue());
}
else
{
message.setJMSCorrelationID((String) entry.getValue());
}
break;
case REPLY_TO:
throw new RuntimeException("The Test should not set the replyTo header."
+ " It should rather use the dedicated method");
case REDELIVERED:
message.setJMSRedelivered((Boolean) entry.getValue());
break;
case TYPE:
message.setJMSType((String) entry.getValue());
break;
case EXPIRATION:
message.setJMSExpiration((Long) entry.getValue());
break;
case PRIORITY:
message.setJMSPriority((Integer) entry.getValue());
break;
default:
throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
}
}
catch (ClassCastException e)
{
throw new RuntimeException(String.format("Could not set message header '%s' to this value: %s",
entry.getKey(),
entry.getValue()), e);
}
}
}
void onSearchMessage(final SearchRequest searchRequest, final String correlationID, final Destination replyTo) {
if (log.isDebugEnabled()) {
log.debug("onSearchMessage, correlationID=" + correlationID + " , replyTo=" + replyTo + " , searchRequest=" + searchRequest);
}
Session session = null;
try {
final Identity identity = baseSecurity.loadIdentityByKey(searchRequest.getIdentityId());
final SearchResults searchResults = this.doSearch(searchRequest.getQueryString(), searchRequest.getCondQueries(), identity, searchRequest.getRoles(),
searchRequest.getFirstResult(), searchRequest.getMaxResults(), searchRequest.isDoHighlighting());
if (log.isDebugEnabled()) {
log.debug("searchResults: " + searchResults.getLength());
}
if (searchResults != null) {
session = acquireSession();
final Message responseMessage = session.createObjectMessage(searchResults);
responseMessage.setJMSCorrelationID(correlationID);
responseMessage.setStringProperty(SearchClientJMSProxy.JMS_RESPONSE_STATUS_PROPERTY_NAME, SearchClientJMSProxy.JMS_RESPONSE_STATUS_OK);
final MessageProducer producer = session.createProducer(replyTo);
if (log.isDebugEnabled()) {
log.debug("onSearchMessage, send ResponseMessage=" + responseMessage + " to replyTo=" + replyTo);
}
producer.send(responseMessage);
producer.close();
return;
} else {
log.info("onSearchMessage, no searchResults (searchResults=null)");
}
} catch (final JMSException e) {
log.error("error when receiving jms messages", e);
return; // signal search not available
// do not throw exceptions here throw new OLATRuntimeException();
} catch (final ServiceNotAvailableException sex) {
sendErrorResponse(SearchClientJMSProxy.JMS_RESPONSE_STATUS_SERVICE_NOT_AVAILABLE_EXCEPTION, correlationID, replyTo);
} catch (final QueryException qex) {
sendErrorResponse(SearchClientJMSProxy.JMS_RESPONSE_STATUS_QUERY_EXCEPTION, correlationID, replyTo);
} catch (final Throwable th) {
log.error("error at ClusteredSearchProvider.receive()", th);
return;// signal search not available
// do not throw exceptions throw new OLATRuntimeException();
} finally {
releaseSession(session);
DBFactory.getInstance().commitAndCloseSession();
}
}
@Test
public void headers() throws Exception
{
final Queue queue = createQueue(getTestName());
final Destination replyTo = createQueue(getTestName() + "_replyTo");
final Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
final String correlationId = "testCorrelationId";
final String jmsType = "testJmsType";
final int priority = 1;
final long timeToLive = 30 * 60 * 1000;
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = producerSession.createProducer(queue);
final Message message = producerSession.createMessage();
message.setJMSCorrelationID(correlationId);
message.setJMSType(jmsType);
message.setJMSReplyTo(replyTo);
long currentTime = System.currentTimeMillis();
producer.send(message, DeliveryMode.NON_PERSISTENT, priority, timeToLive);
consumerConnection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessage);
assertEquals("JMSCorrelationID mismatch", correlationId, receivedMessage.getJMSCorrelationID());
assertEquals("JMSType mismatch", message.getJMSType(), receivedMessage.getJMSType());
assertEquals("JMSReply To mismatch", message.getJMSReplyTo(), receivedMessage.getJMSReplyTo());
assertTrue("JMSMessageID does not start 'ID:'", receivedMessage.getJMSMessageID().startsWith("ID:"));
assertEquals("JMSPriority mismatch", priority, receivedMessage.getJMSPriority());
assertTrue(String.format(
"Unexpected JMSExpiration: got '%d', but expected value equals or greater than '%d'",
receivedMessage.getJMSExpiration(),
currentTime + timeToLive),
receivedMessage.getJMSExpiration() >= currentTime + timeToLive
&& receivedMessage.getJMSExpiration() <= System.currentTimeMillis() + timeToLive);
}
finally
{
producerConnection.close();
}
}
finally
{
consumerConnection.close();
}
}
/**
* @param testcase
* @param session
* @param rtd
* @return
* @throws JMSException
*/
public static Message buildJMSMessageFromTestCase(TestCaseType testcase, Session session,
Destination rtd) throws JMSException {
MessagePropertiesType messageProperties = testcase.getRequestMessage();
Message jmsMessage = null;
String messageType = messageProperties.getMessageType();
if ("text".equals(messageType)) {
jmsMessage = session.createTextMessage();
((TextMessage)jmsMessage).setText("test");
} else if ("byte".equals(messageType)) {
jmsMessage = session.createBytesMessage();
} else if ("stream".equals(messageType)) {
jmsMessage = session.createStreamMessage();
((StreamMessage)jmsMessage).writeString("test");
} else {
jmsMessage = session.createBytesMessage();
}
jmsMessage.setJMSReplyTo(rtd);
if (messageProperties.isSetDeliveryMode()) {
jmsMessage.setJMSDeliveryMode(messageProperties.getDeliveryMode());
}
if (messageProperties.isSetExpiration()) {
jmsMessage.setJMSExpiration(messageProperties.getExpiration());
}
if (messageProperties.isSetPriority()) {
jmsMessage.setJMSPriority(messageProperties.getPriority());
}
if (messageProperties.isSetExpiration()) {
jmsMessage.setJMSPriority(messageProperties.getExpiration());
}
if (messageProperties.isSetCorrelationID()) {
jmsMessage.setJMSCorrelationID(messageProperties.getCorrelationID());
}
if (messageProperties.isSetTargetService()
&& !"".equals(messageProperties.getTargetService().trim())) {
jmsMessage.setStringProperty(JMSSpecConstants.TARGETSERVICE_FIELD, messageProperties
.getTargetService().trim());
}
if (messageProperties.isSetBindingVersion()
&& !"".equals(messageProperties.getBindingVersion().trim())) {
jmsMessage.setStringProperty(JMSSpecConstants.BINDINGVERSION_FIELD, messageProperties
.getBindingVersion().trim());
}
if (messageProperties.isSetContentType()
&& !"".equals(messageProperties.getContentType().trim())) {
jmsMessage.setStringProperty(JMSSpecConstants.CONTENTTYPE_FIELD, messageProperties
.getContentType().trim());
}
if (messageProperties.isSetSoapAction()
&& !"".equals(messageProperties.getSoapAction().trim())) {
jmsMessage.setStringProperty(JMSSpecConstants.SOAPACTION_FIELD, messageProperties
.getSoapAction().trim());
}
if (messageProperties.isSetRequestURI()
&& !"".equals(messageProperties.getRequestURI().trim())) {
jmsMessage.setStringProperty(JMSSpecConstants.REQUESTURI_FIELD, messageProperties
.getRequestURI().trim());
}
return jmsMessage;
}
public void run() {
QueueConnection connection = null;
StandardLogger logger = LoggerUtil.getStandardLogger();
try {
String txt = message.getText();
if (logger.isDebugEnabled()) {
logger.debug("JMS Listener receives request: " + txt);
}
String resp;
ListenerHelper helper = new ListenerHelper();
Map<String, String> metaInfo = new HashMap<>();
metaInfo.put(Listener.METAINFO_PROTOCOL, Listener.METAINFO_PROTOCOL_JMS);
metaInfo.put(Listener.METAINFO_REQUEST_PATH, getQueueName());
metaInfo.put(Listener.METAINFO_SERVICE_CLASS, this.getClass().getName());
metaInfo.put(Listener.METAINFO_REQUEST_ID, message.getJMSMessageID());
metaInfo.put(Listener.METAINFO_CORRELATION_ID, message.getJMSCorrelationID());
if (message.getJMSReplyTo() != null)
metaInfo.put("ReplyTo", message.getJMSReplyTo().toString());
resp = helper.processRequest(txt, metaInfo);
Queue respQueue = (Queue) message.getJMSReplyTo();
String correlId = message.getJMSCorrelationID();
if (resp != null && respQueue != null) {
// String msgId = jmsMessage.getJMSMessageID();
QueueConnectionFactory qcf
= JMSServices.getInstance().getQueueConnectionFactory(null);
connection = qcf.createQueueConnection();
Message respMsg;
try (QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (QueueSender sender = session.createSender(respQueue)) {
respMsg = session.createTextMessage(resp);
respMsg.setJMSCorrelationID(correlId);
sender.send(respMsg);
}
}
if (logger.isDebugEnabled()) {
logger.debug("JMS Listener sends response (corr id='" +
correlId + "'): " + resp);
}
}
}
catch (Throwable ex) {
logger.error(ex.getMessage(), ex);
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
}
}
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
final Destination replyQueue = acquireTempQueue(session);
if (log.isDebugEnabled()) {
log.debug("doSearchRequest replyQueue=" + replyQueue);
}
try {
final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
message.setJMSReplyTo(replyQueue);
final String correlationId = createRandomString();
message.setJMSCorrelationID(correlationId);
final MessageProducer producer = session.createProducer(searchQueue_);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(timeToLive_);
if (log.isDebugEnabled()) {
log.debug("Sending search request message with correlationId=" + correlationId);
}
producer.send(message);
producer.close();
Message returnedMessage = null;
final long start = System.currentTimeMillis();
while (true) {
final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
if (diff <= 0) {
// timeout
log.info("Timeout in search. Remaining time zero or negative.");
break;
}
if (log.isDebugEnabled()) {
log.debug("doSearchRequest: call receive with timeout=" + diff);
}
returnedMessage = responseConsumer.receive(diff);
if (returnedMessage == null) {
// timeout case, we're stopping now with a reply...
log.info("Timeout in search. Repy was null.");
break;
} else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
// we got an old reply from a previous search request
log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
continue;
} else {
// we got a valid reply
break;
}
}
responseConsumer.close();
if (log.isDebugEnabled()) {
log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
}
return returnedMessage;
} finally {
releaseTempQueue(replyQueue);
}
}
/**
* Post-process the given response message before it will be sent.
* <p>The default implementation sets the response's correlation id
* to the request message's correlation id, if any; otherwise to the
* request message id.
* @param request the original incoming JMS message
* @param response the outgoing JMS message about to be sent
* @throws JMSException if thrown by JMS API methods
* @see javax.jms.Message#setJMSCorrelationID
*/
protected void postProcessResponse(Message request, Message response) throws JMSException {
String correlation = request.getJMSCorrelationID();
if (correlation == null) {
correlation = request.getJMSMessageID();
}
response.setJMSCorrelationID(correlation);
}
/**
* Post-process the given response message before it will be sent.
* <p>The default implementation sets the response's correlation id
* to the request message's correlation id, if any; otherwise to the
* request message id.
* @param request the original incoming JMS message
* @param response the outgoing JMS message about to be sent
* @throws JMSException if thrown by JMS API methods
* @see javax.jms.Message#setJMSCorrelationID
*/
protected void postProcessResponse(Message request, Message response) throws JMSException {
String correlation = request.getJMSCorrelationID();
if (correlation == null) {
correlation = request.getJMSMessageID();
}
response.setJMSCorrelationID(correlation);
}