下面列出了javax.jms.TopicConnectionFactory#createTopicConnection ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static void publish() throws JMSException {
// get topic connection
TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
TopicConnection connection = connectionFactory.createTopicConnection();
// start connection
connection.start();
// create session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// create topic
Topic topic = session.createTopic(topicName);
// create publisher
TopicPublisher publisher = session.createPublisher(topic);
// send message
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(("hello WeEvent").getBytes(StandardCharsets.UTF_8));
publisher.publish(msg);
System.out.print("send done.");
connection.close();
}
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username,
final String password) {
try {
final Context ctx = new InitialContext();
final TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) lookup(ctx,
tcfBindingName);
final TopicConnection topicConnection =
topicConnectionFactory.createTopicConnection(username,
password);
topicConnection.start();
final TopicSession topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
final Topic topic = (Topic) ctx.lookup(topicBindingName);
final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(this);
} catch (final Exception e) {
logger.error("Could not read JMS message.", e);
}
}
/**
* Create all administrated objects connections and sessions ready to use for tests.
* <br />
* Start connections.
*/
@Override
@Before
public void setUp() throws Exception {
super.setUp();
try {
// ...and creates administrated objects and binds them
admin.createTopicConnectionFactory(PubSubTestCase.TCF_NAME);
admin.createTopic(PubSubTestCase.TOPIC_NAME);
Context ctx = admin.createContext();
publisherTCF = (TopicConnectionFactory) ctx.lookup(PubSubTestCase.TCF_NAME);
publisherTopic = (Topic) ctx.lookup(PubSubTestCase.TOPIC_NAME);
publisherConnection = publisherTCF.createTopicConnection();
publisherConnection.setClientID("publisherConnection");
publisherSession = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
publisher = publisherSession.createPublisher(publisherTopic);
subscriberTCF = (TopicConnectionFactory) ctx.lookup(PubSubTestCase.TCF_NAME);
subscriberTopic = (Topic) ctx.lookup(PubSubTestCase.TOPIC_NAME);
subscriberConnection = subscriberTCF.createTopicConnection();
subscriberConnection.setClientID("subscriberConnection");
subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
subscriber = subscriberSession.createSubscriber(subscriberTopic);
publisherConnection.start();
subscriberConnection.start();
// end of client step
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* Test that ConnectionFactory can be cast to TopicConnectionFactory and TopicConnection can be
* created.
*/
@Test
public void testTopicConnectionFactory() throws Exception {
deployConnectionFactory(0, JMSFactoryType.TOPIC_CF, "CF_TOPIC_XA_FALSE", "/CF_TOPIC_XA_FALSE");
TopicConnectionFactory qcf = (TopicConnectionFactory) ic.lookup("/CF_TOPIC_XA_FALSE");
TopicConnection tc = qcf.createTopicConnection();
tc.close();
undeployConnectionFactory("CF_TOPIC_XA_FALSE");
}
@Parameters({ "broker-port"})
@Test
public void testSubscriberPublisher(String port) throws Exception {
String topicName = "MyTopic1";
int numberOfMessages = 100;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder("admin", "admin", "localhost", port)
.withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
// Initialize subscriber
TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
// publish 100 messages
TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
producer.publish(producerSession.createTextMessage("Test message " + i));
}
producerSession.close();
for (int i = 0; i < numberOfMessages; i++) {
Message message = subscriber.receive(1000);
Assert.assertNotNull(message, "Message #" + i + " was not received");
}
connection.close();
}
/**
* This implementation delegates to the {@code createTopicConnection(username, password)}
* method of the target TopicConnectionFactory, passing in the specified user credentials.
* If the specified username is empty, it will simply delegate to the standard
* {@code createTopicConnection()} method of the target ConnectionFactory.
* @param username the username to use
* @param password the password to use
* @return the Connection
* @see javax.jms.TopicConnectionFactory#createTopicConnection(String, String)
* @see javax.jms.TopicConnectionFactory#createTopicConnection()
*/
protected TopicConnection doCreateTopicConnection(String username, String password) throws JMSException {
Assert.state(this.targetConnectionFactory != null, "'targetConnectionFactory' is required");
if (!(this.targetConnectionFactory instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
TopicConnectionFactory queueFactory = (TopicConnectionFactory) this.targetConnectionFactory;
if (StringUtils.hasLength(username)) {
return queueFactory.createTopicConnection(username, password);
}
else {
return queueFactory.createTopicConnection();
}
}
private TopicConnection getTopicConnection(InitialContext initialContext) throws NamingException, JMSException {
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
return connection;
}
private void assertNotNullWithPublishSubscribeForTopics(String publishTopicName,
String subscribeTopicName) throws Exception {
int numberOfMessages = 100;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder("admin", "admin", "localhost", port)
.withTopic(publishTopicName)
.withTopic(subscribeTopicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic subscriberDestination = (Topic) initialContext.lookup(subscribeTopicName);
TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
TopicSession publisherSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic publisherDestination = (Topic) initialContext.lookup(publishTopicName);
TopicPublisher publisher = publisherSession.createPublisher(publisherDestination);
for (int i = 0; i < numberOfMessages; i++) {
publisher.publish(publisherSession.createTextMessage("Test message " + i));
}
publisherSession.close();
for (int i = 0; i < numberOfMessages; i++) {
Message message = subscriber.receive(1000);
Assert.assertNotNull(message, "Message #" + i + " was not received");
}
subscriberSession.close();
connection.close();
}
@Parameters({ "broker-port", "admin-username", "admin-password" })
@Test(description = "Test user with valid credentials")
public void testValidClientConnection(String port, String adminUsername, String adminPassword) throws Exception {
String topicName = "MyTopic1";
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, "localhost", port).withTopic(topicName).build();
TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext
.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
connection.close();
}
@Parameters({ "broker-port", "test-username", "test-password" })
@Test(description = "Test valid user password with special characters")
public void testPasswordWithSpecialCharacters(String port, String testUsername, String testPassword) throws
Exception {
String topicName = "MyTopic1";
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(testUsername, testPassword, "localhost", port).withTopic(topicName).build();
TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext
.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
connection.close();
}
@Parameters({ "broker-port", "admin-username" })
@Test(description = "Test user with invalid credentials",
expectedExceptions = JMSException.class)
public void testInvalidClientConnection(String port, String adminUsername) throws Exception {
String topicName = "MyTopic1";
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, "invalidPassword", "localhost", port).withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory = (TopicConnectionFactory) initialContext
.lookup(ClientHelper.CONNECTION_FACTORY);
connectionFactory.createTopicConnection();
}
/**
* Creates a topic connection, session and receiver.
*
* @throws NamingException
* @throws JMSException
*/
private void createTopicConnection() throws NamingException, JMSException {
// Creates a topic connection, sessions and receiver
TopicConnectionFactory connFactory = (TopicConnectionFactory) super.getInitialContext()
.lookup(AndesClientConstants.CF_NAME);
TopicConnection topicConnection = connFactory.createTopicConnection();
topicConnection.setClientID(this.consumerConfig.getSubscriptionID());
topicConnection.start();
TopicSession topicSession;
// Sets acknowledgement mode
if (TopicSession.SESSION_TRANSACTED == this.consumerConfig.getAcknowledgeMode().getType()) {
topicSession = topicConnection
.createTopicSession(true, this.consumerConfig.getAcknowledgeMode().getType());
} else {
topicSession = topicConnection
.createTopicSession(false, this.consumerConfig.getAcknowledgeMode().getType());
}
Topic topic =
(Topic) super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
connection = topicConnection;
session = topicSession;
// If topic is durable
if (this.consumerConfig.isDurable()) {
// If selectors exists
if (null != this.consumerConfig.getSelectors()) {
receiver = topicSession.createDurableSubscriber(topic, this.consumerConfig
.getSubscriptionID(), this.consumerConfig.getSelectors(), false);
} else {
receiver = topicSession
.createDurableSubscriber(topic, this.consumerConfig.getSubscriptionID());
}
} else {
// If selectors exists
if (null != this.consumerConfig.getSelectors()) {
receiver = topicSession
.createSubscriber(topic, this.consumerConfig.getSelectors(), false);
} else {
receiver = topicSession.createSubscriber(topic);
}
}
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testMultipleTopicSubscribersOnSameSession(String port,
String adminUsername,
String adminPassword,
String brokerHostname)
throws NamingException, JMSException, InterruptedException {
String queueName = "testMultipleTopicSubscribersOnSameSession";
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(queueName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
Topic topic = (Topic) initialContext.lookup(queueName);
int numberOfConsumers = 3;
int messagesPerConsumer = 1000;
int maxNumberOfMessages = numberOfConsumers * messagesPerConsumer;
LinkedBlockingQueue<MessageResult> receiveQueue = new LinkedBlockingQueue<>(maxNumberOfMessages);
TopicSubscriber consumers[] = new TopicSubscriber[numberOfConsumers];
int messageCount[] = new int[numberOfConsumers];
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
consumers[consumerIndex] = subscriberSession.createSubscriber(topic);
int finalConsumerIndex = consumerIndex;
consumers[consumerIndex].setMessageListener(message -> {
messageCount[finalConsumerIndex]++;
try {
message.acknowledge();
} catch (JMSException e) {
LOGGER.error("Message acknowledging failed.", e);
}
receiveQueue.offer(new MessageResult(message, finalConsumerIndex));
});
}
// publish messages with property.
TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(topic);
TextMessage textMessage;
String consumerMessage = "testMessage";
for (int i = 0; i < messagesPerConsumer; i++) {
textMessage = producerSession.createTextMessage(consumerMessage);
producer.send(textMessage);
}
for (int i = 0; i < maxNumberOfMessages; i++) {
MessageResult result = receiveQueue.poll(5, TimeUnit.SECONDS);
if (result == null) {
StringBuilder countSummary = new StringBuilder();
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
countSummary.append("Consumer ")
.append(consumerIndex)
.append(" received ")
.append(messageCount[consumerIndex])
.append(" messages, ");
}
Assert.fail("Messages stopped receiving after " + i + " iterations. " + countSummary.toString());
} else {
TextMessage textMessage1 = (TextMessage) result.getMessage();
Assert.assertEquals(textMessage1.getText(),
consumerMessage,
"Incorrect message received for consumer " + result.getConsumerId());
}
}
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
Assert.assertEquals(messageCount[consumerIndex],
messagesPerConsumer,
"Message " + messageCount[consumerIndex]
+ " received for consumer " + consumerIndex + ".");
}
producer.close();
for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
consumers[consumerIndex].close();
}
connection.close();
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void test1966DurableTopicMessagesOrderSingleSubscriber(String port,
String adminUsername,
String adminPassword,
String brokerHostname)
throws NamingException, JMSException {
String topicName = "test1966DurableTopicMessagesOrderSingleSubscriber";
List<String> subscriberOneMessages = new ArrayList<>();
int numberOfMessages = 1966;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
// Initialize subscriber
TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
TopicSubscriber subscriber = subscriberSession.createDurableSubscriber(subscriberDestination, "1966_1");
// publish 1966 messages
TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
producer.publish(producerSession.createTextMessage(String.valueOf(i)));
}
producerSession.close();
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message = (TextMessage) subscriber.receive(5000);
Assert.assertNotNull(message, "Message #" + i + " was not received");
subscriberOneMessages.add(message.getText());
}
subscriberSession.close();
connection.close();
// verify order is preserved
boolean isOrderPreserved = true;
for (int i = 0; i < numberOfMessages; i++) {
if (!(i == Integer.parseInt(subscriberOneMessages.get(i)))) {
isOrderPreserved = false;
break;
}
}
Assert.assertTrue(isOrderPreserved, "Topic messages order not preserved for single subscriber.");
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPublisherRollbackTransaction(String port,
String adminUsername,
String adminPassword,
String brokerHostname) throws NamingException, JMSException {
String topicName = "testPublisherRollbackTransaction";
int numberOfMessages = 100;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
// initialize subscriber
TopicSession subscriberSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
// publish 100 messages
TopicSession producerSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
producer.publish(producerSession.createTextMessage("Test message " + i));
}
// rollback all publish messages
producerSession.rollback();
// Consume published messages
Message message = subscriber.receive(1000);
Assert.assertNull(message, "Messages should not receive upon publisher rollback");
producerSession.close();
subscriberSession.close();
connection.close();
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test(expectedExceptions = javax.jms.IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*Session is not transacted")
public void testRollbackOnNonTransactionTopicSession(String port,
String adminUsername,
String adminPassword,
String brokerHostname) throws NamingException, JMSException {
String topicName = "testRollbackOnNonTransactionTopicSession";
int numberOfMessages = 100;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
// Initialize subscriber
TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
// publish 100 messages
TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
producer.publish(producerSession.createTextMessage("Test message " + i));
}
try {
// commit all publish messages
producerSession.rollback();
Message message = subscriber.receive(1000);
Assert.assertNull(message, "Messages should not receive message after calling rollback on "
+ "non transaction channel");
} catch (JMSException e) {
throw e;
} finally {
producerSession.close();
subscriberSession.close();
connection.close();
}
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPublisherCloseBeforeRollbackTransaction(String port,
String adminUsername,
String adminPassword,
String brokerHostname)
throws NamingException, JMSException {
String topicName = "testPublisherCloseBeforeRollbackTransaction";
int numberOfMessages = 100;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
// initialize subscriber
TopicSession subscriberSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
// publish 100 messages
TopicSession producerSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
producer.publish(producerSession.createTextMessage("Test message " + i));
}
// close publisher before rollback
producer.close();
// rollback all publish messages
producerSession.rollback();
Message message = subscriber.receive(1000);
Assert.assertNull(message, "Messages should not receive upon publisher rollback");
producerSession.close();
subscriberSession.close();
connection.close();
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPositiveJMSSelectorConsumerProducer(String port,
String adminUsername,
String adminPassword,
String brokerHostname) throws NamingException, JMSException {
String queueName = "testPositiveJMSSelectorConsumerProducer";
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(queueName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
Topic topic = (Topic) initialContext.lookup(queueName);
// Subscribe with a selector
String propertyName = "MyProperty";
String propertyValue = "propertyValue";
String jmsPropertySelector = propertyName + " = '" + propertyValue + "'";
TopicSubscriber consumer = subscriberSession.createSubscriber(topic, jmsPropertySelector, false);
// publish messages with property
TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(topic);
int numberOfMessages = 100;
for (int i = 0; i < numberOfMessages; i++) {
TextMessage textMessage = producerSession.createTextMessage("Test message " + i);
textMessage.setStringProperty(propertyName, propertyValue);
producer.send(textMessage);
}
// consume messages
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(1000);
Assert.assertNotNull(message, "Message #" + i + " was not received");
}
producerSession.close();
connection.close();
}
@Test
public void testConnectionTypes() throws Exception {
deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
deployConnectionFactory(0, JMSFactoryType.QUEUE_XA_CF, "CF_QUEUE_XA_TRUE", "/CF_QUEUE_XA_TRUE");
deployConnectionFactory(0, JMSFactoryType.XA_CF, "CF_XA_TRUE", "/CF_XA_TRUE");
deployConnectionFactory(0, JMSFactoryType.QUEUE_CF, "CF_QUEUE", "/CF_QUEUE");
deployConnectionFactory(0, JMSFactoryType.TOPIC_CF, "CF_TOPIC", "/CF_TOPIC");
deployConnectionFactory(0, JMSFactoryType.TOPIC_XA_CF, "CF_TOPIC_XA_TRUE", "/CF_TOPIC_XA_TRUE");
Connection genericConnection = null;
XAConnection xaConnection = null;
QueueConnection queueConnection = null;
TopicConnection topicConnection = null;
XAQueueConnection xaQueueConnection = null;
XATopicConnection xaTopicConnection = null;
ConnectionFactory genericFactory = (ConnectionFactory) ic.lookup("/ConnectionFactory");
genericConnection = genericFactory.createConnection();
assertConnectionType(genericConnection, "generic");
XAConnectionFactory xaFactory = (XAConnectionFactory) ic.lookup("/CF_XA_TRUE");
xaConnection = xaFactory.createXAConnection();
assertConnectionType(xaConnection, "xa");
QueueConnectionFactory queueCF = (QueueConnectionFactory) ic.lookup("/CF_QUEUE");
queueConnection = queueCF.createQueueConnection();
assertConnectionType(queueConnection, "queue");
TopicConnectionFactory topicCF = (TopicConnectionFactory) ic.lookup("/CF_TOPIC");
topicConnection = topicCF.createTopicConnection();
assertConnectionType(topicConnection, "topic");
XAQueueConnectionFactory xaQueueCF = (XAQueueConnectionFactory) ic.lookup("/CF_QUEUE_XA_TRUE");
xaQueueConnection = xaQueueCF.createXAQueueConnection();
assertConnectionType(xaQueueConnection, "xa-queue");
XATopicConnectionFactory xaTopicCF = (XATopicConnectionFactory) ic.lookup("/CF_TOPIC_XA_TRUE");
xaTopicConnection = xaTopicCF.createXATopicConnection();
assertConnectionType(xaTopicConnection, "xa-topic");
genericConnection.close();
xaConnection.close();
queueConnection.close();
topicConnection.close();
xaQueueConnection.close();
xaTopicConnection.close();
undeployConnectionFactory("ConnectionFactory");
undeployConnectionFactory("CF_QUEUE_XA_TRUE");
undeployConnectionFactory("CF_XA_TRUE");
undeployConnectionFactory("CF_QUEUE");
undeployConnectionFactory("CF_TOPIC");
undeployConnectionFactory("CF_TOPIC_XA_TRUE");
}
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test(expectedExceptions = javax.jms.IllegalStateException.class,
expectedExceptionsMessageRegExp = ".*Session is not transacted")
public void testCommitOnNonTransactionTopicSession(String port,
String adminUsername,
String adminPassword,
String brokerHostname) throws NamingException, JMSException {
String topicName = "testCommitOnNonTransactionTopicSession";
int numberOfMessages = 100;
InitialContext initialContext = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withTopic(topicName)
.build();
TopicConnectionFactory connectionFactory
= (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.start();
// initialize subscriber
TopicSession subscriberSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic subscriberDestination = (Topic) initialContext.lookup(topicName);
TopicSubscriber subscriber = subscriberSession.createSubscriber(subscriberDestination);
// publish 100 messages
TopicSession producerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher producer = producerSession.createPublisher(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
producer.publish(producerSession.createTextMessage("Test message " + i));
}
try {
// commit all publish messages
producerSession.commit();
Message message = subscriber.receive(1000);
Assert.assertNull(message, "Messages should not receive message after calling commit on "
+ "non transaction channel");
} catch (JMSException e) {
//catch exception and re-throw it since we need the connection to be closed
throw e;
} finally {
producerSession.close();
subscriberSession.close();
connection.close();
}
}