下面列出了如何使用 javax.jms.TopicSubscriber 的 API 类实例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that a consumer created on {@code topic1} can be cast to
 * {@link TopicSubscriber} and that its {@code getTopic()} equals {@code topic1}.
 *
 * @throws Exception if the connection, session or consumer cannot be set up
 */
@Test
public void testGetTopic() throws Exception {
    Connection conn = null;
    try {
        conn = createConnection();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = sess.createConsumer(ActiveMQServerTestCase.topic1);
        TopicSubscriber asSubscriber = (TopicSubscriber) consumer;
        Topic reported = asSubscriber.getTopic();
        ProxyAssertSupport.assertEquals(ActiveMQServerTestCase.topic1, reported);
    } finally {
        // conn is still null when createConnection() itself failed.
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Creates and closes a durable subscription, restarts the broker, asserts
 * that the admin view reports exactly one inactive durable subscriber and
 * then waits (limit value 20000 passed to {@code Wait.waitFor}) for that
 * count to drop to zero.
 *
 * @throws Exception if any JMS or broker operation fails
 */
public void testRemoveAfterRestart() throws Exception {
    Connection conn = createConnection();
    conn.setClientID("cliID");
    conn.start();
    Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic destination = (Topic) createDestination();
    TopicSubscriber durable = sess.createDurableSubscriber(destination, "subName");
    durable.close();
    conn.close();

    LOG.info("Broker restarting, wait for inactive cleanup afterwards.");
    restartBroker();
    LOG.info("Broker restarted, wait for inactive cleanup now.");

    // Right after the restart the closed subscription must still be listed as inactive.
    int inactiveAfterRestart = broker.getAdminView().getInactiveDurableTopicSubscribers().length;
    assertTrue(inactiveAfterRestart == 1);

    Wait.Condition noInactiveSubscribersLeft = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
        }
    };
    assertTrue(Wait.waitFor(noInactiveSubscribersLeft, 20000));
}
/**
 * Verifies that unsubscribing a durable subscription is rejected with an
 * {@code IllegalStateException} while its subscriber is still open, and
 * that the same call is made again once the subscriber has been closed.
 *
 * @throws Exception if any other JMS operation fails
 */
@Test
public void testUnsubscribeWithActiveConsumer() throws Exception {
    Connection connection = createConnection();
    connection.setClientID("zeke");
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber activeSubscriber = session.createDurableSubscriber(ActiveMQServerTestCase.topic1, "dursub0");

    boolean rejected = false;
    try {
        session.unsubscribe("dursub0");
    } catch (IllegalStateException expected) {
        // Ok - it is illegal to unsubscribe a subscription that has active consumers
        rejected = true;
    }
    if (!rejected) {
        ProxyAssertSupport.fail();
    }

    activeSubscriber.close();
    session.unsubscribe("dursub0");
}
/**
 * Opens 100 topic sessions, each with one subscriber and a
 * {@code DummyMessageListener}, and closes all of them explicitly except
 * the pair created at iteration 50. After closing the connection and
 * sleeping one second, asserts that the back-end broker's destination for
 * the topic has no consumers.
 *
 * <p>Original note: running this test you can produce a leak of only 2
 * ConsumerInfo on BE broker, NOT 200 as in other cases!
 *
 * @throws Exception if any JMS or broker operation fails
 */
public void testWithoutSessionAndSubsciberClosePlayAround() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int round = 0; round < 100; round++) {
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber topicSubscriber = session.createSubscriber(topic);
        DummyMessageListener dummy = new DummyMessageListener();
        topicSubscriber.setMessageListener(dummy);
        if (round == 50) {
            // This one pair is intentionally not closed here.
            continue;
        }
        topicSubscriber.close();
        session.close();
    }

    topicConnection.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Creates and closes 100 subscribers on one shared topic session, closes
 * the session and the connection afterwards, sleeps one second and asserts
 * that the back-end broker's destination for the topic has no consumers.
 *
 * @throws Exception if any JMS or broker operation fails
 */
public void testWithSessionCloseOutsideTheLoop() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();
    TopicSession sharedSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    int remaining = 100;
    while (remaining > 0) {
        TopicSubscriber topicSubscriber = sharedSession.createSubscriber(topic);
        DummyMessageListener dummy = new DummyMessageListener();
        topicSubscriber.setMessageListener(dummy);
        topicSubscriber.close();
        remaining--;
    }

    sharedSession.close();
    topicConnection.close();
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Topics shouldn't hold on to messages when the non-durable subscribers close.
 *
 * <p>Publishes one persistent text message inside a transacted session,
 * receives it, rolls the session back, closes the connection and then
 * runs {@code checkEmpty} on {@code topic1}.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testPersistentMessagesForTopicDropped2() throws Exception {
    TopicConnection connection = createTopicConnection();
    connection.start();

    TopicSession session = connection.createTopicSession(true, 0);
    TopicPublisher publisher = session.createPublisher(ActiveMQServerTestCase.topic1);
    TopicSubscriber subscriber = session.createSubscriber(ActiveMQServerTestCase.topic1);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

    Message outgoing = session.createTextMessage("testing123");
    publisher.publish(outgoing);
    session.commit();

    // receive but rollback
    TextMessage incoming = (TextMessage) subscriber.receive(3000);
    ProxyAssertSupport.assertNotNull(incoming);
    ProxyAssertSupport.assertEquals("testing123", incoming.getText());
    session.rollback();

    connection.close();
    checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
 * Creates a durable subscriber on the given topic.
 *
 * <p>Validation happens in this order: the session must not be a queue
 * session, {@code checkTopic} must accept the topic, the topic must be an
 * {@code ActiveMQDestination}, and that destination must not be a queue.
 * An empty message selector is treated as no selector.
 *
 * @param topic           the topic to subscribe to
 * @param name            the subscription name
 * @param messageSelector the selector expression; {@code ""} is replaced by {@code null}
 * @param noLocal         passed through unchanged to {@code createConsumer}
 * @return the subscriber returned by {@code createConsumer}
 * @throws JMSException if one of the checks above fails or consumer creation fails
 */
@Override
public TopicSubscriber createDurableSubscriber(final Topic topic,
                                               final String name,
                                               String messageSelector,
                                               final boolean noLocal) throws JMSException {
    // As per spec. section 4.11
    if (sessionType == ActiveMQSession.TYPE_QUEUE_SESSION) {
        throw new IllegalStateException("Cannot create a durable subscriber on a QueueSession");
    }

    checkTopic(topic);

    if (!(topic instanceof ActiveMQDestination)) {
        throw new InvalidDestinationException("Not an ActiveMQTopic:" + topic);
    }
    final ActiveMQDestination destination = (ActiveMQDestination) topic;

    if (destination.isQueue()) {
        throw new InvalidDestinationException("Cannot create a subscriber on a queue");
    }

    final String effectiveSelector = "".equals(messageSelector) ? null : messageSelector;
    return createConsumer(destination, name, effectiveSelector, noLocal, ConsumerDurability.DURABLE);
}
/**
 * Generates text messages on an address created via
 * {@code createBothTypeAddress}, exports them to a file, imports the file
 * into the topic-qualified form of the same address and expects a durable
 * subscriber to receive {@code TEST_MESSAGE_COUNT} messages whose text
 * matches the generated ones, in order.
 *
 * @throws Exception if any setup, export/import or JMS operation fails
 */
@Test
public void testAnycastQueueToMulticastTopicBothAddress() throws Exception {
    String address = "testBoth";
    String clientId = "test-client-id";

    File exportFile = createMessageFile();

    connection.setClientID(clientId);
    createBothTypeAddress(address);
    createQueue(RoutingType.ANYCAST, address, address);
    Session session = createSession(connection);

    Topic subscribedTopic = session.createTopic(address);
    TopicSubscriber durableSubscriber = session.createDurableSubscriber(subscribedTopic, "test-subscriber");

    List<Message> sent = generateTextMessages(session, getDestination(address));

    exportMessages(address, exportFile);
    importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, exportFile);

    for (int index = 0; index < TEST_MESSAGE_COUNT; index++) {
        TextMessage received = (TextMessage) durableSubscriber.receive(1000);
        assertNotNull(received);
        TextMessage expected = (TextMessage) sent.get(index);
        assertEquals(expected.getText(), received.getText());
    }
}
/**
 * Receives {@code size} messages from a durable subscription and, when a
 * counter is supplied, subtracts each message's encoded size from it.
 *
 * @param connection           the connection used to create the session
 * @param sub                  the durable subscription name
 * @param size                 the number of messages to receive (each receive blocks without a timeout)
 * @param topicName            the name of the topic to subscribe to
 * @param publishedMessageSize counter reduced by each message's encode size; may be {@code null}
 * @throws Exception if any JMS operation fails
 */
protected void consumeDurableTestMessages(Connection connection, String sub, int size, String topicName,
                                          AtomicLong publishedMessageSize) throws Exception {
    Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
    Topic subscribedTopic = session.createTopic(topicName);

    try {
        TopicSubscriber durable = session.createDurableSubscriber(subscribedTopic, sub);
        int received = 0;
        while (received < size) {
            ActiveMQMessage next = (ActiveMQMessage) durable.receive();
            if (publishedMessageSize != null) {
                publishedMessageSize.addAndGet(-next.getCoreMessage().getEncodeSize());
            }
            received++;
        }
    } finally {
        session.close();
    }
}
/**
 * Creates a subscriber for a {@code WeEventTopic} and registers it with
 * the topic connection.
 *
 * @param topic the topic to subscribe to; must be a {@code WeEventTopic}
 * @return the newly created subscriber
 * @throws JMSException if {@code topic} is not a {@code WeEventTopic}, or if registration fails
 */
@Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
    if (!(topic instanceof WeEventTopic)) {
        throw new JMSException(WeEventConnectionFactory.NotSupportTips);
    }

    WeEventTopic weEventTopic = (WeEventTopic) topic;
    WeEventTopicSubscriber created = new WeEventTopicSubscriber(weEventTopic);
    this.topicConnection.createSubscriber(created);
    return created;
}
/**
 * Creates a temporary topic on one connection and uses it from a second
 * connection: publishing is attempted, creating a subscriber there is
 * expected to throw a {@code JMSException}, and after the owning
 * connection is closed a further publish may fail with a
 * {@code JMSException} without failing the test.
 *
 * @throws Exception if an unexpected JMS error occurs
 */
@Test
public void testTempTopicDelete() throws Exception {
    connection.start();
    TopicSession ownerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic tempTopic = ownerSession.createTemporaryTopic();

    ActiveMQConnection otherConnection = (ActiveMQConnection) factory.createConnection();
    try {
        TopicSession otherSession = otherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher tempPublisher = otherSession.createPublisher(tempTopic);

        // need to wait here because the ActiveMQ client's temp destination map is updated asynchronously,
        // not waiting can introduce a race
        assertTrue(Wait.waitFor(() -> otherConnection.activeTempDestinations.size() == 1, 2000, 100));

        TextMessage firstMessage = otherSession.createTextMessage("Test Message");
        tempPublisher.publish(firstMessage);

        try {
            TopicSubscriber unexpected = otherSession.createSubscriber(tempTopic);
            fail("should have gotten exception but got consumer: " + unexpected);
        } catch (JMSException expected) {
            // correct: subscribing from the second connection is expected to be rejected
        }

        connection.close();

        try {
            Message secondMessage = otherSession.createMessage();
            tempPublisher.publish(secondMessage);
        } catch (JMSException tolerated) {
            // ok: a failure to publish at this point is accepted
        }
    } finally {
        otherConnection.close();
    }
}
/**
 * Sets the client ID on the connection, records the connection in
 * {@code connections}, starts it and creates a durable subscriber on a
 * new auto-acknowledge session.
 *
 * @param conn the connection to use; {@code name} becomes its client ID
 * @param dest the destination, cast to {@code javax.jms.Topic}
 * @param name used both as client ID and as subscription name
 * @return the durable subscriber
 * @throws Exception if any JMS operation fails
 */
protected TopicSubscriber createDurableSubscriber(Connection conn, Destination dest, String name) throws Exception {
    conn.setClientID(name);
    connections.add(conn);
    conn.start();

    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    javax.jms.Topic subscribedTopic = (javax.jms.Topic) dest;
    return session.createDurableSubscriber(subscribedTopic, name);
}
/**
 * Create a topic subscriber.
 *
 * <p>The subscriber obtained from the internal topic session is wrapped
 * in an {@code ActiveMQRATopicSubscriber} and registered through
 * {@code addConsumer}; the whole operation runs between {@code lock()}
 * and {@code unlock()}.
 *
 * @param topic           The topic
 * @param messageSelector The message selector
 * @param noLocal         If true inhibits the delivery of messages published by its own connection
 * @return The wrapped subscriber
 * @throws JMSException Thrown if an error occurs
 */
@Override
public TopicSubscriber createSubscriber(final Topic topic,
                                        final String messageSelector,
                                        final boolean noLocal) throws JMSException {
    lock();
    try {
        TopicSession topicSession = getTopicSessionInternal();

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createSubscriber " + topicSession
                + " topic=" + topic
                + " selector=" + messageSelector
                + " noLocal=" + noLocal);
        }

        TopicSubscriber delegate = topicSession.createSubscriber(topic, messageSelector, noLocal);
        TopicSubscriber wrapped = new ActiveMQRATopicSubscriber(delegate, this);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createdSubscriber " + topicSession + " ActiveMQTopicSubscriber=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Asserts that {@code toString()} of a subscriber created on a temporary
 * topic through a pooled connection does not return {@code null}.
 *
 * @throws JMSException if the connection, session, topic or subscriber cannot be created
 */
@Test
public void testToString() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicSubscriber topicSubscriber = topicSession.createSubscriber(temporaryTopic);

    String description = topicSubscriber.toString();
    assertNotNull(description);
}
/**
 * Verifies that calling {@code unsubscribe} while the durable subscriber
 * is still open throws a {@code JMSException}, then closes the subscriber
 * and the connection against the scripted test peer.
 *
 * @throws Exception if the peer script or any JMS operation fails unexpectedly
 */
@Test(timeout = 20000)
public void testDurableSubscriptionUnsubscribeInUseThrowsJMSEx() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);
        String subscriptionName = "mySubscription";

        testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
        testPeer.expectLinkFlow();

        TopicSubscriber inUseSubscriber = session.createDurableSubscriber(dest, subscriptionName);
        assertNotNull("TopicSubscriber object was null", inUseSubscriber);

        try {
            session.unsubscribe(subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected: the subscription still has an open subscriber.
        }

        testPeer.expectDetach(false, true, false);
        inUseSubscriber.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Get the no local value.
 *
 * <p>Runs {@code checkState()} first and then delegates to the wrapped
 * consumer, which is cast to {@link TopicSubscriber}.
 *
 * @return The value
 * @throws JMSException Thrown if an error occurs
 */
@Override
public boolean getNoLocal() throws JMSException {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("getNoLocal()");
    }

    checkState();
    TopicSubscriber delegate = (TopicSubscriber) consumer;
    return delegate.getNoLocal();
}
/**
 * Publishes 100 text messages to a topic and asserts that a subscriber
 * created beforehand on the same connection receives 100 non-null
 * messages, each within the 1000 passed to {@code receive}.
 *
 * @param port the broker port, injected from the {@code broker-port} parameter
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
    String topicName = "MyTopic1";
    int numberOfMessages = 100;

    InitialContext jndi = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory topicFactory
            = (TopicConnectionFactory) jndi.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = topicFactory.createTopicConnection();
    topicConnection.start();

    // Initialize subscriber
    TopicSession consumerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic destination = (Topic) jndi.lookup(topicName);
    TopicSubscriber topicSubscriber = consumerSession.createSubscriber(destination);

    // publish 100 messages
    TopicSession publishSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = publishSession.createPublisher(destination);
    for (int sent = 0; sent < numberOfMessages; sent++) {
        String text = "Test message " + sent;
        topicPublisher.publish(publishSession.createTextMessage(text));
    }
    publishSession.close();

    for (int got = 0; got < numberOfMessages; got++) {
        Message received = topicSubscriber.receive(1000);
        Assert.assertNotNull(received, "Message #" + got + " was not received");
    }

    topicConnection.close();
}
/**
 * Subscribes to one topic name, publishes 100 text messages to another
 * topic name and asserts that the subscriber receives 100 non-null
 * messages.
 *
 * @param publishTopicName   the JNDI name of the topic that is published to
 * @param subscribeTopicName the JNDI name of the topic that is subscribed to
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
                                                        String subscribeTopicName) throws Exception {
    int numberOfMessages = 100;

    InitialContext jndi = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory topicFactory
            = (TopicConnectionFactory) jndi.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = topicFactory.createTopicConnection();
    topicConnection.start();

    // Subscriber side is set up before anything is published.
    TopicSession consumerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscribeTopic = (Topic) jndi.lookup(subscribeTopicName);
    TopicSubscriber topicSubscriber = consumerSession.createSubscriber(subscribeTopic);

    TopicSession publishSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic publishTopic = (Topic) jndi.lookup(publishTopicName);
    TopicPublisher topicPublisher = publishSession.createPublisher(publishTopic);

    for (int sent = 0; sent < numberOfMessages; sent++) {
        String text = "Test message " + sent;
        topicPublisher.publish(publishSession.createTextMessage(text));
    }
    publishSession.close();

    for (int got = 0; got < numberOfMessages; got++) {
        Message received = topicSubscriber.receive(1000);
        Assert.assertNotNull(received, "Message #" + got + " was not received");
    }

    consumerSession.close();
    topicConnection.close();
}
/**
 * Subscribes to one topic name, publishes 100 text messages to another
 * topic name and asserts that a single {@code receive(1000)} on the
 * subscriber returns {@code null}.
 *
 * @param publishTopicName   the JNDI name of the topic that is published to
 * @param subscribeTopicName the JNDI name of the topic that is subscribed to
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
private void assertNullWithPublishSubscribeForTopics(String publishTopicName,
                                                     String subscribeTopicName) throws Exception {
    int numberOfMessages = 100;

    InitialContext jndi = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory topicFactory
            = (TopicConnectionFactory) jndi.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = topicFactory.createTopicConnection();
    topicConnection.start();

    // Subscriber side is set up before anything is published.
    TopicSession consumerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscribeTopic = (Topic) jndi.lookup(subscribeTopicName);
    TopicSubscriber topicSubscriber = consumerSession.createSubscriber(subscribeTopic);

    TopicSession publishSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic publishTopic = (Topic) jndi.lookup(publishTopicName);
    TopicPublisher topicPublisher = publishSession.createPublisher(publishTopic);

    for (int sent = 0; sent < numberOfMessages; sent++) {
        String text = "Test message " + sent;
        topicPublisher.publish(publishSession.createTextMessage(text));
    }
    publishSession.close();

    Message unexpected = topicSubscriber.receive(1000);
    Assert.assertNull(unexpected, "A message was received where no message was expected");

    consumerSession.close();
    topicConnection.close();
}
/**
 * Get the topic.
 *
 * <p>Runs {@code checkState()} first and then delegates to the wrapped
 * consumer, which is cast to {@link TopicSubscriber}.
 *
 * @return The topic
 * @throws JMSException Thrown if an error occurs
 */
@Override
public Topic getTopic() throws JMSException {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("getTopic()");
    }

    checkState();
    TopicSubscriber delegate = (TopicSubscriber) consumer;
    return delegate.getTopic();
}
/**
 * Opens a durable subscription (with or without a selector), expects
 * exactly one message with {@code ID} 1 and a body equal to
 * {@code generateString(1024)}, and then expects no further message.
 *
 * @param connection the connection used to create the session
 * @param selector   when true, the selector topic/subscription names and the
 *                   selector {@code testprop='true'} are used and the
 *                   {@code testprop} property of the message is checked too
 * @throws Exception if any JMS operation fails
 */
private void consumeDurableSubscriptionMessages(Connection connection, boolean selector) throws Exception
{
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    final TopicSubscriber subscription;
    if (selector)
    {
        Topic selectorTopic = session.createTopic(SELECTOR_TOPIC_NAME);
        subscription = session.createDurableSubscriber(selectorTopic, SELECTOR_SUB_NAME, "testprop='true'", false);
    }
    else
    {
        Topic plainTopic = session.createTopic(TOPIC_NAME);
        subscription = session.createDurableSubscriber(plainTopic, SUB_NAME);
    }

    // Retrieve the matching message
    Message received = subscription.receive(getReceiveTimeout());
    assertThat("Failed to receive an expected message", received, is(notNullValue()));
    if (selector)
    {
        assertThat("Selector property did not match", received.getStringProperty("testprop"), is(equalTo("true")));
    }
    assertThat("ID property did not match", received.getIntProperty("ID"), is(equalTo(1)));
    assertThat("Message content was not as expected",
               ((TextMessage) received).getText(),
               is(equalTo(generateString(1024))));

    // Verify that no more messages are received
    received = subscription.receive(getReceiveTimeout());
    assertThat("No more messages should have been recieved", received, is(nullValue()));

    subscription.close();
    session.close();
}
/**
 * Sends message "A" with a time-to-live of twice the receive timeout and
 * message "B" without one to a durable subscription. "A" is received
 * first; after sleeping for the time-to-live and rolling the session
 * back, the next message received is expected to be "B".
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testPassiveTTLWithDurableSubscription() throws Exception
{
    long timeToLiveMillis = getReceiveTimeout() * 2;
    String subscriptionName = getTestName() + "_sub";
    Topic topic = createTopic(getTestName());
    TopicConnection connection = getTopicConnection();
    try
    {
        Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        TopicSubscriber subscription = txSession.createDurableSubscriber(topic, subscriptionName);

        MessageProducer sender = txSession.createProducer(topic);
        sender.setTimeToLive(timeToLiveMillis);
        sender.send(txSession.createTextMessage("A"));
        sender.setTimeToLive(0);
        sender.send(txSession.createTextMessage("B"));
        txSession.commit();

        connection.start();

        Message first = subscription.receive(getReceiveTimeout());
        assertTrue("TextMessage should be received", first instanceof TextMessage);
        assertEquals("Unexpected message received", "A", ((TextMessage) first).getText());

        // Let the time-to-live of "A" elapse before rolling back its delivery.
        Thread.sleep(timeToLiveMillis);
        txSession.rollback();

        Message second = subscription.receive(getReceiveTimeout());
        assertTrue("TextMessage should be received after waiting for TTL", second instanceof TextMessage);
        assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) second).getText());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends message "A" with a time-to-live equal to the receive timeout and
 * message "B" without one to a durable subscription, sleeps for the
 * time-to-live before starting the connection, and expects the first
 * message received to be "B".
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void testActiveTTLWithDurableSubscription() throws Exception
{
    long timeToLiveMillis = getReceiveTimeout();
    String subscriptionName = getTestName() + "_sub";
    Topic topic = createTopic(getTestName());
    TopicConnection connection = getTopicConnection();
    try
    {
        Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        TopicSubscriber subscription = txSession.createDurableSubscriber(topic, subscriptionName);

        MessageProducer sender = txSession.createProducer(topic);
        sender.setTimeToLive(timeToLiveMillis);
        sender.send(txSession.createTextMessage("A"));
        sender.setTimeToLive(0);
        sender.send(txSession.createTextMessage("B"));
        txSession.commit();

        // Delivery only begins after the time-to-live of "A" has elapsed.
        Thread.sleep(timeToLiveMillis);
        connection.start();

        Message first = subscription.receive(getReceiveTimeout());
        assertTrue("TextMessage should be received", first instanceof TextMessage);
        assertEquals("Unexpected message received", "B", ((TextMessage) first).getText());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Creates two subscribers on the same topic, checks that each reports the
 * topic's name, publishes one text message and asserts that both
 * subscribers receive a {@code TextMessage} with that text.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void messageDeliveredToAllSubscribers() throws Exception
{
    Topic topic = createTopic(getTestName());
    final TopicConnection connection = getTopicConnection();
    try
    {
        final TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicPublisher publisher = topicSession.createPublisher(topic);

        final TopicSubscriber first = topicSession.createSubscriber(topic);
        assertEquals("Unexpected subscriber1 topic", topic.getTopicName(), first.getTopic().getTopicName());

        final TopicSubscriber second = topicSession.createSubscriber(topic);
        assertEquals("Unexpected subscriber2 topic", topic.getTopicName(), second.getTopic().getTopicName());

        connection.start();

        String messageText = "Test Message";
        publisher.send(topicSession.createTextMessage(messageText));

        // Both receives are performed before either result is checked.
        final Message firstReceived = first.receive(getReceiveTimeout());
        final Message secondReceived = second.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received by subscriber1", firstReceived instanceof TextMessage);
        assertEquals(messageText, ((TextMessage) firstReceived).getText());
        assertTrue("TextMessage should be received by subscriber2", secondReceived instanceof TextMessage);
        assertEquals(messageText, ((TextMessage) secondReceived).getText());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Connects to the given broker with client ID {@code "DurableOne"},
 * creates a durable subscription on {@code DESTINATION_NAME} and attaches
 * the listener to it. The connection is not closed by this method.
 *
 * @param brokerService the broker to connect to; its name is appended to the subscription name
 * @param listener      the listener attached to the durable subscriber
 * @return the topic the subscription was created on
 * @throws Exception if any JMS operation fails
 */
private ActiveMQTopic registerDurableConsumer(BrokerService brokerService,
                                              MessageListener listener) throws Exception {
    ConnectionFactory connectionFactory = createConnectionFactory(brokerService);
    Connection durableConnection = connectionFactory.createConnection();
    durableConnection.setClientID("DurableOne");
    durableConnection.start();

    Session durableSession = durableConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    ActiveMQTopic subscribedTopic = (ActiveMQTopic) durableSession.createTopic(DESTINATION_NAME);

    // unique to a broker
    String subscriptionName = "SubOne" + brokerService.getBrokerName();
    TopicSubscriber durable = durableSession.createDurableSubscriber(subscribedTopic, subscriptionName);
    durable.setMessageListener(listener);

    return subscribedTopic;
}
/**
 * Opens {@code consumerConnection} with client ID {@code "cliID"} and
 * attaches a listener to a durable subscription named {@code "subName"}
 * with selector {@code filter=true}; the listener increments
 * {@code received} for every message.
 *
 * @throws Exception if any JMS operation fails
 */
private void openConsumer() throws Exception {
    consumerConnection = (ActiveMQConnection) createConnection();
    consumerConnection.setClientID("cliID");
    consumerConnection.start();

    Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber durable = consumerSession.createDurableSubscriber(topic, "subName", "filter=true", false);

    MessageListener countingListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            received++;
        }
    };
    durable.setMessageListener(countingListener);
}
/**
* Create a new wrapper
*
* @param consumer the topic subscriber
* @param session the session
*/
public ActiveMQRATopicSubscriber(final TopicSubscriber consumer, final ActiveMQRASession session) {
super(consumer, session);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
}
}
/**
 * Creates a durable topic subscription, checks that it is propagated
 * in the broker network, removes the subscription and checks that
 * the subscription is removed from remote broker as well.
 *
 * <p>The connection opened here is closed in a {@code finally} block so
 * that it is released even when one of the assertions fails.
 *
 * @throws Exception if any JMS or broker operation fails
 */
public void testDurableSubNetwork() throws Exception {
    LOG.info("testDurableSubNetwork started.");

    // create durable sub
    ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory(connector.getConnectUri().toString());
    Connection conn = fact.createConnection();
    try {
        conn.setClientID("clientID1");
        // Session.AUTO_ACKNOWLEDGE is the named constant for the literal 1 used previously.
        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination dest = session.createTopic(topicName);
        TopicSubscriber sub = session.createDurableSubscriber((Topic) dest, subName);
        LOG.info("Durable subscription of name " + subName + " created.");
        // Short pause before querying the brokers (presumably to let the subscription propagate).
        Thread.sleep(100);

        // query durable sub on local and remote broker
        // raise an error if not found
        assertTrue(foundSubInLocalBroker(subName));
        assertTrue(foundSubInRemoteBrokerByTopicName(topicName));

        // unsubscribe from durable sub; the subscriber is closed first
        sub.close();
        session.unsubscribe(subName);
        LOG.info("Unsubscribed from durable subscription.");
        Thread.sleep(100);

        // query durable sub on local and remote broker
        // raise an error if it is not removed from both brokers
        assertFalse(foundSubInLocalBroker(subName));
        assertFalse("Durable subscription not unregistered on remote broker", foundSubInRemoteBrokerByTopicName(topicName));
    } finally {
        conn.close();
    }
}
/**
 * Creates a durable subscriber and then closes the session while the
 * subscriber is still open. The scripted peer expects only a session end
 * followed by the connection close at that point.
 *
 * @throws Exception if the peer script or any JMS operation fails
 */
@Test(timeout = 20000)
public void testCloseSessionWithExistingDurableTopicSubscriberDoesNotCloseSubscriberLink() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);
        connection.start();

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);
        String subscriptionName = "mySubscription";

        testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
        testPeer.expectLinkFlow();

        TopicSubscriber durable = session.createDurableSubscriber(dest, subscriptionName);
        assertNotNull("TopicSubscriber object was null", durable);

        // The subscriber is intentionally left open when the session is closed.
        testPeer.expectEnd();
        session.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Returns the topic of the target consumer when that target is a
 * {@link TopicSubscriber}.
 *
 * @return the target's topic, or {@code null} if the target is not a {@code TopicSubscriber}
 * @throws JMSException if the target's {@code getTopic()} call fails
 */
@Override
@Nullable
public Topic getTopic() throws JMSException {
    if (this.target instanceof TopicSubscriber) {
        TopicSubscriber topicTarget = (TopicSubscriber) this.target;
        return topicTarget.getTopic();
    }
    return null;
}