类javax.jms.TopicPublisher源码实例Demo

下面列出了 javax.jms.TopicPublisher 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: WeEvent   文件: JMSSample.java
/**
 * Publishes one bytes message containing the UTF-8 encoding of "hello WeEvent" to
 * {@code topicName}, using a topic connection created for {@code defaultBrokerUrl}.
 *
 * <p>The connection is always closed before returning, including when connecting,
 * session creation or publishing fails.
 *
 * @throws JMSException if any JMS call (start, create, publish, close) fails
 */
private static void publish() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();

    // everything after the connection exists runs under try/finally so the connection
    // is not leaked when one of the calls below throws
    try {
        // start connection
        connection.start();
        // create session
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        // create topic
        Topic topic = session.createTopic(topicName);

        // create publisher
        TopicPublisher publisher = session.createPublisher(topic);
        // send message
        BytesMessage msg = session.createBytesMessage();
        msg.writeBytes("hello WeEvent".getBytes(StandardCharsets.UTF_8));
        publisher.publish(msg);

        System.out.print("send done.");
    } finally {
        connection.close();
    }
}
 
源代码2 项目: pooled-jms   文件: JmsPoolTopicPublisherTest.java
/**
 * Checks that an open publisher returns the topic instance it was created with and that
 * {@code getTopic()} throws {@code IllegalStateException} once the publisher is closed.
 */
@Test
public void testGetTopic() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic tempTopic = topicSession.createTemporaryTopic();
    TopicPublisher topicPublisher = topicSession.createPublisher(tempTopic);

    // while open: non-null and the very same instance
    assertNotNull(topicPublisher.getTopic());
    assertSame(tempTopic, topicPublisher.getTopic());

    topicPublisher.close();

    // after close: the accessor must be rejected
    try {
        topicPublisher.getTopic();
        fail("Cannot read topic on closed publisher");
    } catch (IllegalStateException expected) {
        // expected: publisher is closed
    }
}
 
源代码3 项目: pooled-jms   文件: PooledConnectionTest.java
/**
 * Sends one message through a topic publisher with the factory limited to a single
 * connection and asserts the factory reports exactly one connection.
 */
@Test(timeout = 60000)
public void testTopicMessageSend() throws Exception {
    cf.setMaxConnections(1);

    TopicConnection topicConnection = cf.createTopicConnection();

    try {
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = session.createTopic(getTestName());
        TopicPublisher publisher = session.createPublisher(destination);

        publisher.send(session.createMessage());

        assertEquals(1, cf.getNumConnections());
    } finally {
        // release the connection and stop the factory even when an assertion fails
        topicConnection.close();
        cf.stop();
    }
}
 
源代码4 项目: pooled-jms   文件: XAConnectionPoolTest.java
/**
 * Creates a queue sender and a topic publisher from an XA pooled connection factory and
 * asserts that each exposes a destination with a non-null name.
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory xaFactory = createXAPooledConnectionFactory();

    // queue side
    QueueConnection queueConnection = xaFactory.createQueueConnection();
    QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueSender queueSender = queueSession.createSender(queueSession.createQueue("AA"));
    assertNotNull(queueSender.getQueue().getQueueName());

    queueConnection.close();

    // topic side, using the same factory and the same destination name
    TopicConnection pubConnection = xaFactory.createTopicConnection();
    TopicSession pubSession = pubConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher publisher = pubSession.createPublisher(pubSession.createTopic("AA"));
    assertNotNull(publisher.getTopic().getTopicName());

    pubConnection.close();
    xaFactory.stop();
}
 
源代码5 项目: qpid-broker-j   文件: TopicSessionTest.java
/**
 * Asserts that calling {@code getDeliveryMode()} on a publisher whose connection has been
 * closed throws {@code javax.jms.IllegalStateException}.
 */
@Test
public void publisherGetDeliveryModeAfterConnectionClose() throws Exception
{
    Topic destination = createTopic(getTestName());
    TopicConnection topicConnection = getTopicConnection();
    try
    {
        TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher topicPublisher = topicSession.createPublisher(destination);

        // close the connection before touching the publisher
        topicConnection.close();
        try
        {
            topicPublisher.getDeliveryMode();
            fail("Expected exception not thrown");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS
        }
    }
    finally
    {
        // also reached after the explicit close above, so close() is called a second time
        topicConnection.close();
    }
}
 
源代码6 项目: activemq-artemis   文件: AcknowledgementTest.java
/**
 * Publishes and commits a persistent message on a topic without creating any subscriber,
 * then runs {@code checkEmpty} on the topic: topics shouldn't hold on to messages if there
 * are no subscribers.
 */
@Test
public void testPersistentMessagesForTopicDropped() throws Exception {
   TopicConnection connection = createTopicConnection();
   // transacted session (first argument true)
   TopicSession session = connection.createTopicSession(true, 0);
   TopicPublisher publisher = session.createPublisher(ActiveMQServerTestCase.topic1);
   publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

   Message outgoing = session.createTextMessage("testing123");
   publisher.publish(outgoing);
   session.commit();

   connection.close();

   checkEmpty(ActiveMQServerTestCase.topic1);
}
 
源代码7 项目: activemq-artemis   文件: AcknowledgementTest.java
/**
 * Publishes a persistent message to a topic with one non-durable subscriber, receives it,
 * rolls the session back and closes the connection, then runs {@code checkEmpty} on the
 * topic: topics shouldn't hold on to messages when the non-durable subscribers close.
 */
@Test
public void testPersistentMessagesForTopicDropped2() throws Exception {
   TopicConnection connection = createTopicConnection();
   connection.start();
   // transacted session (first argument true)
   TopicSession session = connection.createTopicSession(true, 0);
   TopicPublisher publisher = session.createPublisher(ActiveMQServerTestCase.topic1);
   TopicSubscriber subscriber = session.createSubscriber(ActiveMQServerTestCase.topic1);
   publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

   Message outgoing = session.createTextMessage("testing123");
   publisher.publish(outgoing);
   session.commit();

   // receive the message, verify it, then roll the receive back
   TextMessage received = (TextMessage) subscriber.receive(3000);

   ProxyAssertSupport.assertNotNull(received);
   ProxyAssertSupport.assertEquals("testing123", received.getText());

   session.rollback();

   connection.close();

   checkEmpty(ActiveMQServerTestCase.topic1);
}
 
/**
 * Publishes a message through the wrapped {@link TopicPublisher} while holding the
 * session lock. Trace logging is emitted before and after the call when enabled.
 *
 * @param message The message
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Message message) throws JMSException {
   session.lock();
   try {
      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message);
      }

      checkState();

      final TopicPublisher target = (TopicPublisher) producer;
      target.publish(message);

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
      }
   } finally {
      // the lock is released on both normal and exceptional exit
      session.unlock();
   }
}
 
/**
 * Publishes a message to the given topic through the wrapped {@link TopicPublisher} while
 * holding the session lock. Trace logging is emitted before and after the call when enabled.
 *
 * @param destination The destination
 * @param message     The message
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Topic destination, final Message message) throws JMSException {
   session.lock();
   try {
      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("send " + this + " destination=" + destination + " message=" + message);
      }

      checkState();

      final TopicPublisher target = (TopicPublisher) producer;
      target.publish(destination, message);

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
      }
   } finally {
      // the lock is released on both normal and exceptional exit
      session.unlock();
   }
}
 
源代码10 项目: activemq-artemis   文件: ActiveMQRASession.java
/**
 * Creates a topic publisher on the internal topic session, wraps it in an
 * {@link ActiveMQRATopicPublisher} tied to this session, registers the wrapper via
 * {@code addProducer} and returns it. The whole operation runs under this session's lock.
 *
 * @param topic The topic
 * @return The wrapped publisher
 * @throws JMSException Thrown if an error occurs
 */
@Override
public TopicPublisher createPublisher(final Topic topic) throws JMSException {
   lock();
   try {
      final TopicSession delegate = getTopicSessionInternal();

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("createPublisher " + delegate + " topic=" + topic);
      }

      final TopicPublisher rawPublisher = delegate.createPublisher(topic);
      final TopicPublisher wrapped = new ActiveMQRATopicPublisher(rawPublisher, this);

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("createdPublisher " + delegate + " publisher=" + wrapped);
      }

      addProducer(wrapped);

      return wrapped;
   } finally {
      unlock();
   }
}
 
源代码11 项目: qpid-jms   文件: JmsTopicPublisherTest.java
/**
 * A publisher created for a specific topic must reject {@code publish(Topic, Message)}:
 * with a different topic it throws {@code UnsupportedOperationException}, with a null
 * topic it throws {@code InvalidDestinationException}.
 */
@Test(timeout = 10000)
public void testPublishMessageOnProvidedTopicWhenNotAnonymous() throws Exception {
    Topic boundTopic = session.createTopic(getTestName());
    TopicPublisher boundPublisher = session.createPublisher(boundTopic);
    Message payload = session.createMessage();

    // explicit, different destination
    try {
        boundPublisher.publish(session.createTopic(getTestName() + "1"), payload);
        fail("Should throw UnsupportedOperationException");
    } catch (UnsupportedOperationException expected) {
        // expected
    }

    // explicit null destination
    try {
        boundPublisher.publish((Topic) null, payload);
        fail("Should throw InvalidDestinationException");
    } catch (InvalidDestinationException expected) {
        // expected
    }
}
 
源代码12 项目: qpid-jms   文件: JmsTopicPublisherTest.java
/**
 * Same check as the two-argument variant, for the overload that also takes delivery mode,
 * priority and time-to-live: a different topic yields {@code UnsupportedOperationException},
 * a null topic yields {@code InvalidDestinationException}.
 */
@Test(timeout = 10000)
public void testPublishMessageWithOptionsOnProvidedTopicWhenNotAnonymous() throws Exception {
    Topic boundTopic = session.createTopic(getTestName());
    TopicPublisher boundPublisher = session.createPublisher(boundTopic);
    Message payload = session.createMessage();

    // explicit, different destination
    try {
        boundPublisher.publish(
            session.createTopic(getTestName() + "1"),
            payload,
            Message.DEFAULT_DELIVERY_MODE,
            Message.DEFAULT_PRIORITY,
            Message.DEFAULT_TIME_TO_LIVE);
        fail("Should throw UnsupportedOperationException");
    } catch (UnsupportedOperationException expected) {
        // expected
    }

    // explicit null destination
    try {
        boundPublisher.publish(
            (Topic) null,
            payload,
            Message.DEFAULT_DELIVERY_MODE,
            Message.DEFAULT_PRIORITY,
            Message.DEFAULT_TIME_TO_LIVE);
        fail("Should throw InvalidDestinationException");
    } catch (InvalidDestinationException expected) {
        // expected
    }
}
 
源代码13 项目: qpid-jms   文件: JmsTopicPublisherTest.java
/**
 * Creates a connection, session, topic and publisher step by step, asserting each is
 * non-null, and checks that the topic's enqueue count reported by its MBean proxy is 0.
 */
@Test
public void testCreateTopicPublisher() throws Exception {
    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    assertNotNull(topicConnection);

    TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(topicSession);

    Topic destination = topicSession.createTopic(name.getMethodName());
    TopicPublisher topicPublisher = topicSession.createPublisher(destination);
    assertNotNull(topicPublisher);

    // nothing has been published, so the enqueue count is expected to be zero
    TopicViewMBean topicView = getProxyToTopic(name.getMethodName());
    assertEquals(0, topicView.getEnqueueCount());

    topicConnection.close();
}
 
源代码14 项目: brave   文件: TracingMessageProducer.java
/**
 * Publishes {@code message} via the delegate {@link TopicPublisher} inside a producer span.
 * Any throwable from the delegate is recorded on the span and rethrown; the span is always
 * finished and the scope closed.
 */
@Override public void publish(Message message) throws JMSException {
  checkTopicPublisher();
  TopicPublisher publisher = (TopicPublisher) delegate;

  Span producerSpan = createAndStartProducerSpan(message, destination(message));
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    publisher.publish(message);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    if (failure != null) {
      producerSpan.error(failure);
    }
    producerSpan.finish();
    scope.close();
  }
}
 
源代码15 项目: brave   文件: TracingMessageProducer.java
/**
 * Publishes {@code message} with explicit delivery options via the delegate
 * {@link TopicPublisher} inside a producer span. Any throwable from the delegate is recorded
 * on the span and rethrown; the span is always finished and the scope closed.
 */
@Override public void publish(Message message, int deliveryMode, int priority, long timeToLive)
  throws JMSException {
  checkTopicPublisher();
  TopicPublisher publisher = (TopicPublisher) delegate;

  Span producerSpan = createAndStartProducerSpan(message, destination(message));
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    publisher.publish(message, deliveryMode, priority, timeToLive);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    if (failure != null) {
      producerSpan.error(failure);
    }
    producerSpan.finish();
    scope.close();
  }
}
 
源代码16 项目: brave   文件: TracingMessageProducer.java
/**
 * Publishes {@code message} to {@code topic} with explicit delivery options via the delegate
 * {@link TopicPublisher} inside a producer span. Any throwable from the delegate is recorded
 * on the span and rethrown; the span is always finished and the scope closed.
 *
 * <p>Note: the span is created from {@code destination(message)}, not from the
 * {@code topic} argument.
 */
@Override
public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
  throws JMSException {
  checkTopicPublisher();
  TopicPublisher publisher = (TopicPublisher) delegate;

  Span producerSpan = createAndStartProducerSpan(message, destination(message));
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    publisher.publish(topic, message, deliveryMode, priority, timeToLive);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    if (failure != null) {
      producerSpan.error(failure);
    }
    producerSpan.finish();
    scope.close();
  }
}
 
源代码17 项目: WeEvent   文件: WeEventTopicSession.java
/**
 * Creates a publisher for the given topic.
 *
 * <p>Never returns {@code null}: a topic that cannot be used is reported with a
 * {@link JMSException} instead, so callers do not hit a later NullPointerException.
 *
 * @param topic the topic to publish to; must be a {@link WeEventTopic} with a non-blank name
 * @return a new {@link WeEventTopicPublisher} bound to this session and {@code topic}
 * @throws JMSException if {@code topic} is {@code null}, its name is blank, or it is not a
 *                      {@link WeEventTopic}
 */
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
    // a null topic previously failed with a NullPointerException on getTopicName()
    if (topic == null || StringUtils.isBlank(topic.getTopicName())) {
        throw WeEventConnectionFactory.error2JMSException(ErrorCode.TOPIC_IS_BLANK);
    }
    if (!(topic instanceof WeEventTopic)) {
        // previously returned null here, which violates the createPublisher contract
        throw new JMSException("unsupported topic implementation: " + topic.getClass().getName());
    }

    return new WeEventTopicPublisher(this, (WeEventTopic) topic);
}
 
源代码18 项目: pooled-jms   文件: JmsPoolTopicPublisherTest.java
/**
 * Asserts that {@code toString()} of a publisher created on a temporary topic is non-null.
 */
@Test
public void testToString() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic tempTopic = topicSession.createTemporaryTopic();
    TopicPublisher topicPublisher = topicSession.createPublisher(tempTopic);

    assertNotNull(topicPublisher.toString());
}
 
源代码19 项目: pooled-jms   文件: JmsPoolTopicPublisherTest.java
/**
 * A publisher created for one temporary topic must throw
 * {@code UnsupportedOperationException} when asked to publish to a different topic.
 */
@Test
public void testPublishToTopicFailsIfNotAnonymousPublisher() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic boundTopic = topicSession.createTemporaryTopic();
    TopicPublisher boundPublisher = topicSession.createPublisher(boundTopic);

    try {
        boundPublisher.publish(topicSession.createTemporaryTopic(), topicSession.createTextMessage());
        fail("Should not be able to send to alternate destination");
    } catch (UnsupportedOperationException expected) {
        // expected: publisher is bound to its own topic
    }
}
 
源代码20 项目: pooled-jms   文件: PooledTopicPublisherTest.java
/**
 * Creates a pooled connection factory, obtains a connection from it as a
 * {@code TopicConnection}, and publishes one empty message to the topic "test".
 * The factory and connection are stored in the fields {@code pcf} and {@code connection}.
 */
@Test(timeout = 60000)
public void testJmsPoolConnectionFactory() throws Exception {
    ActiveMQTopic destination = new ActiveMQTopic("test");
    pcf = createPooledConnectionFactory();

    connection = (TopicConnection) pcf.createConnection();
    TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = topicSession.createPublisher(destination);

    topicPublisher.publish(topicSession.createMessage());
}
 
/**
 * Publishes 100 text messages to the topic "MyTopic1" and asserts that a subscriber created
 * beforehand on the same connection receives 100 non-null messages, each within one second.
 *
 * @param port broker port, injected from the "broker-port" test parameter
 */
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
    String destinationName = "MyTopic1";
    int messageCount = 100;

    InitialContext jndiContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(destinationName)
            .build();

    TopicConnectionFactory factory
            = (TopicConnectionFactory) jndiContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = factory.createTopicConnection();
    topicConnection.start();

    // subscriber is created before anything is published
    TopicSession consumerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic destination = (Topic) jndiContext.lookup(destinationName);
    TopicSubscriber topicSubscriber = consumerSession.createSubscriber(destination);

    // publisher uses its own session on the same connection
    TopicSession publishSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = publishSession.createPublisher(destination);

    for (int sent = 0; sent < messageCount; sent++) {
        topicPublisher.publish(publishSession.createTextMessage("Test message " + sent));
    }

    publishSession.close();

    for (int received = 0; received < messageCount; received++) {
        Message incoming = topicSubscriber.receive(1000);
        Assert.assertNotNull(incoming, "Message #" + received + " was not received");
    }

    topicConnection.close();
}
 
/**
 * Publishes 100 text messages to {@code publishTopicName} and asserts that a subscriber on
 * {@code subscribeTopicName} receives 100 non-null messages, each within one second.
 *
 * @param publishTopicName   name of the topic to publish to
 * @param subscribeTopicName name of the topic to subscribe on
 */
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
                                                        String subscribeTopicName) throws Exception {

    int messageCount = 100;

    InitialContext jndiContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory factory
            = (TopicConnectionFactory) jndiContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = factory.createTopicConnection();
    topicConnection.start();

    // subscriber side
    TopicSession consumerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic consumerTopic = (Topic) jndiContext.lookup(subscribeTopicName);
    TopicSubscriber topicSubscriber = consumerSession.createSubscriber(consumerTopic);

    // publisher side
    TopicSession producerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic producerTopic = (Topic) jndiContext.lookup(publishTopicName);
    TopicPublisher topicPublisher = producerSession.createPublisher(producerTopic);

    for (int sent = 0; sent < messageCount; sent++) {
        topicPublisher.publish(producerSession.createTextMessage("Test message " + sent));
    }

    producerSession.close();

    for (int received = 0; received < messageCount; received++) {
        Message incoming = topicSubscriber.receive(1000);
        Assert.assertNotNull(incoming, "Message #" + received + " was not received");
    }

    consumerSession.close();
    topicConnection.close();
}
 
/**
 * Publishes 100 text messages to {@code publishTopicName} and asserts that a subscriber on
 * {@code subscribeTopicName} receives nothing within one second.
 *
 * @param publishTopicName   name of the topic to publish to
 * @param subscribeTopicName name of the topic to subscribe on
 */
private void assertNullWithPublishSubscribeForTopics(String publishTopicName,
                                                     String subscribeTopicName) throws Exception {

    int messageCount = 100;

    InitialContext jndiContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory factory
            = (TopicConnectionFactory) jndiContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection topicConnection = factory.createTopicConnection();
    topicConnection.start();

    // subscriber side
    TopicSession consumerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic consumerTopic = (Topic) jndiContext.lookup(subscribeTopicName);
    TopicSubscriber topicSubscriber = consumerSession.createSubscriber(consumerTopic);

    // publisher side
    TopicSession producerSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic producerTopic = (Topic) jndiContext.lookup(publishTopicName);
    TopicPublisher topicPublisher = producerSession.createPublisher(producerTopic);

    for (int sent = 0; sent < messageCount; sent++) {
        topicPublisher.publish(producerSession.createTextMessage("Test message " + sent));
    }

    producerSession.close();

    // a single receive attempt is enough: nothing at all should arrive
    Message incoming = topicSubscriber.receive(1000);
    Assert.assertNull(incoming, "A message was received where no message was expected");

    consumerSession.close();
    topicConnection.close();
}
 
源代码24 项目: qpid-broker-j   文件: TopicSubscriberTest.java
/**
 * Creates two subscribers on the same topic, sends one text message and asserts that both
 * subscribers receive a {@code TextMessage} carrying the sent text.
 */
@Test
public void messageDeliveredToAllSubscribers() throws Exception
{
    Topic destination = createTopic(getTestName());
    final TopicConnection topicConnection = getTopicConnection();
    try
    {
        final TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        final TopicPublisher topicPublisher = topicSession.createPublisher(destination);

        final TopicSubscriber firstSubscriber = topicSession.createSubscriber(destination);
        assertEquals("Unexpected subscriber1 topic",
                     destination.getTopicName(),
                     firstSubscriber.getTopic().getTopicName());

        final TopicSubscriber secondSubscriber = topicSession.createSubscriber(destination);
        assertEquals("Unexpected subscriber2 topic",
                     destination.getTopicName(),
                     secondSubscriber.getTopic().getTopicName());

        // delivery is started only after both subscribers exist
        topicConnection.start();
        String expectedText = "Test Message";
        topicPublisher.send(topicSession.createTextMessage(expectedText));

        final Message firstReceived = firstSubscriber.receive(getReceiveTimeout());
        final Message secondReceived = secondSubscriber.receive(getReceiveTimeout());

        assertTrue("TextMessage should be received  by subscriber1", firstReceived instanceof TextMessage);
        assertEquals(expectedText, ((TextMessage) firstReceived).getText());
        assertTrue("TextMessage should be received  by subscriber2", secondReceived instanceof TextMessage);
        assertEquals(expectedText, ((TextMessage) secondReceived).getText());
    }
    finally
    {
        topicConnection.close();
    }
}
 
源代码25 项目: moleculer-java   文件: JmsTransporter.java
/**
 * Returns the publisher cached for {@code channel}, creating and caching one on first use.
 * Lookup, creation and registration all happen while synchronized on {@code publishers}.
 *
 * @param channel name of the topic the publisher sends to; also the cache key
 * @return the cached or newly created publisher
 * @throws Exception if the topic or the publisher cannot be created or configured
 */
protected TopicPublisher createOrGetPublisher(String channel) throws Exception {
	synchronized (publishers) {
		TopicPublisher cached = publishers.get(channel);
		if (cached != null) {
			return cached;
		}

		// first use of this channel: create, configure, then remember the publisher
		Topic topic = session.createTopic(channel);
		TopicPublisher created = session.createPublisher(topic);
		created.setDeliveryMode(deliveryMode);
		publishers.put(channel, created);
		return created;
	}
}
 
/**
 * Returns a dynamic proxy implementing {@code MessageProducer}, {@code QueueSender} and
 * {@code TopicPublisher}, backed by a {@code Jms2MessageProducerInvocationHandler}, when
 * {@code completionListenerClass} is set (used to reflectively adapt to JMS 2.0 API methods).
 * Otherwise returns this CachedMessageProducer instance itself.
 */
public MessageProducer getProxyIfNecessary() {
	if (completionListenerClass == null) {
		return this;
	}

	ClassLoader loader = CachedMessageProducer.class.getClassLoader();
	Class<?>[] proxiedInterfaces = {MessageProducer.class, QueueSender.class, TopicPublisher.class};
	return (MessageProducer) Proxy.newProxyInstance(
			loader, proxiedInterfaces, new Jms2MessageProducerInvocationHandler());
}
 
源代码27 项目: activemq-artemis   文件: SimpleOpenWireTest.java
/**
 * Exercises a temporary topic from a second connection: publishes to it, checks that
 * subscribing to it from that second connection fails, then closes the connection that
 * created the topic and publishes once more.
 */
@Test
public void testTempTopicDelete() throws Exception {
   connection.start();
   TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

   // the temporary topic is created through a session of the test's own connection
   TemporaryTopic tempTopic = topicSession.createTemporaryTopic();

   // second connection from the same factory
   ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();

   try {
      TopicSession newTopicSession = newConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      TopicPublisher publisher = newTopicSession.createPublisher(tempTopic);

      // need to wait here because the ActiveMQ client's temp destination map is updated asynchronously, not waiting can introduce a race
      assertTrue(Wait.waitFor(() -> newConn.activeTempDestinations.size() == 1, 2000, 100));

      TextMessage msg = newTopicSession.createTextMessage("Test Message");

      publisher.publish(msg);

      // creating a subscriber on the temp topic from the second connection must throw
      try {
         TopicSubscriber consumer = newTopicSession.createSubscriber(tempTopic);
         fail("should have gotten exception but got consumer: " + consumer);
      } catch (JMSException ex) {
         //correct
      }

      // close the connection that created the temporary topic
      connection.close();

      // a JMSException from this publish is tolerated but not required:
      // there is no fail() after the call
      try {
         Message newMsg = newTopicSession.createMessage();
         publisher.publish(newMsg);
      } catch (JMSException e) {
         //ok
      }

   } finally {
      newConn.close();
   }
}
 
/**
 * Creates a new wrapper around the given topic publisher. Both arguments are passed
 * unchanged to the superclass constructor; the construction is trace-logged when trace
 * logging is enabled.
 *
 * @param producer the topic publisher to wrap
 * @param session  the session this wrapper belongs to
 */
public ActiveMQRATopicPublisher(final TopicPublisher producer, final ActiveMQRASession session) {
   super(producer, session);

   if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
      ActiveMQRALogger.LOGGER.trace("constructor(" + producer + ", " + session + ")");
   }
}
 
/**
 * Returns the topic of the wrapped {@link TopicPublisher}. The call is trace-logged when
 * trace logging is enabled.
 *
 * @return The topic
 * @throws JMSException Thrown if an error occurs
 */
@Override
public Topic getTopic() throws JMSException {
   if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
      ActiveMQRALogger.LOGGER.trace("getTopic()");
   }

   final TopicPublisher target = (TopicPublisher) producer;
   return target.getTopic();
}
 
/**
 * Publishes a message with explicit delivery options through the wrapped
 * {@link TopicPublisher} while holding the session lock. Trace logging is emitted before
 * and after the call when enabled.
 *
 * @param message      The message
 * @param deliveryMode The delivery mode
 * @param priority     The priority
 * @param timeToLive   The time to live
 * @throws JMSException Thrown if an error occurs
 */
@Override
public void publish(final Message message,
                    final int deliveryMode,
                    final int priority,
                    final long timeToLive) throws JMSException {
   session.lock();
   try {
      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("send " + this + " message=" + message
                                          + " deliveryMode=" + deliveryMode
                                          + " priority=" + priority
                                          + " ttl=" + timeToLive);
      }

      checkState();

      final TopicPublisher target = (TopicPublisher) producer;
      target.publish(message, deliveryMode, priority, timeToLive);

      if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
         ActiveMQRALogger.LOGGER.trace("sent " + this + " result=" + message);
      }
   } finally {
      // the lock is released on both normal and exceptional exit
      session.unlock();
   }
}
 
 类所在包
 同包方法