下面列出了 javax.jms.XATopicConnectionFactory 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Publishes one message inside an XA transaction branch, prepares the branch, and then
 * asks for a one-phase commit ({@code commit(xid, true)}).
 *
 * <p>The test passes only if an {@link XAException} whose message matches
 * {@code "Error while committing dtx session.*"} is thrown.
 *
 * @throws Exception the expected {@link XAException}, or any JNDI/JMS failure during setup
 */
@Test(expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = "Error while committing dtx session.*")
public void testOnePhaseCommitAfterPrepare() throws Exception {
    final String destinationName = "testSubscriberWithCommit";
    final String payload = "testSubscriberWithCommit-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(destinationName).build();
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);

    // Connection, session and resource are kept in the fixture fields rather than locals.
    xaConnection = factory.createXATopicConnection();
    xaSession = xaConnection.createXATopicSession();
    xaResource = xaSession.getXAResource();

    Topic destination = (Topic) jndi.lookup(destinationName);
    MessageProducer publisher = xaSession.createProducer(destination);
    xaConnection.start();

    // Do the work inside the transaction branch.
    xaResource.start(xid, XAResource.TMNOFLAGS);
    publisher.send(xaSession.createTextMessage(payload));
    xaResource.end(xid, XAResource.TMSUCCESS);

    int prepareOutcome = xaResource.prepare(xid);
    Assert.assertEquals(prepareOutcome, XAResource.XA_OK, "Prepare stage failed.");

    // One-phase commit on an already prepared branch: this call is expected to throw.
    xaResource.commit(xid, true);
}
/**
 * Publishes one message inside an XA transaction branch and then asks for a two-phase
 * commit ({@code commit(xid, false)}) without calling {@code prepare} first.
 *
 * <p>The test passes only if an {@link XAException} whose message matches
 * {@code "Error while committing dtx session.*"} is thrown.
 *
 * @throws Exception the expected {@link XAException}, or any JNDI/JMS failure during setup
 */
@Test(expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = "Error while committing dtx session.*")
public void testTwoPhaseCommitWithoutPrepare() throws Exception {
    final String destinationName = "testSubscriberWithCommit";
    final String payload = "testSubscriberWithCommit-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(destinationName).build();
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);

    // Connection, session and resource are kept in the fixture fields rather than locals.
    xaConnection = factory.createXATopicConnection();
    xaSession = xaConnection.createXATopicSession();
    xaResource = xaSession.getXAResource();

    Topic destination = (Topic) jndi.lookup(destinationName);
    MessageProducer publisher = xaSession.createProducer(destination);
    xaConnection.start();

    // Do the work inside the transaction branch.
    xaResource.start(xid, XAResource.TMNOFLAGS);
    publisher.send(xaSession.createTextMessage(payload));
    xaResource.end(xid, XAResource.TMSUCCESS);

    // Two-phase commit with the prepare step skipped: this call is expected to throw.
    xaResource.commit(xid, false);
}
/**
 * Per-test cleanup: closes the REST client, rolls back the transaction branch identified
 * by {@code xid} through a freshly created XA connection, and closes both that helper
 * connection and the connection opened by the test.
 *
 * <p>The connections are closed in {@code finally} blocks so that a failing rollback (or a
 * failing session creation) does not leave them open for the following tests. The test's
 * own connection is skipped when it was never assigned.
 *
 * @throws Exception if the REST client, the JNDI lookup, the rollback or a close fails
 */
@AfterMethod
public void tearDown() throws Exception {
    restApiClient.close();
    InitialContext initialContext = initialContextBuilder.withXaConnectionFactory().build();
    XATopicConnectionFactory xaTopicConnectionFactory =
            (XATopicConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection xaTopicConnection = xaTopicConnectionFactory.createXATopicConnection();
    try {
        // The fields are re-pointed at the helper session/resource, as before.
        xaSession = xaTopicConnection.createXATopicSession();
        xaResource = xaSession.getXAResource();
        // NOTE(review): presumably this clears the branch a failed commit left behind - confirm.
        xaResource.rollback(xid);
        xaSession.close();
    } finally {
        try {
            xaTopicConnection.close();
        } finally {
            // The test may have failed before it managed to open its connection.
            if (xaConnection != null) {
                xaConnection.close();
            }
        }
    }
}
/**
 * Publishes a message inside an XA transaction, prepares it, rolls it back, and then checks
 * through the REST client that the durable subscription's queue holds no messages.
 *
 * @throws Exception on any JNDI, JMS, XA or REST failure
 */
@Test
public void testPublisherWithRollback() throws Exception {
    final String subscriptionId = "sub-testPublisherWithRollback";
    final String destinationName = "testPublisherWithRollback";
    final String payload = "testPublisherWithRollback-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(destinationName).build();
    Topic destination = (Topic) jndi.lookup(destinationName);

    // Transactional (XA) publisher.
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection connection = factory.createXATopicConnection();
    XATopicSession xaPublisherSession = connection.createXATopicSession();
    XAResource resource = xaPublisherSession.getXAResource();
    MessageProducer publisher = xaPublisherSession.createProducer(destination);

    // Non-transactional durable subscriber on the same connection.
    TopicSession plainSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    TopicSubscriber subscriber = plainSession.createDurableSubscriber(destination, subscriptionId);

    connection.start();

    // Publish within an XA transaction branch, prepare it, then roll it back.
    XidImpl branch = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    resource.start(branch, XAResource.TMNOFLAGS);
    publisher.send(xaPublisherSession.createTextMessage(payload));
    resource.end(branch, XAResource.TMSUCCESS);

    int prepareOutcome = resource.prepare(branch);
    Assert.assertEquals(prepareOutcome, XAResource.XA_OK, "Prepare stage failed.");

    resource.rollback(branch);

    subscriber.close();
    xaPublisherSession.close();
    connection.close();

    // Queue metadata is looked up under "carbon:" + subscription id; it must report size 0.
    QueueMetadata subscriptionQueue = restApiClient.getQueueMetadata("carbon:" + subscriptionId);
    Assert.assertEquals((int) subscriptionQueue.getSize(), 0, "Queue is not empty");
}
/**
 * Asserts that {@code factory} implements exactly {@code total} of the six JMS connection
 * factory interfaces checked here. The assertion message lists the interfaces that matched.
 *
 * @param factory the connection factory to inspect
 * @param total   the expected number of matching interfaces
 */
private void assertNTypes(ActiveMQConnectionFactory factory, final int total) {
    // Checked in this order; the simple names are what ends up in the assertion message.
    final Class<?>[] candidates = {
        ConnectionFactory.class,
        XAConnectionFactory.class,
        QueueConnectionFactory.class,
        TopicConnectionFactory.class,
        XAQueueConnectionFactory.class,
        XATopicConnectionFactory.class
    };

    StringBuilder description = new StringBuilder();
    description.append(factory).append("\n is instance of ");

    int matched = 0;
    for (Class<?> candidate : candidates) {
        if (candidate.isInstance(factory)) {
            matched++;
            description.append(candidate.getSimpleName()).append(' ');
        }
    }

    Assert.assertEquals(description.toString(), total, matched);
}
/**
 * Stores the delegate and tracing component, and records as a bit mask in {@code types}
 * which of the six JMS connection factory interfaces the delegate implements.
 *
 * @param delegate   the wrapped object, probed with {@code instanceof}
 * @param jmsTracing the tracing component stored alongside the delegate
 */
TracingConnectionFactory(Object delegate, JmsTracing jmsTracing) {
    this.delegate = delegate;
    this.jmsTracing = jmsTracing;
    // One flag per implemented interface; a flag contributes 0 when the interface is absent.
    this.types =
        (delegate instanceof ConnectionFactory ? TYPE_CF : 0)
            | (delegate instanceof QueueConnectionFactory ? TYPE_QUEUE_CF : 0)
            | (delegate instanceof TopicConnectionFactory ? TYPE_TOPIC_CF : 0)
            | (delegate instanceof XAConnectionFactory ? TYPE_XA_CF : 0)
            | (delegate instanceof XAQueueConnectionFactory ? TYPE_XA_QUEUE_CF : 0)
            | (delegate instanceof XATopicConnectionFactory ? TYPE_XA_TOPIC_CF : 0);
}
/**
 * Creates an XA topic connection on the delegate with the given credentials and returns it
 * wrapped by {@code TracingXAConnection}.
 *
 * @param userName the user name passed through to the delegate
 * @param password the password passed through to the delegate
 * @return the wrapped connection
 * @throws JMSException if the delegate fails to create the connection
 */
@Override public XATopicConnection createXATopicConnection(String userName, String password)
    throws JMSException {
    checkTopicConnectionFactory();
    XATopicConnection connection =
        ((XATopicConnectionFactory) delegate).createXATopicConnection(userName, password);
    return TracingXAConnection.create(connection, jmsTracing);
}
/**
 * Publishes a message inside an XA transaction, prepares and commits it (two-phase), and
 * checks that a non-transactional durable subscriber receives the same text.
 *
 * @throws Exception on any JNDI, JMS or XA failure
 */
@Test
public void testPublisherWithCommit() throws Exception {
    final String subscriptionId = "sub-testPublisherWithCommit";
    final String destinationName = "testPublisherWithCommit";
    final String payload = "testPublisherWithCommit-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(destinationName).build();
    Topic destination = (Topic) jndi.lookup(destinationName);

    // Transactional (XA) publisher.
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection connection = factory.createXATopicConnection();
    XATopicSession xaPublisherSession = connection.createXATopicSession();
    XAResource resource = xaPublisherSession.getXAResource();
    MessageProducer publisher = xaPublisherSession.createProducer(destination);

    // Non-transactional durable subscriber on the same connection.
    TopicSession plainSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    TopicSubscriber subscriber = plainSession.createDurableSubscriber(destination, subscriptionId);

    connection.start();

    // Publish within an XA transaction branch, then prepare and commit it.
    XidImpl branch = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    resource.start(branch, XAResource.TMNOFLAGS);
    publisher.send(xaPublisherSession.createTextMessage(payload));
    resource.end(branch, XAResource.TMSUCCESS);

    int prepareOutcome = resource.prepare(branch);
    Assert.assertEquals(prepareOutcome, XAResource.XA_OK, "Prepare stage failed.");

    resource.commit(branch, false);

    // Wait up to two seconds for the committed message to arrive.
    TextMessage received = (TextMessage) subscriber.receive(2000);
    Assert.assertNotNull(received, "Didn't receive a message");
    Assert.assertEquals(received.getText(), payload, "Received message content didn't match sent message.");

    plainSession.close();
    xaPublisherSession.close();
    connection.close();
}
/**
 * Receives a message inside an XA transaction, prepares and commits it (two-phase), and
 * then checks through the REST client that the durable subscription's queue is empty.
 *
 * @throws Exception on any JNDI, JMS, XA or REST failure
 */
@Test
public void testSubscriberWithCommit() throws Exception {
    final String subscriptionId = "sub-testSubscriberWithCommit";
    final String destinationName = "testSubscriberWithCommit";
    final String payload = "testSubscriberWithCommit-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(destinationName).build();
    Topic destination = (Topic) jndi.lookup(destinationName);

    // Transactional (XA) durable subscriber.
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection connection = factory.createXATopicConnection();
    XATopicSession xaConsumerSession = connection.createXATopicSession();
    XAResource resource = xaConsumerSession.getXAResource();
    TopicSubscriber subscriber = xaConsumerSession.createDurableSubscriber(destination, subscriptionId);

    // Non-transactional publisher on the same connection.
    TopicSession plainSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    MessageProducer publisher = plainSession.createProducer(destination);

    connection.start();
    // The message object is created by the XA session but sent by the plain producer.
    publisher.send(xaConsumerSession.createTextMessage(payload));

    // Receive within an XA transaction branch, then prepare and commit it.
    XidImpl branch = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    resource.start(branch, XAResource.TMNOFLAGS);
    TextMessage received = (TextMessage) subscriber.receive(2000);
    resource.end(branch, XAResource.TMSUCCESS);

    int prepareOutcome = resource.prepare(branch);
    Assert.assertEquals(prepareOutcome, XAResource.XA_OK, "Prepare stage failed.");

    resource.commit(branch, false);

    Assert.assertNotNull(received, "Didn't receive a message");
    Assert.assertEquals(received.getText(), payload, "Received message content didn't match sent message.");

    plainSession.close();
    xaConsumerSession.close();
    connection.close();

    // Queue metadata is looked up under "carbon:" + subscription id; it must report size 0.
    QueueMetadata subscriptionQueue = restApiClient.getQueueMetadata("carbon:" + subscriptionId);
    Assert.assertEquals((int) subscriptionQueue.getSize(), 0, "Queue should be empty.");
}
/**
 * Receives a message inside an XA transaction, prepares it, rolls it back, and then checks
 * through the REST client that the durable subscription's queue still holds exactly one
 * message.
 *
 * <p>The test uses its own topic and payload. It previously reused the
 * {@code "testSubscriberWithCommit"} topic, so a message published by either test was also
 * delivered to the other test's durable subscription, which made the exact queue-size
 * assertion depend on execution order.
 *
 * @throws Exception on any JNDI, JMS, XA or REST failure
 */
@Test
public void testSubscriberWithRollback() throws Exception {
    String subscriptionId = "sub-testSubscriberWithRollback";
    String topicName = "testSubscriberWithRollback";
    String testMessage = "testSubscriberWithRollback-Message";
    InitialContext initialContext = initialContextBuilder.withXaConnectionFactory().withTopic(topicName).build();
    Topic topic = (Topic) initialContext.lookup(topicName);

    // Setup XA consumer.
    XATopicConnectionFactory xaTopicConnectionFactory =
            (XATopicConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection xaTopicConnection = xaTopicConnectionFactory.createXATopicConnection();
    XATopicSession xaTopicSession = xaTopicConnection.createXATopicSession();
    XAResource xaResource = xaTopicSession.getXAResource();
    TopicSubscriber durableSubscriber = xaTopicSession.createDurableSubscriber(topic, subscriptionId);

    // Setup non-transactional message publisher.
    TopicSession topicSession = xaTopicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    MessageProducer producer = topicSession.createProducer(topic);
    xaTopicConnection.start();
    producer.send(xaTopicSession.createTextMessage(testMessage));

    // Consume the message within an XA transaction branch, prepare it, then roll it back.
    XidImpl xid = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    xaResource.start(xid, XAResource.TMNOFLAGS);
    TextMessage message = (TextMessage) durableSubscriber.receive(2000);
    xaResource.end(xid, XAResource.TMSUCCESS);
    int response = xaResource.prepare(xid);
    Assert.assertEquals(response, XAResource.XA_OK, "Prepare stage failed.");
    xaResource.rollback(xid);

    Assert.assertNotNull(message, "Didn't receive a message");
    Assert.assertEquals(message.getText(), testMessage, "Received message content didn't match sent message.");

    topicSession.close();
    xaTopicSession.close();
    xaTopicConnection.close();

    // After the rollback the queue looked up under "carbon:" + subscription id must report size 1.
    QueueMetadata queueMetadata = restApiClient.getQueueMetadata("carbon:" + subscriptionId);
    Assert.assertEquals((int) queueMetadata.getSize(), 1, "Queue shouldn't be empty.");
}
/**
 * Deploys one connection factory per JMS factory type, looks each one up through
 * {@code ic}, creates a connection from it, and passes that connection to
 * {@code assertConnectionType} with the matching type label. All connections are closed
 * and all factories undeployed at the end.
 *
 * @throws Exception on any deployment, lookup or JMS failure
 */
@Test
public void testConnectionTypes() throws Exception {
    deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
    deployConnectionFactory(0, JMSFactoryType.QUEUE_XA_CF, "CF_QUEUE_XA_TRUE", "/CF_QUEUE_XA_TRUE");
    deployConnectionFactory(0, JMSFactoryType.XA_CF, "CF_XA_TRUE", "/CF_XA_TRUE");
    deployConnectionFactory(0, JMSFactoryType.QUEUE_CF, "CF_QUEUE", "/CF_QUEUE");
    deployConnectionFactory(0, JMSFactoryType.TOPIC_CF, "CF_TOPIC", "/CF_TOPIC");
    deployConnectionFactory(0, JMSFactoryType.TOPIC_XA_CF, "CF_TOPIC_XA_TRUE", "/CF_TOPIC_XA_TRUE");

    // Generic factory.
    ConnectionFactory genericFactory = (ConnectionFactory) ic.lookup("/ConnectionFactory");
    Connection genericConnection = genericFactory.createConnection();
    assertConnectionType(genericConnection, "generic");

    // XA factory.
    XAConnectionFactory xaFactory = (XAConnectionFactory) ic.lookup("/CF_XA_TRUE");
    XAConnection xaConnection = xaFactory.createXAConnection();
    assertConnectionType(xaConnection, "xa");

    // Queue factory.
    QueueConnectionFactory queueFactory = (QueueConnectionFactory) ic.lookup("/CF_QUEUE");
    QueueConnection queueConnection = queueFactory.createQueueConnection();
    assertConnectionType(queueConnection, "queue");

    // Topic factory.
    TopicConnectionFactory topicFactory = (TopicConnectionFactory) ic.lookup("/CF_TOPIC");
    TopicConnection topicConnection = topicFactory.createTopicConnection();
    assertConnectionType(topicConnection, "topic");

    // XA queue factory.
    XAQueueConnectionFactory xaQueueFactory = (XAQueueConnectionFactory) ic.lookup("/CF_QUEUE_XA_TRUE");
    XAQueueConnection xaQueueConnection = xaQueueFactory.createXAQueueConnection();
    assertConnectionType(xaQueueConnection, "xa-queue");

    // XA topic factory.
    XATopicConnectionFactory xaTopicFactory = (XATopicConnectionFactory) ic.lookup("/CF_TOPIC_XA_TRUE");
    XATopicConnection xaTopicConnection = xaTopicFactory.createXATopicConnection();
    assertConnectionType(xaTopicConnection, "xa-topic");

    // Close in creation order.
    genericConnection.close();
    xaConnection.close();
    queueConnection.close();
    topicConnection.close();
    xaQueueConnection.close();
    xaTopicConnection.close();

    // Undeploy in the same order the factories were deployed.
    final String[] deployedFactories = {
        "ConnectionFactory",
        "CF_QUEUE_XA_TRUE",
        "CF_XA_TRUE",
        "CF_QUEUE",
        "CF_TOPIC",
        "CF_TOPIC_XA_TRUE"
    };
    for (String factoryName : deployedFactories) {
        undeployConnectionFactory(factoryName);
    }
}
/**
 * Creates an XA topic connection on the delegate and returns it wrapped by
 * {@code TracingXAConnection}.
 *
 * @return the wrapped connection
 * @throws JMSException if the delegate fails to create the connection
 */
@Override public XATopicConnection createXATopicConnection() throws JMSException {
    checkTopicConnectionFactory();
    XATopicConnection connection = ((XATopicConnectionFactory) delegate).createXATopicConnection();
    return TracingXAConnection.create(connection, jmsTracing);
}