下面列出了怎么用javax.jms.TextMessage的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Sends a text message and then asks the consumer for its body as a {@code Boolean};
 * the test passes only when that request fails with a
 * {@code MessageFormatRuntimeException}.
 *
 * @throws Exception if the queue or the JMS context cannot be set up
 */
@Test
public void testJMSContextConsumerThrowsMessageFormatExceptionOnMalformedBody() throws Exception {
    Queue queue = createQueue(true, "ContextMalformedBodyTestQueue");
    // JMSContext is AutoCloseable: close it even when an assertion below fails.
    try (JMSContext context = qraConnectionFactory.createContext()) {
        JMSProducer producer = context.createProducer();
        TextMessage message = context.createTextMessage("TestMessage");
        producer.send(queue, message);
        JMSConsumer consumer = context.createConsumer(queue);
        try {
            consumer.receiveBody(Boolean.class);
            fail("Should have thrown MessageFormatRuntimeException");
        } catch (MessageFormatRuntimeException mfre) {
            // Expected outcome; nothing further to check.
        } catch (Exception e) {
            fail("Threw wrong exception, should be MessageFormatRuntimeException, instead got: " + e.getClass().getCanonicalName());
        }
    }
}
/**
 * Creates a text message from the given content using this instance's session and
 * sends it through this instance's producer.
 * <p>
 * When the producer is {@code null}, or when creating/sending raises a
 * {@link JMSException}, the problem is logged and {@code Assert.fail()} is invoked.
 *
 * @param messageContent the text body of the message to send
 * @return the message that was created (returned after a successful send)
 */
public TextMessage pushMessage(String messageContent) {
    if (this.producer == null) {
        log.error("The producer is null");
        Assert.fail();
        // Keeps the compiler satisfied; presumably never reached because fail() throws.
        return null;
    }

    TextMessage sent = null;
    try {
        sent = this.session.createTextMessage(messageContent);
        this.producer.send(sent);
    } catch (JMSException e) {
        log.error("Error while sending message", e);
        Assert.fail();
    }
    return sent;
}
/**
 * Exercises the BETWEEN operator in a message selector.
 * <br />
 * <ul>
 * <li>"age BETWEEN 15 and 19" is <code>true</code> for 17 and <code>false</code> for 20</li>
 * </ul>
 */
@Test
public void testBetween() throws Exception {
    // Replace any existing receiver with one that filters on the age range.
    if (receiver != null) {
        receiver.close();
    }
    receiver = receiverSession.createReceiver(receiverQueue, "age BETWEEN 15 and 19");

    // Sent first, with an age outside the range: the selector must skip it.
    TextMessage outOfRange = senderSession.createTextMessage();
    outOfRange.setIntProperty("age", 20);
    outOfRange.setText("testBetween:1");
    sender.send(outOfRange);

    // Sent second, with an age inside the range: the only message expected back.
    TextMessage inRange = senderSession.createTextMessage();
    inRange.setIntProperty("age", 17);
    inRange.setText("testBetween:2");
    sender.send(inRange);

    TextMessage received = (TextMessage) receiver.receive(TestConfig.TIMEOUT);
    Assert.assertTrue("Message not received", received != null);
    String body = received.getText();
    Assert.assertTrue("Message of another test: " + body, body.startsWith("testBetween"));
    Assert.assertEquals("testBetween:2", body);
}
/**
 * Drives the listener through init, a single {@code onMessage} call with a mocked
 * text message, cleanup, and a connection-stats query. The test makes no assertions;
 * it only fails when a {@link JMSException} is raised along the way.
 */
@Test
public void testListening() throws JMSException {
    try {
        listener.init();

        // Mocked message carrying the body, properties and correlation id stubbed below.
        TextMessage textMessage = mock(TextMessage.class);
        when(textMessage.getText()).thenReturn("{messgetext:true}");
        when(textMessage.getStringProperty("task_id")).thenReturn("corel-id");
        when(textMessage.getStringProperty("task_result_code")).thenReturn("200");
        when(textMessage.getStringProperty("type")).thenReturn("deploybom");
        when(textMessage.getJMSCorrelationID()).thenReturn("jms|cor!rel!ation!id");

        listener.onMessage(textMessage);
        listener.cleanup();
        listener.getConnectionStats();
    } catch (JMSException e) {
        // Report on stdout, then rethrow so the test still fails.
        System.out.println("CAUTH EXCEPTION " + e.getMessage());
        e.printStackTrace();
        throw e;
    }
}
/**
 * Hands the incoming message to the thread pool, where its text body is read and logged.
 * Messages that are not {@code TextMessage} instances are logged and skipped.
 *
 * @param message the message delivered to this listener
 */
@Override
public void onMessage(final Message message) {
    // Process the message on the thread pool.
    threadPoolTaskExecutor.execute(new Runnable() {
        @Override
        public void run() {
            // Check the type before casting: an unconditional cast would throw
            // ClassCastException on the pool thread, outside any error handling.
            if (!(message instanceof TextMessage)) {
                LOGGER.warn("Ignoring non-text message: {}", message);
                return;
            }
            try {
                String text = ((TextMessage) message).getText();
                LOGGER.info("消费:{}", text);
            } catch (Exception e) {
                // Log through the class logger instead of printing to stderr.
                LOGGER.error("Failed to read text message body", e);
            }
        }
    });
}
/**
 * Consumes with a selector on a string, a long and a boolean property and checks that
 * only the messages with text "1" and "3" are delivered. Each delivered message
 * decrements a counter that starts at 5, so 3 must remain afterwards; the remaining
 * messages are then drained via {@code consumeMessages}.
 */
@Test
public void testPropertySelector() throws Exception {
    int remaining = 5;
    consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true");
    sendMessages();

    // Drain until receive() times out and returns null.
    Message message;
    while ((message = consumer.receive(1000)) != null) {
        String text = ((TextMessage) message).getText();
        if (!text.equals("1") && !text.equals("3")) {
            fail("unexpected message: " + text);
        }
        remaining--;
    }

    // Expected value first, so a failure reports "expected 3 but was <remaining>".
    assertEquals(3, remaining);
    consumer.close();
    consumeMessages(remaining);
}
/**
 * Sends "Hello World" through the test connection with the receipt flag set and checks
 * that a JMS consumer receives it with a timestamp within one second of the local clock.
 */
@Test
public void testSendMessageWithReceipt() throws Exception {
    MessageConsumer receiver = session.createConsumer(queue);

    conn.connect(defUser, defPass);
    send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true);

    TextMessage received = (TextMessage) receiver.receive(1000);
    Assert.assertNotNull(received);
    Assert.assertEquals("Hello World", received.getText());

    // The message timestamp should be very close to the current time.
    long now = System.currentTimeMillis();
    long stamped = received.getJMSTimestamp();
    long drift = Math.abs(now - stamped);
    Assert.assertTrue(drift < 1000);

    conn.disconnect();
}
/**
 * With only a default response destination configured on the adapter, the "Response"
 * payload must be sent to that destination: no queue is looked up by name, the text
 * message is created from the session, sent by the producer, and the producer is closed.
 */
@Test
public void replyPayloadNoDestination() throws JMSException {
    Session session = mock(Session.class);
    Queue defaultReplyQueue = mock(Queue.class);
    TextMessage reply = mock(TextMessage.class);
    MessageProducer producer = mock(MessageProducer.class);
    given(session.createProducer(defaultReplyQueue)).willReturn(producer);
    given(session.createTextMessage("Response")).willReturn(reply);

    MessagingMessageListenerAdapter adapter =
            getPayloadInstance("Response", "replyPayloadNoDestination", Message.class);
    adapter.setDefaultResponseDestination(defaultReplyQueue);
    adapter.onMessage(mock(javax.jms.Message.class), session);

    verify(session, times(0)).createQueue(anyString());
    verify(session).createTextMessage("Response");
    verify(producer).send(reply);
    verify(producer).close();
}
/**
 * Generates text messages on an address created via {@code createBothTypeAddress}
 * (plus an ANYCAST queue of the same name), exports them to a file, imports that file
 * into the topic-qualified address, and checks that a durable subscriber receives every
 * message with the same text and in the same order.
 */
@Test
public void testAnycastQueueToMulticastTopicBothAddress() throws Exception {
    final String address = "testBoth";
    final String clientId = "test-client-id";
    final File exportFile = createMessageFile();

    connection.setClientID(clientId);
    createBothTypeAddress(address);
    createQueue(RoutingType.ANYCAST, address, address);

    Session session = createSession(connection);
    // Subscribe before producing/importing so the durable subscription exists up front.
    TopicSubscriber subscriber =
            session.createDurableSubscriber(session.createTopic(address), "test-subscriber");
    List<Message> sent = generateTextMessages(session, getDestination(address));

    exportMessages(address, exportFile);
    importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, exportFile);

    for (int index = 0; index < TEST_MESSAGE_COUNT; index++) {
        TextMessage received = (TextMessage) subscriber.receive(1000);
        assertNotNull(received);
        String expectedText = ((TextMessage) sent.get(index)).getText();
        assertEquals(expectedText, received.getText());
    }
}
/**
 * Sends a text message to the {@code inbound} destination over a connection obtained
 * from {@code foreignConnectionFactory}, closing the connection afterwards.
 *
 * @param text the body of the message to send
 * @throws JMSException if the connection, session, producer or send fails
 */
protected void sendMessageToLocalBroker(String text) throws JMSException {
    Connection connection = null;
    try {
        connection = foreignConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(inbound);
        TextMessage message = session.createTextMessage();
        message.setText(text);
        producer.send(message);
    } finally {
        // createConnection() may have thrown, in which case there is nothing to close.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ignored) {
                // Best-effort cleanup: a close failure must not mask the send outcome.
            }
        }
    }
}
/**
 * The "Response" payload must be sent to the queue named "queueOut": the session
 * resolves that queue by name, creates the text message, the producer sends it,
 * and the producer is closed.
 */
@Test
public void replyPayloadToQueue() throws JMSException {
    Session session = mock(Session.class);
    Queue outQueue = mock(Queue.class);
    MessageProducer producer = mock(MessageProducer.class);
    TextMessage reply = mock(TextMessage.class);
    given(session.createQueue("queueOut")).willReturn(outQueue);
    given(session.createProducer(outQueue)).willReturn(producer);
    given(session.createTextMessage("Response")).willReturn(reply);

    MessagingMessageListenerAdapter adapter =
            getPayloadInstance("Response", "replyPayloadToQueue", Message.class);
    adapter.onMessage(mock(javax.jms.Message.class), session);

    verify(session).createQueue("queueOut");
    verify(session).createTextMessage("Response");
    verify(producer).send(reply);
    verify(producer).close();
}
/**
 * Wraps a JMS message in the resource-adapter wrapper matching its concrete message
 * type, falling back to the generic wrapper when no specific type applies.
 *
 * @param message The message to be wrapped
 * @return The wrapped message
 */
Message wrapMessage(final Message message) {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("wrapMessage(" + message + ")");
    }

    // Checked in the same order as before: bytes, map, object, stream, text.
    if (message instanceof BytesMessage) {
        return new ActiveMQRABytesMessage((BytesMessage) message, session);
    }
    if (message instanceof MapMessage) {
        return new ActiveMQRAMapMessage((MapMessage) message, session);
    }
    if (message instanceof ObjectMessage) {
        return new ActiveMQRAObjectMessage((ObjectMessage) message, session);
    }
    if (message instanceof StreamMessage) {
        return new ActiveMQRAStreamMessage((StreamMessage) message, session);
    }
    if (message instanceof TextMessage) {
        return new ActiveMQRATextMessage((TextMessage) message, session);
    }
    return new ActiveMQRAMessage(message, session);
}
/**
 * Sends {@code nMsgs} copies of one text message to the test queue. The body is
 * {@code PAYLOAD} repetitions of the letter 'A'; each send carries its loop index
 * in the int property "i".
 *
 * @param nMsgs   number of messages to send
 * @param factory factory used to open the (auto-closed) connection
 * @throws Exception if any JMS operation fails
 */
private void sendTextMessages(int nMsgs, ConnectionFactory factory) throws Exception {
    try (Connection connection = factory.createConnection()) {
        Session session = connection.createSession();
        Queue queue = session.createQueue(testQueueName);
        MessageProducer producer = session.createProducer(queue);
        TextMessage msg = session.createTextMessage();

        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < PAYLOAD; ++i) {
            builder.append('A');
        }
        msg.setText(builder.toString());

        for (int i = 0; i < nMsgs; ++i) {
            // setIntProperty takes a primitive int; no boxing cast is needed.
            msg.setIntProperty("i", i);
            producer.send(msg);
        }
    }
}
/**
 * Checks transacted-session semantics: messages sent before a rollback are never
 * delivered, a message sent afterwards is invisible until commit, and after the
 * commit exactly that message ("test2") is received.
 */
@Test
public void testRollback() throws Exception {
    try (Connection connection = factory.createConnection()) {
        Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue target = txSession.createQueue(queueName);
        MessageProducer producer = txSession.createProducer(target);
        MessageConsumer consumer = txSession.createConsumer(target);

        // Two uncommitted sends: nothing may be visible to the consumer yet.
        producer.send(txSession.createTextMessage("test"));
        producer.send(txSession.createTextMessage("test2"));
        connection.start();
        Assert.assertNull(consumer.receiveNoWait());

        // Discard both sends, then send again; still nothing until the commit.
        txSession.rollback();
        producer.send(txSession.createTextMessage("test2"));
        Assert.assertNull(consumer.receiveNoWait());

        txSession.commit();
        TextMessage delivered = (TextMessage) consumer.receive(1000);
        Assert.assertNotNull(delivered);
        Assert.assertEquals("test2", delivered.getText());
    }
}
/**
 * Sends each event as a text message to the {@code DESTINATION_NAME} queue inside a
 * single transacted session and commits once all events have been sent.
 *
 * @param events the message bodies to enqueue, in order
 * @throws Exception if connecting, sending or committing fails
 */
private void putQueue(List<String> events) throws Exception {
    ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME,
            PASSWORD, BROKER_BIND_URL);
    Connection connection = factory.createConnection();
    try {
        connection.start();
        // Transacted session: per the JMS API the acknowledge-mode argument is
        // ignored when the first argument is true.
        Session session = connection.createSession(true,
                Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(DESTINATION_NAME);
        MessageProducer producer = session.createProducer(destination);
        for (String event : events) {
            TextMessage message = session.createTextMessage();
            message.setText(event);
            producer.send(message);
        }
        session.commit();
        session.close();
    } finally {
        // Always release the connection, even when a send or the commit fails.
        connection.close();
    }
}
/**
 * Sends a text message through a {@code JMSContext} producer that carries the string
 * property "strvalue" and verifies, with a core client consumer on the same queue,
 * that the message arrives with that property.
 */
@Test
public void testSimpleSendNoXAJMSContext() throws Exception {
    Queue targetQueue = ActiveMQJMSClient.createQueue(MDBQUEUE);

    try (ClientSessionFactory sessionFactory = locator.createSessionFactory();
         ClientSession coreSession = sessionFactory.createSession();
         ClientConsumer verifier = coreSession.createConsumer(MDBQUEUE);
         JMSContext jmsContext = qraConnectionFactory.createContext()) {
        coreSession.start();

        // Deliberately kept as separate statements: easier to step through in a debugger.
        JMSProducer producer = jmsContext.createProducer();
        producer.setProperty("strvalue", "hello");
        TextMessage outgoing = jmsContext.createTextMessage("hello");
        producer.send(targetQueue, outgoing);

        ClientMessage delivered = verifier.receive(1000);
        assertNotNull(delivered);
        assertEquals("hello", delivered.getStringProperty("strvalue"));
    }
}
/**
 * Publishes a CI state-change event as a JMS text message. The event is serialized to
 * JSON for the body; the CI id, the fixed type "ci-change-state" and the new state are
 * attached as message properties. A {@link JMSException} is logged and not rethrown.
 *
 * @param event the event
 */
public void publishCiStateMessage(CiChangeStateEvent event) {
    try {
        String payload = gson.toJson(event);
        TextMessage stateMessage = session.createTextMessage(payload);
        stateMessage.setLongProperty("ciId", event.getCiId());
        stateMessage.setStringProperty("type", "ci-change-state");
        stateMessage.setStringProperty("ciState", event.getNewState());

        producer.send(stateMessage);

        logger.info("Published: ciId:" + event.getCiId());
        logger.info(stateMessage.getText());
    } catch (JMSException e) {
        // TODO see if we can put some durability here
        logger.error("caught Exception publishing",e);
    }
}
/**
 * Sends {@code NUM_MESSAGES} "hello" text messages to the {@code QUEUE} queue,
 * opening and closing a fresh connection for every message.
 *
 * @param connectionFactory factory used to create each connection
 * @throws Exception if any JMS operation fails
 */
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
    for (int i = 0; i < NUM_MESSAGES; i++) {
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(QUEUE);
            MessageProducer producer = session.createProducer(destination);
            String msgTo = "hello";
            TextMessage message = session.createTextMessage(msgTo);
            producer.send(message);
        } finally {
            // Release the connection even when session setup or the send fails.
            connection.close();
        }
        LOG.debug("sent " + i + " messages using " + connectionFactory.getClass());
    }
}
/**
 * Sends a text message to the {@code outbound} destination over a connection obtained
 * from {@code localConnectionFactory}, closing the connection afterwards.
 *
 * @param text the body of the message to send
 * @throws JMSException if the connection, session, producer or send fails
 */
protected void sendMessageToForeignBroker(String text) throws JMSException {
    Connection connection = null;
    try {
        connection = localConnectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(outbound);
        TextMessage message = session.createTextMessage();
        message.setText(text);
        producer.send(message);
    } finally {
        // createConnection() may have thrown, in which case there is nothing to close.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ignored) {
                // Best-effort cleanup: a close failure must not mask the send outcome.
            }
        }
    }
}
/**
 * Sends one non-persistent and one persistent message to a queue and checks that a
 * consumer with the selector {@code JMSDeliveryMode = 'PERSISTENT'} receives the
 * persistent one.
 */
@Test(timeout = 30000)
public void testSelectorsWithJMSDeliveryOnQueue() throws Exception {
    final Connection connection = createConnection();
    final String persistentOnly = "JMSDeliveryMode = 'PERSISTENT'";

    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination target = session.createQueue(getQueueName());
        MessageProducer producer = session.createProducer(target);
        MessageConsumer filtered = session.createConsumer(target, persistentOnly);

        TextMessage nonPersistent = session.createTextMessage();
        nonPersistent.setText("non-persistent");
        producer.send(nonPersistent, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

        TextMessage persistent = session.createTextMessage();
        persistent.setText("persistent");
        producer.send(persistent, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

        connection.start();

        Message received = filtered.receive(2000);
        assertNotNull(received);
        assertTrue(received instanceof TextMessage);
        assertEquals("Unexpected JMSDeliveryMode value", DeliveryMode.PERSISTENT, received.getJMSDeliveryMode());
        assertEquals("Unexpected message content", "persistent", ((TextMessage) received).getText());
    } finally {
        connection.close();
    }
}
/**
 * Converts a JMS message into a single-element tuple holding its text body.
 *
 * @param msg the message to convert
 * @return a {@code Values} containing the text when {@code msg} is a
 *         {@link TextMessage}; {@code null} for any other message (or for {@code null})
 * @throws JMSException if the text body cannot be read
 */
public Values toTuple(Message msg) throws JMSException {
    if (!(msg instanceof TextMessage)) {
        return null;
    }
    String json = ((TextMessage) msg).getText();
    return new Values(json);
}
/**
 * Validates a broker feature: the copy of a message that goes through the DLQ receives
 * an annotation. Also tests filtering on that annotation.
 * <p>
 * Flow: send one message with a time-to-live of 1, wait until the source queue reports
 * one expired message and the dead-letter queue view reports one message, then consume
 * from the dead-letter address with a selector on {@code m.x-opt-ORIG-ADDRESS}.
 */
@Test(timeout = 60000)
public void testExpiryQpidJMS() throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", getBrokerAmqpConnectionURI().toString());
Connection connection = factory.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer sender = session.createProducer(queue);
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
// Time-to-live of 1 (milliseconds per the JMS API), so the message expires almost at once.
sender.setTimeToLive(1);
TextMessage message = session.createTextMessage("Test-Message");
message.setStringProperty("key1", "Value1");
sender.send(message);
sender.close();
// Wait until the source queue has counted exactly one expired message.
Wait.assertEquals(1, queueView::getMessagesExpired);
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
assertNotNull(dlqView);
// Wait until exactly one message is present on the dead-letter queue view.
Wait.assertEquals(1, dlqView::getMessageCount);
connection.start();
javax.jms.Queue queueDLQ = session.createQueue(getDeadLetterAddress());
// The selector matches only messages whose m.x-opt-ORIG-ADDRESS value equals the source queue name.
MessageConsumer receiverDLQ = session.createConsumer(queueDLQ, "\"m.x-opt-ORIG-ADDRESS\"='" + getQueueName() + "'");
Message received = receiverDLQ.receive(5000);
Assert.assertNotNull(received);
receiverDLQ.close();
} finally {
connection.close();
}
}
/**
 * Consumes text messages from the "test" queue until {@code testSize} messages have
 * been received or delivery is considered halted (a receive timed out after 30 s).
 * When {@code jmsSelector} is set, only messages whose JMSType equals it are consumed.
 * Failures are logged rather than propagated, and the session is always closed.
 */
@Override
public void run() {
    Session session = null;
    try {
        session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue queue = session.createQueue("test");
        final MessageConsumer consumer;
        if (null != this.jmsSelector) {
            consumer = session.createConsumer(queue, "JMSType='" + this.jmsSelector + "'");
        } else {
            consumer = session.createConsumer(queue);
        }
        while (!deliveryHalted && (counterReceived < testSize)) {
            TextMessage result = (TextMessage) consumer.receive(30000);
            if (result != null) {
                counterReceived++;
                LOG.info("consuming .... JMSType = " + result.getJMSType() + " received = " + counterReceived);
            } else {
                // receive() returned null: the timeout elapsed with no message.
                LOG.info("consuming .... timeout while waiting for a message ... broker must have stopped delivery ... received = " + counterReceived);
                deliveryHalted = true;
            }
        }
    } catch (Exception e) {
        // Use the class logger rather than printing the stack trace to stderr.
        LOG.error("Consumer failed after receiving " + counterReceived + " messages", e);
    } finally {
        // Close the session on both the normal and the failure path.
        if (session != null) {
            try {
                session.close();
            } catch (Exception closeFailure) {
                LOG.warn("Failed to close consumer session", closeFailure);
            }
        }
    }
}
/**
 * Sets one property through each typed property setter of the message (boolean, byte,
 * double, float, int, long, object, short and string). Every property is named after
 * the setter type used.
 *
 * @param message the message to populate
 * @throws JMSException if any property cannot be set
 */
static void setAllPropertyTypes(TextMessage message) throws JMSException {
message.setBooleanProperty("boolean", true);
message.setByteProperty("byte", Byte.MAX_VALUE);
message.setDoubleProperty("double", Double.MIN_VALUE);
message.setFloatProperty("float", Float.MIN_VALUE);
message.setIntProperty("int", Integer.MIN_VALUE);
message.setLongProperty("long", Long.MIN_VALUE);
// NOTE(review): the JMS API documents setObjectProperty for boxed primitives and String
// only; passing a Map relies on the provider being lenient — confirm for the provider in use.
message.setObjectProperty("object", Collections.emptyMap());
message.setShortProperty("short", Short.MIN_VALUE);
message.setStringProperty("string", "string");
}
/**
 * Sends 100 text messages to {@code QUEUE_NAME} through an anonymous producer (one
 * created with a {@code null} destination) and checks that a consumer on that queue
 * receives 100 messages.
 */
@Test
public void testAutoCreateOnSendToQueueAnonymousProducer() throws Exception {
    Connection connection = cf.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        javax.jms.Queue queue = ActiveMQJMSClient.createQueue(QUEUE_NAME);
        // Anonymous producer: the destination is supplied on every send.
        MessageProducer producer = session.createProducer(null);
        final int numMessages = 100;
        for (int i = 0; i < numMessages; i++) {
            TextMessage mess = session.createTextMessage("msg" + i);
            producer.send(queue, mess);
        }
        producer.close();

        MessageConsumer messageConsumer = session.createConsumer(queue);
        connection.start();
        for (int i = 0; i < numMessages; i++) {
            Message m = messageConsumer.receive(5000);
            Assert.assertNotNull(m);
        }
    } finally {
        // Close the connection even when an assertion or JMS call fails.
        connection.close();
    }
}
/**
 * Sends a text message with two custom properties and a reply-to queue, consumes it
 * through {@code JMSConsumer}, and checks the body, the reply-to header and both
 * properties as seen by the callback. The connection factory is destroyed afterwards
 * whether or not the assertions pass.
 */
@Test
public void validateConsumeWithCustomHeadersAndProperties() throws Exception {
    final String destinationName = "testQueue";
    JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
    try {
        jmsTemplate.send(destinationName, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("hello from the other side");
                message.setStringProperty("foo", "foo");
                message.setBooleanProperty("bar", false);
                message.setJMSReplyTo(session.createQueue("fooQueue"));
                return message;
            }
        });

        JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
        final AtomicBoolean callbackInvoked = new AtomicBoolean();
        consumer.consume(destinationName, new ConsumerCallback() {
            @Override
            public void accept(JMSResponse response) {
                callbackInvoked.set(true);
                assertEquals("hello from the other side", new String(response.getMessageBody()));
                assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
                assertEquals("foo", response.getMessageProperties().get("foo"));
                // The boolean property is compared against the string "false".
                assertEquals("false", response.getMessageProperties().get("bar"));
            }
        });
        assertTrue(callbackInvoked.get());
    } finally {
        // Release the cached connection even when an assertion above fails.
        ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
    }
}
/**
 * Sends three messages, receives all three in an INDIVIDUAL_ACKNOWLEDGE session and
 * acknowledges only the last one. After the session is closed and re-created, the
 * first two messages must be delivered again and no third message may arrive.
 *
 * @throws JMSException
 */
public void testLastMessageAcked() throws JMSException {
    connection.start();
    Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
    Queue queue = session.createQueue(getQueueName());
    MessageProducer producer = session.createProducer(queue);

    TextMessage first = session.createTextMessage("msg1");
    TextMessage second = session.createTextMessage("msg2");
    TextMessage third = session.createTextMessage("msg3");
    producer.send(first);
    producer.send(second);
    producer.send(third);

    // Receive all three, acknowledging only the last one received.
    MessageConsumer consumer = session.createConsumer(queue);
    Message received = consumer.receive(1000);
    assertNotNull(received);
    received = consumer.receive(1000);
    assertNotNull(received);
    received = consumer.receive(1000);
    assertNotNull(received);
    received.acknowledge();

    // Start over with a fresh session.
    session.close();
    session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);

    // The two unacknowledged messages come back, in order; nothing else does.
    consumer = session.createConsumer(queue);
    received = consumer.receive(1000);
    assertNotNull(received);
    assertEquals(first, received);
    received = consumer.receive(1000);
    assertNotNull(received);
    assertEquals(second, received);
    received = consumer.receive(1000);
    assertNull(received);

    session.close();
}
/**
 * Example: connects to an AMQP broker at {@code amqp://localhost:5672}, sends one text
 * message to "exampleQueue", receives it back and prints its text.
 *
 * @param args unused
 * @throws Exception if any JMS operation fails or no message arrives within 5 seconds
 */
public static void main(String[] args) throws Exception {
    Connection connection = null;
    ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672");
    try {
        // Step 1. Create an amqp qpid 1.0 connection
        connection = connectionFactory.createConnection();
        // Step 2. Create a session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Step 3. Create a sender
        Queue queue = session.createQueue("exampleQueue");
        MessageProducer sender = session.createProducer(queue);
        // Step 4. Send a simple message
        sender.send(session.createTextMessage("Hello world "));
        connection.start();
        // Step 5. Create a moving receiver, this means the message will be removed from the queue
        MessageConsumer consumer = session.createConsumer(queue);
        // Step 6. Receive the simple message; receive() returns null when the timeout elapses
        TextMessage m = (TextMessage) consumer.receive(5000);
        if (m == null) {
            throw new IllegalStateException("No message received from exampleQueue within 5000 ms");
        }
        System.out.println("message = " + m.getText());
    } finally {
        if (connection != null) {
            // Step 7. Close the connection
            connection.close();
        }
    }
}
/**
 * Extracts the body of a text or bytes message as a string by delegating to the
 * type-specific extractor.
 *
 * @param message the message whose body is wanted
 * @return the extracted body
 * @throws JMSException if the type-specific extractor fails to read the body
 * @throws UnexpectedMessageTypeException if the message is neither a
 *         {@link TextMessage} nor a {@link BytesMessage}
 */
public static String extractMessageBody(Message message) throws JMSException, UnexpectedMessageTypeException {
    if (message instanceof TextMessage) {
        return extractTextMessageBody((TextMessage) message);
    }
    if (message instanceof BytesMessage) {
        return extractByteMessageBody((BytesMessage) message);
    }
    throw new UnexpectedMessageTypeException("Type of Message does not match " + BytesMessage.class.getName() + " or " + TextMessage.class.getName());
}
/**
 * Creates a text message via the context and decorates it with a fixed set of test
 * properties: a string, an int, a "passwd" string, and a string property whose value
 * is {@code null}.
 *
 * @param messageText the body of the message
 * @return the decorated message
 * @throws JMSException if a property cannot be set
 */
@Override
public TextMessage createTextMessage(String messageText) throws JMSException {
    final TextMessage decorated = context.createTextMessage(messageText);

    decorated.setStringProperty("test_string_property", "test123");
    decorated.setIntProperty("test_int_property", 123);
    // NOTE(review): presumably present to exercise handling of sensitive values — confirm.
    decorated.setStringProperty("passwd", "secret");
    // Explicit null value for a string property.
    decorated.setStringProperty("null_property", null);

    return decorated;
}