javax.jms.MessageProducer#send() 源码实例 Demo

下面列出了 javax.jms.MessageProducer#send() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: activemq-artemis   文件: SelectorTest.java
/**
 * Publishes {@code object} as the body of a JMS object message on the destination
 * obtained from {@code createDestination(dest)}.
 *
 * @param dest        destination name handed to {@code createDestination}
 * @param object      payload stored in the message body
 * @param contentType value for the {@code HttpHeaderProperty.CONTENT_TYPE} property; not set when {@code null}
 * @param tag         value for the {@code MyTag} property; not set when {@code null}
 * @throws Exception if any of the JMS calls fails
 */
public static void publish(String dest, Serializable object, String contentType, String tag) throws Exception {
   final Connection connection = connectionFactory.createConnection();
   try {
      final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      final MessageProducer sender = jmsSession.createProducer(createDestination(dest));

      final ObjectMessage payload = jmsSession.createObjectMessage();
      if (contentType != null) {
         payload.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, contentType);
      }
      if (tag != null) {
         payload.setStringProperty("MyTag", tag);
      }
      payload.setObject(object);

      sender.send(payload);
   } finally {
      // The connection is released whether or not the send succeeded.
      connection.close();
   }
}
 
源代码2 项目: qpid-jms   文件: JmsAnonymousProducerTest.java
/**
 * Sends one message through an anonymous producer (created with a {@code null}
 * destination) to the queue named by {@code name.getMethodName()} and checks through
 * the queue's management proxy that the queue then holds exactly one message.
 */
@Test(timeout = 60000)
public void testAnonymousSend() throws Exception {
    connection = createAmqpConnection();
    assertNotNull(connection);
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Verify the session before its first use; asserting after createQueue() could never fail.
    assertNotNull(session);
    Queue queue = session.createQueue(name.getMethodName());

    // No default destination: the target queue is supplied on each send instead.
    MessageProducer producer = session.createProducer(null);

    Message message = session.createMessage();
    producer.send(queue, message);

    QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
    assertEquals(1, proxy.getQueueSize());
}
 
源代码3 项目: activemq-artemis   文件: SecurityTest.java
/**
 * Logs in with a valid user and password, then tries to send to an address the user
 * is not authorised to send to. The send must be refused with a
 * {@link JMSSecurityException}.
 *
 * @throws Exception if the queue cannot be created or an unexpected JMS error occurs
 */
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
   SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
   // Create the target queue only if the server does not already have it.
   if (getJmsServer().locateQueue(queueName) == null) {
      getJmsServer().createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
   }
   ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
   Connection connection = connectionFactory.createConnection("guest", "guest");
   try {
      Session session = connection.createSession();
      Destination destination = session.createQueue(queueName.toString());
      MessageProducer messageProducer = session.createProducer(destination);
      try {
         messageProducer.send(session.createTextMessage("hello"));
         fail("JMSSecurityException expected as guest is not allowed to send");
      } catch (JMSSecurityException expected) {
         // pass: the broker refused the send, which is what this test requires
      }
   } finally {
      // Close the connection even when fail() or an unexpected exception aborts the test.
      connection.close();
   }
}
 
/**
 * Fills the test destination with 30000 messages, each tagged with a distinct
 * {@code id} property, then removes the last one by selector and asserts that the
 * removal finished in under 20000 ms.
 *
 * @throws Exception if populating the queue or removing the message fails
 */
public void testRemoveMessages() throws Exception {
   final int messageCount = 30000;
   final long maxRemovalMillis = 20000;

   // Step 1: load the queue inside a single transaction.
   Connection connection = createConnection();
   try {
      connection.start();
      Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
      MessageProducer sender = txSession.createProducer(destination);
      for (int id = 0; id < messageCount; id++) {
         Message tagged = txSession.createMessage();
         tagged.setIntProperty("id", id);
         sender.send(tagged);
      }
      txSession.commit();
   } finally {
      connection.close();
   }

   // Step 2: reach into the broker for the queue implementation backing the destination.
   Queue brokerQueue = (Queue) broker.getRegionBroker().getDestinationMap().get(destination);

   ConnectionContext connectionContext = new ConnectionContext(new NonCachedMessageEvaluationContext());
   connectionContext.setBroker(broker.getBroker());
   connectionContext.getMessageEvaluationContext().setDestination(destination);

   // Step 3: time the selector-based removal of the very last message.
   long startedAt = System.currentTimeMillis();
   Assert.assertEquals(1, brokerQueue.removeMatchingMessages("id=" + (messageCount - 1)));
   long elapsedMillis = System.currentTimeMillis() - startedAt;

   LOG.info("It took " + elapsedMillis + "ms to remove the last message from a queue a " + messageCount + " messages.");

   Assert.assertTrue("Removing the message took too long: " + elapsedMillis + "ms", elapsedMillis < maxRemovalMillis);
}
 
源代码5 项目: qpid-broker-j   文件: ArrivalTimeFilterTest.java
/**
 * Sends message "A", waits a quarter of the receive timeout, and only then creates a
 * consumer on a queue built from {@code LEGACY_BINDING_URL}. Both "A" and a later
 * message "B" must be delivered to that consumer. Skipped when the protocol is AMQP 1.0.
 */
@Test
public void testConsumerFilterArrivalTime1000() throws Exception
{
    assumeThat("Only legacy client implements this feature", getProtocol(), is(not(equalTo(Protocol.AMQP_1_0))));

    final String testQueueName = getTestName();
    createQueue(testQueueName);
    final Connection connection = getConnection();
    try
    {
        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final String bindingUrl =
                String.format(LEGACY_BINDING_URL, testQueueName, testQueueName, getReceiveTimeout());
        final Queue filteredQueue = jmsSession.createQueue(bindingUrl);
        connection.start();
        final MessageProducer sender = jmsSession.createProducer(filteredQueue);
        sender.send(jmsSession.createTextMessage("A"));

        // Let some time pass between sending "A" and creating the consumer.
        Thread.sleep(getReceiveTimeout() / 4);

        final MessageConsumer receiver = jmsSession.createConsumer(filteredQueue);

        final Message first = receiver.receive(getReceiveTimeout());
        assertNotNull("Message A should be received", first);
        assertTrue("Unexpected message type", first instanceof TextMessage);
        final TextMessage firstText = (TextMessage) first;
        assertEquals("Unexpected message", "A", firstText.getText());

        sender.send(jmsSession.createTextMessage("B"));

        final Message second = receiver.receive(getReceiveTimeout());
        assertNotNull("Message B should be received", second);
        assertTrue("Unexpected message type", second instanceof TextMessage);
        final TextMessage secondText = (TextMessage) second;
        assertEquals("Unexpected message", "B", secondText.getText());
    }
    finally
    {
        connection.close();
    }
}
 
源代码6 项目: karaf-decanter   文件: JmsCollectorTest.java
/**
 * Sends a map message and then a JSON text message to the {@code decanter} queue and
 * checks, after a short pause each time, that the dispatcher recorded one more posted
 * event carrying the expected {@code message} and {@code type} properties.
 *
 * @throws Exception if a JMS call fails or the test thread is interrupted
 */
@Test
public void test() throws Exception {
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            MessageProducer producer = session.createProducer(session.createQueue("decanter"));
            ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
            mapMessage.setString("message", "map");
            producer.send(mapMessage);

            // Pause before inspecting the dispatcher, leaving time for the message to be processed.
            Thread.sleep(200L);

            Assert.assertEquals(1, dispatcher.getPostEvents().size());
            Event event = dispatcher.getPostEvents().get(0);
            Assert.assertEquals("map", event.getProperty("message"));
            Assert.assertEquals("jms", event.getProperty("type"));

            ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("{ \"message\" : \"text\" }");
            producer.send(textMessage);

            Thread.sleep(200L);

            Assert.assertEquals(2, dispatcher.getPostEvents().size());
            event = dispatcher.getPostEvents().get(1);
            Assert.assertEquals("text", event.getProperty("message"));
            Assert.assertEquals("jms", event.getProperty("type"));
        } finally {
            session.close();
        }
    } finally {
        // Nested finally: the connection is closed even if session.close() throws.
        connection.close();
    }
}
 
源代码7 项目: c2mon   文件: ClientRequestReportHandler.java
/**
 * Reports are sent to the clients as json messages.
 *
 * <p>The message is sent non-persistently with the default reply time-to-live through
 * a producer that is created for this one send and closed afterwards. Failures are
 * logged and never propagated to the caller.
 *
 * @param jsonResponse the message to be sent
 */
private void sendJsonResponse(final String jsonResponse) {

  if (replyDestination == null) {
    log.error("sendJsonResponse() : JMSReplyTo destination is null - cannot send reply.");
    return;
  }

  MessageProducer messageProducer = null;
  try {
    messageProducer = session.createProducer(replyDestination);
    messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    messageProducer.setTimeToLive(defaultReplyTTL);

    // Send response as Json message
    Message replyMessage = session.createTextMessage(jsonResponse);
    messageProducer.send(replyMessage);

    log.debug("sendJsonResponse() : Report sent.");
  } catch (Throwable e) {
    // Best effort: a failed report must not break the caller, so log and carry on.
    log.warn("sendJsonResponse() : Failed to send report :" + e.getMessage(), e);
  } finally {
    if (messageProducer != null) {
      try {
        messageProducer.close();
      } catch (JMSException closeFailure) {
        // Nothing more can be done here; keep a trace instead of dropping it silently.
        log.debug("sendJsonResponse() : Failed to close the reply producer.", closeFailure);
      }
    }
  }
}
 
/**
 * Sends numbered, non-persistent text messages ("Message_0", "Message_1", ...) to the
 * queue {@code qName} for as long as the {@code running} flag is set, calling
 * {@code beforeSend()} ahead of every send.
 *
 * @param c     connection used to create the sending session
 * @param qName name of the queue that receives the messages
 * @throws JMSException if creating the session or producer, or sending, fails
 */
protected void sendMessages(Connection c, String qName) throws JMSException {
   Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
   try {
      LOGGER.info("Sender: Using AUTO-ACK session");

      Queue q = s.createQueue(qName);
      // Anonymous producer: the destination is given on each send.
      MessageProducer producer = s.createProducer(null);
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

      long sent = 0;
      while (running.get()) {
         beforeSend();
         producer.send(q, s.createTextMessage("Message_" + (sent++)));
      }
   } finally {
      // Release the session once sending stops or fails; the caller still owns the connection.
      s.close();
   }
}
 
源代码9 项目: tomee   文件: ProperConnectionShutdownTest.java
/**
 * Sends {@code text} as a non-persistent text message to {@code chatQueue} over a
 * connection and session that are opened for this single send and closed afterwards.
 *
 * @param text body of the message to send
 * @throws JMSException if connecting, sending, or closing fails
 */
public void sendMessage(String text) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(chatQueue);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            // Create a message
            TextMessage message = session.createTextMessage(text);

            // Tell the producer to send the message
            producer.send(message);
        } finally {
            session.close();
        }
    } finally {
        // Nested finally: the connection is closed even if session.close() throws.
        connection.close();
    }
}
 
/**
 * Sends one {@code TEXT} message to {@code destination} over the given connection and
 * asserts that {@code Listener.sync()} returns true. The connection is always closed
 * afterwards; a failure while closing is ignored.
 *
 * @param connection connection to send through; closed by this method
 */
private void testConnection(final Connection connection) throws JMSException, InterruptedException {
    try {
        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        jmsSession.createProducer(destination).send(jmsSession.createTextMessage(TEXT));
        assertTrue(Listener.sync());
    } finally {
        try {
            connection.close();
        } catch (final JMSException ignored) {
            // best-effort close: nothing to do if it fails
        }
    }
}
 
源代码11 项目: activemq-artemis   文件: JMSMessageProducerTest.java
/**
 * Sends a message with a time-to-live of 1 ms, then expects to find a message on the
 * address returned by {@code getDeadLetterAddress()} and nothing left on the queue it
 * was originally sent to.
 */
@Test(timeout = 60000)
public void testSendWithTimeToLiveExpiresToDLQ() throws Exception {
   Connection connection = createConnection();

   try {
      Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Queue targetQueue = jmsSession.createQueue(getQueueName());

      MessageProducer shortLivedProducer = jmsSession.createProducer(targetQueue);
      shortLivedProducer.setTimeToLive(1);

      shortLivedProducer.send(jmsSession.createMessage());
      connection.start();

      // A message must show up on the dead letter address within ten seconds.
      MessageConsumer deadLetterConsumer = jmsSession.createConsumer(jmsSession.createQueue(getDeadLetterAddress()));
      Message deadLettered = deadLetterConsumer.receive(10000);
      assertNotNull(deadLettered);
      deadLetterConsumer.close();

      // ... and the original queue must have nothing to hand out.
      MessageConsumer queueConsumer = jmsSession.createConsumer(targetQueue);
      Message leftover = queueConsumer.receiveNoWait();
      assertNull(leftover);
      queueConsumer.close();
   } finally {
      if (connection != null) {
         connection.close();
      }
   }
}
 
源代码12 项目: qpid-jms   文件: TransactionsIntegrationTest.java
/**
 * Scripts a test peer so that the commit of a transacted send is answered with a
 * Rejected outcome, asserts that {@code session.commit()} then throws
 * {@link TransactionRolledBackException}, and verifies that a message sent afterwards
 * is transferred under the second declared transaction id ({@code txnId2}).
 *
 * <p>Each {@code testPeer.expect...} call is registered before the client call that is
 * meant to trigger it, so the statement order in this method is significant.
 */
@Test(timeout=20000)
public void testProducedMessagesAfterCommitOfSentMessagesFails() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // First expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a Declared disposition state containing the txnId.
        Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        testPeer.expectDeclare(txnId1);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("myQueue");

        // Create a producer to use in provoking creation of the AMQP transaction
        testPeer.expectSenderAttach();
        MessageProducer producer = session.createProducer(queue);

        // Expect the message which was sent under the current transaction. Check it carries
        // TransactionalState with the above txnId but has no outcome. Respond with a
        // TransactionalState with Accepted outcome.
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

        TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
        stateMatcher.withTxnId(equalTo(txnId1));
        stateMatcher.withOutcome(nullValue());

        TransactionalState txState = new TransactionalState();
        txState.setTxnId(txnId1);
        txState.setOutcome(new Accepted());

        testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);

        producer.send(session.createMessage());

        // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
        // and reply with rejected and settled disposition to indicate the commit failed
        Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
        testPeer.expectDischarge(txnId1, false, commitFailure);

        // Then expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a declared disposition state containing the txnId.
        Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
        testPeer.expectDeclare(txnId2);

        try {
            session.commit();
            fail("Commit operation should have failed.");
        } catch (TransactionRolledBackException jmsTxRb) {
            // Expected: the peer rejected the discharge, so the commit must surface as a rollback.
        }

        // Expect the message which was sent under the current transaction. Check it carries
        // TransactionalState with the above txnId but has no outcome. Respond with a
        // TransactionalState with Accepted outcome.
        // The message matcher from the first send is reused; only the state matcher and
        // reply state are rebuilt, this time for txnId2.
        stateMatcher = new TransactionalStateMatcher();
        stateMatcher.withTxnId(equalTo(txnId2));
        stateMatcher.withOutcome(nullValue());

        txState = new TransactionalState();
        txState.setTxnId(txnId2);
        txState.setOutcome(new Accepted());

        testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
        // NOTE(review): presumably the discharge issued when the connection is closed with
        // the second transaction still open - confirm against TestAmqpPeer.
        testPeer.expectDischarge(txnId2, true);

        producer.send(session.createMessage());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
/**
 * Enables auto-creation of queues and addresses for {@code VirtualTopic.#}, subscribes
 * a consumer to the wildcard queue {@code Consumer.A.VirtualTopic.Orders.*}, and sends
 * one text message to the composite topic "VirtualTopic.Orders.A,VirtualTopic.Orders.B".
 * The wildcard consumer must receive two messages, and the first must carry the sent text.
 */
@Test
public void testAutoVirtualTopicWildcardStarFQQN() throws Exception {
   Connection connection = null;

   SimpleString ordersA = new SimpleString("VirtualTopic.Orders.A");
   SimpleString ordersB = new SimpleString("VirtualTopic.Orders.B");
   SimpleString ordersWildcard = new SimpleString("VirtualTopic.Orders.*");

   this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
   this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

   try {
      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
      factory.setWatchTopicAdvisories(false);
      connection = factory.createConnection();
      connection.start();

      Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      // Comma-separated name: a single destination object naming both topics.
      Destination bothTopics = jmsSession.createTopic(ordersA + "," + ordersB);

      MessageConsumer wildcardConsumer = jmsSession.createConsumer(jmsSession.createQueue("Consumer.A." + ordersWildcard));

      MessageProducer sender = jmsSession.createProducer(bothTopics);
      sender.send(jmsSession.createTextMessage("This is a text message"));

      TextMessage firstCopy = (TextMessage) wildcardConsumer.receive(2000);
      TextMessage secondCopy = (TextMessage) wildcardConsumer.receive(2000);

      boolean bothArrived = firstCopy != null && secondCopy != null;
      assertTrue(bothArrived);
      assertEquals("This is a text message", firstCopy.getText());

      wildcardConsumer.close();

   } finally {
      if (connection != null) {
         connection.close();
      }
   }
}
 
源代码14 项目: qpid-broker-j   文件: DurableSubscribtionTest.java
/**
 * Checks that a durable subscriber on a second connection still receives messages
 * normally while the first connection holds a durable no-local subscriber: the
 * no-local subscriber must see the first message published from the other
 * connection, and the ordinary subscriber must see the message published from the
 * no-local connection.
 */
@Test
public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception
{
    final String noLocalSubName = getTestName() + "_no_local_sub";
    final String plainSubName = getTestName() + "_sub";
    final Topic topic = createTopic(getTestName());

    final Connection noLocalConnection = getConnectionBuilder().setClientId("clientId").build();
    try
    {
        final Connection plainConnection = getConnection();
        try
        {
            final Session noLocalSession = noLocalConnection.createSession(true, Session.SESSION_TRANSACTED);
            final Session plainSession = plainConnection.createSession(true, Session.SESSION_TRANSACTED);

            final MessageProducer noLocalProducer = noLocalSession.createProducer(topic);
            final MessageProducer plainProducer = plainSession.createProducer(topic);

            try
            {
                final TopicSubscriber noLocalSubscriber =
                        noLocalSession.createDurableSubscriber(topic, noLocalSubName, null, true);
                final TopicSubscriber plainSubscriber =
                        plainSession.createDurableSubscriber(topic, plainSubName, null, false);
                noLocalConnection.start();
                plainConnection.start();

                // One message from the no-local connection, two from the other connection.
                noLocalProducer.send(noLocalSession.createTextMessage("Message1"));
                noLocalSession.commit();
                plainProducer.send(plainSession.createTextMessage("Message2"));
                plainProducer.send(plainSession.createTextMessage("Message3"));
                plainSession.commit();

                // The no-local subscriber must not get its own connection's "Message1".
                final Message seenByNoLocal = noLocalSubscriber.receive(getReceiveTimeout());
                assertTrue(seenByNoLocal instanceof TextMessage);
                final TextMessage noLocalText = (TextMessage) seenByNoLocal;
                assertEquals("Unexpected local message received", "Message2", noLocalText.getText());
                noLocalSession.commit();

                // The ordinary subscriber starts with "Message1".
                final Message seenByPlain = plainSubscriber.receive(getReceiveTimeout());
                assertTrue(seenByPlain instanceof TextMessage);
                final TextMessage plainText = (TextMessage) seenByPlain;
                assertEquals("Unexpected message received", "Message1", plainText.getText());

                plainSession.commit();
                noLocalSubscriber.close();
                plainSubscriber.close();
            }
            finally
            {
                // Remove both durable subscriptions whatever the outcome.
                noLocalSession.unsubscribe(noLocalSubName);
                plainSession.unsubscribe(plainSubName);
            }
        }
        finally
        {
            plainConnection.close();
        }
    }
    finally
    {
        noLocalConnection.close();
    }
}
 
源代码15 项目: qpid-jms   文件: ProducerIntegrationTest.java
/**
 * With a send timeout of 500 set on the connection, performs an asynchronous send whose
 * transfer the scripted peer never answers, and asserts that the completion listener is
 * called back within five seconds holding a {@code JmsSendTimedOutException} and the
 * message. The {@code send} call itself must not throw.
 */
@Test(timeout = 20000)
public void testAsyncCompletionGetsTimedOutErrorWhenNoDispostionArrives() throws Exception {
    try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
        connection.setSendTimeout(500);

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);

        Message message = session.createTextMessage("text");
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();

        // Expect the producer to attach and grant it some credit, it should send
        // a transfer which we will not send any response for which should cause the
        // send operation to time out.
        testPeer.expectSenderAttach();
        testPeer.expectTransferButDoNotRespond(messageMatcher);
        testPeer.expectClose();

        MessageProducer producer = session.createProducer(queue);
        TestJmsCompletionListener listener = new TestJmsCompletionListener();

        try {
            // Asynchronous send: any failure must be reported through the listener, not thrown here.
            producer.send(message, listener);
        } catch (Throwable error) {
            LOG.info("Caught unexpected error: {}", error.getMessage());
            fail("Send should not fail for async.");
        }

        // The listener must be completed, and completed with the send-timeout error.
        assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
        assertNotNull(listener.exception);
        assertTrue(listener.exception instanceof JmsSendTimedOutException);
        assertNotNull(listener.message);

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码16 项目: qpid-jms   文件: TransactionsIntegrationTest.java
/**
 * Sends the same {@code Message} instance three times on a transacted session, changing
 * only its {@code sequence} property between sends, and verifies through the scripted
 * peer that every send produces a transfer under the declared transaction id. The
 * connection is then closed without a commit.
 *
 * <p>Each {@code testPeer.expect...} call is registered before the client call that is
 * meant to trigger it, so the statement order in this method is significant.
 */
@Test(timeout=20000)
public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        testPeer.expectCoordinatorAttach();

        // First expect an unsettled 'declare' transfer to the txn coordinator, and
        // reply with a Declared disposition state containing the txnId.
        Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
        testPeer.expectDeclare(txnId);

        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = session.createQueue("myQueue");

        // Create a producer to use in provoking creation of the AMQP transaction
        testPeer.expectSenderAttach();
        MessageProducer producer = session.createProducer(queue);

        // For every send in the loop below: expect the message which was sent under the
        // current transaction. Check it carries TransactionalState with the above txnId but
        // has no outcome. Respond with a TransactionalState with Accepted outcome.

        // A single message instance, deliberately reused for all three sends.
        Message message = session.createMessage();

        for(int i = 0; i < 3; ++i) {
            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
            messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
            messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));

            TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
            stateMatcher.withTxnId(equalTo(txnId));
            stateMatcher.withOutcome(nullValue());

            TransactionalState txState = new TransactionalState();
            txState.setTxnId(txnId);
            txState.setOutcome(new Accepted());

            testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);

            // Modify the already-sent message before sending it again.
            message.setIntProperty("sequence", i);

            producer.send(message);
        }

        // Expect rollback on close without a commit call.
        testPeer.expectDischarge(txnId, true);
        testPeer.expectClose();

        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
/**
 * Publishes one message, receives it on a CLIENT_ACKNOWLEDGE session without
 * acknowledging it, and expects the same queue to deliver a message again with the
 * redelivered flag set; only that second delivery is acknowledged.
 *
 * <p>NOTE(review): redelivery is presumably triggered by the {@code AndesAckWaitTimeOut}
 * of 5000 set at the start of the test - confirm against the broker client.
 *
 * @param port           broker port
 * @param adminUsername  user name used to build the initial context
 * @param adminPassword  password used to build the initial context
 * @param brokerHostname broker host name
 * @throws Exception if the JNDI lookup or any JMS call fails
 */
@Parameters({ "broker-port", "admin-username", "admin-password", "broker-hostname" })
@Test
public void testConsumerWithBasicReject(String port,
                                        String adminUsername,
                                        String adminPassword,
                                        String brokerHostname) throws Exception {
    System.setProperty("AndesAckWaitTimeOut", "5000");
    String queueName = "testConsumerWithBasicReject";
    InitialContext initialContextForQueue = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withQueue(queueName)
            .build();

    ConnectionFactory connectionFactory
            = (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();

        // publish message
        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = producerSession.createQueue(queueName);
        MessageProducer producer = producerSession.createProducer(queue);

        producer.send(producerSession.createTextMessage("Test message for reject test"));
        producerSession.close();

        // Consume published messages
        Session subscriberSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination subscriberDestination = (Destination) initialContextForQueue.lookup(queueName);
        MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);

        // First delivery is intentionally left unacknowledged.
        Message message = consumer.receive(5000);
        Assert.assertNotNull(message, "Message was not received");

        message = consumer.receive(10000);
        Assert.assertNotNull(message, "Requeued Message was not received");
        Assert.assertTrue(message.getJMSRedelivered(), "Redelivered flag was not set");
        message.acknowledge();
    } finally {
        // Close the connection even when an assertion above fails.
        connection.close();
    }
}
 
源代码18 项目: activemq-artemis   文件: MessageProducerTest.java
/**
 * Performs an asynchronous send with a {@code CloseCompletionListener} and expects the
 * listener to finish within five seconds having recorded a
 * {@code javax.jms.IllegalStateException}.
 *
 * <p>NOTE(review): the listener presumably tries to close the producer from inside its
 * callback - confirm against {@code CloseCompletionListener}.
 */
@Test
public void testProducerCloseInCompletionListener() throws Exception {
   Connection producerConnection = createConnection();
   Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageProducer topicProducer = producerSession.createProducer(ActiveMQServerTestCase.topic1);

   CountDownLatch completed = new CountDownLatch(1);
   CloseCompletionListener closingListener = new CloseCompletionListener(topicProducer, completed);

   Message outgoing = producerSession.createMessage();
   topicProducer.send(outgoing, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, 0L, closingListener);

   boolean finishedInTime = completed.await(5, TimeUnit.SECONDS);
   ProxyAssertSupport.assertTrue(finishedInTime);

   ProxyAssertSupport.assertNotNull(closingListener.exception);
   ProxyAssertSupport.assertTrue(closingListener.exception instanceof javax.jms.IllegalStateException);
}
 
源代码19 项目: qpid-broker-j   文件: DurableSubscribtionTest.java
/**
 * Creates a durable subscription with no-local disabled, publishes "A" and "B" and
 * consumes only "A". It then reconnects with the same client id and re-creates the
 * subscription under the same name with no-local enabled. The next message that
 * subscriber receives must be "C", published from a different connection. Runs only
 * when the protocol is AMQP 1.0.
 */
@Test
public void testResubscribeWithChangedNoLocal() throws Exception
{
    assumeThat("QPID-8068", getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
    final String durableName = getTestName() + "_sub";
    final Topic topic = createTopic(getTestName());
    final String subscriberClientId = "testClientId";

    // Phase 1: subscription without no-local; consume "A" and leave "B" behind.
    Connection connection = getConnectionBuilder().setClientId(subscriberClientId).build();
    try
    {
        final Session firstSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        final TopicSubscriber firstSubscriber =
                firstSession.createDurableSubscriber(topic, durableName, null, false);

        final MessageProducer localProducer = firstSession.createProducer(topic);
        localProducer.send(firstSession.createTextMessage("A"));
        localProducer.send(firstSession.createTextMessage("B"));
        firstSession.commit();

        connection.start();

        final Message first = firstSubscriber.receive(getReceiveTimeout());
        assertTrue("TextMessage should be received", first instanceof TextMessage);
        final TextMessage firstText = (TextMessage) first;
        assertEquals("Unexpected message received", "A", firstText.getText());

        firstSession.commit();
    }
    finally
    {
        connection.close();
    }

    // Phase 2: same client id and subscription name, but now with no-local enabled.
    connection = getConnectionBuilder().setClientId(subscriberClientId).build();
    try
    {
        connection.start();

        final Session resubscribeSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        final TopicSubscriber noLocalSubscriber =
                resubscribeSession.createDurableSubscriber(topic, durableName, null, true);

        // "C" is published from an unrelated connection.
        final Connection publisherConnection = getConnectionBuilder().setClientId("secondConnection").build();
        try
        {
            final Session publisherSession = publisherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            publisherSession.createProducer(topic).send(publisherSession.createTextMessage("C"));
        }
        finally
        {
            publisherConnection.close();
        }

        final Message afterResubscribe = noLocalSubscriber.receive(getReceiveTimeout());
        assertTrue("TextMessage should be received", afterResubscribe instanceof TextMessage);
        final TextMessage afterResubscribeText = (TextMessage) afterResubscribe;
        assertEquals("Unexpected message received", "C", afterResubscribeText.getText());
    }
    finally
    {
        connection.close();
    }
}
 
源代码20 项目: activemq-artemis   文件: ReceiveNoWaitTest.java
/**
 * Repeats 1000 times: open a connection, send 100 persistent text messages to the
 * queue, then read all 100 back in order with {@code receiveNoWait()}, acknowledging
 * each one. A {@code null} result from {@code receiveNoWait()} fails the test.
 *
 * @throws Exception if any JMS call fails
 */
@Test
public void testReceiveNoWait() throws Exception {
   assertNotNull(queue);

   for (int i = 0; i < 1000; i++) {
      Connection connection = cf.createConnection();
      try {
         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

         MessageProducer producer = session.createProducer(queue);

         producer.setDeliveryMode(DeliveryMode.PERSISTENT);

         for (int j = 0; j < 100; j++) {
            String text = "Message" + j;

            TextMessage message = session.createTextMessage();

            message.setText(text);

            producer.send(message);
         }

         // Delivery starts only after all messages of this round have been sent.
         connection.start();

         MessageConsumer consumer = session.createConsumer(queue);

         for (int j = 0; j < 100; j++) {
            TextMessage m = (TextMessage) consumer.receiveNoWait();

            if (m == null) {
               throw new IllegalStateException("msg null");
            }

            assertEquals("Message" + j, m.getText());

            m.acknowledge();
         }
      } finally {
         // Close per iteration so a failing round does not leave its connection open.
         connection.close();
      }
   }
}