下面列出了javax.jms.Message#getJMSMessageID ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
String id = message.getJMSMessageID();
assertNotNull(message);
LOG.info("got message " + message);
consumer.close();
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination);
message = consumer.receive(1000);
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
}
/**
* Determine a Jackson JavaType for the given JMS Message,
* typically parsing a type id message property.
* <p>The default implementation parses the configured type id property name
* and consults the configured type id mapping. This can be overridden with
* a different strategy, e.g. doing some heuristics based on message origin.
* @param message the JMS Message to set the type id on
* @throws JMSException if thrown by JMS methods
* @see #setTypeIdOnMessage(Object, javax.jms.Message)
* @see #setTypeIdPropertyName(String)
* @see #setTypeIdMappings(java.util.Map)
*/
protected JavaType getJavaTypeForMessage(Message message) throws JMSException {
String typeId = message.getStringProperty(this.typeIdPropertyName);
if (typeId == null) {
throw new MessageConversionException(
"Could not find type id property [" + this.typeIdPropertyName + "] on message [" +
message.getJMSMessageID() + "] from destination [" + message.getJMSDestination() + "]");
}
Class<?> mappedClass = this.idClassMappings.get(typeId);
if (mappedClass != null) {
return this.objectMapper.getTypeFactory().constructType(mappedClass);
}
try {
Class<?> typeClass = ClassUtils.forName(typeId, this.beanClassLoader);
return this.objectMapper.getTypeFactory().constructType(typeClass);
}
catch (Throwable ex) {
throw new MessageConversionException("Failed to resolve type id [" + typeId + "]", ex);
}
}
public static String[] sendMessages(final ConnectionFactory cf,
final Destination destination,
final int messagesToSend) throws Exception {
String[] messageIDs = new String[messagesToSend];
Connection conn = cf.createConnection();
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = s.createProducer(destination);
for (int i = 0; i < messagesToSend; i++) {
Message m = s.createTextMessage(RandomUtil.randomString());
producer.send(m);
messageIDs[i] = m.getJMSMessageID();
}
conn.close();
return messageIDs;
}
@Test
public void resentJMSMessageGetsReplacementJMSMessageID() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
final Message sentMessage = session.createMessage();
producer.send(sentMessage);
final String originalId = sentMessage.getJMSMessageID();
assertNotNull("JMSMessageID must be set after first publish", originalId);
producer.send(sentMessage);
final String firstResendID = sentMessage.getJMSMessageID();
assertNotNull("JMSMessageID must be set after first resend", firstResendID);
assertNotSame("JMSMessageID must be changed second publish", originalId, firstResendID);
}
finally
{
connection.close();
}
}
/**
* Gets the message ID from a JMS message.
*
* @param message The message.
* @return The ID or {@code null} if the message does not contain the corresponding property.
*/
public static String getMessageID(final Message message) {
try {
return message.getJMSMessageID();
} catch (final JMSException e) {
return null;
}
}
@Override
public void onMessage(Message message) {
try {
order = Jms.getEntity(message, Order.class);
messageID = message.getJMSMessageID();
} catch (Exception e) {
e.printStackTrace();
}
latch.countDown();
}
private void addMessageIDInHeader(final Message msg) throws Exception {
// We concatenate the old message id as a header in the message
// This allows the target to then use this as the JMSCorrelationID of any response message
// thus enabling a distributed request-response pattern.
// Each bridge (if there are more than one) in the chain can concatenate the message id
// So in the case of multiple bridges having routed the message this can be used in a multi-hop
// distributed request/response
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Adding old message id in Message header");
}
JMSBridgeImpl.copyProperties(msg);
String val = null;
val = msg.getStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
if (val == null) {
val = msg.getJMSMessageID();
} else {
StringBuffer sb = new StringBuffer(val);
sb.append(",").append(msg.getJMSMessageID());
val = sb.toString();
}
msg.setStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST, val);
}
@Nullable static String messageId(Message message) {
try {
return message.getJMSMessageID();
} catch (Throwable t) {
propagateIfFatal(t);
log(t, "error getting getJMSMessageID of message {0}", message, null);
}
return null;
}
/**
* Sends and consumes the messages.
*
* @throws Exception
*/
public void testRecover() throws Exception {
String text = "TEST";
Message sendMessage = session.createTextMessage(text);
if (verbose) {
LOG.info("About to send a message: " + sendMessage + " with text: " + text);
}
producer.send(producerDestination, sendMessage);
// receive but don't acknowledge
Message unackMessage = consumer.receive(initRedeliveryDelay + 1000);
assertNotNull(unackMessage);
String unackId = unackMessage.getJMSMessageID();
assertEquals(((TextMessage) unackMessage).getText(), text);
assertFalse(unackMessage.getJMSRedelivered());
// assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"),1);
// receive then acknowledge
consumeSession.recover();
Message ackMessage = consumer.receive(initRedeliveryDelay + 1000);
assertNotNull(ackMessage);
ackMessage.acknowledge();
String ackId = ackMessage.getJMSMessageID();
assertEquals(((TextMessage) ackMessage).getText(), text);
assertTrue(ackMessage.getJMSRedelivered());
// assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"),2);
assertEquals(unackId, ackId);
consumeSession.recover();
assertNull(consumer.receiveNoWait());
}
public MessageDump toDumpMessage(Message msg) throws JMSException{
MessageDump dump = new MessageDump();
dump.JMSCorrelationID = msg.getJMSCorrelationID();
dump.JMSMessageID = msg.getJMSMessageID();
dump.JMSType = msg.getJMSType();
dump.JMSDeliveryMode = msg.getJMSDeliveryMode();
dump.JMSExpiration = msg.getJMSExpiration();
dump.JMSRedelivered = msg.getJMSRedelivered();
dump.JMSTimestamp = msg.getJMSTimestamp();
dump.JMSPriority = msg.getJMSPriority();
@SuppressWarnings("rawtypes")
Enumeration propertyNames = msg.getPropertyNames();
while(propertyNames.hasMoreElements()){
String property = (String) propertyNames.nextElement();
Object propertyValue = msg.getObjectProperty(property);
if( propertyValue instanceof String){
dump.stringProperties.put(property, (String)propertyValue);
} else if ( propertyValue instanceof Integer ){
dump.intProperties.put(property, (Integer)propertyValue);
} else if ( propertyValue instanceof Long) {
dump.longProperties.put(property, (Long)propertyValue);
} else if( propertyValue instanceof Double) {
dump.doubleProperties.put(property, (Double) propertyValue);
} else if (propertyValue instanceof Short) {
dump.shortProperties.put(property, (Short)propertyValue);
} else if (propertyValue instanceof Float) {
dump.floatProperties.put(property, (Float) propertyValue);
} else if (propertyValue instanceof Byte) {
dump.byteProperties.put(property, (Byte)propertyValue);
} else if (propertyValue instanceof Boolean) {
dump.boolProperties.put(property, (Boolean)propertyValue);
} else if (propertyValue instanceof Serializable){
// Object property.. if it's on Classpath and Serializable
byte[] propBytes = SerializationUtils.serialize((Serializable) propertyValue);
dump.objectProperties.put(property, Base64.encodeBase64String(propBytes));
} else {
// Corner case.
throw new IllegalArgumentException("Property of key '"+ property +"' is not serializable. Type is: " + propertyValue.getClass().getCanonicalName());
}
}
dump.body = "";
dump.type = "";
if (msg instanceof TextMessage) {
dump.body = ((TextMessage)msg).getText();
dump.type = "TextMessage";
} else if (msg instanceof BytesMessage) {
BytesMessage bm = (BytesMessage)msg;
byte[] bytes = new byte[(int) bm.getBodyLength()];
bm.readBytes(bytes);
dump.body = Base64.encodeBase64String(bytes);
dump.type = "BytesMessage";
} else if (msg instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage)msg;
byte[] objectBytes = SerializationUtils.serialize(om.getObject());
dump.body = Base64.encodeBase64String(objectBytes);
dump.type = "ObjectMessage";
}
return dump;
}
@Test(timeout=20000)
public void testSendingMessageWithPrefixedUUIDStringMessageIdFormat() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=PREFIXED_UUID_STRING";
JmsConnectionFactory factory = new JmsConnectionFactory(uri);
testPeer.expectSaslAnonymous();
testPeer.expectOpen();
testPeer.expectBegin();
testPeer.expectBegin();
testPeer.expectSenderAttach();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
testPeer.expectClose();
Message message = session.createTextMessage(text);
assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
producer.send(message);
String jmsMessageID = message.getJMSMessageID();
assertNotNull("JMSMessageID should be set", jmsMessageID);
assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
// Get the value that was actually transmitted/received, verify it is a String,
// verify it is the "ID:" prefix followed by the UUID toString, check the
// JMSMessageID value that we have locally matches exactly.
Object receivedMessageId = propsMatcher.getReceivedMessageId();
String uuidToString = jmsMessageID.substring("ID:".length());
UUID.fromString(uuidToString);
assertTrue("Expected String message id to be sent", receivedMessageId instanceof String);
assertEquals("Expected UUID toString value to be present in AMQP message", jmsMessageID, receivedMessageId);
}
}
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
final Map<String, String> attributes = new HashMap<>();
final Enumeration<?> enumeration = message.getPropertyNames();
while (enumeration.hasMoreElements()) {
final String propName = (String) enumeration.nextElement();
final Object value = message.getObjectProperty(propName);
if (value == null) {
attributes.put(ATTRIBUTE_PREFIX + propName, "");
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown");
continue;
}
final String valueString = value.toString();
attributes.put(ATTRIBUTE_PREFIX + propName, valueString);
final String propType;
if (value instanceof String) {
propType = PROP_TYPE_STRING;
} else if (value instanceof Double) {
propType = PROP_TYPE_DOUBLE;
} else if (value instanceof Float) {
propType = PROP_TYPE_FLOAT;
} else if (value instanceof Long) {
propType = PROP_TYPE_LONG;
} else if (value instanceof Integer) {
propType = PROP_TYPE_INTEGER;
} else if (value instanceof Short) {
propType = PROP_TYPE_SHORT;
} else if (value instanceof Byte) {
propType = PROP_TYPE_BYTE;
} else if (value instanceof Boolean) {
propType = PROP_TYPE_BOOLEAN;
} else {
propType = PROP_TYPE_OBJECT;
}
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propType);
}
if (message.getJMSCorrelationID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
}
if (message.getJMSDestination() != null) {
String destinationName;
if (message.getJMSDestination() instanceof Queue) {
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
} else {
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
}
if (message.getJMSMessageID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
}
if (message.getJMSReplyTo() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString());
}
if (message.getJMSType() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType());
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
return attributes;
}
/**
* Gets the key to use for the Kafka Connect SourceRecord.
*
* @param context the JMS context to use for building messages
* @param topic the Kafka topic
* @param message the message
*
* @return the Kafka Connect SourceRecord's key
*
* @throws JMSException Message could not be converted
*/
public SchemaAndValue getKey(JMSContext context, String topic, Message message) throws JMSException {
Schema keySchema = null;
Object key = null;
String keystr;
switch (keyheader) {
case MESSAGE_ID:
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
keystr = message.getJMSMessageID();
if (keystr.startsWith("ID:", 0)) {
key = keystr.substring(3);
}
else {
key = keystr;
}
break;
case CORRELATION_ID:
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
keystr = message.getJMSCorrelationID();
if (keystr.startsWith("ID:", 0)) {
key = keystr.substring(3);
}
else {
key = keystr;
}
break;
case CORRELATION_ID_AS_BYTES:
keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
key = message.getJMSCorrelationIDAsBytes();
break;
case DESTINATION:
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
key = message.getJMSDestination().toString();
break;
default:
break;
}
return new SchemaAndValue(keySchema, key);
}
private static void verifyMessageHeaders(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
try
{
for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : messageDescription.getHeaders()
.entrySet())
{
Object actualValue;
switch (entry.getKey())
{
case DESTINATION:
actualValue = message.getJMSDestination();
break;
case DELIVERY_MODE:
actualValue = message.getJMSDeliveryMode();
break;
case MESSAGE_ID:
actualValue = message.getJMSMessageID();
break;
case TIMESTAMP:
actualValue = message.getJMSTimestamp();
break;
case CORRELATION_ID:
if (entry.getValue() instanceof byte[])
{
actualValue = message.getJMSCorrelationIDAsBytes();
}
else
{
actualValue = message.getJMSCorrelationID();
}
break;
case REPLY_TO:
actualValue = message.getJMSReplyTo();
break;
case REDELIVERED:
actualValue = message.getJMSRedelivered();
break;
case TYPE:
actualValue = message.getJMSType();
break;
case EXPIRATION:
actualValue = message.getJMSExpiration();
break;
case PRIORITY:
actualValue = message.getJMSPriority();
break;
default:
throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
}
verifyEquals(String.format("Unexpected message header '%s'", entry.getKey()),
entry.getValue(),
actualValue);
}
}
catch (JMSException e)
{
throw new RuntimeException("Unexpected exception during message header verification", e);
}
}
@Test(timeout=20000)
public void testSendingMessageWithUUIDMessageIdFormat() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID";
JmsConnectionFactory factory = new JmsConnectionFactory(uri);
testPeer.expectSaslAnonymous();
testPeer.expectOpen();
testPeer.expectBegin();
testPeer.expectBegin();
testPeer.expectSenderAttach();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(UUID.class));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
testPeer.expectClose();
Message message = session.createTextMessage(text);
assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
producer.send(message);
String jmsMessageID = message.getJMSMessageID();
assertNotNull("JMSMessageID should be set", jmsMessageID);
assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
String uuidEncodingPrefix = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_UUID_PREFIX;
assertTrue("The 'UUID prefix' encoding hint was not found", jmsMessageID.startsWith(uuidEncodingPrefix));
connection.close();
// Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
testPeer.waitForAllHandlersToComplete(1000);
Object receivedMessageId = propsMatcher.getReceivedMessageId();
assertTrue("Expected UUID message id to be sent", receivedMessageId instanceof UUID);
assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith(receivedMessageId.toString()));
}
}
private void recieveMessageIdSendCorrelationIdTestImpl(Object amqpIdObject, String expectedMessageId) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
PropertiesDescribedType props = new PropertiesDescribedType();
props.setMessageId(amqpIdObject);
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(3000);
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
String jmsMessageID = receivedMessage.getJMSMessageID();
assertEquals("Unexpected value for JMSMessageID", expectedMessageId, jmsMessageID);
//Now take the received JMSMessageID, and send a message with it set
//as the JMSCorrelationID and verify we send the same AMQP id as we started with.
testPeer.expectSenderAttach();
MessageProducer producer = session.createProducer(queue);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
//Set matcher to validate the correlation-id on the wire matches the previous message-id
propsMatcher.withCorrelationId(equalTo(amqpIdObject));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
testPeer.expectTransfer(messageMatcher);
Message message = session.createTextMessage();
message.setJMSCorrelationID(jmsMessageID);
producer.send(message);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testSendingMessageWithUUIDStringMessageIdFormat() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// DONT create a test fixture, we will drive everything directly.
String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID_STRING";
JmsConnectionFactory factory = new JmsConnectionFactory(uri);
testPeer.expectSaslAnonymous();
testPeer.expectOpen();
testPeer.expectBegin();
testPeer.expectBegin();
testPeer.expectSenderAttach();
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
testPeer.expectClose();
Message message = session.createTextMessage(text);
assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
producer.send(message);
String jmsMessageID = message.getJMSMessageID();
assertNotNull("JMSMessageID should be set", jmsMessageID);
assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
String noIdPrefix = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_NO_PREFIX;
assertTrue("The 'No ID prefix' encoding hint was not found", jmsMessageID.startsWith(noIdPrefix));
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
// Get the value that was actually transmitted/received, verify it is a String,
// verify it is only the UUID toString and has no "ID", check the encoded
// JMSMessageID value that we have locally.
Object receivedMessageId = propsMatcher.getReceivedMessageId();
String expected = jmsMessageID.substring(noIdPrefix.length());
UUID.fromString(expected);
assertTrue("Expected String message id to be sent", receivedMessageId instanceof String);
assertEquals("Expected UUID toString value to be present in AMQP message", expected, receivedMessageId);
}
}
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
final Map<String, String> attributes = new HashMap<>();
final Enumeration<?> enumeration = message.getPropertyNames();
while (enumeration.hasMoreElements()) {
final String propName = (String) enumeration.nextElement();
final Object value = message.getObjectProperty(propName);
if (value == null) {
attributes.put(ATTRIBUTE_PREFIX + propName, "");
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, "Unknown");
continue;
}
final String valueString = value.toString();
attributes.put(ATTRIBUTE_PREFIX + propName, valueString);
final String propType;
if (value instanceof String) {
propType = PROP_TYPE_STRING;
} else if (value instanceof Double) {
propType = PROP_TYPE_DOUBLE;
} else if (value instanceof Float) {
propType = PROP_TYPE_FLOAT;
} else if (value instanceof Long) {
propType = PROP_TYPE_LONG;
} else if (value instanceof Integer) {
propType = PROP_TYPE_INTEGER;
} else if (value instanceof Short) {
propType = PROP_TYPE_SHORT;
} else if (value instanceof Byte) {
propType = PROP_TYPE_BYTE;
} else if (value instanceof Boolean) {
propType = PROP_TYPE_BOOLEAN;
} else {
propType = PROP_TYPE_OBJECT;
}
attributes.put(ATTRIBUTE_PREFIX + propName + ATTRIBUTE_TYPE_SUFFIX, propType);
}
if (message.getJMSCorrelationID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID());
}
if (message.getJMSDestination() != null) {
String destinationName;
if (message.getJMSDestination() instanceof Queue) {
destinationName = ((Queue) message.getJMSDestination()).getQueueName();
} else {
destinationName = ((Topic) message.getJMSDestination()).getTopicName();
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
}
if (message.getJMSMessageID() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID());
}
if (message.getJMSReplyTo() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, message.getJMSReplyTo().toString());
}
if (message.getJMSType() != null) {
attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, message.getJMSType());
}
attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
return attributes;
}
/**
* Create the invocation result response message.
* <p>The default implementation creates a JMS ObjectMessage for the given
* RemoteInvocationResult object. It sets the response's correlation id
* to the request message's correlation id, if any; otherwise to the
* request message id.
* @param request the original request message
* @param session the JMS session to use
* @param result the invocation result
* @return the message response to send
* @throws javax.jms.JMSException if creating the message failed
*/
protected Message createResponseMessage(Message request, Session session, RemoteInvocationResult result)
throws JMSException {
Message response = this.messageConverter.toMessage(result, session);
String correlation = request.getJMSCorrelationID();
if (correlation == null) {
correlation = request.getJMSMessageID();
}
response.setJMSCorrelationID(correlation);
return response;
}
/**
* Create the invocation result response message.
* <p>The default implementation creates a JMS ObjectMessage for the given
* RemoteInvocationResult object. It sets the response's correlation id
* to the request message's correlation id, if any; otherwise to the
* request message id.
* @param request the original request message
* @param session the JMS session to use
* @param result the invocation result
* @return the message response to send
* @throws javax.jms.JMSException if creating the message failed
*/
protected Message createResponseMessage(Message request, Session session, RemoteInvocationResult result)
throws JMSException {
Message response = this.messageConverter.toMessage(result, session);
String correlation = request.getJMSCorrelationID();
if (correlation == null) {
correlation = request.getJMSMessageID();
}
response.setJMSCorrelationID(correlation);
return response;
}