下面列出了怎么用javax.jms.MessageListener的API类实例代码及写法,或者点击链接到github查看源代码。
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
Destination destination = ActiveMQDestination.fromPrefixedName("queue://shipping");
try (Connection conn = factory.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("Received Message: ");
Order order = Jms.getEntity(message, Order.class);
System.out.println(order);
}
});
conn.start();
Thread.sleep(1000000);
}
}
/**
* Test schedule callback
*/
@Test
public void testScheduleCallBack() throws JMSException, InterruptedException {
/*
* Set up mocks
*/
sqsSessionRunnable.callbackQueue = new ArrayDeque<SQSSession.CallbackEntry>();
MessageListener msgListener = mock(MessageListener.class);
SQSMessageConsumerPrefetch.MessageManager msgManager = mock(SQSMessageConsumerPrefetch.MessageManager.class);
/*
* Nack the messages, exception expected
*/
sqsSessionRunnable.scheduleCallBacks(msgListener, Collections.singletonList(msgManager));
assertEquals(1, sqsSessionRunnable.callbackQueue.size());
SQSSession.CallbackEntry entry = sqsSessionRunnable.callbackQueue.pollFirst();
assertEquals(msgListener, entry.getMessageListener());
assertEquals(msgManager, entry.getMessageManager());
}
/**
* Invoke the specified listener: either as standard JMS MessageListener
* or (preferably) as Spring SessionAwareMessageListener.
* @param session the JMS Session to operate on
* @param message the received JMS Message
* @throws JMSException if thrown by JMS API methods
* @see #setMessageListener
*/
@SuppressWarnings("rawtypes")
protected void invokeListener(Session session, Message message) throws JMSException {
Object listener = getMessageListener();
if (listener instanceof SessionAwareMessageListener) {
doInvokeListener((SessionAwareMessageListener) listener, session, message);
}
else if (listener instanceof MessageListener) {
doInvokeListener((MessageListener) listener, message);
}
else if (listener != null) {
throw new IllegalArgumentException(
"Only MessageListener and SessionAwareMessageListener supported: " + listener);
}
else {
throw new IllegalStateException("No message listener specified - see property 'messageListener'");
}
}
public void testShouldCreateContainer() throws Exception {
final EjbJar ejbJar = new EjbJar();
final OpenejbJar openejbJar = new OpenejbJar();
final EjbModule ejbModule = new EjbModule(ejbJar, openejbJar);
final AppModule appModule = new AppModule(ejbModule);
appModule.getContainers().add(new Container("my-container", "MESSAGE", null));
final AppInfo appInfo = new AppInfoBuilder(new ConfigurationFactory()).build(appModule);
assertEquals(1, appInfo.containers.size());
final ContainerInfo containerInfo = appInfo.containers.get(0);
assertEquals(appInfo.appId + "/my-container", containerInfo.id);
assertEquals(1, containerInfo.types.size());
assertEquals("MESSAGE", containerInfo.types.get(0));
assertEquals(MdbContainerFactory.class.getName(), containerInfo.className);
assertEquals("Default JMS Resource Adapter", containerInfo.properties.get("ResourceAdapter"));
assertEquals(MessageListener.class.getName(), containerInfo.properties.get("MessageListenerInterface"));
assertEquals(TomEEMessageActivationSpec.class.getName(), containerInfo.properties.get("ActivationSpecClass"));
assertEquals("10", containerInfo.properties.get("InstanceLimit"));
assertEquals("true", containerInfo.properties.get("FailOnUnknownActivationSpec"));
}
@Test
public void testCantReceiveWhenListenerIsSet() throws Exception {
conn = cf.createConnection();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message msg) {
}
});
try {
consumer.receiveNoWait();
Assert.fail("Should throw exception");
} catch (JMSException e) {
// Ok
}
}
@Override
public void start() {
try {
session = connection.createSession(transacted, acknowledgeMode);
if (durableSubscriptionName != null && destination instanceof Topic) {
consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
messageSelector, pubSubNoLocal);
} else {
consumer = session.createConsumer(destination, messageSelector);
}
MessageListener intListener = new LocalTransactionalMessageListener(session, listenerHandler);
// new DispachingListener(getExecutor(), listenerHandler);
consumer.setMessageListener(intListener);
running = true;
} catch (JMSException e) {
throw JMSUtil.convertJmsException(e);
}
}
@Override
protected void start() throws JMSException, NamingException
{
super.start();
this.xaConsumer = this.getXaSession().createDurableSubscriber(this.getXaTopic(), this.getConfig().getDurableConsumerName());
if (this.getInterruptableBackoff() != null && this.getNonXaSession() != null)
{
this.nonXaConsumer = this.getNonXaSession().createConsumer(this.getNonXaTopic());
this.nonXaConsumer.setMessageListener(new MessageListener()
{
@Override
public void onMessage(Message message)
{
IncomingTopic.this.getInterruptableBackoff().asyncInterrupt();
}
});
}
}
protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
AtomicInteger totalCount = new AtomicInteger(count);
List<CompletableFuture<List<Message>>> resultsList = new ArrayList<>();
List<List<Message>> receivedResList = new ArrayList<>();
for (int i = 0; i < consumer.length; i++) {
final int index = i;
resultsList.add(new CompletableFuture<>());
receivedResList.add(new ArrayList<>());
MessageListener myListener = message -> {
instanceLog.debug("Mesages received" + message + " count: " + totalCount.get());
receivedResList.get(index).add(message);
if (totalCount.decrementAndGet() == 0) {
for (int j = 0; j < consumer.length; j++) {
resultsList.get(j).complete(receivedResList.get(j));
}
}
};
consumer[i].setMessageListener(myListener);
}
return resultsList;
}
@Test
public void createSimpleContainer() {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
setDefaultJmsConfig(factory);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
MessageListener messageListener = new MessageListenerAdapter();
endpoint.setMessageListener(messageListener);
endpoint.setDestination("myQueue");
SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
assertDefaultJmsConfig(container);
assertEquals(messageListener, container.getMessageListener());
assertEquals("myQueue", container.getDestinationName());
}
@Test
public void createJmsContainerFullConfig() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
setDefaultJmsConfig(factory);
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setConcurrency("3-10");
factory.setMaxMessagesPerTask(5);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
MessageListener messageListener = new MessageListenerAdapter();
endpoint.setMessageListener(messageListener);
endpoint.setDestination("myQueue");
DefaultMessageListenerContainer container = factory.createListenerContainer(endpoint);
assertDefaultJmsConfig(container);
assertEquals(DefaultMessageListenerContainer.CACHE_CONSUMER, container.getCacheLevel());
assertEquals(3, container.getConcurrentConsumers());
assertEquals(10, container.getMaxConcurrentConsumers());
assertEquals(5, container.getMaxMessagesPerTask());
assertEquals(messageListener, container.getMessageListener());
assertEquals("myQueue", container.getDestinationName());
}
@Test
public void testConnectionProblem() throws JMSException {
Connection connection = createConnection("broker");
Queue dest = JMSUtil.createQueue(connection, "test");
MessageListener listenerHandler = new TestMessageListener();
TestExceptionListener exListener = new TestExceptionListener();
PollingMessageListenerContainer container = //
new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
connection.close(); // Simulate connection problem
container.start();
Awaitility.await().until(() -> exListener.exception != null);
JMSException ex = exListener.exception;
assertNotNull(ex);
assertEquals("The connection is already closed", ex.getMessage());
}
private void createConsumer() throws JMSException {
if (durableSub) {
consumer = session.createDurableSubscriber(topic, durableID);
} else {
consumer = session.createConsumer(topic);
}
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
TextMessage msg = (TextMessage) arg0;
try {
LOG.debug("Received message [" + msg.getText() + "]");
receivedStrings.add(msg.getText());
receivedLatch.countDown();
} catch (JMSException e) {
fail("Unexpected :" + e);
}
}
});
}
@Override
public CompletableFuture<Message> registerConcreteListenerImplementation(Destination destination) {
final CompletableFuture<Message> incomingMessageFuture = new CompletableFuture<>();
try {
MessageConsumer consumer = session.createConsumer(destination);
//noinspection Convert2Lambda,Anonymous2MethodRef
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
incomingMessageFuture.complete(message);
}
});
} catch (JMSException e) {
throw new RuntimeException(e);
}
return incomingMessageFuture;
}
protected MessageListener createMessageListener(int i, final List<Message> messageList) {
return new MessageListener() {
@Override
public void onMessage(Message message) {
consumeMessage(message, messageList);
}
};
}
private void noConsumerAdvisory() throws JMSException {
for (BrokerItem item : brokers.values()) {
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(item.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
Connection connection = brokerAFactory.createConnection();
connection.start();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(AdvisorySupport.getNoTopicConsumersAdvisoryTopic(new ActiveMQTempTopic(">"))).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
sendsWithNoConsumers.incrementAndGet();
}
});
}
}
@Test
public void testLocalTransaction() throws JMSException, XAException, InterruptedException {
Connection connection = createConnection("brokerLocalTransaction");
Queue dest = JMSUtil.createQueue(connection, "test");
MessageListener listenerHandler = new TestMessageListener();
AbstractMessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler);
container.setTransacted(true);
container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
container.start();
testTransactionalBehaviour(connection, dest);
container.stop();
connection.close();
}
@Test
public void testSetMessageListenerAfterStart() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch done = new CountDownLatch(1);
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
// Send the messages
sendMessages(session, destination, 4);
// See if the message get sent to the listener
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message m) {
counter.incrementAndGet();
if (counter.get() == 4) {
done.countDown();
}
}
});
assertTrue(done.await(1000, TimeUnit.MILLISECONDS));
Thread.sleep(200);
// Make sure only 4 messages were delivered.
assertEquals(4, counter.get());
}
public DelegatingTransactionalMessageListener(MessageListener underlyingListener,
Connection connection,
Destination destination) {
this.underlyingListener = underlyingListener;
try {
session = connection.createSession(transacted, ackMode);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (JMSException e) {
throw new IllegalStateException("Could not listen to " + destination, e);
}
}
@Test
public void createListener() {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
MessageListener messageListener = new MessageListenerAdapter();
endpoint.setMessageListener(messageListener);
assertSame(messageListener, endpoint.createMessageListener(container));
}
@Test
public void testRemoveAllScheduled() throws Exception {
final int COUNT = 5;
Connection connection = createConnection();
// Setup the scheduled Message
scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the Browse Destination and the Reply To location
Destination management = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
// Create the eventual Consumer to receive the scheduled message
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(COUNT);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
});
connection.start();
// Send the remove request
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
producer.send(request);
// Now wait and see if any get delivered, none should.
latch.await(10, TimeUnit.SECONDS);
assertEquals(latch.getCount(), COUNT);
}
private void forwardFailureAdvisory() throws JMSException {
for (BrokerItem item : brokers.values()) {
ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(item.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
Connection connection = brokerAFactory.createConnection();
connection.start();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic()).setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
forwardFailures.incrementAndGet();
}
});
}
}
private void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse(final Connection connection,
final BiFunction<ThingId, DittoHeaders, CommandResponse> responseSupplier,
final String expectedAddressPrefix,
final Predicate<String> messageTextPredicate) throws JMSException {
new TestKit(actorSystem) {{
final Props props =
AmqpClientActor.propsForTests(connection, getRef(), getRef(),
(ac, el) -> mockConnection);
final ActorRef amqpClientActor = actorSystem.actorOf(props);
amqpClientActor.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);
final ArgumentCaptor<MessageListener> captor = ArgumentCaptor.forClass(MessageListener.class);
verify(mockConsumer, timeout(1000).atLeastOnce()).setMessageListener(captor.capture());
final MessageListener messageListener = captor.getValue();
messageListener.onMessage(mockMessage());
final ThingCommand command = expectMsgClass(ThingCommand.class);
assertThat((CharSequence) command.getEntityId()).isEqualTo(TestConstants.Things.THING_ID);
assertThat(command.getDittoHeaders().getCorrelationId()).contains(TestConstants.CORRELATION_ID);
assertThat(command).isInstanceOf(ModifyThing.class);
getLastSender().tell(responseSupplier.apply(command.getEntityId(), command.getDittoHeaders()), getRef());
final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
// verify that the message is published via the producer with the correct destination
final MessageProducer messageProducer =
getProducerForAddress(expectedAddressPrefix + command.getEntityId());
verify(messageProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class));
final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(messageTextPredicate).accepts(message.getBody(String.class));
}};
}
/**
* Test SetMessageListener when message were prefetched
*/
@Test
public void testSetMessageListener() {
SQSMessageConsumerPrefetch.MessageManager msgManager1 = mock(SQSMessageConsumerPrefetch.MessageManager.class);
Message message1 = mock(Message.class);
when(msgManager1.getMessage())
.thenReturn(message1);
SQSMessageConsumerPrefetch.MessageManager msgManager2 = mock(SQSMessageConsumerPrefetch.MessageManager.class);
Message message2 = mock(Message.class);
when(msgManager2.getMessage())
.thenReturn(message2);
consumerPrefetch.messageQueue.add(msgManager1);
consumerPrefetch.messageQueue.add(msgManager2);
MessageListener msgListener = mock(MessageListener.class);
consumerPrefetch.running = true;
consumerPrefetch.setMessageListener(msgListener);
assertTrue(consumerPrefetch.messageQueue.isEmpty());
List<MessageManager> expectedList = new ArrayList<MessageManager>();
expectedList.add(msgManager1);
expectedList.add(msgManager2);
verify(sqsSessionRunnable).scheduleCallBacks(msgListener, expectedList);
verifyNoMoreInteractions(sqsSessionRunnable);
}
/**
* Get the message listener -- throws IllegalStateException
*
* @return The message listener
* @throws JMSException Thrown if an error occurs
*/
@Override
public MessageListener getMessageListener() throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("getMessageListener()");
}
throw new IllegalStateException("Method not allowed");
}
@Override
public void setupMessageListener(Object messageListener) {
if (messageListener instanceof MessageListener) {
setMessageListener((MessageListener) messageListener);
}
else {
throw new IllegalArgumentException("Unsupported message listener '" +
messageListener.getClass().getName() + "': only '" + MessageListener.class.getName() +
"' type is supported");
}
}
/**
* Test getting message listener
*/
@Test
public void testGetMessageListener() {
MessageListener msgListener = mock(MessageListener.class);
consumerPrefetch.setMessageListener(msgListener);
assertEquals(msgListener, consumerPrefetch.getMessageListener());
}
@Test
public void setupConcurrencySimpleContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MessageListener messageListener = new MessageListenerAdapter();
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setConcurrency("5-10"); // simple implementation only support max value
endpoint.setMessageListener(messageListener);
endpoint.setupListenerContainer(container);
assertEquals(10, new DirectFieldAccessor(container).getPropertyValue("concurrentConsumers"));
}
@Test
public void receive() throws Exception {
when(delegate.getMessageListener()).thenReturn(mock(PreservesMessagePropertiesMessageListener.class));
propagator.receive();
verify(delegate).receive();
reset(delegate);
when(delegate.getMessageListener()).thenReturn(mock(MessageListener.class));
propagator.receive();
verify(delegate).receive();
}
private void createAdvisorySubscription() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(topic));
advisoryConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (((ActiveMQMessage) message).getDataStructure() instanceof RemoveSubscriptionInfo) {
advisories.incrementAndGet();
}
}
});
}
public void registerListener(String consumerName, MessageListener messageListener)
{
MessageConsumer consumer = _testConsumers.get(consumerName);
try
{
consumer.setMessageListener(messageListener);
}
catch (JMSException e)
{
throw new DistributedTestException("Unable to register message listener with consumer: " + consumerName, e);
}
}