下面列出了 javax.jms.MapMessage#setStringProperty() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Checks that {@code JMSInjectHandler.convertJMSMapToXML} exposes the entries of a
 * JMS {@code MapMessage} as child elements named after the map keys.
 *
 * @throws Exception if the broker cannot be started or the message cannot be built or converted
 */
@Test
public void testConvertJMSMapToXML() throws Exception {
    final String destination = "testHandler";
    final Properties brokerProps =
            JMSTestsUtils.getJMSPropertiesForDestination(destination, PROVIDER_URL, false);
    final JMSBrokerController broker = new JMSBrokerController(PROVIDER_URL, brokerProps);
    try {
        broker.startProcess();
        broker.connect(destination, true);

        // Build a map message carrying one header property and three body entries.
        final MapMessage payload = broker.createMapMessage();
        payload.setStringProperty("MessageFormat", "Person");
        payload.setString("NAME", destination);
        payload.setInt("COUNT", 10);
        payload.setDouble("PRICE", 12.00);

        final OMElement xml = JMSInjectHandler.convertJMSMapToXML(payload);

        // Only COUNT and NAME are verified; PRICE is set but not asserted.
        final OMElement countElement = (OMElement) xml.getChildrenWithLocalName("COUNT").next();
        Assert.assertEquals("The converted XML is not correct", "10", countElement.getText());

        final OMElement nameElement = (OMElement) xml.getChildrenWithLocalName("NAME").next();
        Assert.assertEquals("The converted XML is not correct", destination, nameElement.getText());
    } finally {
        broker.stopProcess();
    }
}
/**
 * Handles an incoming text message by logging its text and replying with a fixed
 * {@link MapMessage} to the destination named in the request's JMSReplyTo header.
 *
 * <p>Any failure (including a non-text message or a failed send) is logged and
 * not rethrown, so the listener never propagates an exception to the provider.</p>
 *
 * @param message the received message; expected to be a {@link TextMessage}
 */
@Override
public void onMessage(Message message) {
    try {
        TextMessage receiveMessage = (TextMessage) message;
        String keys = receiveMessage.getText();
        LOGGER.info("keys = " + keys);

        // The reply carries its data as message properties, not as map entries.
        MapMessage returnMess = session.createMapMessage();
        returnMess.setStringProperty("/a2/m1", "zhaohui");
        returnMess.setStringProperty("/a3/m1/v2", "nanjing");
        returnMess.setStringProperty("/a3/m1/v2/t2", "zhaohui");

        QueueSender sender = session.createSender((Queue) message.getJMSReplyTo());
        try {
            sender.send(returnMess);
        } finally {
            // A sender is created per message; close it so senders do not
            // accumulate on the long-lived session.
            sender.close();
        }
    } catch (Exception e) {
        LOGGER.error("onMessage error", e);
    }
}
/**
 * Looks up the identifier of the given connection by invoking the
 * {@code getConnectionMetaData} operation on {@code org.apache.qpid.VirtualHost}
 * and reading the {@code connectionId} entry of the map response.
 *
 * <p>The connection is started but left open for the caller; the session created
 * for the request is closed before returning.</p>
 *
 * @param connection the connection whose identifier is requested
 * @return the value of {@code connectionId} in the response, parsed as a UUID
 * @throws Exception if the request fails or the response cannot be parsed
 */
private UUID getConnectionUUID(final Connection connection) throws Exception
{
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try
    {
        MapMessage message = session.createMapMessage();
        message.setStringProperty("type", "org.apache.qpid.VirtualHost");
        message.setStringProperty("operation", "getConnectionMetaData");
        message.setStringProperty("index", "object-path");
        message.setStringProperty("key", "");
        MapMessage response = (MapMessage) sendManagementRequestAndGetResponse(session, message, 200);
        return UUID.fromString(String.valueOf(response.getObject("connectionId")));
    }
    finally
    {
        // The connection belongs to the caller, so release only the session opened here.
        session.close();
    }
}
/**
 * Sends a DELETE request for the {@code org.apache.qpid.Connection} whose identity
 * is the given id, over a freshly obtained connection that is always closed afterwards.
 *
 * @param targetConnectionId identity of the connection to delete
 * @throws Exception if the management request fails
 */
private void closeConnectionUsingAmqpManagement(final UUID targetConnectionId) throws Exception
{
    final Connection managementConnection = getConnection();
    try
    {
        managementConnection.start();
        final Session managementSession =
                managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final MapMessage deleteRequest = managementSession.createMapMessage();
        deleteRequest.setStringProperty("operation", "DELETE");
        deleteRequest.setStringProperty("type", "org.apache.qpid.Connection");
        deleteRequest.setStringProperty("identity", String.valueOf(targetConnectionId));

        // 204 is the response code passed to the helper for this delete.
        sendManagementRequestAndGetResponse(managementSession, deleteRequest, 204);
    }
    finally
    {
        managementConnection.close();
    }
}
/**
 * Sends a CREATE request for an entity of the given type to the management address.
 *
 * <p>The entity name is used for both the {@code name} and {@code object-path} map
 * entries, and every entry of {@code attributes} is copied into the message body.
 * No reply is awaited. If the session is transacted it is committed after the send.
 * The producer is closed even when sending or committing fails.</p>
 *
 * @param name       name (and object path) of the entity to create
 * @param session    session used to create the producer and the message
 * @param type       value of the {@code type} message property
 * @param attributes additional map entries to put into the request body
 * @throws JMSException if creating, sending or committing the request fails
 */
public void createEntityUsingAmqpManagement(final String name,
                                            final Session session,
                                            final String type,
                                            Map<String, Object> attributes)
        throws JMSException
{
    MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "CREATE");
        createMessage.setString("name", name);
        createMessage.setString("object-path", name);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            createMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Close on every path so a failed send/commit does not leak the producer.
        producer.close();
    }
}
/**
 * Sends a DELETE request for the entity of the given type whose object path is
 * {@code name} to the management address.
 *
 * <p>No reply is awaited. If the session is transacted it is committed after the
 * send. The producer created for the request is always closed.</p>
 *
 * @param name    object path of the entity to delete
 * @param session session used to create the producer and the message
 * @param type    value of the {@code type} message property
 * @throws JMSException if creating, sending or committing the request fails
 */
public void deleteEntityUsingAmqpManagement(final String name, final Session session, final String type)
        throws JMSException
{
    MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        MapMessage deleteMessage = session.createMapMessage();
        deleteMessage.setStringProperty("type", type);
        deleteMessage.setStringProperty("operation", "DELETE");
        deleteMessage.setStringProperty("index", "object-path");
        deleteMessage.setStringProperty("key", name);
        producer.send(deleteMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Release the producer on every path, as the create helper does.
        producer.close();
    }
}
/**
 * Sends a READ request for {@code org.apache.qpid.VirtualHost} twice: once as is,
 * checking the returned name, and once with the {@code actuals} property set to
 * false, checking that {@code productVersion} is present in the response.
 */
@Test
public void testReadVirtualHost() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection testConnection = getConnection();
    try
    {
        setUp(testConnection);

        final MapMessage readRequest = _session.createMapMessage();
        readRequest.setStringProperty("type", "org.apache.qpid.VirtualHost");
        readRequest.setStringProperty("operation", "READ");
        readRequest.setStringProperty("index", "object-path");
        readRequest.setStringProperty("key", "");
        readRequest.setJMSReplyTo(_replyAddress);
        _producer.send(readRequest);

        final Message firstReply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(firstReply, 200);
        checkResponseIsMapType(firstReply);
        assertEquals("The name of the virtual host is not as expected",
                     getVirtualHostName(),
                     getValueFromMapResponse(firstReply, "name"));

        // The same message object is re-sent with the extra "actuals" property.
        readRequest.setBooleanProperty("actuals", false);
        _producer.send(readRequest);

        final Message secondReply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(secondReply, 200);
        checkResponseIsMapType(secondReply);
        assertNotNull("Derived attribute (productVersion) should be available",
                      getValueFromMapResponse(secondReply, "productVersion"));
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Sends a READ request for an {@code org.apache.qpid.Exchange} with the key
 * {@code not-found-exchange} and expects response code 404.
 */
@Test
public void testReadObject_ObjectNotFound() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection testConnection = getConnection();
    try
    {
        setUp(testConnection);

        final MapMessage readRequest = _session.createMapMessage();
        readRequest.setStringProperty("type", "org.apache.qpid.Exchange");
        readRequest.setStringProperty("operation", "READ");
        readRequest.setStringProperty("index", "object-path");
        readRequest.setStringProperty("key", "not-found-exchange");
        readRequest.setJMSReplyTo(_replyAddress);
        _producer.send(readRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 404);
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Invokes the {@code getStatistics} operation on an {@code org.apache.qpid.Exchange}
 * with the key {@code not-found-exchange} and expects response code 404.
 */
@Test
public void testInvokeOperation_ObjectNotFound() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection testConnection = getConnection();
    try
    {
        setUp(testConnection);

        final MapMessage operationRequest = _session.createMapMessage();
        operationRequest.setStringProperty("type", "org.apache.qpid.Exchange");
        operationRequest.setStringProperty("operation", "getStatistics");
        operationRequest.setStringProperty("index", "object-path");
        operationRequest.setStringProperty("key", "not-found-exchange");
        operationRequest.setJMSReplyTo(_replyAddress);
        _producer.send(operationRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 404);
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Invokes {@code getStatistics} on {@code org.apache.qpid.Broker} over the broker
 * management connection and checks that the map response contains a
 * {@code numberOfLiveThreads} value.
 */
@Test
public void testInvokeOperationReturningMap() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection brokerConnection = getBrokerManagementConnection();
    try
    {
        setUp(brokerConnection);

        final MapMessage operationRequest = _session.createMapMessage();
        operationRequest.setStringProperty("type", "org.apache.qpid.Broker");
        operationRequest.setStringProperty("operation", "getStatistics");
        operationRequest.setStringProperty("index", "object-path");
        operationRequest.setStringProperty("key", "");
        operationRequest.setJMSReplyTo(_replyAddress);
        _producer.send(operationRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 200);
        checkResponseIsMapType(reply);
        assertNotNull(getValueFromMapResponse(reply, "numberOfLiveThreads"));
    }
    finally
    {
        brokerConnection.close();
    }
}
/**
 * Invokes {@code getConnectionMetaData} on {@code org.apache.qpid.Broker} over the
 * broker management connection and checks that the map response contains a
 * {@code port} value.
 */
@Test
public void testInvokeOperationReturningManagedAttributeValue() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection brokerConnection = getBrokerManagementConnection();
    try
    {
        setUp(brokerConnection);

        final MapMessage operationRequest = _session.createMapMessage();
        operationRequest.setStringProperty("type", "org.apache.qpid.Broker");
        operationRequest.setStringProperty("operation", "getConnectionMetaData");
        operationRequest.setStringProperty("index", "object-path");
        operationRequest.setStringProperty("key", "");
        operationRequest.setJMSReplyTo(_replyAddress);
        _producer.send(operationRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 200);
        checkResponseIsMapType(reply);
        assertNotNull(getValueFromMapResponse(reply, "port"));
    }
    finally
    {
        brokerConnection.close();
    }
}
/**
 * Sends a DELETE request for {@code org.apache.qpid.VirtualHost} and expects
 * response code 501.
 */
@Test
public void testDeleteVirtualHost() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection testConnection = getConnection();
    try
    {
        setUp(testConnection);

        final MapMessage deleteRequest = _session.createMapMessage();
        deleteRequest.setStringProperty("type", "org.apache.qpid.VirtualHost");
        deleteRequest.setStringProperty("operation", "DELETE");
        deleteRequest.setStringProperty("index", "object-path");
        deleteRequest.setStringProperty("key", "");
        deleteRequest.setJMSReplyTo(_replyAddress);
        _producer.send(deleteRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 501);
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Creates a queue with {@code qpid-type} set to {@code lvq} and checks that the
 * response reports the type {@code org.apache.qpid.LastValueQueue}.
 */
@Test
public void testCreateQueueWithQpidType() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection testConnection = getConnection();
    try
    {
        setUp(testConnection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Queue");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        createRequest.setString("qpid-type", "lvq");
        // The object path of the new queue is the test name.
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 201);
        checkResponseIsMapType(reply);
        assertEquals("The created queue did not have the correct type",
                     "org.apache.qpid.LastValueQueue",
                     getValueFromMapResponse(reply, "type"));
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Sends a CREATE request for {@code org.apache.qpid.Connection} over the
 * connection returned by {@code getConnection()} and expects response code 501.
 */
@Test
public void testCreateConnectionOnVirtualHostManagement() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection testConnection = getConnection();
    try
    {
        setUp(testConnection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Connection");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 501);
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Sends a CREATE request for {@code org.apache.qpid.Connection} over the broker
 * management connection and expects response code 501.
 */
@Test
public void testCreateConnectionOnBrokerManagement() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection brokerConnection = getBrokerManagementConnection();
    try
    {
        setUp(brokerConnection);

        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Connection");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        final String objectPath = getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message reply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(reply, 501);
    }
    finally
    {
        brokerConnection.close();
    }
}
/**
 * Runs a create / update / delete / read sequence for a queue over the broker
 * management connection:
 * <ol>
 *   <li>CREATE with an alert threshold of 100 and check the reported type,
 *       qpid-type and threshold;</li>
 *   <li>UPDATE by identity to a threshold of 250 and check the reported value;</li>
 *   <li>DELETE by object path, expecting response code 204;</li>
 *   <li>READ by identity, expecting response code 404.</li>
 * </ol>
 */
@Test
public void testCreateQueueOnBrokerManagement() throws Exception
{
    assumeThat(isSupportedClient(), is(true));
    final Connection brokerConnection = getBrokerManagementConnection();
    try
    {
        setUp(brokerConnection);

        // Step 1: create the queue.
        final MapMessage createRequest = _session.createMapMessage();
        createRequest.setStringProperty("type", "org.apache.qpid.Queue");
        createRequest.setStringProperty("operation", "CREATE");
        createRequest.setString("name", getTestName());
        createRequest.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 100L);
        // The object path is the virtual host name twice, then the test name.
        final String objectPath = getVirtualHostName() + "/" + getVirtualHostName() + "/" + getTestName();
        createRequest.setString("object-path", objectPath);
        createRequest.setJMSReplyTo(_replyAddress);
        _producer.send(createRequest);

        final Message createReply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(createReply, 201);
        checkResponseIsMapType(createReply);
        assertEquals("The created queue was not a standard queue",
                     "org.apache.qpid.StandardQueue",
                     getValueFromMapResponse(createReply, "type"));
        assertEquals("The created queue was not a standard queue",
                     "standard",
                     getValueFromMapResponse(createReply, "qpid-type"));
        assertEquals("the created queue did not have the correct alerting threshold",
                     100L,
                     getValueFromMapResponse(createReply, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));
        final Object queueIdentity = getValueFromMapResponse(createReply, "identity");

        // Step 2: update the threshold, addressing the queue by identity.
        final MapMessage updateRequest = _session.createMapMessage();
        updateRequest.setStringProperty("type", "org.apache.qpid.Queue");
        updateRequest.setStringProperty("operation", "UPDATE");
        updateRequest.setObjectProperty("identity", queueIdentity);
        updateRequest.setLong(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 250L);
        updateRequest.setJMSReplyTo(_replyAddress);
        _producer.send(updateRequest);

        final Message updateReply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(updateReply, 200);
        checkResponseIsMapType(updateReply);
        assertEquals("the created queue did not have the correct alerting threshold",
                     250L,
                     getValueFromMapResponse(updateReply, ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES));

        // Step 3: delete the queue, addressing it by object path.
        final MapMessage deleteRequest = _session.createMapMessage();
        deleteRequest.setStringProperty("type", "org.apache.qpid.Queue");
        deleteRequest.setStringProperty("operation", "DELETE");
        deleteRequest.setObjectProperty("index", "object-path");
        deleteRequest.setObjectProperty("key", objectPath);
        deleteRequest.setJMSReplyTo(_replyAddress);
        _producer.send(deleteRequest);

        final Message deleteReply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(deleteReply, 204);

        // Step 4: a read by identity is now expected to return 404.
        final MapMessage readRequest = _session.createMapMessage();
        readRequest.setStringProperty("type", "org.apache.qpid.Queue");
        readRequest.setStringProperty("operation", "READ");
        readRequest.setObjectProperty("identity", queueIdentity);
        readRequest.setJMSReplyTo(_replyAddress);
        _producer.send(readRequest);

        final Message readReply = _consumer.receive(getReceiveTimeout());
        assertResponseCode(readReply, 404);
    }
    finally
    {
        brokerConnection.close();
    }
}
/**
 * Writes this message into the given JMS map message: first everything the
 * superclass marshals, then the module name as a string property and the module
 * XML, serialized to a string, as a map value.
 *
 * @param to the JMS message to populate
 * @throws JMSException if a property or value cannot be set
 */
@Override
public void marshal(MapMessage to) throws JMSException {
    super.marshal(to);

    final String moduleName = this.getModuleName();
    to.setStringProperty(KEY_MODULE_NAME, moduleName);

    final String serializedModule = XmlUtil.xmlToString(moduleXml);
    to.setString(KEY_MODULE, serializedModule);
}
/**
 * Sends one body-less message of each JMS message type (plain, bytes, map, object,
 * stream, text) to the queue and checks that each one is received within a second.
 */
@Test
public void testSendEmptyMessages() throws Exception {
    final String propertyName = "testName";
    final String propertyValue = "testSendEmptyMessages";
    final String notReceived = "Didn't receive message";
    final long receiveTimeoutMillis = 1000;

    final Queue destination = new ActiveMQQueue(queueName);
    final QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final QueueSender queueSender = queueSession.createSender(destination);
    queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    connection.start();

    // plain message; the receiver is created only after this first send
    final Message plainMessage = queueSession.createMessage();
    plainMessage.setStringProperty(propertyName, propertyValue);
    queueSender.send(plainMessage);
    final QueueReceiver receiver = queueSession.createReceiver(destination);
    assertNotNull(notReceived, receiver.receive(receiveTimeoutMillis));

    // bytes
    final BytesMessage emptyBytes = queueSession.createBytesMessage();
    emptyBytes.setStringProperty(propertyName, propertyValue);
    queueSender.send(emptyBytes);
    assertNotNull(notReceived, receiver.receive(receiveTimeoutMillis));

    // map
    final MapMessage emptyMap = queueSession.createMapMessage();
    emptyMap.setStringProperty(propertyName, propertyValue);
    queueSender.send(emptyMap);
    assertNotNull(notReceived, receiver.receive(receiveTimeoutMillis));

    // object
    final ObjectMessage emptyObject = queueSession.createObjectMessage();
    emptyObject.setStringProperty(propertyName, propertyValue);
    queueSender.send(emptyObject);
    assertNotNull(notReceived, receiver.receive(receiveTimeoutMillis));

    // stream
    final StreamMessage emptyStream = queueSession.createStreamMessage();
    emptyStream.setStringProperty(propertyName, propertyValue);
    queueSender.send(emptyStream);
    assertNotNull(notReceived, receiver.receive(receiveTimeoutMillis));

    // text
    final TextMessage emptyText = queueSession.createTextMessage();
    emptyText.setStringProperty(propertyName, propertyValue);
    queueSender.send(emptyText);
    assertNotNull(notReceived, receiver.receive(receiveTimeoutMillis));
}
/**
 * Sends a CREATE request for an entity of the given type to the management address
 * and returns the result of {@code receiveManagementResponse} for response code 201.
 *
 * <p>For AMQP 1.0 a temporary queue serves as both the reply-to and the consumer
 * destination; for other protocols the two fixed AMQP 0-x reply destinations are
 * used. The entity name is used for both the {@code name} and {@code object-path}
 * map entries, and every entry of {@code attributes} is copied into the message body.
 * If the session is transacted it is committed after the send. The producer is
 * closed even when sending or committing fails.</p>
 *
 * @param name       name (and object path) of the entity to create
 * @param type       value of the {@code type} message property
 * @param attributes additional map entries to put into the request body
 * @param session    session used to create destinations, consumer, producer and message
 * @return whatever {@code receiveManagementResponse} returns for the reply
 * @throws JMSException if creating, sending or committing the request fails
 */
public Map<String, Object> createEntityAndAssertResponse(final String name,
                                                         final String type,
                                                         final Map<String, Object> attributes,
                                                         final Session session)
        throws JMSException
{
    Destination replyToDestination;
    Destination replyConsumerDestination;
    if (_protocol == Protocol.AMQP_1_0)
    {
        replyToDestination = session.createTemporaryQueue();
        replyConsumerDestination = replyToDestination;
    }
    else
    {
        replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
    }
    // NOTE(review): the consumer is handed to receiveManagementResponse and is not
    // closed here; presumably that helper owns its cleanup - confirm.
    MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
    MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
    try
    {
        MapMessage createMessage = session.createMapMessage();
        createMessage.setStringProperty("type", type);
        createMessage.setStringProperty("operation", "CREATE");
        createMessage.setString("name", name);
        createMessage.setString("object-path", name);
        createMessage.setJMSReplyTo(replyToDestination);
        for (Map.Entry<String, Object> entry : attributes.entrySet())
        {
            createMessage.setObject(entry.getKey(), entry.getValue());
        }
        producer.send(createMessage);
        if (session.getTransacted())
        {
            session.commit();
        }
    }
    finally
    {
        // Close on every path so a failed send/commit does not leak the producer.
        producer.close();
    }
    return receiveManagementResponse(consumer, replyToDestination, 201);
}
/**
 * Writes the fields common to all messages into the given JMS map message.
 *
 * <p>Subclasses that override this method should call the superclass
 * implementation first. By convention, small non-payload data goes into JMS
 * properties (e.g. {@code setStringProperty}) and larger payload data goes into
 * MapMessage values (e.g. {@code setString}). Nothing strictly requires this,
 * but it hints to the JMS implementation how to treat the data and it eases
 * debugging, because properties are printed to the logs while values are not.
 * For the same reason, sensitive data belongs in values. Note also that message
 * selectors can only access properties.</p>
 *
 * @param to the JMS message to populate
 * @throws JMSException if a property or the reply-to destination cannot be set
 */
public void marshal(MapMessage to) throws JMSException {
    // Identity of the message and of the concrete class that marshalled it.
    to.setStringProperty(KEY_MESSAGE_ID, messageID);
    final String concreteClassName = getClass().getCanonicalName();
    to.setStringProperty(KEY_CLASS, concreteClassName);

    // Sender and reply routing.
    to.setStringProperty(KEY_USERNAME, username);
    to.setJMSReplyTo(replyTo);

    // Channel and session the message belongs to.
    to.setStringProperty(KEY_MULTIPLEX_CHANNEL, multiplexChannel);
    to.setStringProperty(KEY_SESSION_ID, sessionID);
}