下面列出了javax.jms.XAConnection#start ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testPublisherWithRollback() throws NamingException, JMSException, XAException, IOException {
String queueName = "testPublisherWithRollback";
String testMessage = "testPublisherWithRollback-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
xaConnection.start();
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(testMessage));
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK");
xaResource.rollback(xid);
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 0, "Queue should be empty");
session.close();
xaConnection.close();
}
/**
* Tests if preparing a DTX branch after setting fail flag in dtx.end throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class)
public void prepareDtxBranchAfterEndFails()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxPrepareTestCasePrepareDtxBranchAfterEndFails";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMFAIL);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
private void doTestCrashServerAfterXACommit(boolean onePhase) throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
XAConnection connection = connectionFactory.createXAConnection();
try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("Queue1");
final XASession xaSession = connection.createXASession();
MessageConsumer consumer = xaSession.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello " + 1));
session.commit();
XAResource xaResource = xaSession.getXAResource();
final Xid xid = newXID();
xaResource.start(xid, XAResource.TMNOFLAGS);
connection.start();
Assert.assertNotNull(consumer.receive(5000));
xaResource.end(xid, XAResource.TMSUCCESS);
try {
xaResource.commit(xid, onePhase);
Assert.fail("didn't get expected exception!");
} catch (XAException xae) {
if (onePhase) {
//expected error code is XAER_RMFAIL
Assert.assertEquals(XAException.XAER_RMFAIL, xae.errorCode);
} else {
//expected error code is XA_RETRY
Assert.assertEquals(XAException.XA_RETRY, xae.errorCode);
}
}
} finally {
connection.close();
}
}
@Test
public void testMultipleSessionsOneTxCommitSend() throws Exception {
// Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
Connection conn2 = null;
try {
conn = xacf.createXAConnection();
conn.start();
tm.begin();
// Create 2 sessions and enlist them
XASession sess1 = conn.createXASession();
ClientSessionInternal res1 = (ClientSessionInternal) sess1.getXAResource();
XASession sess2 = conn.createXASession();
ClientSessionInternal res2 = (ClientSessionInternal) sess2.getXAResource();
res1.setForceNotSameRM(true);
res2.setForceNotSameRM(true);
Transaction tx = tm.getTransaction();
tx.enlistResource(res1);
tx.enlistResource(res2);
// Send 2 messages - one from each session
MessageProducer prod1 = sess1.createProducer(queue1);
MessageProducer prod2 = sess2.createProducer(queue1);
prod1.send(sess1.createTextMessage("echidna1"));
prod2.send(sess2.createTextMessage("echidna2"));
tx.delistResource(res1, XAResource.TMSUCCESS);
tx.delistResource(res2, XAResource.TMSUCCESS);
// commit
tm.commit();
// Messages should be in queue
conn2 = cf.createConnection();
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(queue1);
conn2.start();
TextMessage r1 = (TextMessage) cons.receive(5000);
Assert.assertNotNull(r1);
Assert.assertEquals("echidna1", r1.getText());
TextMessage r2 = (TextMessage) cons.receive(5000);
Assert.assertNotNull(r2);
Assert.assertEquals("echidna2", r2.getText());
} finally {
if (conn != null) {
conn.close();
}
if (conn2 != null) {
conn2.close();
}
}
}
@Test
public void testPublisherWithCommit() throws NamingException, JMSException, XAException {
String queueName = "testPublisherWithCommit";
String testMessage = "testPublisherWithCommit-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
xaConnection.start();
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(testMessage));
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK");
xaResource.commit(xid, false);
// Test by consuming the committed message.
ConnectionFactory connectionFactory =
(ConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
Connection connection = connectionFactory.createConnection();
Session receivingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = receivingSession.createConsumer(receivingSession.createQueue(queueName));
connection.start();
TextMessage message = (TextMessage) consumer.receive(3000);
Assert.assertNotNull(message, "Didn't receive a message");
Assert.assertEquals(message.getText(), testMessage, "Received message should match sent message");
session.close();
xaConnection.close();
connection.close();
}
@Test
public void testConsumerWithCommit() throws Exception {
String queueName = "testConsumerWithCommit";
String testMessage = "testConsumerWithCommit-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
// Setup XA connection
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
xaConnection.start();
producer.send(session.createTextMessage(testMessage));
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
TextMessage message = (TextMessage) consumer.receive(2000);
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
Assert.assertNotNull(message, "Sent message should be consumed by the consumer.");
Assert.assertEquals(message.getText(), testMessage, "Received message should match the sent message.");
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 0, "Queue should be empty");
}
/**
* Tests if preparing a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while preparing dtx session.*")
public void prepareDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxPrepareTestCasePrepareDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
// MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMNOFLAGS);
// producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Tests if resuming a new XID will throw an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while starting dtx session.*")
public void startAnAlreadyStartedDtxBranch()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartTestCaseStartAnAlreadyStartedDtxBranch";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
//
XAConnection xaConnectionDuplicate = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSessionDuplicate = xaConnectionDuplicate.createXASession();
XAResource xaResourceDuplicate = xaSessionDuplicate.getXAResource();
Session sessionDuplicate = xaSessionDuplicate.getSession();
MessageProducer producerDuplicate = sessionDuplicate.createProducer(xaTestQueue);
xaResourceDuplicate.start(xid, XAResource.TMNOFLAGS);
// Below this line should not execute
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
public void testConsumerCloseTransactionalSendReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection) cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
producer.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
consumer.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testSessionCloseTransactionalSendReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection) cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
session.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
@Test
public void testMultipleSessionsOneTxRollbackSend() throws Exception {
XAConnection conn = null;
Connection conn2 = null;
try {
conn = xacf.createXAConnection();
conn.start();
tm.begin();
// Create 2 sessions and enlist them
XASession sess1 = conn.createXASession();
ClientSessionInternal res1 = (ClientSessionInternal) sess1.getXAResource();
XASession sess2 = conn.createXASession();
ClientSessionInternal res2 = (ClientSessionInternal) sess2.getXAResource();
res1.setForceNotSameRM(true);
res2.setForceNotSameRM(true);
Transaction tx = tm.getTransaction();
tx.enlistResource(res1);
tx.enlistResource(res2);
// Send 2 messages - one from each session
MessageProducer prod1 = sess1.createProducer(queue1);
MessageProducer prod2 = sess2.createProducer(queue1);
prod1.send(sess1.createTextMessage("echidna1"));
prod2.send(sess2.createTextMessage("echidna2"));
tx.delistResource(res1, XAResource.TMSUCCESS);
tx.delistResource(res2, XAResource.TMSUCCESS);
// rollback
tm.rollback();
// Messages should not be in queue
conn2 = cf.createConnection();
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(queue1);
conn2.start();
TextMessage r1 = (TextMessage) cons.receive(100);
Assert.assertNull(r1);
} finally {
if (conn != null) {
conn.close();
}
if (conn2 != null) {
conn2.close();
}
}
}
/**
* Tests if committing a branch without preparing throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutPrepare()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutEnding";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
// Should prepare before commit
// xaResource.prepare(xid);
xaResource.commit(xid, false);
session.close();
xaConnection.close();
}
/**
* Tests if ending a dtx branch started in a different session throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while ending dtx session.*")
public void endDtxBranchBelongToADifferentSession()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxEndTestCaseEndDtxBranchBelongToADifferentSession";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
xaResource.start(xid, XAResource.TMNOFLAGS);
XAConnection secondXaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession secondXaSession = secondXaConnection.createXASession();
XAResource secondXaResource = secondXaSession.getXAResource();
producer.send(session.createTextMessage("Test 1"));
secondXaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Tests if ending DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while ending dtx session.*")
public void endDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxEndTestCaseEndDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMJOIN);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Tests if joining a new XID will throw an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while starting dtx session.*")
public void joinANonExistingDtxBranch()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartTestCaseJoinANonExistingDtxBranch";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMJOIN);
// Below this line should not execute
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Publish and then consume outside a distributed transaction
*
* @throws XPathExpressionException
* @throws NamingException
* @throws JMSException
* @throws XAException
*/
@Test(groups = { "wso2.mb", "dtx" })
public void publishConsumeOutsideTransactionTestCase() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "publishConsumeOutsideTransactionTestCase";
String outsideTransactionMessage = "outside transaction";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(xaTestQueue);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = new TestXidImpl(100, new byte[] { 0x01 }, new byte[] { 0x05 });
producer.send(session.createTextMessage(outsideTransactionMessage));
xaResource.start(xid, XAResource.TMNOFLAGS);
consumer.receive(30000);
xaResource.end(xid, XAResource.TMSUCCESS);
int status = xaResource.prepare(xid);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.rollback(xid);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertNotNull(message, "Message did not receive from server");
Assert.assertEquals(message.getText(), outsideTransactionMessage, "Invalid Message received");
session.close();
xaConnection.close();
}
@Test
public void testMultipleSessionsOneTxRollbackSend1PCOptimization() throws Exception {
// Since both resources have some RM, TM will probably use 1PC optimization
XAConnection conn = null;
Connection conn2 = null;
try {
conn = xacf.createXAConnection();
conn.start();
tm.begin();
// Create 2 sessions and enlist them
XASession sess1 = conn.createXASession();
XAResource res1 = sess1.getXAResource();
XASession sess2 = conn.createXASession();
XAResource res2 = sess2.getXAResource();
Transaction tx = tm.getTransaction();
tx.enlistResource(res1);
tx.enlistResource(res2);
// Send 2 messages - one from each session
MessageProducer prod1 = sess1.createProducer(queue1);
MessageProducer prod2 = sess2.createProducer(queue1);
prod1.send(sess1.createTextMessage("echidna1"));
prod2.send(sess2.createTextMessage("echidna2"));
tx.delistResource(res1, XAResource.TMSUCCESS);
tx.delistResource(res2, XAResource.TMSUCCESS);
// rollback
tm.rollback();
// Messages should not be in queue
conn2 = cf.createConnection();
Session sess = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = sess.createConsumer(queue1);
conn2.start();
TextMessage r1 = (TextMessage) cons.receive(100);
Assert.assertNull(r1);
} finally {
if (conn != null) {
conn.close();
}
if (conn2 != null) {
conn2.close();
}
}
}
/**
* Consume messages on two separate transactions
* <p>
* publish two messages
* consume first message in a transaction
* consume second message in another transaction
* commit first transaction
* commit second transaction
*
* @throws XPathExpressionException Error on reading AMQP port
* @throws NamingException Throws when Initial context lookup failed
* @throws JMSException Exception related to JMS
* @throws XAException Throws when error occurred in XA transaction
*/
@Test(groups = {"wso2.mb", "dtx"})
public void consumeRollbackWithMultipleXidTestCase() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "consumeRollbackWithMultipleXidTestCase";
String xid1Message = "consumeRollbackWithMultipleXidTestCase-Msg-1";
String xid2Message = "consumeRollbackWithMultipleXidTestCase-Msg-2";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory =
(XAConnectionFactory) initialContext.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(xaTestQueue);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x07});
Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x08});
producer.send(session.createTextMessage(xid1Message));
producer.send(session.createTextMessage(xid2Message));
xaResource.start(xid1, XAResource.TMNOFLAGS);
consumer.receive(30000);
xaResource.end(xid1, XAResource.TMSUCCESS);
xaResource.start(xid2, XAResource.TMNOFLAGS);
JMSTextMessage message2 = (JMSTextMessage) consumer.receive(30000);
xaResource.end(xid2, XAResource.TMSUCCESS);
// Xid 1
int status = xaResource.prepare(xid1);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.rollback(xid1);
// Xid 2
status = xaResource.prepare(xid2);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
Assert.assertEquals(message2.getText(), xid2Message, "Invalid Message received");
xaResource.commit(xid2, false);
JMSTextMessage message3 = (JMSTextMessage) consumer.receive(30000);
Assert.assertNotNull(message3, "Didn't receive the message from server");
Assert.assertEquals(message3.getText(), xid1Message, "Invalid Message received");
session.close();
xaConnection.close();
}
/**
* Publish with two distinct connections with separate transactions and then consume the messages
* Messages should preserve the order in which messages were committed
*
* @throws XPathExpressionException
* @throws NamingException
* @throws JMSException
* @throws XAException
*/
@Test(groups = {"wso2.mb", "dtx"})
private void publishConsumeWithDistinctConnections() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "publishConsumeWithDistinctConnections";
String xid1Message = "xid 1";
String xid2Message = "xid 2";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection1 = connectionFactory.createXAConnection();
XAConnection xaConnection2 = connectionFactory.createXAConnection();
xaConnection1.start();
xaConnection2.start();
XASession xaSession1 = xaConnection1.createXASession();
XASession xaSession2 = xaConnection2.createXASession();
XAResource xaResource1 = xaSession1.getXAResource();
XAResource xaResource2 = xaSession2.getXAResource();
Session session1 = xaSession1.getSession();
Session session2 = xaSession2.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session1.createQueue(queueName);
MessageConsumer consumer = session1.createConsumer(xaTestQueue);
MessageProducer producer = session1.createProducer(xaTestQueue);
MessageProducer producer2 = session2.createProducer(xaTestQueue);
Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x09});
Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x10});
xaResource1.start(xid1, XAResource.TMNOFLAGS);
producer.send(session1.createTextMessage(xid1Message));
xaResource1.end(xid1, XAResource.TMSUCCESS);
xaResource2.start(xid2, XAResource.TMNOFLAGS);
producer2.send(session2.createTextMessage(xid2Message));
xaResource2.end(xid2, XAResource.TMSUCCESS);
// Xid 2
int status = xaResource2.prepare(xid2);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource2.commit(xid2, false);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid2Message, "Invalid Message received");
// Xid 1
status = xaResource1.prepare(xid1);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource1.commit(xid1, false);
message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid1Message, "Invalid Message received");
session1.close();
session2.close();
xaConnection1.close();
xaConnection2.close();
}
/**
* Tests if rolling back a published message works correctly.Steps are,
* 1. Using a distributed transaction a message is published to a queue and rolled back
* 2. Subscribe to the published queue and see if any message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "RollbackTestCasePerformClientQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort())
.withQueue(queueName).build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNull(receive, "Message received. Message was not rolled back");
session.close();
xaConnection.close();
queueConnection.close();
}