类javax.jms.TopicSubscriber源码实例Demo

下面列出了 javax.jms.TopicSubscriber 类的 API 用法示例代码，也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: activemq-artemis   文件: MessageConsumerTest.java
/**
 * Creates a consumer on {@code topic1} through the generic {@link MessageConsumer} API,
 * narrows it to {@link TopicSubscriber} and checks that {@code getTopic()} returns
 * {@code topic1}.
 */
@Test
public void testGetTopic() throws Exception {
   Connection conn = null;

   try {
      conn = createConnection();
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

      // The consumer is obtained via createConsumer and then viewed as a TopicSubscriber.
      MessageConsumer consumer = session.createConsumer(ActiveMQServerTestCase.topic1);
      TopicSubscriber subscriber = (TopicSubscriber) consumer;
      Topic reported = subscriber.getTopic();

      ProxyAssertSupport.assertEquals(ActiveMQServerTestCase.topic1, reported);
   } finally {
      // Close the connection only if it was actually created.
      if (conn != null) {
         conn.close();
      }
   }
}
 
/**
 * Creates and closes a durable subscription, restarts the broker, asserts that the admin
 * view reports exactly one inactive durable subscriber, and then waits (with the value
 * 20000 passed to {@code Wait.waitFor}) for that count to drop to zero.
 */
public void testRemoveAfterRestart() throws Exception {
   // Register a durable subscription and disconnect right away.
   Connection conn = createConnection();
   conn.setClientID("cliID");
   conn.start();
   Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Topic destination = (Topic) createDestination();
   TopicSubscriber durable = sess.createDurableSubscriber(destination, "subName");
   durable.close();
   conn.close();

   LOG.info("Broker restarting, wait for inactive cleanup afterwards.");

   restartBroker();

   LOG.info("Broker restarted, wait for inactive cleanup now.");

   // Directly after the restart the subscription must still be listed as inactive.
   int inactiveAfterRestart = broker.getAdminView().getInactiveDurableTopicSubscribers().length;
   assertTrue(inactiveAfterRestart == 1);

   Wait.Condition noInactiveSubscribers = new Wait.Condition() {
      @Override
      public boolean isSatisified() throws Exception {
         return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
      }
   };
   assertTrue(Wait.waitFor(noInactiveSubscribers, 20000));
}
 
/**
 * Checks that {@code unsubscribe} fails with an {@code IllegalStateException} while the
 * durable subscriber is still open, and is then called again after the subscriber has
 * been closed.
 */
@Test
public void testUnsubscribeWithActiveConsumer() throws Exception {
   final String subscriptionName = "dursub0";

   Connection connection = createConnection();
   connection.setClientID("zeke");

   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   TopicSubscriber durableSubscriber = session.createDurableSubscriber(ActiveMQServerTestCase.topic1, subscriptionName);

   try {
      session.unsubscribe(subscriptionName);
      ProxyAssertSupport.fail();
   } catch (IllegalStateException expected) {
      // Expected: it is illegal to unsubscribe a subscription that has active consumers.
   }

   // With the subscriber closed, the second unsubscribe call is made without a guard.
   durableSubscriber.close();
   session.unsubscribe(subscriptionName);
}
 
/**
 * Opens 100 topic sessions and subscribers on one connection, closes all of them except
 * the pair created at index 50, closes the connection, sleeps one second and then checks
 * that the destination looked up through {@code backEnd}'s region broker has no consumers.
 * <p>
 * Original note: running this test you can produce a leak of only 2 ConsumerInfo on BE
 * broker, NOT 200 as in other cases!
 */
public void testWithoutSessionAndSubsciberClosePlayAround() throws Exception {
   TopicConnection topicConnection = connectionFactory.createTopicConnection();
   topicConnection.start();

   for (int iteration = 0; iteration < 100; iteration++) {
      TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      TopicSubscriber topicSubscriber = session.createSubscriber(topic);
      topicSubscriber.setMessageListener(new DummyMessageListener());

      if (iteration == 50) {
         // Deliberately leave this one subscriber and its session open.
         continue;
      }
      topicSubscriber.close();
      session.close();
   }

   topicConnection.close();
   Thread.sleep(1000);

   Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
   assertNotNull(brokerDestination);
   assertTrue(brokerDestination.getConsumers().isEmpty());
}
 
/**
 * Creates and closes 100 subscribers on a single topic session, closes the session and
 * the connection after the loop, sleeps one second and then checks that the destination
 * looked up through {@code backEnd}'s region broker has no consumers.
 */
public void testWithSessionCloseOutsideTheLoop() throws Exception {
   TopicConnection topicConnection = connectionFactory.createTopicConnection();
   topicConnection.start();

   // One session is shared by every subscriber and is closed only after the loop.
   TopicSession sharedSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
   int created = 0;
   while (created < 100) {
      TopicSubscriber topicSubscriber = sharedSession.createSubscriber(topic);
      topicSubscriber.setMessageListener(new DummyMessageListener());
      topicSubscriber.close();
      created++;
   }
   sharedSession.close();
   topicConnection.close();

   Thread.sleep(1000);

   Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
   assertNotNull(brokerDestination);
   assertTrue(brokerDestination.getConsumers().isEmpty());
}
 
源代码6 项目: activemq-artemis   文件: AcknowledgementTest.java
/**
 * Topics shouldn't hold on to messages when the non-durable subscribers close.
 * <p>
 * Publishes and commits one persistent message, receives it in the same transacted
 * session, rolls the receive back, closes the connection and finally calls
 * {@code checkEmpty} on the topic.
 */
@Test
public void testPersistentMessagesForTopicDropped2() throws Exception {
   TopicConnection connection = createTopicConnection();
   connection.start();

   TopicSession session = connection.createTopicSession(true, 0);
   TopicPublisher publisher = session.createPublisher(ActiveMQServerTestCase.topic1);
   TopicSubscriber subscriber = session.createSubscriber(ActiveMQServerTestCase.topic1);
   publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

   publisher.publish(session.createTextMessage("testing123"));
   session.commit();

   // Receive the message, verify it, then roll the receive back.
   TextMessage received = (TextMessage) subscriber.receive(3000);
   ProxyAssertSupport.assertNotNull(received);
   ProxyAssertSupport.assertEquals("testing123", received.getText());
   session.rollback();

   connection.close();

   checkEmpty(ActiveMQServerTestCase.topic1);
}
 
源代码7 项目: activemq-artemis   文件: ActiveMQSession.java
/**
 * Creates a durable subscriber on the given topic.
 *
 * @param topic           the topic to subscribe to; must be an {@code ActiveMQDestination} that is not a queue
 * @param name            the subscription name handed to {@code createConsumer}
 * @param messageSelector the selector; an empty string is replaced by {@code null}
 * @param noLocal         the noLocal flag handed to {@code createConsumer}
 * @return the consumer returned by {@code createConsumer} with {@code ConsumerDurability.DURABLE}
 * @throws JMSException {@code IllegalStateException} on a queue session,
 *                      {@code InvalidDestinationException} for an unsupported or queue destination
 */
@Override
public TopicSubscriber createDurableSubscriber(final Topic topic,
                                               final String name,
                                               String messageSelector,
                                               final boolean noLocal) throws JMSException {
   // As per spec. section 4.11
   if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) {
      throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession");
   }
   checkTopic(topic);

   final boolean isActiveMQDestination = topic instanceof ActiveMQDestination;
   if (!isActiveMQDestination) {
      throw new InvalidDestinationException("Not an ActiveMQTopic:" + topic);
   }

   final ActiveMQDestination destination = (ActiveMQDestination) topic;
   if (destination.isQueue()) {
      throw new InvalidDestinationException("Cannot create a subscriber on a queue");
   }

   // An empty selector is passed on as null.
   final String selector = "".equals(messageSelector) ? null : messageSelector;
   return createConsumer(destination, name, selector, noLocal, ConsumerDurability.DURABLE);
}
 
源代码8 项目: activemq-artemis   文件: MessageSerializerTest.java
/**
 * Generates text messages for the {@code testBoth} address, exports them to a file,
 * imports them under the topic-qualified address, and checks that a durable subscriber
 * created beforehand receives {@code TEST_MESSAGE_COUNT} messages with matching text, in order.
 */
@Test
public void testAnycastQueueToMulticastTopicBothAddress() throws Exception {
   final String address = "testBoth";
   final String clientId = "test-client-id";

   File exportFile = createMessageFile();

   connection.setClientID(clientId);
   createBothTypeAddress(address);
   createQueue(RoutingType.ANYCAST, address, address);
   Session session = createSession(connection);

   // The durable subscription exists before any message is generated or imported.
   Topic topic = session.createTopic(address);
   TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "test-subscriber");

   List<Message> sent = generateTextMessages(session, getDestination(address));

   exportMessages(address, exportFile);
   importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, exportFile);

   int index = 0;
   while (index < TEST_MESSAGE_COUNT) {
      TextMessage received = (TextMessage) durableSubscriber.receive(1000);
      assertNotNull(received);
      TextMessage expected = (TextMessage) sent.get(index);
      assertEquals(expected.getText(), received.getText());
      index++;
   }
}
 
/**
 * Receives {@code size} messages from a durable subscription and, when a counter is
 * supplied, subtracts each message's core encode size from it.
 *
 * @param connection           connection used to create the auto-acknowledge session
 * @param sub                  durable subscription name
 * @param size                 number of messages to receive
 * @param topicName            name of the topic to subscribe to
 * @param publishedMessageSize counter to decrement, or {@code null} to skip the accounting
 * @throws Exception if any JMS call fails
 */
protected void consumeDurableTestMessages(Connection connection, String sub, int size, String topicName,
      AtomicLong publishedMessageSize) throws Exception {

   Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Topic destination = consumerSession.createTopic(topicName);

   try {
      TopicSubscriber durableSubscriber = consumerSession.createDurableSubscriber(destination, sub);
      for (int remaining = size; remaining > 0; remaining--) {
         // receive() is called without a timeout.
         ActiveMQMessage received = (ActiveMQMessage) durableSubscriber.receive();
         if (publishedMessageSize == null) {
            continue;
         }
         int encodeSize = received.getCoreMessage().getEncodeSize();
         publishedMessageSize.addAndGet(-encodeSize);
      }
   } finally {
      consumerSession.close();
   }
}
 
源代码10 项目: WeEvent   文件: WeEventTopicSession.java
/**
 * Creates a subscriber for a {@code WeEventTopic} and registers it with the topic connection.
 *
 * @param topic the topic to subscribe to; must be a {@code WeEventTopic}
 * @return the newly created subscriber
 * @throws JMSException with {@code WeEventConnectionFactory.NotSupportTips} for any other topic type
 */
@Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
    // Any topic that is not a WeEventTopic (including null) is rejected.
    if (!(topic instanceof WeEventTopic)) {
        throw new JMSException(WeEventConnectionFactory.NotSupportTips);
    }

    WeEventTopicSubscriber weEventSubscriber = new WeEventTopicSubscriber((WeEventTopic) topic);
    this.topicConnection.createSubscriber(weEventSubscriber);
    return weEventSubscriber;
}
 
源代码11 项目: activemq-artemis   文件: SimpleOpenWireTest.java
/**
 * Creates a temporary topic on the fixture connection and exercises it from a second
 * connection: publishing is attempted, creating a subscriber there must throw a
 * {@code JMSException}, and after the owning connection is closed another publish is
 * attempted whose {@code JMSException}, if any, is tolerated.
 */
@Test
public void testTempTopicDelete() throws Exception {
   connection.start();
   TopicSession ownerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
   TemporaryTopic temporaryTopic = ownerSession.createTemporaryTopic();

   ActiveMQConnection secondConnection = (ActiveMQConnection) factory.createConnection();

   try {
      TopicSession secondSession = secondConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      TopicPublisher tempPublisher = secondSession.createPublisher(temporaryTopic);

      // need to wait here because the ActiveMQ client's temp destination map is updated asynchronously, not waiting can introduce a race
      assertTrue(Wait.waitFor(() -> secondConnection.activeTempDestinations.size() == 1, 2000, 100));

      TextMessage textMessage = secondSession.createTextMessage("Test Message");
      tempPublisher.publish(textMessage);

      try {
         TopicSubscriber unexpected = secondSession.createSubscriber(temporaryTopic);
         fail("should have gotten exception but got consumer: " + unexpected);
      } catch (JMSException expected) {
         // correct: subscribing from the second connection is rejected
      }

      // Close the connection that created the temporary topic.
      connection.close();

      try {
         Message followUp = secondSession.createMessage();
         tempPublisher.publish(followUp);
      } catch (JMSException tolerated) {
         // ok: a failure here is accepted, success is accepted as well
      }
   } finally {
      secondConnection.close();
   }
}
 
/**
 * Starts the given connection and creates a durable subscriber on it, using {@code name}
 * both as the client ID and as the subscription name. The connection is also added to
 * {@code connections}.
 *
 * @param conn connection to configure, record and start
 * @param dest destination to subscribe to; cast to {@code javax.jms.Topic}
 * @param name client ID and subscription name
 * @return the durable subscriber created on a new auto-acknowledge session
 * @throws Exception if any JMS call fails
 */
protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
   conn.setClientID(name);
   connections.add(conn);
   conn.start();

   Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
   javax.jms.Topic topic = (javax.jms.Topic) dest;
   return session.createDurableSubscriber(topic, name);
}
 
源代码13 项目: activemq-artemis   文件: ActiveMQRASession.java
/**
 * Create a topic subscriber.
 * <p>
 * The subscriber obtained from the internal topic session is wrapped in an
 * {@code ActiveMQRATopicSubscriber} and passed to {@code addConsumer}; the whole
 * operation runs between {@code lock()} and {@code unlock()}.
 *
 * @param topic           The topic
 * @param messageSelector The message selector
 * @param noLocal         If true inhibits the delivery of messages published by its own connection
 * @return The wrapped subscriber
 * @throws JMSException Thrown if an error occurs
 */
@Override
public TopicSubscriber createSubscriber(final Topic topic,
                                        final String messageSelector,
                                        final boolean noLocal) throws JMSException {
   lock();
   try {
      final TopicSession topicSession = getTopicSessionInternal();

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         final String traceMessage = "createSubscriber " + topicSession + " topic=" + topic
            + " selector=" + messageSelector + " noLocal=" + noLocal;
         ActiveMQRALogger.LOGGER.trace(traceMessage);
      }

      // Wrap the subscriber before it is traced, registered and returned.
      final TopicSubscriber delegate = topicSession.createSubscriber(topic, messageSelector, noLocal);
      final TopicSubscriber wrapped = new ActiveMQRATopicSubscriber(delegate, this);

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("createdSubscriber " + topicSession + " ActiveMQTopicSubscriber=" + wrapped);
      }

      addConsumer(wrapped);
      return wrapped;
   } finally {
      unlock();
   }
}
 
源代码14 项目: pooled-jms   文件: JmsPoolTopicSubscriberTest.java
/**
 * Checks that {@code toString()} on a subscriber created for a temporary topic through
 * a pooled connection does not return {@code null}.
 */
@Test
public void testToString() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicSubscriber topicSubscriber = topicSession.createSubscriber(temporaryTopic);

    String description = topicSubscriber.toString();
    assertNotNull(description);
}
 
源代码15 项目: qpid-jms   文件: SessionIntegrationTest.java
/**
 * Checks that unsubscribing a durable subscription while its subscriber is still open
 * throws a {@code JMSException}, then closes the subscriber and the connection against
 * the scripted test peer.
 */
@Test(timeout = 20000)
public void testDurableSubscriptionUnsubscribeInUseThrowsJMSEx() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topicName = "myTopic";
        final String subscriptionName = "mySubscription";
        Topic topic = sess.createTopic(topicName);

        peer.expectDurableSubscriberAttach(topicName, subscriptionName);
        peer.expectLinkFlow();

        TopicSubscriber durableSubscriber = sess.createDurableSubscriber(topic, subscriptionName);
        assertNotNull("TopicSubscriber object was null", durableSubscriber);

        try {
            sess.unsubscribe(subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected: the subscription is still in use by the open subscriber.
        }

        peer.expectDetach(false, true, false);
        durableSubscriber.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
 
/**
 * Get the no local value from the wrapped subscriber, after {@code checkState()}.
 *
 * @return The value reported by the underlying {@link TopicSubscriber}
 * @throws JMSException Thrown if an error occurs
 */
@Override
public boolean getNoLocal() throws JMSException {
   final boolean tracing = ActiveMQRALogger.LOGGER.isTraceEnabled();
   if (tracing) {
      ActiveMQRALogger.LOGGER.trace("getNoLocal()");
   }

   checkState();

   final TopicSubscriber delegate = (TopicSubscriber) consumer;
   return delegate.getNoLocal();
}
 
/**
 * Publishes 100 text messages to a topic and verifies that a subscriber created
 * beforehand on the same connection receives every one of them.
 *
 * @param port broker port, supplied through the {@code broker-port} test parameter
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
    String topicName = "MyTopic1";
    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();

        // Initialize subscriber
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
        TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

        // publish 100 messages
        TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

        for (int i = 0; i < numberOfMessages; i++) {
            producer.publish(producerSession.createTextMessage("Test message " + i));
        }

        producerSession.close();

        for (int i = 0; i < numberOfMessages; i++) {
            Message message = subscriber.receive(1000);
            Assert.assertNotNull(message, "Message #" + i + " was not received");
        }

        // Close the subscriber session explicitly, as the sibling publish/subscribe helpers do.
        subscriberSession.close();
    } finally {
        // Release the connection even when an assertion or JMS call above fails,
        // so a failing run does not leave it open.
        connection.close();
    }
}
 
/**
 * Publishes 100 text messages to {@code publishTopicName} and asserts that a subscriber
 * on {@code subscribeTopicName} receives 100 messages.
 *
 * @param publishTopicName   JNDI name of the topic the messages are published to
 * @param subscribeTopicName JNDI name of the topic the subscriber is created on
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
                                                        String subscribeTopicName) throws Exception {

    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();

        // The subscriber is created before anything is published.
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic subscriberDestination = (Topic) initialContext.lookup(subscribeTopicName);
        TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

        TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic publisherDestination = (Topic) initialContext.lookup(publishTopicName);
        TopicPublisher publisher = publisherSession.createPublisher(publisherDestination);

        for (int i = 0; i < numberOfMessages; i++) {
            publisher.publish(publisherSession.createTextMessage("Test message " + i));
        }

        publisherSession.close();

        for (int i = 0; i < numberOfMessages; i++) {
            Message message = subscriber.receive(1000);
            Assert.assertNotNull(message, "Message #" + i + " was not received");
        }

        subscriberSession.close();
    } finally {
        // Release the connection even when an assertion or JMS call above fails,
        // so a failing run does not leave it open.
        connection.close();
    }
}
 
/**
 * Publishes 100 text messages to {@code publishTopicName} and asserts that a subscriber
 * on {@code subscribeTopicName} receives nothing within a single 1000 receive timeout.
 *
 * @param publishTopicName   JNDI name of the topic the messages are published to
 * @param subscribeTopicName JNDI name of the topic the subscriber is created on
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
private void assertNullWithPublishSubscribeForTopics(String publishTopicName,
                                                     String subscribeTopicName) throws Exception {

    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();

        // The subscriber is created before anything is published.
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic subscriberDestination = (Topic) initialContext.lookup(subscribeTopicName);
        TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

        TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic publisherDestination = (Topic) initialContext.lookup(publishTopicName);
        TopicPublisher publisher = publisherSession.createPublisher(publisherDestination);

        for (int i = 0; i < numberOfMessages; i++) {
            publisher.publish(publisherSession.createTextMessage("Test message " + i));
        }

        publisherSession.close();

        Message message = subscriber.receive(1000);
        Assert.assertNull(message, "A message was received where no message was expected");

        subscriberSession.close();
    } finally {
        // Release the connection even when the assertion or a JMS call above fails,
        // so a failing run does not leave it open.
        connection.close();
    }
}
 
/**
 * Get the topic from the wrapped subscriber, after {@code checkState()}.
 *
 * @return The topic reported by the underlying {@link TopicSubscriber}
 * @throws JMSException Thrown if an error occurs
 */
@Override
public Topic getTopic() throws JMSException {
   final boolean tracing = ActiveMQRALogger.LOGGER.isTraceEnabled();
   if (tracing) {
      ActiveMQRALogger.LOGGER.trace("getTopic()");
   }

   checkState();

   final TopicSubscriber delegate = (TopicSubscriber) consumer;
   return delegate.getTopic();
}
 
源代码21 项目: qpid-broker-j   文件: BDBUpgradeTest.java
/**
 * Receives from one of two durable subscriptions and checks that exactly one message
 * with the expected properties and body is delivered.
 *
 * @param connection connection used to create the auto-acknowledge session
 * @param selector   {@code true} to use the selector topic/subscription (with the
 *                   {@code testprop='true'} selector), {@code false} for the plain one
 * @throws Exception if any JMS call fails
 */
private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception
{
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    final TopicSubscriber subscriber;
    if (selector)
    {
        Topic selectorTopic = session.createTopic(SELECTOR_TOPIC_NAME);
        subscriber = session.createDurableSubscriber(selectorTopic, SELECTOR_SUB_NAME, "testprop='true'", false);
    }
    else
    {
        Topic plainTopic = session.createTopic(TOPIC_NAME);
        subscriber = session.createDurableSubscriber(plainTopic, SUB_NAME);
    }

    // Retrieve the matching message
    Message received = subscriber.receive(getReceiveTimeout());
    assertThat("Failed to receive an expected message", received, is(notNullValue()));
    if (selector)
    {
        String selectorProperty = received.getStringProperty("testprop");
        assertThat("Selector property did not match", selectorProperty, is(equalTo("true")));
    }
    assertThat("ID property did not match", received.getIntProperty("ID"), is(equalTo(1)));
    String body = ((TextMessage) received).getText();
    assertThat("Message content was not as expected", body, is(equalTo(generateString(1024))));

    // Verify that no more messages are received
    Message extra = subscriber.receive(getReceiveTimeout());
    assertThat("No more messages should have been recieved", extra, is(nullValue()));

    subscriber.close();
    session.close();
}
 
源代码22 项目: qpid-broker-j   文件: TimeToLiveTest.java
/**
 * Sends message "A" with a time-to-live of twice the receive timeout and message "B"
 * with a time-to-live of 0 to a durable subscription. "A" is received first; after
 * sleeping for the time-to-live and rolling the session back, the next message
 * received must be "B".
 */
@Test
public void testPassiveTTLWithDurableSubscription() throws Exception
{
    final long ttlMillis = getReceiveTimeout() * 2;
    final String subscription = getTestName() + "_sub";
    final Topic destination = createTopic(getTestName());
    final TopicConnection topicConnection = getTopicConnection();
    try
    {
        Session txSession = topicConnection.createSession(true, Session.SESSION_TRANSACTED);
        TopicSubscriber subscriber = txSession.createDurableSubscriber(destination, subscription);
        MessageProducer sender = txSession.createProducer(destination);

        sender.setTimeToLive(ttlMillis);
        sender.send(txSession.createTextMessage("A"));
        sender.setTimeToLive(0);
        sender.send(txSession.createTextMessage("B"));
        txSession.commit();

        topicConnection.start();
        Message first = subscriber.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received", first instanceof TextMessage);
        assertEquals("Unexpected message received", "A", ((TextMessage)first).getText());

        // Sleep for the time-to-live of "A" before rolling the receive back.
        Thread.sleep(ttlMillis);

        txSession.rollback();
        Message afterRollback = subscriber.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received after waiting for TTL", afterRollback instanceof TextMessage);
        assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) afterRollback).getText());
    }
    finally
    {
        topicConnection.close();
    }
}
 
源代码23 项目: qpid-broker-j   文件: TimeToLiveTest.java
/**
 * Sends message "A" with a time-to-live equal to the receive timeout and message "B"
 * with a time-to-live of 0 to a durable subscription, sleeps for the time-to-live
 * before starting the connection, and checks that the first message received is "B".
 */
@Test
public void testActiveTTLWithDurableSubscription() throws Exception
{
    final long ttlMillis = getReceiveTimeout();
    final String subscription = getTestName() + "_sub";
    final Topic destination = createTopic(getTestName());
    final TopicConnection topicConnection = getTopicConnection();
    try
    {
        Session txSession = topicConnection.createSession(true, Session.SESSION_TRANSACTED);
        TopicSubscriber subscriber = txSession.createDurableSubscriber(destination, subscription);
        MessageProducer sender = txSession.createProducer(destination);

        sender.setTimeToLive(ttlMillis);
        sender.send(txSession.createTextMessage("A"));
        sender.setTimeToLive(0);
        sender.send(txSession.createTextMessage("B"));
        txSession.commit();

        // Sleep for the time-to-live of "A" before delivery is started.
        Thread.sleep(ttlMillis);

        topicConnection.start();
        Message received = subscriber.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received", received instanceof TextMessage);
        assertEquals("Unexpected message received", "B", ((TextMessage)received).getText());
    }
    finally
    {
        topicConnection.close();
    }
}
 
源代码24 项目: qpid-broker-j   文件: TopicSubscriberTest.java
/**
 * Creates two subscribers on the same topic, checks that each reports that topic's name,
 * sends one text message and verifies that both subscribers receive it.
 */
@Test
public void messageDeliveredToAllSubscribers() throws Exception
{
    final Topic destination = createTopic(getTestName());
    final TopicConnection topicConnection = getTopicConnection();
    try
    {
        final TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicPublisher publisher = topicSession.createPublisher(destination);

        final TopicSubscriber first = topicSession.createSubscriber(destination);
        assertEquals("Unexpected subscriber1 topic", destination.getTopicName(), first.getTopic().getTopicName());
        final TopicSubscriber second = topicSession.createSubscriber(destination);
        assertEquals("Unexpected subscriber2 topic", destination.getTopicName(), second.getTopic().getTopicName());

        topicConnection.start();
        final String payload = "Test Message";
        publisher.send(topicSession.createTextMessage(payload));

        // Both receives happen before any assertion on the results.
        final Message firstReceived = first.receive(getReceiveTimeout());
        final Message secondReceived = second.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received  by subscriber1", firstReceived instanceof TextMessage);
        assertEquals(payload, ((TextMessage) firstReceived).getText());
        assertTrue("TextMessage should be received  by subscriber2", secondReceived instanceof TextMessage);
        assertEquals(payload, ((TextMessage) secondReceived).getText());
    }
    finally
    {
        topicConnection.close();
    }
}
 
源代码25 项目: activemq-artemis   文件: NetworkBrokerDetachTest.java
/**
 * Registers a durable subscriber with the given listener on {@code DESTINATION_NAME}.
 * The connection uses the client ID {@code "DurableOne"} and is not closed by this method.
 *
 * @param brokerService broker to connect to; its name is appended to the subscription name
 * @param listener      listener installed on the durable subscriber
 * @return the topic the subscriber was created on
 * @throws Exception if any JMS call fails
 */
private ActiveMQTopic registerDurableConsumer(BrokerService brokerService,
                                              MessageListener listener) throws Exception {
   ConnectionFactory connectionFactory = createConnectionFactory(brokerService);
   Connection conn = connectionFactory.createConnection();
   conn.setClientID("DurableOne");
   conn.start();

   Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
   ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic(DESTINATION_NAME);

   // unique to a broker
   String subscriptionName = "SubOne" + brokerService.getBrokerName();
   TopicSubscriber durableSubscriber = sess.createDurableSubscriber(topic, subscriptionName);
   durableSubscriber.setMessageListener(listener);

   return topic;
}
 
/**
 * Opens {@code consumerConnection} with client ID {@code "cliID"} and installs a durable
 * subscriber (subscription {@code "subName"}, selector {@code "filter=true"}) whose
 * listener increments {@code received} for every message delivered.
 */
private void openConsumer() throws Exception {
   consumerConnection = (ActiveMQConnection) createConnection();
   consumerConnection.setClientID("cliID");
   consumerConnection.start();

   Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   TopicSubscriber durableSubscriber = consumerSession.createDurableSubscriber(topic, "subName", "filter=true", false);

   // NOTE(review): received is incremented from the listener callback; confirm how the field is declared and read.
   MessageListener countingListener = new MessageListener() {
      @Override
      public void onMessage(Message message) {
         received++;
      }
   };
   durableSubscriber.setMessageListener(countingListener);
}
 
/**
 * Create a new wrapper
 *
 * @param consumer the topic subscriber
 * @param session  the session
 */
public ActiveMQRATopicSubscriber(final TopicSubscriber consumer, final ActiveMQRASession session) {
   super(consumer, session);

   if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
      ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
   }
}
 
/**
 * Creates a durable topic subscription, checks that it is propagated
 * in the broker network, removes the subscription and checks that
 * the subscription is removed from remote broker as well.
 * <p>
 * The connection is closed in a {@code finally} block so that a failed assertion
 * does not leave it open.
 *
 * @throws Exception if a JMS call fails or the test is interrupted while sleeping
 */
public void testDurableSubNetwork() throws Exception {
   LOG.info("testDurableSubNetwork started.");

   // create durable sub
   ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory(connector.getConnectUri().toString());
   Connection conn = fact.createConnection();
   try {
      conn.setClientID("clientID1");
      // Session.AUTO_ACKNOWLEDGE has the value 1, which the original passed as a literal.
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Destination dest = session.createTopic(topicName);
      TopicSubscriber sub = session.createDurableSubscriber((Topic) dest, subName);
      LOG.info("Durable subscription of name " + subName + " created.");
      // Short pause before the subscription is queried on both brokers.
      Thread.sleep(100);

      // query durable sub on local and remote broker
      // raise an error if not found

      assertTrue(foundSubInLocalBroker(subName));

      assertTrue(foundSubInRemoteBrokerByTopicName(topicName));

      // unsubscribe from durable sub; the subscriber is closed first
      sub.close();
      session.unsubscribe(subName);
      LOG.info("Unsubscribed from durable subscription.");
      Thread.sleep(100);

      // query durable sub on local and remote broker
      // raise an error if its not removed from both brokers
      assertFalse(foundSubInLocalBroker(subName));

      assertFalse("Durable subscription not unregistered on remote broker", foundSubInRemoteBrokerByTopicName(topicName));
   } finally {
      // The original never closed this connection.
      conn.close();
   }
}
 
源代码29 项目: qpid-jms   文件: SessionIntegrationTest.java
/**
 * Creates a durable subscriber and then closes its session and the connection against
 * the scripted test peer. The only peer expectations set after the subscriber attach
 * are the session end and the connection close.
 */
@Test(timeout = 20000)
public void testCloseSessionWithExistingDurableTopicSubscriberDoesNotCloseSubscriberLink() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topicName = "myTopic";
        final String subscriptionName = "mySubscription";
        Topic topic = sess.createTopic(topicName);

        peer.expectDurableSubscriberAttach(topicName, subscriptionName);
        peer.expectLinkFlow();

        TopicSubscriber durableSubscriber = sess.createDurableSubscriber(topic, subscriptionName);
        assertNotNull("TopicSubscriber object was null", durableSubscriber);

        // No detach expectation is registered before the session is closed.
        peer.expectEnd();
        sess.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
 
/**
 * Returns the topic of the target when the target is a {@link TopicSubscriber}.
 *
 * @return the target's topic, or {@code null} if the target is not a {@code TopicSubscriber}
 * @throws JMSException if the target's {@code getTopic()} call fails
 */
@Override
@Nullable
public Topic getTopic() throws JMSException {
	if (this.target instanceof TopicSubscriber) {
		TopicSubscriber subscriber = (TopicSubscriber) this.target;
		return subscriber.getTopic();
	}
	return null;
}
 
 类所在包
 同包方法