下面列出了 javax.jms.MapMessage#setJMSReplyTo() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Sends an AMQP management {@code UPDATE} request addressed by object path.
 *
 * <p>The request is a {@link MapMessage} sent to the queue named by
 * {@code _managementAddress}. The entity is identified through the message
 * properties ({@code index} = {@code object-path}, {@code key} = {@code name}),
 * while the attributes to change are carried in the map body. This method does
 * not consume the reply; any response is delivered to {@code replyToDestination}.
 *
 * @param name               object path of the entity, sent as the {@code key} property
 * @param type               value of the {@code type} property of the request
 * @param attributes         entries copied one by one into the map body of the request
 * @param replyToDestination destination set as the request's {@code JMSReplyTo}
 * @param session            session used to create the producer and the message; it is
 *                           committed after the send when it is transacted
 * @throws JMSException if creating, populating, sending or committing the request fails
 */
private void updateEntityUsingAmqpManagement(final String name,
                                             final String type,
                                             Map<String, Object> attributes,
                                             Destination replyToDestination,
                                             final Session session)
        throws JMSException
{
    final MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        final MapMessage updateMessage = session.createMapMessage();
        updateMessage.setStringProperty("type", type);
        updateMessage.setStringProperty("operation", "UPDATE");
        updateMessage.setStringProperty("index", "object-path");
        updateMessage.setStringProperty("key", name);
        updateMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            updateMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(updateMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Release the producer even when building, sending or committing the request fails.
        producer.close();
    }
}
/**
 * Issues a {@code READ} for type {@code org.apache.qpid.VirtualHost} with an empty
 * object-path key and expects a 200 map response whose {@code name} equals
 * {@code getVirtualHostName()}. The same message is then re-sent with the
 * {@code actuals} property set to {@code false}, and the response is expected
 * to expose a {@code productVersion} value.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testReadVirtualHost() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage readRequest = _session.createMapMessage();
        readRequest.setStringProperty("type", "org.apache.qpid.VirtualHost");
        readRequest.setStringProperty("operation", "READ");
        readRequest.setStringProperty("index", "object-path");
        readRequest.setStringProperty("key", "");
        readRequest.setJMSReplyTo(_replyAddress);
        _producer.send(readRequest);

        // First round trip: request sent without the "actuals" property.
        final Message firstResponse = _consumer.receive(getReceiveTimeout());
        assertResponseCode(firstResponse, 200);
        checkResponseIsMapType(firstResponse);
        assertEquals("The name of the virtual host is not as expected",
                     getVirtualHostName(),
                     getValueFromMapResponse(firstResponse, "name"));

        // Second round trip: the very same message, now with actuals=false.
        readRequest.setBooleanProperty("actuals", false);
        _producer.send(readRequest);

        final Message secondResponse = _consumer.receive(getReceiveTimeout());
        assertResponseCode(secondResponse, 200);
        checkResponseIsMapType(secondResponse);
        assertNotNull("Derived attribute (productVersion) should be available",
                      getValueFromMapResponse(secondResponse, "productVersion"));
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code READ} for an {@code org.apache.qpid.Exchange} whose object-path
 * key is {@code not-found-exchange} and expects a 404 response code.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testReadObject_ObjectNotFound() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage readRequest = _session.createMapMessage();
        readRequest.setStringProperty("type", "org.apache.qpid.Exchange");
        readRequest.setStringProperty("operation", "READ");
        readRequest.setStringProperty("index", "object-path");
        readRequest.setStringProperty("key", "not-found-exchange");
        readRequest.setJMSReplyTo(_replyAddress);
        _producer.send(readRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 404);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Invokes the {@code getStatistics} operation on an {@code org.apache.qpid.Exchange}
 * whose object-path key is {@code not-found-exchange} and expects a 404 response code.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testInvokeOperation_ObjectNotFound() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage operationRequest = _session.createMapMessage();
        operationRequest.setStringProperty("type", "org.apache.qpid.Exchange");
        operationRequest.setStringProperty("operation", "getStatistics");
        operationRequest.setStringProperty("index", "object-path");
        operationRequest.setStringProperty("key", "not-found-exchange");
        operationRequest.setJMSReplyTo(_replyAddress);
        _producer.send(operationRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 404);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Invokes {@code getStatistics} on type {@code org.apache.qpid.Broker} over the
 * connection returned by {@code getBrokerManagementConnection()} and expects a
 * 200 map response containing a {@code numberOfLiveThreads} value.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testInvokeOperationReturningMap() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getBrokerManagementConnection();
    try
    {
        setUp(connection);

        final MapMessage operationRequest = _session.createMapMessage();
        operationRequest.setStringProperty("type", "org.apache.qpid.Broker");
        operationRequest.setStringProperty("operation", "getStatistics");
        operationRequest.setStringProperty("index", "object-path");
        operationRequest.setStringProperty("key", "");
        operationRequest.setJMSReplyTo(_replyAddress);
        _producer.send(operationRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 200);
        checkResponseIsMapType(response);
        assertNotNull(getValueFromMapResponse(response, "numberOfLiveThreads"));
    }
    finally
    {
        connection.close();
    }
}
/**
 * Invokes {@code getConnectionMetaData} on type {@code org.apache.qpid.Broker}
 * over the connection returned by {@code getBrokerManagementConnection()} and
 * expects a 200 map response containing a {@code port} value.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testInvokeOperationReturningManagedAttributeValue() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getBrokerManagementConnection();
    try
    {
        setUp(connection);

        final MapMessage operationRequest = _session.createMapMessage();
        operationRequest.setStringProperty("type", "org.apache.qpid.Broker");
        operationRequest.setStringProperty("operation", "getConnectionMetaData");
        operationRequest.setStringProperty("index", "object-path");
        operationRequest.setStringProperty("key", "");
        operationRequest.setJMSReplyTo(_replyAddress);
        _producer.send(operationRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 200);
        checkResponseIsMapType(response);
        assertNotNull(getValueFromMapResponse(response, "port"));
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code DELETE} for type {@code org.apache.qpid.VirtualHost} with an
 * empty object-path key and expects a 501 response code.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testDeleteVirtualHost() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage deleteRequest = _session.createMapMessage();
        deleteRequest.setStringProperty("type", "org.apache.qpid.VirtualHost");
        deleteRequest.setStringProperty("operation", "DELETE");
        deleteRequest.setStringProperty("index", "object-path");
        deleteRequest.setStringProperty("key", "");
        deleteRequest.setJMSReplyTo(_replyAddress);
        _producer.send(deleteRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 501);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code CREATE} for type {@code org.apache.qpid.Queue} with the body
 * entry {@code qpid-type} set to {@code lvq}. Expects a 201 map response whose
 * {@code type} value is {@code org.apache.qpid.LastValueQueue}.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testCreateQueueWithQpidType() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Queue");
        createRequest.setStringProperty("operation", "CREATE");
        // Body entries: the test name doubles as queue name and object path.
        createRequest.setString("name", getTestName());
        createRequest.setString("qpid-type", "lvq");
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 201);
        checkResponseIsMapType(response);
        assertEquals("The created queue did not have the correct type",
                     "org.apache.qpid.LastValueQueue",
                     getValueFromMapResponse(response, "type"));
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code CREATE} whose {@code type} property is
 * {@code org.apache.qpid.SortedQueue}, with a {@code sortKey} body entry of
 * {@code foo}. Expects a 201 map response whose {@code qpid-type} value is
 * {@code sorted}.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testCreateQueueWithAmqpType() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.SortedQueue");
        createRequest.setStringProperty("operation", "CREATE");
        // Body entries: the test name doubles as queue name and object path.
        createRequest.setString("name", getTestName());
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setString("sortKey", "foo");
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 201);
        checkResponseIsMapType(response);
        assertEquals("The created queue did not have the correct type",
                     "sorted",
                     getValueFromMapResponse(response, "qpid-type"));
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code CREATE} for type {@code org.apache.qpid.Exchange} supplying
 * only {@code name} and {@code object-path} in the body, and expects a 400
 * response code.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testCreateExchangeWithoutType() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Exchange");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 400);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code CREATE} for type {@code org.apache.qpid.Connection} over the
 * connection returned by {@code getConnection()} and expects a 501 response code.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testCreateConnectionOnVirtualHostManagement() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getConnection();
    try
    {
        setUp(connection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Connection");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 501);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Issues a {@code CREATE} for type {@code org.apache.qpid.Connection} over the
 * connection returned by {@code getBrokerManagementConnection()} and expects a
 * 501 response code.
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testCreateConnectionOnBrokerManagement() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getBrokerManagementConnection();
    try
    {
        setUp(connection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Connection");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message response = _consumer.receive(getReceiveTimeout());
        assertResponseCode(response, 501);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends a management request and waits for its response, asserting the response's
 * {@code statusCode} property.
 *
 * <p>The addressing depends on {@code getProtocol()}: for {@code Protocol.AMQP_1_0}
 * the request goes to {@code $management} and the reply is consumed from a temporary
 * queue; otherwise {@code ADDR:}-style addresses are used for the request, the
 * reply-to and the reply consumer.
 *
 * @param session              session used to create destinations, producer and consumer
 * @param request              request to send; its {@code JMSReplyTo} is overwritten here
 * @param expectedResponseCode value the response's {@code statusCode} property must equal
 * @return the response message that was received
 * @throws Exception if a JMS operation fails; an assertion failure is raised when no
 *                   response arrives within {@code getReceiveTimeout()} or when the
 *                   status code differs from {@code expectedResponseCode}
 */
private Message sendManagementRequestAndGetResponse(final Session session,
                                                    final MapMessage request,
                                                    final int expectedResponseCode) throws Exception
{
    final Queue queue;
    final Queue replyConsumer;
    final Queue replyAddress;
    if (getProtocol() == Protocol.AMQP_1_0)
    {
        queue = session.createQueue("$management");
        replyAddress = session.createTemporaryQueue();
        replyConsumer = replyAddress;
    }
    else
    {
        queue = session.createQueue("ADDR:$management");
        replyAddress = session.createQueue("ADDR:!response");
        replyConsumer = session.createQueue(
                "ADDR:$management ; {assert : never, node: { type: queue }, link:{name: \"!response\"}}");
    }
    request.setJMSReplyTo(replyAddress);

    // The consumer is created before the request is sent, as in the original ordering.
    final MessageConsumer consumer = session.createConsumer(replyConsumer);
    try
    {
        final MessageProducer producer = session.createProducer(queue);
        try
        {
            producer.send(request);
        }
        finally
        {
            producer.close();
        }

        final Message responseMessage = consumer.receive(getReceiveTimeout());
        // receive(timeout) yields null on timeout; fail with a clear message instead of an NPE.
        assertThat("A response message was not received", responseMessage != null, is(equalTo(true)));
        assertThat("The response code did not indicate success",
                   responseMessage.getIntProperty("statusCode"), is(equalTo(expectedResponseCode)));
        return responseMessage;
    }
    finally
    {
        // Release the reply consumer whether or not the assertions passed.
        consumer.close();
    }
}
/**
 * Walks a queue through its lifecycle over the connection returned by
 * {@code getBrokerManagementConnection()}:
 * <ol>
 *   <li>{@code CREATE} with an alert threshold of 100, expecting 201 and a
 *       standard queue in the response;</li>
 *   <li>{@code UPDATE} by {@code identity}, raising the threshold to 250, expecting 200;</li>
 *   <li>{@code DELETE} by object path, expecting 204;</li>
 *   <li>{@code READ} by {@code identity}, expecting 404.</li>
 * </ol>
 *
 * <p>Only runs when {@code isSupportedClient()} reports {@code true}.
 */
@Test
public void testCreateQueueOnBrokerManagement() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection connection = getBrokerManagementConnection();
    try
    {
        setUp(connection);

        // --- CREATE ---
        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Queue");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        createRequest.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 100L);
        // The object path repeats the virtual host name twice before the queue name.
        final String path = getVirtualHostName() + "/" + getVirtualHostName() + "/" + getTestName();
        createRequest.setString("object-path", path);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message createResponse = _consumer.receive(getReceiveTimeout());
        assertResponseCode(createResponse, 201);
        checkResponseIsMapType(createResponse);
        assertEquals("The created queue was not a standard queue",
                     "org.apache.qpid.StandardQueue",
                     getValueFromMapResponse(createResponse, "type"));
        assertEquals("The created queue was not a standard queue",
                     "standard",
                     getValueFromMapResponse(createResponse, "qpid-type"));
        assertEquals("the created queue did not have the correct alerting threshold",
                     100L,
                     getValueFromMapResponse(createResponse, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
        final Object identity = getValueFromMapResponse(createResponse, "identity");

        // --- UPDATE (addressed by identity) ---
        final MapMessage updateRequest = _session.createMapMessage();
        updateRequest.setStringProperty("type", "org.apache.qpid.Queue");
        updateRequest.setStringProperty("operation", "UPDATE");
        updateRequest.setObjectProperty("identity", identity);
        updateRequest.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
        updateRequest.setJMSReplyTo(_replyAddress);
        _producer.send(updateRequest);

        final Message updateResponse = _consumer.receive(getReceiveTimeout());
        assertResponseCode(updateResponse, 200);
        checkResponseIsMapType(updateResponse);
        assertEquals("the created queue did not have the correct alerting threshold",
                     250L,
                     getValueFromMapResponse(updateResponse, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));

        // --- DELETE (addressed by object path) ---
        final MapMessage deleteRequest = _session.createMapMessage();
        deleteRequest.setStringProperty("type", "org.apache.qpid.Queue");
        deleteRequest.setStringProperty("operation", "DELETE");
        deleteRequest.setObjectProperty("index", "object-path");
        deleteRequest.setObjectProperty("key", path);
        deleteRequest.setJMSReplyTo(_replyAddress);
        _producer.send(deleteRequest);

        final Message deleteResponse = _consumer.receive(getReceiveTimeout());
        assertResponseCode(deleteResponse, 204);

        // --- READ (addressed by identity) after the delete ---
        final MapMessage readRequest = _session.createMapMessage();
        readRequest.setStringProperty("type", "org.apache.qpid.Queue");
        readRequest.setStringProperty("operation", "READ");
        readRequest.setObjectProperty("identity", identity);
        readRequest.setJMSReplyTo(_replyAddress);
        _producer.send(readRequest);

        final Message readResponse = _consumer.receive(getReceiveTimeout());
        assertResponseCode(readResponse, 404);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends an AMQP management {@code CREATE} request and returns the result of
 * {@code receiveManagementResponse} called with an expected code of 201.
 *
 * <p>The reply destinations depend on {@code _protocol}: for
 * {@code Protocol.AMQP_1_0} a temporary queue serves as both reply-to and consumer
 * destination; otherwise the {@code AMQP_0_X_REPLY_TO_DESTINATION} and
 * {@code AMQP_0_X_CONSUMER_REPLY_DESTINATION} addresses are used.
 *
 * @param name       sent as both the {@code name} and {@code object-path} body entries
 * @param type       value of the {@code type} property of the request
 * @param attributes entries copied one by one into the map body of the request
 * @param session    session used to create destinations, consumer, producer and message;
 *                   it is committed after the send when it is transacted
 * @return whatever {@code receiveManagementResponse} returns for the reply
 * @throws JMSException if creating, populating, sending or committing the request fails
 */
public Map<String, Object> createEntityAndAssertResponse(final String name,
                                                         final String type,
                                                         final Map<String, Object> attributes,
                                                         final Session session)
        throws JMSException
{
    final Destination replyToDestination;
    final Destination replyConsumerDestination;
    if (_protocol == Protocol.AMQP_1_0)
    {
        replyToDestination = session.createTemporaryQueue();
        replyConsumerDestination = replyToDestination;
    }
    else
    {
        replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
    }

    // The consumer is created before the request is sent, as in the original ordering.
    final MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
    final MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        final MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "CREATE");
        createMessage.setString("name", name);
        createMessage.setString("object-path", name);
        createMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            createMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Release the producer even when building, sending or committing the request fails.
        producer.close();
    }
    return receiveManagementResponse(consumer, replyToDestination, 201);
}
/**
 * Places five orders on {@code VendorOrderQueue} and waits for each confirmation.
 *
 * <p>Each order is a {@link MapMessage} carrying an {@code Item} and a random
 * {@code Quantity} between 1 and 4, with a temporary queue as its reply-to. After
 * every send the method blocks until a reply arrives and prints whether the order
 * was filled, based on the reply's {@code OrderAccepted} flag. Once all orders
 * are done, an empty non-map message is sent to signal the end.
 *
 * <p>A {@link JMSException} is printed rather than propagated, and the connection
 * is closed whether or not the exchange succeeded.
 */
public void run() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = null;
    try {
        connection = connectionFactory.createConnection();
        // The Retailer's session is non-transacted.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination vendorOrderQueue = session.createQueue("VendorOrderQueue");
        TemporaryQueue retailerConfirmQueue = session.createTemporaryQueue();
        MessageProducer producer = session.createProducer(vendorOrderQueue);
        MessageConsumer replyConsumer = session.createConsumer(retailerConfirmQueue);
        connection.start();
        for (int i = 0; i < 5; i++) {
            MapMessage message = session.createMapMessage();
            message.setString("Item", "Computer(s)");
            int quantity = (int)(Math.random() * 4) + 1;
            message.setInt("Quantity", quantity);
            message.setJMSReplyTo(retailerConfirmQueue);
            producer.send(message);
            System.out.println("Retailer: Ordered " + quantity + " computers.");
            // Blocks indefinitely until the confirmation for this order arrives.
            MapMessage reply = (MapMessage) replyConsumer.receive();
            if (reply.getBoolean("OrderAccepted")) {
                System.out.println("Retailer: Order Filled");
            } else {
                System.out.println("Retailer: Order Not Filled");
            }
        }
        // Send a non-MapMessage to signal the end
        producer.send(session.createMessage());
        replyConsumer.close();
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Close the connection on every path so a failure mid-run does not leak it.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
/**
 * Writes this message's header data into the given JMS message.
 *
 * <p>Extenders should call the superclass' marshal method first. By convention,
 * non-payload data goes into JMS properties (e.g. {@code setStringProperty}) and
 * payload data into MapMessage values (e.g. {@code setString}), i.e. small data
 * as properties and big data as values. There is no hard reason for this split,
 * but it hints to the JMS implementation how the data should be handled and
 * eases debugging: properties are printed to logs whereas values are not, so
 * sensitive data should be stored as values. Note also that message selectors
 * can only access properties.</p>
 *
 * @param to the JMS message that receives the message id, class name, username,
 *           reply-to destination, multiplex channel and session id
 * @throws JMSException if setting any property or the reply-to destination fails
 */
public void marshal(MapMessage to) throws JMSException {
    // The canonical class name is stored under KEY_CLASS.
    final String canonicalClassName = this.getClass().getCanonicalName();

    to.setStringProperty(KEY_MESSAGE_ID, messageID);
    to.setStringProperty(KEY_CLASS, canonicalClassName);
    to.setStringProperty(KEY_USERNAME, username);

    to.setJMSReplyTo(replyTo);

    to.setStringProperty(KEY_MULTIPLEX_CHANNEL, multiplexChannel);
    to.setStringProperty(KEY_SESSION_ID, sessionID);
}