javax.jms.Message#getJMSMessageID ( )源码实例Demo

下面列出了javax.jms.Message#getJMSMessageID ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Verifies that a message received on a transacted session which is closed without
 * a commit is delivered again to a new consumer on a new session, carrying the same
 * JMSMessageID and a {@code JMSXDeliveryCount} of 2.
 *
 * @throws Throwable if a JMS call or an assertion fails
 */
public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable {
   Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
   MessageConsumer consumer = session.createConsumer(destination);

   Message message = consumer.receive(1000);
   // Assert before dereferencing: a timed-out receive returns null and must surface
   // as an assertion failure rather than a NullPointerException.
   assertNotNull(message);
   String id = message.getJMSMessageID();
   LOG.info("got message " + message);

   // Close consumer and session without committing so the message is not consumed.
   consumer.close();
   session.close();
   session = connection.createSession(true, Session.SESSION_TRANSACTED);
   consumer = session.createConsumer(destination);

   message = consumer.receive(1000);
   session.commit();
   assertNotNull(message);
   assertEquals("redelivered message", id, message.getJMSMessageID());
   assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
}
 
/**
 * Resolve the Jackson JavaType to use for the given JMS Message.
 * <p>This implementation reads the configured type id property from the
 * message, looks the value up in the configured type id mappings and,
 * if no mapping exists, treats the value as a class name to load. It can
 * be overridden with a different strategy, e.g. doing some heuristics
 * based on message origin.
 * @param message the JMS Message to read the type id from
 * @return the JavaType constructed for the resolved class
 * @throws JMSException if thrown by JMS methods
 * @see #setTypeIdOnMessage(Object, javax.jms.Message)
 * @see #setTypeIdPropertyName(String)
 * @see #setTypeIdMappings(java.util.Map)
 */
protected JavaType getJavaTypeForMessage(Message message) throws JMSException {
	String declaredTypeId = message.getStringProperty(this.typeIdPropertyName);
	if (declaredTypeId == null) {
		String description = "Could not find type id property [" + this.typeIdPropertyName + "] on message [" +
				message.getJMSMessageID() + "] from destination [" + message.getJMSDestination() + "]";
		throw new MessageConversionException(description);
	}

	// An explicit id-to-class mapping takes precedence over class loading.
	Class<?> explicitlyMapped = this.idClassMappings.get(declaredTypeId);
	if (explicitlyMapped != null) {
		return this.objectMapper.getTypeFactory().constructType(explicitlyMapped);
	}

	// No mapping: interpret the type id as a class name.
	try {
		Class<?> loadedClass = ClassUtils.forName(declaredTypeId, this.beanClassLoader);
		return this.objectMapper.getTypeFactory().constructType(loadedClass);
	}
	catch (Throwable ex) {
		throw new MessageConversionException("Failed to resolve type id [" + declaredTypeId + "]", ex);
	}
}
 
源代码3 项目: activemq-artemis   文件: JMSUtil.java
/**
 * Sends the requested number of text messages with random content to the destination
 * and returns the JMSMessageID assigned to each one, in send order.
 *
 * @param cf             factory used to create the connection
 * @param destination    destination the messages are sent to
 * @param messagesToSend number of messages to send
 * @return the JMSMessageIDs of the sent messages
 * @throws Exception if any JMS operation fails
 */
public static String[] sendMessages(final ConnectionFactory cf,
                                    final Destination destination,
                                    final int messagesToSend) throws Exception {
   String[] messageIDs = new String[messagesToSend];

   Connection conn = cf.createConnection();
   try {
      Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageProducer producer = s.createProducer(destination);

      for (int i = 0; i < messagesToSend; i++) {
         Message m = s.createTextMessage(RandomUtil.randomString());
         producer.send(m);
         // The ID is read after send() has been called on the message.
         messageIDs[i] = m.getJMSMessageID();
      }
   } finally {
      // Close the connection even when session creation or a send fails,
      // so a failing caller does not leak it.
      conn.close();
   }

   return messageIDs;
}
 
/**
 * Verifies that sending the same Message object a second time gives it a
 * JMSMessageID whose value differs from the one set by the first send.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void resentJMSMessageGetsReplacementJMSMessageID() throws Exception
{
    Queue queue = createQueue(getTestName());
    Connection connection = getConnection();
    try
    {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageProducer producer = session.createProducer(queue);

        final Message sentMessage = session.createMessage();
        producer.send(sentMessage);

        final String originalId = sentMessage.getJMSMessageID();
        assertNotNull("JMSMessageID must be set after first publish", originalId);

        producer.send(sentMessage);
        final String firstResendID = sentMessage.getJMSMessageID();
        assertNotNull("JMSMessageID must be set after first resend", firstResendID);

        // Compare by value: a reference comparison (assertNotSame) would also pass
        // when both sends produced equal IDs held in different String instances.
        if (originalId.equals(firstResendID))
        {
            throw new AssertionError("JMSMessageID must be changed second publish: " + originalId);
        }
    }
    finally
    {
        connection.close();
    }
}
 
源代码5 项目: hono   文件: JmsBasedRequestResponseClient.java
/**
 * Reads the JMSMessageID header of a JMS message.
 *
 * @param message The message.
 * @return The ID, or {@code null} if the message has no ID or reading it
 *         raised a {@code JMSException}.
 */
public static String getMessageID(final Message message) {
    String messageId = null;
    try {
        messageId = message.getJMSMessageID();
    } catch (final JMSException e) {
        // best effort: an unreadable ID is reported as absent
    }
    return messageId;
}
 
源代码6 项目: activemq-artemis   文件: JMSTest.java
/**
 * Stores the {@code Order} entity extracted from the message and the message's
 * JMSMessageID, then counts down the latch.
 *
 * @param message the delivered message
 */
@Override
public void onMessage(Message message) {
   try {
      order = Jms.getEntity(message, Order.class);
      messageID = message.getJMSMessageID();
   } catch (Exception e) {
      e.printStackTrace();
   } finally {
      // Release the latch on every exit path so a waiting thread is not left
      // blocked when something other than an Exception is thrown.
      latch.countDown();
   }
}
 
源代码7 项目: activemq-artemis   文件: JMSBridgeImpl.java
/**
 * Appends the message's current JMSMessageID to the comma-separated id list kept
 * in the {@code AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST} string property.
 *
 * @param msg the message to update
 * @throws Exception if reading or writing the message fails
 */
private void addMessageIDInHeader(final Message msg) throws Exception {
   // The old message id is carried along as a header so that the target can use it
   // as the JMSCorrelationID of a response message (distributed request-response).
   // Every bridge in a chain appends its own id, which supports multi-hop
   // request/response when several bridges have routed the message.
   if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
      ActiveMQJMSBridgeLogger.LOGGER.trace("Adding old message id in Message header");
   }

   JMSBridgeImpl.copyProperties(msg);

   final String previousIds = msg.getStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
   final String currentId = msg.getJMSMessageID();

   // First hop: the list is just this id; later hops append to the existing list.
   final String idList = previousIds == null ? currentId : previousIds + "," + currentId;

   msg.setStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST, idList);
}
 
源代码8 项目: brave   文件: MessageParser.java
/**
 * Returns the JMSMessageID of the message, or null when reading it fails with a
 * non-fatal error (which is logged).
 */
@Nullable  static String messageId(Message message) {
  String id = null;
  try {
    id = message.getJMSMessageID();
  } catch (Throwable error) {
    propagateIfFatal(error);
    log(error, "error getting getJMSMessageID of message {0}", message, null);
  }
  return id;
}
 
源代码9 项目: activemq-artemis   文件: JmsTopicRedeliverTest.java
/**
 * Sends one text message, receives it without acknowledging, calls
 * {@code recover()} on the consuming session and verifies that the same message
 * (same JMSMessageID and text) is delivered again flagged as redelivered. After
 * the second delivery is acknowledged, a further {@code recover()} must not
 * produce another delivery.
 *
 * @throws Exception if any JMS operation fails
 */
public void testRecover() throws Exception {
   String text = "TEST";
   Message sendMessage = session.createTextMessage(text);

   if (verbose) {
      LOG.info("About to send a message: " + sendMessage + " with text: " + text);
   }
   producer.send(producerDestination, sendMessage);

   // receive but don't acknowledge
   Message unackMessage = consumer.receive(initRedeliveryDelay + 1000);
   assertNotNull(unackMessage);
   String unackId = unackMessage.getJMSMessageID();
   // expected value first, so a failure report labels the values correctly
   assertEquals(text, ((TextMessage) unackMessage).getText());
   assertFalse(unackMessage.getJMSRedelivered());
   // assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"),1);

   // receive then acknowledge
   consumeSession.recover();
   Message ackMessage = consumer.receive(initRedeliveryDelay + 1000);
   assertNotNull(ackMessage);
   ackMessage.acknowledge();
   String ackId = ackMessage.getJMSMessageID();
   assertEquals(text, ((TextMessage) ackMessage).getText());
   assertTrue(ackMessage.getJMSRedelivered());
   // assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"),2);
   assertEquals(unackId, ackId);

   // nothing is left to redeliver once the message has been acknowledged
   consumeSession.recover();
   assertNull(consumer.receiveNoWait());
}
 
源代码10 项目: a   文件: MessageDumpWriter.java
/**
 * Converts a JMS message into a {@code MessageDump}.
 * <p>Copies the JMS headers, sorts every message property into the dump's map for
 * its Java type (other Serializable values are Java-serialized and Base64-encoded)
 * and stores the body of Text, Bytes and Object messages. Other message kinds keep
 * an empty body and type.
 *
 * @param msg the message to convert
 * @return the populated dump
 * @throws JMSException if reading from the message fails
 * @throws IllegalArgumentException if a property value is null or not Serializable
 */
public MessageDump toDumpMessage(Message msg) throws JMSException{

	MessageDump dump = new MessageDump();
	dump.JMSCorrelationID = msg.getJMSCorrelationID();
	dump.JMSMessageID = msg.getJMSMessageID();
	dump.JMSType = msg.getJMSType();
	dump.JMSDeliveryMode =  msg.getJMSDeliveryMode();
	dump.JMSExpiration = msg.getJMSExpiration();
	dump.JMSRedelivered = msg.getJMSRedelivered();
	dump.JMSTimestamp =  msg.getJMSTimestamp();
	dump.JMSPriority = msg.getJMSPriority();

	@SuppressWarnings("rawtypes")
	Enumeration propertyNames = msg.getPropertyNames();
	while(propertyNames.hasMoreElements()){
		String property = (String) propertyNames.nextElement();
		Object propertyValue = msg.getObjectProperty(property);
		if( propertyValue instanceof String){
			dump.stringProperties.put(property, (String)propertyValue);
		} else if ( propertyValue instanceof Integer ){
			dump.intProperties.put(property, (Integer)propertyValue);
		} else if ( propertyValue instanceof Long) {
			dump.longProperties.put(property, (Long)propertyValue);
		} else if( propertyValue instanceof Double) {
			dump.doubleProperties.put(property, (Double) propertyValue);
		} else if (propertyValue instanceof Short) {
			dump.shortProperties.put(property, (Short)propertyValue);
		} else if (propertyValue instanceof Float) {
			dump.floatProperties.put(property, (Float) propertyValue);
		} else if (propertyValue instanceof Byte) {
			dump.byteProperties.put(property, (Byte)propertyValue);
		} else if (propertyValue instanceof Boolean) {
			dump.boolProperties.put(property, (Boolean)propertyValue);
		} else if (propertyValue instanceof Serializable){
			// Object property.. if it's on Classpath and Serializable
			byte[] propBytes = SerializationUtils.serialize((Serializable) propertyValue);
			dump.objectProperties.put(property, Base64.encodeBase64String(propBytes));
		} else {
			// Corner case. A null value also lands here (instanceof is false for null),
			// so describe the type without dereferencing the value.
			String typeName = (propertyValue == null) ? "null" : propertyValue.getClass().getCanonicalName();
			throw new IllegalArgumentException("Property of key '"+ property +"' is not serializable. Type is: " + typeName);
		}
	}

	dump.body = "";
	dump.type = "";

	if (msg instanceof TextMessage) {
		dump.body = ((TextMessage)msg).getText();
		dump.type = "TextMessage";
	} else if (msg instanceof BytesMessage) {
		BytesMessage bm = (BytesMessage)msg;
		byte[] bytes = new byte[(int) bm.getBodyLength()];
		bm.readBytes(bytes);
		dump.body = Base64.encodeBase64String(bytes);
		dump.type = "BytesMessage";
	} else if (msg instanceof ObjectMessage) {
		ObjectMessage om = (ObjectMessage)msg;
		byte[] objectBytes = SerializationUtils.serialize(om.getObject());
		dump.body = Base64.encodeBase64String(objectBytes);
		dump.type = "ObjectMessage";
	}
	return dump;
}
 
源代码11 项目: qpid-jms   文件: ProducerIntegrationTest.java
/**
 * Verifies message ID handling when the connection URI sets
 * {@code jms.messageIDPolicy.messageIDType=PREFIXED_UUID_STRING}: the JMSMessageID
 * is unset before the send, starts with "ID:" afterwards with a remainder that
 * parses as a UUID, and the test peer receives that same String as the message id.
 *
 * @throws Exception if the test peer or any JMS operation fails
 */
@Test(timeout=20000)
public void testSendingMessageWithPrefixedUUIDStringMessageIdFormat() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // DONT create a test fixture, we will drive everything directly.
        String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=PREFIXED_UUID_STRING";
        JmsConnectionFactory factory = new JmsConnectionFactory(uri);

        // Script what the peer expects before the client creates the connection,
        // session and producer below.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);

        // Matchers for the expected transfer; the properties matcher requires a
        // String message id and is queried later for the value actually received.
        String text = "myMessage";
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
        testPeer.expectTransfer(messageMatcher);
        testPeer.expectClose();

        Message message = session.createTextMessage(text);

        assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());

        producer.send(message);

        String jmsMessageID = message.getJMSMessageID();
        assertNotNull("JMSMessageID should be set", jmsMessageID);
        assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));

        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);

        // Get the value that was actually transmitted/received, verify it is a String,
        // verify it is the "ID:" prefix followed by the UUID toString, check the
        // JMSMessageID value that we have locally matches exactly.
        Object receivedMessageId = propsMatcher.getReceivedMessageId();

        String uuidToString = jmsMessageID.substring("ID:".length());
        // Called for validation only: fromString throws if the text is not a UUID.
        UUID.fromString(uuidToString);
        assertTrue("Expected String message id to be sent", receivedMessageId instanceof String);
        assertEquals("Expected UUID toString value to be present in AMQP message", jmsMessageID, receivedMessageId);
    }
}
 
源代码12 项目: localization_nifi   文件: JmsFactory.java
/**
 * Builds a flat String attribute map from a JMS message.
 * <p>Each message property contributes two entries: its value (via
 * {@code toString()}, or "" when null) and a type label stored under the same key
 * plus {@code ATTRIBUTE_TYPE_SUFFIX}. The JMS headers are added afterwards; the
 * nullable ones (correlation id, destination, message id, reply-to, type) are
 * skipped when unset.
 *
 * @param message the message to read properties and headers from
 * @return a new map holding the derived attributes
 * @throws JMSException if reading from the message fails
 */
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
    final Map<String, String> attributes = new HashMap<>();

    for (final Enumeration<?> names = message.getPropertyNames(); names.hasMoreElements();) {
        final String name = (String) names.nextElement();
        final String valueKey = ATTRIBUTE_PREFIX + name;
        final String typeKey = valueKey + ATTRIBUTE_TYPE_SUFFIX;

        final Object value = message.getObjectProperty(name);
        if (value == null) {
            // A null property is still recorded, with an empty value and unknown type.
            attributes.put(valueKey, "");
            attributes.put(typeKey, "Unknown");
        } else {
            attributes.put(valueKey, value.toString());

            final String typeLabel;
            if (value instanceof String) {
                typeLabel = PROP_TYPE_STRING;
            } else if (value instanceof Double) {
                typeLabel = PROP_TYPE_DOUBLE;
            } else if (value instanceof Float) {
                typeLabel = PROP_TYPE_FLOAT;
            } else if (value instanceof Long) {
                typeLabel = PROP_TYPE_LONG;
            } else if (value instanceof Integer) {
                typeLabel = PROP_TYPE_INTEGER;
            } else if (value instanceof Short) {
                typeLabel = PROP_TYPE_SHORT;
            } else if (value instanceof Byte) {
                typeLabel = PROP_TYPE_BYTE;
            } else if (value instanceof Boolean) {
                typeLabel = PROP_TYPE_BOOLEAN;
            } else {
                typeLabel = PROP_TYPE_OBJECT;
            }
            attributes.put(typeKey, typeLabel);
        }
    }

    // Nullable headers: only present in the map when set on the message.
    final String correlationId = message.getJMSCorrelationID();
    if (correlationId != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, correlationId);
    }

    if (message.getJMSDestination() != null) {
        final String destinationName = message.getJMSDestination() instanceof Queue
                ? ((Queue) message.getJMSDestination()).getQueueName()
                : ((Topic) message.getJMSDestination()).getTopicName();
        attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
    }

    final String messageId = message.getJMSMessageID();
    if (messageId != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, messageId);
    }

    final Object replyTo = message.getJMSReplyTo();
    if (replyTo != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, replyTo.toString());
    }

    final String jmsType = message.getJMSType();
    if (jmsType != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, jmsType);
    }

    // Primitive-valued headers are always written.
    attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
    return attributes;
}
 
/**
 * Gets the key to use for the Kafka Connect SourceRecord.
 * <p>The key is taken from the JMS header selected by {@code keyheader}. String
 * IDs have a leading "ID:" removed. If the selected header is not set on the
 * message, the key value is {@code null} (the key schemas used are optional).
 * When {@code keyheader} matches none of the handled cases, both schema and
 * value are {@code null}.
 *
 * @param context            the JMS context to use for building messages
 * @param topic              the Kafka topic
 * @param message            the message
 *
 * @return the Kafka Connect SourceRecord's key
 *
 * @throws JMSException      Message could not be converted
 */
public SchemaAndValue getKey(JMSContext context, String topic, Message message) throws JMSException {
    Schema keySchema = null;
    Object key = null;

    switch (keyheader) {
        case MESSAGE_ID:
            keySchema = Schema.OPTIONAL_STRING_SCHEMA;
            key = stripIdPrefix(message.getJMSMessageID());
            break;
        case CORRELATION_ID:
            keySchema = Schema.OPTIONAL_STRING_SCHEMA;
            // The correlation ID is frequently unset; that must yield a null key,
            // not a NullPointerException.
            key = stripIdPrefix(message.getJMSCorrelationID());
            break;
        case CORRELATION_ID_AS_BYTES:
            keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
            key = message.getJMSCorrelationIDAsBytes();
            break;
        case DESTINATION:
            keySchema = Schema.OPTIONAL_STRING_SCHEMA;
            final Object destination = message.getJMSDestination();
            key = (destination == null) ? null : destination.toString();
            break;
        default:
            break;
    }

    return new SchemaAndValue(keySchema, key);
}

/** Removes a leading "ID:" from the given ID; {@code null} is returned unchanged. */
private static String stripIdPrefix(final String id) {
    if (id != null && id.startsWith("ID:")) {
        return id.substring(3);
    }
    return id;
}
 
源代码14 项目: qpid-broker-j   文件: MessageVerifier.java
/**
 * Compares every header listed in the message description with the corresponding
 * JMS header of the message, delegating the comparison to {@code verifyEquals}.
 * A {@code JMSException} raised while reading a header is rethrown wrapped in a
 * {@code RuntimeException}.
 */
private static void verifyMessageHeaders(final MessageDescription messageDescription,
                                         final Message message) throws VerificationException
{
    try
    {
        for (Map.Entry<MessageDescription.MessageHeader, Serializable> expectation : messageDescription.getHeaders()
                                                                                                       .entrySet())
        {
            final MessageDescription.MessageHeader header = expectation.getKey();
            final Serializable expectedValue = expectation.getValue();
            final Object actualValue = readActualHeaderValue(message, header, expectedValue);

            verifyEquals(String.format("Unexpected message header '%s'", header),
                         expectedValue,
                         actualValue);
        }
    }
    catch (JMSException e)
    {
        throw new RuntimeException("Unexpected exception during message header verification", e);
    }
}

/**
 * Reads the JMS header of the message that corresponds to the given description
 * header. The correlation id is read as bytes when the expected value is a byte
 * array and as a String otherwise.
 */
private static Object readActualHeaderValue(final Message message,
                                            final MessageDescription.MessageHeader header,
                                            final Object expectedValue) throws JMSException
{
    switch (header)
    {
        case DESTINATION:
            return message.getJMSDestination();
        case DELIVERY_MODE:
            return message.getJMSDeliveryMode();
        case MESSAGE_ID:
            return message.getJMSMessageID();
        case TIMESTAMP:
            return message.getJMSTimestamp();
        case CORRELATION_ID:
            if (expectedValue instanceof byte[])
            {
                return message.getJMSCorrelationIDAsBytes();
            }
            return message.getJMSCorrelationID();
        case REPLY_TO:
            return message.getJMSReplyTo();
        case REDELIVERED:
            return message.getJMSRedelivered();
        case TYPE:
            return message.getJMSType();
        case EXPIRATION:
            return message.getJMSExpiration();
        case PRIORITY:
            return message.getJMSPriority();
        default:
            throw new RuntimeException(String.format("unexpected message header '%s'", header));
    }
}
 
源代码15 项目: qpid-jms   文件: ProducerIntegrationTest.java
/**
 * Verifies message ID handling when the connection URI sets
 * {@code jms.messageIDPolicy.messageIDType=UUID}: the JMSMessageID is unset before
 * the send, afterwards starts with "ID:" and with the UUID encoding prefix, and the
 * test peer receives a {@code UUID} whose string form is the tail of the local
 * JMSMessageID.
 *
 * @throws Exception if the test peer or any JMS operation fails
 */
@Test(timeout=20000)
public void testSendingMessageWithUUIDMessageIdFormat() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // DONT create a test fixture, we will drive everything directly.
        String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID";
        JmsConnectionFactory factory = new JmsConnectionFactory(uri);

        // Script what the peer expects before the client creates the connection,
        // session and producer below.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);

        // Matchers for the expected transfer; the properties matcher requires a
        // UUID message id and is queried later for the value actually received.
        String text = "myMessage";
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(UUID.class));
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
        testPeer.expectTransfer(messageMatcher);
        testPeer.expectClose();

        Message message = session.createTextMessage(text);

        assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());

        producer.send(message);

        String jmsMessageID = message.getJMSMessageID();
        assertNotNull("JMSMessageID should be set", jmsMessageID);
        assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
        // The local ID must also carry the encoding hint that follows the "ID:" prefix.
        String uuidEncodingPrefix = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_UUID_PREFIX;
        assertTrue("The 'UUID prefix' encoding hint was not found", jmsMessageID.startsWith(uuidEncodingPrefix));

        connection.close();

        // Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally
        testPeer.waitForAllHandlersToComplete(1000);

        Object receivedMessageId = propsMatcher.getReceivedMessageId();

        assertTrue("Expected UUID message id to be sent", receivedMessageId instanceof UUID);
        assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith(receivedMessageId.toString()));
    }
}
 
源代码16 项目: qpid-jms   文件: MessageIntegrationTest.java
/**
 * Shared implementation for round-trip ID tests. The test peer delivers a message
 * whose message-id is {@code amqpIdObject}; the received JMSMessageID must equal
 * {@code expectedMessageId}. That JMSMessageID is then set as the JMSCorrelationID
 * of a new message, and the peer must see a correlation-id equal to the original
 * {@code amqpIdObject}.
 *
 * @param amqpIdObject      the message-id the peer puts on the delivered message
 * @param expectedMessageId the JMSMessageID string expected on the received message
 * @throws Exception if the test peer or any JMS operation fails
 */
private void recieveMessageIdSendCorrelationIdTestImpl(Object amqpIdObject, String expectedMessageId) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        // The message the peer will deliver: the given message-id and a null value body.
        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setMessageId(amqpIdObject);
        DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        Message receivedMessage = messageConsumer.receive(3000);
        testPeer.waitForAllHandlersToComplete(3000);

        assertNotNull(receivedMessage);

        String jmsMessageID = receivedMessage.getJMSMessageID();
        assertEquals("Unexpected value for JMSMessageID", expectedMessageId, jmsMessageID);

        //Now take the received JMSMessageID, and send a message with it set
        //as the JMSCorrelationID and verify we send the same AMQP id as we started with.

        testPeer.expectSenderAttach();
        MessageProducer producer = session.createProducer(queue);

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);

        //Set matcher to validate the correlation-id on the wire matches the previous message-id
        propsMatcher.withCorrelationId(equalTo(amqpIdObject));

        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
        testPeer.expectTransfer(messageMatcher);

        Message message = session.createTextMessage();
        message.setJMSCorrelationID(jmsMessageID);

        producer.send(message);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(3000);
    }
}
 
源代码17 项目: qpid-jms   文件: ProducerIntegrationTest.java
/**
 * Verifies message ID handling when the connection URI sets
 * {@code jms.messageIDPolicy.messageIDType=UUID_STRING}: the JMSMessageID is unset
 * before the send, afterwards starts with "ID:" and with the "no prefix" encoding
 * hint, and the test peer receives a String equal to the local ID with that hint
 * removed, which must parse as a UUID.
 *
 * @throws Exception if the test peer or any JMS operation fails
 */
@Test(timeout=20000)
public void testSendingMessageWithUUIDStringMessageIdFormat() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // DONT create a test fixture, we will drive everything directly.
        String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID_STRING";
        JmsConnectionFactory factory = new JmsConnectionFactory(uri);

        // Script what the peer expects before the client creates the connection,
        // session and producer below.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);

        // Matchers for the expected transfer; the properties matcher requires a
        // String message id and is queried later for the value actually received.
        String text = "myMessage";
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
        testPeer.expectTransfer(messageMatcher);
        testPeer.expectClose();

        Message message = session.createTextMessage(text);

        assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());

        producer.send(message);

        String jmsMessageID = message.getJMSMessageID();
        assertNotNull("JMSMessageID should be set", jmsMessageID);
        assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
        // The local ID must also carry the encoding hint that follows the "ID:" prefix.
        String noIdPrefix = AmqpMessageIdHelper.JMS_ID_PREFIX + AmqpMessageIdHelper.AMQP_NO_PREFIX;
        assertTrue("The 'No ID prefix' encoding hint was not found", jmsMessageID.startsWith(noIdPrefix));

        connection.close();
        testPeer.waitForAllHandlersToComplete(1000);

        // Get the value that was actually transmitted/received, verify it is a String,
        // verify it is only the UUID toString and has no "ID", check the encoded
        // JMSMessageID value that we have locally.
        Object receivedMessageId = propsMatcher.getReceivedMessageId();

        String expected = jmsMessageID.substring(noIdPrefix.length());
        // Called for validation only: fromString throws if the text is not a UUID.
        UUID.fromString(expected);
        assertTrue("Expected String message id to be sent", receivedMessageId instanceof String);
        assertEquals("Expected UUID toString value to be present in AMQP message", expected, receivedMessageId);
    }
}
 
源代码18 项目: nifi   文件: JmsFactory.java
/**
 * Builds a flat String attribute map from a JMS message.
 * <p>Each message property contributes two entries: its value (via
 * {@code toString()}, or "" when null) and a type label stored under the same key
 * plus {@code ATTRIBUTE_TYPE_SUFFIX}. The JMS headers are added afterwards; the
 * nullable ones (correlation id, destination, message id, reply-to, type) are
 * skipped when unset.
 *
 * @param message the message to read properties and headers from
 * @return a new map holding the derived attributes
 * @throws JMSException if reading from the message fails
 */
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
    final Map<String, String> attributes = new HashMap<>();

    for (final Enumeration<?> names = message.getPropertyNames(); names.hasMoreElements();) {
        final String name = (String) names.nextElement();
        final String valueKey = ATTRIBUTE_PREFIX + name;
        final String typeKey = valueKey + ATTRIBUTE_TYPE_SUFFIX;

        final Object value = message.getObjectProperty(name);
        if (value == null) {
            // A null property is still recorded, with an empty value and unknown type.
            attributes.put(valueKey, "");
            attributes.put(typeKey, "Unknown");
        } else {
            attributes.put(valueKey, value.toString());

            final String typeLabel;
            if (value instanceof String) {
                typeLabel = PROP_TYPE_STRING;
            } else if (value instanceof Double) {
                typeLabel = PROP_TYPE_DOUBLE;
            } else if (value instanceof Float) {
                typeLabel = PROP_TYPE_FLOAT;
            } else if (value instanceof Long) {
                typeLabel = PROP_TYPE_LONG;
            } else if (value instanceof Integer) {
                typeLabel = PROP_TYPE_INTEGER;
            } else if (value instanceof Short) {
                typeLabel = PROP_TYPE_SHORT;
            } else if (value instanceof Byte) {
                typeLabel = PROP_TYPE_BYTE;
            } else if (value instanceof Boolean) {
                typeLabel = PROP_TYPE_BOOLEAN;
            } else {
                typeLabel = PROP_TYPE_OBJECT;
            }
            attributes.put(typeKey, typeLabel);
        }
    }

    // Nullable headers: only present in the map when set on the message.
    final String correlationId = message.getJMSCorrelationID();
    if (correlationId != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, correlationId);
    }

    if (message.getJMSDestination() != null) {
        final String destinationName = message.getJMSDestination() instanceof Queue
                ? ((Queue) message.getJMSDestination()).getQueueName()
                : ((Topic) message.getJMSDestination()).getTopicName();
        attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName);
    }

    final String messageId = message.getJMSMessageID();
    if (messageId != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, messageId);
    }

    final Object replyTo = message.getJMSReplyTo();
    if (replyTo != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_REPLY_TO, replyTo.toString());
    }

    final String jmsType = message.getJMSType();
    if (jmsType != null) {
        attributes.put(ATTRIBUTE_PREFIX + JMS_TYPE, jmsType);
    }

    // Primitive-valued headers are always written.
    attributes.put(ATTRIBUTE_PREFIX + JMS_DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_EXPIRATION, String.valueOf(message.getJMSExpiration()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_PRIORITY, String.valueOf(message.getJMSPriority()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_REDELIVERED, String.valueOf(message.getJMSRedelivered()));
    attributes.put(ATTRIBUTE_PREFIX + JMS_TIMESTAMP, String.valueOf(message.getJMSTimestamp()));
    return attributes;
}
 
/**
 * Build the response message that carries the invocation result.
 * <p>This implementation converts the given RemoteInvocationResult through
 * the configured message converter and correlates the response with the
 * request: the request's correlation id is reused when present, otherwise
 * the request's message id is used.
 * @param request the original request message
 * @param session the JMS session to use
 * @param result the invocation result
 * @return the message response to send
 * @throws javax.jms.JMSException if creating the message failed
 */
protected Message createResponseMessage(Message request, Session session, RemoteInvocationResult result)
		throws JMSException {

	Message reply = this.messageConverter.toMessage(result, session);
	String requestCorrelationId = request.getJMSCorrelationID();
	// Fall back to the request's own id when the caller set no correlation id.
	reply.setJMSCorrelationID(
			requestCorrelationId != null ? requestCorrelationId : request.getJMSMessageID());
	return reply;
}
 
/**
 * Build the response message that carries the invocation result.
 * <p>This implementation converts the given RemoteInvocationResult through
 * the configured message converter and correlates the response with the
 * request: the request's correlation id is reused when present, otherwise
 * the request's message id is used.
 * @param request the original request message
 * @param session the JMS session to use
 * @param result the invocation result
 * @return the message response to send
 * @throws javax.jms.JMSException if creating the message failed
 */
protected Message createResponseMessage(Message request, Session session, RemoteInvocationResult result)
		throws JMSException {

	Message reply = this.messageConverter.toMessage(result, session);
	String requestCorrelationId = request.getJMSCorrelationID();
	// Fall back to the request's own id when the caller set no correlation id.
	reply.setJMSCorrelationID(
			requestCorrelationId != null ? requestCorrelationId : request.getJMSMessageID());
	return reply;
}