下面列出了 javax.jms.QueueBrowser#getEnumeration() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that a browser opened on a queue holding no messages yields an empty enumeration.
 *
 * <p>A consumer is created and immediately closed on the destination before browsing
 * (presumably so the queue exists on the broker — confirm against the test fixture). The
 * broker-side queue size is asserted to be zero before the enumeration is inspected.
 *
 * @throws Exception if any JMS or management call fails
 */
@Test(timeout = 40000)
public void testNoMessagesBrowserHasNoElements() throws Exception {
    connection = createAmqpConnection();
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    Queue queue = session.createQueue(getDestinationName());
    session.createConsumer(queue).close();

    QueueBrowser browser = session.createBrowser(queue);
    assertNotNull(browser);

    QueueViewMBean proxy = getProxyToQueue(getDestinationName());
    assertEquals(0, proxy.getQueueSize());

    // Wildcard-typed instead of raw: the element type is never needed here.
    Enumeration<?> enumeration = browser.getEnumeration();
    assertFalse(enumeration.hasMoreElements());
}
/**
 * Walks a queue browser over every message currently visible on the named queue,
 * discarding each element.
 *
 * <p>A dedicated connection is created for the browse, given the client id
 * {@code "clientId2" + queueName}, and is always closed before returning — including when
 * connection setup itself fails.
 *
 * @param queueName name of the queue to browse; also used as the client id suffix
 * @throws Exception if any JMS operation fails
 */
protected void browseTestQueueMessages(String queueName) throws Exception {
    Connection connection = cf.createConnection();
    // Everything after creation sits inside the try so a failure during setup
    // (client id, start, session/queue creation) cannot leak the connection.
    try {
        connection.setClientID("clientId2" + queueName);
        connection.start();

        Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        QueueBrowser queueBrowser = session.createBrowser(queue);

        // Elements are only stepped over, so a wildcard type avoids an unchecked conversion.
        Enumeration<?> messages = queueBrowser.getEnumeration();
        while (messages.hasMoreElements()) {
            messages.nextElement();
        }
    } finally {
        connection.close();
    }
}
/**
 * Sends five messages to the test destination and checks that a queue browser returns
 * exactly five text messages whose bodies are {@code "0"} through {@code "4"}, in that order.
 *
 * @throws Exception if any JMS operation fails or the thread is interrupted while pausing
 */
@Test
public void testQueueBrowser() throws Exception {
    final int messageCount = 5;

    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQDestination destination = createDestination(session, destinationType);

    // Publish the messages to the broker, then release the producer.
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(this.deliveryMode);
    sendMessages(session, producer, messageCount);
    producer.close();

    QueueBrowser browser = session.createBrowser((Queue) destination);
    Enumeration<?> browsed = browser.getEnumeration();

    int index = 0;
    while (index < messageCount) {
        // Short pause before each check, as in the original test.
        Thread.sleep(100);
        assertTrue(browsed.hasMoreElements());

        Message next = (Message) browsed.nextElement();
        assertNotNull(next);

        TextMessage text = (TextMessage) next;
        assertEquals(String.valueOf(index), text.getText());
        index++;
    }

    assertFalse(browsed.hasMoreElements());
}
/**
 * Checks that browsing a queue with no messages produces an enumeration that is
 * immediately exhausted.
 *
 * <p>The server-side queue view is asserted to report a message count of zero before the
 * browser's enumeration is examined.
 *
 * @throws Exception if any JMS operation or the queue lookup fails
 */
@Test(timeout = 40000)
public void testNoMessagesBrowserHasNoElements() throws Exception {
    Connection connection = createConnection();
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    javax.jms.Queue jmsQueue = session.createQueue(getQueueName());
    // Open and immediately close a consumer on the destination before browsing.
    session.createConsumer(jmsQueue).close();

    QueueBrowser queueBrowser = session.createBrowser(jmsQueue);
    assertNotNull(queueBrowser);

    Queue serverQueue = getProxyToQueue(getQueueName());
    assertEquals(0, serverQueue.getMessageCount());

    Enumeration<?> browsed = queueBrowser.getEnumeration();
    assertFalse(browsed.hasMoreElements());
}
/**
 * Verifies that a browser created from a transacted session sees every message on the queue.
 *
 * <p>{@code sendToAmqQueue(5)} is invoked and the broker-side queue size is asserted to be
 * five. The browser's enumeration is then drained and the number of elements counted.
 *
 * @throws Exception if any JMS or management call fails
 */
@Test(timeout = 40000)
public void testBrowseAllInQueueTxSession() throws Exception {
    connection = createAmqpConnection();
    connection.start();

    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    assertNotNull(session);

    Queue queue = session.createQueue(getDestinationName());
    sendToAmqQueue(5);

    QueueViewMBean proxy = getProxyToQueue(getDestinationName());
    assertEquals(5, proxy.getQueueSize());

    QueueBrowser browser = session.createBrowser(queue);
    assertNotNull(browser);

    // Wildcard-typed rather than raw; each element is cast explicitly where it is used.
    Enumeration<?> enumeration = browser.getEnumeration();
    int count = 0;
    while (enumeration.hasMoreElements()) {
        Message msg = (Message) enumeration.nextElement();
        assertNotNull(msg);
        LOG.debug("Recv: {}", msg);
        count++;
    }

    assertFalse(enumeration.hasMoreElements());
    assertEquals(5, count);
}
/**
 * Polls a queue with a browser until it holds the expected number of messages or the
 * timeout elapses, then asserts on the last observed count.
 *
 * <p>The queue is always browsed at least once, even when {@code timeout} is zero or
 * negative, so the assertion never runs against a count that was not actually observed.
 * Between attempts the method sleeps for 100 ms. The browser and session created here are
 * closed on every exit path.
 *
 * @param message     prefix for the assertion failure message
 * @param connection  connection used to create the browsing session
 * @param queue       queue whose messages are counted
 * @param expectedNum number of messages expected on the queue
 * @param timeout     maximum time in milliseconds to keep polling
 * @throws JMSException         if a JMS operation fails
 * @throws InterruptedException if the thread is interrupted while sleeping between polls
 */
private static void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
        int expectedNum, long timeout) throws JMSException, InterruptedException {
    int actualNum = 0;
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        QueueBrowser browser = session.createBrowser(queue);
        try {
            final long startTime = System.currentTimeMillis();
            do {
                // Re-count from scratch on every poll using a fresh enumeration.
                actualNum = 0;
                for (Enumeration<?> messages = browser.getEnumeration(); messages.hasMoreElements(); actualNum++) {
                    messages.nextElement();
                }
                if (actualNum == expectedNum) {
                    break;
                }
                Thread.sleep(100L);
            } while (System.currentTimeMillis() - startTime < timeout);
        } finally {
            browser.close();
        }
    } finally {
        session.close();
    }
    assertEquals(message + " -> number of messages on queue", expectedNum, actualNum);
}
/**
 * Sends 370 text messages of 2048 characters each to the {@code TEST} queue and verifies
 * that a queue browser on the same session enumerates all of them.
 *
 * <p>The connection created by this test is closed before the final assertion, whether or
 * not sending or browsing succeeded.
 *
 * @throws JMSException if any JMS operation fails
 */
@Test
public void testBrowsing() throws JMSException {
    int messageToSend = 370;
    ActiveMQQueue queue = new ActiveMQQueue("TEST");

    // Build the payload once with a builder instead of repeated string concatenation.
    StringBuilder payload = new StringBuilder(1024 * 2);
    for (int i = 0; i < 1024 * 2; i++) {
        payload.append('x');
    }
    String data = payload.toString();

    int received = 0;
    Connection connection = factory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageProducer producer = session.createProducer(queue);
        for (int i = 0; i < messageToSend; i++) {
            producer.send(session.createTextMessage(data));
        }

        QueueBrowser browser = session.createBrowser(queue);
        Enumeration<?> enumeration = browser.getEnumeration();
        while (enumeration.hasMoreElements()) {
            Message m = (Message) enumeration.nextElement();
            received++;
            LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
        }
        browser.close();
    } finally {
        // The connection is local to this test, so release it here.
        connection.close();
    }

    assertEquals(messageToSend, received);
}
/**
 * Counts the messages a queue browser can currently see on the named queue.
 *
 * <p>The browser is closed before returning, including when enumeration fails.
 *
 * @param queueName name of the queue to inspect
 * @return number of elements returned by the browser's enumeration
 * @throws Exception if creating, reading or closing the browser fails
 */
public int countMessagesInQueue(final String queueName) throws Exception {
    final Queue queue = jmsContext.createQueue(queueName);
    final QueueBrowser browser = jmsContext.createBrowser(queue);
    try {
        final Enumeration<?> msgEnumeration = browser.getEnumeration();
        int count = 0;
        while (msgEnumeration.hasMoreElements()) {
            msgEnumeration.nextElement();
            count++;
        }
        return count;
    } finally {
        // The browser was created here, so it is released here.
        browser.close();
    }
}
/**
 * Verifies that a browser still sees every message when the connection's queue-browser
 * prefetch is set to one.
 *
 * <p>{@code sendToAmqQueue(5)} is invoked and the broker-side queue size is asserted to be
 * five. The browser's enumeration is then drained and the number of elements counted.
 *
 * @throws Exception if any JMS or management call fails
 */
@Test(timeout = 40000)
public void testBrowseAllInQueuePrefetchOne() throws Exception {
    connection = createAmqpConnection();

    // The prefetch policy is adjusted before the connection is started.
    JmsConnection jmsConnection = (JmsConnection) connection;
    ((JmsDefaultPrefetchPolicy) jmsConnection.getPrefetchPolicy()).setQueueBrowserPrefetch(1);
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    Queue queue = session.createQueue(getDestinationName());
    sendToAmqQueue(5);

    QueueViewMBean proxy = getProxyToQueue(getDestinationName());
    assertEquals(5, proxy.getQueueSize());

    QueueBrowser browser = session.createBrowser(queue);
    assertNotNull(browser);

    // Wildcard-typed rather than raw; each element is cast explicitly where it is used.
    Enumeration<?> enumeration = browser.getEnumeration();
    int count = 0;
    while (enumeration.hasMoreElements()) {
        Message msg = (Message) enumeration.nextElement();
        assertNotNull(msg);
        LOG.debug("Recv: {}", msg);
        count++;
    }

    assertFalse(enumeration.hasMoreElements());
    assertEquals(5, count);
}
/**
 * Browses and consumes the same queue in lock-step from two different connections.
 *
 * <p>One thousand text messages carrying an int property {@code "i"} are sent. For each
 * index the browser's next element and the consumer's next message are both checked to
 * carry that index. Afterwards neither the browser nor the consumer may have anything left.
 *
 * <p>The consumer connection created by this test is closed on every exit path.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testBrowserAndConsumerSimultaneousDifferentConnections() throws Exception {
    ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
    conn = cf.createConnection();

    Connection connConsumer = cf.createConnection();
    try {
        Session sessionConsumer = connConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
        MessageProducer producer = session.createProducer(jBossQueue);
        MessageConsumer consumer = sessionConsumer.createConsumer(jBossQueue);

        int noOfMessages = 1000;
        for (int i = 0; i < noOfMessages; i++) {
            TextMessage textMessage = session.createTextMessage("m" + i);
            textMessage.setIntProperty("i", i);
            producer.send(textMessage);
        }

        connConsumer.start();

        QueueBrowser browser = session.createBrowser(jBossQueue);
        Enumeration<?> enumMessages = browser.getEnumeration();

        for (int i = 0; i < noOfMessages; i++) {
            TextMessage msg = (TextMessage) enumMessages.nextElement();
            Assert.assertNotNull(msg);
            Assert.assertEquals(i, msg.getIntProperty("i"));

            TextMessage recvMessage = (TextMessage) consumer.receiveNoWait();
            Assert.assertNotNull(recvMessage);
            // Check the consumed message itself, not the browsed one a second time.
            Assert.assertEquals(i, recvMessage.getIntProperty("i"));
        }

        Message m = consumer.receiveNoWait();
        Assert.assertFalse(enumMessages.hasMoreElements());
        Assert.assertNull(m);

        conn.close();
    } finally {
        // This connection is local to the test, so it is released here.
        connConsumer.close();
    }
}
/**
 * Sends ten text messages carrying an int property {@code "i"} and verifies that a queue
 * browser returns exactly those ten, in index order, with nothing left afterwards.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testBrowserOnly() throws Exception {
    ((ActiveMQConnectionFactory) cf).setConsumerWindowSize(0);
    conn = cf.createConnection();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
    MessageProducer producer = session.createProducer(jBossQueue);

    int noOfMessages = 10;
    for (int i = 0; i < noOfMessages; i++) {
        TextMessage textMessage = session.createTextMessage("m" + i);
        textMessage.setIntProperty("i", i);
        producer.send(textMessage);
    }

    QueueBrowser browser = session.createBrowser(jBossQueue);
    // Wildcard-typed rather than raw; each element is cast explicitly where it is used.
    Enumeration<?> enumMessages = browser.getEnumeration();

    for (int i = 0; i < noOfMessages; i++) {
        Assert.assertTrue(enumMessages.hasMoreElements());
        TextMessage msg = (TextMessage) enumMessages.nextElement();
        Assert.assertNotNull(msg);
        Assert.assertEquals(i, msg.getIntProperty("i"));
    }

    Assert.assertFalse(enumMessages.hasMoreElements());

    conn.close();

    // The delivering count is deliberately not asserted to be zero: messages might still
    // be in the process of being delivered and expired at this point, which can make the
    // delivering count flip to 1.
}
/**
 * Copies messages from a source queue to a target queue by browsing the source, leaving
 * the source messages in place.
 *
 * <p>The target queue name is the first positional argument and the source queue name is
 * the value of {@code CMD_COPY_QUEUE}. The following options are honoured:
 * <ul>
 *   <li>{@code CMD_SELECTOR} — the browser is created with this selector;</li>
 *   <li>{@code CMD_COUNT} — maximum number of browsed messages to examine, where
 *       {@code 0} means no limit (the default is {@code DEFAULT_COUNT_ALL});</li>
 *   <li>{@code CMD_FIND} — only {@code TextMessage}s whose text contains this value are
 *       copied, and every other message is skipped.</li>
 * </ul>
 *
 * <p>The producer and browser created here are closed on every exit path.
 *
 * @param cmdLine parsed command line carrying the queue names and options
 * @throws JMSException    if a JMS operation fails
 * @throws ScriptException declared for the transformer step in {@code sendWithOptionalTransformer}
 * @throws IOException     declared for the transformer step in {@code sendWithOptionalTransformer}
 */
protected void executeCopy(CommandLine cmdLine) throws JMSException, ScriptException, IOException {
    Queue tq = sess.createQueue(cmdLine.getArgs()[0]);
    Queue q = sess.createQueue(cmdLine.getOptionValue(CMD_COPY_QUEUE)); // Source

    MessageProducer mp = sess.createProducer(tq);
    try {
        QueueBrowser qb;
        if (cmdLine.hasOption(CMD_SELECTOR)) { // Selectors
            qb = sess.createBrowser(q, cmdLine.getOptionValue(CMD_SELECTOR));
        } else {
            qb = sess.createBrowser(q);
        }
        try {
            int count = Integer.parseInt(cmdLine.getOptionValue(CMD_COUNT,
                    DEFAULT_COUNT_ALL));

            // These option lookups do not change per message, so read them once.
            boolean findEnabled = cmdLine.hasOption(CMD_FIND);
            String needle = cmdLine.getOptionValue(CMD_FIND);

            int i = 0; // messages examined
            int j = 0; // messages actually copied
            Enumeration<?> en = qb.getEnumeration();
            while ((i < count || count == 0) && en.hasMoreElements()) {
                Message msg = (Message) en.nextElement();
                if (msg == null) {
                    break;
                }
                if (findEnabled) {
                    // With search enabled, only text messages can match.
                    if (msg instanceof TextMessage) {
                        String haystack = ((TextMessage) msg).getText();
                        if (haystack != null && haystack.contains(needle)) {
                            sendWithOptionalTransformer(cmdLine, msg, mp);
                            ++j;
                        }
                    }
                } else {
                    sendWithOptionalTransformer(cmdLine, msg, mp);
                    ++j;
                }
                ++i;
            }

            output(j, " msgs copied from ", cmdLine.getOptionValue(CMD_COPY_QUEUE),
                    " to ", cmdLine.getArgs()[0]);
        } finally {
            qb.close();
        }
    } finally {
        mp.close();
    }
}
/**
 * Verifies that a browser with a queue-browser prefetch of one still enumerates all thirty
 * messages on the queue.
 *
 * <p>{@code sendToAmqQueue(MSG_COUNT)} is invoked and the broker-side queue size is asserted
 * to equal {@code MSG_COUNT}. The browser's enumeration is then drained and counted.
 *
 * @throws Exception if any JMS or management call fails
 */
@Test(timeout = 60000)
public void testBrowseAllInQueueSmallPrefetch() throws Exception {
    connection = createAmqpConnection();
    // The prefetch policy is adjusted before the connection is started.
    ((JmsDefaultPrefetchPolicy) ((JmsConnection) connection).getPrefetchPolicy()).setQueueBrowserPrefetch(1);
    connection.start();

    final int MSG_COUNT = 30;

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    Queue queue = session.createQueue(getDestinationName());
    sendToAmqQueue(MSG_COUNT);

    QueueViewMBean proxy = getProxyToQueue(getDestinationName());
    assertEquals(MSG_COUNT, proxy.getQueueSize());

    QueueBrowser browser = session.createBrowser(queue);
    assertNotNull(browser);

    // Wildcard-typed rather than raw; each element is cast explicitly where it is used.
    Enumeration<?> enumeration = browser.getEnumeration();
    int count = 0;
    while (enumeration.hasMoreElements()) {
        Message msg = (Message) enumeration.nextElement();
        assertNotNull(msg);
        LOG.debug("Recv: {}", msg);
        count++;
    }

    assertFalse(enumeration.hasMoreElements());
    assertEquals(MSG_COUNT, count);
}
/**
 * Verifies that a browser enumerates every message on the queue when all prefetch values
 * of the connection are set to zero.
 *
 * <p>The loop stops after {@code MSG_COUNT} elements have been read, and
 * {@code hasMoreElements()} is then checked separately to confirm it returns {@code false}.
 *
 * @throws Exception if any JMS operation or the queue lookup fails
 */
@Test(timeout = 60000)
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
    final int MSG_COUNT = 5;

    JmsConnection connection = (JmsConnection) createConnection();
    // The prefetch policy is adjusted before the connection is started.
    ((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    javax.jms.Queue queue = session.createQueue(getQueueName());
    sendMessages(name.getMethodName(), MSG_COUNT, false);

    Queue queueView = getProxyToQueue(getQueueName());
    Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);

    QueueBrowser browser = session.createBrowser(queue);
    assertNotNull(browser);

    Enumeration<?> enumeration = browser.getEnumeration();
    int count = 0;
    while (count < MSG_COUNT && enumeration.hasMoreElements()) {
        Message msg = (Message) enumeration.nextElement();
        assertNotNull(msg);
        LOG.debug("Recv: {}", msg);
        count++;
    }

    LOG.debug("Received all expected message, checking that hasMoreElements returns false");
    assertFalse(enumeration.hasMoreElements());
    // Compare against the same constant that drove the send and the loop bound.
    assertEquals(MSG_COUNT, count);
}
/**
 * Checks that a queue browser enumerates all five messages on the queue, pausing 50 ms
 * after each browsed message.
 *
 * <p>The server-side queue view is awaited to report five messages before browsing starts.
 *
 * @throws Exception if any JMS operation fails or the thread is interrupted while pausing
 */
@Test(timeout = 60000)
public void testBrowseAllInQueue() throws Exception {
    final int expectedCount = 5;

    Connection connection = createConnection();
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(session);

    javax.jms.Queue jmsQueue = session.createQueue(getQueueName());
    sendMessages(name.getMethodName(), expectedCount, false);

    Queue serverQueue = getProxyToQueue(getQueueName());
    Wait.assertEquals(expectedCount, serverQueue::getMessageCount);

    QueueBrowser queueBrowser = session.createBrowser(jmsQueue);
    assertNotNull(queueBrowser);

    Enumeration<?> browsed = queueBrowser.getEnumeration();
    int seen = 0;
    while (browsed.hasMoreElements()) {
        Message current = (Message) browsed.nextElement();
        assertNotNull(current);
        LOG.debug("Recv: {}", current);
        seen++;
        // Slow the browse down a little between elements.
        TimeUnit.MILLISECONDS.sleep(50);
    }

    assertFalse(browsed.hasMoreElements());
    assertEquals(expectedCount, seen);
}
/**
 * Verifies that calling {@code nextElement()} on a queue browser's enumeration throws
 * {@link NoSuchElementException} when there is no message to return.
 *
 * <p>The test runs against a scripted {@code TestAmqpPeer}. In this method each peer
 * expectation is registered before the client call that follows it, so the statement
 * order below is significant.
 *
 * @throws IOException declared by the original signature
 * @throws Exception if the connection, session or peer interaction fails
 */
@Test(timeout=30000)
public void testQueueBrowserNextElementWithNoMessage() throws IOException, Exception {
// The peer is created in try-with-resources so it is closed when the test ends.
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
// Registered ahead of the session creation on the next line.
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// The browser is expected to create a consumer and send credit, then drain it when
// no message is there to satisfy an internal hasMoreElements check, then send more
// credit to reopen a window.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Registered before the browser is created; presumably matched by browser.close() below — TODO confirm.
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
try {
// No hasMoreElements() call is made first; nextElement() itself must report the absence.
queueView.nextElement();
fail("Should have thrown an exception due to there being no more elements");
} catch (NoSuchElementException nsee) {
// expected
}
browser.close();
// Registered ahead of the connection close on the next line.
testPeer.expectClose();
connection.close();
// Presumably waits up to 3000 ms for every scripted expectation to be satisfied — TODO confirm units.
testPeer.waitForAllHandlersToComplete(3000);
}
}
/**
 * Exercises receive, then browse, then receive again on the {@code TEST} queue.
 *
 * <p>Three text messages are sent and the first is consumed. A browser must then show
 * exactly the second and third. After the browser is closed, a fresh consumer must still
 * be able to receive the second and third, showing that browsing did not remove them.
 *
 * @throws Exception if any JMS operation fails
 */
public void testReceiveBrowseReceive() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQQueue destination = new ActiveMQQueue("TEST");
    MessageProducer producer = session.createProducer(destination);
    MessageConsumer consumer = session.createConsumer(destination);
    connection.start();

    Message first = session.createTextMessage("First Message");
    Message second = session.createTextMessage("Second Message");
    Message third = session.createTextMessage("Third Message");

    // Drain anything left over from previous test runs.
    Message leftover;
    do {
        leftover = consumer.receive(1000);
    } while (leftover != null);

    for (Message toSend : new Message[]{first, second, third}) {
        producer.send(toSend);
    }

    // Consume only the first message, then release the consumer.
    assertEquals(first, consumer.receive(1000));
    consumer.close();

    QueueBrowser browser = session.createBrowser(destination);
    Enumeration<?> browsed = browser.getEnumeration();

    // The browser should expose the second message ...
    assertTrue("should have received the second message", browsed.hasMoreElements());
    assertEquals(second, browsed.nextElement());

    // ... and then the third.
    assertTrue("Should have received the third message", browsed.hasMoreElements());
    assertEquals(third, browsed.nextElement());

    // Anything beyond that is unexpected; log each extra before failing.
    boolean sawExtra = false;
    while (browsed.hasMoreElements()) {
        LOG.info("Got extra message: " + ((TextMessage) browsed.nextElement()).getText());
        sawExtra = true;
    }
    assertFalse(sawExtra);
    browser.close();

    // A new consumer must still receive the two browsed messages, in order.
    consumer = session.createConsumer(destination);
    assertEquals(second, consumer.receive(1000));
    assertEquals(third, consumer.receive(1000));
    consumer.close();
}
/**
 * Looks up a queue connection factory ({@code jms/qcf}) and a queue ({@code jms/testqueue})
 * through JNDI, sends one text message inside a transacted session, then browses the queue
 * on a second connection and counts the messages seen.
 *
 * <p>Any exception raised along the way is printed and does not propagate directly. The
 * run is treated as failed only if no message was browsed, in which case a
 * {@link RuntimeException} is thrown carrying the caught exception (if any) as its cause.
 * Both connections are closed on every exit path.
 *
 * @throws RuntimeException if no message was read while browsing
 */
@Override
public void start()
{
    System.out.println("Thread context class loader is: " + Thread.currentThread().getContextClassLoader());
    System.out.println("Class class loader used for loading test class is: " + this.getClass().getClassLoader());

    int nb = 0;
    Exception failure = null;
    try
    {
        // Get the QCF
        Object o = NamingManager.getInitialContext(null).lookup("jms/qcf");
        System.out.println("Received a " + o.getClass());

        // Do a cast & see if no errors
        QueueConnectionFactory qcf = (QueueConnectionFactory) o;

        // Get the Queue
        Object p = NamingManager.getInitialContext(null).lookup("jms/testqueue");
        System.out.println("Received a " + p.getClass());
        Queue q = (Queue) p;

        // Now that we are sure that JNDI works, let's write a message
        System.out.println("Opening connection & session to the broker");
        Connection connection = qcf.createConnection();
        try
        {
            connection.start();
            // The session is transacted; the send below becomes visible on commit().
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            System.out.println("Creating producer");
            MessageProducer producer = session.createProducer(q);
            TextMessage message = session.createTextMessage("HOUBA HOP. SIGNED: MARSUPILAMI");

            System.out.println("Sending message");
            producer.send(message);
            producer.close();
            session.commit();
            System.out.println("A message was sent to the broker");

            // Browse and check the message is there
            Connection connection2 = qcf.createConnection();
            try
            {
                connection2.start();
                Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
                QueueBrowser qb = session2.createBrowser(q);

                // Wildcard-typed; each element is cast explicitly to TextMessage.
                Enumeration<?> msgs = qb.getEnumeration();
                while (msgs.hasMoreElements())
                {
                    TextMessage msg = (TextMessage) msgs.nextElement();
                    System.out.println("Message received: " + msg.getText());
                    nb++;
                }
                System.out.println("Browsing will end here");
                qb.close();
                System.out.println("End of browsing. Nb of message read: " + nb);
            }
            finally
            {
                connection2.close();
            }
        }
        finally
        {
            // We are done (or failed): release the sending connection either way.
            connection.close();
        }
    }
    catch (Exception e)
    {
        // Best-effort by design: the outcome is decided by the message count below.
        e.printStackTrace();
        failure = e;
    }

    if (nb == 0)
    {
        // Keep the original message and type, but preserve the root cause when there is one.
        throw new RuntimeException("test has failed - no messages were received.", failure);
    }
}
/**
 * Tests the queue browser when each step runs between {@code beginTx()} and
 * {@code commitTx()} calls. The messages are browsed and the consumer then tries to
 * receive them. They should still be in the queue even after being browsed.
 *
 * <p>Three text messages are sent and the first is consumed. The browser must then expose
 * exactly the second and third, and a re-opened consumer must still receive both.
 *
 * @throws Exception if any JMS or transaction operation fails
 */
public void testReceiveBrowseReceive() throws Exception {
    Message[] outbound = new Message[]{session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};

    // Consume any outstanding messages from previous test runs.
    beginTx();
    while (consumer.receive(1000) != null) {
    }
    commitTx();

    beginTx();
    producer.send(outbound[0]);
    producer.send(outbound[1]);
    producer.send(outbound[2]);
    commitTx();

    // Get the first.
    beginTx();
    assertEquals(outbound[0], consumer.receive(1000));
    consumer.close();
    commitTx();

    beginTx();
    QueueBrowser browser = session.createBrowser((Queue) destination);
    // Wildcard-typed rather than raw; the comparisons below need no casts.
    Enumeration<?> enumeration = browser.getEnumeration();

    // Browse the second.
    assertTrue("should have received the second message", enumeration.hasMoreElements());
    assertEquals(outbound[1], enumeration.nextElement());

    // Browse the third.
    assertTrue("Should have received the third message", enumeration.hasMoreElements());
    assertEquals(outbound[2], enumeration.nextElement());

    LOG.info("Check for more...");

    // There should be no more.
    boolean tooMany = false;
    while (enumeration.hasMoreElements()) {
        LOG.info("Got extra message: " + ((TextMessage) enumeration.nextElement()).getText());
        tooMany = true;
    }
    assertFalse(tooMany);

    LOG.info("close browser...");
    browser.close();

    LOG.info("reopen and consume...");
    // Re-open the consumer.
    consumer = resourceProvider.createConsumer(session, destination);
    // Receive the second.
    assertEquals(outbound[1], consumer.receive(1000));
    // Receive the third.
    assertEquals(outbound[2], consumer.receive(1000));
    consumer.close();
    commitTx();
}
/**
 * Checks that browsing from a transacted session does not disturb that session's
 * uncommitted work.
 *
 * <p>Five messages are placed on the queue up front and five more are sent through the
 * transacted session without committing. The browser must see exactly five messages, the
 * server-side count must still be five after the browser is closed, and it must become
 * ten once the session is committed.
 *
 * @throws Exception if any JMS operation or the queue lookup fails
 */
@Test(timeout = 40000)
public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
    final int batchSize = 5;

    Connection connection = createConnection();
    connection.start();

    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    assertNotNull(session);

    javax.jms.Queue jmsQueue = session.createQueue(getQueueName());
    sendMessages(name.getMethodName(), batchSize, false);

    Queue serverQueue = getProxyToQueue(getQueueName());
    Wait.assertEquals(batchSize, serverQueue::getMessageCount);

    // Send some TX work but don't commit.
    MessageProducer txProducer = session.createProducer(jmsQueue);
    int sent = 0;
    while (sent < batchSize) {
        txProducer.send(session.createMessage());
        sent++;
    }
    assertEquals(batchSize, serverQueue.getMessageCount());

    QueueBrowser queueBrowser = session.createBrowser(jmsQueue);
    assertNotNull(queueBrowser);

    Enumeration<?> browsed = queueBrowser.getEnumeration();
    int seen = 0;
    while (browsed.hasMoreElements()) {
        Message current = (Message) browsed.nextElement();
        assertNotNull(current);
        LOG.debug("Recv: {}", current);
        seen++;
    }

    assertFalse(browsed.hasMoreElements());
    assertEquals(batchSize, seen);
    queueBrowser.close();

    // Now check that all browser work did not affect the session transaction.
    Wait.assertEquals(batchSize, serverQueue::getMessageCount);
    session.commit();
    Wait.assertEquals(10, serverQueue::getMessageCount);
}