javax.jms.TopicConnectionFactory#createTopicConnection ( )源码实例Demo

下面列出了javax.jms.TopicConnectionFactory#createTopicConnection ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: WeEvent   文件: JMSSample.java
private static void publish() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();

    // start connection
    connection.start();
    // create session
    TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    // create topic
    Topic topic = session.createTopic(topicName);

    // create publisher
    TopicPublisher publisher = session.createPublisher(topic);
    // send message
    BytesMessage msg = session.createBytesMessage();
    msg.writeBytes(("hello WeEvent").getBytes(StandardCharsets.UTF_8));
    publisher.publish(msg);

    System.out.print("send done.");
    connection.close();
}
 
源代码2 项目: cacheonix-core   文件: JMSSink.java
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username,
               final String password) {

   try {
      final Context ctx = new InitialContext();
      final TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) lookup(ctx,
              tcfBindingName);

      final TopicConnection topicConnection =
              topicConnectionFactory.createTopicConnection(username,
                      password);
      topicConnection.start();

      final TopicSession topicSession = topicConnection.createTopicSession(false,
              Session.AUTO_ACKNOWLEDGE);

      final Topic topic = (Topic) ctx.lookup(topicBindingName);

      final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);

      topicSubscriber.setMessageListener(this);

   } catch (final Exception e) {
      logger.error("Could not read JMS message.", e);
   }
}
 
源代码3 项目: activemq-artemis   文件: PubSubTestCase.java
/**
 * Create all administrated objects connections and sessions ready to use for tests.
 * <br />
 * Start connections.
 */
@Override
@Before
public void setUp() throws Exception {
   super.setUp();

   try {
      // ...and creates administrated objects and binds them
      admin.createTopicConnectionFactory(PubSubTestCase.TCF_NAME);
      admin.createTopic(PubSubTestCase.TOPIC_NAME);

      Context ctx = admin.createContext();

      publisherTCF = (TopicConnectionFactory) ctx.lookup(PubSubTestCase.TCF_NAME);
      publisherTopic = (Topic) ctx.lookup(PubSubTestCase.TOPIC_NAME);
      publisherConnection = publisherTCF.createTopicConnection();
      publisherConnection.setClientID("publisherConnection");
      publisherSession = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      publisher = publisherSession.createPublisher(publisherTopic);

      subscriberTCF = (TopicConnectionFactory) ctx.lookup(PubSubTestCase.TCF_NAME);
      subscriberTopic = (Topic) ctx.lookup(PubSubTestCase.TOPIC_NAME);
      subscriberConnection = subscriberTCF.createTopicConnection();
      subscriberConnection.setClientID("subscriberConnection");
      subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
      subscriber = subscriberSession.createSubscriber(subscriberTopic);

      publisherConnection.start();
      subscriberConnection.start();
      // end of client step
   } catch (Exception e) {
      e.printStackTrace();
      throw new RuntimeException(e);
   }
}
 
源代码4 项目: activemq-artemis   文件: ConnectionFactoryTest.java
/**
 * Test that ConnectionFactory can be cast to TopicConnectionFactory and TopicConnection can be
 * created.
 */
@Test
public void testTopicConnectionFactory() throws Exception {
   deployConnectionFactory(0, JMSFactoryType.TOPIC_CF, "CF_TOPIC_XA_FALSE", "/CF_TOPIC_XA_FALSE");
   TopicConnectionFactory qcf = (TopicConnectionFactory) ic.lookup("/CF_TOPIC_XA_FALSE");
   TopicConnection tc = qcf.createTopicConnection();
   tc.close();
   undeployConnectionFactory("CF_TOPIC_XA_FALSE");
}
 
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
    String topicName = "MyTopic1";
    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    // Initialize subscriber
    TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
    TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

    // publish 100 messages
    TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        producer.publish(producerSession.createTextMessage("Test message " + i));
    }

    producerSession.close();

    for (int i = 0; i < numberOfMessages; i++) {
        Message message = subscriber.receive(1000);
        Assert.assertNotNull(message, "Message #" + i + " was not received");
    }

    connection.close();
}
 
/**
 * This implementation delegates to the {@code createTopicConnection(username, password)}
 * method of the target TopicConnectionFactory, passing in the specified user credentials.
 * If the specified username is empty, it will simply delegate to the standard
 * {@code createTopicConnection()} method of the target ConnectionFactory.
 * @param username the username to use
 * @param password the password to use
 * @return the Connection
 * @see javax.jms.TopicConnectionFactory#createTopicConnection(String, String)
 * @see javax.jms.TopicConnectionFactory#createTopicConnection()
 */
protected TopicConnection doCreateTopicConnection(String username, String password) throws JMSException {
	Assert.state(this.targetConnectionFactory != null, "'targetConnectionFactory' is required");
	if (!(this.targetConnectionFactory instanceof TopicConnectionFactory)) {
		throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
	}
	TopicConnectionFactory queueFactory = (TopicConnectionFactory) this.targetConnectionFactory;
	if (StringUtils.hasLength(username)) {
		return queueFactory.createTopicConnection(username, password);
	}
	else {
		return queueFactory.createTopicConnection();
	}
}
 
private TopicConnection getTopicConnection(InitialContext initialContext) throws NamingException, JMSException {
    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();
    return connection;
}
 
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
                                                        String subscribeTopicName) throws Exception {

    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder("admin", "admin", "localhost", port)
            .withTopic(publishTopicName)
            .withTopic(subscribeTopicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscriberDestination = (Topic) initialContext.lookup(subscribeTopicName);
    TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

    TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic publisherDestination = (Topic) initialContext.lookup(publishTopicName);
    TopicPublisher publisher = publisherSession.createPublisher(publisherDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        publisher.publish(publisherSession.createTextMessage("Test message " + i));
    }

    publisherSession.close();

    for (int i = 0; i < numberOfMessages; i++) {
        Message message = subscriber.receive(1000);
        Assert.assertNotNull(message, "Message #" + i + " was not received");
    }

    subscriberSession.close();
    connection.close();
}
 
@Parameters({ "broker-port", "admin-username", "admin-password" })
@Test(description = "Test user with valid credentials")
public void testValidClientConnection(String port, String adminUsername, String adminPassword) throws Exception {
    String topicName = "MyTopic1";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, "localhost", port).withTopic(topicName).build();
    TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext
            .lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();
    connection.close();
}
 
@Parameters({ "broker-port", "test-username", "test-password" })
@Test(description = "Test valid user password with special characters")
public void testPasswordWithSpecialCharacters(String port, String testUsername, String testPassword) throws
        Exception {
    String topicName = "MyTopic1";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(testUsername, testPassword, "localhost", port).withTopic(topicName).build();
    TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext
            .lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();
    connection.close();
}
 
@Parameters({ "broker-port", "admin-username" })
@Test(description = "Test user with invalid credentials",
      expectedExceptions = JMSException.class)
public void testInvalidClientConnection(String port, String adminUsername) throws Exception {
    String topicName = "MyTopic1";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, "invalidPassword", "localhost", port).withTopic(topicName)
            .build();
    TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext
            .lookup(ClientHelper.CONNECTION_FACTORY);
    connectionFactory.createTopicConnection();
}
 
源代码12 项目: product-ei   文件: AndesJMSConsumer.java
/**
 * Creates a topic connection, session and receiver.
 *
 * @throws NamingException
 * @throws JMSException
 */
private void createTopicConnection() throws NamingException, JMSException {
    // Creates a topic connection, sessions and receiver
    TopicConnectionFactory connFactory = (TopicConnectionFactory) super.getInitialContext()
            .lookup(AndesClientConstants.CF_NAME);
    TopicConnection topicConnection = connFactory.createTopicConnection();
    topicConnection.setClientID(this.consumerConfig.getSubscriptionID());
    topicConnection.start();
    TopicSession topicSession;
    // Sets acknowledgement mode
    if (TopicSession.SESSION_TRANSACTED == this.consumerConfig.getAcknowledgeMode().getType()) {
        topicSession = topicConnection
                .createTopicSession(true, this.consumerConfig.getAcknowledgeMode().getType());
    } else {
        topicSession = topicConnection
                .createTopicSession(false, this.consumerConfig.getAcknowledgeMode().getType());
    }

    Topic topic =
            (Topic) super.getInitialContext().lookup(this.consumerConfig.getDestinationName());

    connection = topicConnection;
    session = topicSession;
    // If topic is durable
    if (this.consumerConfig.isDurable()) {
        // If selectors exists
        if (null != this.consumerConfig.getSelectors()) {
            receiver = topicSession.createDurableSubscriber(topic, this.consumerConfig
                    .getSubscriptionID(), this.consumerConfig.getSelectors(), false);
        } else {
            receiver = topicSession
                    .createDurableSubscriber(topic, this.consumerConfig.getSubscriptionID());
        }
    } else {
        // If selectors exists
        if (null != this.consumerConfig.getSelectors()) {
            receiver = topicSession
                    .createSubscriber(topic, this.consumerConfig.getSelectors(), false);
        } else {
            receiver = topicSession.createSubscriber(topic);
        }
    }
}
 
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testMultipleTopicSubscribersOnSameSession(String port,
                                                      String adminUsername,
                                                      String adminPassword,
                                                      String brokerHostname)
        throws NamingException, JMSException, InterruptedException {
    String queueName = "testMultipleTopicSubscribersOnSameSession";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(queueName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
    Topic topic = (Topic) initialContext.lookup(queueName);

    int numberOfConsumers = 3;
    int messagesPerConsumer = 1000;
    int maxNumberOfMessages = numberOfConsumers * messagesPerConsumer;
    LinkedBlockingQueue<MessageResult> receiveQueue =  new LinkedBlockingQueue<>(maxNumberOfMessages);

    TopicSubscriber consumers[] = new TopicSubscriber[numberOfConsumers];
    int messageCount[] = new int[numberOfConsumers];

    for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
        consumers[consumerIndex] = subscriberSession.createSubscriber(topic);
        int finalConsumerIndex = consumerIndex;
        consumers[consumerIndex].setMessageListener(message -> {
            messageCount[finalConsumerIndex]++;
            try {
                message.acknowledge();
            } catch (JMSException e) {
                LOGGER.error("Message acknowledging failed.", e);
            }
            receiveQueue.offer(new MessageResult(message, finalConsumerIndex));
        });
    }

    // publish messages with property.
    TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    TopicPublisher producer = producerSession.createPublisher(topic);

    TextMessage textMessage;
    String consumerMessage = "testMessage";
    for (int i = 0; i < messagesPerConsumer; i++) {
        textMessage = producerSession.createTextMessage(consumerMessage);
        producer.send(textMessage);
    }


    for (int i = 0; i < maxNumberOfMessages; i++) {
        MessageResult result = receiveQueue.poll(5, TimeUnit.SECONDS);
        if (result == null) {
            StringBuilder countSummary = new StringBuilder();
            for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
                countSummary.append("Consumer ")
                            .append(consumerIndex)
                            .append(" received ")
                            .append(messageCount[consumerIndex])
                            .append(" messages, ");
            }

            Assert.fail("Messages stopped receiving after " + i + " iterations. " + countSummary.toString());
        } else {
            TextMessage textMessage1 = (TextMessage) result.getMessage();
            Assert.assertEquals(textMessage1.getText(),
                                consumerMessage,
                                "Incorrect message received for consumer " + result.getConsumerId());
        }

    }

    for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
        Assert.assertEquals(messageCount[consumerIndex],
                            messagesPerConsumer,
                            "Message " + messageCount[consumerIndex]
                                    + " received for consumer " + consumerIndex + ".");
    }

    producer.close();
    for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
        consumers[consumerIndex].close();
    }

    connection.close();
}
 
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void test1966DurableTopicMessagesOrderSingleSubscriber(String port,
                                                              String adminUsername,
                                                              String adminPassword,
                                                              String brokerHostname)
        throws NamingException, JMSException {
    String topicName = "test1966DurableTopicMessagesOrderSingleSubscriber";
    List<String> subscriberOneMessages = new ArrayList<>();
    int numberOfMessages = 1966;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    // Initialize subscriber
    TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
    TopicSubscriber subscriber = subscriberSession.createDurableSubscriber(subscriberDestination, "1966_1");

    // publish 1966 messages
    TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        producer.publish(producerSession.createTextMessage(String.valueOf(i)));
    }

    producerSession.close();

    for (int i = 0; i < numberOfMessages; i++) {
        TextMessage message = (TextMessage) subscriber.receive(5000);
        Assert.assertNotNull(message, "Message #" + i + " was not received");
        subscriberOneMessages.add(message.getText());
    }

    subscriberSession.close();

    connection.close();

    // verify order is preserved
    boolean isOrderPreserved = true;
    for (int i = 0; i < numberOfMessages; i++) {
        if (!(i == Integer.parseInt(subscriberOneMessages.get(i)))) {
            isOrderPreserved = false;
            break;
        }
    }

    Assert.assertTrue(isOrderPreserved, "Topic messages order not preserved for single subscriber.");
}
 
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPublisherRollbackTransaction(String port,
                                             String adminUsername,
                                             String adminPassword,
                                             String brokerHostname) throws NamingException, JMSException {
    String topicName = "testPublisherRollbackTransaction";
    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    // initialize subscriber
    TopicSession subscriberSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
    Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
    TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

    // publish 100 messages
    TopicSession producerSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
    TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        producer.publish(producerSession.createTextMessage("Test message " + i));
    }
    // rollback all publish messages
    producerSession.rollback();

    // Consume published messages
    Message message = subscriber.receive(1000);
    Assert.assertNull(message, "Messages should not receive upon publisher rollback");

    producerSession.close();
    subscriberSession.close();
    connection.close();
}
 
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test(expectedExceptions = javax.jms.IllegalStateException.class,
        expectedExceptionsMessageRegExp = ".*Session is not transacted")
public void testRollbackOnNonTransactionTopicSession(String port,
                                                     String adminUsername,
                                                     String adminPassword,
                                                     String brokerHostname) throws NamingException, JMSException {
    String topicName = "testRollbackOnNonTransactionTopicSession";
    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    // Initialize subscriber
    TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
    TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

    // publish 100 messages
    TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        producer.publish(producerSession.createTextMessage("Test message " + i));
    }
    try {
        // commit all publish messages
        producerSession.rollback();
        Message message = subscriber.receive(1000);
        Assert.assertNull(message, "Messages should not receive message after calling rollback on "
                                   + "non transaction channel");
    } catch (JMSException e) {
        throw e;
    } finally {
        producerSession.close();
        subscriberSession.close();
        connection.close();
    }
}
 
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPublisherCloseBeforeRollbackTransaction(String port,
                                                        String adminUsername,
                                                        String adminPassword,
                                                        String brokerHostname)
        throws NamingException, JMSException {
    String topicName = "testPublisherCloseBeforeRollbackTransaction";
    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    // initialize subscriber
    TopicSession subscriberSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
    Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
    TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

    // publish 100 messages
    TopicSession producerSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
    TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        producer.publish(producerSession.createTextMessage("Test message " + i));
    }

    // close publisher before rollback
    producer.close();

    // rollback all publish messages
    producerSession.rollback();

    Message message = subscriber.receive(1000);
    Assert.assertNull(message, "Messages should not receive upon publisher rollback");

    producerSession.close();
    subscriberSession.close();
    connection.close();
}
 
源代码18 项目: ballerina-message-broker   文件: JMSSelectorTest.java
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPositiveJMSSelectorConsumerProducer(String port,
                                                    String adminUsername,
                                                    String adminPassword,
                                                    String brokerHostname) throws NamingException, JMSException {
    String queueName = "testPositiveJMSSelectorConsumerProducer";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(queueName)
            .build();


    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    Topic topic = (Topic) initialContext.lookup(queueName);

    // Subscribe with a selector
    String propertyName = "MyProperty";
    String propertyValue = "propertyValue";
    String jmsPropertySelector = propertyName + " = '" + propertyValue + "'";
    TopicSubscriber consumer = subscriberSession.createSubscriber(topic, jmsPropertySelector, false);

    // publish messages with property
    TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    TopicPublisher producer = producerSession.createPublisher(topic);

    int numberOfMessages = 100;
    for (int i = 0; i < numberOfMessages; i++) {
        TextMessage textMessage = producerSession.createTextMessage("Test message " + i);
        textMessage.setStringProperty(propertyName, propertyValue);
        producer.send(textMessage);
    }

    // consume messages
    for (int i = 0; i < numberOfMessages; i++) {
        Message message = consumer.receive(1000);
        Assert.assertNotNull(message, "Message #" + i + " was not received");
    }

    producerSession.close();
    connection.close();
}
 
源代码19 项目: activemq-artemis   文件: ConnectionFactoryTest.java
@Test
public void testConnectionTypes() throws Exception {
   deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
   deployConnectionFactory(0, JMSFactoryType.QUEUE_XA_CF, "CF_QUEUE_XA_TRUE", "/CF_QUEUE_XA_TRUE");
   deployConnectionFactory(0, JMSFactoryType.XA_CF, "CF_XA_TRUE", "/CF_XA_TRUE");
   deployConnectionFactory(0, JMSFactoryType.QUEUE_CF, "CF_QUEUE", "/CF_QUEUE");
   deployConnectionFactory(0, JMSFactoryType.TOPIC_CF, "CF_TOPIC", "/CF_TOPIC");
   deployConnectionFactory(0, JMSFactoryType.TOPIC_XA_CF, "CF_TOPIC_XA_TRUE", "/CF_TOPIC_XA_TRUE");

   Connection genericConnection = null;
   XAConnection xaConnection = null;
   QueueConnection queueConnection = null;
   TopicConnection topicConnection = null;
   XAQueueConnection xaQueueConnection = null;
   XATopicConnection xaTopicConnection = null;

   ConnectionFactory genericFactory = (ConnectionFactory) ic.lookup("/ConnectionFactory");
   genericConnection = genericFactory.createConnection();
   assertConnectionType(genericConnection, "generic");

   XAConnectionFactory xaFactory = (XAConnectionFactory) ic.lookup("/CF_XA_TRUE");
   xaConnection = xaFactory.createXAConnection();
   assertConnectionType(xaConnection, "xa");

   QueueConnectionFactory queueCF = (QueueConnectionFactory) ic.lookup("/CF_QUEUE");
   queueConnection = queueCF.createQueueConnection();
   assertConnectionType(queueConnection, "queue");

   TopicConnectionFactory topicCF = (TopicConnectionFactory) ic.lookup("/CF_TOPIC");
   topicConnection = topicCF.createTopicConnection();
   assertConnectionType(topicConnection, "topic");

   XAQueueConnectionFactory xaQueueCF = (XAQueueConnectionFactory) ic.lookup("/CF_QUEUE_XA_TRUE");
   xaQueueConnection = xaQueueCF.createXAQueueConnection();
   assertConnectionType(xaQueueConnection, "xa-queue");

   XATopicConnectionFactory xaTopicCF = (XATopicConnectionFactory) ic.lookup("/CF_TOPIC_XA_TRUE");
   xaTopicConnection = xaTopicCF.createXATopicConnection();
   assertConnectionType(xaTopicConnection, "xa-topic");

   genericConnection.close();
   xaConnection.close();
   queueConnection.close();
   topicConnection.close();
   xaQueueConnection.close();
   xaTopicConnection.close();

   undeployConnectionFactory("ConnectionFactory");
   undeployConnectionFactory("CF_QUEUE_XA_TRUE");
   undeployConnectionFactory("CF_XA_TRUE");
   undeployConnectionFactory("CF_QUEUE");
   undeployConnectionFactory("CF_TOPIC");
   undeployConnectionFactory("CF_TOPIC_XA_TRUE");
}
 
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test(expectedExceptions = javax.jms.IllegalStateException.class,
        expectedExceptionsMessageRegExp = ".*Session is not transacted")
public void testCommitOnNonTransactionTopicSession(String port,
        String adminUsername,
        String adminPassword,
        String brokerHostname) throws NamingException, JMSException {
    String topicName = "testCommitOnNonTransactionTopicSession";
    int numberOfMessages = 100;

    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    connection.start();

    // initialize subscriber
    TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
    TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);

    // publish 100 messages
    TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher producer = producerSession.createPublisher(subscriberDestination);

    for (int i = 0; i < numberOfMessages; i++) {
        producer.publish(producerSession.createTextMessage("Test message " + i));
    }

    try {
        // commit all publish messages
        producerSession.commit();

        Message message = subscriber.receive(1000);
        Assert.assertNull(message, "Messages should not receive message after calling commit on "
                                   + "non transaction channel");

    } catch (JMSException e) {
        //catch exception and re-throw it since we need the connection to be closed
        throw e;
    } finally {
        producerSession.close();
        subscriberSession.close();
        connection.close();
    }
}