下面列出了javax.jms.Message#getJMSCorrelationIDAsBytes ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private ClientMessage buildClientMessage(final Message message) throws JMSException
{
String jmsMessageID = message.getJMSMessageID();
String jmsCorrelationID = message.getJMSCorrelationID();
byte[] jmsCorrelationIDAsBytes;
try
{
jmsCorrelationIDAsBytes = message.getJMSCorrelationIDAsBytes();
}
catch (JMSException e)
{
jmsCorrelationIDAsBytes = null;
}
long jmsTimestamp = message.getJMSTimestamp();
int jmsDeliveryMode = message.getJMSDeliveryMode();
boolean jmsRedelivered = message.getJMSRedelivered();
String jmsType = message.getJMSType();
long jmsExpiration = message.getJMSExpiration();
int jmsPriority = message.getJMSPriority();
return new JMSMessageAdaptor(jmsMessageID,
jmsTimestamp,
jmsCorrelationID,
jmsCorrelationIDAsBytes,
jmsDeliveryMode,
jmsRedelivered,
jmsType,
jmsExpiration,
jmsPriority);
}
/**
* Gets the key to use for the Kafka Connect SourceRecord.
*
* @param context the JMS context to use for building messages
* @param topic the Kafka topic
* @param message the message
*
* @return the Kafka Connect SourceRecord's key
*
* @throws JMSException Message could not be converted
*/
public SchemaAndValue getKey(JMSContext context, String topic, Message message) throws JMSException {
Schema keySchema = null;
Object key = null;
String keystr;
switch (keyheader) {
case MESSAGE_ID:
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
keystr = message.getJMSMessageID();
if (keystr.startsWith("ID:", 0)) {
key = keystr.substring(3);
}
else {
key = keystr;
}
break;
case CORRELATION_ID:
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
keystr = message.getJMSCorrelationID();
if (keystr.startsWith("ID:", 0)) {
key = keystr.substring(3);
}
else {
key = keystr;
}
break;
case CORRELATION_ID_AS_BYTES:
keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
key = message.getJMSCorrelationIDAsBytes();
break;
case DESTINATION:
keySchema = Schema.OPTIONAL_STRING_SCHEMA;
key = message.getJMSDestination().toString();
break;
default:
break;
}
return new SchemaAndValue(keySchema, key);
}
private static void verifyMessageHeaders(final MessageDescription messageDescription,
final Message message) throws VerificationException
{
try
{
for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : messageDescription.getHeaders()
.entrySet())
{
Object actualValue;
switch (entry.getKey())
{
case DESTINATION:
actualValue = message.getJMSDestination();
break;
case DELIVERY_MODE:
actualValue = message.getJMSDeliveryMode();
break;
case MESSAGE_ID:
actualValue = message.getJMSMessageID();
break;
case TIMESTAMP:
actualValue = message.getJMSTimestamp();
break;
case CORRELATION_ID:
if (entry.getValue() instanceof byte[])
{
actualValue = message.getJMSCorrelationIDAsBytes();
}
else
{
actualValue = message.getJMSCorrelationID();
}
break;
case REPLY_TO:
actualValue = message.getJMSReplyTo();
break;
case REDELIVERED:
actualValue = message.getJMSRedelivered();
break;
case TYPE:
actualValue = message.getJMSType();
break;
case EXPIRATION:
actualValue = message.getJMSExpiration();
break;
case PRIORITY:
actualValue = message.getJMSPriority();
break;
default:
throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
}
verifyEquals(String.format("Unexpected message header '%s'", entry.getKey()),
entry.getValue(),
actualValue);
}
}
catch (JMSException e)
{
throw new RuntimeException("Unexpected exception during message header verification", e);
}
}
@Test
public void testForeignMessageCorrelationID() throws Exception {
System.setProperty(ActiveMQJMSConstants.JMS_ACTIVEMQ_ENABLE_BYTE_ARRAY_JMS_CORRELATION_ID_PROPERTY_NAME, "true");
SimpleJMSMessage msg = new SimpleJMSMessage();
msg.setJMSCorrelationID("mycorrelationid");
byte[] bytes = new byte[]{1, 4, 3, 6, 8};
msg.setJMSCorrelationIDAsBytes(bytes);
queueProd.send(msg);
Message rec = queueCons.receive();
ProxyAssertSupport.assertNotNull(rec);
//Bytes correlation id takes precedence
byte[] bytesrec = rec.getJMSCorrelationIDAsBytes();
assertByteArraysEqual(bytes, bytesrec);
Assert.assertNull(rec.getJMSCorrelationID());
}