下面列出了怎么用javax.jms.MapMessage的API类实例代码及写法,或者点击链接到github查看源代码。
public void onMessage(Message arg0) {
try {
if (!(arg0 instanceof MapMessage)) {
new Exception("Wrong message type: " + arg0).printStackTrace(System.out);
System.exit(1);
}
MapMessage message = (MapMessage) arg0;
int receivedCounter = message.getInt("Counter");
System.out.println("Received counter=" + receivedCounter);
if (receivedCounter != counter_) {
new Exception("Out of order, expected " + counter_ + ", but got " + receivedCounter).printStackTrace(System.out);
System.exit(1);
}
counter_++;
} catch (JMSException e) {
e.printStackTrace(System.out);
System.exit(1);
}
}
/**
* Method to infer the JMS message type.
*
* @param msg the message to be inferred
* @return the type of the JMS message
*/
public static String inferJMSMessageType(Message msg) {
if (isTextMessage(msg)) {
return TextMessage.class.getName();
} else if (isBytesMessage(msg)) {
return BytesMessage.class.getName();
} else if (isObjectMessage(msg)) {
return ObjectMessage.class.getName();
} else if (isStreamMessage(msg)) {
return StreamMessage.class.getName();
} else if (isMapMessage(msg)) {
return MapMessage.class.getName();
} else {
return null;
}
}
private void sendTestMapMessage(ActiveMQConnectionFactory factory, String message) throws JMSException {
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("boolean-type", true);
mapMessage.setByte("byte-type", (byte) 10);
mapMessage.setBytes("bytes-type", TEXT.getBytes());
mapMessage.setChar("char-type", 'A');
mapMessage.setDouble("double-type", 55.3D);
mapMessage.setFloat("float-type", 79.1F);
mapMessage.setInt("int-type", 37);
mapMessage.setLong("long-type", 56652L);
mapMessage.setObject("object-type", new String("VVVV"));
mapMessage.setShort("short-type", (short) 333);
mapMessage.setString("string-type", TEXT);
producer.send(mapMessage);
connection.close();
}
@org.junit.Ignore
public void testSendMapToQueue() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class);
runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
runner.setProperty(JmsProperties.URL, "tcp://localhost:61616");
runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true);
final Session jmsSession = wrappedProducer.getSession();
final MessageProducer producer = wrappedProducer.getProducer();
final MapMessage message = jmsSession.createMapMessage();
message.setString("foo!", "bar");
message.setString("bacon", "meat");
producer.send(message);
jmsSession.commit();
producer.close();
jmsSession.close();
}
/**
* Test convertXMLtoJMSMap
*
* @throws Exception
*/
@Test
public void testConvertXMLtoJMSMap() throws Exception {
String queueName = "testHandler1";
Properties jmsProperties = JMSTestsUtils.getJMSPropertiesForDestination(queueName, PROVIDER_URL, false);
JMSBrokerController brokerController = new JMSBrokerController(PROVIDER_URL, jmsProperties);
try {
brokerController.startProcess();
brokerController.connect(queueName, true);
MapMessage mapMessage = brokerController.createMapMessage();
OMElement omElement = AXIOMUtil.stringToOM(
"<JMSMap xmlns=\"http://axis.apache.org/axis2/java/transports/jms/map-payload\">"
+ "<PRICE>12.0</PRICE>" + "<COUNT>10</COUNT>" + "<NAME>" + queueName + "</NAME>"
+ "</JMSMap>");
JMSUtils.convertXMLtoJMSMap(omElement, mapMessage);
Assert.assertEquals("The converted JMS Map is not correct", "12.0", ((ActiveMQMapMessage) mapMessage).
getContentMap().get("PRICE"));
Assert.assertEquals("The converted JMS Map is not correct", queueName, ((ActiveMQMapMessage) mapMessage).
getContentMap().get("NAME"));
} finally {
brokerController.stopProcess();
}
}
/**
* This implementation converts a TextMessage back to a String, a
* ByteMessage back to a byte array, a MapMessage back to a Map,
* and an ObjectMessage back to a Serializable object. Returns
* the plain Message object in case of an unknown message type.
* @see #extractStringFromMessage
* @see #extractByteArrayFromMessage
* @see #extractMapFromMessage
* @see #extractSerializableFromMessage
*/
@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
if (message instanceof TextMessage) {
return extractStringFromMessage((TextMessage) message);
}
else if (message instanceof BytesMessage) {
return extractByteArrayFromMessage((BytesMessage) message);
}
else if (message instanceof MapMessage) {
return extractMapFromMessage((MapMessage) message);
}
else if (message instanceof ObjectMessage) {
return extractSerializableFromMessage((ObjectMessage) message);
}
else {
return message;
}
}
@Test
public void testMapConversionWhereMapHasNonStringTypesForKeys() throws JMSException {
MapMessage message = mock(MapMessage.class);
final Session session = mock(Session.class);
given(session.createMapMessage()).willReturn(message);
final Map<Integer, String> content = new HashMap<Integer, String>(1);
content.put(1, "value1");
final SimpleMessageConverter converter = new SimpleMessageConverter();
try {
converter.toMessage(content, session);
fail("expected MessageConversionException");
} catch (MessageConversionException ex) { /* expected */ }
}
@Test
public void testMapConversion() throws JMSException {
Session session = mock(Session.class);
MapMessage message = mock(MapMessage.class);
Map<String, String> content = new HashMap<>(2);
content.put("key1", "value1");
content.put("key2", "value2");
given(session.createMapMessage()).willReturn(message);
given(message.getMapNames()).willReturn(Collections.enumeration(content.keySet()));
given(message.getObject("key1")).willReturn("value1");
given(message.getObject("key2")).willReturn("value2");
SimpleMessageConverter converter = new SimpleMessageConverter();
Message msg = converter.toMessage(content, session);
assertEquals(content, converter.fromMessage(msg));
verify(message).setObject("key1", "value1");
verify(message).setObject("key2", "value2");
}
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://localhost:61616");
Connection connection = connectionFactory.createQueueConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = new ActiveMQQueue("/atestqueue");
MessageProducer producer = session1.createProducer(destination);
MessageConsumer consumer = session2.createConsumer(destination);
consumer.setMessageListener(new MessageOrderingTest());
connection.start();
for (int i = 0; i < 10000; i++) {
MapMessage message = session1.createMapMessage();
message.setInt("Counter", i);
producer.send(message);
System.out.println("Sent counter=" + i);
}
}
private void sendMapMessageUsingOpenWire() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
final ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("aboolean", true);
mapMessage.setByte("abyte", (byte) 4);
mapMessage.setBytes("abytes", new byte[]{4, 5});
mapMessage.setChar("achar", 'a');
mapMessage.setDouble("adouble", 4.4);
mapMessage.setFloat("afloat", 4.5f);
mapMessage.setInt("aint", 40);
mapMessage.setLong("along", 80L);
mapMessage.setShort("ashort", (short) 65);
mapMessage.setString("astring", "hello");
producer.send(mapMessage);
}
public void testBrokerStats() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue query = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
MessageProducer producer = session.createProducer(query);
Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);
MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertTrue(reply.getJMSTimestamp() > 0);
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
/*
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}
*/
}
@Test
public void testMapConversionWhereMapHasNonStringTypesForKeys() throws JMSException {
MapMessage message = mock(MapMessage.class);
Session session = mock(Session.class);
given(session.createMapMessage()).willReturn(message);
Map<Integer, String> content = new HashMap<>(1);
content.put(1, "value1");
SimpleMessageConverter converter = new SimpleMessageConverter();
try {
converter.toMessage(content, session);
fail("expected MessageConversionException");
}
catch (MessageConversionException ex) { /* expected */ }
}
@Test
public void testMapConversionWhereMapHasNNullForKey() throws JMSException {
MapMessage message = mock(MapMessage.class);
Session session = mock(Session.class);
given(session.createMapMessage()).willReturn(message);
Map<Object, String> content = new HashMap<>(1);
content.put(null, "value1");
SimpleMessageConverter converter = new SimpleMessageConverter();
try {
converter.toMessage(content, session);
fail("expected MessageConversionException");
}
catch (MessageConversionException ex) { /* expected */ }
}
/**
* 处理评测机完成单个测试点的消息.
* @param mapMessage - 消息队列中收到的MapMessage对象
* @throws JMSException
*/
private void testPointFinishedHandler(MapMessage mapMessage) throws JMSException {
long submissionId = mapMessage.getLong("submissionId");
int checkpointId = mapMessage.getInt("checkpointId");
String runtimeResult = mapMessage.getString("runtimeResult");
int usedTime = mapMessage.getInt("usedTime");
int usedMemory = mapMessage.getInt("usedMemory");
int score = mapMessage.getInt("score");
String message = String.format("- Test Point #%d: %s, Time = %d ms, Memory = %d KB, Score = %d\n",
new Object[] { checkpointId, runtimeResult, usedTime, usedMemory, score });
eventPublisher.publishEvent(new SubmissionEvent(this, submissionId, "Running", message, false));
LOGGER.info(String.format("Submission #%d/ CheckPoint#%d returned [%s] (Time = %dms, Memory = %d KB, Score = %d).",
new Object[] { submissionId, checkpointId, runtimeResult, usedTime, usedMemory, score }));
}
public void createEntityUsingAmqpManagement(final String name,
final Session session,
final String type,
Map<String, Object> attributes)
throws JMSException
{
MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
MapMessage createMessage = session.createMapMessage();
createMessage.setStringProperty("type", type);
createMessage.setStringProperty("operation", "CREATE");
createMessage.setString("name", name);
createMessage.setString("object-path", name);
for (Map.Entry<String, Object> entry : attributes.entrySet())
{
createMessage.setObject(entry.getKey(), entry.getValue());
}
producer.send(createMessage);
if (session.getTransacted())
{
session.commit();
}
producer.close();
}
@Override
public void onMessage(Message message) {
try {
TextMessage receiveMessage = (TextMessage) message;
String keys = receiveMessage.getText();
LOGGER.info("keys = " + keys);
MapMessage returnMess = session.createMapMessage();
returnMess.setStringProperty("/a2/m1", "zhaohui");
returnMess.setStringProperty("/a3/m1/v2", "nanjing");
returnMess.setStringProperty("/a3/m1/v2/t2", "zhaohui");
QueueSender sender = session.createSender((Queue) message.getJMSReplyTo());
sender.send(returnMess);
} catch (Exception e) {
LOGGER.error("onMessage error", e);
}
}
@Override
public void marshal(MapMessage to) throws JMSException {
super.marshal(to);
// add parameters
int i = 0;
for (String parameter : parameters) {
logger.debug("populated map message with " + KEY_PARAMETER + i + ": " + new String(parameter));
to.setString(KEY_PARAMETER + i, parameter);
i++;
}
// add named parameters
logger.debug("Marshalling named parameters: " + namedParameters.keySet());
int j = 0;
for (String key : namedParameters.keySet()) {
to.setString(KEY_NAMED_PARAMETER_KEY + j, key);
to.setString(KEY_NAMED_PARAMETER_VALUE + j, namedParameters.get(key));
j++;
}
}
@Test
public void publishMapMessage() throws Exception
{
final Map<String, Object> content = new HashMap<>();
content.put("key1", "astring");
content.put("key2", Integer.MIN_VALUE);
content.put("key3", Long.MAX_VALUE);
content.put("key4", null);
MapMessage message = publishMessageWithContent(content, MapMessage.class);
final Enumeration mapNames = message.getMapNames();
int entryCount = 0;
while(mapNames.hasMoreElements())
{
String key = (String) mapNames.nextElement();
assertThat("Unexpected map content for key : " + key, message.getObject(key), is(equalTo(content.get(key))));
entryCount++;
}
assertThat("Unexpected number of key/value pairs in map message", entryCount, is(equalTo(content.size())));
}
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
try {
String messageString = from.getString(KEY_URL_LIST);
String[] urlArray = messageString.split(URL_DELIMITER);
urlList = new LinkedList<URL>();
for (String urlString : urlArray) {
urlList.add(new URL(urlString));
}
} catch (MalformedURLException e) {
handleException(e);
}
}
private MapMessage map(final org.apache.logging.log4j.message.MapMessage<?, ?> log4jMapMessage,
final MapMessage jmsMapMessage) {
// Map without calling org.apache.logging.log4j.message.MapMessage#getData() which makes a copy of the map.
log4jMapMessage.forEach(new BiConsumer<String, Object>() {
@Override
public void accept(final String key, final Object value) {
try {
jmsMapMessage.setObject(key, value);
} catch (final JMSException e) {
throw new IllegalArgumentException(String.format("%s mapping key '%s' to value '%s': %s",
e.getClass(), key, value, e.getLocalizedMessage()), e);
}
}
});
return jmsMapMessage;
}
/**
* The basic message sending method. Sends a message without reply possibility.
* Not multithread safe.
*/
public void sendMessage(ChipsterMessage message) throws JMSException {
// log
logger.debug("sending " + message);
// marshal message to MapMessage
MapMessage mapMessage = session.createMapMessage();
message.marshal(mapMessage);
MessageProducer producer = null;
try {
producer = session.createProducer(topic);
producer.send(mapMessage);
} finally {
try {
producer.close();
} catch (Exception e) {
}
}
}
/**
* Send a <code>MapMessage</code> with 2 Java primitives in its body (a <code>
* String</code> and a <code>double</code>).
* <br />
* Receive it and test that the values of the primitives of the body are correct
*/
@Test
public void testMapMessage_2() {
try {
MapMessage message = senderSession.createMapMessage();
message.setString("name", "pi");
message.setDouble("value", 3.14159);
sender.send(message);
Message m = receiver.receive(TestConfig.TIMEOUT);
Assert.assertTrue("The message should be an instance of MapMessage.\n", m instanceof MapMessage);
MapMessage msg = (MapMessage) m;
Assert.assertEquals("pi", msg.getString("name"));
Assert.assertEquals(3.14159, msg.getDouble("value"), 0);
} catch (JMSException e) {
fail(e);
}
}
/**
* MessageListener回调函数.
*/
@Override
public void onMessage(Message message) {
try {
MapMessage mapMessage = (MapMessage) message;
// 打印消息详情
logger.info("UserName:{}, Email:{}", mapMessage.getString("userName"), mapMessage.getString("email"));
// 发送邮件
// if (simpleMailService != null) {
// simpleMailService.sendNotificationMail(mapMessage.getString("userName"));
// }
} catch (Exception e) {
logger.error("处理消息时发生异常.", e);
}
}
@Override
public void marshal(MapMessage to) throws JMSException {
super.marshal(to);
// marshal urls into a single string
String marshalledLog = Strings.delimit(logs, ";");
// log
logger.debug("Marshalling: " + KEY_DETAILS + " : " + details);
logger.debug("Marshalling: " + KEY_EMAIL + " : " + email);
logger.debug("Marshalling: " + KEY_SESSION + " : " + url);
logger.debug("Marshalling: " + KEY_LOGS + " : " + marshalledLog);
// add details
to.setString(KEY_DETAILS, details);
to.setString(KEY_EMAIL, email);
to.setString(KEY_SESSION, url);
to.setString(KEY_LOGS, marshalledLog);
}
public static Map<String, String> createMapMessageValues(final MapMessage mapMessage) throws JMSException {
final Map<String, String> valueMap = new HashMap<>();
final Enumeration<?> enumeration = mapMessage.getMapNames();
while (enumeration.hasMoreElements()) {
final String name = (String) enumeration.nextElement();
final Object value = mapMessage.getObject(name);
if (value == null) {
valueMap.put(MAP_MESSAGE_PREFIX + name, "");
} else {
valueMap.put(MAP_MESSAGE_PREFIX + name, value.toString());
}
}
return valueMap;
}
@Override
public void unmarshal(MapMessage from) throws JMSException {
super.unmarshal(from);
// load parameters
this.parameters = new ArrayList<String>();
String input;
for (int i = 0; from.itemExists(KEY_PARAMETER + i); i++) {
input = from.getString(KEY_PARAMETER + i);
logger.debug("parameter " + (KEY_PARAMETER + i) + " is " + new String(input));
this.parameters.add(input);
}
// load named parameters
String key;
String value;
for (int i = 0; from.itemExists(KEY_NAMED_PARAMETER_KEY + i); i++) {
key = from.getString(KEY_NAMED_PARAMETER_KEY + i);
value = from.getString(KEY_NAMED_PARAMETER_VALUE + i);
logger.debug("Unmarshalled named parameter: " + key + " = " + value);
this.namedParameters.put(key, value);
}
}
private MapMessage createLargeMessage() throws JMSException {
MapMessage message = session.createMapMessage();
for (int i = 0; i < 10; i++) {
message.setBytes("test" + i, new byte[1024 * 1024]);
}
return message;
}
private Callable<Boolean> isMessagesConsumed(final JMSQueueMessageConsumer consumer) {
return new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Message msg = consumer.popRawMessage();
if (msg instanceof MapMessage) {
messages.add(msg);
}
return messages.size() == NUM_OF_MESSAGES;
}
};
}
/**
* Send a <code>MapMessage</code> with an empty body.
* <br />
* Receive it and test if the message is effectively an instance of
* <code>MapMessage</code>
*/
@Test
public void testMapMessage_1() {
try {
MapMessage message = senderSession.createMapMessage();
sender.send(message);
Message msg = receiver.receive(TestConfig.TIMEOUT);
Assert.assertTrue("The message should be an instance of MapMessage.\n", msg instanceof MapMessage);
} catch (JMSException e) {
fail(e);
}
}
@Override
public MapMessage createMapMessage() {
checkSession();
try {
return session.createMapMessage();
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
}