下面列出了javax.jms.Message#setJMSType ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void doSend(Destination destination, Message message) throws JMSException {
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
message.setObjectProperty(entry.getKey(), entry.getValue());
}
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
}
if (correlationIdBytes != null) {
message.setJMSCorrelationIDAsBytes(correlationIdBytes);
}
if (type != null) {
message.setJMSType(type);
}
if (replyTo != null) {
message.setJMSReplyTo(replyTo);
}
session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
}
/**
* Creates a request message for the given parameters.
*
* @param operation The name of the operation to invoke or {@code null} if the message
* should not have a subject.
* @param applicationProperties Application properties to set on the request message or
* {@code null} if no properties should be set.
* @param payload Payload to include or {@code null} if the message should have no body.
* @return A succeeded future containing the created message or a failed future if there was an exception
* creating the message.
*/
protected Future<Message> createRequestMessage(final String operation,
final Map<String, Object> applicationProperties, final Buffer payload) {
try {
final Message request = createMessage(payload);
if (operation != null) {
request.setJMSType(operation);
}
if (applicationProperties != null) {
for (final Map.Entry<String, Object> entry : applicationProperties.entrySet()) {
if (entry.getValue() instanceof String) {
request.setStringProperty(entry.getKey(), (String) entry.getValue());
} else {
request.setObjectProperty(entry.getKey(), entry.getValue());
}
}
}
return Future.succeededFuture(request);
} catch (final JMSException e) {
return Future.failedFuture(getServiceInvocationException(e));
}
}
public void testJMSPropertySelectors() throws Exception {
Message message = createMessage();
message.setJMSType("selector-test");
message.setJMSMessageID("id:test:1:1:1:1");
assertSelector(message, "JMSType = 'selector-test'", true);
assertSelector(message, "JMSType = 'crap'", false);
assertSelector(message, "JMSMessageID = 'id:test:1:1:1:1'", true);
assertSelector(message, "JMSMessageID = 'id:not-test:1:1:1:1'", false);
message = createMessage();
message.setJMSType("1001");
assertSelector(message, "JMSType='1001'", true);
assertSelector(message, "JMSType='1001' OR JMSType='1002'", true);
assertSelector(message, "JMSType = 'crap'", false);
}
protected Message createMessage() throws JMSException {
Message message = createMessage("FOO.BAR");
message.setJMSType("selector-test");
message.setJMSMessageID("connection:1:1:1:1");
message.setObjectProperty("name", "James");
message.setObjectProperty("location", "London");
message.setByteProperty("byteProp", (byte) 123);
message.setByteProperty("byteProp2", (byte) 33);
message.setShortProperty("shortProp", (short) 123);
message.setIntProperty("intProp", 123);
message.setLongProperty("longProp", 123);
message.setFloatProperty("floatProp", 123);
message.setDoubleProperty("doubleProp", 123);
message.setIntProperty("rank", 123);
message.setIntProperty("version", 2);
message.setStringProperty("quote", "'In God We Trust'");
message.setStringProperty("foo", "_foo");
message.setStringProperty("punctuation", "!#$&()*+,-./:;<=>[email protected][\\]^`{|}~");
message.setBooleanProperty("trueProp", true);
message.setBooleanProperty("falseProp", false);
return message;
}
private void doSend(Destination destination, Message message) throws JMSException {
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
message.setObjectProperty(entry.getKey(), entry.getValue());
}
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
}
if (correlationIdBytes != null) {
message.setJMSCorrelationIDAsBytes(correlationIdBytes);
}
if (type != null) {
message.setJMSType(type);
}
if (replyTo != null) {
message.setJMSReplyTo(replyTo);
}
session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
}
private void doSend(Destination destination, Message message) throws JMSException {
if (message == null) {
throw new MessageFormatException("Message must not be null");
}
for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
message.setObjectProperty(entry.getKey(), entry.getValue());
}
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
}
if (correlationIdBytes != null) {
message.setJMSCorrelationIDAsBytes(correlationIdBytes);
}
if (type != null) {
message.setJMSType(type);
}
if (replyTo != null) {
message.setJMSReplyTo(replyTo);
}
if (completionListener != null) {
producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
} else {
producer.send(destination, message, deliveryMode, priority, timeToLive);
}
}
@Test
public void testRoutingWithSubjectSetAsJMSMessageType() throws Exception
{
assumeThat("AMQP 1.0 test", getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
prepare();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination sendingDestination = session.createTopic(EXCHANGE_NAME);
Destination receivingDestination = session.createQueue(QUEUE_NAME);
Message message = session.createTextMessage("test");
message.setJMSType(ROUTING_KEY);
MessageProducer messageProducer = session.createProducer(sendingDestination);
messageProducer.send(message);
MessageConsumer messageConsumer = session.createConsumer(receivingDestination);
Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
assertNotNull("Message not received", receivedMessage);
assertEquals("test", ((TextMessage) message).getText());
}
finally
{
connection.close();
}
}
@Test
public void testAnonymousRelayRoutingWithSubjectSetAsJMSMessageType() throws Exception
{
assumeThat("AMQP 1.0 test", getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
prepare();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination sendingDestination = session.createTopic(EXCHANGE_NAME);
Destination receivingDestination = session.createQueue(QUEUE_NAME);
Message message = session.createTextMessage("test");
message.setJMSType(ROUTING_KEY);
MessageProducer messageProducer = session.createProducer(null);
messageProducer.send(sendingDestination, message);
MessageConsumer messageConsumer = session.createConsumer(receivingDestination);
Message receivedMessage = messageConsumer.receive(getReceiveTimeout());
assertNotNull("Message not received", receivedMessage);
assertEquals("test", ((TextMessage) message).getText());
}
finally
{
connection.close();
}
}
@Test
public void testJMSType() throws Exception {
Message m = queueProducerSession.createMessage();
String originalType = "TYPE1";
m.setJMSType(originalType);
queueProducer.send(m);
String gotType = queueConsumer.receive(1000).getJMSType();
ProxyAssertSupport.assertEquals(originalType, gotType);
}
protected void prepareMessage(final Message m) throws JMSException {
m.setBooleanProperty("booleanProperty", true);
m.setByteProperty("byteProperty", (byte) 3);
m.setDoubleProperty("doubleProperty", 4.0);
m.setFloatProperty("floatProperty", 5.0f);
m.setIntProperty("intProperty", 6);
m.setLongProperty("longProperty", 7);
m.setShortProperty("shortProperty", (short) 8);
m.setStringProperty("stringProperty", "this is a String property");
m.setJMSCorrelationID("this is the correlation ID");
m.setJMSReplyTo(ActiveMQServerTestCase.topic1);
m.setJMSType("someArbitraryType");
}
@Override
public JMSProducer send(Destination destination, Message message) {
if (message == null) {
throw new MessageFormatRuntimeException("null message");
}
try {
if (jmsHeaderCorrelationID != null) {
message.setJMSCorrelationID(jmsHeaderCorrelationID);
}
if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0) {
message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
}
if (jmsHeaderReplyTo != null) {
message.setJMSReplyTo(jmsHeaderReplyTo);
}
if (jmsHeaderType != null) {
message.setJMSType(jmsHeaderType);
}
// XXX HORNETQ-1209 "JMS 2.0" can this be a foreign msg?
// if so, then "SimpleString" properties will trigger an error.
setProperties(message);
if (completionListener != null) {
CompletionListener wrapped = new CompletionListenerWrapper(completionListener);
producer.send(destination, message, wrapped);
} else {
producer.send(destination, message);
}
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
return this;
}
@Override
public JMSProducer send(final Destination destination, final Message message) {
if (message == null) {
throw new MessageFormatRuntimeException("null message");
}
try {
if (jmsHeaderCorrelationID != null) {
message.setJMSCorrelationID(jmsHeaderCorrelationID);
}
if (jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0) {
message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
}
if (jmsHeaderReplyTo != null) {
message.setJMSReplyTo(jmsHeaderReplyTo);
}
if (jmsHeaderType != null) {
message.setJMSType(jmsHeaderType);
}
setProperties(message);
if (completionListener != null) {
producer.send(destination, message, completionListener);
} else {
producer.send(destination, message);
}
} catch (final JMSException e) {
throw toRuntimeException(e);
}
return this;
}
private static void setJmsHeader(final MessageDescription messageDescription,
final Message message)
throws JMSException
{
final HashMap<MessageDescription.MessageHeader, Serializable> header =
messageDescription.getHeaders();
if (header == null)
{
return;
}
for (Map.Entry<MessageDescription.MessageHeader, Serializable> entry : header.entrySet())
{
try
{
switch (entry.getKey())
{
case DESTINATION:
message.setJMSDestination((Destination) entry.getValue());
break;
case DELIVERY_MODE:
message.setJMSDeliveryMode((Integer) entry.getValue());
break;
case MESSAGE_ID:
message.setJMSMessageID((String) entry.getValue());
break;
case TIMESTAMP:
message.setJMSTimestamp((Long) entry.getValue());
break;
case CORRELATION_ID:
if (entry.getValue() instanceof byte[])
{
message.setJMSCorrelationIDAsBytes((byte[]) entry.getValue());
}
else
{
message.setJMSCorrelationID((String) entry.getValue());
}
break;
case REPLY_TO:
throw new RuntimeException("The Test should not set the replyTo header."
+ " It should rather use the dedicated method");
case REDELIVERED:
message.setJMSRedelivered((Boolean) entry.getValue());
break;
case TYPE:
message.setJMSType((String) entry.getValue());
break;
case EXPIRATION:
message.setJMSExpiration((Long) entry.getValue());
break;
case PRIORITY:
message.setJMSPriority((Integer) entry.getValue());
break;
default:
throw new RuntimeException(String.format("unexpected message header '%s'", entry.getKey()));
}
}
catch (ClassCastException e)
{
throw new RuntimeException(String.format("Could not set message header '%s' to this value: %s",
entry.getKey(),
entry.getValue()), e);
}
}
}
@Test
public void headers() throws Exception
{
final Queue queue = createQueue(getTestName());
final Destination replyTo = createQueue(getTestName() + "_replyTo");
final Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
final String correlationId = "testCorrelationId";
final String jmsType = "testJmsType";
final int priority = 1;
final long timeToLive = 30 * 60 * 1000;
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = producerSession.createProducer(queue);
final Message message = producerSession.createMessage();
message.setJMSCorrelationID(correlationId);
message.setJMSType(jmsType);
message.setJMSReplyTo(replyTo);
long currentTime = System.currentTimeMillis();
producer.send(message, DeliveryMode.NON_PERSISTENT, priority, timeToLive);
consumerConnection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessage);
assertEquals("JMSCorrelationID mismatch", correlationId, receivedMessage.getJMSCorrelationID());
assertEquals("JMSType mismatch", message.getJMSType(), receivedMessage.getJMSType());
assertEquals("JMSReply To mismatch", message.getJMSReplyTo(), receivedMessage.getJMSReplyTo());
assertTrue("JMSMessageID does not start 'ID:'", receivedMessage.getJMSMessageID().startsWith("ID:"));
assertEquals("JMSPriority mismatch", priority, receivedMessage.getJMSPriority());
assertTrue(String.format(
"Unexpected JMSExpiration: got '%d', but expected value equals or greater than '%d'",
receivedMessage.getJMSExpiration(),
currentTime + timeToLive),
receivedMessage.getJMSExpiration() >= currentTime + timeToLive
&& receivedMessage.getJMSExpiration() <= System.currentTimeMillis() + timeToLive);
}
finally
{
producerConnection.close();
}
}
finally
{
consumerConnection.close();
}
}
/**
* Sends a request for an operation.
*
* @param operation The name of the operation to invoke or {@code null} if the message
* should not have a subject.
* @param applicationProperties Application properties to set on the request message or
* {@code null} if no properties should be set.
* @param payload Payload to include or {@code null} if the message should have no body.
* @return A future indicating the outcome of the operation.
*/
public Future<JsonObject> sendRequest(
final String operation,
final Map<String, Object> applicationProperties,
final Buffer payload) {
try {
final Message request = createMessage(payload);
if (operation != null) {
request.setJMSType(operation);
}
if (applicationProperties != null) {
for (Map.Entry<String, Object> entry : applicationProperties.entrySet()) {
if (entry.getValue() instanceof String) {
request.setStringProperty(entry.getKey(), (String) entry.getValue());
} else {
request.setObjectProperty(entry.getKey(), entry.getValue());
}
}
}
return send(request)
.compose(registrationResult -> {
final Promise<JsonObject> result = Promise.promise();
switch (registrationResult.getStatus()) {
case HttpURLConnection.HTTP_OK:
result.complete(registrationResult.getPayload());
break;
case HttpURLConnection.HTTP_NOT_FOUND:
result.fail(new ClientErrorException(registrationResult.getStatus(), "no such device"));
break;
default:
result.fail(StatusCodeMapper.from(registrationResult));
}
return result.future();
});
} catch (JMSException e) {
return Future.failedFuture(getServiceInvocationException(e));
}
}
/**
* Sends a request for an operation.
*
* @param operation The name of the operation to invoke or {@code null} if the message
* should not have a subject.
* @param applicationProperties Application properties to set on the request message or
* {@code null} if no properties should be set.
* @param payload Payload to include or {@code null} if the message should have no body.
* @return A future indicating the outcome of the operation.
*/
public Future<CredentialsObject> sendRequest(
final String operation,
final Map<String, Object> applicationProperties,
final Buffer payload) {
try {
final Message request = createMessage(payload);
if (operation != null) {
request.setJMSType(operation);
}
if (applicationProperties != null) {
for (Map.Entry<String, Object> entry : applicationProperties.entrySet()) {
if (entry.getValue() instanceof String) {
request.setStringProperty(entry.getKey(), (String) entry.getValue());
} else {
request.setObjectProperty(entry.getKey(), entry.getValue());
}
}
}
return send(request)
.compose(credentialsResult -> {
final Promise<CredentialsObject> result = Promise.promise();
switch (credentialsResult.getStatus()) {
case HttpURLConnection.HTTP_OK:
case HttpURLConnection.HTTP_CREATED:
result.complete(credentialsResult.getPayload());
break;
case HttpURLConnection.HTTP_NOT_FOUND:
result.fail(new ClientErrorException(credentialsResult.getStatus(), "no such credentials"));
break;
default:
result.fail(StatusCodeMapper.from(credentialsResult));
}
return result.future();
});
} catch (JMSException e) {
return Future.failedFuture(getServiceInvocationException(e));
}
}