下面列出了javax.jms.JMSConsumer#receive ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void run() {
try (JMSContext context = connectionFactory.createContext(Session.AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = context.createConsumer(context.createQueue("prices"));
while (true) {
Message message = consumer.receive();
if (message == null) {
// receive returns `null` if the JMSConsumer is closed
return;
}
lastPrice = message.getBody(String.class);
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
@Test
public void testSendStreamMessage() throws JMSException, InterruptedException {
JmsProducerCompletionListenerTest.CountingCompletionListener cl = new JmsProducerCompletionListenerTest.CountingCompletionListener(1);
JMSProducer producer = context.createProducer();
producer.setAsync(cl);
StreamMessage msg = context.createStreamMessage();
msg.setStringProperty("name", name.getMethodName());
String bprop = "booleanProp";
String iprop = "intProp";
msg.setBooleanProperty(bprop, true);
msg.setIntProperty(iprop, 42);
msg.writeBoolean(true);
msg.writeInt(67);
producer.send(queue1, msg);
JMSConsumer consumer = context.createConsumer(queue1);
Message msg2 = consumer.receive(100);
Assert.assertNotNull(msg2);
Assert.assertTrue(cl.completionLatch.await(1, TimeUnit.SECONDS));
StreamMessage sm = (StreamMessage) cl.lastMessage;
Assert.assertEquals(true, sm.getBooleanProperty(bprop));
Assert.assertEquals(42, sm.getIntProperty(iprop));
Assert.assertEquals(true, sm.readBoolean());
Assert.assertEquals(67, sm.readInt());
}
@Test
public void testQueue() throws Exception {
try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); JMSContext context = factory.createContext()) {
Queue queue = context.createQueue("queue.jms");
JMSProducer producer = context.createProducer();
producer.send(queue, content);
JMSConsumer consumer = context.createConsumer(queue);
Message message = consumer.receive(5000);
Assert.assertEquals(queue, message.getJMSDestination());
Assert.assertEquals(content, message.getBody(String.class));
}
}
@Test
public void test() throws Exception {
String body = createBody();
Response response = RestAssured.with().body(body).post("/artemis");
Assertions.assertEquals(Status.NO_CONTENT.getStatusCode(), response.statusCode());
try (JMSContext context = createContext()) {
JMSConsumer consumer = context.createConsumer(context.createQueue("test-jms"));
Message message = consumer.receive(1000L);
Assertions.assertEquals(body, message.getBody(String.class));
}
}
@Test
public void testReceive() throws JMSException {
JMSConsumer consumer = context.createConsumer(context.createTemporaryQueue());
assertNull(consumer.receive());
consumer.close();
try {
consumer.receive();
fail("Should not be able to interact with closed consumer");
} catch (IllegalStateRuntimeException ise) {}
}
@Test
public void testReceiveTimed() throws JMSException {
JMSConsumer consumer = context.createConsumer(context.createTemporaryQueue());
assertNull(consumer.receive(1));
consumer.close();
try {
consumer.receive(1);
fail("Should not be able to interact with closed consumer");
} catch (IllegalStateRuntimeException ise) {}
}
/**
* The client sends a messagge to a fanout exchange instance which is bound to a queue with
* holdsOnPublish turned off. The Broker must reject the message.
*/
@Test
public void testDeliveryDelayNotSupportedByQueueViaExchange_MessageRejected() throws Exception
{
try (JMSContext context = getConnectionBuilder().buildConnectionFactory().createContext())
{
String testQueueName = BrokerAdmin.TEST_QUEUE_NAME;
String testExchangeName = "test_exch";
Destination consumeDest = createQueue(context, testQueueName, false);
Destination publishDest = createExchange(context, testExchangeName);
bindQueueToExchange(testExchangeName, testQueueName);
JMSConsumer consumer = context.createConsumer(consumeDest);
JMSProducer producer = context.createProducer();
producer.send(publishDest, "message without delivery delay");
Message message = consumer.receive(getReceiveTimeout());
assertNotNull("Message published without delivery delay not received", message);
producer.setDeliveryDelay(DELIVERY_DELAY);
try
{
producer.send(publishDest, "message with delivery delay");
fail("Exception not thrown");
}
catch (JMSRuntimeException e)
{
assertTrue("Unexpected exception message: " + e.getMessage(),
e.getMessage().contains("amqp:precondition-failed"));
}
}
}
@Test
public void testCloseSecondContextConnectionRemainsOpen() throws JMSException {
JMSContext localContext = context.createContext(JMSContext.CLIENT_ACKNOWLEDGE);
Assert.assertEquals("client_ack", JMSContext.CLIENT_ACKNOWLEDGE, localContext.getSessionMode());
JMSProducer producer = localContext.createProducer();
JMSConsumer consumer = localContext.createConsumer(queue1);
final int pass = 1;
for (int idx = 0; idx < 2; idx++) {
Message m = localContext.createMessage();
int intProperty = random.nextInt();
m.setIntProperty("random", intProperty);
Assert.assertNotNull(m);
producer.send(queue1, m);
m = null;
Message msg = consumer.receive(100);
Assert.assertNotNull("must have a msg", msg);
Assert.assertEquals(intProperty, msg.getIntProperty("random"));
/* In the second pass we close the connection before ack'ing */
if (idx == pass) {
localContext.close();
}
/**
* From {@code JMSContext.close()}'s javadoc:<br/>
* Invoking the {@code acknowledge} method of a received message from a closed connection's
* session must throw an {@code IllegalStateRuntimeException}. Closing a closed connection
* must NOT throw an exception.
*/
try {
msg.acknowledge();
Assert.assertEquals("connection should be open on pass 0. It is " + pass, 0, idx);
} catch (javax.jms.IllegalStateException expected) {
// HORNETQ-1209 "JMS 2.0" XXX JMSContext javadoc says we must expect a
// IllegalStateRuntimeException here. But Message.ack...() says it must throws the
// non-runtime variant.
Assert.assertEquals("we only close the connection on pass " + pass, pass, idx);
}
}
}
protected final void receiveMessages(JMSConsumer consumer, final int start, final int msgCount, final boolean ack) {
try {
for (int i = start; i < msgCount; i++) {
Message message = consumer.receive(100);
Assert.assertNotNull("Expecting a message " + i, message);
final int actual = message.getIntProperty("counter");
Assert.assertEquals("expected=" + i + ". Got: property['counter']=" + actual, i, actual);
if (ack)
message.acknowledge();
}
} catch (JMSException cause) {
throw new JMSRuntimeException(cause.getMessage(), cause.getErrorCode(), cause);
}
}
@Test(timeout = 20000)
public void testReceiveMessageWithReceiveZeroTimeout() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JMSContext context = testFixture.createJMSContext(testPeer);
testPeer.expectBegin();
Queue queue = context.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
JMSConsumer messageConsumer = context.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(0);
assertNotNull("A message should have been recieved", receivedMessage);
testPeer.expectEnd();
testPeer.expectClose();
context.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Test
public void testDelay() throws Exception {
JMSProducer producer = context.createProducer();
JMSConsumer consumer = context.createConsumer(queue1);
producer.setDeliveryDelay(500);
long timeStart = System.currentTimeMillis();
String strRandom = newXID().toString();
producer.send(queue1, context.createTextMessage(strRandom));
TextMessage msg = (TextMessage) consumer.receive(2500);
assertNotNull(msg);
long actualDelay = System.currentTimeMillis() - timeStart;
assertTrue("delay is not working, actualDelay=" + actualDelay, actualDelay >= 500 && actualDelay < 2000);
assertEquals(strRandom, msg.getText());
}