下面列出了 javax.jms.XASession#getXAResource() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Smoke test: obtaining the XA resource from a freshly created XA session must not throw.
 * <p>
 * The connection is closed in a {@code finally} block so that it is released even when
 * creating the session or fetching the resource fails.
 *
 * @throws Exception if the connection, the session or the XA resource cannot be obtained
 */
@Test
public void testGetXAResource2() throws Exception {
    XAConnection conn = getXAConnectionFactory().createXAConnection();
    try {
        XASession sess = conn.createXASession();
        sess.getXAResource();
    } finally {
        conn.close();
    }
}
/**
 * Tests that committing a DTX branch which was started but never ended throws an exception.
 * <p>
 * The branch is started and a message is sent inside it; {@code end} and {@code prepare} are
 * deliberately skipped before {@code commit} is called. The test passes only when an
 * {@code XAException} whose message matches "Error while committing dtx session" is thrown.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutEnding()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutEnding";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and obtain the XA resource together with its JMS session
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection connection = xaFactory.createXAConnection();
    connection.start();
    XASession dtxSession = connection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();

    Destination queue = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer publisher = jmsSession.createProducer(queue);

    // Start the branch and publish inside it
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    publisher.send(jmsSession.createTextMessage("Test 1"));

    // end(...) and prepare(...) are intentionally omitted: this commit is expected to fail
    resource.commit(branchId, false);

    jmsSession.close();
    connection.close();
}
/**
 * Checks that the XA resource handed out by an outgoing XA session is an ActiveMQ resource
 * wrapper that reports the expected JNDI name, product version and product name.
 * <p>
 * The connection is closed in a {@code finally} block so that it is released even when one
 * of the assertions fails.
 *
 * @throws Exception if the connection or the session cannot be created
 */
@Test
public void testOutgoingXAResourceWrapper() throws Exception {
    XAQueueConnection queueConnection = qraConnectionFactory.createXAQueueConnection();
    try {
        XASession s = queueConnection.createXASession();
        XAResource resource = s.getXAResource();
        assertTrue(resource instanceof ActiveMQXAResourceWrapper);

        ActiveMQXAResourceWrapperImpl xaResourceWrapper = (ActiveMQXAResourceWrapperImpl) resource;
        assertTrue(xaResourceWrapper.getJndiName().equals("java://jmsXA NodeId:" + server.getNodeID()));
        assertTrue(xaResourceWrapper.getProductVersion().equals(VersionLoader.getVersion().getFullVersion()));
        assertTrue(xaResourceWrapper.getProductName().equals(ActiveMQResourceAdapter.PRODUCT_NAME));
    } finally {
        queueConnection.close();
    }
}
/**
 * Tests that message acknowledgement works correctly when a distributed transaction branch is
 * suspended and resumed. Steps are,
 * 1. Publish a message to a queue
 * 2. Receive it in a distributed transacted session, then suspend the branch
 * 3. Publish a second message to the queue
 * 4. Resume the branch, receive the second message, then end, prepare and commit
 * 5. Consume again using the normal session and verify that no message is received
 */
@Test(groups = { "wso2.mb", "dtx" })
public void suspendResumeMessageAckTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxStartPositiveTestCaseSuspendResumeMessageAckTestCase";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    Destination queue = (Destination) context.lookup(queueName);

    // Publish the first message through a plain, non-transacted session
    ConnectionFactory plainFactory =
            (ConnectionFactory) context.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    plainSession.createQueue(queueName);
    MessageProducer publisher = plainSession.createProducer(queue);
    publisher.send(plainSession.createTextMessage("Test message 1"));

    // Set up the distributed transaction consumer
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection dtxConnection = xaFactory.createXAConnection();
    dtxConnection.start();
    XASession dtxSession = dtxConnection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();
    MessageConsumer dtxConsumer = jmsSession.createConsumer(queue);

    // Receive the first message inside the branch, then suspend the branch
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    Message received = dtxConsumer.receive(5000);
    resource.end(branchId, XAResource.TMSUSPEND);
    Assert.assertNotNull(received, "No message received");

    // Publish the second message while the branch is suspended
    publisher.send(plainSession.createTextMessage("Test message 2"));
    publisher.close();

    // Resume the branch, receive the second message and finish the branch
    resource.start(branchId, XAResource.TMRESUME);
    received = dtxConsumer.receive(5000);
    resource.end(branchId, XAResource.TMSUCCESS);
    Assert.assertNotNull(received, "No message received");

    int prepareResult = resource.prepare(branchId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Dtx.prepare was not successful.");
    resource.commit(branchId, false);

    jmsSession.close();
    dtxConnection.close();

    // Nothing may be left on the queue once the branch is committed (waits up to 5 seconds)
    MessageConsumer plainConsumer = plainSession.createConsumer(queue);
    Message leftover = plainConsumer.receive(5000);
    Assert.assertNull(leftover, "Message received. Commit might have failed");

    plainConnection.close();
}
/**
 * Exercises transactional send and receive on XA sessions that are closed while their
 * transaction branch is still active, i.e. before {@code end} and {@code commit} are called.
 * <p>
 * Steps: send a message inside a branch, close the session, then end and commit the branch;
 * receive the message in a second branch in the same way; finally check in a third branch
 * that nothing more can be received. The connection is closed in a {@code finally} block so
 * that it is released even when an assertion fails.
 *
 * @throws Exception on any JMS or XA failure
 */
public void testSessionCloseTransactionalSendReceive() throws Exception {
    ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
    XAConnection connection1 = (XAConnection) cf1.createConnection();
    try {
        connection1.start();
        XASession session = connection1.createXASession();
        XAResource resource = session.getXAResource();
        Destination dest = new ActiveMQQueue(getName());

        // publish a message
        Xid tid = createXid();
        resource.start(tid, XAResource.TMNOFLAGS);
        MessageProducer producer = session.createProducer(dest);
        ActiveMQTextMessage message = new ActiveMQTextMessage();
        message.setText(getName());
        producer.send(message);
        // the session is closed on purpose before the branch is ended and committed
        session.close();
        resource.end(tid, XAResource.TMSUCCESS);
        resource.commit(tid, true);

        // receive the message in a new session and branch, again closing the session first
        session = connection1.createXASession();
        MessageConsumer consumer = session.createConsumer(dest);
        tid = createXid();
        resource = session.getXAResource();
        resource.start(tid, XAResource.TMNOFLAGS);
        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
        session.close();
        assertNotNull(receivedMessage);
        assertEquals(getName(), receivedMessage.getText());
        resource.end(tid, XAResource.TMSUCCESS);
        resource.commit(tid, true);

        // a third branch must not see the message again
        session = connection1.createXASession();
        consumer = session.createConsumer(dest);
        tid = createXid();
        resource = session.getXAResource();
        resource.start(tid, XAResource.TMNOFLAGS);
        assertNull(consumer.receive(1000));
        resource.end(tid, XAResource.TMSUCCESS);
        resource.commit(tid, true);
    } finally {
        connection1.close();
    }
}
/**
 * Sends one message from each of two XA sessions enlisted in the same transaction and checks
 * that both messages can be consumed, in send order, after the transaction is committed.
 *
 * @throws Exception on any JMS or transaction manager failure
 */
@Test
public void testMultipleSessionsOneTxCommitSend1PCOptimization() throws Exception {
    // Since both resources have the same RM, the TM will probably use the 1PC optimization
    XAConnection xaConn = null;
    Connection plainConn = null;
    try {
        xaConn = xacf.createXAConnection();
        xaConn.start();
        tm.begin();

        // Two sessions on the same connection, both enlisted in the current transaction
        XASession firstSession = xaConn.createXASession();
        XAResource firstResource = firstSession.getXAResource();
        XASession secondSession = xaConn.createXASession();
        XAResource secondResource = secondSession.getXAResource();

        Transaction transaction = tm.getTransaction();
        transaction.enlistResource(firstResource);
        transaction.enlistResource(secondResource);

        // One message per session
        MessageProducer firstProducer = firstSession.createProducer(queue1);
        MessageProducer secondProducer = secondSession.createProducer(queue1);
        firstProducer.send(firstSession.createTextMessage("echidna1"));
        secondProducer.send(secondSession.createTextMessage("echidna2"));

        transaction.delistResource(firstResource, XAResource.TMSUCCESS);
        transaction.delistResource(secondResource, XAResource.TMSUCCESS);

        tm.commit();

        // After the commit both messages should be in the queue
        plainConn = cf.createConnection();
        Session readSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer reader = readSession.createConsumer(queue1);
        plainConn.start();

        TextMessage first = (TextMessage) reader.receive(5000);
        Assert.assertNotNull(first);
        Assert.assertEquals("echidna1", first.getText());

        TextMessage second = (TextMessage) reader.receive(5000);
        Assert.assertNotNull(second);
        Assert.assertEquals("echidna2", second.getText());
    } finally {
        if (xaConn != null) {
            xaConn.close();
        }
        if (plainConn != null) {
            plainConn.close();
        }
    }
}
/**
 * Uses a single XA session in two transactions: the first transaction is suspended after its
 * send, the second one sends and commits. Only the second message may be visible until the
 * first transaction is resumed and committed, after which the first message must arrive too.
 *
 * @throws Exception on any JMS or transaction manager failure
 */
@Test
public void testOneSessionTwoTransactionsCommitSend() throws Exception {
    XAConnection xaConn = null;
    Connection plainConn = null;
    try {
        xaConn = xacf.createXAConnection();

        // A single session, resource and producer shared by both transactions
        XASession xaSession = xaConn.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        MessageProducer sender = xaSession.createProducer(queue1);

        // First transaction: send "kangaroo1", then park the transaction
        tm.begin();
        Transaction firstTx = tm.getTransaction();
        firstTx.enlistResource(xaResource);
        sender.send(xaSession.createTextMessage("kangaroo1"));
        firstTx.delistResource(xaResource, XAResource.TMSUCCESS);
        Transaction parkedTx = tm.suspend();

        // Second transaction on the same session: send "kangaroo2" and commit
        tm.begin();
        Transaction secondTx = tm.getTransaction();
        secondTx.enlistResource(xaResource);
        sender.send(xaSession.createTextMessage("kangaroo2"));
        secondTx.delistResource(xaResource, XAResource.TMSUCCESS);
        tm.commit();

        // Only the kangaroo2 message may have been delivered so far
        plainConn = cf.createConnection();
        Session readSession = plainConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        plainConn.start();
        MessageConsumer reader = readSession.createConsumer(queue1);

        TextMessage committed = (TextMessage) reader.receive(5000);
        Assert.assertNotNull(committed);
        Assert.assertEquals("kangaroo2", committed.getText());

        TextMessage unexpected = (TextMessage) reader.receive(100);
        Assert.assertNull(unexpected);

        // Resume and commit the first transaction; its message must now be delivered
        tm.resume(parkedTx);
        tm.commit();

        TextMessage resumed = (TextMessage) reader.receive(5000);
        Assert.assertNotNull(resumed);
        Assert.assertEquals("kangaroo1", resumed.getText());
    } finally {
        if (xaConn != null) {
            xaConn.close();
        }
        if (plainConn != null) {
            plainConn.close();
        }
    }
}
/**
 * Consumes a message inside a distributed transaction, prepares the branch and rolls it back,
 * then checks through the REST API that the queue still holds exactly one message.
 *
 * @throws Exception on any JNDI, JMS, XA or REST client failure
 */
@Test
public void testConsumerWithRollback() throws Exception {
    String queueName = "testConsumerWithRollback";
    String testMessage = "testConsumerWithRollback-Message";

    InitialContext context = initialContextBuilder
            .withXaConnectionFactory()
            .withQueue(queueName)
            .build();

    // Set up the XA connection, its resource and the underlying JMS session
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XAConnection dtxConnection = xaFactory.createXAConnection();
    XASession dtxSession = dtxConnection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();

    Queue testQueue = jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(testQueue);
    MessageConsumer receiver = jmsSession.createConsumer(testQueue);

    dtxConnection.start();

    // The message is sent before the branch is started
    sender.send(jmsSession.createTextMessage(testMessage));

    // Consume inside the branch, prepare it and then roll it back
    XidImpl branchId = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(branchId, XAResource.TMNOFLAGS);
    TextMessage consumed = (TextMessage) receiver.receive(2000);
    resource.end(branchId, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(branchId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK.");

    resource.rollback(branchId);

    jmsSession.close();
    dtxConnection.close();

    Assert.assertNotNull(consumed, "Sent message should be consumed by the consumer.");
    Assert.assertEquals(consumed.getText(), testMessage, "Received message should match the sent message.");

    // After the rollback the queue is expected to still contain the message
    QueueMetadata metadata = restApiClient.getQueueMetadata(queueName);
    Assert.assertEquals((int) metadata.getSize(), 1, "Queue should be non empty");
}
/**
 * Tests that one-phase commit in distributed transactions works correctly for message
 * consumption. Steps are,
 * 1. Publish a message to a queue
 * 2. Receive the message in a distributed transacted session and commit in one phase
 * 3. Consume again using a normal session and verify that no message is received
 */
@Test(groups = { "wso2.mb", "dtx" })
public void onePhaseCommitMessageConsumptionTest() throws Exception {
    String queueName = "DtxOnePhaseCommitMessageConsumptionTest";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    Destination queue = (Destination) context.lookup(queueName);

    // Publish one message through a plain, non-transacted session
    ConnectionFactory plainFactory =
            (ConnectionFactory) context.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    plainSession.createQueue(queueName);
    MessageProducer publisher = plainSession.createProducer(queue);
    publisher.send(plainSession.createTextMessage("Test message consumption"));
    publisher.close();

    // Consume the message in a distributed transaction and commit in one phase
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection dtxConnection = xaFactory.createXAConnection();
    dtxConnection.start();
    XASession dtxSession = dtxConnection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();
    MessageConsumer dtxConsumer = jmsSession.createConsumer(queue);

    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    Message received = dtxConsumer.receive(10000);
    resource.end(branchId, XAResource.TMSUCCESS);
    Assert.assertNotNull(received, "No message received");

    // onePhase = true: commit without a preceding prepare
    resource.commit(branchId, true);

    jmsSession.close();
    dtxConnection.close();

    // Nothing may be left on the queue after the commit (waits up to 10 seconds)
    MessageConsumer plainConsumer = plainSession.createConsumer(queue);
    Message leftover = plainConsumer.receive(10000);
    Assert.assertNull(leftover, "Message received. One phase commit might have failed");

    plainConnection.close();
}
/**
 * Tests that one-phase commit in distributed transactions works correctly for message
 * publishing. Steps are,
 * 1. Publish a message to a queue inside a distributed transaction and commit in one phase
 * 2. Consume from the queue using a normal session and verify that the message is received
 */
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase() throws Exception {
    String queueName = "DtxOnePhaseCommitMessagePublishingTest";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Publish inside a distributed transaction and commit in one phase
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection dtxConnection = xaFactory.createXAConnection();
    dtxConnection.start();
    XASession dtxSession = dtxConnection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();

    Destination queue = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer publisher = jmsSession.createProducer(queue);

    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    publisher.send(jmsSession.createTextMessage("Test Message publishing"));
    resource.end(branchId, XAResource.TMSUCCESS);

    // onePhase = true: commit without a preceding prepare
    resource.commit(branchId, true);

    jmsSession.close();
    dtxConnection.close();

    // The committed message must be consumable from a normal session (waits up to 10 seconds)
    ConnectionFactory plainFactory =
            (ConnectionFactory) context.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer plainConsumer = plainSession.createConsumer(queue);

    Message delivered = plainConsumer.receive(10000);
    Assert.assertNotNull(delivered, "Message was not received. One-phase commit might have failed");

    plainConnection.close();
}
/**
 * Tests that starting a DTX branch with an XID that has already been started on another
 * session throws an exception.
 * <p>
 * The branch is started on a first XA resource; the same XID is then started with
 * {@code TMNOFLAGS} on a resource that belongs to a second connection. The test passes only
 * when an {@code XAException} whose message matches "Error while starting dtx session" is
 * thrown.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while starting dtx session.*")
public void startAnAlreadyStartedDtxBranch()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxStartTestCaseStartAnAlreadyStartedDtxBranch";

    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // First connection: start the branch
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    xaConnection.start();
    XASession xaSession = xaConnection.createXASession();
    XAResource xaResource = xaSession.getXAResource();
    Session session = xaSession.getSession();

    Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
    session.createQueue(queueName);
    MessageProducer producer = session.createProducer(xaTestQueue);

    Xid xid = JMSClientHelper.getNewXid();
    xaResource.start(xid, XAResource.TMNOFLAGS);

    // Second connection: start it (the original restarted the first connection by mistake)
    // and try to start the very same XID again
    XAConnection xaConnectionDuplicate = connectionFactory.createXAConnection();
    xaConnectionDuplicate.start();
    XASession xaSessionDuplicate = xaConnectionDuplicate.createXASession();
    XAResource xaResourceDuplicate = xaSessionDuplicate.getXAResource();
    Session sessionDuplicate = xaSessionDuplicate.getSession();
    MessageProducer producerDuplicate = sessionDuplicate.createProducer(xaTestQueue);

    xaResourceDuplicate.start(xid, XAResource.TMNOFLAGS);

    // Below this line should not execute
    producer.send(session.createTextMessage("Test 1"));
    xaResource.end(xid, XAResource.TMSUCCESS);
    xaResource.prepare(xid);
    xaResource.rollback(xid);

    session.close();
    xaConnection.close();
}
/**
 * Tests that committing an already prepared branch with the one-phase flag throws an
 * exception.
 * <p>
 * The branch is started, a message is sent, and the branch is ended and prepared; the commit
 * is then issued with {@code onePhase = true}. The test passes only when an
 * {@code XAException} whose message matches "Error while committing dtx session" is thrown.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithOnephaseAfterPrepare()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCmmitDtxBranchWithOnephaseAfterPrepare";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and obtain the XA resource together with its JMS session
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection connection = xaFactory.createXAConnection();
    connection.start();
    XASession dtxSession = connection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();

    Destination queue = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer publisher = jmsSession.createProducer(queue);

    // Run the branch up to and including prepare
    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    publisher.send(jmsSession.createTextMessage("Test 1"));
    resource.end(branchId, XAResource.TMSUCCESS);
    resource.prepare(branchId);

    // After prepare the one-phase flag should be false; true is passed on purpose to fail
    resource.commit(branchId, true);

    jmsSession.close();
    connection.close();
}
/**
 * Tests that acknowledging messages works correctly when one XA session joins the branch of
 * another. Steps are,
 * 1. Publish one message to each of two queues using a non-transacted session
 * 2. Create two distributed transaction sessions and join the second to the first one's branch
 * 3. Receive one message with each session
 * 4. End, prepare and commit the branch through the first resource
 * 5. Consume from both queues with normal sessions and verify that no message is received
 */
@Test(groups = { "wso2.mb", "dtx" })
public void xaStartJoinMessageAckTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueNameOne = "DtxStartPositiveTestCaseXaStartJoinMessageAckTestCaseOne";
    String queueNameTwo = "DtxStartPositiveTestCaseXaStartJoinMessageAckTestCaseTwo";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueNameOne)
            .build();

    // Publish one message per queue through a plain session
    ConnectionFactory plainFactory =
            (ConnectionFactory) context.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSessionOne = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination queueOne = plainSessionOne.createQueue(queueNameOne);
    Destination queueTwo = plainSessionOne.createQueue(queueNameTwo);
    MessageProducer publisherOne = plainSessionOne.createProducer(queueOne);
    MessageProducer publisherTwo = plainSessionOne.createProducer(queueTwo);
    publisherOne.send(plainSessionOne.createTextMessage("Message 1"));
    publisherTwo.send(plainSessionOne.createTextMessage("Message 2"));
    publisherOne.close();
    publisherTwo.close();

    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);

    // First XA connection, consuming from queue one
    XAConnection dtxConnectionOne = xaFactory.createXAConnection();
    dtxConnectionOne.start();
    XASession dtxSessionOne = dtxConnectionOne.createXASession();
    XAResource resourceOne = dtxSessionOne.getXAResource();
    Session jmsSessionOne = dtxSessionOne.getSession();
    MessageConsumer dtxConsumerOne = jmsSessionOne.createConsumer(queueOne);

    // Second XA connection, consuming from queue two
    XAConnection dtxConnectionTwo = xaFactory.createXAConnection();
    dtxConnectionTwo.start();
    XASession dtxSessionTwo = dtxConnectionTwo.createXASession();
    XAResource resourceTwo = dtxSessionTwo.getXAResource();
    Session jmsSessionTwo = dtxSessionTwo.getSession();
    MessageConsumer dtxConsumerTwo = jmsSessionTwo.createConsumer(queueTwo);

    Xid branchId = JMSClientHelper.getNewXid();

    // Joining is only meaningful when both resources report the same resource manager
    boolean sameRM = resourceOne.isSameRM(resourceTwo);
    Assert.assertEquals(sameRM, true,
            "Resource one and resource two are connected to different resource managers");

    // Start the branch on resource one and join it from resource two
    resourceOne.start(branchId, XAResource.TMNOFLAGS);
    resourceTwo.start(branchId, XAResource.TMJOIN);

    Message fromQueueOne = dtxConsumerOne.receive(5000);
    Assert.assertNotNull(fromQueueOne, "A message was not received for queue " + queueNameOne);
    Message fromQueueTwo = dtxConsumerTwo.receive(5000);
    Assert.assertNotNull(fromQueueTwo, "A message was not received for queue " + queueNameTwo);

    // The branch is completed through resource one only
    resourceOne.end(branchId, XAResource.TMSUCCESS);
    resourceOne.prepare(branchId);
    resourceOne.commit(branchId, false);

    dtxConnectionOne.close();
    dtxConnectionTwo.close();

    // Neither queue may deliver a message after the commit (waits up to 3 seconds each)
    MessageConsumer plainConsumerOne = plainSessionOne.createConsumer(queueOne);
    Session plainSessionTwo = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer plainConsumerTwo = plainSessionTwo.createConsumer(queueTwo);

    fromQueueOne = plainConsumerOne.receive(3000);
    Assert.assertNull(fromQueueOne, "Message received after committing for queue " + queueNameOne);
    fromQueueTwo = plainConsumerTwo.receive(3000);
    Assert.assertNull(fromQueueTwo, "Message received after committing for queue " + queueNameTwo);

    plainConnection.close();
}
/**
 * Tests that rolling back a DTX branch which was never started throws an exception.
 * <p>
 * A fresh XID is created and {@code rollback} is called on it right away, without any
 * preceding {@code start}, {@code end} or {@code prepare}. The test passes only when an
 * {@code XAException} whose message matches "Error while rolling back dtx session" is thrown.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while rolling back dtx session.*")
public void rollbackDtxBranchWithoutStarting()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxRollbackTestCaseRollbackDtxBranchWithoutStarting";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and obtain the XA resource together with its JMS session
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection connection = xaFactory.createXAConnection();
    connection.start();
    XASession dtxSession = connection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();

    Destination queue = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer publisher = jmsSession.createProducer(queue);

    Xid branchId = JMSClientHelper.getNewXid();

    // The branch is deliberately never started, used, ended or prepared:
    // rollback is invoked directly on the unknown XID and is expected to fail
    resource.rollback(branchId);

    jmsSession.close();
    connection.close();
}
/**
 * Tests that committing a message acknowledgement works correctly. Steps are,
 * 1. Publish a message to a queue
 * 2. Receive the message in a distributed transacted session, then prepare and commit
 * 3. Consume again using the normal session and verify that no message is received
 */
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueueAcknowledgeTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "CommitTestCasePerformClientQueueAcknowledgeTestCase";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();
    Destination queue = (Destination) context.lookup(queueName);

    // Publish one message through a plain, non-transacted session
    ConnectionFactory plainFactory =
            (ConnectionFactory) context.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    plainSession.createQueue(queueName);
    MessageProducer publisher = plainSession.createProducer(queue);
    publisher.send(plainSession.createTextMessage("Test message"));
    publisher.close();

    // Consume the message inside a distributed transaction
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) context.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection dtxConnection = xaFactory.createXAConnection();
    dtxConnection.start();
    XASession dtxSession = dtxConnection.createXASession();
    XAResource resource = dtxSession.getXAResource();
    Session jmsSession = dtxSession.getSession();
    MessageConsumer dtxConsumer = jmsSession.createConsumer(queue);

    Xid branchId = JMSClientHelper.getNewXid();
    resource.start(branchId, XAResource.TMNOFLAGS);
    Message received = dtxConsumer.receive(5000);
    resource.end(branchId, XAResource.TMSUCCESS);
    Assert.assertNotNull(received, "No message received");

    // Two-phase completion: prepare, then commit
    int prepareResult = resource.prepare(branchId);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Dtx.prepare was not successful.");
    resource.commit(branchId, false);

    jmsSession.close();
    dtxConnection.close();

    // Nothing may be left on the queue after the commit (waits up to 5 seconds)
    MessageConsumer plainConsumer = plainSession.createConsumer(queue);
    Message leftover = plainConsumer.receive(5000);
    Assert.assertNull(leftover, "Message received. Commit might have failed");

    plainConnection.close();
}
/**
 * Tests that ending a DTX branch through a session other than the one that started it throws
 * an exception.
 * <p>
 * The branch is started on the first XA resource; {@code end} is then called for the same
 * XID on a resource that belongs to a second connection. The test passes only when an
 * {@code XAException} whose message matches "Error while ending dtx session" is thrown.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while ending dtx session.*")
public void endDtxBranchBelongToADifferentSession()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxEndTestCaseEndDtxBranchBelongToADifferentSession";

    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // First connection: this is the session that owns the branch
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    xaConnection.start();
    XASession xaSession = xaConnection.createXASession();
    XAResource xaResource = xaSession.getXAResource();
    Session session = xaSession.getSession();

    Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
    session.createQueue(queueName);
    MessageProducer producer = session.createProducer(xaTestQueue);

    // Start the branch on the first session
    Xid xid = JMSClientHelper.getNewXid();
    xaResource.start(xid, XAResource.TMNOFLAGS);

    // Second connection: start it (the original restarted the first connection by mistake)
    XAConnection secondXaConnection = connectionFactory.createXAConnection();
    secondXaConnection.start();
    XASession secondXaSession = secondXaConnection.createXASession();
    XAResource secondXaResource = secondXaSession.getXAResource();

    producer.send(session.createTextMessage("Test 1"));

    // Ending the branch from the session that did not start it is expected to fail
    secondXaResource.end(xid, XAResource.TMSUCCESS);

    // Below this line should not execute
    xaResource.prepare(xid);
    xaResource.rollback(xid);

    session.close();
    xaConnection.close();
}
/**
 * Publish two messages with the same publisher inside two separate transactions
 * <p>
 * publish messages within two separate transactions
 * commit the second transaction
 * receive the second message published
 * commit the first transaction
 * receive the first message published
 * <p>
 * Each received message is checked for {@code null} before its text is read, so a receive
 * timeout is reported as an assertion failure instead of a {@code NullPointerException}.
 *
 * @throws XPathExpressionException Error on reading AMQP port
 * @throws NamingException Throws when Initial context lookup failed
 * @throws JMSException Exception related to JMS
 * @throws XAException Throws when error occurred in XA transaction
 */
@Test(groups = {"wso2.mb", "dtx"})
public void PublishWithMultipleXidTestCase() throws XPathExpressionException, NamingException,
        JMSException, XAException {
    String queueName = "PublishWithMultipleXidTestCase";
    String xid1Message = "PublishWithMultipleXidTestCase-Msg-1";
    String xid2Message = "PublishWithMultipleXidTestCase-Msg-2";

    InitialContext initialContext =
            JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
                    getAMQPPort()).withQueue(queueName).build();

    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    xaConnection.start();
    XASession xaSession = xaConnection.createXASession();
    XAResource xaResource = xaSession.getXAResource();
    Session session = xaSession.getSession();

    Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
    session.createQueue(queueName);
    MessageConsumer consumer = session.createConsumer(xaTestQueue);
    MessageProducer producer = session.createProducer(xaTestQueue);

    // The two XIDs differ only in their last constructor argument
    Xid xid1 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x05});
    Xid xid2 = new TestXidImpl(100, new byte[]{0x01}, new byte[]{0x06});

    // Publish one message in each transaction; neither is committed yet
    xaResource.start(xid1, XAResource.TMNOFLAGS);
    producer.send(session.createTextMessage(xid1Message));
    xaResource.end(xid1, XAResource.TMSUCCESS);

    xaResource.start(xid2, XAResource.TMNOFLAGS);
    producer.send(session.createTextMessage(xid2Message));
    xaResource.end(xid2, XAResource.TMSUCCESS);

    // Xid 2: commit first and expect its message to arrive first
    int status = xaResource.prepare(xid2);
    Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
    xaResource.commit(xid2, false);

    JMSTextMessage message = (JMSTextMessage) consumer.receive(30000);
    Assert.assertNotNull(message, "No message received after committing the second transaction");
    Assert.assertEquals(message.getText(), xid2Message, "Invalid Message received");

    // Xid 1: commit afterwards and expect its message next
    status = xaResource.prepare(xid1);
    Assert.assertEquals(status, XAResource.XA_OK, "Prepare state failed for distributed transaction");
    xaResource.commit(xid1, false);

    JMSTextMessage message2 = (JMSTextMessage) consumer.receive(30000);
    Assert.assertNotNull(message2, "No message received after committing the first transaction");
    Assert.assertEquals(message2.getText(), xid1Message, "Invalid Message received");

    session.close();
    xaConnection.close();
}
/**
* Tests if publishing messages works correctly with session joining. Steps are,
* 1. Create two distributed transaction sessions and join one session to the other.
* 2. Publish messages using the two sessions.
* 3. Subscribe to the published queue and verify that no message is received before the commit.
* 4. Commit the transaction.
* 5. Verify that the subscriber now receives both messages.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void xaMultiSessionPublishTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    // Two XA sessions on one connection publish under the same Xid (the second joins via TMJOIN).
    // Nothing must be visible to a plain consumer before the commit; both messages must be
    // delivered after it.
    String queueName = "DtxStartPositiveTestCaseXaMultiSessionPublishTestCase";
    InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
            getAMQPPort()).withQueue(queueName).build();
    XAConnectionFactory xaConnectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);

    XAConnection xaConnectionOne = xaConnectionFactory.createXAConnection();
    Connection nonXaQueueConnection = null;
    try {
        MessageConsumer messageConsumer;
        try {
            // Create XA resource one
            xaConnectionOne.start();
            XASession xaSessionOne = xaConnectionOne.createXASession();
            XAResource xaResourceOne = xaSessionOne.getXAResource();
            Session sessionOne = xaSessionOne.getSession();

            Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
            sessionOne.createQueue(queueName);
            MessageProducer producerOne = sessionOne.createProducer(xaTestQueue);

            // Create XA resource two (second session on the same connection)
            XASession xaSessionTwo = xaConnectionOne.createXASession();
            XAResource xaResourceTwo = xaSessionTwo.getXAResource();
            Session sessionTwo = xaSessionTwo.getSession();
            MessageProducer producerTwo = sessionTwo.createProducer(xaTestQueue);

            Xid xid = JMSClientHelper.getNewXid();

            // Joining is only meaningful when both resources report the same resource manager.
            boolean sameRM = xaResourceOne.isSameRM(xaResourceTwo);
            Assert.assertTrue(sameRM, "Resource one and resource two are connected to different resource "
                    + "managers");

            xaResourceOne.start(xid, XAResource.TMNOFLAGS);
            xaResourceTwo.start(xid, XAResource.TMJOIN);

            producerOne.send(sessionOne.createTextMessage("Test 1"));
            producerTwo.send(sessionTwo.createTextMessage("Test 2"));

            // NOTE(review): only resource one is ended; xaResourceTwo is never end()-ed before
            // prepare. Confirm whether that is intentional for this scenario.
            xaResourceOne.end(xid, XAResource.TMSUCCESS);

            // subscribe and see if the message is received
            ConnectionFactory nonXaConnectionFactory = (ConnectionFactory) initialContext
                    .lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
            nonXaQueueConnection = nonXaConnectionFactory.createConnection();
            nonXaQueueConnection.start();
            Session nonXaQueueSession = nonXaQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            messageConsumer = nonXaQueueSession.createConsumer(xaTestQueue);

            // wait 5 seconds
            Message uncommitted = messageConsumer.receive(5000);
            Assert.assertNull(uncommitted, "Message received before committing");

            xaResourceOne.prepare(xid);
            xaResourceOne.commit(xid, false);
        } finally {
            // The XA connection is no longer needed after the commit; also release it on failure.
            xaConnectionOne.close();
        }

        //This is only added to find out the reason for the intermittent failure of this test method. Should be
        // removed once the issue is identified.
        try {
            // Logging in
            LoginLogoutClient loginLogoutClientForAdmin = new LoginLogoutClient(super.automationContext);
            String sessionCookie = loginLogoutClientForAdmin.login();
            AndesAdminClient admin = new AndesAdminClient(super.backendURL, sessionCookie);

            //Check message count in queue
            org.wso2.carbon.andes.stub.admin.types.Message[] queueOneMessages
                    = admin.browseQueue(queueName, 0, 10);
            Assert.assertEquals(queueOneMessages.length, 2, "Message not published to queue " + queueName);

            //Logging out
            loginLogoutClientForAdmin.logout();
        } catch (RemoteException | AutomationUtilException | AndesAdminServiceBrokerManagerAdminException
                | LogoutAuthenticationExceptionException e) {
            // Diagnostic step only: admin-service failures are reported but do not fail the test.
            // NOTE(review): consider routing this through the class logger if one exists.
            e.printStackTrace();
        }

        Message receive = messageConsumer.receive(5000);
        Assert.assertNotNull(receive, "Message not received");

        receive = messageConsumer.receive(5000);
        Assert.assertNotNull(receive, "Message not received");
    } finally {
        // Release the verifying connection even when an assertion above fails.
        if (nonXaQueueConnection != null) {
            nonXaQueueConnection.close();
        }
    }
}
/**
* Tests if preparing a DTX branch throws an error when the published message cannot be routed (no-route issue)
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class)
public void prepareDtxBranchWithNoRoutesIssue()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    // Publishes inside a DTX branch and expects the transaction to fail with an XAException.
    // Unlike the sibling tests, session.createQueue(...) is not called here.
    String queueName = "DtxPrepareTestCasePrepareDtxBranchWithNoRoutesIssues";

    User adminUser = getSuperTenantAdminUser();
    InitialContext adminInitialContext
            = JMSClientHelper.createInitialContextBuilder(adminUser.getUserNameWithoutDomain(),
                                                          adminUser.getPassword(),
                                                          getBrokerHost(),
                                                          getAMQPPort()).withQueue(queueName).build();

    // Publish to the queue inside a distributed transaction
    XAConnectionFactory connectionFactory
            = (XAConnectionFactory) adminInitialContext.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);

    XAConnection xaConnection = connectionFactory.createXAConnection();
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();

        Destination testQueue = (Destination) adminInitialContext.lookup(queueName);
        MessageProducer producer = session.createProducer(testQueue);

        Xid xid = JMSClientHelper.getNewXid();

        xaResource.start(xid, XAResource.TMNOFLAGS);
        producer.send(session.createTextMessage("Test 1"));
        xaResource.end(xid, XAResource.TMSUCCESS);

        // Test should fail at prepare stage due to no route issue
        int prepareResponseCode = xaResource.prepare(xid);
        Assert.assertNotEquals(prepareResponseCode, XAResource.XA_OK, "Prepare should fail due to no route issue");

        xaResource.commit(xid, false);

        session.close();
    } finally {
        // The passing path of this test always leaves the try block through an XAException, so
        // the connection must be released here. Cleanup is best-effort: a failure while closing
        // must not replace the XAException the test expects.
        try {
            xaConnection.close();
        } catch (JMSException ignored) {
            // Intentionally ignored; see the comment above.
        }
    }
}
/**
* Tests if rolling back a published message works correctly. Steps are,
* 1. Using a distributed transaction a message is published to a queue and rolled back
* 2. Subscribe to the published queue and see if any message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    // Publishes a message inside a distributed transaction, rolls the transaction back after
    // prepare, and then verifies with a separate, non-XA consumer that nothing was delivered.
    String queueName = "RollbackTestCasePerformClientQueuePublishTestCase";

    InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
                                                                                getAMQPPort())
                                                   .withQueue(queueName).build();

    // Publish to queue and rollback
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);

    XAConnection xaConnection = connectionFactory.createXAConnection();
    Connection queueConnection = null;
    try {
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();

        Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
        session.createQueue(queueName);
        MessageProducer producer = session.createProducer(xaTestQueue);

        Xid xid = JMSClientHelper.getNewXid();

        xaResource.start(xid, XAResource.TMNOFLAGS);
        producer.send(session.createTextMessage("Test 1"));
        xaResource.end(xid, XAResource.TMSUCCESS);

        xaResource.prepare(xid);
        xaResource.rollback(xid);

        // subscribe and see if the message is received
        ConnectionFactory queueConnectionFactory = (ConnectionFactory) initialContext
                .lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
        queueConnection = queueConnectionFactory.createConnection();
        // A JMS connection is created in stopped mode and delivers no messages until started.
        // Without this call receive() would always time out and the assertion below could
        // never detect a message that escaped the rollback.
        queueConnection.start();
        Session queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer messageConsumer = queueSession.createConsumer(xaTestQueue);

        // wait 5 seconds
        Message receive = messageConsumer.receive(5000);
        Assert.assertNull(receive, "Message received. Message was not rolled back");

        session.close();
    } finally {
        // Release both connections even when an assertion or XA call above fails.
        try {
            xaConnection.close();
        } finally {
            if (queueConnection != null) {
                queueConnection.close();
            }
        }
    }
}