下面列出了 javax.jms.Topic 类的 API 用法示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Create a JMS MessageConsumer for the given Session and Destination.
 * <p>In pub-sub mode with a Topic destination, the consumer kind follows the
 * container settings: a shared (optionally durable) consumer, a durable
 * subscriber, or a plain consumer that honours the NoLocal flag. In every
 * other case a plain consumer with just the message selector is created.
 * @param session the JMS Session to create a MessageConsumer for
 * @param destination the JMS Destination to create a MessageConsumer for
 * @return the new JMS MessageConsumer
 * @throws javax.jms.JMSException if thrown by JMS API methods
 */
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
    // Queues (or topics outside pub-sub mode) never receive the NoLocal flag.
    if (!isPubSubDomain() || !(destination instanceof Topic)) {
        return session.createConsumer(destination, getMessageSelector());
    }

    Topic topic = (Topic) destination;
    if (isSubscriptionShared()) {
        if (isSubscriptionDurable()) {
            return session.createSharedDurableConsumer(topic, getSubscriptionName(), getMessageSelector());
        }
        return session.createSharedConsumer(topic, getSubscriptionName(), getMessageSelector());
    }
    if (isSubscriptionDurable()) {
        return session.createDurableSubscriber(
                topic, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
    }
    // Only pass in the NoLocal flag in case of a Topic (pub-sub mode):
    // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
    // in case of the NoLocal flag being specified for a Queue.
    return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
}
/**
 * Create a JMS MessageConsumer for the given Session and Destination.
 * <p>For a Topic destination in pub-sub mode the subscription settings decide
 * between a shared consumer, a shared durable consumer, a durable subscriber
 * and a plain NoLocal-aware consumer. Any other destination gets a plain
 * consumer configured with the message selector only.
 * @param session the JMS Session to create a MessageConsumer for
 * @param destination the JMS Destination to create a MessageConsumer for
 * @return the new JMS MessageConsumer
 * @throws javax.jms.JMSException if thrown by JMS API methods
 */
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
    boolean topicInPubSubMode = isPubSubDomain() && destination instanceof Topic;
    if (!topicInPubSubMode) {
        return session.createConsumer(destination, getMessageSelector());
    }

    Topic topic = (Topic) destination;
    if (isSubscriptionShared()) {
        if (isSubscriptionDurable()) {
            return session.createSharedDurableConsumer(topic, getSubscriptionName(), getMessageSelector());
        }
        return session.createSharedConsumer(topic, getSubscriptionName(), getMessageSelector());
    }
    if (isSubscriptionDurable()) {
        return session.createDurableSubscriber(
                topic, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
    }
    // Only pass in the NoLocal flag in case of a Topic (pub-sub mode):
    // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
    // in case of the NoLocal flag being specified for a Queue.
    return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
}
/**
 * Checks that a TopicSubscriber created through a JmsPoolConnection reports the
 * topic it was created for, and that getTopic() throws IllegalStateException
 * once the subscriber has been closed.
 */
@Test
public void testGetTopic() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicSubscriber topicSubscriber = topicSession.createSubscriber(temporaryTopic);

    assertNotNull(topicSubscriber.getTopic());
    assertSame(temporaryTopic, topicSubscriber.getTopic());

    topicSubscriber.close();

    try {
        topicSubscriber.getTopic();
        fail("Cannot read topic on closed subscriber");
    } catch (IllegalStateException expected) {
        // Expected: the subscriber is closed.
    }
}
/**
 * Create a topic.
 * <p>
 * A consumer is opened on the topic and closed again right away.
 * NOTE(review): presumably this is what makes the broker materialize the
 * topic - confirm against the broker in use.
 * The session is always released afterwards, whether or not the call succeeded.
 *
 * @param topicName the topic name
 * @throws JmsMessageException if a JMS call fails while creating the topic
 */
@PublicAtsApi
public void createTopic(
                         final String topicName ) {

    try {
        final Session session = loadSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic topic = session.createTopic(topicName);
        session.createConsumer(topic).close();
    } catch (JMSException e) {
        // Report what actually failed; the previous text was copied from a
        // "start listening" operation and misdescribed this one.
        throw new JmsMessageException("Could not create topic " + topicName, e);
    } finally {
        releaseSession(false);
    }
}
/**
 * Checks that a TopicPublisher created through a JmsPoolConnection reports the
 * topic it was created for, and that getTopic() throws IllegalStateException
 * once the publisher has been closed.
 */
@Test
public void testGetTopic() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicPublisher topicPublisher = topicSession.createPublisher(temporaryTopic);

    assertNotNull(topicPublisher.getTopic());
    assertSame(temporaryTopic, topicPublisher.getTopic());

    topicPublisher.close();

    try {
        topicPublisher.getTopic();
        fail("Cannot read topic on closed publisher");
    } catch (IllegalStateException expected) {
        // Expected: the publisher is closed.
    }
}
/**
 * Publishes a byte payload with two flow-file attributes and checks that the
 * received message is a BytesMessage carrying the "foo" string property and a
 * Topic reply-to destination named "myTopic".
 */
@Test
public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception {
    final String queueName = "testQueue";
    JmsTemplate template = CommonTest.buildJmsTemplateForDestination(false);
    JMSPublisher jmsPublisher = new JMSPublisher(template, mock(ComponentLog.class));

    Map<String, String> attributes = new HashMap<>();
    attributes.put("foo", "foo");
    attributes.put(JmsHeaders.REPLY_TO, "myTopic");

    jmsPublisher.publish(queueName, "hellomq".getBytes(), attributes);

    Message received = template.receive(queueName);
    assertTrue(received instanceof BytesMessage);
    assertEquals("foo", received.getStringProperty("foo"));
    assertTrue(received.getJMSReplyTo() instanceof Topic);
    assertEquals("myTopic", ((Topic) received.getJMSReplyTo()).getTopicName());

    // Release the connection held by the template's caching factory.
    ((CachingConnectionFactory) template.getConnectionFactory()).destroy();
}
/**
 * Subscribes to TOPIC using keyFilter as the message selector and registers a
 * listener that stores each message's text in keyValueMap under the value of
 * its IDENTIFIER string property.
 *
 * @throws JMSException if the topic, consumer or listener cannot be set up
 */
private void watcherPaths() throws JMSException {
    String selector = keyFilter.toString();
    Topic watchedTopic = session.createTopic(TOPIC);
    MessageConsumer watcher = session.createConsumer(watchedTopic, selector);

    MessageListener storeInMap = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                String key = message.getStringProperty(IDENTIFIER);
                String value = ((TextMessage) message).getText();
                keyValueMap.put(key, value);
                LOGGER.info("key = " + key + ",value = " + value);
            } catch (JMSException e) {
                LOGGER.error("onMessage error", e);
            }
        }
    };
    watcher.setMessageListener(storeInMap);
}
/**
 * Publishes a throttle notification as a MapMessage to the "throttleData" topic.
 * <p>
 * The message carries the given key under "throttleKey", an "expiryTimeStamp"
 * one second after the current time, and "isThrottled" set to true.
 * The connection is closed even when publishing fails.
 *
 * @param msg the throttle key to publish
 * @throws NamingException if the connection factory or topic lookup fails
 * @throws JMSException if any JMS operation fails
 */
public void publishMessage(String msg) throws NamingException, JMSException {
    String topicName = "throttleData";
    InitialContext initialContext = ClientHelper.getInitialContextBuilder("admin", "admin",
                                                                          "localhost", "5672")
                                                .withTopic(topicName)
                                                .build();
    ConnectionFactory connectionFactory
            = (ConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(topicName);
        MessageProducer producer = session.createProducer(topic);

        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("throttleKey", msg);
        // Expire the throttle entry one second from now.
        long expiryTimeStamp = System.currentTimeMillis() + 1000;
        mapMessage.setLong("expiryTimeStamp", expiryTimeStamp);
        mapMessage.setBoolean("isThrottled", true);

        producer.send(mapMessage);
    } finally {
        // Closing the connection also closes its sessions and producers;
        // doing it here prevents a leak when any step above throws.
        connection.close();
    }
}
/**
 * Checks that a durable subscriber with a selector and NoLocal flag can be
 * created on an open pooled session, and that the same call throws a
 * JMSException after the session has been closed.
 */
@Test(timeout = 60000)
public void testCreateDurableSubscriberWithSelectorAndNoLocal() throws Exception {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createConnection();
    JmsPoolSession pooledSession =
            (JmsPoolSession) pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic testTopic = pooledSession.createTopic(getTestName());

    assertNotNull(pooledSession.createDurableSubscriber(testTopic, "name", "color = red", false));

    pooledSession.close();

    try {
        pooledSession.createDurableSubscriber(testTopic, "other-name", "color = green", true);
        fail("Should not be able to createDurableSubscriber when closed");
    } catch (JMSException expected) {
        // Expected: the session is closed.
    }
}
/**
 * This is a JMS spec independent method to create a MessageProducer. Please be cautious when
 * making any changes.
 * <p>
 * When the configured jmsSpec is "2.0" or "1.1", or when isQueue is null, the
 * unified Session.createProducer call is used. Otherwise the session is cast
 * to the domain-specific QueueSession or TopicSession and a sender or
 * publisher is created.
 *
 * @param session     JMS session
 * @param destination the Destination
 * @param isQueue     is the Destination a queue? May be null when unknown
 * @return a MessageProducer to send messages to the given Destination
 * @throws JMSException on errors, to be handled and logged by the caller
 */
public MessageProducer createProducer(Session session, Destination destination, Boolean isQueue)
        throws JMSException {
    boolean useUnifiedApi = "2.0".equals(jmsSpec) || "1.1".equals(jmsSpec) || isQueue == null;
    if (useUnifiedApi) {
        return session.createProducer(destination);
    }
    if (isQueue) {
        return ((QueueSession) session).createSender((Queue) destination);
    }
    return ((TopicSession) session).createPublisher((Topic) destination);
}
/**
 * Sends message "A" through an anonymous producer before any consumer exists,
 * then creates a consumer, sends "B" and expects "B" to be the message received.
 */
@Test
public void testPublishIntoNonExistingTopic() throws Exception
{
    final Topic destination = createTopic(getTestName());
    final Connection conn = getConnection();
    try
    {
        Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer anonymousProducer = txSession.createProducer(null);

        // First message goes out while nothing is subscribed to the topic.
        anonymousProducer.send(destination, txSession.createTextMessage("A"));
        txSession.commit();

        conn.start();
        MessageConsumer subscriber = txSession.createConsumer(destination);

        anonymousProducer.send(destination, txSession.createTextMessage("B"));
        txSession.commit();

        Message received = subscriber.receive(getReceiveTimeout());
        assertTrue("Expected message not received", received instanceof TextMessage);
        TextMessage receivedText = (TextMessage) received;
        assertEquals("Unexpected text", "B", receivedText.getText());
    }
    finally
    {
        conn.close();
    }
}
/**
 * Creates publishers for two different topics on the same pooled TopicSession
 * and checks that both wrap the same underlying MessageProducer instance.
 */
@Test(timeout = 60000)
public void testCreateTopicPublisher() throws Exception {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) pooledFactory.createConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    Topic firstTopic = topicSession.createTopic("Topic-1");
    Topic secondTopic = topicSession.createTopic("Topic-2");

    JmsPoolTopicPublisher firstPublisher = (JmsPoolTopicPublisher) topicSession.createPublisher(firstTopic);
    JmsPoolTopicPublisher secondPublisher = (JmsPoolTopicPublisher) topicSession.createPublisher(secondTopic);

    assertSame(firstPublisher.getMessageProducer(), secondPublisher.getMessageProducer());

    pooledConnection.close();
}
/**
 * Creates a consumer for a non-Topic destination by delegating to the
 * superclass; Topic destinations are rejected with an IllegalStateException.
 */
@Override
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
    if (!(destination instanceof Topic)) {
        return super.createConsumer(destination, messageSelector, noLocal);
    }
    throw new IllegalStateException("Operation not supported by a QueueSession");
}
/**
 * Returns a CachedMessageConsumer wrapping the consumer cached for the given
 * combination of destination, selector, NoLocal flag, subscription name and
 * durability, creating and caching the underlying consumer on a cache miss.
 * <p>For a Topic, a null noLocal selects the shared-consumer methods; a
 * non-null noLocal selects a durable subscriber or a plain consumer. Any other
 * destination gets a plain consumer with the selector only.
 */
private MessageConsumer getCachedConsumer(Destination dest, @Nullable String selector,
        @Nullable Boolean noLocal, @Nullable String subscription, boolean durable) throws JMSException {

    ConsumerCacheKey key = new ConsumerCacheKey(dest, selector, noLocal, subscription, durable);
    MessageConsumer delegate = this.cachedConsumers.get(key);

    if (delegate == null) {
        if (!(dest instanceof Topic)) {
            delegate = this.target.createConsumer(dest, selector);
        }
        else if (noLocal != null) {
            if (durable) {
                delegate = this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal);
            }
            else {
                delegate = this.target.createConsumer(dest, selector, noLocal);
            }
        }
        else if (durable) {
            delegate = this.target.createSharedDurableConsumer((Topic) dest, subscription, selector);
        }
        else {
            delegate = this.target.createSharedConsumer((Topic) dest, subscription, selector);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Registering cached JMS MessageConsumer for destination [" + dest + "]: " + delegate);
        }
        this.cachedConsumers.put(key, delegate);
    }
    else if (logger.isTraceEnabled()) {
        logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + delegate);
    }

    return new CachedMessageConsumer(delegate);
}
/**
 * Publishes 100 text messages to one topic name and asserts that a subscriber
 * on another topic name receives 100 non-null messages, each within one second.
 *
 * @param publishTopicName   name looked up for the publishing destination
 * @param subscribeTopicName name looked up for the subscribing destination
 * @throws Exception on any lookup or JMS failure
 */
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
                                                        String subscribeTopicName) throws Exception {
    final int messageCount = 100;

    InitialContext jndi = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();
    TopicConnectionFactory factory
            = (TopicConnectionFactory) jndi.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = factory.createTopicConnection();
    topicConnection.start();

    // The subscriber is set up before publishing starts.
    TopicSession receivingSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic receivingTopic = (Topic) jndi.lookup(subscribeTopicName);
    TopicSubscriber topicSubscriber = receivingSession.createSubscriber(receivingTopic);

    TopicSession sendingSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic sendingTopic = (Topic) jndi.lookup(publishTopicName);
    TopicPublisher topicPublisher = sendingSession.createPublisher(sendingTopic);

    for (int sent = 0; sent < messageCount; sent++) {
        topicPublisher.publish(sendingSession.createTextMessage("Test message " + sent));
    }
    sendingSession.close();

    for (int i = 0; i < messageCount; i++) {
        Message received = topicSubscriber.receive(1000);
        Assert.assertNotNull(received, "Message #" + i + " was not received");
    }

    receivingSession.close();
    topicConnection.close();
}
/**
 * Builds a "direct://amq.direct/" address that uses the topic name as both
 * path segments and as the routing key, marked exclusive and auto-delete, and
 * instantiates org.apache.qpid.client.AMQTopic from it via createReflectively.
 * The connection argument is not used.
 */
@Override
public Topic createTopicOnDirect(final Connection con, String topicName) throws JMSException, URISyntaxException
{
    final StringBuilder address = new StringBuilder("direct://amq.direct/");
    address.append(topicName)
           .append("/")
           .append(topicName)
           .append("?routingkey='")
           .append(topicName)
           .append("',exclusive='true',autodelete='true'");
    return createReflectively("org.apache.qpid.client.AMQTopic", address.toString());
}
/**
 * Builds a "fanout://amq.fanout/" address that uses the topic name as both
 * path segments and as the routing key, marked exclusive and auto-delete, and
 * instantiates org.apache.qpid.client.AMQTopic from it via createReflectively.
 * The connection argument is not used.
 */
@Override
public Topic createTopicOnFanout(final Connection con, String topicName) throws JMSException, URISyntaxException
{
    final StringBuilder address = new StringBuilder("fanout://amq.fanout/");
    address.append(topicName)
           .append("/")
           .append(topicName)
           .append("?routingkey='")
           .append(topicName)
           .append("',exclusive='true',autodelete='true'");
    return createReflectively("org.apache.qpid.client.AMQTopic", address.toString());
}
/**
 * Creates a shared durable consumer on the underlying session, wraps it in a
 * JmsPoolJMSConsumer and passes it through startIfNeeded. A JMSException is
 * rethrown as the runtime exception built by JMSExceptionSupport.
 */
@Override
public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String selector) {
    try {
        JmsPoolMessageConsumer delegate =
                (JmsPoolMessageConsumer) getSession().createSharedDurableConsumer(topic, name, selector);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Publishes 100 text messages to "MyTopic1" and asserts that a subscriber
 * created beforehand on the same topic receives 100 non-null messages, each
 * within one second.
 *
 * @param port broker port supplied through the "broker-port" test parameter
 */
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
    final String topicName = "MyTopic1";
    final int messageCount = 100;

    InitialContext jndi = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(topicName)
            .build();
    TopicConnectionFactory factory
            = (TopicConnectionFactory) jndi.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = factory.createTopicConnection();
    topicConnection.start();

    // Subscriber side: set up before anything is published.
    TopicSession receivingSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic destination = (Topic) jndi.lookup(topicName);
    TopicSubscriber topicSubscriber = receivingSession.createSubscriber(destination);

    // Publisher side: send every message on a separate session.
    TopicSession sendingSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = sendingSession.createPublisher(destination);
    for (int sent = 0; sent < messageCount; sent++) {
        topicPublisher.publish(sendingSession.createTextMessage("Test message " + sent));
    }
    sendingSession.close();

    for (int i = 0; i < messageCount; i++) {
        Message received = topicSubscriber.receive(1000);
        Assert.assertNotNull(received, "Message #" + i + " was not received");
    }

    topicConnection.close();
}
/**
 * Checks that a durable consumer with a selector can be created on an open
 * JMSContext, and that the same call throws IllegalStateRuntimeException
 * after the context has been closed.
 */
@Test(timeout = 30000)
public void testCreateDurableConsumerWithSelector() {
    JMSContext jmsContext = cf.createContext();
    Topic testTopic = jmsContext.createTopic(getTestName());

    assertNotNull(jmsContext.createDurableConsumer(testTopic, "test", "color = red", true));

    jmsContext.close();

    try {
        jmsContext.createDurableConsumer(testTopic, "test", "color = red", true);
        fail("Should not be able to create resource when context is closed");
    } catch (IllegalStateRuntimeException expected) {
        // Expected: the context is closed.
    }
}
/**
 * Opens one connection with client id "testClientId" and one with a null
 * client id, creates a shared consumer named "testSharedSubscription" on each,
 * sends a single message and expects both consumers to receive it (index 0).
 */
@Test
public void testGlobalAndNotGlobalCanUseTheSameSubscriptionName() throws Exception
{
    try (Connection identifiedConnection = getConnectionBuilder().setClientId("testClientId").build();
         Connection anonymousConnection = getConnectionBuilder().setClientId(null).build())
    {
        Session senderSession = identifiedConnection.createSession();
        Session firstReceiverSession = identifiedConnection.createSession();
        Session secondReceiverSession = anonymousConnection.createSession();

        String name = getTestName();
        Topic sharedTopic = senderSession.createTopic("amq.direct/" + name);

        MessageConsumer firstConsumer =
                firstReceiverSession.createSharedConsumer(sharedTopic, "testSharedSubscription");
        MessageConsumer secondConsumer =
                secondReceiverSession.createSharedConsumer(sharedTopic, "testSharedSubscription");

        identifiedConnection.start();
        anonymousConnection.start();

        Utils.sendMessages(senderSession, sharedTopic, 1);

        Message firstReceived = firstConsumer.receive(getReceiveTimeout());
        Message secondReceived = secondConsumer.receive(getReceiveTimeout());

        assertNotNull("Message 1 was not received", firstReceived);
        assertNotNull("Message 2 was not received", secondReceived);
        assertEquals("Unexpected index for message 1", 0, firstReceived.getIntProperty(Utils.INDEX));
        assertEquals("Unexpected index for message 2", 0, secondReceived.getIntProperty(Utils.INDEX));
    }
}
/**
 * Creates a durable consumer on the underlying session, wraps it together
 * with that session in a MockJMSConsumer and passes it through startIfNeeded.
 * A JMSException is rethrown as the runtime exception built by
 * JMSExceptionSupport.
 */
@Override
public JMSConsumer createDurableConsumer(Topic topic, String name) {
    try {
        return startIfNeeded(
                new MockJMSConsumer(
                        getSession(),
                        (MockJMSMessageConsumer) getSession().createDurableConsumer(topic, name)));
    } catch (JMSException cause) {
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Registers a durable subscriber without a message selector, then attempts to
 * register a second durable subscriber under the same subscription name on the
 * same session; the second attempt must fail with a JMSException.
 * <p>
 * QPID-2418
 */
@Test
public void multipleSubscribersWithTheSameName() throws Exception
{
    final String subName = getTestName() + "_sub";
    final Topic destination = createTopic(subName);
    final Connection connection = getConnection();
    try
    {
        connection.start();
        Session autoAckSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // First registration: no selector, NoLocal off.
        autoAckSession.createDurableSubscriber(destination, subName, null, false);

        // Second registration under the same name must be rejected.
        try
        {
            autoAckSession.createDurableSubscriber(destination, subName, null, false);
            fail("Subscription should not have been created");
        }
        catch (JMSException expected)
        {
            // pass
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Stubs a plain Session mock to return a StubTopic for DESTINATION_NAME and
 * runs testResolveDestination against it with the pub-sub flag set to true.
 */
@Test
public void resolveWithPubSubVanillaSession() throws Exception {
    Session mockSession = mock(Session.class);
    Topic stubTopic = new StubTopic();
    given(mockSession.createTopic(DESTINATION_NAME)).willReturn(stubTopic);

    testResolveDestination(mockSession, stubTopic, true);
}
/**
 * Creates a durable consumer with a selector and NoLocal flag on the
 * underlying session, wraps it in a JmsPoolJMSConsumer and passes it through
 * startIfNeeded. A JMSException is rethrown as the runtime exception built by
 * JMSExceptionSupport.
 */
@Override
public JMSConsumer createDurableConsumer(Topic topic, String name, String selector, boolean noLocal) {
    try {
        JmsPoolMessageConsumer delegate =
                (JmsPoolMessageConsumer) getSession().createDurableConsumer(topic, name, selector, noLocal);
        return startIfNeeded(new JmsPoolJMSConsumer(delegate));
    } catch (JMSException cause) {
        throw JMSExceptionSupport.createRuntimeException(cause);
    }
}
/**
 * Sends TEST_MESSAGE to the "barExchange/foo" topic address before a consumer
 * exists and expects nothing to be received; then sends "Hello world2!" with
 * the consumer in place and expects exactly that text message.
 */
@Test
public void testSendingToExchangePattern() throws Exception
{
    updateAutoCreationPolicies();

    Connection conn = getConnection();
    try
    {
        conn.start();
        Session autoAckSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Topic destination =
                autoAckSession.createTopic(getDestinationAddress("barExchange/foo", TYPE_TOPIC));
        final MessageProducer sender = autoAckSession.createProducer(destination);

        // Sent before the consumer is created, so it is expected to be missed.
        sender.send(autoAckSession.createTextMessage(TEST_MESSAGE));

        final MessageConsumer receiver = autoAckSession.createConsumer(destination);
        Message delivered = receiver.receive(getReceiveTimeout() / 4);
        assertNull(delivered);

        sender.send(autoAckSession.createTextMessage("Hello world2!"));
        delivered = receiver.receive(getReceiveTimeout());

        assertNotNull(delivered);
        assertTrue(delivered instanceof TextMessage);
        assertEquals("Hello world2!", ((TextMessage) delivered).getText());
    }
    finally
    {
        conn.close();
    }
}
/**
 * Builds the incoming JMS source for one channel.
 * <p>The destination name is the configured destination, falling back to the
 * channel name. A durable subscription requires a Topic destination and uses
 * the destination name as the subscription name. Received messages are mapped
 * to IncomingJmsMessage; when broadcast is enabled the stream is shared with
 * all subscribers.
 *
 * @param context  the JMS context used to create the consumer
 * @param config   the incoming connector configuration
 * @param json     JSON-B instance handed to each incoming message
 * @param executor executor handed to each incoming message
 */
JmsSource(JMSContext context, JmsConnectorIncomingConfiguration config, Jsonb json, Executor executor) {
    String destinationName = config.getDestination().orElseGet(config::getChannel);
    String messageSelector = config.getSelector().orElse(null);
    boolean noLocal = config.getNoLocal();
    boolean broadcastEnabled = config.getBroadcast();
    boolean durableSubscription = config.getDurable();

    Destination destination = getDestination(context, destinationName, config);

    // Durable subscriptions only exist for topics.
    if (durableSubscription && !(destination instanceof Topic)) {
        throw ex.illegalArgumentInvalidDestination();
    }
    JMSConsumer consumer = durableSubscription
            ? context.createDurableConsumer((Topic) destination, destinationName, messageSelector, noLocal)
            : context.createConsumer(destination, messageSelector, noLocal);

    publisher = new JmsPublisher(consumer);

    if (broadcastEnabled) {
        source = ReactiveStreams.fromPublisher(
                Multi.createFrom().publisher(publisher)
                        .map(message -> new IncomingJmsMessage<>(message, executor, json))
                        .broadcast().toAllSubscribers());
    } else {
        source = ReactiveStreams.fromPublisher(publisher)
                .map(message -> new IncomingJmsMessage<>(message, executor, json));
    }
}
/**
 * Checks that a durable subscriber can be created on an open pooled session,
 * and that the same call throws a JMSException after the session has been
 * closed.
 */
@Test(timeout = 60000)
public void testCreateDurableSubscriber() throws Exception {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createConnection();
    JmsPoolSession pooledSession =
            (JmsPoolSession) pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic testTopic = pooledSession.createTopic(getTestName());

    assertNotNull(pooledSession.createDurableSubscriber(testTopic, "name"));

    pooledSession.close();

    try {
        pooledSession.createDurableSubscriber(testTopic, "name-2");
        fail("Should not be able to createDurableSubscriber when closed");
    } catch (JMSException expected) {
        // Expected: the session is closed.
    }
}
/**
 * Checks that a session obtained from a pooled connection creates a non-null
 * MockJMSTopic whose name equals the test name.
 */
@Test(timeout = 60000)
public void testCreateTopic() throws Exception {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createConnection();
    Session autoAckSession = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Topic created = autoAckSession.createTopic(getTestName());

    assertNotNull(created);
    assertEquals(getTestName(), created.getTopicName());
    assertTrue(created instanceof MockJMSTopic);
}
/**
 * Checks that a TopicPublisher bound to one topic throws
 * UnsupportedOperationException when asked to publish to a different topic.
 */
@Test
public void testPublishToTopicFailsIfNotAnonymousPublisher() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic boundTopic = topicSession.createTemporaryTopic();
    TopicPublisher boundPublisher = topicSession.createPublisher(boundTopic);

    try {
        boundPublisher.publish(topicSession.createTemporaryTopic(), topicSession.createTextMessage());
        fail("Should not be able to send to alternate destination");
    } catch (UnsupportedOperationException expected) {
        // Expected: the publisher is bound to its own topic.
    }
}