下面列出了 javax.jms.TopicSubscriber#setMessageListener() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates a sink that subscribes to a JMS topic and registers this object as
 * the subscriber's message listener.
 *
 * <p>Any failure during setup is logged rather than rethrown, so construction
 * always completes. If the topic connection had already been opened when the
 * failure occurred, it is closed again so it does not leak.
 *
 * @param tcfBindingName   JNDI name used to look up the {@code TopicConnectionFactory}
 * @param topicBindingName JNDI name used to look up the {@code Topic}
 * @param username         user name passed to {@code createTopicConnection}
 * @param password         password passed to {@code createTopicConnection}
 */
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username,
        final String password) {
    // Declared outside the try block so the failure path can release it.
    TopicConnection topicConnection = null;
    try {
        final Context ctx = new InitialContext();
        final TopicConnectionFactory topicConnectionFactory =
                (TopicConnectionFactory) lookup(ctx, tcfBindingName);
        topicConnection = topicConnectionFactory.createTopicConnection(username, password);
        final TopicSession topicSession =
                topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic topic = (Topic) ctx.lookup(topicBindingName);
        final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
        topicSubscriber.setMessageListener(this);
        // Start delivery only once the listener is registered, i.e. after setup is complete.
        topicConnection.start();
    } catch (final Exception e) {
        if (topicConnection != null) {
            try {
                topicConnection.close();
            } catch (final Exception closeFailure) {
                // Keep the original failure as the primary error; attach the cleanup failure to it.
                e.addSuppressed(closeFailure);
            }
        }
        logger.error("Could not read JMS message.", e);
    }
}
/**
 * Subscribes to the "activemq.notifications" topic and checks that every
 * message collected carries a non-null "_AMQ_NotifType" string property.
 */
@Test
public void testNotificationProperties() throws Exception {
    try (TopicConnection connection = factory.createTopicConnection()) {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber notificationSubscriber =
                session.createSubscriber(session.createTopic("activemq.notifications"));

        // The listener appends from a delivery thread while this thread polls the list.
        List<Message> collected = new CopyOnWriteArrayList<>();
        notificationSubscriber.setMessageListener(incoming -> collected.add(incoming));
        connection.start();

        Wait.waitFor(() -> !collected.isEmpty());
        Assert.assertTrue(!collected.isEmpty());

        for (Message notification : collected) {
            assertNotNull(notification);
            assertNotNull(notification.getStringProperty("_AMQ_NotifType"));
        }
    }
}
/**
 * Creates 100 session/subscriber pairs on one connection, closing each pair
 * before the next is created, then checks that the back-end broker's
 * destination for the topic has no consumers left.
 */
public void testWithSessionAndSubsciberClose() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    int round = 0;
    while (round < 100) {
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber sub = session.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        sub.close();
        session.close();
        round++;
    }

    topicConnection.close();
    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Creates and closes 100 subscribers on a single shared session, closes the
 * session only after the loop, then checks that the back-end broker's
 * destination for the topic has no consumers left.
 */
public void testWithSessionCloseOutsideTheLoop() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();
    TopicSession sharedSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    int round = 0;
    while (round < 100) {
        TopicSubscriber sub = sharedSession.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        sub.close();
        round++;
    }

    sharedSession.close();
    topicConnection.close();
    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Creates a single subscriber with a listener, closes subscriber, session and
 * connection in that order, then checks that the back-end broker's
 * destination for the topic has no consumers left.
 */
public void testWithOneSubscriber() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber sub = session.createSubscriber(topic);
    sub.setMessageListener(new DummyMessageListener());

    sub.close();
    session.close();
    topicConnection.close();
    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Same loop as the close-everything variants, except that the session and
 * subscriber created on iteration 50 are deliberately left open.
 *
 * <p>According to the original author, running this test can produce a leak
 * of only 2 ConsumerInfo on the BE broker, NOT 200 as in other cases.
 */
public void testWithoutSessionAndSubsciberClosePlayAround() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int iteration = 0; iteration < 100; iteration++) {
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber sub = session.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        if (iteration == 50) {
            // This one pair is intentionally never closed explicitly.
            continue;
        }
        sub.close();
        session.close();
    }

    topicConnection.close();
    // Pause before inspecting the broker state.
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Opens a topic connection to {@code defaultBrokerUrl}, subscribes to
 * {@code topicName} and registers a listener that prints the UTF-8 decoded
 * body of each received {@code BytesMessage}.
 *
 * <p>The connection is closed before this method returns, including when a
 * setup step fails.
 *
 * @throws JMSException if creating the connection, session, topic or
 *                      subscriber fails, or if closing the connection fails
 */
private static void subscribe() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        // start connection
        connection.start();
        // create session
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        // create topic
        Topic topic = session.createTopic(topicName);
        // create subscriber
        TopicSubscriber subscriber = session.createSubscriber(topic);
        // create listener
        subscriber.setMessageListener(message -> {
            // Only bytes messages carry the payload this listener knows how to print;
            // skip anything else instead of failing with a ClassCastException.
            if (!(message instanceof BytesMessage)) {
                System.err.println("ignored non-bytes message: " + message);
                return;
            }
            BytesMessage msg = (BytesMessage) message;
            try {
                byte[] data = new byte[(int) msg.getBodyLength()];
                msg.readBytes(data);
                System.out.println("received: " + new String(data, StandardCharsets.UTF_8));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } finally {
        // NOTE(review): the connection is closed as soon as the listener is registered,
        // so the listener has practically no window in which to receive messages.
        // Keeping the subscription alive would require handing the connection back to
        // the caller, which changes this method's contract - confirm the intent.
        connection.close();
    }
}
/**
 * Connects this listener to its JMS topic: looks up the connection factory
 * and topic through JNDI, creates the connection, session and subscriber,
 * registers this object as exception and message listener, starts the
 * connection and marks the listener as connected.
 *
 * @throws GenericServiceException if a JNDI lookup fails or returns null, if
 *                                 JMS reports an error, or if the initial
 *                                 context cannot be obtained
 */
public synchronized void load() throws GenericServiceException {
    try {
        InitialContext ctx = JNDIContextFactory.getInitialContext(jndiServer);
        TopicConnectionFactory connectionFactory = (TopicConnectionFactory) ctx.lookup(jndiName);
        // The null-check failures are raised inside the try block on purpose, so
        // whichever catch clause below matches them still applies, as before.
        if (connectionFactory == null) {
            throw new GenericServiceException("Factory (broker) lookup failed.");
        }

        con = connectionFactory.createTopicConnection(userName, password);
        con.setExceptionListener(this);
        session = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = (Topic) ctx.lookup(topicName);
        if (topic == null) {
            throw new GenericServiceException("Topic lookup failed.");
        }

        TopicSubscriber topicSubscriber = session.createSubscriber(topic);
        topicSubscriber.setMessageListener(this);
        con.start();
        this.setConnected(true);
        if (Debug.verboseOn()) {
            Debug.logInfo("Listening to topic [" + topicName + "] on [" + jndiServer + "]...", module);
        }
    } catch (NamingException namingFailure) {
        throw new GenericServiceException("JNDI lookup problems; listener not running.", namingFailure);
    } catch (JMSException jmsFailure) {
        throw new GenericServiceException("JMS internal error; listener not running.", jmsFailure);
    } catch (GeneralException generalFailure) {
        throw new GenericServiceException("Problems with InitialContext; listener not running.", generalFailure);
    }
}
/**
 * Registers a durable subscriber on {@code DESTINATION_NAME} against the
 * given broker and attaches the supplied listener to it.
 *
 * @param brokerService broker to connect to; its name is appended to the
 *                      subscription name
 * @param listener      listener attached to the durable subscriber
 * @return the topic the durable subscriber was created on
 * @throws Exception if any connection, session or subscriber call fails
 */
private ActiveMQTopic registerDurableConsumer(BrokerService brokerService,
        MessageListener listener) throws Exception {
    ConnectionFactory brokerFactory = createConnectionFactory(brokerService);
    Connection durableConnection = brokerFactory.createConnection();
    durableConnection.setClientID("DurableOne");
    durableConnection.start();

    Session durableSession = durableConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQTopic durableTopic = (ActiveMQTopic) durableSession.createTopic(DESTINATION_NAME);

    // The subscription name is made unique to a broker by appending the broker name.
    String subscriptionName = "SubOne" + brokerService.getBrokerName();
    TopicSubscriber durableSubscriber =
            durableSession.createDurableSubscriber(durableTopic, subscriptionName);
    durableSubscriber.setMessageListener(listener);

    return durableTopic;
}
/**
 * Opens the consumer connection with client id "cliID" and attaches a durable
 * subscriber named "subName" (selector "filter=true") whose listener
 * increments {@code received} for every delivered message.
 *
 * @throws Exception if any connection, session or subscriber call fails
 */
private void openConsumer() throws Exception {
    consumerConnection = (ActiveMQConnection) createConnection();
    consumerConnection.setClientID("cliID");
    consumerConnection.start();

    // Counts deliveries only; the message content is not inspected.
    final MessageListener countingListener = new MessageListener() {
        @Override
        public void onMessage(Message delivered) {
            received++;
        }
    };

    Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber durableSubscriber =
            consumerSession.createDurableSubscriber(topic, "subName", "filter=true", false);
    durableSubscriber.setMessageListener(countingListener);
}
/**
 * Sends messages with a time-to-live to a topic that has a closed durable
 * subscription, and asserts that the destination view reports all of them as
 * expired and that none is delivered once the subscription is re-created.
 *
 * The statement order matters: the subscription is created and closed before
 * sending, and the expiry counters are checked both right after sending and
 * again after a pause.
 */
public void testExpireMessagesForDurableSubscriber() throws Exception {
createBroker();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
connection = factory.createConnection();
connection.setClientID("myConnection");
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
connection.start();
Topic destination = session.createTopic("test");
producer = session.createProducer(destination);
// Time-to-live applied to every message sent by this producer
// (presumably milliseconds - confirm against the producer API).
final int ttl = 1000;
producer.setTimeToLive(ttl);
final long sendCount = 10;
// Create the durable subscription, then close the subscriber straight away so
// that the sends below happen while no subscriber object is open.
TopicSubscriber sub = session.createDurableSubscriber(destination, "mySub");
sub.close();
for (int i = 0; i < sendCount; i++) {
producer.send(session.createTextMessage("test"));
}
DestinationViewMBean view = createView((ActiveMQTopic) destination);
LOG.info("messages sent");
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
// Immediately after sending: everything enqueued, nothing expired yet.
assertEquals(0, view.getExpiredCount());
assertEquals(10, view.getEnqueueCount());
// Pause well beyond the ttl value before re-checking the counters.
Thread.sleep(5000);
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
// After the pause all ten messages are expected to be counted as expired.
assertEquals(10, view.getExpiredCount());
assertEquals(10, view.getEnqueueCount());
final AtomicLong received = new AtomicLong();
// Re-create the subscriber on the same subscription name and count deliveries.
sub = session.createDurableSubscriber(destination, "mySub");
sub.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
received.incrementAndGet();
}
});
LOG.info("Waiting for messages to arrive");
// The result of the wait is ignored: the assertions below expect that no
// message arrives, so this call only gives deliveries a chance to happen.
// NOTE(review): 1000 is presumably a timeout in milliseconds - confirm against Wait.waitFor.
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return received.get() >= sendCount;
}
}, 1000);
LOG.info("received=" + received.get());
LOG.info("expired=" + view.getExpiredCount() + " " + view.getEnqueueCount());
// No message may have reached the listener, and the counters must be unchanged.
assertEquals(0, received.get());
assertEquals(10, view.getExpiredCount());
assertEquals(10, view.getEnqueueCount());
}
/**
 * Creates a durable subscription, takes it offline, publishes messages to the
 * topic, then brings the subscription back and expects the listener to be
 * called once per published message within 30 seconds. The broker's counts of
 * active and inactive durable subscribers are asserted after each step.
 */
@Test(timeout = 60000)
public void testOfflineSubscriberGetsItsMessages() throws Exception {
    connection = createAmqpConnection();
    connection.setClientID("DURABLE-AMQP");
    connection.start();

    // Nothing is registered on the broker before the subscription exists.
    assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
    assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);

    final int expectedMessages = 5;
    Session amqpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(amqpSession);

    Topic destination = amqpSession.createTopic(name.getMethodName());
    TopicSubscriber durable = amqpSession.createDurableSubscriber(destination, getSubscriptionName());
    TopicViewMBean topicView = getProxyToTopic(name.getMethodName());
    assertEquals(0, topicView.getQueueSize());

    // Subscriber open: one active, none inactive.
    assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
    assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);

    durable.close();

    // Subscriber closed: none active, one inactive.
    assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
    assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);

    // Publish while the subscription is offline.
    MessageProducer publisher = amqpSession.createProducer(destination);
    for (int sequence = 0; sequence < expectedMessages; sequence++) {
        publisher.send(amqpSession.createTextMessage("Message: " + sequence));
    }
    publisher.close();

    LOG.info("Bringing offline subscription back online.");
    durable = amqpSession.createDurableSubscriber(destination, getSubscriptionName());

    assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
    assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);

    final CountDownLatch pending = new CountDownLatch(expectedMessages);
    final MessageListener countingListener = new MessageListener() {
        @Override
        public void onMessage(Message delivered) {
            LOG.info("Consumer got a message: {}", delivered);
            pending.countDown();
        }
    };
    durable.setMessageListener(countingListener);

    assertTrue("Only recieved messages: " + pending.getCount(), pending.await(30, TimeUnit.SECONDS));

    durable.close();
    amqpSession.unsubscribe(getSubscriptionName());
}