下面列出了怎么用javax.jms.QueueBrowser的API类实例代码及写法,或者点击链接到github查看源代码。
private static void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
int expectedNum, long timeout) throws JMSException, InterruptedException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
int actualNum = 0;
for (long startTime = System.currentTimeMillis(); System.currentTimeMillis() - startTime < timeout;
Thread.sleep(100L)) {
actualNum = 0;
for (Enumeration<?> messages = browser.getEnumeration(); messages.hasMoreElements(); actualNum++) {
messages.nextElement();
}
if (actualNum == expectedNum) {
break;
}
//System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum
// + ", expecting: " + expectedNum);
}
browser.close();
session.close();
assertEquals(message + " -> number of messages on queue", expectedNum, actualNum);
}
@Override
@Nullable
public <T> T browseSelected(final String queueName, @Nullable final String messageSelector, final BrowserCallback<T> action)
throws JmsException {
Assert.notNull(action, "Callback object must not be null");
return execute(session -> {
Queue queue = (Queue) getDestinationResolver().resolveDestinationName(session, queueName, false);
QueueBrowser browser = createBrowser(session, queue, messageSelector);
try {
return action.doInJms(session, browser);
}
finally {
JmsUtils.closeQueueBrowser(browser);
}
}, true);
}
@Override
@Nullable
public <T> T browseSelected(final Queue queue, @Nullable final String messageSelector, final BrowserCallback<T> action)
throws JmsException {
Assert.notNull(action, "Callback object must not be null");
return execute(session -> {
QueueBrowser browser = createBrowser(session, queue, messageSelector);
try {
return action.doInJms(session, browser);
}
finally {
JmsUtils.closeQueueBrowser(browser);
}
}, true);
}
private static void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
int expectedNum, int timeout) throws JMSException, InterruptedException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
int actualNum = 0;
for (long startTime = System.currentTimeMillis(); System.currentTimeMillis() - startTime < timeout;
Thread.sleep(100L)) {
actualNum = 0;
for (Enumeration<?> messages = browser.getEnumeration(); messages.hasMoreElements(); actualNum++) {
messages.nextElement();
}
if (actualNum == expectedNum) {
break;
}
//System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum
// + ", expecting: " + expectedNum);
}
browser.close();
session.close();
Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);
}
@Test
public void testGetQueue() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser.getQueue());
browser.close();
browser.close();
try {
browser.getQueue();
fail("Should not be able to use a closed browser");
} catch (IllegalStateException ise) {
}
}
/**
* Create a browser
*
* @param queue The queue
* @param messageSelector The message selector
* @return The browser
* @throws JMSException Thrown if an error occurs
*/
@Override
public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException {
if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) {
throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
}
Session session = getSessionInternal();
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createBrowser " + session + " queue=" + queue + " selector=" + messageSelector);
}
QueueBrowser result = session.createBrowser(queue, messageSelector);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createdBrowser " + session + " browser=" + result);
}
return result;
}
@Test
public void testGetEnumeration() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser.getEnumeration());
browser.close();
try {
browser.getEnumeration();
fail("Should not be able to use a closed browser");
} catch (IllegalStateException ise) {
}
}
@Test(timeout=30000)
public void testBroseOneInQueue() throws Exception {
connection = createAmqpConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
producer.close();
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
LOG.debug("Browsed message {} from Queue {}", m, queue);
}
browser.close();
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
@Test
public void stoppedConnection() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
try
{
browser.getEnumeration();
// PASS
}
catch (IllegalStateException e)
{
//PASS
}
}
finally
{
connection.close();
}
}
@Override
public <T> T browseSelected(final Queue queue, final String messageSelector, final BrowserCallback<T> action)
throws JmsException {
Assert.notNull(action, "Callback object must not be null");
return execute(new SessionCallback<T>() {
@Override
public T doInJms(Session session) throws JMSException {
QueueBrowser browser = createBrowser(session, queue, messageSelector);
try {
return action.doInJms(session, browser);
}
finally {
JmsUtils.closeQueueBrowser(browser);
}
}
}, true);
}
@Override
public <T> T browseSelected(final String queueName, final String messageSelector, final BrowserCallback<T> action)
throws JmsException {
Assert.notNull(action, "Callback object must not be null");
return execute(new SessionCallback<T>() {
@Override
public T doInJms(Session session) throws JMSException {
Queue queue = (Queue) getDestinationResolver().resolveDestinationName(session, queueName, false);
QueueBrowser browser = createBrowser(session, queue, messageSelector);
try {
return action.doInJms(session, browser);
}
finally {
JmsUtils.closeQueueBrowser(browser);
}
}
}, true);
}
private void checkQueueBrowse(final CheckContext context) throws Exception {
try (Connection connection = context.getFactory().createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser queueBrowser = session.createBrowser(session.createQueue(getName()))) {
connection.start();
Enumeration<Message> queueBrowserEnum = queueBrowser.getEnumeration();
if (browse == -1) {
queueBrowserEnum.hasMoreElements();
} else {
int count = 0;
while (count < browse) {
if (!queueBrowserEnum.hasMoreElements() || queueBrowserEnum.nextElement() == null) {
throw new CheckException("Insufficient messages to browse: " + count);
}
count++;
}
}
}
}
protected void browseTestQueueMessages(String queueName) throws Exception {
// Start the connection
Connection connection = cf.createConnection();
connection.setClientID("clientId2" + queueName);
connection.start();
Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
try {
QueueBrowser queueBrowser = session.createBrowser(queue);
@SuppressWarnings("unchecked")
Enumeration<Message> messages = queueBrowser.getEnumeration();
while (messages.hasMoreElements()) {
messages.nextElement();
}
} finally {
connection.close();
}
}
@Test
public void testQueueBrowser() throws Exception {
// Send a message to the broker.
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, destinationType);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(this.deliveryMode);
sendMessages(session, producer, 5);
producer.close();
QueueBrowser browser = session.createBrowser((Queue) destination);
Enumeration<?> enumeration = browser.getEnumeration();
for (int i = 0; i < 5; i++) {
Thread.sleep(100);
assertTrue(enumeration.hasMoreElements());
Message m = (Message) enumeration.nextElement();
assertNotNull(m);
assertEquals("" + i, ((TextMessage) m).getText());
}
assertFalse(enumeration.hasMoreElements());
}
public void testQueueBrowser() throws Exception {
// Send a message to the broker.
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(this.deliveryMode);
sendMessages(session, producer, 5);
producer.close();
QueueBrowser browser = session.createBrowser((Queue) destination);
Enumeration<?> enumeration = browser.getEnumeration();
for (int i = 0; i < 5; i++) {
Thread.sleep(100);
assertTrue(enumeration.hasMoreElements());
Message m = (Message) enumeration.nextElement();
assertNotNull(m);
assertEquals("" + i, ((TextMessage) m).getText());
}
assertFalse(enumeration.hasMoreElements());
}
@Test(timeout = 40000)
public void testNoMessagesBrowserHasNoElements() throws Exception {
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
javax.jms.Queue queue = session.createQueue(getQueueName());
session.createConsumer(queue).close();
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Queue queueView = getProxyToQueue(getQueueName());
assertEquals(0, queueView.getMessageCount());
Enumeration<?> enumeration = browser.getEnumeration();
assertFalse(enumeration.hasMoreElements());
}
@Test(timeout = 30000)
public void testBroseOneInQueue() throws Exception {
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
producer.close();
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
LOG.debug("Browsed message {} from Queue {}", m, queue);
}
browser.close();
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
@Test(timeout = 60000)
public void testBrowseAllInQueuePrefetchOne() throws Exception {
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
javax.jms.Queue queue = session.createQueue(getQueueName());
sendMessages(name.getMethodName(), 5, false);
Queue queueView = getProxyToQueue(getQueueName());
Wait.assertEquals(5, queueView::getMessageCount);
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
public void testBrowseExpiredMessages() throws Exception {
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1199/jmxrmi");
JMXConnector connector = JMXConnectorFactory.connect(url, null);
connector.connect();
MBeanServerConnection connection = connector.getMBeanServerConnection();
ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost," + "destinationType=Queue,destinationName=TEST.Q");
QueueViewMBean queueMbean = MBeanServerInvocationHandler.newProxyInstance(connection, name, QueueViewMBean.class, true);
HashMap<String, String> headers = new HashMap<>();
headers.put("timeToLive", Long.toString(2000));
headers.put("JMSDeliveryMode", Integer.toString(DeliveryMode.PERSISTENT));
queueMbean.sendTextMessage(headers, "test", "system", "manager");
// allow message to expire on the queue
TimeUnit.SECONDS.sleep(4);
Connection c = new ActiveMQConnectionFactory("vm://localhost").createConnection("system", "manager");
c.start();
// browser consumer will force expiration check on addConsumer
QueueBrowser browser = c.createSession(false, Session.AUTO_ACKNOWLEDGE).createBrowser(new ActiveMQQueue("TEST.Q"));
assertTrue("no message in the q", !browser.getEnumeration().hasMoreElements());
// verify dlq got the message, no security exception as brokers context is now used
browser = c.createSession(false, Session.AUTO_ACKNOWLEDGE).createBrowser(new ActiveMQQueue("ActiveMQ.DLQ"));
assertTrue("one message in the dlq", browser.getEnumeration().hasMoreElements());
}
@Test(timeout=30000)
public void testCreateQueueBrowserAutoAckSession() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
// Expected the browser to create a consumer and send credit
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Then expect it to drain it when no message arrives before hasMoreElements is called,
// at which point we send one, and a response flow to indicate the rest of the credit was drained.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
// Expect the credit window to be opened again, but accounting for the message we just prefetched.
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testCreateQueueBrowserClientAckSession() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
// Expected the browser to create a consumer and send credit
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Then expect it to drain it when no message arrives before hasMoreElements is called,
// at which point we send one, and a response flow to indicate the rest of the credit was drained.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
// Expect the credit window to be opened again, but accounting for the message we just prefetched.
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
browser.close();
testPeer.expectEnd();
session.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer, then drain with 1 credit.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertFalse(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
public void testPurgeCommandComplexSelector() throws Exception {
try {
PurgeCommand purgeCommand = new PurgeCommand();
CommandContext context = new CommandContext();
context.setFormatter(new CommandShellOutputFormatter(System.out));
purgeCommand.setCommandContext(context);
purgeCommand.setJmxUseLocal(true);
List<String> tokens = new ArrayList<>();
tokens.add("--msgsel");
tokens.add(MSG_SEL_COMPLEX);
addMessages();
validateCounts(MESSAGE_COUNT, MESSAGE_COUNT, MESSAGE_COUNT * 2);
purgeCommand.execute(tokens);
QueueBrowser withPropertyBrowser = requestServerSession.createBrowser(theQueue, MSG_SEL_COMPLEX);
QueueBrowser allBrowser = requestServerSession.createBrowser(theQueue);
int withCount = getMessageCount(withPropertyBrowser, "withProperty ");
int allCount = getMessageCount(allBrowser, "allMessages ");
withPropertyBrowser.close();
allBrowser.close();
assertEquals("Expected withCount to be " + "0" + " was " + withCount, 0, withCount);
assertEquals("Expected allCount to be " + MESSAGE_COUNT + " was " + allCount, MESSAGE_COUNT, allCount);
LOG.info("withCount = " + withCount + "\n allCount = " +
allCount + "\n = " + "\n");
} finally {
purgeAllMessages();
}
}
@Test
public void testBrowsingLimited() throws Exception {
int messageToSend = 470;
ActiveMQQueue queue = new ActiveMQQueue("TEST");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
String data = "";
for (int i = 0; i < 1024 * 2; i++) {
data += "x";
}
for (int i = 0; i < messageToSend; i++) {
producer.send(session.createTextMessage(data));
}
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
received++;
LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
}
browser.close();
assertEquals(browserLimit, received);
}
@Override
public QueueBrowser createBrowser(Queue queue, String selector) {
try {
return startIfNeeded(getSession().createBrowser(queue, selector));
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
protected static long getQueueFirstMessageAge(Session jSession,
String queueName, String messageSelector, long currentTime,
boolean warn) throws JMSException {
QueueBrowser queueBrowser = null;
try {
Queue queue = jSession.createQueue(queueName);
if (messageSelector == null) {
queueBrowser = jSession.createBrowser(queue);
} else {
queueBrowser = jSession.createBrowser(queue, messageSelector);
}
Enumeration enm = queueBrowser.getEnumeration();
if (enm.hasMoreElements()) {
Object o = enm.nextElement();
if (o instanceof Message) {
Message msg = (Message) o;
long jmsTimestamp = msg.getJMSTimestamp();
return currentTime - jmsTimestamp;
} else {
if (warn) {
log.warn("message was not of type Message, but ["
+ o.getClass().getName() + "]");
}
return -2;
}
} else {
return -1;
}
} finally {
if (queueBrowser != null) {
try {
queueBrowser.close();
} catch (JMSException e) {
log.warn("Exception on closing queueBrowser", e);
}
}
}
}
@Test(timeout = 40000)
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
connection = createAmqpConnection();
JmsConnection jmsConnection = (JmsConnection) connection;
((JmsDefaultPrefetchPolicy) jmsConnection.getPrefetchPolicy()).setQueueBrowserPrefetch(0);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendToAmqQueue(5);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
@Override
public QueueBrowser createBrowser(Queue queue) {
try {
return startIfNeeded(getSession().createBrowser(queue));
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
@Override
public QueueBrowser createBrowser(Queue queue, String selector) {
try {
return startIfNeeded(getSession().createBrowser(queue, selector));
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
private QueueBrowser startIfNeeded(QueueBrowser browser) throws JMSException {
if (getAutoStart()) {
connection.start();
}
return browser;
}