下面列出了 javax.jms.TopicPublisher 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Publishes a single bytes message containing "hello WeEvent" to the topic named
 * {@code topicName}, using a connection factory built from {@code defaultBrokerUrl}.
 *
 * <p>The connection is closed in a {@code finally} block so that it is released even
 * when starting the connection, creating the session or publishing fails.
 *
 * @throws JMSException if any JMS call fails
 */
private static void publish() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        // start connection
        connection.start();
        // create session
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        // create topic
        Topic topic = session.createTopic(topicName);
        // create publisher
        TopicPublisher publisher = session.createPublisher(topic);
        // send message
        BytesMessage msg = session.createBytesMessage();
        msg.writeBytes(("hello WeEvent").getBytes(StandardCharsets.UTF_8));
        publisher.publish(msg);
        System.out.print("send done.");
    } finally {
        // always release the connection, including on failure
        connection.close();
    }
}
/**
 * Verifies that a publisher reports the topic it was created for, and that
 * {@code getTopic()} throws {@code IllegalStateException} once the publisher is closed.
 */
@Test
public void testGetTopic() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection();
    try {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTemporaryTopic();
        TopicPublisher publisher = session.createPublisher(topic);
        assertNotNull(publisher.getTopic());
        assertSame(topic, publisher.getTopic());
        publisher.close();
        try {
            publisher.getTopic();
            fail("Cannot read topic on closed publisher");
        } catch (IllegalStateException expected) {
            // expected: the publisher was closed above
        }
    } finally {
        // close the connection obtained from the factory so the test does not leave it open
        connection.close();
    }
}
/**
 * Limits the factory to one connection, sends a single message through a topic
 * publisher and checks that the factory reports exactly one connection.
 */
@Test(timeout = 60000)
public void testTopicMessageSend() throws Exception {
    cf.setMaxConnections(1);

    TopicConnection pooledConnection = cf.createTopicConnection();
    try {
        TopicSession session = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher publisher = session.createPublisher(session.createTopic(getTestName()));

        publisher.send(session.createMessage());

        assertEquals(1, cf.getNumConnections());
    } finally {
        // close the connection first, then stop the factory
        pooledConnection.close();
        cf.stop();
    }
}
/**
 * Checks that a queue sender and a topic publisher created from the XA pooled factory
 * both expose a non-null destination name.
 *
 * <p>The factory and both connections are released in {@code finally} blocks so that a
 * failed assertion or JMS error does not leave them open.
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory pcf = createXAPooledConnectionFactory();
    try {
        QueueConnection connection = pcf.createQueueConnection();
        try {
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueSender sender = session.createSender(session.createQueue("AA"));
            assertNotNull(sender.getQueue().getQueueName());
        } finally {
            connection.close();
        }

        TopicConnection topicConnection = pcf.createTopicConnection();
        try {
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA"));
            assertNotNull(topicPublisher.getTopic().getTopicName());
        } finally {
            topicConnection.close();
        }
    } finally {
        // the factory is created by this test, so it is stopped here on every path
        pcf.stop();
    }
}
/**
 * Checks that calling {@code getDeliveryMode()} on a publisher throws
 * {@code javax.jms.IllegalStateException} after its connection has been closed.
 */
@Test
public void publisherGetDeliveryModeAfterConnectionClose() throws Exception
{
    Topic testTopic = createTopic(getTestName());
    TopicConnection topicConnection = getTopicConnection();
    try
    {
        TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher orphanedPublisher = topicSession.createPublisher(testTopic);

        // close the connection underneath the publisher before touching it
        topicConnection.close();

        try
        {
            orphanedPublisher.getDeliveryMode();
            fail("Expected exception not thrown");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS
        }
    }
    finally
    {
        // runs on every path, including after the close above
        topicConnection.close();
    }
}
/**
 * Topics shouldn't hold on to messages if there are no subscribers.
 *
 * <p>Publishes one persistent text message in a transacted session, commits, closes the
 * connection and then runs {@code checkEmpty} on the topic.
 */
@Test
public void testPersistentMessagesForTopicDropped() throws Exception {
    TopicConnection connection = createTopicConnection();
    TopicSession txSession = connection.createTopicSession(true, 0);

    TopicPublisher publisher = txSession.createPublisher(ActiveMQServerTestCase.topic1);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

    Message outgoing = txSession.createTextMessage("testing123");
    publisher.publish(outgoing);
    txSession.commit();

    connection.close();

    checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
 * Topics shouldn't hold on to messages when the non-durable subscribers close.
 *
 * <p>Publishes and commits one persistent text message, receives it on a subscriber from
 * the same transacted session, rolls the receive back, closes the connection and then
 * runs {@code checkEmpty} on the topic.
 */
@Test
public void testPersistentMessagesForTopicDropped2() throws Exception {
    TopicConnection connection = createTopicConnection();
    connection.start();

    TopicSession txSession = connection.createTopicSession(true, 0);
    TopicPublisher publisher = txSession.createPublisher(ActiveMQServerTestCase.topic1);
    TopicSubscriber subscriber = txSession.createSubscriber(ActiveMQServerTestCase.topic1);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

    Message outgoing = txSession.createTextMessage("testing123");
    publisher.publish(outgoing);
    txSession.commit();

    // receive but rollback
    TextMessage received = (TextMessage) subscriber.receive(3000);
    ProxyAssertSupport.assertNotNull(received);
    ProxyAssertSupport.assertEquals("testing123", received.getText());
    txSession.rollback();

    connection.close();

    checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
 * Publishes a message through the wrapped publisher while holding the session lock.
 *
 * @param message The message
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Message message) throws JMSException {
    session.lock();
    try {
        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            final String before = "send " + this + " message=" + message;
            ActiveMQRALogger.LOGGER.trace(before);
        }

        checkState();

        final TopicPublisher target = (TopicPublisher) producer;
        target.publish(message);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            final String after = "sent " + this + " result=" + message;
            ActiveMQRALogger.LOGGER.trace(after);
        }
    } finally {
        // release the session lock on every path
        session.unlock();
    }
}
/**
 * Publishes a message to the given topic through the wrapped publisher while holding
 * the session lock.
 *
 * @param destination The destination
 * @param message     The message
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Topic destination, final Message message) throws JMSException {
    session.lock();
    try {
        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            final String before = "send " + this + " destination=" + destination + " message=" + message;
            ActiveMQRALogger.LOGGER.trace(before);
        }

        checkState();

        final TopicPublisher target = (TopicPublisher) producer;
        target.publish(destination, message);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            final String after = "sent " + this + " result=" + message;
            ActiveMQRALogger.LOGGER.trace(after);
        }
    } finally {
        // release the session lock on every path
        session.unlock();
    }
}
/**
 * Creates a publisher on the internal topic session, wraps it in an
 * {@code ActiveMQRATopicPublisher} bound to this session, registers the wrapper via
 * {@code addProducer} and returns it. The whole operation runs under this session's lock.
 *
 * @param topic The topic
 * @return The publisher
 * @throws JMSException Thrown if an error occurs
 */
@Override
public TopicPublisher createPublisher(final Topic topic) throws JMSException {
    lock();
    try {
        final TopicSession underlying = getTopicSessionInternal();

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createPublisher " + underlying + " topic=" + topic);
        }

        final TopicPublisher rawPublisher = underlying.createPublisher(topic);
        final TopicPublisher wrapped = new ActiveMQRATopicPublisher(rawPublisher, this);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createdPublisher " + underlying + " publisher=" + wrapped);
        }

        addProducer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Checks that a publisher created for a specific topic rejects
 * {@code publish(Topic, Message)}: with {@code UnsupportedOperationException} for a
 * different topic and with {@code InvalidDestinationException} for a {@code null} topic.
 */
@Test(timeout = 10000)
public void testPublishMessageOnProvidedTopicWhenNotAnonymous() throws Exception {
    Topic boundTopic = session.createTopic(getTestName());
    TopicPublisher boundPublisher = session.createPublisher(boundTopic);
    Message payload = session.createMessage();

    try {
        Topic otherTopic = session.createTopic(getTestName() + "1");
        boundPublisher.publish(otherTopic, payload);
        fail("Should throw UnsupportedOperationException");
    } catch (UnsupportedOperationException expected) {
        // expected
    }

    try {
        boundPublisher.publish((Topic) null, payload);
        fail("Should throw InvalidDestinationException");
    } catch (InvalidDestinationException expected) {
        // expected
    }
}
/**
 * Checks that a publisher created for a specific topic rejects the five-argument
 * {@code publish} overload: with {@code UnsupportedOperationException} for a different
 * topic and with {@code InvalidDestinationException} for a {@code null} topic.
 */
@Test(timeout = 10000)
public void testPublishMessageWithOptionsOnProvidedTopicWhenNotAnonymous() throws Exception {
    Topic boundTopic = session.createTopic(getTestName());
    TopicPublisher boundPublisher = session.createPublisher(boundTopic);
    Message payload = session.createMessage();

    try {
        Topic otherTopic = session.createTopic(getTestName() + "1");
        boundPublisher.publish(
                otherTopic,
                payload,
                Message.DEFAULT_DELIVERY_MODE,
                Message.DEFAULT_PRIORITY,
                Message.DEFAULT_TIME_TO_LIVE);
        fail("Should throw UnsupportedOperationException");
    } catch (UnsupportedOperationException expected) {
        // expected
    }

    try {
        boundPublisher.publish(
                (Topic) null,
                payload,
                Message.DEFAULT_DELIVERY_MODE,
                Message.DEFAULT_PRIORITY,
                Message.DEFAULT_TIME_TO_LIVE);
        fail("Should throw InvalidDestinationException");
    } catch (InvalidDestinationException expected) {
        // expected
    }
}
/**
 * Creates a topic connection, session and publisher against the broker and checks that
 * creating the publisher alone leaves the topic's enqueue count at zero.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed assertion does
 * not leave it open.
 */
@Test
public void testCreateTopicPublisher() throws Exception {
    JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
    TopicConnection connection = factory.createTopicConnection();
    assertNotNull(connection);
    try {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        assertNotNull(session);
        Topic topic = session.createTopic(name.getMethodName());
        TopicPublisher publisher = session.createPublisher(topic);
        assertNotNull(publisher);
        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
        assertEquals(0, proxy.getEnqueueCount());
    } finally {
        connection.close();
    }
}
/**
 * Publishes {@code message} through the delegate, cast to {@code TopicPublisher}, inside
 * a producer span that is placed in scope for the duration of the call.
 *
 * <p>Any throwable is passed to {@code propagateIfFatal}, recorded on the span and
 * rethrown. The span is finished before the scope is closed.
 */
@Override public void publish(Message message) throws JMSException {
    checkTopicPublisher();
    TopicPublisher publisher = (TopicPublisher) delegate;

    Span producerSpan = createAndStartProducerSpan(message, destination(message));
    SpanInScope scope = tracer.withSpanInScope(producerSpan);
    Throwable failure = null;
    try {
        publisher.publish(message);
    } catch (Throwable thrown) {
        propagateIfFatal(thrown);
        failure = thrown;
        throw thrown;
    } finally {
        if (failure != null) {
            producerSpan.error(failure);
        }
        producerSpan.finish();
        scope.close();
    }
}
/**
 * Publishes {@code message} with explicit delivery options through the delegate, cast to
 * {@code TopicPublisher}, inside a producer span that is placed in scope for the
 * duration of the call.
 *
 * <p>Any throwable is passed to {@code propagateIfFatal}, recorded on the span and
 * rethrown. The span is finished before the scope is closed.
 */
@Override public void publish(Message message, int deliveryMode, int priority, long timeToLive)
    throws JMSException {
    checkTopicPublisher();
    TopicPublisher publisher = (TopicPublisher) delegate;

    Span producerSpan = createAndStartProducerSpan(message, destination(message));
    SpanInScope scope = tracer.withSpanInScope(producerSpan);
    Throwable failure = null;
    try {
        publisher.publish(message, deliveryMode, priority, timeToLive);
    } catch (Throwable thrown) {
        propagateIfFatal(thrown);
        failure = thrown;
        throw thrown;
    } finally {
        if (failure != null) {
            producerSpan.error(failure);
        }
        producerSpan.finish();
        scope.close();
    }
}
/**
 * Publishes {@code message} to {@code topic} with explicit delivery options through the
 * delegate, cast to {@code TopicPublisher}, inside a producer span that is placed in
 * scope for the duration of the call.
 *
 * <p>Any throwable is passed to {@code propagateIfFatal}, recorded on the span and
 * rethrown. The span is finished before the scope is closed.
 */
@Override
public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
    throws JMSException {
    checkTopicPublisher();
    TopicPublisher publisher = (TopicPublisher) delegate;

    // NOTE(review): the span is started with destination(message), not the explicit
    // topic argument - confirm this is intended.
    Span producerSpan = createAndStartProducerSpan(message, destination(message));
    SpanInScope scope = tracer.withSpanInScope(producerSpan);
    Throwable failure = null;
    try {
        publisher.publish(topic, message, deliveryMode, priority, timeToLive);
    } catch (Throwable thrown) {
        propagateIfFatal(thrown);
        failure = thrown;
        throw thrown;
    } finally {
        if (failure != null) {
            producerSpan.error(failure);
        }
        producerSpan.finish();
        scope.close();
    }
}
/**
 * Creates a publisher for the given topic.
 *
 * <p>Only {@code WeEventTopic} instances are supported. A {@code null} topic or a topic
 * with a blank name is rejected with the {@code TOPIC_IS_BLANK} error, and any other
 * {@code Topic} implementation is rejected with a {@code JMSException} instead of
 * returning {@code null}, so callers never receive a null publisher.
 *
 * @param topic the topic to publish to
 * @return a publisher bound to this session and {@code topic}; never {@code null}
 * @throws JMSException if the topic is null, has a blank name, or is not a WeEventTopic
 */
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
    // a null topic has no name, so it is reported the same way as a blank one
    if (topic == null || StringUtils.isBlank(topic.getTopicName())) {
        throw WeEventConnectionFactory.error2JMSException(ErrorCode.TOPIC_IS_BLANK);
    }
    if (topic instanceof WeEventTopic) {
        return new WeEventTopicPublisher(this, (WeEventTopic) topic);
    }
    throw new JMSException("Unsupported topic implementation: " + topic.getClass().getName());
}
/**
 * Checks that a topic publisher's {@code toString()} does not return {@code null}.
 *
 * <p>The connection obtained from the factory is closed in a {@code finally} block so
 * the test does not leave it open.
 */
@Test
public void testToString() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection();
    try {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTemporaryTopic();
        TopicPublisher publisher = session.createPublisher(topic);
        assertNotNull(publisher.toString());
    } finally {
        connection.close();
    }
}
/**
 * Checks that a publisher created for one temporary topic throws
 * {@code UnsupportedOperationException} when asked to publish to a different topic.
 *
 * <p>The connection obtained from the factory is closed in a {@code finally} block so
 * the test does not leave it open.
 */
@Test
public void testPublishToTopicFailsIfNotAnonymousPublisher() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection();
    try {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTemporaryTopic();
        TopicPublisher publisher = session.createPublisher(topic);
        try {
            publisher.publish(session.createTemporaryTopic(), session.createTextMessage());
            fail("Should not be able to send to alternate destination");
        } catch (UnsupportedOperationException expected) {
            // expected: the publisher is bound to the first topic
        }
    } finally {
        connection.close();
    }
}
/**
 * Creates a pooled connection factory, obtains a connection from it as a
 * {@code TopicConnection} and publishes one message to the topic "test".
 */
@Test(timeout = 60000)
public void testJmsPoolConnectionFactory() throws Exception {
    ActiveMQTopic destination = new ActiveMQTopic("test");

    // pcf and connection are not declared here; presumably fields cleaned up elsewhere - TODO confirm
    pcf = createPooledConnectionFactory();
    connection = (TopicConnection) pcf.createConnection();

    TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = topicSession.createPublisher(destination);
    topicPublisher.publish(topicSession.createMessage());
}
/**
 * Publishes {@code numberOfMessages} text messages to "MyTopic1" and asserts that a
 * subscriber created beforehand on the same topic receives every one of them.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed assertion or
 * JMS error does not leave it open.
 *
 * @param port value of the "broker-port" test parameter
 * @throws Exception if the JNDI lookup or any JMS call fails
 */
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
    String topicName = "MyTopic1";
    int numberOfMessages = 100;
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(topicName)
            .build();
    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();
        // Initialize subscriber
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
        TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
        // publish the messages on a separate session
        TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
        for (int i = 0; i < numberOfMessages; i++) {
            producer.publish(producerSession.createTextMessage("Test message " + i));
        }
        producerSession.close();
        for (int i = 0; i < numberOfMessages; i++) {
            Message message = subscriber.receive(1000);
            Assert.assertNotNull(message, "Message #" + i + " was not received");
        }
    } finally {
        connection.close();
    }
}
/**
 * Publishes {@code numberOfMessages} text messages to {@code publishTopicName} and
 * asserts that a subscriber on {@code subscribeTopicName} receives every one of them.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed assertion or
 * JMS error does not leave it open.
 *
 * @param publishTopicName   JNDI name of the topic to publish to
 * @param subscribeTopicName JNDI name of the topic to subscribe to
 * @throws Exception if the JNDI lookup or any JMS call fails
 */
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
                                                        String subscribeTopicName) throws Exception {
    int numberOfMessages = 100;
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();
    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic subscriberDestination = (Topic) initialContext.lookup(subscribeTopicName);
        TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
        TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic publisherDestination = (Topic) initialContext.lookup(publishTopicName);
        TopicPublisher publisher = publisherSession.createPublisher(publisherDestination);
        for (int i = 0; i < numberOfMessages; i++) {
            publisher.publish(publisherSession.createTextMessage("Test message " + i));
        }
        publisherSession.close();
        for (int i = 0; i < numberOfMessages; i++) {
            Message message = subscriber.receive(1000);
            Assert.assertNotNull(message, "Message #" + i + " was not received");
        }
        subscriberSession.close();
    } finally {
        connection.close();
    }
}
/**
 * Publishes {@code numberOfMessages} text messages to {@code publishTopicName} and
 * asserts that a subscriber on {@code subscribeTopicName} receives nothing within one
 * second.
 *
 * <p>The connection is closed in a {@code finally} block so that a failed assertion or
 * JMS error does not leave it open.
 *
 * @param publishTopicName   JNDI name of the topic to publish to
 * @param subscribeTopicName JNDI name of the topic to subscribe to
 * @throws Exception if the JNDI lookup or any JMS call fails
 */
private void assertNullWithPublishSubscribeForTopics(String publishTopicName,
                                                     String subscribeTopicName) throws Exception {
    int numberOfMessages = 100;
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();
    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();
        TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic subscriberDestination = (Topic) initialContext.lookup(subscribeTopicName);
        TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
        TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic publisherDestination = (Topic) initialContext.lookup(publishTopicName);
        TopicPublisher publisher = publisherSession.createPublisher(publisherDestination);
        for (int i = 0; i < numberOfMessages; i++) {
            publisher.publish(publisherSession.createTextMessage("Test message " + i));
        }
        publisherSession.close();
        Message message = subscriber.receive(1000);
        Assert.assertNull(message, "A message was received where no message was expected");
        subscriberSession.close();
    } finally {
        connection.close();
    }
}
/**
 * Creates two subscribers on the same topic, sends one text message and checks that
 * each subscriber receives a {@code TextMessage} carrying the same text.
 */
@Test
public void messageDeliveredToAllSubscribers() throws Exception
{
    Topic testTopic = createTopic(getTestName());
    final TopicConnection topicConnection = getTopicConnection();
    try
    {
        final TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicPublisher publisher = topicSession.createPublisher(testTopic);

        final TopicSubscriber first = topicSession.createSubscriber(testTopic);
        assertEquals("Unexpected subscriber1 topic", testTopic.getTopicName(), first.getTopic().getTopicName());

        final TopicSubscriber second = topicSession.createSubscriber(testTopic);
        assertEquals("Unexpected subscriber2 topic", testTopic.getTopicName(), second.getTopic().getTopicName());

        topicConnection.start();

        String expectedText = "Test Message";
        publisher.send(topicSession.createTextMessage(expectedText));

        // receive on both subscribers before asserting on either
        final Message firstReceived = first.receive(getReceiveTimeout());
        final Message secondReceived = second.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received by subscriber1", firstReceived instanceof TextMessage);
        assertEquals(expectedText, ((TextMessage) firstReceived).getText());
        assertTrue("TextMessage should be received by subscriber2", secondReceived instanceof TextMessage);
        assertEquals(expectedText, ((TextMessage) secondReceived).getText());
    }
    finally
    {
        topicConnection.close();
    }
}
/**
 * Returns the publisher cached for {@code channel}, creating one on first use.
 *
 * <p>A new publisher is created on a topic named {@code channel}, given the configured
 * {@code deliveryMode} and stored in {@code publishers}. Lookup and creation both run
 * while synchronized on {@code publishers}.
 *
 * @param channel topic name, also used as the cache key
 * @return the cached or newly created publisher
 * @throws Exception if creating the topic or the publisher fails
 */
protected TopicPublisher createOrGetPublisher(String channel) throws Exception {
    synchronized (publishers) {
        TopicPublisher cached = publishers.get(channel);
        if (cached == null) {
            cached = session.createPublisher(session.createTopic(channel));
            cached.setDeliveryMode(deliveryMode);
            publishers.put(channel, cached);
        }
        return cached;
    }
}
/**
 * Build a dynamic proxy that reflectively adapts to JMS 2.0 API methods, if necessary.
 * Otherwise simply return this CachedMessageProducer instance itself.
 *
 * @return this instance when {@code completionListenerClass} is {@code null}; otherwise
 * a proxy implementing {@code MessageProducer}, {@code QueueSender} and
 * {@code TopicPublisher}, backed by a {@code Jms2MessageProducerInvocationHandler}
 */
public MessageProducer getProxyIfNecessary() {
    if (completionListenerClass == null) {
        return this;
    }

    ClassLoader loader = CachedMessageProducer.class.getClassLoader();
    Class<?>[] proxiedInterfaces = {MessageProducer.class, QueueSender.class, TopicPublisher.class};
    Object proxy = Proxy.newProxyInstance(loader, proxiedInterfaces, new Jms2MessageProducerInvocationHandler());
    return (MessageProducer) proxy;
}
/**
 * Exercises a temporary topic from a second connection: publishing to it is allowed,
 * creating a subscriber on it must fail with {@code JMSException}, and publishing after
 * the owning connection is closed may either succeed or throw {@code JMSException}.
 */
@Test
public void testTempTopicDelete() throws Exception {
    connection.start();
    TopicSession ownerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic tempTopic = ownerSession.createTemporaryTopic();

    ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
    try {
        TopicSession otherSession = newConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher foreignPublisher = otherSession.createPublisher(tempTopic);

        // need to wait here because the ActiveMQ client's temp destination map is updated asynchronously, not waiting can introduce a race
        assertTrue(Wait.waitFor(() -> newConn.activeTempDestinations.size() == 1, 2000, 100));

        TextMessage firstMessage = otherSession.createTextMessage("Test Message");
        foreignPublisher.publish(firstMessage);

        try {
            TopicSubscriber consumer = otherSession.createSubscriber(tempTopic);
            fail("should have gotten exception but got consumer: " + consumer);
        } catch (JMSException expected) {
            //correct
        }

        // close the connection that created the temporary topic
        connection.close();

        try {
            Message secondMessage = otherSession.createMessage();
            foreignPublisher.publish(secondMessage);
        } catch (JMSException tolerated) {
            //ok
        }
    } finally {
        newConn.close();
    }
}
/**
* Create a new wrapper
*
* @param producer the producer
* @param session the session
*/
public ActiveMQRATopicPublisher(final TopicPublisher producer, final ActiveMQRASession session) {
super(producer, session);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
}
}
/**
 * Get the topic of the wrapped publisher.
 *
 * @return The topic
 * @throws JMSException Thrown if an error occurs
 */
@Override
public Topic getTopic() throws JMSException {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("getTopic()");
    }

    final TopicPublisher target = (TopicPublisher) producer;
    return target.getTopic();
}
/**
 * Publishes a message with explicit delivery options through the wrapped publisher
 * while holding the session lock.
 *
 * @param message      The message
 * @param deliveryMode The delivery mode
 * @param priority     The priority
 * @param timeToLive   The time to live
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Message message,
                    final int deliveryMode,
                    final int priority,
                    final long timeToLive) throws JMSException {
    session.lock();
    try {
        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            final String before = "send " + this
                    + " message=" + message
                    + " deliveryMode=" + deliveryMode
                    + " priority=" + priority
                    + " ttl=" + timeToLive;
            ActiveMQRALogger.LOGGER.trace(before);
        }

        checkState();

        final TopicPublisher target = (TopicPublisher) producer;
        target.publish(message, deliveryMode, priority, timeToLive);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            final String after = "sent " + this + " result=" + message;
            ActiveMQRALogger.LOGGER.trace(after);
        }
    } finally {
        // release the session lock on every path
        session.unlock();
    }
}