下面列出了javax.jms.XASession#getSession ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testPublisherWithRollback() throws NamingException, JMSException, XAException, IOException {
String queueName = "testPublisherWithRollback";
String testMessage = "testPublisherWithRollback-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
xaConnection.start();
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(testMessage));
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK");
xaResource.rollback(xid);
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 0, "Queue should be empty");
session.close();
xaConnection.close();
}
/**
* Tests if preparing a DTX branch after setting fail flag in dtx.end throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class)
public void prepareDtxBranchAfterEndFails()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxPrepareTestCasePrepareDtxBranchAfterEndFails";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMFAIL);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
@Test
public void testPublisherWithCommit() throws NamingException, JMSException, XAException {
String queueName = "testPublisherWithCommit";
String testMessage = "testPublisherWithCommit-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
xaConnection.start();
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(testMessage));
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK");
xaResource.commit(xid, false);
// Test by consuming the committed message.
ConnectionFactory connectionFactory =
(ConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
Connection connection = connectionFactory.createConnection();
Session receivingSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = receivingSession.createConsumer(receivingSession.createQueue(queueName));
connection.start();
TextMessage message = (TextMessage) consumer.receive(3000);
Assert.assertNotNull(message, "Didn't receive a message");
Assert.assertEquals(message.getText(), testMessage, "Received message should match sent message");
session.close();
xaConnection.close();
connection.close();
}
/**
* Tests if preparing a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while preparing dtx session.*")
public void prepareDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxPrepareTestCasePrepareDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
// MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMNOFLAGS);
// producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
@Test
public void testConsumerWithRollback() throws Exception {
String queueName = "testConsumerWithRollback";
String testMessage = "testConsumerWithRollback-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
// Setup XA connection
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
xaConnection.start();
producer.send(session.createTextMessage(testMessage));
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
TextMessage message = (TextMessage) consumer.receive(2000);
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK.");
xaResource.rollback(xid);
session.close();
xaConnection.close();
Assert.assertNotNull(message, "Sent message should be consumed by the consumer.");
Assert.assertEquals(message.getText(), testMessage, "Received message should match the sent message.");
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 1, "Queue should be non empty");
}
/**
* Tests if publishing messages works correctly with session suspending and resuming.Steps are,
* 1. Using a distributed transaction a message is published to a queue and session is suspended
* 2. Subscribe to the published queue and see if any message is received.
* 3. Resume the suspended session and publish another message and commit
* 4. Subscribe to the queue and see if two messages are received
*/
@Test(groups = { "wso2.mb", "dtx" })
public void suspendResumeQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartPositiveTestCaseSuspendResumeQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUSPEND);
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNull(receive, "Message received. Message was not rolled back");
xaResource.start(xid, XAResource.TMRESUME);
producer.send(session.createTextMessage("Test 2"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.commit(xid, false);
session.close();
xaConnection.close();
receive = messageConsumer.receive(5000);
Assert.assertNotNull(receive, "Message not received");
receive = messageConsumer.receive(5000);
Assert.assertNotNull(receive, "Message not received");
queueConnection.close();
}
/**
* Tests if one-phase commit in distributed transactions, is working correctly for message publishing.
* Steps are,
* 1. Using a distributed transaction publish a message to a queue and commit in one-phase
* 2. Subscribe to the published queue and see if the message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase() throws Exception {
String queueName = "DtxOnePhaseCommitMessagePublishingTest";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort()).withQueue(queueName).build();
// Publish to queue and commit in one-phase
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test Message publishing"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.commit(xid, true);
session.close();
xaConnection.close();
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(10000);
Assert.assertNotNull(receive, "Message was not received. One-phase commit might have failed");
queueConnection.close();
}
/**
* Tests if committing a published message works correctly without a client ID.Steps are,
* 1. Using a distributed transaction a message is published to a queue and committed
* 2. Subscribe to the published queue and see if the message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performDtxClientQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "ClientIDNullTestCaseDtxPerformClientQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin",
"admin",
"localhost",
getAMQPPort())
.withNoClientId()
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNotNull(receive, "Message was not received.");
queueConnection.close();
}
/**
* Tests if committing a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutStarting()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutStarting";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
// xaResource.start(xid, XAResource.TMNOFLAGS);
// producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMSUCCESS);
// xaResource.prepare(xid);
xaResource.commit(xid, false);
session.close();
xaConnection.close();
}
/**
* Tests if committing a published message works correctly.Steps are,
* 1. Using a distributed transaction a message is published to a queue and committed
* 2. Subscribe to the published queue and see if the message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "CommitTestCasePerformClientQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort())
.withQueue(queueName).build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNotNull(receive, "Message was not received.");
queueConnection.close();
}
/**
* Tests if committing a prepared branch with onephase throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithOnephaseAfterPrepare()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxCommitTestCaseCmmitDtxBranchWithOnephaseAfterPrepare";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
// one phase should be false
xaResource.commit(xid, true);
session.close();
xaConnection.close();
}
/**
* Tests if committing a branch without preparing throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutPrepare()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutEnding";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
// Should prepare before commit
// xaResource.prepare(xid);
xaResource.commit(xid, false);
session.close();
xaConnection.close();
}
/**
* Tests if resuming a new XID will throw an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while starting dtx session.*")
public void resumeANonExistingDtxBranch()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartTestCaseResumeANonExistingDtxBranch";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMRESUME);
// Below this line should not execute
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Tests if rolling back a DTX branch without starting it throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
expectedExceptionsMessageRegExp = ".*Error while rolling back dtx session.*")
public void rollbackDtxBranchWithoutEnding()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxRollbackTestCaseRollbackDtxBranchWithoutEnding";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
// xaResource.end(xid, XAResource.TMFAIL);
// xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
/**
* Tests if recovering transactions return empty set when no prepared transactions are there. Steps are,
* 1. A distributed sessions is started
* 2. Before preparing recover and see we get an empty list
* 3. Go to prepare stage and see if we get one item in the list
* 4. Rollback and see if we get an empty list
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performDtxRecoverWithPublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxRecoverPositiveTestCasePerformDtxRecoverWithPublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid[] recoveredTransactions = xaResource.recover(XAResource.TMNOFLAGS);
Assert.assertEquals(recoveredTransactions.length, 0, "Recovered Transaction list length should be 0 since "
+ "we don't have not started any transaction");
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
recoveredTransactions = xaResource.recover(XAResource.TMNOFLAGS);
Assert.assertEquals(recoveredTransactions.length, 0, "Recovered Transaction list length should be 0 since "
+ "the transaction is not prepared yet");
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
recoveredTransactions = xaResource.recover(XAResource.TMNOFLAGS);
Assert.assertEquals(recoveredTransactions.length, 1, "Recovered Transaction list length should be 1 since "
+ "the transaction is in prepared yet");
byte[] originalBranchQualifier = xid.getBranchQualifier();
byte[] originalGlobalTransactionId = xid.getGlobalTransactionId();
byte[] receivedBranchQualifier = recoveredTransactions[0].getBranchQualifier();
byte[] receivedGlobalTransactionId = recoveredTransactions[0].getGlobalTransactionId();
boolean matching = Arrays.equals(originalBranchQualifier, receivedBranchQualifier) &&
Arrays.equals(originalGlobalTransactionId, receivedGlobalTransactionId);
Assert.assertTrue(matching, "Received xid does not match the original xid" );
xaResource.rollback(xid);
recoveredTransactions = xaResource.recover(XAResource.TMNOFLAGS);
Assert.assertEquals(recoveredTransactions.length, 0, "Recovered Transaction list length should be 0 since "
+ "the transaction is not in prepared state");
session.close();
xaConnection.close();
}
/**
* Publish and then consume outside a distributed transaction
*
* @throws XPathExpressionException
* @throws NamingException
* @throws JMSException
* @throws XAException
*/
@Test(groups = { "wso2.mb", "dtx" })
public void publishConsumeOutsideTransactionTestCase() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "publishConsumeOutsideTransactionTestCase";
String outsideTransactionMessage = "outside transaction";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(xaTestQueue);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = new TestXidImpl(100, new byte[] { 0x01 }, new byte[] { 0x05 });
producer.send(session.createTextMessage(outsideTransactionMessage));
xaResource.start(xid, XAResource.TMNOFLAGS);
consumer.receive(30000);
xaResource.end(xid, XAResource.TMSUCCESS);
int status = xaResource.prepare(xid);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource.rollback(xid);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertNotNull(message, "Message did not receive from server");
Assert.assertEquals(message.getText(), outsideTransactionMessage, "Invalid Message received");
session.close();
xaConnection.close();
}
/**
* Tests if message acknowledgement works correctly with session suspend and resume. Steps are,
* 1. Publish a message to a queue
* 2. Using a distributed transacted session receive the message and suspend the session
* 3. Publish a message to the queue
* 4. Resume the session again, ack and commit
* 5. Subscribe again using a normal session and see if any message is received
*/
@Test(groups = { "wso2.mb", "dtx" })
public void suspendResumeMessageAckTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxStartPositiveTestCaseSuspendResumeMessageAckTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
// Publish message to queue
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
queueConnection.start();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueSession.createQueue(queueName);
MessageProducer messageProducer = queueSession.createProducer(xaTestQueue);
messageProducer.send(queueSession.createTextMessage("Test message 1"));
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
MessageConsumer xaConsumer = session.createConsumer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
Message receivedMessage = xaConsumer.receive(5000);
xaResource.end(xid, XAResource.TMSUSPEND);
Assert.assertNotNull(receivedMessage, "No message received");
messageProducer.send(queueSession.createTextMessage("Test message 2"));
messageProducer.close();
xaResource.start(xid, XAResource.TMRESUME);
receivedMessage = xaConsumer.receive(5000);
xaResource.end(xid, XAResource.TMSUCCESS);
Assert.assertNotNull(receivedMessage, "No message received");
int ret = xaResource.prepare(xid);
Assert.assertEquals(ret, XAResource.XA_OK, "Dtx.prepare was not successful.");
xaResource.commit(xid, false);
session.close();
xaConnection.close();
// subscribe and see if the message is received
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receivedMessageFromNormalConnection = messageConsumer.receive(5000);
Assert.assertNull(receivedMessageFromNormalConnection, "Message received. Commit might have failed");
queueConnection.close();
}
/**
* Publish with two distinct connections with separate transactions and then consume the messages
* Messages should preserve the order in which messages were committed
*
* @throws XPathExpressionException
* @throws NamingException
* @throws JMSException
* @throws XAException
*/
@Test(groups = {"wso2.mb", "dtx"})
private void publishConsumeWithDistinctConnections() throws XPathExpressionException, NamingException,
JMSException, XAException {
String queueName = "publishConsumeWithDistinctConnections";
String xid1Message = "xid 1";
String xid2Message = "xid 2";
InitialContext initialContext =
JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueName).build();
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection1 = connectionFactory.createXAConnection();
XAConnection xaConnection2 = connectionFactory.createXAConnection();
xaConnection1.start();
xaConnection2.start();
XASession xaSession1 = xaConnection1.createXASession();
XASession xaSession2 = xaConnection2.createXASession();
XAResource xaResource1 = xaSession1.getXAResource();
XAResource xaResource2 = xaSession2.getXAResource();
Session session1 = xaSession1.getSession();
Session session2 = xaSession2.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session1.createQueue(queueName);
MessageConsumer consumer = session1.createConsumer(xaTestQueue);
MessageProducer producer = session1.createProducer(xaTestQueue);
MessageProducer producer2 = session2.createProducer(xaTestQueue);
Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x09});
Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x10});
xaResource1.start(xid1, XAResource.TMNOFLAGS);
producer.send(session1.createTextMessage(xid1Message));
xaResource1.end(xid1, XAResource.TMSUCCESS);
xaResource2.start(xid2, XAResource.TMNOFLAGS);
producer2.send(session2.createTextMessage(xid2Message));
xaResource2.end(xid2, XAResource.TMSUCCESS);
// Xid 2
int status = xaResource2.prepare(xid2);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource2.commit(xid2, false);
JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid2Message, "Invalid Message received");
// Xid 1
status = xaResource1.prepare(xid1);
Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
xaResource1.commit(xid1, false);
message = (JMSTextMessage) consumer.receive(30000);
Assert.assertEquals(message.getText(), xid1Message, "Invalid Message received");
session1.close();
session2.close();
xaConnection1.close();
xaConnection2.close();
}
/**
* Tests if rolling back a published message works correctly.Steps are,
* 1. Using a distributed transaction a message is published to a queue and rolled back
* 2. Subscribe to the published queue and see if any message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "RollbackTestCasePerformClientQueuePublishTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort())
.withQueue(queueName).build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMSUCCESS);
xaResource.prepare(xid);
xaResource.rollback(xid);
// subscribe and see if the message is received
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);
// wait 5 seconds
Message receive = messageConsumer.receive(5000);
Assert.assertNull(receive, "Message received. Message was not rolled back");
session.close();
xaConnection.close();
queueConnection.close();
}
/**
* Tests if rolling back a message acknowledgement works correctly.Steps are,
* 1. Publish a message to a queue
* 2. Using a distributed transacted session receive the message and roll back
* 3. Subscribe again using a normal session and see if the message is received
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueueAcknowledgeTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "RollbackTestCasePerformClientQueueAcknowledgeTestCase";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort())
.withQueue(queueName).build();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
// Publish message to queue
ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection queueConnection = queueConnectionFactory.createConnection();
Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queueSession.createQueue(queueName);
MessageProducer messageProducer = queueSession.createProducer(xaTestQueue);
messageProducer.send(queueSession.createTextMessage("Test message"));
messageProducer.close();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
MessageConsumer xaConsumer = session.createConsumer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
xaResource.start(xid, XAResource.TMNOFLAGS);
Message receivedMessage = xaConsumer.receive(5000);
xaResource.end(xid, XAResource.TMSUCCESS);
Assert.assertNotNull(receivedMessage, "No message received");
xaResource.prepare(xid);
xaResource.rollback(xid);
xaResource.start(xid, XAResource.TMNOFLAGS);
receivedMessage = xaConsumer.receive(5000);
xaResource.end(xid, XAResource.TMSUCCESS);
Assert.assertNotNull(receivedMessage, "No message received. Roll back might have failed");
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
queueConnection.close();
}