下面列出了javax.jms.QueueBrowser#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test(timeout=30000)
public void testCreateQueueBrowserWithoutEnumeration() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
testPeer.expectEnd();
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Creating the browser should send nothing until an Enumeration is created.
QueueBrowser browser = session.createBrowser(queue);
browser.close();
session.close();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
private void browseQueueAndValidationDeliveryHeaders(final Session session, final Queue queue) throws Exception
{
final QueueBrowser browser = session.createBrowser(queue);
@SuppressWarnings("unchecked")
final List<Message> messages = (List<Message>) new ArrayList(Collections.list(browser.getEnumeration()));
assertThat("Unexpected number of messages seen by browser", messages.size(), is(equalTo(1)));
final Message browsedMessage = messages.get(0);
assertThat(browsedMessage.getJMSRedelivered(), is(equalTo(false)));
if (browsedMessage.propertyExists(JMSX_DELIVERY_COUNT))
{
assertThat(browsedMessage.getIntProperty(JMSX_DELIVERY_COUNT), is(equalTo(1)));
}
browser.close();
}
@Test(timeout = 30000)
public void testBroseOneInQueue() throws Exception {
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
producer.close();
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
LOG.debug("Browsed message {} from Queue {}", m, queue);
}
browser.close();
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
@Test(timeout=30000)
public void testBroseOneInQueue() throws Exception {
connection = createAmqpConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
producer.close();
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
LOG.debug("Browsed message {} from Queue {}", m, queue);
}
browser.close();
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
}
protected static long getQueueMessageCount(Session jSession,
String queueName, String messageSelector) throws JMSException {
QueueBrowser queueBrowser = null;
try {
Queue queue = jSession.createQueue(queueName);
if (messageSelector == null) {
queueBrowser = jSession.createBrowser(queue);
} else {
queueBrowser = jSession.createBrowser(queue, messageSelector);
}
int count = 0;
for (Enumeration enm = queueBrowser.getEnumeration(); enm
.hasMoreElements(); enm.nextElement()) {
count++;
}
return count;
} finally {
if (queueBrowser != null) {
try {
queueBrowser.close();
} catch (JMSException e) {
log.warn("Exception on closing queueBrowser", e);
}
}
}
}
@Test(timeout=30000)
public void testQueueBrowserPrefetchOne() throws IOException, Exception {
final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=1");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer and send credit, then drain it when
// no message arrives before hasMoreElements is called, at which point we send one.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, Matchers.equalTo(UnsignedInteger.ONE), 1, true, false);
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
assertNotNull(queueView.nextElement());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer, then drain with 1 credit.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertFalse(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testCreateQueueBrowserAutoAckSession() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
// Expected the browser to create a consumer and send credit
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Then expect it to drain it when no message arrives before hasMoreElements is called,
// at which point we send one, and a response flow to indicate the rest of the credit was drained.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
// Expect the credit window to be opened again, but accounting for the message we just prefetched.
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
private boolean queueContainsMessages(Connection connection, String queueName) throws JMSException
{
Session session = null;
try
{
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = null;
try
{
browser = session.createBrowser(session.createQueue(queueName));
return browser.getEnumeration().hasMoreElements();
}
finally
{
if (browser != null)
{
browser.close();
}
}
}
finally
{
if (session != null)
{
session.close();
}
}
}
@Test
public void browserIsNonDestructive() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
final Message message = session.createMessage();
producer.send(message);
producer.close();
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
assertTrue("Unexpected browser state", enumeration.hasMoreElements());
Message browsedMessage = (Message) enumeration.nextElement();
assertNotNull("No message returned by browser", browsedMessage);
assertEquals("Unexpected JMSMessageID on browsed message", message.getJMSMessageID(), browsedMessage.getJMSMessageID());
browser.close();
MessageConsumer consumer = session.createConsumer(queue);
Message consumedMessage = consumer.receive(getReceiveTimeout());
assertNotNull("No message returned by consumer", consumedMessage);
assertEquals("Unexpected JMSMessageID on consumed message", message.getJMSMessageID(), consumedMessage.getJMSMessageID());
QueueBrowser browser2 = session.createBrowser(queue);
Enumeration enumeration2 = browser2.getEnumeration();
assertFalse("Unexpected browser state", enumeration2.hasMoreElements());
browser2.close();
}
finally
{
connection.close();
}
}
protected static long getQueueFirstMessageAge(Session jSession,
String queueName, String messageSelector, long currentTime,
boolean warn) throws JMSException {
QueueBrowser queueBrowser = null;
try {
Queue queue = jSession.createQueue(queueName);
if (messageSelector == null) {
queueBrowser = jSession.createBrowser(queue);
} else {
queueBrowser = jSession.createBrowser(queue, messageSelector);
}
Enumeration enm = queueBrowser.getEnumeration();
if (enm.hasMoreElements()) {
Object o = enm.nextElement();
if (o instanceof Message) {
Message msg = (Message) o;
long jmsTimestamp = msg.getJMSTimestamp();
return currentTime - jmsTimestamp;
} else {
if (warn) {
log.warn("message was not of type Message, but ["
+ o.getClass().getName() + "]");
}
return -2;
}
} else {
return -1;
}
} finally {
if (queueBrowser != null) {
try {
queueBrowser.close();
} catch (JMSException e) {
log.warn("Exception on closing queueBrowser", e);
}
}
}
}
@Test
public void testBrowseReceive() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = (ActiveMQQueue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE);
connection.start();
// create consumer
MessageConsumer consumer = session.createConsumer(destination);
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
consumer.close();
Message[] outbound = new Message[]{session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
MessageProducer producer = session.createProducer(destination);
producer.send(outbound[0]);
// create browser first
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
// browse the first message
assertTrue("should have received the first message", enumeration.hasMoreElements());
assertEquals(outbound[0], enumeration.nextElement());
consumer = session.createConsumer(destination);
// Receive the first message.
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
browser.close();
producer.close();
}
private static void closeBrowser(QueueBrowser browser) {
if (browser != null) {
try {
browser.close();
} catch (JMSException e) {
// Ignore. See JAMES-2509
}
}
}
@Test
public void testBrowsingLimited() throws Exception {
int messageToSend = 470;
ActiveMQQueue queue = new ActiveMQQueue("TEST");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
String data = "";
for (int i = 0; i < 1024 * 2; i++) {
data += "x";
}
for (int i = 0; i < messageToSend; i++) {
producer.send(session.createTextMessage(data));
}
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
received++;
LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
}
browser.close();
assertEquals(browserLimit, received);
}
@Test
public void testBrowsing() throws JMSException {
int messageToSend = 370;
ActiveMQQueue queue = new ActiveMQQueue("TEST");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
String data = "";
for (int i = 0; i < 1024 * 2; i++) {
data += "x";
}
for (int i = 0; i < messageToSend; i++) {
producer.send(session.createTextMessage(data));
}
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
received++;
LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
}
browser.close();
assertEquals(messageToSend, received);
}
/**
* Tests the queue browser. Browses the messages then the consumer tries to
* receive them. The messages should still be in the queue even when it was
* browsed.
*
* @throws Exception
*/
@Test
public void testReceiveBrowseReceive() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue destination = (ActiveMQQueue) this.createDestination(session, ActiveMQDestination.QUEUE_TYPE);
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
Message[] outbound = new Message[]{session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
// lets consume any outstanding messages from previous test runs
while (consumer.receive(1000) != null) {
}
producer.send(outbound[0]);
producer.send(outbound[1]);
producer.send(outbound[2]);
// Get the first.
assertEquals(outbound[0], consumer.receive(1000));
consumer.close();
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
// browse the second
assertTrue("should have received the second message", enumeration.hasMoreElements());
assertEquals(outbound[1], enumeration.nextElement());
// browse the third.
assertTrue("Should have received the third message", enumeration.hasMoreElements());
assertEquals(outbound[2], enumeration.nextElement());
// There should be no more.
boolean tooMany = false;
while (enumeration.hasMoreElements()) {
tooMany = true;
}
assertFalse(tooMany);
browser.close();
// Re-open the consumer.
consumer = session.createConsumer(destination);
// Receive the second.
assertEquals(outbound[1], consumer.receive(1000));
// Receive the third.
assertEquals(outbound[2], consumer.receive(1000));
consumer.close();
}
@Test(timeout = 40000)
public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
Connection connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertNotNull(session);
javax.jms.Queue queue = session.createQueue(getQueueName());
sendMessages(name.getMethodName(), 5, false);
Queue queueView = getProxyToQueue(getQueueName());
Wait.assertEquals(5, queueView::getMessageCount);
// Send some TX work but don't commit.
MessageProducer txProducer = session.createProducer(queue);
for (int i = 0; i < 5; ++i) {
txProducer.send(session.createMessage());
}
assertEquals(5, queueView.getMessageCount());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
browser.close();
// Now check that all browser work did not affect the session transaction.
Wait.assertEquals(5, queueView::getMessageCount);
session.commit();
Wait.assertEquals(10, queueView::getMessageCount);
}
@Override
public void start()
{
System.out.println("Thread context class loader is: " + Thread.currentThread().getContextClassLoader());
System.out.println("Class class loader used for loading test class is: " + this.getClass().getClassLoader());
int nb = 0;
try
{
// Get the QCF
Object o = NamingManager.getInitialContext(null).lookup("jms/qcf");
System.out.println("Received a " + o.getClass());
// Do as cast & see if no errors
QueueConnectionFactory qcf = (QueueConnectionFactory) o;
// Get the Queue
Object p = NamingManager.getInitialContext(null).lookup("jms/testqueue");
System.out.println("Received a " + p.getClass());
Queue q = (Queue) p;
// Now that we are sure that JNDI works, let's write a message
System.out.println("Opening connection & session to the broker");
Connection connection = qcf.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
System.out.println("Creating producer");
MessageProducer producer = session.createProducer(q);
TextMessage message = session.createTextMessage("HOUBA HOP. SIGNED: MARSUPILAMI");
System.out.println("Sending message");
producer.send(message);
producer.close();
session.commit();
System.out.println("A message was sent to the broker");
// Browse and check the message is there
Connection connection2 = qcf.createConnection();
connection2.start();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser qb = session2.createBrowser(q);
Enumeration<TextMessage> msgs = qb.getEnumeration();
while (msgs.hasMoreElements())
{
TextMessage msg = msgs.nextElement();
System.out.println("Message received: " + msg.getText());
nb++;
}
System.out.println("Browsing will end here");
qb.close();
System.out.println("End of browsing. Nb of message read: " + nb);
// We are done!
connection.close();
connection2.close();
} catch (Exception e)
{
e.printStackTrace();
}
if (nb == 0)
throw new RuntimeException("test has failed - no messages were received.");
}
@Test(timeout = 40000)
public void testQueueBrowserInTxSessionLeavesOtherWorkUnaffected() throws Exception {
connection = createAmqpConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendToAmqQueue(5);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(5, proxy.getQueueSize());
// Send some TX work but don't commit.
MessageProducer txProducer = session.createProducer(queue);
for (int i = 0; i < 5; ++i) {
txProducer.send(session.createMessage());
}
assertEquals(5, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
browser.close();
// Now check that all browser work did not affect the session transaction.
assertEquals(5, proxy.getQueueSize());
session.commit();
assertEquals(10, proxy.getQueueSize());
}
@Test(timeout=30000)
public void testQueueBrowserHasMoreElementsZeroPrefetchDrainedMessage() throws IOException, Exception {
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer.
testPeer.expectQueueBrowserAttach();
// When hasMoreElements is called, the browser should drain with 1 credit,
// at which point we send it a message to use all the credit.
testPeer.expectLinkFlowRespondWithTransfer(
null, null, null, null, amqpValueNullContent, 1, true,
false, equalTo(UnsignedInteger.ONE), 1, true, false);
// Next attempt should not get a message, we just send a response flow indicating
// the credit was cleared, triggering a false on hasMoreElemets.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
Message message = (Message) queueView.nextElement();
assertNotNull(message);
assertFalse(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}