下面列出了javax.jms.XAConnection#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void testRollbackXaErrorCode() throws Exception {
String brokerName = "rollbackErrorCode";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.start();
broker.waitUntilStarted();
ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
XAConnection connection = (XAConnection) cf.createConnection();
connection.start();
XASession session = connection.createXASession();
XAResource resource = session.getXAResource();
Xid tid = createXid();
try {
resource.rollback(tid);
fail("Expected xa exception on no tx");
} catch (XAException expected) {
LOG.info("got expected xa", expected);
assertEquals("no tx", XAException.XAER_NOTA, expected.errorCode);
}
connection.close();
broker.stop();
}
@Test
public void testGetSession2() throws Exception {
deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
XAConnection conn = getXAConnectionFactory().createXAConnection();
XASession sess = conn.createXASession();
sess.getSession();
conn.close();
}
@Test
public void testPublisherWithRollback() throws NamingException, JMSException, XAException, IOException {
String queueName = "testPublisherWithRollback";
String testMessage = "testPublisherWithRollback-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
xaConnection.start();
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(testMessage));
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK");
xaResource.rollback(xid);
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 0, "Queue should be empty");
session.close();
xaConnection.close();
}
/**
* Tests if message acknowledgement works correctly with session suspend and resume. Steps are,
* 1. Publish a message to a queue
* 2. Using a distributed transacted session receive the message and suspend the session
* 3. Publish a message to the queue
* 4. Resume the session again, ack and commit
* 5. Subscribe again using a normal session and see if any message is received
*/
@Test(groups = { "wso2.mb", "dtx" })
public void suspendResumeMessageAckTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartPositiveTestCaseSuspendResumeMessageAckTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
// Publish message to queue
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueSession.createQueue(queueName);
MessageProducer messageProducer = queueSession.createProducer(xaTestQueue);
messageProducer.send(queueSession.createTextMessage("Test message 1"));
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
MessageConsumer xaConsumer = session.createConsumer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
Message receivedMessage = xaConsumer.receive(5000);
xaResource.end(xid, XAResource.TMSUSPEND);
Assert.assertNotNull(receivedMessage, "No message received");
messageProducer.send(queueSession.createTextMessage("Test message 2"));
messageProducer.close();
xaResource.start(xid, XAResource.TMRESUME);
receivedMessage = xaConsumer.receive(5000);
xaResource.end(xid, XAResource.TMSUCCESS);
Assert.assertNotNull(receivedMessage, "No message received");
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
// subscribe and see if the message is received
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receivedMessageFromNormalConnection = messageConsumer.receive(5000);
Assert.assertNull(receivedMessageFromNormalConnection, "Message received. Commit might have failed");
queueConnection.close();
}
@Test
public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception {
// Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
Connection conn2 = null;
try {
conn = xacf.createXAConnection();
conn.start();
tm.begin();
// Create 2 sessions and enlist them
XASession sess1 = conn.createXASession();
XAResource res1 = sess1.getXAResource();
XASession sess2 = conn.createXASession();
XAResource res2 = sess2.getXAResource();
Transaction tx = tm.getTransaction();
tx.enlistResource(res1);
tx.enlistResource(res2);
// Send 2 messages - one from each session
MessageProducer prod1 = sess1.createProducer(queue1);
MessageProducer prod2 = sess2.createProducer(queue1);
prod1.send(sess1.createTextMessage("echidna1"));
prod2.send(sess2.createTextMessage("echidna2"));
tx.delistResource(res1, XAResource.TMSUCCESS);
tx.delistResource(res2, XAResource.TMSUCCESS);
// commit
tm.commit();
// Messages should be in queue
conn2 = cf.createConnection();
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(queue1);
conn2.start();
TextMessage r1 = (TextMessage) cons.receive(5000);
Assert.assertNotNull(r1);
Assert.assertEquals("echidna1", r1.getText());
TextMessage r2 = (TextMessage) cons.receive(5000);
Assert.assertNotNull(r2);
Assert.assertEquals("echidna2", r2.getText());
} finally {
if (conn != null) {
conn.close();
}
if (conn2 != null) {
conn2.close();
}
}
}
/**
* Tests if resuming a new XID will throw an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while starting dtx session.*")
public void resumeANonExistingDtxBranch()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartTestCaseResumeANonExistingDtxBranch";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMRESUME);
// Below this line should not execute
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Tests if one-phase commit in distributed transactions, is working correctly for message consumption. Steps
* are,
* 1. Publish a message to a queue
* 2. Using a distributed transacted session receive the message and commit in one-phase
* 3. Subscribe again using a normal session and verify that a message is not received
*/
@Test(groups = { "wso2.mb", "dtx" })
public void onePhaseCommitMessageConsumptionTest() throws Exception {
String queueName = "DtxOnePhaseCommitMessageConsumptionTest";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort()).withQueue(queueName).build();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
// Publish message to queue
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueSession.createQueue(queueName);
MessageProducer messageProducer = queueSession.createProducer(xaTestQueue);
messageProducer.send(queueSession.createTextMessage("Test message consumption"));
messageProducer.close();
// Publish to queue and commit in one-phase
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
MessageConsumer xaConsumer = session.createConsumer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
Message receivedMessage = xaConsumer.receive(10000);
xaResource.end(xid, XAResource.TMSUCCESS);
Assert.assertNotNull(receivedMessage, "No message received");
xaResource.commit(xid, true);
session.close();
xaConnection.close();
// subscribe and see if the message is received
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receivedMessageAfterOnephaseCommit = messageConsumer.receive(10000);
Assert.assertNull(receivedMessageAfterOnephaseCommit,
"Message received. One phase commit might have failed");
queueConnection.close();
}
/**
* Tests if one-phase commit in distributed transactions, is working correctly for message publishing.
* Steps are,
* 1. Using a distributed transaction publish a message to a queue and commit in one-phase
* 2. Subscribe to the published queue and see if the message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase() throws Exception {
String queueName = "DtxOnePhaseCommitMessagePublishingTest";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort()).withQueue(queueName).build();
// Publish to queue and commit in one-phase
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test Message publishing"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.commit(xid, true);
session.close();
xaConnection.close();
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(10000);
Assert.assertNotNull(receive, "Message was not received. One-phase commit might have failed");
queueConnection.close();
}
/**
* Tests if committing a published message works correctly without a client ID.Steps are,
* 1. Using a distributed transaction a message is published to a queue and committed
* 2. Subscribe to the published queue and see if the message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performDtxClientQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "ClientIDNullTestCaseDtxPerformClientQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin",
"admin",
"localhost",
getAMQPPort())
.withNoClientId()
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNotNull(receive, "Message was not received.");
queueConnection.close();
}
/**
* Tests if committing a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMNOFLAGS);
// producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMSUCCESS);
// xaResource.prepare(xid);
xaResource.commit(xid, false);
session.close();
xaConnection.close();
}
/**
* Tests if ending DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while ending dtx session.*")
public void endDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxEndTestCaseEndDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMJOIN);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
@Test
public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception {
XAConnection conn = null;
Connection conn2 = null;
try {
// First send 2 messages
conn2 = cf.createConnection();
Session sessProducer = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessProducer.createProducer(queue1);
Message m = sessProducer.createTextMessage("jellyfish1");
prod.send(m);
m = sessProducer.createTextMessage("jellyfish2");
prod.send(m);
conn = xacf.createXAConnection();
// Create a session
XASession sess1 = conn.createXASession();
XAResource res1 = sess1.getXAResource();
conn.start();
MessageConsumer cons1 = sess1.createConsumer(queue1);
tm.begin();
Transaction tx1 = tm.getTransaction();
tx1.enlistResource(res1);
// Receive one message in one tx
TextMessage r1 = (TextMessage) cons1.receive(5000);
Assert.assertNotNull(r1);
Assert.assertEquals("jellyfish1", r1.getText());
tx1.delistResource(res1, XAResource.TMSUCCESS);
// suspend the tx
Transaction suspended = tm.suspend();
tm.begin();
Transaction tx2 = tm.getTransaction();
tx2.enlistResource(res1);
// Receive 2nd message in a different tx
TextMessage r2 = (TextMessage) cons1.receive(5000);
Assert.assertNotNull(r2);
Assert.assertEquals("jellyfish2", r2.getText());
tx2.delistResource(res1, XAResource.TMSUCCESS);
// commit this transaction
tm.commit();
// verify that no messages are available
conn2.close();
conn2 = cf.createConnection();
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn2.start();
MessageConsumer cons = sess.createConsumer(queue1);
TextMessage r3 = (TextMessage) cons.receive(100);
Assert.assertNull(r3);
// now resume the first tx and then commit it
tm.resume(suspended);
tm.commit();
} finally {
if (conn != null) {
conn.close();
}
if (conn2 != null) {
conn2.close();
}
}
}
/**
* Tests if committing a published message works correctly.Steps are,
* 1. Using a distributed transaction a message is published to a queue and committed
* 2. Subscribe to the published queue and see if the message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "CommitTestCasePerformClientQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort())
.withQueue(queueName).build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNotNull(receive, "Message was not received.");
queueConnection.close();
}
/**
* Tests if rolling back a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while rolling back dtx session.*")
public void rollbackDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxRollbackTestCaseRollbackDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMNOFLAGS);
// producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMSUCCESS);
// xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
@Test
public void testConnectionTypes() throws Exception {
deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
deployConnectionFactory(0, JMSFactoryType.QUEUE_XA_CF, "CF_QUEUE_XA_TRUE", "/CF_QUEUE_XA_TRUE");
deployConnectionFactory(0, JMSFactoryType.XA_CF, "CF_XA_TRUE", "/CF_XA_TRUE");
deployConnectionFactory(0, JMSFactoryType.QUEUE_CF, "CF_QUEUE", "/CF_QUEUE");
deployConnectionFactory(0, JMSFactoryType.TOPIC_CF, "CF_TOPIC", "/CF_TOPIC");
deployConnectionFactory(0, JMSFactoryType.TOPIC_XA_CF, "CF_TOPIC_XA_TRUE", "/CF_TOPIC_XA_TRUE");
Connection genericConnection = null;
XAConnection xaConnection = null;
QueueConnection queueConnection = null;
TopicConnection topicConnection = null;
XAQueueConnection xaQueueConnection = null;
XATopicConnection xaTopicConnection = null;
ConnectionFactory genericFactory = (ConnectionFactory) ic.lookup("/ConnectionFactory");
genericConnection = genericFactory.createConnection();
assertConnectionType(genericConnection, "generic");
XAConnectionFactory xaFactory = (XAConnectionFactory) ic.lookup("/CF_XA_TRUE");
xaConnection = xaFactory.createXAConnection();
assertConnectionType(xaConnection, "xa");
QueueConnectionFactory queueCF = (QueueConnectionFactory) ic.lookup("/CF_QUEUE");
queueConnection = queueCF.createQueueConnection();
assertConnectionType(queueConnection, "queue");
TopicConnectionFactory topicCF = (TopicConnectionFactory) ic.lookup("/CF_TOPIC");
topicConnection = topicCF.createTopicConnection();
assertConnectionType(topicConnection, "topic");
XAQueueConnectionFactory xaQueueCF = (XAQueueConnectionFactory) ic.lookup("/CF_QUEUE_XA_TRUE");
xaQueueConnection = xaQueueCF.createXAQueueConnection();
assertConnectionType(xaQueueConnection, "xa-queue");
XATopicConnectionFactory xaTopicCF = (XATopicConnectionFactory) ic.lookup("/CF_TOPIC_XA_TRUE");
xaTopicConnection = xaTopicCF.createXATopicConnection();
assertConnectionType(xaTopicConnection, "xa-topic");
genericConnection.close();
xaConnection.close();
queueConnection.close();
topicConnection.close();
xaQueueConnection.close();
xaTopicConnection.close();
undeployConnectionFactory("ConnectionFactory");
undeployConnectionFactory("CF_QUEUE_XA_TRUE");
undeployConnectionFactory("CF_XA_TRUE");
undeployConnectionFactory("CF_QUEUE");
undeployConnectionFactory("CF_TOPIC");
undeployConnectionFactory("CF_TOPIC_XA_TRUE");
}
/**
* Tests if preparing a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while preparing dtx session.*")
public void prepareDtxBranchWithoutEnding()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxPrepareTestCasePrepareDtxBranchWithoutEnding";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMFAIL);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Publish and then consume outside a distributed transaction
*
* @throws XPathExpressionException
* @throws NamingException
* @throws JMSException
* @throws XAException
*/
@Test(groups = { "wso2.mb", "dtx" })
public void publishConsumeOutsideTransactionTestCase() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "publishConsumeOutsideTransactionTestCase";
String outsideTransactionMessage = "outside transaction";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(xaTestQueue);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = new TestXidImpl(100, new byte[] { 0x01 }, new byte[] { 0x05 });
producer.send(session.createTextMessage(outsideTransactionMessage));
xaResource.start(xid, XAResource.TMNOFLAGS);
consumer.receive(30000);
xaResource.end(xid, XAResource.TMSUCCESS);
int status = xaResource.prepare(xid);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.rollback(xid);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertNotNull(message, "Message did not receive from server");
Assert.assertEquals(message.getText(), outsideTransactionMessage, "Invalid Message received");
session.close();
xaConnection.close();
}
/**
* Publish two messages with the same publisher inside two separate transactions
* <p>
* publish messages within two separate transactions
* commit the second transaction
* receive the second message published
* commit the first transaction
* receive the first message published
*
* @throws XPathExpressionException Error on reading AMQP port
* @throws NamingException Throws when Initial context lookup failed
* @throws JMSException Exception related to JMS
* @throws XAException Throws when error occurred in XA transaction
*/
@Test(groups = {"wso2.mb", "dtx"})
public void PublishWithMultipleXidTestCase() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "PublishWithMultipleXidTestCase";
String xid1Message = "PublishWithMultipleXidTestCase-Msg-1";
String xid2Message = "PublishWithMultipleXidTestCase-Msg-2";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(xaTestQueue);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x05});
Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x06});
xaResource.start(xid1, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(xid1Message));
xaResource.end(xid1, XAResource.TMSUCCESS);
xaResource.start(xid2, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(xid2Message));
xaResource.end(xid2, XAResource.TMSUCCESS);
// Xid 2
int status = xaResource.prepare(xid2);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.commit(xid2, false);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid2Message, "Invalid Message received");
// Xid 1
status = xaResource.prepare(xid1);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.commit(xid1, false);
JMSTextMessage message2 = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message2.getText(), xid1Message, "Invalid Message received");
session.close();
xaConnection.close();
}
/**
* Consume messages on two separate transactions
* <p>
* publish two messages
* consume first message in a transaction
* consume second message in another transaction
* commit first transaction
* commit second transaction
*
* @throws XPathExpressionException Error on reading AMQP port
* @throws NamingException Throws when Initial context lookup failed
* @throws JMSException Exception related to JMS
* @throws XAException Throws when error occurred in XA transaction
*/
@Test(groups = {"wso2.mb", "dtx"})
public void consumeRollbackWithMultipleXidTestCase() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "consumeRollbackWithMultipleXidTestCase";
String xid1Message = "consumeRollbackWithMultipleXidTestCase-Msg-1";
String xid2Message = "consumeRollbackWithMultipleXidTestCase-Msg-2";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory =
(XAConnectionFactory) initialContext.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(xaTestQueue);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x07});
Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x08});
producer.send(session.createTextMessage(xid1Message));
producer.send(session.createTextMessage(xid2Message));
xaResource.start(xid1, XAResource.TMNOFLAGS);
consumer.receive(30000);
xaResource.end(xid1, XAResource.TMSUCCESS);
xaResource.start(xid2, XAResource.TMNOFLAGS);
JMSTextMessage message2 = (JMSTextMessage) consumer.receive(30000);
xaResource.end(xid2, XAResource.TMSUCCESS);
// Xid 1
int status = xaResource.prepare(xid1);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.rollback(xid1);
// Xid 2
status = xaResource.prepare(xid2);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
Assert.assertEquals(message2.getText(), xid2Message, "Invalid Message received");
xaResource.commit(xid2, false);
JMSTextMessage message3 = (JMSTextMessage) consumer.receive(30000);
Assert.assertNotNull(message3, "Didn't receive the message from server");
Assert.assertEquals(message3.getText(), xid1Message, "Invalid Message received");
session.close();
xaConnection.close();
}
/**
* Publish with two distinct connections with separate transactions and then consume the messages
* Messages should preserve the order in which messages were committed
*
* @throws XPathExpressionException
* @throws NamingException
* @throws JMSException
* @throws XAException
*/
@Test(groups = {"wso2.mb", "dtx"})
private void publishConsumeWithDistinctConnections() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "publishConsumeWithDistinctConnections";
String xid1Message = "xid 1";
String xid2Message = "xid 2";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection1 = connectionFactory.createXAConnection();
XAConnection xaConnection2 = connectionFactory.createXAConnection();
xaConnection1.start();
xaConnection2.start();
XASession xaSession1 = xaConnection1.createXASession();
XASession xaSession2 = xaConnection2.createXASession();
XAResource xaResource1 = xaSession1.getXAResource();
XAResource xaResource2 = xaSession2.getXAResource();
Session session1 = xaSession1.getSession();
Session session2 = xaSession2.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session1.createQueue(queueName);
MessageConsumer consumer = session1.createConsumer(xaTestQueue);
MessageProducer producer = session1.createProducer(xaTestQueue);
MessageProducer producer2 = session2.createProducer(xaTestQueue);
Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x09});
Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x10});
xaResource1.start(xid1, XAResource.TMNOFLAGS);
producer.send(session1.createTextMessage(xid1Message));
xaResource1.end(xid1, XAResource.TMSUCCESS);
xaResource2.start(xid2, XAResource.TMNOFLAGS);
producer2.send(session2.createTextMessage(xid2Message));
xaResource2.end(xid2, XAResource.TMSUCCESS);
// Xid 2
int status = xaResource2.prepare(xid2);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource2.commit(xid2, false);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid2Message, "Invalid Message received");
// Xid 1
status = xaResource1.prepare(xid1);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource1.commit(xid1, false);
message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid1Message, "Invalid Message received");
session1.close();
session2.close();
xaConnection1.close();
xaConnection2.close();
}