下面列出了 javax.jms.MessageConsumer#setMessageListener() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Connects {@code conn}, registers a counting listener on {@code queue}, sends
 * 1000 "Hello World!" messages and expects every one of them to reach the
 * listener within 60 seconds.
 *
 * @throws Exception if connecting, sending or waiting fails
 */
@Test
public void testSendManyMessages() throws Exception {
    conn.connect(defUser, defPass);

    final int expected = 1000;
    final CountDownLatch remaining = new CountDownLatch(expected);

    // Every delivery releases exactly one count of the latch.
    MessageListener countingListener = new MessageListener() {
        @Override
        public void onMessage(Message delivered) {
            remaining.countDown();
        }
    };
    MessageConsumer queueConsumer = session.createConsumer(queue);
    queueConsumer.setMessageListener(countingListener);

    int sent = 0;
    while (sent < expected) {
        send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
        sent++;
    }

    boolean allDelivered = remaining.await(60, TimeUnit.SECONDS);
    assertTrue(allDelivered);
}
/**
 * Verifies that a message handed to this test's listener on an
 * {@code AUTO_ACKNOWLEDGE} session is not redelivered to a consumer created
 * afterwards on a fresh session.
 *
 * @throws Exception if a JMS operation fails or the wait is interrupted
 */
@Test
public void testAckedMessageAreConsumed() throws Exception {
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue;
    try {
        queue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(queue);
        producer.send(session.createTextMessage("Hello"));

        // Consume the message...
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(this);

        // NOTE(review): assumes this class's onMessage counts `latch` down - confirm.
        // Without this check a listener that never fired would go unnoticed.
        if (!latch.await(10, TimeUnit.SECONDS)) {
            throw new AssertionError("Listener did not receive the message within 10 seconds");
        }
    } finally {
        // Close the first session so its consumer cannot keep hold of the message
        // while the second consumer checks for redelivery.
        session.close();
    }

    Session verifySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        // Attempt to Consume the message...check if message was acknowledge
        MessageConsumer verifyConsumer = verifySession.createConsumer(queue);
        Message msg = verifyConsumer.receive(1000);
        assertNull(msg);
    } finally {
        verifySession.close();
    }
}
/**
 * Creates one session on {@code connection}, attaches a topic consumer with an
 * {@link IgniteJmsMessageListener} to it, and adds the consumer, session and
 * listener to {@code consumers}, {@code sessions} and {@code listeners}.
 * <p>
 * The topic is {@code destination} cast to {@link Topic}, or is created from
 * {@code destinationName} when {@code destination} is {@code null}. A durable
 * subscriber named {@code durableSubscriptionName} is used when
 * {@code durableSubscription} is set; otherwise a plain consumer is created.
 *
 * @throws JMSException if any of the JMS calls fails
 */
private void initializeJmsObjectsForTopic() throws JMSException {
    Session topicSession = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

    Topic target = destination == null
        ? topicSession.createTopic(destinationName)
        : (Topic) destination;

    MessageConsumer topicConsumer;
    if (durableSubscription) {
        topicConsumer = topicSession.createDurableSubscriber(target, durableSubscriptionName);
    } else {
        topicConsumer = topicSession.createConsumer(target);
    }

    IgniteJmsMessageListener listener = new IgniteJmsMessageListener(topicSession, true);
    topicConsumer.setMessageListener(listener);

    consumers.add(topicConsumer);
    sessions.add(topicSession);
    listeners.add(listener);
}
/**
 * Registers a counting listener on {@code queue}, connects {@code conn}, sends
 * 1000 "Hello World" messages, expects all of them to reach the listener
 * within 60 seconds and then disconnects.
 *
 * @throws Exception if connecting, sending, waiting or disconnecting fails
 */
@Test
public void testSendManyMessages() throws Exception {
    MessageConsumer queueConsumer = session.createConsumer(queue);
    conn.connect(defUser, defPass);

    final int expected = 1000;
    final CountDownLatch remaining = new CountDownLatch(expected);

    // Every delivery releases exactly one count of the latch.
    MessageListener countingListener = new MessageListener() {
        @Override
        public void onMessage(Message delivered) {
            remaining.countDown();
        }
    };
    queueConsumer.setMessageListener(countingListener);

    int sent = 0;
    while (sent < expected) {
        send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
        sent++;
    }

    boolean allDelivered = remaining.await(60, TimeUnit.SECONDS);
    assertTrue(allDelivered);
    conn.disconnect();
}
/**
 * Sends {@code MESSAGE_COUNT} 1024-byte messages to a queue whose consumer has
 * a no-op listener, and expects that nothing arrives on the queue's
 * slow-consumer advisory topic within one second.
 *
 * @throws Exception if any JMS operation fails
 */
public void testNoSlowConsumerAdvisory() throws Exception {
    Session listenerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = listenerSession.createQueue(getClass().getName());

    MessageListener noOpListener = new MessageListener() {
        @Override
        public void onMessage(Message ignored) {
        }
    };
    MessageConsumer queueConsumer = listenerSession.createConsumer(queue);
    queueConsumer.setMessageListener(noOpListener);

    Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);

    // A second session carries both the advisory consumer and the producer.
    Session workSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer advisoryConsumer = workSession.createConsumer(advisoryTopic);

    // start throwing messages at the consumer
    MessageProducer producer = workSession.createProducer(queue);
    int produced = 0;
    while (produced < MESSAGE_COUNT) {
        BytesMessage payload = workSession.createBytesMessage();
        payload.writeBytes(new byte[1024]);
        producer.send(payload);
        produced++;
    }

    Message advisory = advisoryConsumer.receive(1000);
    assertNull(advisory);
}
/**
 * Receives the first of {@code MSG_COUNT} messages synchronously, then switches
 * the same consumer to a {@link CountingMessageListener} and expects the
 * listener to report no outstanding messages afterwards.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void synchronousReceiveFollowedByMessageListener() throws Exception
{
    final Queue testQueue = createQueue(getTestName());
    final Connection testConnection = getConnection();
    try
    {
        final Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Utils.sendMessages(testSession, testQueue, MSG_COUNT);
        testConnection.start();

        final MessageConsumer queueConsumer = testSession.createConsumer(testQueue);
        final Message first = queueConsumer.receive(getReceiveTimeout());
        assertNotNull("Could not receive first message synchronously", first);

        // One message was already taken synchronously; the listener gets the rest.
        final CountingMessageListener listener = new CountingMessageListener(MSG_COUNT - 1);
        queueConsumer.setMessageListener(listener);
        listener.awaitMessages(getReceiveTimeout());

        assertEquals("Unexpected number of outstanding messages", 0, listener.getOutstandingCount());
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * With prefetch set to 0, lets a {@link CountingMessageListener} receive
 * messages, stops the connection, checks that at least half of
 * {@code MSG_COUNT} had been received, then restarts the connection and
 * expects no outstanding messages afterwards.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void connectionStopThenStart() throws Exception
{
    final Queue testQueue = createQueue(getTestName());
    final Connection testConnection = getConnectionBuilder().setPrefetch(0).build();
    try
    {
        final Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Utils.sendMessages(testSession, testQueue, MSG_COUNT);
        testConnection.start();

        final MessageConsumer queueConsumer = testSession.createConsumer(testQueue);
        final int stopThreshold = MSG_COUNT / 2;
        final CountingMessageListener listener = new CountingMessageListener(MSG_COUNT, stopThreshold);
        queueConsumer.setMessageListener(listener);

        // First phase: wait on the listener, then pause delivery.
        listener.awaitMessages(getReceiveTimeout());
        testConnection.stop();
        assertTrue("Too few messages received after Connection#stop()",
                   listener.getReceivedCount() >= stopThreshold);

        // Second phase: resume delivery and wait on the listener again.
        listener.resetLatch();
        testConnection.start();
        listener.awaitMessages(getReceiveTimeout());

        assertEquals("Unexpected number of outstanding messages", 0, listener.getOutstandingCount());
    }
    finally
    {
        testConnection.close();
    }
}
/**
 * Expects {@code setMessageListener} on a consumer of {@code queue} to be
 * rejected with a {@link JMSException}; the test fails when no exception is
 * thrown.
 *
 * @throws Exception if creating the session or consumer fails
 */
public void testCannotUseMessageListener() throws Exception {
    Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer queueConsumer = jmsSession.createConsumer(queue);
    MessageListener springListener = new SpringConsumer();

    JMSException rejection = null;
    try {
        queueConsumer.setMessageListener(springListener);
    } catch (JMSException e) {
        rejection = e;
    }

    if (rejection == null) {
        fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch");
    }
    LOG.info("Received expected exception : " + rejection);
}
/**
 * Creates {@code consumerCount} consumers, each on its own connection from
 * {@code factory}, gives each a new {@link TimedMessageListener} and records
 * the consumer/listener pair in {@code consumers}.
 *
 * @param factory source of one new connection per consumer
 * @param dest    destination every consumer is created for
 * @throws Exception if creating a connection or consumer fails
 */
protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
    int started = 0;
    while (started < consumerCount) {
        TimedMessageListener timedListener = new TimedMessageListener();
        MessageConsumer created = createConsumer(factory.createConnection(), dest);
        created.setMessageListener(timedListener);
        consumers.put(created, timedListener);
        started++;
    }
}
/**
 * Sends a message with a two-second time-to-live while the connection is not
 * yet started, waits four seconds, starts the connection and expects the
 * listener not to be invoked within one further second.
 * <p>
 * The connection is closed in a {@code finally} block so it is released even
 * when the assertion fails or a JMS call throws.
 *
 * @throws Exception if a JMS operation fails or the wait is interrupted
 */
@Test
public void testAsyncReceiveWithExpirationChecks() throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
    final CountDownLatch received = new CountDownLatch(1);

    Connection connection = factory.createConnection();
    try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(name.getMethodName());

        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                received.countDown();
            }
        });

        MessageProducer producer = session.createProducer(destination);
        producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
        producer.send(session.createTextMessage("test"));

        // Allow message to expire in the prefetch buffer
        TimeUnit.SECONDS.sleep(4);

        connection.start();
        assertFalse(received.await(1, TimeUnit.SECONDS));
    } finally {
        connection.close();
    }
}
/**
 * Sets the server monitor's max usage to 0 and ticks it, then expects sending
 * over {@code conn} to fail with an exception and the captured log to contain
 * {@code AMQ119119}.
 * <p>
 * The result of the log search is now checked; previously it was discarded, so
 * the log expectation was never enforced.
 *
 * @throws Exception if setting up the consumer or connection fails
 */
@Test
public void testSendOverDiskFull() throws Exception {
    AssertionLoggerHandler.startCapture();
    try {
        MessageConsumer consumer = session.createConsumer(queue);
        conn.connect(defUser, defPass);
        int count = 1000;
        final CountDownLatch latch = new CountDownLatch(count);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message arg0) {
                latch.countDown();
            }
        });

        ((ActiveMQServerImpl) server).getMonitor()
            .setMaxUsage(0)
            .tick();

        // Connection should be closed by broker when disk is full and attempt to send
        Exception e = null;
        try {
            for (int i = 1; i <= count; i++) {
                send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
            }
        } catch (Exception se) {
            e = se;
        }
        assertNotNull(e);

        // It should encounter the exception on logs
        // NOTE(review): relies on findText returning a boolean - confirm against
        // the AssertionLoggerHandler version in use.
        if (!AssertionLoggerHandler.findText("AMQ119119")) {
            throw new AssertionError("Expected AMQ119119 to appear in the captured log");
        }
    } finally {
        AssertionLoggerHandler.clear();
        AssertionLoggerHandler.stopCapture();
    }
}
/**
 * Sends the same message twice to {@code queue1} (property "last" set to
 * {@code false}, then {@code true}), starts the connection, waits up to ten
 * seconds on the latch shared with an {@link AutoAckMsgListener} and fails when
 * the listener does not report that it passed.
 *
 * @throws Exception with message "failed" when the listener did not pass, or
 *                   if any JMS operation fails
 */
@Test
public void testAutoAckMsgListenerQueue() throws Exception {
    Connection conn = null;
    try {
        final CountDownLatch done = new CountDownLatch(1);
        conn = createConnection();

        Session autoAckSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer sender = autoAckSession.createProducer(queue1);
        MessageConsumer receiver = autoAckSession.createConsumer(queue1);
        AutoAckMsgListener ackListener = new AutoAckMsgListener(done, autoAckSession);
        receiver.setMessageListener(ackListener);

        // create and send messages
        log.debug("Send and receive two message");
        Message outgoing = autoAckSession.createMessage();
        outgoing.setBooleanProperty("last", false);
        sender.send(outgoing);
        outgoing.setBooleanProperty("last", true);
        sender.send(outgoing);

        conn.start();

        // wait until message is received
        log.debug("waiting until message has been received by message listener...");
        done.await(10, TimeUnit.SECONDS);

        // check message listener status
        if (!ackListener.getPassed()) {
            throw new Exception("failed");
        }
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Sends four messages to a destination whose consumer has a counting listener
 * and checks that exactly four deliveries were counted, including after a
 * short extra wait.
 *
 * @throws Exception if a JMS operation fails or a wait is interrupted
 */
@Test
@Ignore
public void testMessageListenerWithConsumer() throws Exception {
    final int expectedDeliveries = 4;
    final AtomicInteger delivered = new AtomicInteger(0);
    final CountDownLatch allDelivered = new CountDownLatch(1);

    // Receive a message with the JMS API
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQDestination destination = createDestination(session, destinationType);

    MessageListener countingListener = new MessageListener() {
        @Override
        public void onMessage(Message incoming) {
            // Release the latch on the fourth delivery.
            if (delivered.incrementAndGet() == expectedDeliveries) {
                allDelivered.countDown();
            }
        }
    };
    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(countingListener);

    // Send the messages
    sendMessages(session, destination, expectedDeliveries);

    boolean finishedInTime = allDelivered.await(1000, TimeUnit.MILLISECONDS);
    assertTrue(finishedInTime);
    Thread.sleep(200);

    // Make sure only 4 messages were delivered.
    assertEquals(expectedDeliveries, delivered.get());
}
/**
 * Sends three text messages ("a", "b", "c") to {@code queue1}, consumes them
 * through a {@link MessageListenerClientAck} on a {@code CLIENT_ACKNOWLEDGE}
 * session and expects the queue to be empty and the listener not to have
 * failed.
 *
 * @throws Exception if a JMS operation fails or a wait is interrupted
 */
@Test
public void testMessageListenerClientAck() throws Exception {
    Connection conn = createConnection();

    Session sendingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer sender = sendingSession.createProducer(queue1);
    TextMessage first = sendingSession.createTextMessage("a");
    TextMessage second = sendingSession.createTextMessage("b");
    TextMessage third = sendingSession.createTextMessage("c");
    sender.send(first);
    sender.send(second);
    sender.send(third);
    sendingSession.close();

    assertRemainingMessages(3);

    conn.start();
    Session receivingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    MessageConsumer receiver = receivingSession.createConsumer(queue1);
    MessageListenerClientAck ackListener = new MessageListenerClientAck(receivingSession);
    receiver.setMessageListener(ackListener);

    ackListener.waitForMessages();
    Thread.sleep(500);

    assertRemainingMessages(0);
    conn.close();

    ProxyAssertSupport.assertFalse(ackListener.failed);
}
/**
 * Connects to Solace messaging, subscribes to {@code TOPIC_NAME}, prints the
 * first message that arrives and then shuts everything down.
 *
 * @param args {@code args[0]} is the host, {@code args[1]} is
 *             {@code username@vpn}, {@code args[2]} is the password
 * @throws Exception if connecting, subscribing or waiting fails
 */
public void run(String... args) throws Exception {
    String[] userAndVpn = args[1].split("@");
    String host = args[0];
    String vpnName = userAndVpn[1];
    String username = userAndVpn[0];
    String password = args[2];

    System.out.printf("TopicSubscriber is connecting to Solace messaging at %s...%n", host);

    // Programmatically create the connection factory using default settings
    SolConnectionFactory solaceFactory = SolJmsUtility.createConnectionFactory();
    solaceFactory.setHost(host);
    solaceFactory.setVPN(vpnName);
    solaceFactory.setUsername(username);
    solaceFactory.setPassword(password);
    Connection connection = solaceFactory.createConnection();

    // Create a non-transacted, Auto ACK session.
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    System.out.printf("Connected to Solace Message VPN '%s' with client username '%s'.%n", vpnName,
            username);

    // Create the subscription topic programmatically
    Topic subscriptionTopic = session.createTopic(TOPIC_NAME);

    // Create the message consumer for the subscription topic
    MessageConsumer messageConsumer = session.createConsumer(subscriptionTopic);

    // Listener that prints each message and then releases the main thread
    MessageListener printingListener = new MessageListener() {
        @Override
        public void onMessage(Message received) {
            try {
                if (!(received instanceof TextMessage)) {
                    System.out.println("Message received.");
                } else {
                    System.out.printf("TextMessage received: '%s'%n", ((TextMessage) received).getText());
                }
                System.out.printf("Message Content:%n%s%n", SolJmsUtility.dumpMessage(received));
                latch.countDown(); // unblock the main thread
            } catch (JMSException ex) {
                System.out.println("Error processing incoming message.");
                ex.printStackTrace();
            }
        }
    };
    messageConsumer.setMessageListener(printingListener);

    // Start receiving messages
    connection.start();
    System.out.println("Awaiting message...");
    // the main thread blocks at the next statement until a message received
    latch.await();

    connection.stop();
    // Close everything in the order reversed from the opening order
    // NOTE: as the interfaces below extend AutoCloseable,
    // with them it's possible to use the "try-with-resources" Java statement
    // see details at https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
    messageConsumer.close();
    session.close();
    connection.close();
}
/**
 * Waits up to five seconds for one message on the destination returned by
 * {@code createDestination}. When it is a {@link TextMessage} with a reply-to
 * destination, a reply carrying the received text plus {@code SUFFIX} is sent
 * there.
 * <p>
 * The listener hands messages over through a buffering queue. The previous
 * {@code SynchronousQueue.offer(e)} only succeeds while another thread is
 * already blocked in {@code poll}, so a message delivered before this method
 * reached {@code poll} was silently dropped. The connection is now closed in a
 * {@code finally} block so it is released when a JMS call throws or the wait
 * is interrupted.
 *
 * @return the text of the received {@link TextMessage}, or {@code null} when
 *         nothing arrived in time or the message was not a text message
 * @throws JMSException         if a JMS operation fails
 * @throws InterruptedException if interrupted while waiting for the message
 */
@Override
public String call() throws JMSException, InterruptedException {
    String result = null;
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(createDestination(session));

        // Fully qualified so that no additional import is required in this file.
        final java.util.concurrent.LinkedBlockingQueue<Message> received =
            new java.util.concurrent.LinkedBlockingQueue<>();
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message arg0) {
                // Unbounded queue: the hand-over never blocks and never drops.
                received.offer(arg0);
            }
        });

        // Signal that the consumer is in place, when a latch was supplied.
        if (latch != null) {
            latch.countDown();
        }

        Message message = received.poll(5000, TimeUnit.MILLISECONDS);
        if (message instanceof TextMessage) {
            TextMessage mesg = (TextMessage) message;
            if (mesg.getJMSReplyTo() != null) {
                MessageProducer producer = session.createProducer(mesg.getJMSReplyTo());
                TextMessage response = session.createTextMessage(mesg.getText() + SUFFIX);
                producer.send(response);
                producer.close();
            }
            result = mesg.getText();
        }

        consumer.close();
        session.close();
    } finally {
        connection.close();
    }
    return result;
}
/**
 * Connects to Solace messaging, listens on {@code REQUEST_TOPIC_NAME} and
 * answers the first request that carries a reply-to destination with a
 * "Sample response" text message, then shuts everything down.
 *
 * @param args {@code args[0]} is the host, {@code args[1]} is
 *             {@code username@vpn}, {@code args[2]} is the password
 * @throws Exception if connecting, subscribing or waiting fails
 */
public void run(String... args) throws Exception {
    String[] userAndVpn = args[1].split("@");
    String host = args[0];
    String vpnName = userAndVpn[1];
    String username = userAndVpn[0];
    String password = args[2];

    System.out.printf("BasicReplier is connecting to Solace messaging at %s...%n", host);

    // Programmatically create the connection factory using default settings
    SolConnectionFactory solaceFactory = SolJmsUtility.createConnectionFactory();
    solaceFactory.setHost(host);
    solaceFactory.setVPN(vpnName);
    solaceFactory.setUsername(username);
    solaceFactory.setPassword(password);

    // Create connection to Solace messaging
    Connection connection = solaceFactory.createConnection();

    // Create a non-transacted, auto ACK session.
    final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    System.out.printf("Connected to the Solace Message VPN '%s' with client username '%s'.%n", vpnName,
            username);

    // Create the request topic programmatically
    Topic requestTopic = session.createTopic(REQUEST_TOPIC_NAME);

    // Create the message consumer for the request topic
    MessageConsumer requestConsumer = session.createConsumer(requestTopic);

    // Create the message producer for the reply queue
    final MessageProducer replyProducer = session.createProducer(null);

    // Listener that answers requests asynchronously
    MessageListener replyingListener = new MessageListener() {
        @Override
        public void onMessage(Message request) {
            try {
                Destination replyTo = request.getJMSReplyTo();
                if (replyTo == null) {
                    System.out.println("Received message without reply-to field.");
                    return;
                }

                System.out.println("Received request, responding...");
                TextMessage reply = session.createTextMessage();
                reply.setText("Sample response");

                // Copy the correlation ID from the request to the reply
                reply.setJMSCorrelationID(request.getJMSCorrelationID());

                // For direct messaging only, this flag is needed to interoperate with
                // Solace Java, C, and C# request reply APIs.
                reply.setBooleanProperty(SupportedProperty.SOLACE_JMS_PROP_IS_REPLY_MESSAGE, Boolean.TRUE);

                // Send the reply
                replyProducer.send(replyTo, reply, DeliveryMode.NON_PERSISTENT,
                        Message.DEFAULT_PRIORITY,
                        Message.DEFAULT_TIME_TO_LIVE);
                System.out.println("Responded successfully. Exiting...");
                latch.countDown(); // unblock the main thread
            } catch (JMSException ex) {
                System.out.println("Error processing incoming message.");
                ex.printStackTrace();
            }
        }
    };
    requestConsumer.setMessageListener(replyingListener);

    // Start receiving messages
    connection.start();
    System.out.println("Awaiting request...");
    // the main thread blocks at the next statement until a message received
    latch.await();

    connection.stop();
    // Close everything in the order reversed from the opening order
    // NOTE: as the interfaces below extend AutoCloseable,
    // with them it's possible to use the "try-with-resources" Java statement
    // see details at https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
    replyProducer.close();
    requestConsumer.close();
    session.close();
    connection.close();
}
/**
 * Copies {@code existingStore} into a scratch directory and, three times in a
 * row, starts a broker on that scheduler store and expects ten messages to be
 * delivered to "test.queue" within 30 seconds.
 * <p>
 * The connection and broker are shut down in {@code finally} blocks so a failed
 * assertion or JMS error in one iteration does not leave them running. The
 * first {@code assertEquals} now passes its arguments in (expected, actual)
 * order. The write-only delivery counter was removed.
 *
 * @param existingStore scheduler store copied into the test directory
 * @throws Exception if the broker, the connection or the file copy fails
 */
public void doTestScheduleRepeated(File existingStore) throws Exception {
    File testDir = new File("target/activemq-data/store/scheduler/versionDB");
    IOHelper.deleteFile(testDir);
    IOHelper.copyFile(existingStore, testDir);

    final int NUMBER = 10;
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");

    for (int i = 0; i < 3; ++i) {
        JobSchedulerStoreImpl scheduler = new JobSchedulerStoreImpl();
        scheduler.setDirectory(testDir);
        scheduler.setJournalMaxFileLength(1024 * 1024);
        BrokerService broker = createBroker(scheduler);
        final CountDownLatch latch = new CountDownLatch(NUMBER);

        broker.start();
        try {
            broker.waitUntilStarted();

            Connection connection = cf.createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue queue = session.createQueue("test.queue");
                MessageConsumer consumer = session.createConsumer(queue);
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        LOG.info("Received scheduled message: {}", message);
                        latch.countDown();
                    }
                });

                connection.start();
                // Nothing is expected to have been delivered yet at this point.
                assertEquals(NUMBER, latch.getCount());
                latch.await(30, TimeUnit.SECONDS);
            } finally {
                connection.close();
            }
        } finally {
            broker.stop();
            broker.waitUntilStopped();
        }

        // Every expected message must have arrived before the shutdown.
        assertEquals(0, latch.getCount());
    }
}
/**
 * Creates two durable subscriptions ("true" and "false") with selectors on the
 * "filter" property, sends {@code messageCount} messages whose "filter" value
 * alternates, consumes half through each subscription, unsubscribes both and,
 * when the broker uses KahaDB, waits up to ten seconds for exactly 10 pages to
 * be in use.
 *
 * @throws Exception if a JMS operation fails or a wait is interrupted
 */
public void testIndexPageUsage() throws Exception {
    // Register both durable subscriptions, then disconnect.
    Connection setupCon = createConnection();
    Session setupSession = setupCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
    setupSession.createDurableSubscriber(topic, "true", "filter = 'true'", true);
    setupSession.close();

    setupSession = setupCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
    setupSession.createDurableSubscriber(topic, "false", "filter = 'false'", true);
    setupSession.close();

    setupCon.close();

    // send messages
    final Connection sendCon = createConnection("send");
    final Session sendSession = sendCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageProducer producer = sendSession.createProducer(null);

    Thread sendThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                for (int i = 0; i < messageCount; i++) {
                    // Odd indexes are tagged "true", even indexes "false".
                    String filterValue = (i % 2 == 1) ? "true" : "false";
                    Message message = sendSession.createMessage();
                    message.setStringProperty("filter", filterValue);
                    producer.send(topic, message);

                    if (i > 0 && i % 1000 == 0) {
                        LOG.info("Sent:" + i);
                    }
                }
                sendSession.close();
                sendCon.close();
            } catch (Exception e) {
                exceptions.add(e);
            }
        }
    });
    sendThread.start();
    sendThread.join();

    // settle with sent messages
    TimeUnit.SECONDS.sleep(4);

    // consume messages
    Connection consumeCon = createConnection();
    Session consumeSession = consumeCon.createSession(false, Session.AUTO_ACKNOWLEDGE);

    MessageConsumer consumerTrue = consumeSession.createDurableSubscriber(topic, "true", "filter = 'true'", true);
    Listener listenerT = new Listener();
    consumerTrue.setMessageListener(listenerT);
    waitFor(listenerT, messageCount / 2);

    MessageConsumer consumerFalse = consumeSession.createDurableSubscriber(topic, "false", "filter = 'false'", true);
    Listener listenerF = new Listener();
    consumerFalse.setMessageListener(listenerF);
    waitFor(listenerF, messageCount / 2);

    assertEquals(messageCount / 2, listenerT.count);
    assertEquals(messageCount / 2, listenerF.count);

    consumerTrue.close();
    consumeSession.unsubscribe("true");

    consumerFalse.close();
    consumeSession.unsubscribe("false");

    consumeSession.close();
    consumeCon.close();

    PersistenceAdapter adapter = broker.getPersistenceAdapter();
    if (!(adapter instanceof KahaDBPersistenceAdapter)) {
        return;
    }

    final KahaDBStore store = ((KahaDBPersistenceAdapter) adapter).getStore();
    LOG.info("Store page count: " + store.getPageFile().getPageCount());
    LOG.info("Store free page count: " + store.getPageFile().getFreePageCount());
    LOG.info("Store page in-use: " + (store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount()));

    Wait.Condition tenPagesInUse = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount() == 10;
        }
    };
    assertTrue("no leak of pages, always use just 10",
        Wait.waitFor(tenPagesInUse, TimeUnit.SECONDS.toMillis(10)));
}
/**
 * Attaches a {@link CountingMessageListener} to a sorted queue (prefetch 0),
 * publishes {@code NUMBER_OF_MESSAGES} messages with ascending sort keys, one
 * transaction per message, and expects the listener's latch to reach zero in
 * time.
 *
 * @throws Exception if a JMS operation fails or the wait is interrupted
 */
@Test
public void testSortedQueueWithAscendingSortedKeys() throws Exception
{
    final String sortedQueueName = getTestName();
    final Queue sortedQueue = createSortedQueue(sortedQueueName, TEST_SORT_KEY);
    final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
    try
    {
        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer sortedConsumer = consumerSession.createConsumer(sortedQueue);
        final CountDownLatch receiveLatch = new CountDownLatch(NUMBER_OF_MESSAGES);
        final CountingMessageListener listener = new CountingMessageListener(receiveLatch, consumerSession);
        sortedConsumer.setMessageListener(listener);
        consumerConnection.start();

        final Connection producerConnection = getConnection();
        try
        {
            final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
            final MessageProducer producer = producerSession.createProducer(sortedQueue);

            int published = 0;
            while (published < NUMBER_OF_MESSAGES)
            {
                final Message keyed = producerSession.createMessage();
                keyed.setStringProperty(TEST_SORT_KEY, AscendingSortedKeys.getNextKey());
                producer.send(keyed);
                // Each message is committed in its own transaction.
                producerSession.commit();
                published++;
            }
        }
        finally
        {
            producerConnection.close();
        }

        final long totalTimeout = getReceiveTimeout() * NUMBER_OF_MESSAGES;
        assertTrue("Messages were not received during expected time",
                   receiveLatch.await(totalTimeout, TimeUnit.MILLISECONDS));
    }
    finally
    {
        consumerConnection.close();
    }
}