下面列出了 javax.jms.JMSConsumer#close() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates two shared non-durable consumers under the same subscription name on
 * {@code topic1}, checks that the server binding is still present after the first
 * consumer closes and gone after the second closes, and finally creates a shared
 * consumer with the same name on {@code topic2}.
 */
@Test
public void sharedNonDurableUnsubscribeDifferentTopic() throws Exception {
    final String subscription = "mySharedCon";
    final SimpleString bindingName = new SimpleString("nonDurable.mySharedCon");

    context = cf.createContext();
    try {
        JMSConsumer first = context.createSharedConsumer(topic1, subscription);
        JMSConsumer second = context.createSharedConsumer(topic1, subscription);

        // One of the two sharing consumers is still open: the binding must remain.
        first.close();
        assertNotNull(server.getPostOffice().getBinding(bindingName));

        // Last sharing consumer closed: the binding must be gone.
        second.close();
        assertNull(server.getPostOffice().getBinding(bindingName));

        // Re-using the subscription name on another topic must not throw.
        context.createSharedConsumer(topic2, subscription);
    } finally {
        context.close();
    }
}
/**
 * Detaches {@code monitor} from the manager registered for each of the given types.
 * When a manager reports a size of zero afterwards, the manager is dropped from
 * {@code managers} and the consumer stored under the same type is removed and closed.
 *
 * @param types   event types the monitor should be unregistered from; types without a
 *                registered manager are skipped
 * @param monitor the monitor to detach
 */
@Override
public void unregisterMonitor(Set<Class> types, EventMonitor monitor) {
    for (Class type : types) {
        EventManager manager = managers.get(type);
        if (manager == null) {
            continue;
        }
        manager.detachMonitor(monitor);
        if (manager.getSize() == 0) {
            managers.remove(type);
            JMSConsumer consumer = consumers.remove(type);
            // Map.remove returns null when no consumer was stored for this type;
            // guard so a missing entry does not abort unregistration of the remaining types.
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
/**
 * Closing a consumer that is already closed must not throw.
 */
@Test
public void testCloseMoreThanOnce() throws JMSException {
    JMSConsumer underTest = context.createConsumer(context.createTemporaryQueue());

    // Close twice in a row; the second call is the one under test.
    for (int attempt = 0; attempt < 2; attempt++) {
        underTest.close();
    }
}
/**
 * {@code receive()} returns null on an open consumer in this fixture and must throw
 * {@link IllegalStateRuntimeException} once the consumer has been closed.
 */
@Test
public void testReceive() throws JMSException {
    JMSConsumer underTest = context.createConsumer(context.createTemporaryQueue());
    assertNull(underTest.receive());
    underTest.close();

    boolean rejected = false;
    try {
        underTest.receive();
    } catch (IllegalStateRuntimeException expected) {
        // Expected: the consumer is closed.
        rejected = true;
    }
    if (!rejected) {
        fail("Should not be able to interact with closed consumer");
    }
}
/**
 * {@code receiveNoWait()} returns null on an open consumer in this fixture and must
 * throw {@link IllegalStateRuntimeException} once the consumer has been closed.
 */
@Test
public void testReceiveNoWait() throws JMSException {
    JMSConsumer underTest = context.createConsumer(context.createTemporaryQueue());
    assertNull(underTest.receiveNoWait());
    underTest.close();

    boolean rejected = false;
    try {
        underTest.receiveNoWait();
    } catch (IllegalStateRuntimeException expected) {
        // Expected: the consumer is closed.
        rejected = true;
    }
    if (!rejected) {
        fail("Should not be able to interact with closed consumer");
    }
}
/**
 * {@code receive(timeout)} returns null on an open consumer in this fixture and must
 * throw {@link IllegalStateRuntimeException} once the consumer has been closed.
 */
@Test
public void testReceiveTimed() throws JMSException {
    JMSConsumer underTest = context.createConsumer(context.createTemporaryQueue());
    assertNull(underTest.receive(1));
    underTest.close();

    boolean rejected = false;
    try {
        underTest.receive(1);
    } catch (IllegalStateRuntimeException expected) {
        // Expected: the consumer is closed.
        rejected = true;
    }
    if (!rejected) {
        fail("Should not be able to interact with closed consumer");
    }
}
/**
 * The selector supplied at creation is reported back by an open consumer, and
 * {@code getMessageSelector()} must throw {@link IllegalStateRuntimeException} once
 * the consumer has been closed.
 */
@Test
public void testGetMessageSelector() throws JMSException {
    JMSConsumer underTest = context.createConsumer(context.createTemporaryQueue(), "Color = Red");
    assertNotNull(underTest.getMessageSelector());
    assertEquals("Color = Red", underTest.getMessageSelector());
    underTest.close();

    boolean rejected = false;
    try {
        underTest.getMessageSelector();
    } catch (IllegalStateRuntimeException expected) {
        // Expected: the consumer is closed.
        rejected = true;
    }
    if (!rejected) {
        fail("Should not be able to interact with closed consumer");
    }
}
/**
 * Creates a JMS consumer through a {@link JMSContext}, closes it, and verifies that the
 * server-side consumer carries a {@link ProtonServerSenderContext} whose session
 * context reports a sender count of zero after the close.
 *
 * <p>Returns immediately when {@code isNetty()} is false.
 */
@Test
public void testContextOnConsumerAMQP() throws Throwable {
    if (!isNetty()) {
        // no need to run the test, there's no AMQP support
        return;
    }
    assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
    ConnectionFactory factory = createFactory(2);
    JMSContext context = factory.createContext("admin", "admin", Session.AUTO_ACKNOWLEDGE);
    try {
        javax.jms.Queue queue = context.createQueue("queue");
        JMSConsumer consumer = context.createConsumer(queue);

        // Keep the last server consumer found across all sessions.
        ServerConsumer serverConsumer = null;
        for (ServerSession session : server.getSessions()) {
            for (ServerConsumer sessionConsumer : session.getServerConsumers()) {
                serverConsumer = sessionConsumer;
            }
        }
        // Fail with a readable message instead of a NullPointerException further down
        // when no server-side consumer was registered.
        Assert.assertNotNull("No server consumer was found for the JMS consumer", serverConsumer);

        consumer.close();
        Assert.assertTrue(serverConsumer.getProtocolContext() instanceof ProtonServerSenderContext);
        final AMQPSessionContext sessionContext =
            ((ProtonServerSenderContext) serverConsumer.getProtocolContext()).getSessionContext();
        Wait.assertEquals(0, () -> sessionContext.getSenderCount(), 1000, 10);
    } finally {
        // Always close the context, even if stop() throws.
        try {
            context.stop();
        } finally {
            context.close();
        }
    }
}
/**
 * Creates two shared durable consumers under one subscription name on {@code topic1},
 * closes both, unsubscribes the name, and then creates a shared durable consumer with
 * the same name on {@code topic2}.
 */
@Test
public void sharedDurableUnsubscribeNewTopic() throws Exception {
    final String subscription = "mySharedCon";

    context = cf.createContext();
    try {
        JMSConsumer first = context.createSharedDurableConsumer(topic1, subscription);
        JMSConsumer second = context.createSharedDurableConsumer(topic1, subscription);

        // Both consumers are closed before the subscription is removed.
        first.close();
        second.close();
        context.unsubscribe(subscription);

        // Re-using the name on a different topic must not throw.
        context.createSharedDurableConsumer(topic2, subscription);
    } finally {
        context.close();
    }
}
/**
 * Scripts the test peer to detach the consumer's link remotely right after it is
 * attached, waits until the client-side consumer rejects calls with
 * {@link IllegalStateRuntimeException}, and then checks that an explicit
 * {@code close()} on that consumer and the subsequent context close complete against
 * the peer's scripted expectations.
 */
@Test(timeout = 20000)
public void testRemotelyCloseJMSConsumer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JMSContext context = testFixture.createJMSContext(testPeer);
testPeer.expectBegin();
// Create a consumer, then remotely end it afterwards.
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
// The peer-side detach is scripted before the consumer is created below.
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, "resource closed");
Queue queue = context.createQueue("myQueue");
final JMSConsumer consumer = context.createConsumer(queue);
// Verify the consumer gets marked closed
testPeer.waitForAllHandlersToComplete(1000);
// The condition is satisfied once getMessageListener() throws IllegalStateRuntimeException.
// NOTE(review): 10000 and 10 are presumably the wait timeout and poll interval in ms — confirm against Wait.waitFor.
assertTrue("JMSConsumer never closed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
consumer.getMessageListener();
} catch (IllegalStateRuntimeException jmsise) {
return true;
}
return false;
}
}, 10000, 10));
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.close();
// Expectations for the context close that follows.
testPeer.expectEnd();
testPeer.expectClose();
context.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Exercises a shared durable subscription {@code "c1"} through two {@link JMSContext}s.
 *
 * <p>Sends 100 messages, reads all 100 through the first consumer, checks that neither
 * consumer has anything left, verifies that {@code unsubscribe} fails while consumers
 * are still open, and finally unsubscribes after both consumers and the second context
 * are closed.
 *
 * <p>Both contexts are closed in {@code finally} blocks so they are released even when
 * an assertion fails.
 */
@Test
public void testShareDuraleWithJMSContext() throws Exception {
    ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
    JMSContext conn = cf.createContext(JMSContext.AUTO_ACKNOWLEDGE);
    try {
        JMSConsumer consumer = conn.createSharedDurableConsumer(topic, "c1");
        JMSProducer producer = conn.createProducer();
        for (int i = 0; i < 100; i++) {
            producer.setProperty("count", i).send(topic, "test" + i);
        }

        JMSContext conn2 = conn.createContext(JMSContext.AUTO_ACKNOWLEDGE);
        try {
            JMSConsumer consumer2 = conn2.createSharedDurableConsumer(topic, "c1");

            // Two receives per iteration: all 100 messages go through the first consumer.
            for (int i = 0; i < 50; i++) {
                String txt = consumer.receiveBody(String.class, 5000);
                Assert.assertNotNull(txt);
                txt = consumer.receiveBody(String.class, 5000);
                Assert.assertNotNull(txt);
            }
            Assert.assertNull(consumer.receiveNoWait());
            Assert.assertNull(consumer2.receiveNoWait());

            // Unsubscribing while consumers are still open is expected to fail.
            boolean exceptionHappened = false;
            try {
                conn.unsubscribe("c1");
            } catch (Exception expected) {
                exceptionHappened = true;
            }
            Assert.assertTrue(exceptionHappened);

            consumer.close();
            consumer2.close();
        } finally {
            conn2.close();
        }

        // With every consumer closed the subscription can now be removed.
        conn.unsubscribe("c1");
    } finally {
        conn.close();
    }
}
/**
 * Checks commit/rollback visibility on a {@code SESSION_TRANSACTED} context: a send
 * that is rolled back is not received, a committed send is received, and a rollback
 * issued after the receive has been committed does not make the message available again.
 */
@Test
public void testRollbackTest() {
    JMSContext transacted = addContext(cf.createContext(JMSContext.SESSION_TRANSACTED));
    JMSProducer sender = transacted.createProducer();
    JMSConsumer receiver = transacted.createConsumer(queue1);

    // Send, then roll back: nothing should arrive.
    sender.send(queue1, context.createTextMessage("hello"));
    transacted.rollback();
    assertNull(receiver.receiveNoWait());

    // Send, then commit: the message should arrive.
    sender.send(queue1, context.createTextMessage("hello"));
    transacted.commit();
    assertNotNull(receiver.receiveNoWait());

    // Commit the receive; a later rollback must not make the message available again.
    transacted.commit();
    transacted.rollback();
    assertNull(receiver.receiveNoWait());

    receiver.close();
}