下面列出了怎么用javax.jms.JMSConsumer的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Test tear-down: drains any messages left on the {@code MockEvent} queue and then closes the factory.
 *
 * <p>A listener is attached to a consumer on that queue and simply logs each message it receives.
 * The method waits one second for the listener to run, then releases the consumer, the context and
 * finally the factory.
 *
 * @throws Exception if creating the JMS resources, sleeping, or closing any resource fails
 */
@After
public void stop() throws Exception {
    // JMSContext and JMSConsumer are AutoCloseable: close them (consumer first, then context)
    // so the clean-up itself does not leak a connection, and always close the factory afterwards.
    try (JMSContext context = factory.createContext();
         JMSConsumer consumer = context.createConsumer(context.createQueue(MockEvent.class.getName()))) {
        // Clean up test messages
        logger.info("清理JMS测试消息开始");
        AtomicInteger count = new AtomicInteger();
        consumer.setMessageListener((data) -> {
            String message = StringUtility.format("清理JMS测试消息[{}]", count.incrementAndGet());
            logger.info(message);
        });
        // Fixed window during which the asynchronous listener may consume pending messages.
        Thread.sleep(1000L);
        logger.info("清理JMS测试消息结束");
    } finally {
        factory.close();
    }
}
/**
 * Sends a text message to a queue and checks that reading its body as {@code Boolean}
 * raises {@link MessageFormatRuntimeException}.
 *
 * @throws Exception if the queue or the JMS context cannot be created
 */
@Test
public void testJMSContextConsumerThrowsMessageFormatExceptionOnMalformedBody() throws Exception {
    Queue queue = createQueue(true, "ContextMalformedBodyTestQueue");
    // try-with-resources so the context (and the consumer/producer created from it) is always released.
    try (JMSContext context = qraConnectionFactory.createContext()) {
        JMSProducer producer = context.createProducer();
        TextMessage message = context.createTextMessage("TestMessage");
        producer.send(queue, message);
        JMSConsumer consumer = context.createConsumer(queue);
        try {
            consumer.receiveBody(Boolean.class);
            fail("Should have thrown MessageFormatRuntimeException");
        } catch (MessageFormatRuntimeException expected) {
            // Expected: the text body cannot be converted to Boolean, so the test passes.
        } catch (Exception e) {
            fail("Threw wrong exception, should be MessageFormatRuntimeException, instead got: " + e.getClass().getCanonicalName());
        }
    }
}
/**
 * Sends a {@link StreamMessage} asynchronously with a counting completion listener, receives it
 * from {@code queue1}, and then checks the properties and body of the message that was handed
 * to the completion listener.
 */
@Test
public void testSendStreamMessage() throws JMSException, InterruptedException {
    final String booleanKey = "booleanProp";
    final String intKey = "intProp";

    JmsProducerCompletionListenerTest.CountingCompletionListener listener =
        new JmsProducerCompletionListenerTest.CountingCompletionListener(1);
    JMSProducer asyncProducer = context.createProducer();
    asyncProducer.setAsync(listener);

    // Build the outgoing message: three properties followed by two body values.
    StreamMessage outgoing = context.createStreamMessage();
    outgoing.setStringProperty("name", name.getMethodName());
    outgoing.setBooleanProperty(booleanKey, true);
    outgoing.setIntProperty(intKey, 42);
    outgoing.writeBoolean(true);
    outgoing.writeInt(67);
    asyncProducer.send(queue1, outgoing);

    JMSConsumer consumer = context.createConsumer(queue1);
    Message received = consumer.receive(100);
    Assert.assertNotNull(received);
    Assert.assertTrue(listener.completionLatch.await(1, TimeUnit.SECONDS));

    // The assertions below inspect the listener's last message, not the received one.
    StreamMessage completed = (StreamMessage) listener.lastMessage;
    Assert.assertEquals(true, completed.getBooleanProperty(booleanKey));
    Assert.assertEquals(42, completed.getIntProperty(intKey));
    Assert.assertEquals(true, completed.readBoolean());
    Assert.assertEquals(67, completed.readInt());
}
/**
 * Creates two shared durable consumers on the same subscription name, publishes ten messages
 * to {@code topic1}, and then alternately receives from each consumer.
 *
 * <p>NOTE(review): the received bodies are not asserted; the test only fails if a call throws.
 */
@Test
public void sharedDurableSubSimpleRoundRobin() throws Exception {
    context = cf.createContext();
    try {
        JMSConsumer firstConsumer = context.createSharedDurableConsumer(topic1, "mySharedCon");
        JMSConsumer secondConsumer = context.createSharedDurableConsumer(topic1, "mySharedCon");
        context.start();
        JMSProducer producer = context.createProducer();
        final int messageCount = 10;
        int sent = 0;
        while (sent < messageCount) {
            producer.send(topic1, "msg:" + sent);
            sent++;
        }
        // One receive per consumer per round, i.e. messageCount / 2 rounds.
        for (int round = 0; round < messageCount / 2; round++) {
            firstConsumer.receiveBody(String.class, 5000);
            secondConsumer.receiveBody(String.class, 5000);
        }
    } finally {
        context.close();
    }
}
/**
 * Registers an anonymous {@link MessageListener} on a new consumer for the destination.
 *
 * @param destination the destination to consume from
 * @return a future that the listener completes with a message delivered to it
 */
@Override
public CompletableFuture<Message> registerConcreteListenerImplementation(Destination destination) {
    final CompletableFuture<Message> received = new CompletableFuture<>();
    final JMSConsumer listenerConsumer = context.createConsumer(destination);
    // The anonymous class is intentional here, hence the inspection suppression.
    //noinspection Convert2Lambda,Anonymous2MethodRef
    listenerConsumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            received.complete(message);
        }
    });
    return received;
}
/**
 * Registers a method reference to {@link CompletableFuture#complete} as the message listener
 * of a new consumer for the destination.
 *
 * @param destination the destination to consume from
 * @return a future that the listener completes with a message delivered to it
 */
@Override
public CompletableFuture<Message> registerListenerMethodReference(Destination destination) {
    final CompletableFuture<Message> received = new CompletableFuture<>();
    final JMSConsumer listenerConsumer = context.createConsumer(destination);
    // ActiveMQ Artemis wraps listeners in its own MessageListener instances
    // (org.apache.activemq.artemis.jms.client.ActiveMQJMSConsumer$MessageListenerWrapper) anyway.
    listenerConsumer.setMessageListener(received::complete);
    return received;
}
/**
 * Creates a durable subscriber on the session for the topic and wraps it in a {@code JMSConsumerImpl}.
 *
 * @param topic the topic to subscribe to
 * @param name  the subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createDurableConsumer(final Topic topic, final String name) {
    try {
        // Uses createDurableSubscriber; the JMS 2-only alternative would be
        // session().createDurableConsumer(topic, name).
        final MessageConsumer subscriber = session().createDurableSubscriber(topic, name);
        checkAutoStart();
        return new JMSConsumerImpl(this, subscriber);
    } catch (final JMSException failure) {
        // Checked JMS failures are converted by toRuntimeException before being rethrown.
        throw toRuntimeException(failure);
    }
}
/**
 * Builds the source: resolves the destination, creates a (durable or plain) consumer on it,
 * wraps the consumer in a {@code JmsPublisher}, and exposes the messages as a stream of
 * {@code IncomingJmsMessage}, optionally broadcast to all subscribers.
 *
 * @param context  JMS context used to create the consumer
 * @param config   incoming connector configuration (destination/channel, selector, no-local,
 *                 broadcast and durable flags)
 * @param json     passed to every {@code IncomingJmsMessage}
 * @param executor passed to every {@code IncomingJmsMessage}
 */
JmsSource(JMSContext context, JmsConnectorIncomingConfiguration config, Jsonb json, Executor executor) {
    // The configured destination wins; the channel name is the fallback.
    String destinationName = config.getDestination().orElseGet(config::getChannel);
    String messageSelector = config.getSelector().orElse(null);
    boolean noLocal = config.getNoLocal();
    boolean broadcast = config.getBroadcast();
    boolean durable = config.getDurable();
    Destination destination = getDestination(context, destinationName, config);

    // Durable consumers are only created on topics.
    if (durable && !(destination instanceof Topic)) {
        throw ex.illegalArgumentInvalidDestination();
    }
    JMSConsumer consumer = durable
        ? context.createDurableConsumer((Topic) destination, destinationName, messageSelector, noLocal)
        : context.createConsumer(destination, messageSelector, noLocal);

    publisher = new JmsPublisher(consumer);
    if (broadcast) {
        source = ReactiveStreams.fromPublisher(
            Multi.createFrom().publisher(publisher)
                .map(m -> new IncomingJmsMessage<>(m, executor, json))
                .broadcast().toAllSubscribers());
    } else {
        source = ReactiveStreams.fromPublisher(publisher).map(m -> new IncomingJmsMessage<>(m, executor, json));
    }
}
/**
 * Creates a shared durable consumer on the session, wraps it in a {@code JmsConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the shared subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
    try {
        final JmsMessageConsumer delegate =
            (JmsMessageConsumer) getSession().createSharedDurableConsumer(topic, name);
        return startIfNeeded(new JmsConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JmsExceptionSupport.createRuntimeException.
        throw JmsExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared durable consumer with a message selector and wraps it in a {@code JMSConsumerImpl}.
 *
 * @param topic           the topic to subscribe to
 * @param name            the shared subscription name
 * @param messageSelector selector passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedDurableConsumer(final Topic topic, final String name, final String messageSelector) {
    try {
        final JMSConsumerImpl wrapped =
            new JMSConsumerImpl(this, session().createSharedDurableConsumer(topic, name, messageSelector));
        checkAutoStart();
        return wrapped;
    } catch (final JMSException failure) {
        // Checked JMS failures are converted by toRuntimeException before being rethrown.
        throw toRuntimeException(failure);
    }
}
/**
 * Creates a durable subscriber with a selector and no-local flag and wraps it in a {@code JMSConsumerImpl}.
 *
 * @param topic           the topic to subscribe to
 * @param name            the subscription name
 * @param messageSelector selector passed to the session
 * @param noLocal         no-local flag passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createDurableConsumer(final Topic topic, final String name, final String messageSelector, final boolean noLocal) {
    try {
        // Uses createDurableSubscriber; the JMS 2-only alternative would be
        // session().createDurableConsumer(topic, name, messageSelector, noLocal).
        final MessageConsumer subscriber = session().createDurableSubscriber(topic, name, messageSelector, noLocal);
        checkAutoStart();
        return new JMSConsumerImpl(this, subscriber);
    } catch (final JMSException failure) {
        // Checked JMS failures are converted by toRuntimeException before being rethrown.
        throw toRuntimeException(failure);
    }
}
/**
 * Sends {@code content} to the {@code queue.jms} queue on a broker at {@code tcp://localhost:61616}
 * and checks that the message read back has the same destination and body.
 *
 * @throws Exception if the connection, send or receive fails
 */
@Test
public void testQueue() throws Exception {
    try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); JMSContext context = factory.createContext()) {
        Queue queue = context.createQueue("queue.jms");
        JMSProducer producer = context.createProducer();
        producer.send(queue, content);
        JMSConsumer consumer = context.createConsumer(queue);
        Message message = consumer.receive(5000);
        // receive(timeout) returns null when nothing arrives in time; fail with a clear
        // message instead of a NullPointerException on the next line.
        Assert.assertNotNull("No message received from queue.jms within 5000 ms", message);
        Assert.assertEquals(queue, message.getJMSDestination());
        Assert.assertEquals(content, message.getBody(String.class));
    }
}
/**
 * Posts a body to {@code /artemis}, expects a 204 response, and checks that the same body
 * can then be received from the {@code test-jms} queue.
 *
 * @throws Exception if the JMS context cannot be created or the receive fails
 */
@Test
public void test() throws Exception {
    String body = createBody();
    Response response = RestAssured.with().body(body).post("/artemis");
    Assertions.assertEquals(Status.NO_CONTENT.getStatusCode(), response.statusCode());
    try (JMSContext context = createContext()) {
        JMSConsumer consumer = context.createConsumer(context.createQueue("test-jms"));
        Message message = consumer.receive(1000L);
        // receive(timeout) returns null when nothing arrives in time; fail with a clear
        // message instead of a NullPointerException on the next line.
        Assertions.assertNotNull(message, "No message received from test-jms within 1000 ms");
        Assertions.assertEquals(body, message.getBody(String.class));
    }
}
/**
 * Creates a consumer on the session for the destination and wraps it in a {@code JMSConsumerImpl}.
 *
 * @param destination the destination to consume from
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createConsumer(final Destination destination) {
    try {
        final JMSConsumerImpl wrapped =
            new JMSConsumerImpl(this, session().createConsumer(destination));
        checkAutoStart();
        return wrapped;
    } catch (final JMSException failure) {
        // Checked JMS failures are converted by toRuntimeException before being rethrown.
        throw toRuntimeException(failure);
    }
}
/**
 * Checks that a JMSConsumer whose link is detached by the remote peer becomes closed on the
 * client side, and that a later explicit {@code close()} sends nothing further to the peer.
 */
@Test(timeout = 20000)
public void testRemotelyCloseJMSConsumer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JMSContext context = testFixture.createJMSContext(testPeer);
testPeer.expectBegin();
// Create a consumer, then remotely end it afterwards.
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, "resource closed");
Queue queue = context.createQueue("myQueue");
final JMSConsumer consumer = context.createConsumer(queue);
// Verify the consumer gets marked closed
testPeer.waitForAllHandlersToComplete(1000);
// The condition is satisfied once getMessageListener() throws IllegalStateRuntimeException,
// which this test treats as the sign that the consumer is closed.
// NOTE(review): 10000 and 10 are presumably the wait timeout and poll interval - confirm against Wait.waitFor.
assertTrue("JMSConsumer never closed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
try {
consumer.getMessageListener();
} catch (IllegalStateRuntimeException jmsise) {
return true;
}
return false;
}
}, 10000, 10));
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
consumer.close();
// Expectations for the context close that follows.
testPeer.expectEnd();
testPeer.expectClose();
context.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Creates a consumer with selector and no-local flag on the session, wraps it in a
 * {@code JmsPoolJMSConsumer} and passes it through {@code startIfNeeded}.
 *
 * @param destination the destination to consume from
 * @param selector    selector passed to the session
 * @param noLocal     no-local flag passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createConsumer(Destination destination, String selector, boolean noLocal) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createConsumer(destination, selector, noLocal);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a durable consumer on the session, wraps it in a {@code JmsPoolJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createDurableConsumer(Topic topic, String name) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createDurableConsumer(topic, name);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a durable consumer with selector and no-local flag on the session, wraps it in a
 * {@code JmsPoolJMSConsumer} and passes it through {@code startIfNeeded}.
 *
 * @param topic    the topic to subscribe to
 * @param name     the subscription name
 * @param selector selector passed to the session
 * @param noLocal  no-local flag passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createDurableConsumer(Topic topic, String name, String selector, boolean noLocal) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createDurableConsumer(topic, name, selector, noLocal);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared consumer on the session, wraps it in a {@code JmsPoolJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the shared subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedConsumer(Topic topic, String name) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createSharedConsumer(topic, name);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared consumer with a selector on the session, wraps it in a
 * {@code JmsPoolJMSConsumer} and passes it through {@code startIfNeeded}.
 *
 * @param topic    the topic to subscribe to
 * @param name     the shared subscription name
 * @param selector selector passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedConsumer(Topic topic, String name, String selector) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createSharedConsumer(topic, name, selector);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared durable consumer on the session, wraps it in a {@code JmsPoolJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the shared subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createSharedDurableConsumer(topic, name);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared durable consumer with a selector on the session, wraps it in a
 * {@code JmsPoolJMSConsumer} and passes it through {@code startIfNeeded}.
 *
 * @param topic    the topic to subscribe to
 * @param name     the shared subscription name
 * @param selector selector passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String selector) {
    try {
        final JmsPoolMessageConsumer delegate =
            (JmsPoolMessageConsumer) getSession().createSharedDurableConsumer(topic, name, selector);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Starts the connection when auto-start is enabled and hands the consumer back unchanged.
 *
 * @param consumer the consumer to return
 * @return the same {@code consumer} instance
 * @throws JMSException if starting the connection fails
 */
private JMSConsumer startIfNeeded(JMSConsumer consumer) throws JMSException {
    final boolean autoStart = getAutoStart();
    if (autoStart) {
        connection.start();
    }
    return consumer;
}
/**
 * Creates a consumer on the session, wraps it in a {@code MockJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param destination the destination to consume from
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createConsumer(Destination destination) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createConsumer(destination);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a consumer with a selector on the session, wraps it in a {@code MockJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param destination the destination to consume from
 * @param selector    selector passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createConsumer(Destination destination, String selector) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createConsumer(destination, selector);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a consumer with selector and no-local flag on the session, wraps it in a
 * {@code MockJMSConsumer} and passes it through {@code startIfNeeded}.
 *
 * @param destination the destination to consume from
 * @param selector    selector passed to the session
 * @param noLocal     no-local flag passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createConsumer(Destination destination, String selector, boolean noLocal) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createConsumer(destination, selector, noLocal);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a durable consumer on the session, wraps it in a {@code MockJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createDurableConsumer(Topic topic, String name) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createDurableConsumer(topic, name);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a durable consumer with selector and no-local flag on the session, wraps it in a
 * {@code MockJMSConsumer} and passes it through {@code startIfNeeded}.
 *
 * @param topic    the topic to subscribe to
 * @param name     the subscription name
 * @param selector selector passed to the session
 * @param noLocal  no-local flag passed to the session
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createDurableConsumer(Topic topic, String name, String selector, boolean noLocal) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createDurableConsumer(topic, name, selector, noLocal);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared consumer on the session, wraps it in a {@code MockJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the shared subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedConsumer(Topic topic, String name) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createSharedConsumer(topic, name);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Creates a shared durable consumer on the session, wraps it in a {@code MockJMSConsumer}
 * and passes it through {@code startIfNeeded}.
 *
 * @param topic the topic to subscribe to
 * @param name  the shared subscription name
 * @return the wrapping consumer
 */
@Override
public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
    try {
        final MockJMSMessageConsumer delegate =
            (MockJMSMessageConsumer) getSession().createSharedDurableConsumer(topic, name);
        return startIfNeeded(new MockJMSConsumer(getSession(), delegate));
    } catch (JMSException cause) {
        // Checked JMS failures are rethrown via JMSExceptionSupport.createRuntimeException.
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}