下面列出了javax.jms.MessageConsumer#receiveNoWait ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testReceiveNoWait() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
Session session = connection.createSession();
Queue queue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(queue, "Color = Red");
assertNull(consumer.receiveNoWait());
consumer.close();
try {
consumer.receiveNoWait();
fail("Should not be able to interact with closed consumer");
} catch (IllegalStateException ise) {}
}
@Test
public void testReceiveOnTopicTimeoutNoMessage() throws Exception {
Connection consumerConnection = null;
try {
consumerConnection = createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer topicConsumer = consumerSession.createConsumer(ActiveMQServerTestCase.topic1);
Message m = topicConsumer.receiveNoWait();
ProxyAssertSupport.assertNull(m);
} finally {
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
@Test
public void testPreCommitAcksWithMessageExpiry() throws Exception {
conn = cf.createConnection();
Session session = conn.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = session.createConsumer(jBossQueue);
int noOfMessages = 1000;
for (int i = 0; i < noOfMessages; i++) {
TextMessage textMessage = session.createTextMessage("m" + i);
producer.setTimeToLive(1);
producer.send(textMessage);
}
Thread.sleep(2);
conn.start();
Message m = consumer.receiveNoWait();
Assert.assertNull(m);
// Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
// point
// which can cause delivering count to flip to 1
}
public void testPullConsumerWorks() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello World!"));
// now lets receive it
MessageConsumer consumer = session.createConsumer(queue);
Message answer = consumer.receive(5000);
assertNotNull("Should have received a message!", answer);
// check if method will return at all and will return a null
answer = consumer.receive(1);
assertNull("Should have not received a message!", answer);
answer = consumer.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
@Test
public void testPreCommitAcksWithMessageExpirySetOnConnectionFactory() throws Exception {
((ActiveMQConnectionFactory) cf).setPreAcknowledge(true);
conn = cf.createConnection();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = session.createConsumer(jBossQueue);
int noOfMessages = 1000;
for (int i = 0; i < noOfMessages; i++) {
TextMessage textMessage = session.createTextMessage("m" + i);
producer.setTimeToLive(1);
producer.send(textMessage);
}
Thread.sleep(2);
conn.start();
Message m = consumer.receiveNoWait();
Assert.assertNull(m);
// Asserting delivering count is zero is bogus since messages might still be being delivered and expired at this
// point
// which can cause delivering count to flip to 1
}
@Test
public void testBodyConversion() throws Throwable {
try (
Connection conn = cf.createConnection();
) {
Session sess = conn.createSession();
MessageProducer producer = sess.createProducer(queue);
MessageConsumer cons = sess.createConsumer(queue);
conn.start();
BytesMessage bytesMessage = sess.createBytesMessage();
producer.send(bytesMessage);
Message msg = cons.receiveNoWait();
assertNotNull(msg);
try {
msg.getBody(String.class);
fail("Exception expected");
} catch (MessageFormatException e) {
}
}
}
/**
* Actually receive a message from the given consumer.
* @param consumer the JMS MessageConsumer to receive with
* @param timeout the receive timeout (a negative value indicates
* a no-wait receive; 0 indicates an indefinite wait attempt)
* @return the JMS Message received, or {@code null} if none
* @throws JMSException if thrown by JMS API methods
* @since 4.3
* @see #RECEIVE_TIMEOUT_NO_WAIT
* @see #RECEIVE_TIMEOUT_INDEFINITE_WAIT
*/
@Nullable
protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException {
if (timeout > 0) {
return consumer.receive(timeout);
}
else if (timeout < 0) {
return consumer.receiveNoWait();
}
else {
return consumer.receive();
}
}
/**
* Actually receive a message from the given consumer.
* @param consumer the JMS MessageConsumer to receive with
* @param timeout the receive timeout (a negative value indicates
* a no-wait receive; 0 indicates an indefinite wait attempt)
* @return the JMS Message received, or {@code null} if none
* @throws JMSException if thrown by JMS API methods
* @since 4.3
* @see #RECEIVE_TIMEOUT_NO_WAIT
* @see #RECEIVE_TIMEOUT_INDEFINITE_WAIT
*/
@Nullable
protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException {
if (timeout > 0) {
return consumer.receive(timeout);
}
else if (timeout < 0) {
return consumer.receiveNoWait();
}
else {
return consumer.receive();
}
}
private void doCleanupQueue( final Session session, final Destination destination ) throws JMSException {
try {
MessageConsumer consumer = session.createConsumer(destination);
Message message = null;
do {
message = consumer.receiveNoWait();
if (message != null) {
message.acknowledge();
}
} while (message != null);
} finally {
releaseSession(false);
}
}
@Test
public void testAckModeClient2() throws Exception {
conn.connect(defUser, defPass);
subscribe(conn, "sub1", "client");
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
this.sendJmsMessage("client-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
frame = conn.receiveFrame();
Assert.assertNotNull(frame);
//ack the 49th
if (i == num - 2) {
ack(conn, frame);
}
}
unsubscribe(conn, "sub1");
conn.disconnect();
//one can be received.
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
Assert.assertNotNull(message);
message = consumer.receiveNoWait();
Assert.assertNull(message);
}
private void checkDuplicate(MessageConsumer consumer) throws JMSException {
boolean duplicatedMessages = false;
while (true) {
TextMessage txt = (TextMessage) consumer.receiveNoWait();
if (txt == null) {
break;
} else {
duplicatedMessages = true;
instanceLog.warn("received in duplicate:" + txt.getText());
}
}
Assert.assertFalse("received messages in duplicate", duplicatedMessages);
}
@Test
public void testBrowserAndConsumerSimultaneousDifferentConnections() throws Exception {
((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
conn = cf.createConnection();
Connection connConsumer = cf.createConnection();
Session sessionConsumer = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
MessageProducer producer = session.createProducer(jBossQueue);
MessageConsumer consumer = sessionConsumer.createConsumer(jBossQueue);
int noOfMessages = 1000;
for (int i = 0; i < noOfMessages; i++) {
TextMessage textMessage = session.createTextMessage("m" + i);
textMessage.setIntProperty("i", i);
producer.send(textMessage);
}
connConsumer.start();
QueueBrowser browser = session.createBrowser(jBossQueue);
Enumeration enumMessages = browser.getEnumeration();
for (int i = 0; i < noOfMessages; i++) {
TextMessage msg = (TextMessage) enumMessages.nextElement();
Assert.assertNotNull(msg);
Assert.assertEquals(i, msg.getIntProperty("i"));
TextMessage recvMessage = (TextMessage) consumer.receiveNoWait();
Assert.assertNotNull(recvMessage);
Assert.assertEquals(i, msg.getIntProperty("i"));
}
Message m = consumer.receiveNoWait();
Assert.assertFalse(enumMessages.hasMoreElements());
Assert.assertNull(m);
conn.close();
}
@Test
public void testDurableSubscriptionRemovalRaceCondition() throws Exception {
final String topicName = "myTopic";
final String clientID = "myClientID";
final String subscriptionName = "mySub";
createTopic(topicName);
InitialContext ic = getInitialContext();
Topic myTopic = (Topic) ic.lookup("/topic/" + topicName);
Connection conn = null;
for (int i = 0; i < 1000; i++) {
try {
conn = createConnection();
conn.setClientID(clientID);
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = s.createProducer(myTopic);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
s.createDurableSubscriber(myTopic, subscriptionName);
prod.send(s.createTextMessage("k"));
conn.close();
destroyTopic(topicName);
createTopic(topicName);
conn = createConnection();
conn.setClientID(clientID);
s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer durable = s.createDurableSubscriber(myTopic, subscriptionName);
conn.start();
TextMessage tm = (TextMessage) durable.receiveNoWait();
ProxyAssertSupport.assertNull(tm);
durable.close();
s.unsubscribe(subscriptionName);
} finally {
if (conn != null) {
conn.close();
}
}
}
}
/**
* The simplest possible receiveNoWait() test.
*/
@Test
public void testReceiveNoWait() throws Exception {
Connection producerConnection = null;
Connection consumerConnection = null;
try {
producerConnection = createConnection();
consumerConnection = createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer queueProducer = producerSession.createProducer(queue1);
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
TextMessage tm = producerSession.createTextMessage("someText");
queueProducer.send(tm);
// start consumer connection after the message is submitted
consumerConnection.start();
// NOTE! There semantics of receiveNoWait do not guarantee the message is available
// immediately after the message is sent
// It will be available some indeterminate time later.
// This is fine and as per spec.
// To implement receiveNoWait otherwise would be very costly
// Also other messaging systems e.g. Sun, ActiveMQ Artemis implement it this way
Thread.sleep(500);
TextMessage m = (TextMessage) queueConsumer.receiveNoWait();
ProxyAssertSupport.assertEquals(tm.getText(), m.getText());
} finally {
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
/**
* JMS 1.1 6.11.1: A client can change an existing durable subscription by creating a durable
* TopicSubscriber with the same name and a new topic and/or message selector, or NoLocal
* attribute. Changing a durable subscription is equivalent to deleting and recreating it.
* <br>
* Test with a different selector.
*/
@Test
public void testDurableSubscriptionDifferentSelector() throws Exception {
Connection conn = null;
try {
conn = createConnection();
conn.setClientID("brookeburke");
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = s.createProducer(ActiveMQServerTestCase.topic1);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
MessageConsumer durable = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "monicabelucci", "color = 'red' AND shape = 'square'", false);
TextMessage tm = s.createTextMessage("A red square message");
tm.setStringProperty("color", "red");
tm.setStringProperty("shape", "square");
prod.send(tm);
conn.start();
TextMessage rm = (TextMessage) durable.receive(5000);
ProxyAssertSupport.assertEquals("A red square message", rm.getText());
tm = s.createTextMessage("Another red square message");
tm.setStringProperty("color", "red");
tm.setStringProperty("shape", "square");
prod.send(tm);
// TODO: when subscriptions/durable subscription will be registered as MBean, use the JMX
// interface to make sure the 'another red square message' is maintained by the
// durable subascription
// http://jira.jboss.org/jira/browse/JBMESSAGING-217
conn.close();
conn = createConnection();
conn.setClientID("brookeburke");
s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// modify the selector
durable = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "monicabelucci", "color = 'red'", false);
conn.start();
Message m = durable.receiveNoWait();
// the durable subscription is destroyed and re-created. The red square message stored by
// the previous durable subscription is lost and (hopefully) garbage collected.
ProxyAssertSupport.assertNull(m);
durable.close();
s.unsubscribe("monicabelucci");
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testNack() throws Exception {
conn.connect(defUser, defPass);
subscribe(conn, "sub1", "client");
sendJmsMessage(getName());
ClientStompFrame frame = conn.receiveFrame();
String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
nack(conn, messageID);
unsubscribe(conn, "sub1");
conn.disconnect();
//Nack makes the message be dropped.
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receiveNoWait();
Assert.assertNull(message);
}
@Test
public void testWithSelector() throws Exception {
String selector1 = "beatle = 'john'";
String selector2 = "beatle = 'paul'";
String selector3 = "beatle = 'george'";
String selector4 = "beatle = 'ringo'";
String selector5 = "beatle = 'jesus'";
Connection conn = null;
try {
conn = getConnectionFactory().createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons1 = sess.createConsumer(ActiveMQServerTestCase.topic1, selector1);
MessageConsumer cons2 = sess.createConsumer(ActiveMQServerTestCase.topic1, selector2);
MessageConsumer cons3 = sess.createConsumer(ActiveMQServerTestCase.topic1, selector3);
MessageConsumer cons4 = sess.createConsumer(ActiveMQServerTestCase.topic1, selector4);
MessageConsumer cons5 = sess.createConsumer(ActiveMQServerTestCase.topic1, selector5);
Message m1 = sess.createMessage();
m1.setStringProperty("beatle", "john");
Message m2 = sess.createMessage();
m2.setStringProperty("beatle", "paul");
Message m3 = sess.createMessage();
m3.setStringProperty("beatle", "george");
Message m4 = sess.createMessage();
m4.setStringProperty("beatle", "ringo");
Message m5 = sess.createMessage();
m5.setStringProperty("beatle", "jesus");
MessageProducer prod = sess.createProducer(ActiveMQServerTestCase.topic1);
prod.send(m1);
prod.send(m2);
prod.send(m3);
prod.send(m4);
prod.send(m5);
Message r1 = cons1.receive(500);
ProxyAssertSupport.assertNotNull(r1);
Message n = cons1.receiveNoWait();
ProxyAssertSupport.assertNull(n);
Message r2 = cons2.receive(500);
ProxyAssertSupport.assertNotNull(r2);
n = cons2.receiveNoWait();
ProxyAssertSupport.assertNull(n);
Message r3 = cons3.receive(500);
ProxyAssertSupport.assertNotNull(r3);
n = cons3.receiveNoWait();
ProxyAssertSupport.assertNull(n);
Message r4 = cons4.receive(500);
ProxyAssertSupport.assertNotNull(r4);
n = cons4.receiveNoWait();
ProxyAssertSupport.assertNull(n);
Message r5 = cons5.receive(500);
ProxyAssertSupport.assertNotNull(r5);
n = cons5.receiveNoWait();
ProxyAssertSupport.assertNull(n);
ProxyAssertSupport.assertEquals("john", r1.getStringProperty("beatle"));
ProxyAssertSupport.assertEquals("paul", r2.getStringProperty("beatle"));
ProxyAssertSupport.assertEquals("george", r3.getStringProperty("beatle"));
ProxyAssertSupport.assertEquals("ringo", r4.getStringProperty("beatle"));
ProxyAssertSupport.assertEquals("jesus", r5.getStringProperty("beatle"));
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testSimplestDurableSubscription() throws Exception {
Connection conn = null;
try {
conn = createConnection();
conn.setClientID("brookeburke");
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = s.createProducer(ActiveMQServerTestCase.topic1);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "monicabelucci");
List<String> subs = listAllSubscribersForTopic("Topic1");
ProxyAssertSupport.assertNotNull(subs);
ProxyAssertSupport.assertEquals(1, subs.size());
ProxyAssertSupport.assertEquals("monicabelucci", subs.get(0));
prod.send(s.createTextMessage("k"));
conn.close();
subs = listAllSubscribersForTopic("Topic1");
ProxyAssertSupport.assertEquals(1, subs.size());
ProxyAssertSupport.assertEquals("monicabelucci", subs.get(0));
conn = createConnection();
conn.setClientID("brookeburke");
s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer durable = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "monicabelucci");
conn.start();
TextMessage tm = (TextMessage) durable.receive(1000);
ProxyAssertSupport.assertEquals("k", tm.getText());
Message m = durable.receiveNoWait();
ProxyAssertSupport.assertNull(m);
durable.close();
s.unsubscribe("monicabelucci");
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testReceiveNoWait() throws Exception {
assertNotNull(queue);
for (int i = 0; i < 1000; i++) {
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int j = 0; j < 100; j++) {
String text = "Message" + j;
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
}
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int j = 0; j < 100; j++) {
TextMessage m = (TextMessage) consumer.receiveNoWait();
if (m == null) {
throw new IllegalStateException("msg null");
}
assertEquals("Message" + j, m.getText());
m.acknowledge();
}
connection.close();
}
}
@Test
public void testReceiveNoWaitOnTopic() throws Exception {
Connection producerConnection = null;
Connection consumerConnection = null;
try {
producerConnection = createConnection();
consumerConnection = createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer topicProducer = producerSession.createProducer(ActiveMQServerTestCase.topic1);
MessageConsumer topicConsumer = consumerSession.createConsumer(ActiveMQServerTestCase.topic1);
consumerConnection.start();
Message m = topicConsumer.receiveNoWait();
ProxyAssertSupport.assertNull(m);
Message m1 = producerSession.createMessage();
topicProducer.send(m1);
// block this thread for a while to allow ServerConsumerDelegate's delivery thread to kick in
Thread.sleep(500);
m = topicConsumer.receiveNoWait();
ProxyAssertSupport.assertEquals(m1.getJMSMessageID(), m.getJMSMessageID());
} finally {
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null) {
consumerConnection.close();
}
}
}