下面列出了javax.jms.MessageConsumer#receive ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private int tryToFetchMissingMessages() throws JMSException {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, 0);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
int count = 0;
while (true) {
Message message = consumer.receive(500);
if (message == null)
break;
log.info("Found \"missing\" message: " + message);
count++;
}
consumer.close();
session.close();
connection.close();
return count;
}
/**
* Tests if acknowledged messages are being consumed.
*
* @throws javax.jms.JMSException
*/
public void testAckedMessageAreConsumed() throws JMSException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
// Reset the session.
session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Attempt to Consume the message...
consumer = session.createConsumer(queue);
msg = consumer.receive(1000);
assertNull(msg);
session.close();
}
@Test(timeout = 30000)
public void testAnonymousProducerWithAutoCreation() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(UUID.randomUUID().toString());
MessageProducer p = session.createProducer(null);
TextMessage message = session.createTextMessage();
message.setText("hello");
// this will auto-create the address
p.send(topic, message);
{
MessageConsumer consumer = session.createConsumer(topic);
p.send(topic, message);
Message msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
consumer.close();
}
} finally {
connection.close();
}
}
@Test
public void testDurableSharedAndNonDurableSharedCanUseTheSameSubscriptionName() throws Exception
{
try (Connection connection = getConnectionBuilder().setPrefetch(0).build())
{
Session publishingSession = connection.createSession();
Session subscriberSession = connection.createSession();
String topicName = getTestName();
Topic topic = publishingSession.createTopic("amq.direct/" + topicName);
MessageConsumer consumer1 = subscriberSession.createSharedDurableConsumer(topic, "testSharedSubscription");
MessageConsumer consumer2 = subscriberSession.createSharedConsumer(topic, "testSharedSubscription");
connection.start();
Utils.sendMessages(publishingSession, topic, 1);
Message message1 = consumer1.receive(getReceiveTimeout());
Message message2 = consumer2.receive(getReceiveTimeout());
assertNotNull("Message 1 was not received", message1);
assertNotNull("Message 2 was not received", message2);
assertEquals("Unexpected index for message 1", 0, message1.getIntProperty(Utils.INDEX));
assertEquals("Unexpected index for message 2", 0, message2.getIntProperty(Utils.INDEX));
}
}
public void testNoScheduledRedeliveryOfExpired() throws Exception {
startBroker(true);
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
consumerConnection.start();
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(destination);
sendMessage(1500);
Message message = consumer.receive(1000);
assertNotNull("got message", message);
// ensure there is another consumer to redispatch to
MessageConsumer redeliverConsumer = consumerSession.createConsumer(destination);
// allow consumed to expire so it gets redelivered
TimeUnit.SECONDS.sleep(2);
consumer.close();
// should go to dlq as it has expired
// validate DLQ
MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
Message dlqMessage = dlqConsumer.receive(2000);
assertNotNull("Got message from dql", dlqMessage);
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
}
public static boolean produceConsume(final Connection connection, final Destination destination) throws Exception
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
try
{
MessageConsumer consumer = session.createConsumer(destination);
sendMessages(session, destination, 1);
session.commit();
connection.start();
Message message = consumer.receive(getReceiveTimeout());
session.commit();
return message != null;
}
finally
{
session.close();
}
}
protected void consumeTestQueueMessages(String queueName, int num) throws Exception {
// Start the connection
Connection connection = cf.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer;
try {
consumer = session.createConsumer(queue);
for (int i = 0; i < num; i++) {
consumer.receive();
}
consumer.close();
} finally {
// consumer.close();
connection.close();
}
}
/**
* Tests store upgrade has maintained the priority queue configuration,
* such that sending messages with priorities out-of-order and then consuming
* them gets the messages back in priority order.
*/
@Test
public void testPriorityQueue() throws Exception
{
Connection connection = getConnection();
try
{
connection.start();
// send some messages to the priority queue
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(PRIORITY_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("A"), DeliveryMode.PERSISTENT, 4, -1);
producer.send(session.createTextMessage("B"), DeliveryMode.PERSISTENT, 1, -1);
producer.send(session.createTextMessage("C"), DeliveryMode.PERSISTENT, 9, -1);
session.close();
//consume the messages, expected order: C, A, B.
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
Message message1 = consumer.receive(getReceiveTimeout());
assertThat("expected message was not received", message1, is(instanceOf(TextMessage.class)));
assertThat(((TextMessage) message1).getText(), is(equalTo("C")));
Message message2 = consumer.receive(getReceiveTimeout());
assertThat("expected message was not received", message2, is(instanceOf(TextMessage.class)));
assertThat(((TextMessage) message2).getText(), is(equalTo("A")));
Message message3 = consumer.receive(getReceiveTimeout());
assertThat("expected message was not received", message3, is(instanceOf(TextMessage.class)));
assertThat(((TextMessage) message3).getText(), is(equalTo("B")));
}
finally
{
connection.close();
}
}
/**
* http://www.jboss.org/index.html?module=bb&op=viewtopic&t=71350
*/
@Test
public void testRedel8() throws Exception {
Connection conn = null;
try {
conn = createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(queue1);
// Send 3 messages
prod.send(sess.createTextMessage("1"));
prod.send(sess.createTextMessage("2"));
prod.send(sess.createTextMessage("3"));
conn.start();
MessageConsumer cons1 = sess.createConsumer(queue1);
cons1.close();
MessageConsumer cons2 = sess.createConsumer(queue1);
Message r1 = cons2.receive();
Message r2 = cons2.receive();
Message r3 = cons2.receive();
// Messages should be received?
ProxyAssertSupport.assertNotNull(r1);
ProxyAssertSupport.assertNotNull(r2);
ProxyAssertSupport.assertNotNull(r3);
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
LOG.info(this + " running test testAutoRollbackWithMissingRedeliveries");
broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
try {
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
Assert.assertNotNull(msg);
broker.stop();
broker = createBroker();
// use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
broker.start();
try {
consumerSession.commit();
Assert.fail("expected transaction rolledback ex");
} catch (TransactionRolledBackException expected) {
}
broker.stop();
broker = createBroker();
broker.start();
Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
} finally {
connection.close();
}
}
@Test
public void testAutoCreateOnSendToQueue() throws Exception {
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ActiveMQJMSClient.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages; i++) {
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
// make sure the JMX control was created for the address and queue
assertNotNull(server.getManagementService().getResource(ADDRESS + QUEUE_NAME));
assertNotNull(server.getManagementService().getResource(QUEUE + QUEUE_NAME));
connection.close();
}
/**
* Tests rollback message to be marked as redelivered. Session uses client
* acknowledgement and the destination is a queue.
*
* @throws JMSException
*/
public void testQueueRollbackMarksMessageRedelivered() throws JMSException {
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
MessageProducer producer = createProducer(session, queue);
producer.send(createTextMessage(session));
session.commit();
// Get the message... Should not be redelivered.
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
// Rollback.. should cause redelivery.
session.rollback();
// Attempt to Consume the message...
msg = consumer.receive(2000);
assertNotNull(msg);
assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
session.commit();
session.close();
}
@Test
public void sendAndReceiveNull() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ObjectMessage testMessage = session.createObjectMessage(null);
Object o = testMessage.getObject();
assertNull("Object was not null", o);
assertNotNull("toString returned null", testMessage.toString());
MessageProducer producer = session.createProducer(queue);
producer.send(testMessage);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertTrue("ObjectMessage should be received", receivedMessage instanceof ObjectMessage);
Object result = ((ObjectMessage) receivedMessage).getObject();
assertEquals("First read: UUIDs were not equal", null, result);
result = ((ObjectMessage) receivedMessage).getObject();
assertEquals("Second read: UUIDs were not equal", null, result);
}
finally
{
connection.close();
}
}
@Test
public void sendEmptyObjectMessage() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ObjectMessage testMessage = session.createObjectMessage();
MessageProducer producer = session.createProducer(queue);
producer.send(testMessage);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertTrue("ObjectMessage should be received", receivedMessage instanceof ObjectMessage);
Object result = ((ObjectMessage) receivedMessage).getObject();
assertEquals("First read: unexpected object received", null, result);
result = ((ObjectMessage) receivedMessage).getObject();
assertEquals("Second read: unexpected object received", null, result);
}
finally
{
connection.close();
}
}
@Test
public void consumeBeyondPrefetch() throws Exception
{
Connection connection1 = getConnectionBuilder().setPrefetch(1).build();
Queue queue = createQueue(getTestName());
try
{
connection1.start();
final Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer1 = session1.createConsumer(queue);
Utils.sendMessages(connection1, queue, 5);
Message message = consumer1.receive(getReceiveTimeout());
assertNotNull(message);
assertEquals(0, message.getIntProperty(INDEX));
message = consumer1.receive(getReceiveTimeout());
assertNotNull(message);
assertEquals(1, message.getIntProperty(INDEX));
message = consumer1.receive(getReceiveTimeout());
assertNotNull(message);
assertEquals(2, message.getIntProperty(INDEX));
forceSync(session1);
// In pre 0-10, in a transaction session the client does not ack the message until the commit occurs
// so the message observed by another connection will have the index 3 rather than 4.
Connection connection2 = getConnection();
try
{
Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer2 = session2.createConsumer(queue);
connection2.start();
message = consumer2.receive(getReceiveTimeout());
assertNotNull(message);
assertEquals("Received message has unexpected index",
PRE_010_PROTOCOLS.contains(getProtocol()) ? 3 : 4,
message.getIntProperty(INDEX));
session2.rollback();
}
finally
{
connection2.close();
}
}
finally
{
connection1.close();
}
}
@Test
public void testConsumerWithRollback() throws Exception {
String queueName = "testConsumerWithRollback";
String testMessage = "testConsumerWithRollback-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
// Setup XA connection
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
xaConnection.start();
producer.send(session.createTextMessage(testMessage));
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
TextMessage message = (TextMessage) consumer.receive(2000);
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK.");
xaResource.rollback(xid);
session.close();
xaConnection.close();
Assert.assertNotNull(message, "Sent message should be consumed by the consumer.");
Assert.assertEquals(message.getText(), testMessage, "Received message should match the sent message.");
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 1, "Queue should be non empty");
}
@Test
public void testPersistedMessageType() throws Exception {
Connection theConn = null;
Connection theOtherConn = null;
try {
theConn = createConnection();
theConn.start();
// Send some persistent messages to a queue with no receivers
Session sessSend = theConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer theProducer = sessSend.createProducer(queue1);
theProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = sessSend.createMessage();
m.setStringProperty("p1", "aardvark");
BytesMessage bm = sessSend.createBytesMessage();
bm.writeObject("aardvark");
MapMessage mm = sessSend.createMapMessage();
mm.setString("s1", "aardvark");
ObjectMessage om = sessSend.createObjectMessage();
om.setObject("aardvark");
StreamMessage sm = sessSend.createStreamMessage();
sm.writeString("aardvark");
TextMessage tm = sessSend.createTextMessage("aardvark");
theProducer.send(m);
theProducer.send(bm);
theProducer.send(mm);
theProducer.send(om);
theProducer.send(sm);
theProducer.send(tm);
theConn.close();
theOtherConn = createConnection();
theOtherConn.start();
Session sessReceive = theOtherConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer theConsumer = sessReceive.createConsumer(queue1);
Message m2 = theConsumer.receive(1500);
log.trace("m2 is " + m2);
ProxyAssertSupport.assertNotNull(m2);
ProxyAssertSupport.assertEquals("aardvark", m2.getStringProperty("p1"));
BytesMessage bm2 = (BytesMessage) theConsumer.receive(1500);
ProxyAssertSupport.assertEquals("aardvark", bm2.readUTF());
MapMessage mm2 = (MapMessage) theConsumer.receive(1500);
ProxyAssertSupport.assertEquals("aardvark", mm2.getString("s1"));
ObjectMessage om2 = (ObjectMessage) theConsumer.receive(1500);
ProxyAssertSupport.assertEquals("aardvark", (String) om2.getObject());
StreamMessage sm2 = (StreamMessage) theConsumer.receive(1500);
ProxyAssertSupport.assertEquals("aardvark", sm2.readString());
TextMessage tm2 = (TextMessage) theConsumer.receive(1500);
ProxyAssertSupport.assertEquals("aardvark", tm2.getText());
} finally {
if (theConn != null) {
theConn.close();
}
if (theOtherConn != null) {
theOtherConn.close();
}
}
}
@Test
@BMRules(
rules = {@BMRule(
name = "Corrupt Decoding",
targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
public void testClientDisconnect() throws Exception {
Queue q1 = createQueue("queue1");
final Connection connection = nettyCf.createConnection();
final CountDownLatch latch = new CountDownLatch(1);
try {
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException e) {
latch.countDown();
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(q1);
TextMessage m = session.createTextMessage("hello");
producer.send(m);
connection.start();
corruptPacket.set(true);
MessageConsumer consumer = session.createConsumer(q1);
consumer.receive(2000);
assertTrue(latch.await(5, TimeUnit.SECONDS));
} finally {
corruptPacket.set(false);
if (connection != null) {
connection.close();
}
}
}
@Test
public void testAutoVirtualTopicWildcardStarFQQN() throws Exception {
Connection connection = null;
SimpleString topicA = new SimpleString("VirtualTopic.Orders.A");
SimpleString topicB = new SimpleString("VirtualTopic.Orders.B");
SimpleString topic = new SimpleString("VirtualTopic.Orders.*");
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
try {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(urlString);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topicA.toString() + "," + topicB.toString());
MessageConsumer messageConsumerA = session.createConsumer(session.createQueue("Consumer.A." + topic.toString()));
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerA.receive(2000);
assertTrue((messageReceivedA != null && messageReceivedB != null));
String text = messageReceivedA.getText();
assertEquals("This is a text message", text);
messageConsumerA.close();
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test
public void testMixedOpenWireExample() throws Exception {
Connection openConn = null;
SimpleString durableQueue = new SimpleString("exampleQueue");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
Queue queue = new ActiveMQQueue("exampleQueue");
openConn = openCF.createConnection();
openConn.start();
Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = openSession.createProducer(queue);
TextMessage message = openSession.createTextMessage("This is a text message");
producer.send(message);
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
Connection artemisConn = artemisCF.createConnection();
Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
artemisConn.start();
MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
assertEquals("This is a text message", messageReceived.getText());
openConn.close();
artemisConn.close();
}