javax.jms.Message#setIntProperty ( )源码实例Demo

下面列出了javax.jms.Message#setIntProperty ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hazelcast-simulator   文件: CoordinatorClient.java
/**
 * Builds a JMS message describing {@code op} and hands it to the producer that
 * matches the address level of {@code target}.
 * <p>
 * A reply destination and correlation id are attached only when a request id
 * is present.
 *
 * @throws JMSException if creating, populating, or sending the message fails
 */
private void run() throws JMSException {
    final Message request = remoteBroker.session.createMessage();

    if (requestId != null) {
        request.setJMSReplyTo(remoteBroker.replyQueue);
        request.setJMSCorrelationID(requestId);
    }

    // Routing metadata and the JSON-encoded operation travel as message properties.
    request.setStringProperty("source", coordinatorAddress().toString());
    request.setStringProperty("target", target.toString());
    request.setStringProperty("payload", OperationCodec.toJson(op));
    request.setIntProperty("operationType", getOperationType(op).toInt());

    // Dispatch on the address level; anything other than agent/worker is rejected.
    switch (target.getAddressLevel()) {
        case AGENT:
            remoteBroker.agentProducer.send(request);
            break;
        case WORKER:
            remoteBroker.workerProducer.send(request);
            break;
        default:
            throw new RuntimeException("unhandled target:" + target);
    }
}
 
/**
 * Publishes a single property-only message to {@code topic} over a connection
 * that is opened and closed within this call.
 * <p>
 * The message carries an incrementing {@code ID} int property and a
 * {@code COMMIT} boolean property set to {@code true}. The connection is closed
 * in a {@code finally} block so it is released even when session creation,
 * producer creation, or the send itself throws.
 *
 * @throws JMSException if any JMS operation fails
 */
public void send() throws JMSException {

   LOG.info("Sending ... ");

   Connection con = cf.createConnection();
   try {
      Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // Producer created without a destination; the topic is supplied at send time.
      MessageProducer prod = sess.createProducer(null);
      Message message = sess.createMessage();
      message.setIntProperty("ID", ++messageRover);
      message.setBooleanProperty("COMMIT", true);
      prod.send(topic, message);

      msgCount++;
      LOG.info("Message Sent.");

      sess.close();
   } finally {
      // Always release the connection, including on failure paths.
      con.close();
   }
}
 
源代码3 项目: hazelcast-simulator   文件: Server.java
/**
 * Publishes {@code op} to the {@code "coordinator"} topic as a non-persistent,
 * property-only JMS message.
 * <p>
 * The message carries the sender address, the JSON-encoded operation, and the
 * operation type as properties. A producer is created for each call and is
 * closed again once the send has been attempted, so repeated calls do not
 * accumulate open producers on the session. JMS failures are logged and not
 * propagated to the caller.
 *
 * @param op the operation to send
 */
public void sendCoordinator(SimulatorOperation op) {
    try {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("sending [" + op + "]");
        }

        Destination topic = session.createTopic("coordinator");
        MessageProducer producer = session.createProducer(topic);
        try {
            producer.setDeliveryMode(NON_PERSISTENT);

            Message message = session.createMessage();

            message.setStringProperty("source", selfAddressString);
            message.setStringProperty("payload", OperationCodec.toJson(op));
            message.setIntProperty("operationType", getOperationType(op).toInt());

            producer.send(message);
        } finally {
            // Release the per-call producer; a failure here is logged by the outer catch.
            producer.close();
        }
    } catch (JMSException e) {
        LOGGER.error(e);
    }
}
 
源代码4 项目: activemq-artemis   文件: BrowserTest.java
/**
 * Sends 100 messages to {@code queue1}, each tagged with a {@code test_counter}
 * int property running from 1 to 100, and always purges the queue afterwards.
 *
 * @throws Exception if any JMS operation or the queue cleanup fails
 */
@Test
public void testBrowseWithSelector() throws Exception {
   try {
      conn = getConnectionFactory().createConnection();
      final Session sendSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      final MessageProducer sender = sendSession.createProducer(queue1);

      final int numMessages = 100;

      // The counter property is 1-based.
      for (int counter = 1; counter <= numMessages; counter++) {
         final Message tagged = sendSession.createMessage();
         tagged.setIntProperty("test_counter", counter);
         sender.send(tagged);
      }
   } finally {
      removeAllMessages(queue1.getQueueName(), true);
   }
}
 
/**
 * Sends one text message per entry of {@code data} and then asserts that the
 * messages were received.
 * <p>
 * Each message carries its text as {@code stringProperty} and its index in
 * {@code data} as {@code intProperty}.
 *
 * @throws Exception if sending fails or the receive assertion fails
 */
@Test
public void testSendReceive() throws Exception {
   messages.clear();

   int index = 0;
   while (index < data.length) {
      final String text = data[index];

      final Message outgoing = session.createTextMessage(text);
      outgoing.setStringProperty("stringProperty", text);
      outgoing.setIntProperty("intProperty", index);

      producer.send(producerDestination, outgoing);

      messageSent();
      index++;
   }

   assertMessagesAreReceived();
}
 
源代码6 项目: qpid-broker-j   文件: MessageProvider.java
/**
 * Stores {@code propertyValue} on {@code message} using the typed JMS setter
 * that matches the value's runtime class.
 * <p>
 * Boxed primitives and {@link String} go through their dedicated setters; any
 * other value (including {@code null}) is passed to
 * {@code setObjectProperty}.
 *
 * @param message       the message to modify
 * @param propertyName  the name of the property to set
 * @param propertyValue the value to store
 * @throws JMSException if the underlying setter rejects the property
 */
protected void setCustomProperty(Message message, String propertyName, Object propertyValue) throws JMSException
{
    if (propertyValue instanceof Integer)
    {
        message.setIntProperty(propertyName, (Integer) propertyValue);
        return;
    }
    if (propertyValue instanceof Long)
    {
        message.setLongProperty(propertyName, (Long) propertyValue);
        return;
    }
    if (propertyValue instanceof Boolean)
    {
        message.setBooleanProperty(propertyName, (Boolean) propertyValue);
        return;
    }
    if (propertyValue instanceof Byte)
    {
        message.setByteProperty(propertyName, (Byte) propertyValue);
        return;
    }
    if (propertyValue instanceof Double)
    {
        message.setDoubleProperty(propertyName, (Double) propertyValue);
        return;
    }
    if (propertyValue instanceof Float)
    {
        message.setFloatProperty(propertyName, (Float) propertyValue);
        return;
    }
    if (propertyValue instanceof Short)
    {
        message.setShortProperty(propertyName, (Short) propertyValue);
        return;
    }
    if (propertyValue instanceof String)
    {
        message.setStringProperty(propertyName, (String) propertyValue);
        return;
    }

    // No dedicated setter matched: fall back to the generic object setter.
    message.setObjectProperty(propertyName, propertyValue);
}
 
/**
 * Verifies that messages sent to a queue with a time-to-live are no longer
 * delivered once that time has elapsed.
 *
 * @throws Exception if any JMS operation fails or the assertion fails
 */
public void testConsumeExpiredQueue() throws Exception {

   final MessageProducer expiringProducer = createProducer(timeToLive);

   consumerDestination = session.createQueue(getConsumerSubject());
   producerDestination = session.createQueue(getProducerSubject());

   final MessageConsumer queueConsumer = createConsumer();
   connection.start();

   int index = 0;
   while (index < data.length) {
      final Message outgoing = session.createTextMessage(data[index]);
      outgoing.setStringProperty("stringProperty", data[index]);
      outgoing.setIntProperty("intProperty", index);

      if (verbose && LOG.isDebugEnabled()) {
         LOG.debug("About to send a queue message: " + outgoing + " with text: " + data[index]);
      }

      expiringProducer.send(producerDestination, outgoing);
      index++;
   }

   // Wait one second past the time-to-live so that every message has expired.
   Thread.sleep(timeToLive + 1000);

   // Nothing should be delivered any more.
   assertNull(queueConsumer.receive(1000));
}
 
源代码8 项目: qpid-broker-j   文件: SelectorTest.java
/**
 * Checks how a selector that fails at evaluation time affects delivery: the
 * offending message is withheld from the selecting consumer but can still be
 * consumed by a consumer without a selector.
 *
 * @throws Exception if any JMS operation fails or an assertion fails
 */
@Test
public void runtimeSelectorError() throws Exception
{
    Connection conn = getConnection();
    Queue testQueue = createQueue(getTestName());
    try
    {
        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer selectingConsumer = jmsSession.createConsumer(testQueue , "testproperty % 5 = 1");
        MessageProducer sender = jmsSession.createProducer(testQueue);

        // An int value of 1 satisfies the selector (1 % 5 = 1).
        Message probe = jmsSession.createMessage();
        probe.setIntProperty("testproperty", 1);
        sender.send(probe);

        conn.start();

        Message delivered = selectingConsumer.receive(getReceiveTimeout());
        assertNotNull("Message matching selector should be received", delivered);

        // A string value cannot be used in the modulo expression ("hello" % 5).
        probe.setStringProperty("testproperty", "hello");
        sender.send(probe);
        delivered = selectingConsumer.receive(getReceiveTimeout());
        assertNull("Message causing runtime selector error should not be received", delivered);

        // The withheld message must remain available to a selector-less consumer.
        MessageConsumer plainConsumer = jmsSession.createConsumer(testQueue);
        delivered = plainConsumer.receive(getReceiveTimeout());
        assertNotNull("Message that previously caused a runtime error should be consumable by another consumer", delivered);
    }
    finally
    {
        conn.close();
    }
}
 
/**
 * Verifies that messages published to a topic with a time-to-live are no
 * longer delivered once that time has elapsed.
 *
 * @throws Exception if any JMS operation fails or the assertion fails
 */
public void testConsumeExpiredTopic() throws Exception {

   final MessageProducer expiringProducer = createProducer(timeToLive);

   consumerDestination = session.createTopic(getConsumerSubject());
   producerDestination = session.createTopic(getProducerSubject());

   final MessageConsumer topicConsumer = createConsumer();
   connection.start();

   int index = 0;
   while (index < data.length) {
      final Message outgoing = session.createTextMessage(data[index]);
      outgoing.setStringProperty("stringProperty", data[index]);
      outgoing.setIntProperty("intProperty", index);

      if (verbose && LOG.isDebugEnabled()) {
         LOG.debug("About to send a topic message: " + outgoing + " with text: " + data[index]);
      }

      expiringProducer.send(producerDestination, outgoing);
      index++;
   }

   // Wait one second past the time-to-live so that every message has expired.
   Thread.sleep(timeToLive + 1000);

   // Nothing should be delivered any more.
   assertNull(topicConsumer.receive(1000));
}
 
源代码10 项目: qpid-broker-j   文件: DefaultFiltersTest.java
/**
 * Verifies that a consumer's own selector takes the place of the queue's
 * default filter: with a default filter of {@code foo = 1} and a consumer
 * selector of {@code foo = 0}, only the message with {@code foo=0} arrives.
 *
 * @throws Exception if any JMS operation fails or an assertion fails
 */
@Test
public void defaultFilterIsOverridden() throws Exception
{
    String name = getTestName();
    Connection conn = getConnection();
    try
    {
        conn.start();

        Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        createQueueWithDefaultFilter(name, "foo = 1");
        Queue filteredQueue = createQueue(name);

        // First message: foo=0, which matches the consumer's selector.
        final MessageProducer sender = jmsSession.createProducer(filteredQueue);
        Message outgoing = jmsSession.createMessage();
        outgoing.setIntProperty("foo", 0);
        sender.send(outgoing);

        MessageConsumer selectingConsumer = jmsSession.createConsumer(filteredQueue, "foo = 0");

        Message delivered = selectingConsumer.receive(getReceiveTimeout());
        assertNotNull("Message with foo=0 should be received", delivered);
        assertEquals("Property foo not as expected", 0, delivered.getIntProperty("foo"));

        // Second message: foo=1, which matches only the default filter.
        outgoing = jmsSession.createMessage();
        outgoing.setIntProperty("foo", 1);
        sender.send(outgoing);

        assertNull("Message with foo=1 should not be received", selectingConsumer.receive(getReceiveTimeout()));
    }
    finally
    {
        conn.close();
    }
}
 
源代码11 项目: qpid-broker-j   文件: Utils.java
/**
 * Creates a message of {@code DEFAULT_MESSAGE_SIZE} and stamps it with
 * {@code msgCount} under the {@code INDEX} int property.
 *
 * @param session  the session used to create the message
 * @param msgCount the value stored in the {@code INDEX} property
 * @return the newly created, indexed message
 * @throws JMSException if the message cannot be created or the property cannot be set
 */
public static Message createNextMessage(Session session, int msgCount) throws JMSException
{
    final Message indexed = createMessage(session, DEFAULT_MESSAGE_SIZE);
    indexed.setIntProperty(INDEX, msgCount);
    return indexed;
}
 
/**
 * Publishes the test data to a topic through a producer created with a
 * time-to-live argument of 0 and checks that a message can be consumed.
 *
 * @throws Exception if any JMS operation fails or the assertion fails
 */
public void testConsumeTopic() throws Exception {

   final MessageProducer topicProducer = createProducer(0);

   consumerDestination = session.createTopic(getConsumerSubject());
   producerDestination = session.createTopic(getProducerSubject());

   final MessageConsumer topicConsumer = createConsumer();
   connection.start();

   int index = 0;
   while (index < data.length) {
      final Message outgoing = session.createTextMessage(data[index]);
      outgoing.setStringProperty("stringProperty", data[index]);
      outgoing.setIntProperty("intProperty", index);

      if (verbose && LOG.isDebugEnabled()) {
         LOG.debug("About to send a topic message: " + outgoing + " with text: " + data[index]);
      }

      topicProducer.send(producerDestination, outgoing);
      index++;
   }

   // With no expiration in play, a message is expected to arrive.
   assertNotNull(topicConsumer.receive(1000));
}
 
/**
 * Fills the test destination with 30000 messages carrying a unique {@code id}
 * property, then removes the last one by selector and asserts that the removal
 * finished in under 20 seconds.
 *
 * @throws Exception if any JMS or broker operation fails or an assertion fails
 */
public void testRemoveMessages() throws Exception {
   final int queueSize = 30000;
   final long maxDurationMillis = 20000;

   // Load the queue inside a single transaction.
   Connection loaderConnection = createConnection();
   try {
      loaderConnection.start();
      Session txSession = loaderConnection.createSession(true, Session.SESSION_TRANSACTED);
      MessageProducer loader = txSession.createProducer(destination);
      int id = 0;
      while (id < queueSize) {
         Message tagged = txSession.createMessage();
         tagged.setIntProperty("id", id);
         loader.send(tagged);
         id++;
      }
      txSession.commit();
   } finally {
      loaderConnection.close();
   }

   // Look up the broker-side queue object for the destination.
   Queue brokerQueue = (Queue) broker.getRegionBroker().getDestinationMap().get(destination);

   ConnectionContext connectionContext = new ConnectionContext(new NonCachedMessageEvaluationContext());
   connectionContext.setBroker(broker.getBroker());
   connectionContext.getMessageEvaluationContext().setDestination(destination);

   // Time the removal of the message with the highest id.
   long startedAt = System.currentTimeMillis();
   Assert.assertEquals(1, brokerQueue.removeMatchingMessages("id=" + (queueSize - 1)));

   long durationMillis = System.currentTimeMillis() - startedAt;

   LOG.info("It took " + durationMillis + "ms to remove the last message from a queue a " + queueSize + " messages.");

   Assert.assertTrue("Removing the message took too long: " + durationMillis + "ms", durationMillis < maxDurationMillis);
}
 
源代码14 项目: qpid-broker-j   文件: BDBUpgradeTest.java
/**
 * Test that the DurableSubscription without selector was successfully
 * transferred to the new store, and functions as expected with continued use.
 * <p>
 * Two text messages ("A" and "B") sharing the same random {@code ID} property
 * are published and committed, after which the durable subscriber is expected
 * to deliver three messages in order: one that this test did not publish,
 * then A, then B.
 *
 * @throws Exception if any JMS operation fails or an assertion fails
 */
@Test
public void testDurableSubscriptionWithoutSelector() throws Exception
{
    TopicConnection connection = getTopicConnection();
    try
    {
        connection.start();

        TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);

        Topic topic = session.createTopic(TOPIC_NAME);
        TopicPublisher publisher = session.createPublisher(topic);

        // Both messages carry the same random ID; they differ in body text and "testprop".
        int index = ThreadLocalRandom.current().nextInt();
        Message messageA = session.createTextMessage("A");
        messageA.setIntProperty("ID", index);
        messageA.setStringProperty("testprop", "false");
        publisher.publish(messageA);

        Message messageB = session.createTextMessage("B");
        messageB.setIntProperty("ID", index);
        messageB.setStringProperty("testprop", "true");
        publisher.publish(messageB);

        // The session is transacted, so the publishes take effect on commit.
        session.commit();

        // The first delivery is a message not published above.
        // NOTE(review): presumably left in the store from before the upgrade - confirm against the test fixture.
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUB_NAME);
        Message migrated = subscriber.receive(getReceiveTimeout());
        assertThat("Failed to receive migrated message", migrated, is(notNullValue()));

        Message receivedA = subscriber.receive(getReceiveTimeout());
        session.commit();
        assertThat("Failed to receive published message A", receivedA, is(notNullValue()));
        assertThat("Message A is not Text message", receivedA, is(instanceOf(TextMessage.class)));
        assertThat("Unexpected text for A", ((TextMessage) receivedA).getText(), is(equalTo("A")));
        assertThat("Unexpected index", receivedA.getIntProperty("ID"), is(equalTo(index)));

        Message receivedB = subscriber.receive(getReceiveTimeout());
        session.commit();
        assertThat("Failed to receive published message B", receivedB, is(notNullValue()));
        assertThat("Message B is not Text message", receivedB, is(instanceOf(TextMessage.class)));
        assertThat("Unexpected text for B", ((TextMessage) receivedB).getText(), is(equalTo("B")));
        assertThat("Unexpected index  for B", receivedB.getIntProperty("ID"), is(equalTo(index)));

        session.commit();
        session.close();
    }
    finally
    {
        connection.close();
    }
}
 
/**
 * Registers the same durable subscription name for two clients, publishes 500
 * large messages, unsubscribes one client, restarts the broker, and then
 * checks that the remaining client receives every message and that at most
 * three journal files remain after a second restart.
 *
 * @throws Exception if any JMS or broker operation fails or an assertion fails
 */
@Test(timeout = 60 * 1000)
// https://issues.apache.org/jira/browse/AMQ-3206
public void testCleanupDeletedSubAfterRestart() throws Exception {
   // Register durable subscription "SubsId" for client "cli1", then disconnect.
   Connection con = createConnection("cli1");
   Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
   session.createDurableSubscriber(topic, "SubsId", null, true);
   session.close();
   con.close();

   // Register the same subscription name for client "cli2", then disconnect.
   con = createConnection("cli2");
   session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
   session.createDurableSubscriber(topic, "SubsId", null, true);
   session.close();
   con.close();

   // Publish while both subscribers are disconnected.
   con = createConnection();
   session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageProducer producer = session.createProducer(null);

   final int toSend = 500;
   // Payload is the string form of a 40 KiB zero-filled byte array.
   final String payload = Arrays.toString(new byte[40 * 1024]);
   int sent = 0;
   for (int i = sent; i < toSend; i++) {
      Message message = session.createTextMessage(payload);
      message.setStringProperty("filter", "false");
      message.setIntProperty("ID", i);
      producer.send(topic, message);
      sent++;
   }
   con.close();
   LOG.info("sent: " + sent);

   // kill off cli1
   // Note: this connection is not closed here before the broker is destroyed.
   con = createConnection("cli1");
   session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
   session.unsubscribe("SubsId");

   // First broker restart.
   destroyBroker();
   createBroker(false);

   // cli2 reconnects and is expected to receive every message that was sent.
   con = createConnection("cli2");
   session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
   final DurableSubscriptionOfflineTestListener listener = new DurableSubscriptionOfflineTestListener();
   consumer.setMessageListener(listener);
   assertTrue("got all sent", Wait.waitFor(new Wait.Condition() {
      @Override
      public boolean isSatisified() throws Exception {
         LOG.info("Want: " + toSend + ", current: " + listener.count);
         return listener.count == toSend;
      }
   }));
   session.close();
   con.close();

   // Second broker restart, then wait until the journal holds at most three files.
   destroyBroker();
   createBroker(false);
   final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
   assertTrue("Should have less than three journal files left but was: " + pa.getStore().getJournal().getFileMap().size(), Wait.waitFor(new Wait.Condition() {

      @Override
      public boolean isSatisified() throws Exception {
         return pa.getStore().getJournal().getFileMap().size() <= 3;
      }
   }));
}
 
源代码16 项目: qpid-jms   文件: ProducerIntegrationTest.java
/**
 * Verifies that remotely detaching one producer's link does not complete an
 * outstanding asynchronous send made through a different producer on the same
 * session; the send is expected to complete, with an exception, only when the
 * session is closed.
 *
 * @throws Exception if the test peer or any JMS operation fails, or an assertion fails
 */
@Test(timeout = 20000)
public void testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);

        // One session begin and two sender attaches are expected (one per producer below).
        testPeer.expectBegin();
        testPeer.expectSenderAttach();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        MessageProducer producer = session.createProducer(queue);

        // Create a second producer which allows for a safe wait for credit for the
        // first producer without the need for a sleep.  Otherwise the first producer
        // might not do an actual async send due to not having received credit yet.
        session.createProducer(queue);

        Message message = session.createTextMessage("content");
        message.setIntProperty("test", 1);

        // The peer accepts the transfer but sends no response, leaving the send outstanding.
        testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());

        // This closes link for the second producer we created, not the one that we
        // will use to send a message.
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);

        assertNull("Should not yet have a JMSDestination", message.getJMSDestination());

        TestJmsCompletionListener listener = new TestJmsCompletionListener();
        try {
            producer.send(message, listener);
        } catch (JMSException e) {
            LOG.warn("Caught unexpected error: {}", e.getMessage());
            fail("No expected exception for this send.");
        }

        testPeer.waitForAllHandlersToComplete(2000);

        // The detach of the other producer must not have triggered the completion callback.
        assertFalse("Should not get async callback", listener.awaitCompletion(10, TimeUnit.MILLISECONDS));

        // Closing the session should complete the send with an exception
        testPeer.expectEnd();
        session.close();

        assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
        assertNotNull(listener.exception);
        assertNotNull(listener.message);
        assertTrue(listener.message instanceof TextMessage);

        // Message should be readable
        assertNotNull("Should have a readable JMSDestination", message.getJMSDestination());
        assertEquals("Message body not as expected", "content", ((TextMessage) message).getText());
        assertEquals("Message property not as expected", 1, message.getIntProperty("test"));

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);
    }
}
 
源代码17 项目: qpid-broker-j   文件: LastValueQueueTest.java
/**
 * Sends four messages alternating between keys "A" and "B" to a conflation
 * queue and verifies that a consumer attached afterwards receives only the
 * latest message for each key (sequence 3 for "A", sequence 4 for "B").
 *
 * @throws Exception if any JMS operation fails or an assertion fails
 */
@Test
public void testConflation() throws Exception
{
    final Queue conflationQueue = createConflationQueue(getTestName(), KEY_PROPERTY, false);

    // Producer phase: one message object is re-stamped and sent four times.
    final Connection sendConnection = getConnection();
    try
    {
        Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer sender = sendSession.createProducer(conflationQueue);

        Message outgoing = sendSession.createMessage();

        outgoing.setStringProperty(KEY_PROPERTY, "A");
        outgoing.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 1);
        sender.send(outgoing);

        outgoing.setStringProperty(KEY_PROPERTY, "B");
        outgoing.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 2);
        sender.send(outgoing);

        outgoing.setStringProperty(KEY_PROPERTY, "A");
        outgoing.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 3);
        sender.send(outgoing);

        outgoing.setStringProperty(KEY_PROPERTY, "B");
        outgoing.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 4);
        sender.send(outgoing);
    }
    finally
    {
        sendConnection.close();
    }

    // Consumer phase: only the newest message per key should remain.
    Connection receiveConnection = getConnection();
    try
    {
        Session receiveSession = receiveConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer receiver = receiveSession.createConsumer(conflationQueue);
        receiveConnection.start();

        Message latestA = receiver.receive(getReceiveTimeout());
        assertNotNull("First message is not received", latestA);
        assertEquals("Unexpected key property value", "A", latestA.getStringProperty(KEY_PROPERTY));
        assertEquals("Unexpected sequence property value",
                     3,
                     latestA.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));

        Message latestB = receiver.receive(getReceiveTimeout());
        assertNotNull("Second message is not received", latestB);
        assertEquals("Unexpected key property value", "B", latestB.getStringProperty(KEY_PROPERTY));
        assertEquals("Unexpected sequence property value",
                     4,
                     latestB.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));

        // Nothing else should be left on the queue.
        assertNull("Unexpected message is received", receiver.receive(getReceiveTimeout() / 4));
    }
    finally
    {
        receiveConnection.close();
    }
}
 
源代码18 项目: qpid-jms   文件: TransactionsIntegrationTest.java
/**
 * Verifies that one {@code Message} instance can be sent repeatedly on a
 * transacted session: the same message object is re-stamped with a new
 * {@code sequence} property and sent three times, each send expected by the
 * test peer as a transfer within the declared transaction.
 *
 * @throws Exception if the test peer or any JMS operation fails
 */
@Test(timeout=20000)
public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // First expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a Declared disposition state containing the txnId.
        Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        testPeer.expectDeclare(txnId);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("myQueue");

        // Create a producer to use in provoking creation of the AMQP transaction
        testPeer.expectSenderAttach();
        MessageProducer producer = session.createProducer(queue);

        // Expect the message which was sent under the current transaction. Check it carries
        // TransactionalState with the above txnId but has no outcome. Respond with a
        // TransactionalState with Accepted outcome.

        // Created once, outside the loop: the same instance is reused for every send.
        Message message = session.createMessage();

        for(int i = 0; i < 3; ++i) {
            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

            // Incoming state: must reference the txnId and carry no outcome.
            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
            stateMatcher.withTxnId(equalTo(txnId));
            stateMatcher.withOutcome(nullValue());

            // Response state: same txnId with an Accepted outcome.
            TransactionalState txState = new TransactionalState();
            txState.setTxnId(txnId);
            txState.setOutcome(new Accepted());

            testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);

            message.setIntProperty("sequence", i);

            producer.send(message);
        }

        // Expect rollback on close without a commit call.
        testPeer.expectDischarge(txnId, true);
        testPeer.expectClose();

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码19 项目: activemq-artemis   文件: TransactedSessionTest.java
/**
 * Ignored test that continuously sends messages with an increasing {@code foo}
 * int property on a transacted session, committing after the first message and
 * then whenever more than 50 ms have passed since the last commit, while a
 * listener on a second session consumes them.
 * <p>
 * The send loop has no normal exit: it ends only when the latch assertion
 * fails or the listener reports a failure, whose exception is then rethrown.
 *
 * @throws Exception the exception recorded by the listener, or any setup/cleanup failure
 */
@Test
@Ignore
public void _testSendCommitQueueCommitsInOrder() throws Exception {
   Connection conn = null;

   try {
      conn = createConnection();

      // Transacted producer session (first argument is true).
      Session producerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
      MessageProducer producer = producerSess.createProducer(queue1);
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);

      Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageConsumer consumer = consumerSession.createConsumer(queue1);
      CountDownLatch latch = new CountDownLatch(1);
      conn.start();
      // The listener is handed both the latch and the connection.
      myReceiver myReceiver = new myReceiver(latch, conn);
      consumer.setMessageListener(myReceiver);
      long lastBatchTime = System.currentTimeMillis();
      int sentId = 0;
      boolean started = false;
      // Send some messages
      while (true) {
         try {
            Message m = producerSess.createMessage();
            m.setIntProperty("foo", sentId);
            sentId++;
            producer.send(m);

            // Commit right after the first message, then at most once per 50 ms.
            if (sentId == 1 || System.currentTimeMillis() - lastBatchTime > 50) {
               lastBatchTime = System.currentTimeMillis();
               producerSess.commit();
            }
         } catch (JMSException e) {
            //ignore connection closed by consumer
         }

         // wait for the first message to be received before we continue sending
         if (!started) {
            Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
            started = true;
         } else {
            // Surface any failure recorded by the listener.
            if (myReceiver.failed) {
               throw myReceiver.e;
            }
         }
      }

   } finally {
      if (conn != null) {
         conn.close();
      }
      removeAllMessages(queue1.getQueueName(), true);
   }

}
 
源代码20 项目: activemq-artemis   文件: BrowserTest.java
/**
 * Creates a browser on {@code queue1}, checks its queue and (absent) selector,
 * then sends one message and expects the browser's enumeration to yield an
 * element. The destination is drained at the end.
 *
 * @throws Exception if any JMS operation fails or an assertion fails
 */
@Test
public void testBrowse() throws Exception {
   conn = getConnectionFactory().createConnection();

   final Session browseSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
   final MessageProducer sender = browseSession.createProducer(queue1);
   final QueueBrowser queueBrowser = browseSession.createBrowser(queue1);

   // A browser created without a selector reports its queue and a null selector.
   ProxyAssertSupport.assertEquals(queueBrowser.getQueue(), queue1);
   ProxyAssertSupport.assertNull(queueBrowser.getMessageSelector());

   // The enumeration is obtained before the connection is started and before anything is sent.
   final Enumeration<Message> browsed = queueBrowser.getEnumeration();

   conn.start();

   final Message outgoing = browseSession.createMessage();
   outgoing.setIntProperty("cnt", 0);
   sender.send(outgoing);

   final Message seen = browsed.nextElement();
   Assert.assertNotNull(seen);

   drainDestination(getConnectionFactory(), queue1);
}