下面列出了 javax.jms.XAConnection#createXASession() 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Verifies that the XA resources of two sessions created from the same XA connection
 * report each other as the same resource manager via {@code isSameRM}.
 *
 * @throws Exception if the connection or either session cannot be created
 */
@Test
public void testIsSamRM() throws Exception {
    XAConnection conn = xacf.createXAConnection();
    try {
        // First session and its XA resource
        XASession sess1 = conn.createXASession();
        XAResource res1 = sess1.getXAResource();

        // Second session on the same connection
        XASession sess2 = conn.createXASession();
        XAResource res2 = sess2.getXAResource();

        Assert.assertTrue(res1.isSameRM(res2));
    } finally {
        // Previously the connection was never closed and leaked after every run.
        conn.close();
    }
}
/**
 * Checks that {@code getSession()} can be called on a freshly created XA session.
 * The returned session is not inspected; the test passes as long as nothing throws.
 *
 * @throws Exception if deploying the factory, connecting or creating the session fails
 */
@Test
public void testGetSession2() throws Exception {
    deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
    XAConnection conn = getXAConnectionFactory().createXAConnection();
    try {
        XASession sess = conn.createXASession();
        sess.getSession();
    } finally {
        // Close even when session creation or getSession() throws.
        conn.close();
    }
}
/**
 * Manual check that a broker keeps a prepared XA transaction across a client disconnect.
 * <p>
 * One message is sent inside an XA transaction that is prepared but not committed, the
 * connection is closed, and the commit is then issued from a fresh connection. The outcome
 * is printed to standard output (success) or standard error (message missing or wrong).
 *
 * @param args ignored
 * @throws IllegalArgumentException if {@code getXAConnectionFactory()} returns {@code null}
 * @throws Exception on any JMS or XA failure
 */
public static void main(String[] args) throws Exception {
    XAConnectionFactory factory = getXAConnectionFactory();
    if (factory == null) {
        throw new IllegalArgumentException("Provide a factory for the broker in the getXAConnectionFactory() method");
    }
    // The same xid is used before and after the reconnect.
    Xid xid = new MyXid(1);

    // Phase 1: create a connection, session and XA transaction, and leave it prepared.
    XAConnection conn = factory.createXAConnection();
    try {
        XASession sess = conn.createXASession();
        XAResource xaRes = sess.getXAResource();
        // start the transaction and produce one message
        xaRes.start(xid, XAResource.TMNOFLAGS);
        MessageProducer producer = sess.createProducer(sess.createQueue("queue"));
        producer.send(sess.createTextMessage("foo"));
        xaRes.end(xid, XAResource.TMSUCCESS);
        // prepare the transaction
        xaRes.prepare(xid);
    } finally {
        // Now disconnect. Some brokers roll back the transaction, but this is not
        // compatible with Jet's fault tolerance. Closing in finally also prevents a
        // leaked connection when any step above fails.
        conn.close();
    }

    // Phase 2: connect again and commit the prepared transaction.
    conn = factory.createXAConnection();
    try {
        conn.start();
        XASession sess = conn.createXASession();
        XAResource xaRes = sess.getXAResource();
        xaRes.commit(xid, false);
        // check that the message is there
        MessageConsumer cons = sess.createConsumer(sess.createQueue("queue"));
        TextMessage msg = (TextMessage) cons.receive(TIMEOUT);
        // Constant-first equals: a message with a null body is reported, not an NPE.
        if (msg == null || !"foo".equals(msg.getText())) {
            System.err.println("Message is missing or has wrong text, transaction probably lost");
        } else {
            System.out.println("Success!");
        }
    } finally {
        conn.close();
    }
}
/**
 * Publishes one text message inside an XA transaction, runs prepare and commit, and then
 * confirms through a separate non-transacted session that the message can be consumed.
 */
@Test
public void testPublisherWithCommit() throws NamingException, JMSException, XAException {
    final String queueName = "testPublisherWithCommit";
    final String payload = "testPublisherWithCommit-Message";
    InitialContext ctx = initialContextBuilder.withXaConnectionFactory().withQueue(queueName).build();

    // Publishing side: XA connection, XA session and the resource that drives the transaction.
    XAConnectionFactory xaFactory = (XAConnectionFactory) ctx.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session publishSession = xaSess.getSession();
    MessageProducer publisher = publishSession.createProducer(publishSession.createQueue(queueName));
    xaConn.start();

    // Send inside the branch, then prepare and commit.
    XidImpl txId = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(txId, XAResource.TMNOFLAGS);
    publisher.send(publishSession.createTextMessage(payload));
    resource.end(txId, XAResource.TMSUCCESS);
    int prepareResult = resource.prepare(txId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK");
    resource.commit(txId, false);

    // Verification side: consume the committed message over a second connection.
    ConnectionFactory plainFactory = (ConnectionFactory) ctx.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    Connection plainConn = plainFactory.createConnection();
    Session receivingSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = receivingSession.createConsumer(receivingSession.createQueue(queueName));
    plainConn.start();
    TextMessage received = (TextMessage) consumer.receive(3000);
    Assert.assertNotNull(received, "Didn't receive a message");
    Assert.assertEquals(received.getText(), payload, "Received message should match sent message");

    publishSession.close();
    xaConn.close();
    plainConn.close();
}
/**
 * Sends two non-persistent messages inside a transaction that enlists the JMS XA resource
 * together with three dummy resources, expects {@code tm.commit()} to throw, and then checks
 * that a plain consumer receives nothing from the queue.
 * <p>
 * NOTE(review): the first dummy resource is built with {@code new DummyXAResource(true)};
 * presumably that flag makes it fail during prepare - confirm against DummyXAResource.
 */
@Test
public void test2PCSendFailOnPrepare() throws Exception {
    XAConnection xaConn = null;
    Connection plainConn = null;
    try {
        xaConn = xacf.createXAConnection();
        tm.begin();
        XASession xaSession = xaConn.createXASession();
        XAResource jmsResource = xaSession.getXAResource();
        // A call that would prevent the 1PC optimisation (setForceNotSameRM(true)) is
        // intentionally left disabled here.
        XAResource failingResource = new DummyXAResource(true);
        XAResource dummyA = new DummyXAResource();
        XAResource dummyB = new DummyXAResource();

        Transaction transaction = tm.getTransaction();
        transaction.enlistResource(jmsResource);
        transaction.enlistResource(failingResource);
        transaction.enlistResource(dummyA);
        transaction.enlistResource(dummyB);

        MessageProducer sender = xaSession.createProducer(null);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        Message outgoing = xaSession.createTextMessage("XATest1");
        sender.send(queue1, outgoing);
        outgoing = xaSession.createTextMessage("XATest2");
        sender.send(queue1, outgoing);

        transaction.delistResource(jmsResource, XAResource.TMSUCCESS);
        transaction.delistResource(failingResource, XAResource.TMSUCCESS);
        transaction.delistResource(dummyA, XAResource.TMSUCCESS);
        transaction.delistResource(dummyB, XAResource.TMSUCCESS);

        try {
            tm.commit();
            Assert.fail("should not get here");
        } catch (Exception expected) {
            // The commit is expected to fail.
        }

        // Nothing should have reached the queue.
        plainConn = cf.createConnection();
        plainConn.start();
        Session receiverSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer receiver = receiverSession.createConsumer(queue1);
        Message leftover = receiver.receive(100);
        Assert.assertNull(leftover);
    } finally {
        if (xaConn != null) {
            xaConn.close();
        }
        if (plainConn != null) {
            plainConn.close();
        }
    }
}
/**
 * Tests message acknowledgement across a suspended and resumed XA transaction branch.
 * <ol>
 * <li>Publish a message to the queue from a non-transacted session.</li>
 * <li>Receive it inside an XA branch and end the branch with {@code TMSUSPEND}.</li>
 * <li>Publish a second message to the queue.</li>
 * <li>Resume the branch with {@code TMRESUME}, receive again, then prepare and commit.</li>
 * <li>Consume with the non-transacted session and expect that nothing is delivered.</li>
 * </ol>
 */
@Test(groups = { "wso2.mb", "dtx" })
public void suspendResumeMessageAckTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    final String queueName = "DtxStartPositiveTestCaseSuspendResumeMessageAckTestCase";
    InitialContext ctx = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    Destination testQueue = (Destination) ctx.lookup(queueName);

    // Non-transacted publisher: put the first message on the queue.
    ConnectionFactory plainFactory =
            (ConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    plainSession.createQueue(queueName);
    MessageProducer publisher = plainSession.createProducer(testQueue);
    publisher.send(plainSession.createTextMessage("Test message 1"));

    // XA consumer on its own connection.
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    MessageConsumer txConsumer = txSession.createConsumer(testQueue);

    // Receive the first message, then suspend the branch.
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    Message inTx = txConsumer.receive(5000);
    resource.end(branchId, XAResource.TMSUSPEND);
    Assert.assertNotNull(inTx, "No message received");

    // Publish the second message while the branch is suspended.
    publisher.send(plainSession.createTextMessage("Test message 2"));
    publisher.close();

    // Resume, receive the second message, then prepare and commit.
    resource.start(branchId, XAResource.TMRESUME);
    inTx = txConsumer.receive(5000);
    resource.end(branchId, XAResource.TMSUCCESS);
    Assert.assertNotNull(inTx, "No message received");
    int prepareResult = resource.prepare(branchId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Dtx.prepare was not successful.");
    resource.commit(branchId, false);
    txSession.close();
    xaConn.close();

    // Nothing should be left for a plain consumer; wait up to 5 seconds.
    MessageConsumer plainConsumer = plainSession.createConsumer(testQueue);
    Message leftover = plainConsumer.receive(5000);
    Assert.assertNull(leftover, "Message received. Commit might have failed");
    plainConnection.close();
}
/**
 * Sends a message outside any transaction, consumes it inside an XA transaction, prepares
 * and rolls the transaction back, and then checks through {@code restApiClient} that the
 * queue reports a size of one.
 */
@Test
public void testConsumerWithRollback() throws Exception {
    final String queueName = "testConsumerWithRollback";
    final String payload = "testConsumerWithRollback-Message";
    InitialContext ctx = initialContextBuilder.withXaConnectionFactory().withQueue(queueName).build();

    // XA connection with a producer and a consumer on the same session.
    XAConnectionFactory xaFactory = (XAConnectionFactory) ctx.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    Queue target = txSession.createQueue(queueName);
    MessageProducer publisher = txSession.createProducer(target);
    MessageConsumer subscriber = txSession.createConsumer(target);
    xaConn.start();

    // The send happens before the branch is started.
    publisher.send(txSession.createTextMessage(payload));

    XidImpl txId = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(txId, XAResource.TMNOFLAGS);
    TextMessage received = (TextMessage) subscriber.receive(2000);
    resource.end(txId, XAResource.TMSUCCESS);
    int prepareResult = resource.prepare(txId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK.");
    resource.rollback(txId);
    txSession.close();
    xaConn.close();

    Assert.assertNotNull(received, "Sent message should be consumed by the consumer.");
    Assert.assertEquals(received.getText(), payload, "Received message should match the sent message.");

    // After the rollback the queue should still hold the message.
    QueueMetadata metadata = restApiClient.getQueueMetadata(queueName);
    Assert.assertEquals((int) metadata.getSize(), 1, "Queue should be non empty");
}
/**
 * Tests one-phase commit for message consumption in a distributed transaction.
 * <ol>
 * <li>Publish a message to a queue.</li>
 * <li>Receive it inside an XA branch and commit the branch in one phase.</li>
 * <li>Consume again with a non-transacted session and verify that nothing is delivered.</li>
 * </ol>
 */
@Test(groups = { "wso2.mb", "dtx" })
public void onePhaseCommitMessageConsumptionTest() throws Exception {
    final String queueName = "DtxOnePhaseCommitMessageConsumptionTest";
    InitialContext ctx = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    Destination testQueue = (Destination) ctx.lookup(queueName);

    // Put one message on the queue from a non-transacted session.
    ConnectionFactory plainFactory =
            (ConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    plainSession.createQueue(queueName);
    MessageProducer publisher = plainSession.createProducer(testQueue);
    publisher.send(plainSession.createTextMessage("Test message consumption"));
    publisher.close();

    // Consume it inside an XA branch and commit in one phase (no prepare).
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    MessageConsumer txConsumer = txSession.createConsumer(testQueue);
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    Message inTx = txConsumer.receive(10000);
    resource.end(branchId, XAResource.TMSUCCESS);
    Assert.assertNotNull(inTx, "No message received");
    resource.commit(branchId, true);
    txSession.close();
    xaConn.close();

    // A plain consumer should find the queue empty; wait up to 10 seconds.
    MessageConsumer plainConsumer = plainSession.createConsumer(testQueue);
    Message leftover = plainConsumer.receive(10000);
    Assert.assertNull(leftover,
            "Message received. One phase commit might have failed");
    plainConnection.close();
}
/**
 * Tests one-phase commit for message publishing in a distributed transaction.
 * <ol>
 * <li>Publish a message to a queue inside an XA branch and commit in one phase.</li>
 * <li>Subscribe to the queue with a non-transacted session and verify the message arrives.</li>
 * </ol>
 */
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase() throws Exception {
    final String queueName = "DtxOnePhaseCommitMessagePublishingTest";
    InitialContext ctx = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Publish inside an XA branch and commit in one phase (no prepare).
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    Destination testQueue = (Destination) ctx.lookup(queueName);
    txSession.createQueue(queueName);
    MessageProducer publisher = txSession.createProducer(testQueue);
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    publisher.send(txSession.createTextMessage("Test Message publishing"));
    resource.end(branchId, XAResource.TMSUCCESS);
    resource.commit(branchId, true);
    txSession.close();
    xaConn.close();

    // The committed message must be visible to a plain consumer; wait up to 10 seconds.
    ConnectionFactory plainFactory =
            (ConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer plainConsumer = plainSession.createConsumer(testQueue);
    Message delivered = plainConsumer.receive(10000);
    Assert.assertNotNull(delivered, "Message was not received. One-phase commit might have failed");
    plainConnection.close();
}
/**
 * Tests that a DTX branch which publishes one message ends in an {@link XAException}.
 * The test expects the failure at the prepare stage because of a "no route" condition.
 * <p>
 * NOTE(review): what causes the missing route is not visible in this method - confirm
 * against the broker/test setup.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class)
public void prepareDtxBranchWithNoRoutesIssue()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxPrepareTestCasePrepareDtxBranchWithNoRoutesIssues";
    User adminUser = getSuperTenantAdminUser();
    InitialContext adminInitialContext
            = JMSClientHelper.createInitialContextBuilder(adminUser.getUserNameWithoutDomain(),
                                                          adminUser.getPassword(),
                                                          getBrokerHost(),
                                                          getAMQPPort()).withQueue(queueName).build();
    XAConnectionFactory connectionFactory
            = (XAConnectionFactory) adminInitialContext.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();
        Destination testQueue = (Destination) adminInitialContext.lookup(queueName);
        MessageProducer producer = session.createProducer(testQueue);
        Xid xid = JMSClientHelper.getNewXid();
        xaResource.start(xid, XAResource.TMNOFLAGS);
        producer.send(session.createTextMessage("Test 1"));
        xaResource.end(xid, XAResource.TMSUCCESS);
        // Test should fail at prepare stage due to no route issue
        int prepareResponseCode = xaResource.prepare(xid);
        Assert.assertNotEquals(prepareResponseCode, XAResource.XA_OK, "Prepare should fail due to no route issue");
        xaResource.commit(xid, false);
    } finally {
        // The expected XAException skips every statement after the failing call, so cleanup
        // placed at the end of the method never ran. Closing the connection also closes its
        // session.
        try {
            xaConnection.close();
        } catch (JMSException ignored) {
            // Best-effort cleanup: a close failure must not replace the XAException under test.
        }
    }
}
/**
 * Receives two messages through a single XA session, each inside its own transaction.
 * The first transaction is suspended while the second one receives and commits; the first
 * is then resumed and committed. In between, a plain consumer must find the queue empty.
 */
@Test
public void testOneSessionTwoTransactionsCommitAcknowledge() throws Exception {
    XAConnection xaConn = null;
    Connection plainConn = null;
    try {
        // Seed the queue with two messages.
        plainConn = cf.createConnection();
        Session seedSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer seeder = seedSession.createProducer(queue1);
        Message seed = seedSession.createTextMessage("jellyfish1");
        seeder.send(seed);
        seed = seedSession.createTextMessage("jellyfish2");
        seeder.send(seed);

        // One XA session and one consumer shared by both transactions.
        xaConn = xacf.createXAConnection();
        XASession xaSession = xaConn.createXASession();
        XAResource resource = xaSession.getXAResource();
        xaConn.start();
        MessageConsumer xaConsumer = xaSession.createConsumer(queue1);

        // First transaction: receive one message, then park the transaction.
        tm.begin();
        Transaction firstTx = tm.getTransaction();
        firstTx.enlistResource(resource);
        TextMessage first = (TextMessage) xaConsumer.receive(5000);
        Assert.assertNotNull(first);
        Assert.assertEquals("jellyfish1", first.getText());
        firstTx.delistResource(resource, XAResource.TMSUCCESS);
        Transaction parked = tm.suspend();

        // Second transaction: receive the other message and commit.
        tm.begin();
        Transaction secondTx = tm.getTransaction();
        secondTx.enlistResource(resource);
        TextMessage second = (TextMessage) xaConsumer.receive(5000);
        Assert.assertNotNull(second);
        Assert.assertEquals("jellyfish2", second.getText());
        secondTx.delistResource(resource, XAResource.TMSUCCESS);
        tm.commit();

        // No message may be available to a fresh plain connection.
        plainConn.close();
        plainConn = cf.createConnection();
        Session checkSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        plainConn.start();
        MessageConsumer checker = checkSession.createConsumer(queue1);
        TextMessage unexpected = (TextMessage) checker.receive(100);
        Assert.assertNull(unexpected);

        // Bring back the first transaction and commit it as well.
        tm.resume(parked);
        tm.commit();
    } finally {
        if (xaConn != null) {
            xaConn.close();
        }
        if (plainConn != null) {
            plainConn.close();
        }
    }
}
/**
 * Tests that committing an already prepared branch with the one-phase flag set throws an
 * {@link XAException} whose message matches "Error while committing dtx session".
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithOnephaseAfterPrepare()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCmmitDtxBranchWithOnephaseAfterPrepare";
    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();
        Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
        session.createQueue(queueName);
        MessageProducer producer = session.createProducer(xaTestQueue);
        Xid xid = JMSClientHelper.getNewXid();
        xaResource.start(xid, XAResource.TMNOFLAGS);
        producer.send(session.createTextMessage("Test 1"));
        xaResource.end(xid, XAResource.TMSUCCESS);
        xaResource.prepare(xid);
        // The branch is prepared, so onePhase should be false; passing true is the error
        // under test.
        xaResource.commit(xid, true);
    } finally {
        // The expected XAException skips every statement after the failing call, so cleanup
        // placed at the end of the method never ran. Closing the connection also closes its
        // session.
        try {
            xaConnection.close();
        } catch (JMSException ignored) {
            // Best-effort cleanup: a close failure must not replace the XAException under test.
        }
    }
}
/**
 * Tests that a two-phase commit of a branch that was ended but never prepared throws an
 * {@link XAException} whose message matches "Error while committing dtx session".
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutPrepare()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutEnding";
    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();
        Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
        session.createQueue(queueName);
        MessageProducer producer = session.createProducer(xaTestQueue);
        Xid xid = JMSClientHelper.getNewXid();
        xaResource.start(xid, XAResource.TMNOFLAGS);
        producer.send(session.createTextMessage("Test 1"));
        xaResource.end(xid, XAResource.TMSUCCESS);
        // prepare(xid) is deliberately skipped: committing with onePhase == false without a
        // prior prepare is the error under test.
        xaResource.commit(xid, false);
    } finally {
        // The expected XAException skips every statement after the failing call, so cleanup
        // placed at the end of the method never ran. Closing the connection also closes its
        // session.
        try {
            xaConnection.close();
        } catch (JMSException ignored) {
            // Best-effort cleanup: a close failure must not replace the XAException under test.
        }
    }
}
/**
 * Tests that rolling back a DTX branch that was never started throws an {@link XAException}
 * whose message matches "Error while rolling back dtx session".
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while rolling back dtx session.*")
public void rollbackDtxBranchWithoutStarting()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxRollbackTestCaseRollbackDtxBranchWithoutStarting";
    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();
        Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
        session.createQueue(queueName);
        // The producer is created as in the sibling tests but nothing is sent.
        MessageProducer producer = session.createProducer(xaTestQueue);
        Xid xid = JMSClientHelper.getNewXid();
        // The branch is deliberately not started, ended or prepared: rolling back an unknown
        // xid is the error under test.
        xaResource.rollback(xid);
    } finally {
        // The expected XAException skips every statement after the failing call, so cleanup
        // placed at the end of the method never ran. Closing the connection also closes its
        // session.
        try {
            xaConnection.close();
        } catch (JMSException ignored) {
            // Best-effort cleanup: a close failure must not replace the XAException under test.
        }
    }
}
/**
 * Tests that ending a DTX branch through a session other than the one that started it throws
 * an {@link XAException} whose message matches "Error while ending dtx session".
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while ending dtx session.*")
public void endDtxBranchBelongToADifferentSession()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxEndTestCaseEndDtxBranchBelongToADifferentSession";
    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    XAConnection secondXaConnection = null;
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();
        Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
        session.createQueue(queueName);
        MessageProducer producer = session.createProducer(xaTestQueue);
        Xid xid = JMSClientHelper.getNewXid();
        // Start the branch on the first session.
        xaResource.start(xid, XAResource.TMNOFLAGS);

        // Open a second, independent connection and session. The previous code started
        // xaConnection a second time here and left the new connection unstarted.
        secondXaConnection = connectionFactory.createXAConnection();
        secondXaConnection.start();
        XASession secondXaSession = secondXaConnection.createXASession();
        XAResource secondXaResource = secondXaSession.getXAResource();

        producer.send(session.createTextMessage("Test 1"));
        // Ending the branch through the second session's resource is the error under test.
        secondXaResource.end(xid, XAResource.TMSUCCESS);
        xaResource.prepare(xid);
        xaResource.rollback(xid);
    } finally {
        // The expected XAException skips every statement after the failing call, so cleanup
        // placed at the end of the method never ran, and the second connection was never
        // closed at all. Closing a connection also closes its sessions.
        for (XAConnection openConnection : new XAConnection[] { secondXaConnection, xaConnection }) {
            if (openConnection == null) {
                continue;
            }
            try {
                openConnection.close();
            } catch (JMSException ignored) {
                // Best-effort cleanup: a close failure must not replace the XAException under test.
            }
        }
    }
}
/**
 * Tests that {@code recover} only reports branches that are in the prepared state.
 * <ol>
 * <li>Before any branch is started, recover returns an empty array.</li>
 * <li>After the branch is started and ended but not prepared, recover is still empty.</li>
 * <li>After prepare, recover returns exactly one xid, matching the one that was used.</li>
 * <li>After rollback, recover is empty again.</li>
 * </ol>
 */
@Test(groups = { "wso2.mb", "dtx" })
public void performDtxRecoverWithPublishTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    final String queueName = "DtxRecoverPositiveTestCasePerformDtxRecoverWithPublishTestCase";
    InitialContext ctx = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    XAConnectionFactory xaFactory =
            (XAConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    Destination testQueue = (Destination) ctx.lookup(queueName);
    txSession.createQueue(queueName);
    MessageProducer publisher = txSession.createProducer(testQueue);

    // Nothing started yet.
    Xid[] recovered = resource.recover(XAResource.TMNOFLAGS);
    Assert.assertEquals(recovered.length, 0, "Recovered Transaction list length should be 0 since "
            + "we don't have not started any transaction");

    // Started and ended, but not prepared.
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    publisher.send(txSession.createTextMessage("Test 1"));
    resource.end(branchId, XAResource.TMSUCCESS);
    recovered = resource.recover(XAResource.TMNOFLAGS);
    Assert.assertEquals(recovered.length, 0, "Recovered Transaction list length should be 0 since "
            + "the transaction is not prepared yet");

    // Prepared: exactly one xid must be reported.
    int prepareResult = resource.prepare(branchId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Dtx.prepare was not successful.");
    recovered = resource.recover(XAResource.TMNOFLAGS);
    Assert.assertEquals(recovered.length, 1, "Recovered Transaction list length should be 1 since "
            + "the transaction is in prepared yet");

    // The reported xid must carry the same branch qualifier and global transaction id.
    byte[] expectedBranch = branchId.getBranchQualifier();
    byte[] expectedGlobal = branchId.getGlobalTransactionId();
    byte[] actualBranch = recovered[0].getBranchQualifier();
    byte[] actualGlobal = recovered[0].getGlobalTransactionId();
    boolean sameXid = Arrays.equals(expectedBranch, actualBranch)
            && Arrays.equals(expectedGlobal, actualGlobal);
    Assert.assertTrue(sameXid, "Received xid does not match the original xid");

    // Rolled back: nothing left to recover.
    resource.rollback(branchId);
    recovered = resource.recover(XAResource.TMNOFLAGS);
    Assert.assertEquals(recovered.length, 0, "Recovered Transaction list length should be 0 since "
            + "the transaction is not in prepared state");

    txSession.close();
    xaConn.close();
}
/**
 * Publishes a message before any XA branch is started, receives inside a branch that is then
 * prepared and rolled back, and finally receives again after the rollback, expecting the
 * originally published text.
 *
 * @throws XPathExpressionException declared by the test; raised by helper calls
 * @throws NamingException          declared by the test; raised by the initial context lookups
 * @throws JMSException             on JMS errors
 * @throws XAException              on XA transaction errors
 */
@Test(groups = { "wso2.mb", "dtx" })
public void publishConsumeOutsideTransactionTestCase() throws XPathExpressionException, NamingException,
        JMSException, XAException {
    final String queueName = "publishConsumeOutsideTransactionTestCase";
    final String payload = "outside transaction";
    InitialContext ctx = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    XAConnectionFactory xaFactory =
            (XAConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    Destination testQueue = (Destination) ctx.lookup(queueName);
    txSession.createQueue(queueName);
    MessageConsumer subscriber = txSession.createConsumer(testQueue);
    MessageProducer publisher = txSession.createProducer(testQueue);
    Xid branchId = new TestXidImpl(100, new byte[] { 0x01 }, new byte[] { 0x05 });

    // Publish before the branch starts.
    publisher.send(txSession.createTextMessage(payload));

    // Receive inside the branch (the result is not inspected), then prepare and roll back.
    resource.start(branchId, XAResource.TMNOFLAGS);
    subscriber.receive(30000);
    resource.end(branchId, XAResource.TMSUCCESS);
    int prepareResult = resource.prepare(branchId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare state failed for distributed transaction");
    resource.rollback(branchId);

    // After the rollback the message must be deliverable again.
    JMSTextMessage redelivered = (JMSTextMessage) subscriber.receive(30000);
    Assert.assertNotNull(redelivered, "Message did not receive from server");
    Assert.assertEquals(redelivered.getText(), payload, "Invalid Message received");

    txSession.close();
    xaConn.close();
}
/**
 * Tests that preparing a DTX branch that was started but never ended throws an
 * {@link XAException} whose message matches "Error while preparing dtx session".
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while preparing dtx session.*")
public void prepareDtxBranchWithoutEnding()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxPrepareTestCasePrepareDtxBranchWithoutEnding";
    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();
        Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
        session.createQueue(queueName);
        MessageProducer producer = session.createProducer(xaTestQueue);
        Xid xid = JMSClientHelper.getNewXid();
        xaResource.start(xid, XAResource.TMNOFLAGS);
        producer.send(session.createTextMessage("Test 1"));
        // end(xid, ...) is deliberately skipped: preparing a branch that is still active is
        // the error under test.
        xaResource.prepare(xid);
        xaResource.rollback(xid);
    } finally {
        // The expected XAException skips every statement after the failing call, so cleanup
        // placed at the end of the method never ran. Closing the connection also closes its
        // session.
        try {
            xaConnection.close();
        } catch (JMSException ignored) {
            // Best-effort cleanup: a close failure must not replace the XAException under test.
        }
    }
}
/**
 * Consumes two messages in two separate XA branches, rolls the first branch back and commits
 * the second.
 * <ol>
 * <li>Publish two messages.</li>
 * <li>Receive once inside the first branch.</li>
 * <li>Receive once inside the second branch.</li>
 * <li>Prepare and roll back the first branch.</li>
 * <li>Prepare and commit the second branch; its message must be the second one published.</li>
 * <li>Receive again and expect the first published message.</li>
 * </ol>
 *
 * @throws XPathExpressionException Error on reading AMQP port
 * @throws NamingException          Throws when Initial context lookup failed
 * @throws JMSException             Exception related to JMS
 * @throws XAException              Throws when error occurred in XA transaction
 */
@Test(groups = {"wso2.mb", "dtx"})
public void consumeRollbackWithMultipleXidTestCase() throws XPathExpressionException, NamingException,
        JMSException, XAException {
    final String queueName = "consumeRollbackWithMultipleXidTestCase";
    final String firstPayload = "consumeRollbackWithMultipleXidTestCase-Msg-1";
    final String secondPayload = "consumeRollbackWithMultipleXidTestCase-Msg-2";
    InitialContext ctx = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    XAConnectionFactory xaFactory =
            (XAConnectionFactory) ctx.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSess = xaConn.createXASession();
    XAResource resource = xaSess.getXAResource();
    Session txSession = xaSess.getSession();
    Destination testQueue = (Destination) ctx.lookup(queueName);
    txSession.createQueue(queueName);
    MessageConsumer subscriber = txSession.createConsumer(testQueue);
    MessageProducer publisher = txSession.createProducer(testQueue);
    Xid firstBranch = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x07});
    Xid secondBranch = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x08});

    // Both messages are published before either branch starts.
    publisher.send(txSession.createTextMessage(firstPayload));
    publisher.send(txSession.createTextMessage(secondPayload));

    // First branch: the received message is not inspected.
    resource.start(firstBranch, XAResource.TMNOFLAGS);
    subscriber.receive(30000);
    resource.end(firstBranch, XAResource.TMSUCCESS);

    // Second branch.
    resource.start(secondBranch, XAResource.TMNOFLAGS);
    JMSTextMessage inSecondBranch = (JMSTextMessage) subscriber.receive(30000);
    resource.end(secondBranch, XAResource.TMSUCCESS);

    // Roll the first branch back.
    int prepareResult = resource.prepare(firstBranch);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare state failed for distributed transaction");
    resource.rollback(firstBranch);

    // Commit the second branch.
    prepareResult = resource.prepare(secondBranch);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare state failed for distributed transaction");
    Assert.assertEquals(inSecondBranch.getText(), secondPayload, "Invalid Message received");
    resource.commit(secondBranch, false);

    // The rolled-back message must be delivered again.
    JMSTextMessage redelivered = (JMSTextMessage) subscriber.receive(30000);
    Assert.assertNotNull(redelivered, "Didn't receive the message from server");
    Assert.assertEquals(redelivered.getText(), firstPayload, "Invalid Message received");

    txSession.close();
    xaConn.close();
}
/**
 * Enlists the XA resources of two sessions from the same connection in one transaction,
 * sends one message from each session, commits, and checks that both messages arrive in
 * send order on a plain connection.
 * <p>
 * Both resources belong to the same resource manager, so the transaction manager will
 * probably apply its one-phase-commit optimisation.
 */
@Test
public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception {
    XAConnection xaConn = null;
    Connection plainConn = null;
    try {
        xaConn = xacf.createXAConnection();
        xaConn.start();
        tm.begin();

        // Two sessions on the same connection, both enlisted in the same transaction.
        XASession firstSession = xaConn.createXASession();
        XAResource firstResource = firstSession.getXAResource();
        XASession secondSession = xaConn.createXASession();
        XAResource secondResource = secondSession.getXAResource();
        Transaction transaction = tm.getTransaction();
        transaction.enlistResource(firstResource);
        transaction.enlistResource(secondResource);

        // One message per session.
        MessageProducer firstSender = firstSession.createProducer(queue1);
        MessageProducer secondSender = secondSession.createProducer(queue1);
        firstSender.send(firstSession.createTextMessage("echidna1"));
        secondSender.send(secondSession.createTextMessage("echidna2"));

        transaction.delistResource(firstResource, XAResource.TMSUCCESS);
        transaction.delistResource(secondResource, XAResource.TMSUCCESS);
        tm.commit();

        // Both messages should now be in the queue.
        plainConn = cf.createConnection();
        Session checkSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer checker = checkSession.createConsumer(queue1);
        plainConn.start();
        TextMessage firstReceived = (TextMessage) checker.receive(5000);
        Assert.assertNotNull(firstReceived);
        Assert.assertEquals("echidna1", firstReceived.getText());
        TextMessage secondReceived = (TextMessage) checker.receive(5000);
        Assert.assertNotNull(secondReceived);
        Assert.assertEquals("echidna2", secondReceived.getText());
    } finally {
        if (xaConn != null) {
            xaConn.close();
        }
        if (plainConn != null) {
            plainConn.close();
        }
    }
}