下面列出了怎么用javax.jms.Message的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Tests if unacknowledged messages are being re-delivered when the consumer connects again.
*
* @throws JMSException
*/
public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
// Don't ack the message.
// Reset the session. This should cause the unacknowledged message to be re-delivered.
session.close();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receive(2000);
assertNotNull(msg);
msg.acknowledge();
session.close();
}
public void testLikeComparisons() throws Exception {
Message message = createMessage();
assertSelector(message, "quote LIKE '''In G_d We Trust'''", true);
assertSelector(message, "quote LIKE '''In Gd_ We Trust'''", false);
assertSelector(message, "quote NOT LIKE '''In G_d We Trust'''", false);
assertSelector(message, "quote NOT LIKE '''In Gd_ We Trust'''", true);
assertSelector(message, "foo LIKE '%oo'", true);
assertSelector(message, "foo LIKE '%ar'", false);
assertSelector(message, "foo NOT LIKE '%oo'", false);
assertSelector(message, "foo NOT LIKE '%ar'", true);
assertSelector(message, "foo LIKE '!_%' ESCAPE '!'", true);
assertSelector(message, "quote LIKE '!_%' ESCAPE '!'", false);
assertSelector(message, "foo NOT LIKE '!_%' ESCAPE '!'", false);
assertSelector(message, "quote NOT LIKE '!_%' ESCAPE '!'", true);
assertSelector(message, "punctuation LIKE '!#$&()*+,-./:;<=>[email protected][\\]^`{|}~'", true);
}
@Override
public void onMessage(Message message) {
try {
if (TextMessage.class.isAssignableFrom(message.getClass())) {
String receiverClassName = getClass().getName();
logger.info("Receiver {}, sending message.", receiverClassName);
logger.debug("Event message: {}", message);
TextMessage textMessage = (TextMessage) message;
T event = gson.fromJson(textMessage.getText(), eventClass);
logger.debug("{} event {}", receiverClassName, event);
handleEvent(event);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private Object[] wrapMessageForAmq5(final Object[] args) {
if (args == null || args.length != 1 || DelegateMessage.class.isInstance(args[0])) {
return args;
}
if (isAmq == null) {
synchronized (this) {
if (isAmq == null) {
isAmq = args[0].getClass().getName().startsWith("org.apache.activemq.");
}
}
}
if (isAmq) {
args[0] = JMS2.wrap(Message.class.cast(args[0]));
}
return args;
}
@Override
protected void verifyPublishedMessage() throws Exception {
final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(messageProducer, timeout(1000)).send(messageCaptor.capture(), any(CompletionListener.class));
final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(message.getStringProperty("thing_id")).isEqualTo(TestConstants.Things.THING_ID.toString());
assertThat(message.getStringProperty("suffixed_thing_id")).isEqualTo(
TestConstants.Things.THING_ID + ".some.suffix");
assertThat(message.getStringProperty("prefixed_thing_id")).isEqualTo(
"some.prefix." + TestConstants.Things.THING_ID);
assertThat(message.getStringProperty("eclipse")).isEqualTo("ditto");
assertThat(message.getStringProperty("device_id"))
.isEqualTo(TestConstants.Things.THING_ID.toString());
}
@Test
public void receiveTimeoutExpired() {
JmsInvokerProxyFactoryBean pfb = new JmsInvokerProxyFactoryBean() {
@Override
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
return null; // faking no message received
}
};
pfb.setServiceInterface(ITestBean.class);
pfb.setConnectionFactory(this.mockConnectionFactory);
pfb.setQueue(this.mockQueue);
pfb.setReceiveTimeout(1500);
pfb.afterPropertiesSet();
ITestBean proxy = (ITestBean) pfb.getObject();
assertThatExceptionOfType(RemoteTimeoutException.class).isThrownBy(() ->
proxy.getAge())
.withMessageContaining("1500 ms")
.withMessageContaining("getAge");
}
/**
* Topics shouldn't hold on to messages if there are no subscribers
*/
@Test
public void testPersistentMessagesForTopicDropped() throws Exception {
TopicConnection topicConn = createTopicConnection();
TopicSession sess = topicConn.createTopicSession(true, 0);
TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1);
pub.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = sess.createTextMessage("testing123");
pub.publish(m);
sess.commit();
topicConn.close();
checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
* This implementation converts a TextMessage back to a String, a
* ByteMessage back to a byte array, a MapMessage back to a Map,
* and an ObjectMessage back to a Serializable object. Returns
* the plain Message object in case of an unknown message type.
*
* @return payload
* @throws javax.jms.JMSException
*/
@Override
public Object convert(Message message) throws JMSException
{
if (message instanceof TextMessage) {
return ((TextMessage)message).getText();
} else if (message instanceof StreamMessage) {
return ((StreamMessage)message).readString();
} else if (message instanceof BytesMessage) {
return extractByteArrayFromMessage((BytesMessage)message);
} else if (message instanceof MapMessage) {
return extractMapFromMessage((MapMessage)message);
} else if (message instanceof ObjectMessage) {
return extractSerializableFromMessage((ObjectMessage)message);
} else {
return message;
}
}
@Test
public void testMapMessageCompression() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
MapMessage test = localSession.createMapMessage();
for (int i = 0; i < 100; ++i) {
test.setString(Integer.toString(i), "test string: " + i);
}
producer.send(test);
Message msg = consumer1.receive(RECEIVE_TIMEOUT_MILLS);
assertNotNull(msg);
ActiveMQMapMessage message = (ActiveMQMapMessage) msg;
assertTrue(message.isCompressed());
for (int i = 0; i < 100; ++i) {
assertEquals("test string: " + i, message.getString(Integer.toString(i)));
}
}
protected Message makeMessage(Session session, ModelService modelService, Map<String, Object> context)
throws GenericServiceException, JMSException {
List<String> outParams = modelService.getParameterNames(ModelService.OUT_PARAM, false);
if (UtilValidate.isNotEmpty(outParams))
throw new GenericServiceException("JMS service cannot have required OUT parameters; no parameters will be returned.");
String xmlContext = null;
try {
if (Debug.verboseOn()) Debug.logVerbose("Serializing Context --> " + context, module);
xmlContext = JmsSerializer.serialize(context);
} catch (SerializeException | IOException e) {
throw new GenericServiceException("Cannot serialize context.", e);
}
MapMessage message = session.createMapMessage();
message.setString("serviceName", modelService.invoke);
message.setString("serviceContext", xmlContext);
return message;
}
public void testMessageExpiredAdvisory() throws Exception {
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = s.createQueue(getClass().getName());
MessageConsumer consumer = s.createConsumer(queue);
assertNotNull(consumer);
Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
//start throwing messages at the consumer
MessageProducer producer = s.createProducer(queue);
producer.setTimeToLive(1);
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
Message msg = advisoryConsumer.receive(2000);
assertNotNull(msg);
}
/**
* Tests if the messages received are valid.
*
* @param receivedMessages - list of received messages.
* @throws JMSException
*/
protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException {
List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray());
int counter = 0;
if (data.length != copyOfMessages.size()) {
for (Iterator<Object> iter = copyOfMessages.iterator(); iter.hasNext(); ) {
TextMessage message = (TextMessage) iter.next();
}
}
assertEquals("Not enough messages received", data.length, receivedMessages.size());
for (int i = 0; i < data.length; i++) {
TextMessage received = (TextMessage) receivedMessages.get(i);
String text = received.getText();
String stringProperty = received.getStringProperty("stringProperty");
int intProperty = received.getIntProperty("intProperty");
assertEquals("Message: " + i, data[i], text);
assertEquals(data[i], stringProperty);
assertEquals(i, intProperty);
}
}
@Test
public void testBrowseWithSelector() throws Exception {
try {
conn = getConnectionFactory().createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue1);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
Message m = session.createMessage();
m.setIntProperty("test_counter", i + 1);
producer.send(m);
}
} finally {
removeAllMessages(queue1.getQueueName(), true);
}
}
private void read(Message message) throws JMSException {
setJMSCorrelationID(message.getJMSCorrelationID());
setJMSDeliveryMode(Integer.valueOf(message.getJMSDeliveryMode()));
setJMSExpiration(Long.valueOf(message.getJMSExpiration()));
setJMSMessageID(message.getJMSMessageID());
setJMSPriority(Integer.valueOf(message.getJMSPriority()));
setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
setJMSTimeStamp(Long.valueOf(message.getJMSTimestamp()));
setJMSType(message.getJMSType());
setSOAPJMSTargetService(message.getStringProperty(JMSSpecConstants.TARGETSERVICE_FIELD));
setSOAPJMSBindingVersion(message.getStringProperty(JMSSpecConstants.BINDINGVERSION_FIELD));
setSOAPJMSContentType(message.getStringProperty(JMSSpecConstants.CONTENTTYPE_FIELD));
setSOAPJMSContentEncoding(message.getStringProperty(JMSSpecConstants.CONTENTENCODING_FIELD));
setSOAPJMSSOAPAction(message.getStringProperty(JMSSpecConstants.SOAPACTION_FIELD));
if (message.propertyExists(JMSSpecConstants.ISFAULT_FIELD)) {
setSOAPJMSIsFault(message.getBooleanProperty(JMSSpecConstants.ISFAULT_FIELD));
}
setSOAPJMSRequestURI(message.getStringProperty(JMSSpecConstants.REQUESTURI_FIELD));
setJMSReplyTo(getDestName(message));
readProperties(message);
}
@Override
public void onMessage(final Message request) {
try {
System.out.println("Received request message: " + ((TextMessage) request).getText());
// Extract the ReplyTo destination
Destination replyDestination = request.getJMSReplyTo();
System.out.println("Reply to queue: " + replyDestination);
// Create the reply message
TextMessage replyMessage = session.createTextMessage("A reply message");
// Set the CorrelationID, using message id.
replyMessage.setJMSCorrelationID(request.getJMSMessageID());
// Send out the reply message
replyProducer.send(replyDestination, replyMessage);
System.out.println("Reply sent");
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
Session session = createSession();
MessageConsumer consumer = session.createConsumer(queue);
barrierLatch.await();
while (true) {
Message msg = consumer.receive(CONSUMER_WAIT_TIME_MS);
if (msg == null)
break;
msgReceived.incrementAndGet();
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
runnersLatch.countDown();
}
}
/**
* Test that a <code>Destination</code> set by the <code>setJMSReplyTo()</code>
* method on a sended message corresponds to the <code>Destination</code> get by
* the </code>getJMSReplyTo()</code> method.
*/
@Test
public void testJMSReplyTo_1() {
try {
Message message = senderSession.createMessage();
message.setJMSReplyTo(senderQueue);
sender.send(message);
Message msg = receiver.receive(TestConfig.TIMEOUT);
Destination dest = msg.getJMSReplyTo();
Assert.assertTrue("JMS ReplyTo header field should be a Queue", dest instanceof Queue);
Queue replyTo = (Queue) dest;
Assert.assertEquals("JMS ReplyTo header field should be equals to the sender queue", replyTo.getQueueName(), senderQueue.getQueueName());
} catch (JMSException e) {
fail(e);
}
}
@Test
public void testSendTopic() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection conn = factory.createQueueConnection("jdoe", "sunflower");
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Destination topic = sess.createTopic("TEST.BAR");
MessageProducer producer = sess.createProducer(topic);
MessageConsumer consumer = sess.createConsumer(topic);
producer.send(sess.createTextMessage("test"));
Message msg = consumer.receive(1000);
assertNotNull(msg);
}
@Override
public void onMessage(Message message) {
try {
logger.log(Level.INFO,
"Rejected registration attempt of cargo with tracking ID {0}.",
message.getBody(String.class));
} catch (JMSException ex) {
logger.log(Level.WARNING, "Error processing message.", ex);
}
}
@Override
public void onMessage(Message message) {
try {
logger.log(Level.INFO,
"Cargo with tracking ID {0} delivered.",
message.getBody(String.class));
} catch (JMSException ex) {
logger.log(Level.WARNING, "Error processing message.", ex);
}
}
/**
* Send a <code>ObjectMessage</code> with an empty body.
* <br />
* Receive it and test if the message is effectively an instance of
* <code>ObjectMessage</code>
*/
@Test
public void testObjectMessage_1() {
try {
ObjectMessage message = senderSession.createObjectMessage();
sender.send(message);
Message msg = receiver.receive(TestConfig.TIMEOUT);
Assert.assertTrue("The message should be an instance of ObjectMessage.\n", msg instanceof ObjectMessage);
} catch (JMSException e) {
fail(e);
}
}
/**
* Converts message into SupervisionEvent and notifies registered listeners.
*
* <p>All exceptions are caught and logged (both exceptions in message conversion
* and thrown by the listeners).
*/
@Override
public void onMessage(final Message message) {
try {
if (message instanceof TextMessage) {
if (log.isTraceEnabled()) {
log.trace("AbstractQueuedWrapper received message for {}: {}", this.getClass().getSimpleName(), ((TextMessage)message).getText());
}
U event = convertMessage(message);
long lastNotificationTime = notificationTime.get();
if (lastNotificationTime != 0 && (System.currentTimeMillis() - lastNotificationTime) > notificationTimeBeforeWarning.get()) {
String warning = "Slow consumer class: " + this.getClass().getSimpleName() + ". "
+ "C2MON client is not consuming updates correctly and should be restarted! "
+ " Event type: " + getDescription(event);
log.warn(warning);
log.warn("No returning call from listener since {}", new Timestamp(lastNotificationTime));
slowConsumerListener.onSlowConsumer(warning);
}
eventQueue.put(event);
} else {
log.warn("Non-text message received for " + this.getClass().getSimpleName() + " - ignoring event");
}
} catch (Exception e) {
log.error("Exception caught while processing incoming server event with " + this.getClass().getSimpleName(), e);
}
}
@Override
public void onMessage(Message message) {
latch.countDown();
try {
System.out.println("Received " + ((TextMessage) message).getText());
} catch (Exception e) {
e.printStackTrace();
}
}
@Test(timeout = 10000)
public void testSendToQueueWithDeliveryOptsWithNullOnExplicitQueueSenderThrowsInvalidDestinationException() throws Exception {
Queue queue = session.createQueue(getTestName());
QueueSender sender = session.createSender(queue);
Message message = session.createMessage();
try {
sender.send((Queue) null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception to be thrown");
} catch (InvalidDestinationException ide) {
// expected
}
}
/**
* Receive a batch of up to batchSize for given destination and message selector. Other than batching this method is the same as {@link JmsTemplate#receiveSelected(Destination, String)}
* @return A list of {@link Message}
* @param destination The Destination
* @param messageSelector The Selector
* @param batchSize The batch size
* @throws JmsException The {@link JmsException}
*/
public List<Message> receiveSelectedBatch(final Destination destination, final String messageSelector,
final int batchSize) throws JmsException {
return execute(new SessionCallback<List<Message>>() {
public List<Message> doInJms(Session session) throws JMSException {
return doBatchReceive(session, destination, messageSelector, batchSize);
}
}, true);
}
@Test
public void testToMessageSimplyReturnsMessageAsIsIfSuppliedWithMessage() throws JMSException {
Session session = mock(Session.class);
ObjectMessage message = mock(ObjectMessage.class);
SimpleMessageConverter converter = new SimpleMessageConverter();
Message msg = converter.toMessage(message, session);
assertSame(message, msg);
}
@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
Enumeration<String> names = message.getPropertyNames();
messageProperties = new HashMap<String, String>();
while (names.hasMoreElements()) {
String name = names.nextElement();
messageProperties.put(name, message.getStringProperty(name));
}
BytesMessage bm = (BytesMessage) message;
byte[] transfer = new byte[(int) bm.getBodyLength()];
bm.readBytes(transfer);
return new String(transfer);
}
/**
* if a property is set as a <code>double</code>,
* it can also be read as a <code>java.lang.String</code>.
*/
@Test
public void testDouble2String() {
try {
Message message = senderSession.createMessage();
message.setDoubleProperty("prop", 127.0);
Assert.assertEquals("127.0", message.getStringProperty("prop"));
} catch (JMSException e) {
fail(e);
}
}
@Test(timeout = 40000)
public void testBrowseAllInQueue() throws Exception {
connection = createAmqpConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendToAmqQueue(5);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
TimeUnit.MILLISECONDS.sleep(50);
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
protected int receiveAllMessages(MessageConsumer consumer) throws Exception {
int msgsReceived = 0;
Message msg;
do {
msg = consumer.receive(1000);
if (msg != null) {
msgsReceived++;
}
} while (msg != null);
return msgsReceived;
}