下面列出了 javax.jms.XAConnectionFactory 这个 API 类的示例代码及用法,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Stores the given factory after verifying that it implements
 * {@code javax.jms.XAConnectionFactory}, and records whether it looks JMS 2.0+ capable.
 *
 * <p>JMS 2.0 capability is probed reflectively by looking for a public
 * {@code createContext(String, String)} method on the factory's class. The capability flag
 * is recomputed on every call, so replacing a JMS 2.0 factory with an older one clears it
 * again instead of leaving a stale {@code true} behind.
 *
 * @param toUse the provider factory; must implement {@code javax.jms.XAConnectionFactory}
 * @throws IllegalArgumentException if {@code toUse} is {@code null} or is not an
 *         {@code XAConnectionFactory}
 */
@Override
public void setConnectionFactory(Object toUse) {
    if (!(toUse instanceof XAConnectionFactory)) {
        throw new IllegalArgumentException("connectionFactory should implement javax.jms.XAConnectionFactory");
    }

    boolean contextCapable;
    try {
        toUse.getClass().getMethod("createContext", String.class, String.class);
        LOG.info("Provided ConnectionFactory is JMS 2.0+ capable.");
        contextCapable = true;
    } catch (NoSuchMethodException | SecurityException e) {
        // A missing or inaccessible method simply means "treat as JMS 1.1"; not an error.
        LOG.info("Provided ConnectionFactory is not JMS 2.0+ capable.");
        contextCapable = false;
    }

    jmsContextSupported = contextCapable;
    connectionFactory = toUse;
}
/**
 * Supplies the XA connection factory of the broker to be checked.
 *
 * <p>As written this is a placeholder that returns {@code null}; edit it to build and
 * return a broker-specific factory.
 *
 * @return the broker's XA connection factory, or {@code null} while unconfigured
 */
private static XAConnectionFactory getXAConnectionFactory() {
    // Replace the return statement below with a factory for your broker, for example:
    //   ActiveMQXAConnectionFactory factory = new ActiveMQXAConnectionFactory(BROKER_URL);
    //   return factory;
    return null;
}
/**
 * Creates the recovery helper for the given XA connection factory.
 *
 * <p>The credential-less helper is used only when both the recovery JMS user and the
 * recovery JMS password are {@code null}; otherwise both values are handed to the helper.
 *
 * @param xaConnectionFactory the factory the helper should recover resources from
 * @return a new {@code JmsXAResourceRecoveryHelper}
 */
private XAResourceRecoveryHelper getRecoveryHelper(XAConnectionFactory xaConnectionFactory) {
    boolean credentialsConfigured = this.properties.getRecoveryJmsUser() != null
            || this.properties.getRecoveryJmsPass() != null;
    if (credentialsConfigured) {
        return new JmsXAResourceRecoveryHelper(xaConnectionFactory, this.properties.getRecoveryJmsUser(),
                this.properties.getRecoveryJmsPass());
    }
    return new JmsXAResourceRecoveryHelper(xaConnectionFactory);
}
/**
 * Opens a new XA connection on the configured provider factory.
 *
 * <p>The no-argument {@code createXAConnection()} is used only when both the user name and
 * the password of the key are {@code null}; otherwise the key's credentials are passed on.
 *
 * @param key carries the user name and password to connect with
 * @return the newly created XA connection
 * @throws JMSException if the provider fails to create the connection
 * @throws IllegalStateException if the configured factory is not an {@code XAConnectionFactory}
 */
@Override
protected XAConnection createProviderConnection(PooledConnectionKey key) throws JMSException {
    if (!(connectionFactory instanceof XAConnectionFactory)) {
        throw new IllegalStateException("connectionFactory should implement javax.jms.XAConnectionFactory");
    }

    boolean anonymous = key.getUserName() == null && key.getPassword() == null;
    if (anonymous) {
        return ((XAConnectionFactory) connectionFactory).createXAConnection();
    }
    return ((XAConnectionFactory) connectionFactory).createXAConnection(key.getUserName(), key.getPassword());
}
/**
 * Creates a new XA JMS context on the configured provider factory.
 *
 * <p>The no-argument {@code createXAContext()} is used only when both credentials are
 * {@code null}. The {@code sessionMode} argument is not consulted by this implementation.
 *
 * @param username user name to connect with, may be {@code null}
 * @param password password to connect with, may be {@code null}
 * @param sessionMode unused here
 * @return the newly created XA context
 * @throws javax.jms.IllegalStateRuntimeException if the configured factory is not an
 *         {@code XAConnectionFactory}
 */
@Override
protected XAJMSContext createProviderContext(String username, String password, int sessionMode) {
    if (!(connectionFactory instanceof XAConnectionFactory)) {
        throw new javax.jms.IllegalStateRuntimeException("connectionFactory should implement javax.jms.XAConnectionFactory");
    }

    boolean anonymous = username == null && password == null;
    if (anonymous) {
        return ((XAConnectionFactory) connectionFactory).createXAContext();
    }
    return ((XAConnectionFactory) connectionFactory).createXAContext(username, password);
}
/**
 * Sends one message inside an XA branch, prepares the branch and rolls it back, then checks
 * through the REST client that the queue reports a size of zero.
 */
@Test
public void testPublisherWithRollback() throws NamingException, JMSException, XAException, IOException {
    String queueName = "testPublisherWithRollback";
    String payload = "testPublisherWithRollback-Message";

    InitialContext jndiContext = initialContextBuilder.withXaConnectionFactory()
                                                      .withQueue(queueName)
                                                      .build();
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) jndiContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);

    XAConnection xaConn = xaFactory.createXAConnection();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Queue destination = jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(destination);
    xaConn.start();

    // Publish inside the branch, then prepare and roll back.
    XidImpl branchXid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage(payload));
    resource.end(branchXid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(branchXid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK");

    resource.rollback(branchXid);

    // After the rollback the queue must not contain the message.
    QueueMetadata metadata = restApiClient.getQueueMetadata(queueName);
    Assert.assertEquals((int) metadata.getSize(), 0, "Queue should be empty");

    jmsSession.close();
    xaConn.close();
}
/**
 * Tests that committing a DTX branch which was started but never ended (and never
 * prepared) throws an {@link XAException}.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutEnding()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutEnding";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and session against the broker.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    Xid branchXid = JMSClientHelper.getNewXid();
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage("Test 1"));

    // end() and prepare() are deliberately skipped; the commit below is expected to fail.
    resource.commit(branchXid, false);

    jmsSession.close();
    xaConn.close();
}
/**
 * Tests that preparing a DTX branch which was ended with the {@code TMFAIL} flag throws an
 * {@link XAException}.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class)
public void prepareDtxBranchAfterEndFails()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxPrepareTestCasePrepareDtxBranchAfterEndFails";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and session against the broker.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    // Start the branch, publish, then end it flagged as failed.
    Xid branchXid = JMSClientHelper.getNewXid();
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage("Test 1"));
    resource.end(branchXid, XAResource.TMFAIL);

    // The test expects an XAException from the sequence below.
    resource.prepare(branchXid);
    resource.rollback(branchXid);

    jmsSession.close();
    xaConn.close();
}
/**
 * Replaces JMS connection-factory beans with lazy wrappers and decorates the message
 * listener of endpoint managers.
 *
 * <p>Checks are applied in this order: {@code CachingConnectionFactory},
 * {@code JmsMessageEndpointManager}, XA factories (combined XA + plain first, then
 * XA-only), {@code TopicConnectionFactory}, plain {@code ConnectionFactory}. Any other
 * bean is returned untouched.
 *
 * @param bean the initialized bean
 * @param beanName the bean's name (not used)
 * @return the wrapper, or the original bean when no wrapping applies
 */
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
        throws BeansException {
    // The caching connection factory itself is wrapped rather than its target, because
    // it catches callbacks such as ExceptionListener. Without wrapping it, cached
    // callbacks like that would not be traced.
    if (bean instanceof CachingConnectionFactory) {
        return new LazyConnectionFactory(this.beanFactory,
                (CachingConnectionFactory) bean);
    }

    if (bean instanceof JmsMessageEndpointManager) {
        JmsMessageEndpointManager endpointManager = (JmsMessageEndpointManager) bean;
        MessageListener original = endpointManager.getMessageListener();
        if (original != null) {
            endpointManager.setMessageListener(
                    new LazyMessageListener(this.beanFactory, original));
        }
        return bean;
    }

    // XA is checked before the plain factory types in case the ConnectionFactory also
    // implements XAConnectionFactory.
    if (bean instanceof XAConnectionFactory) {
        if (bean instanceof ConnectionFactory) {
            return new LazyConnectionAndXaConnectionFactory(this.beanFactory,
                    (ConnectionFactory) bean, (XAConnectionFactory) bean);
        }
        return new LazyXAConnectionFactory(this.beanFactory,
                (XAConnectionFactory) bean);
    }

    if (bean instanceof TopicConnectionFactory) {
        return new LazyTopicConnectionFactory(this.beanFactory,
                (TopicConnectionFactory) bean);
    }

    if (bean instanceof ConnectionFactory) {
        return new LazyConnectionFactory(this.beanFactory, (ConnectionFactory) bean);
    }

    return bean;
}
/**
 * Returns the traced wrapper around the delegate, creating and caching it in
 * {@code wrappedDelegate} on first use.
 */
private XAConnectionFactory wrappedDelegate() {
    if (this.wrappedDelegate == null) {
        this.wrappedDelegate = jmsTracing().xaConnectionFactory(this.delegate);
    }
    return this.wrappedDelegate;
}
LazyConnectionAndXaConnectionFactory(BeanFactory beanFactory,
ConnectionFactory connectionFactoryDelegate,
XAConnectionFactory xaConnectionFactoryDelegate) {
this.connectionFactoryDelegate = new LazyConnectionFactory(beanFactory,
connectionFactoryDelegate);
this.xaConnectionFactoryDelegate = new LazyXAConnectionFactory(beanFactory,
xaConnectionFactoryDelegate);
}
/**
 * Asserts that an exception listener set on a connection created from the context's
 * {@code XAConnectionFactory} bean comes back as a {@code brave.jms.TracingExceptionListener}.
 *
 * @param ctx application context holding the {@code XAConnectionFactory} bean
 * @throws JMSException if creating, configuring or closing the connection fails
 */
static void checkXAConnection(AssertableApplicationContext ctx) throws JMSException {
    XAConnectionFactory factory = ctx.getBean(XAConnectionFactory.class);
    // Explicit try/finally: try-with-resources is not available with JMS 1.1 connections.
    XAConnection connection = factory.createXAConnection();
    try {
        connection.setExceptionListener(ignored -> {
        });
        String listenerType = connection.getExceptionListener().getClass().getName();
        assertThat(listenerType).startsWith("brave.jms.TracingExceptionListener");
    }
    finally {
        connection.close();
    }
}
/**
 * Asserts that the factory is an instance of exactly {@code total} of the six JMS
 * connection-factory interfaces.
 *
 * <p>The failure message lists the factory followed by the simple names of the interfaces
 * it matched.
 *
 * @param factory the factory under inspection
 * @param total the expected number of matching interfaces
 */
private void assertNTypes(ActiveMQConnectionFactory factory, final int total) {
    // Order matters: it determines the order of names in the failure message.
    Class<?>[] candidates = {
        ConnectionFactory.class,
        XAConnectionFactory.class,
        QueueConnectionFactory.class,
        TopicConnectionFactory.class,
        XAQueueConnectionFactory.class,
        XATopicConnectionFactory.class
    };

    StringBuilder description = new StringBuilder();
    description.append(factory).append("\n is instance of ");

    int matched = 0;
    for (Class<?> candidate : candidates) {
        if (candidate.isInstance(factory)) {
            matched++;
            description.append(candidate.getSimpleName()).append(" ");
        }
    }

    Assert.assertEquals(description.toString(), total, matched);
}
/**
 * Stores the delegate and tracing component and records, as a bit mask in {@code types},
 * which of the JMS connection-factory interfaces the delegate implements.
 *
 * @param delegate the wrapped factory
 * @param jmsTracing the tracing component to keep alongside the delegate
 */
TracingConnectionFactory(Object delegate, JmsTracing jmsTracing) {
    this.delegate = delegate;
    this.jmsTracing = jmsTracing;

    int supported = 0;
    if (delegate instanceof ConnectionFactory) {
        supported |= TYPE_CF;
    }
    if (delegate instanceof QueueConnectionFactory) {
        supported |= TYPE_QUEUE_CF;
    }
    if (delegate instanceof TopicConnectionFactory) {
        supported |= TYPE_TOPIC_CF;
    }
    if (delegate instanceof XAConnectionFactory) {
        supported |= TYPE_XA_CF;
    }
    if (delegate instanceof XAQueueConnectionFactory) {
        supported |= TYPE_XA_QUEUE_CF;
    }
    if (delegate instanceof XATopicConnectionFactory) {
        supported |= TYPE_XA_TOPIC_CF;
    }
    this.types = supported;
}
/**
 * Wrapping a factory that implements both interfaces must yield something that is still
 * an {@code XAConnectionFactory}.
 */
@Test public void connectionFactory_wrapsXaInput() {
    abstract class Both implements XAConnectionFactory, ConnectionFactory {
    }

    Both xaAndPlain = mock(Both.class);

    assertThat(jmsTracing.connectionFactory(xaAndPlain))
        .isInstanceOf(XAConnectionFactory.class);
}
/**
 * Manual check that a broker keeps a prepared XA transaction across a client disconnect.
 *
 * <p>Steps: send one message inside an XA transaction and prepare it, close the connection,
 * reconnect, commit the prepared transaction, then try to consume the message. The outcome
 * is printed to standard output (success) or standard error (message missing or wrong).
 *
 * <p>Both connections are closed in {@code finally} blocks so that a failure in any step
 * does not leave a broker connection open.
 *
 * @param args unused
 * @throws IllegalArgumentException if {@code getXAConnectionFactory()} has not been edited
 *         to return a factory
 * @throws Exception if any JMS or XA call fails
 */
public static void main(String[] args) throws Exception {
    XAConnectionFactory factory = getXAConnectionFactory();
    if (factory == null) {
        throw new IllegalArgumentException("Provide a factory for the broker in the getXAConnectionFactory() method");
    }

    Xid xid = new MyXid(1);

    // create a connection, session and XA transaction
    XAConnection conn = factory.createXAConnection();
    try {
        XASession sess = conn.createXASession();
        XAResource xaRes = sess.getXAResource();

        // start the transaction and produce one message
        xaRes.start(xid, XAResource.TMNOFLAGS);
        MessageProducer producer = sess.createProducer(sess.createQueue("queue"));
        producer.send(sess.createTextMessage("foo"));
        xaRes.end(xid, XAResource.TMSUCCESS);

        // prepare the transaction
        xaRes.prepare(xid);
    } finally {
        // now disconnect. Some brokers roll back the transaction, but this is not
        // compatible with Jet's fault tolerance.
        conn.close();
    }

    // connect again
    conn = factory.createXAConnection();
    try {
        conn.start();
        XASession sess = conn.createXASession();
        XAResource xaRes = sess.getXAResource();

        // commit the prepared transaction
        xaRes.commit(xid, false);

        // check that the message is there
        MessageConsumer cons = sess.createConsumer(sess.createQueue("queue"));
        TextMessage msg = (TextMessage) cons.receive(TIMEOUT);
        // Constant-first equals: a text message with a null body counts as "wrong text".
        if (msg == null || !"foo".equals(msg.getText())) {
            System.err.println("Message is missing or has wrong text, transaction probably lost");
        } else {
            System.out.println("Success!");
        }
    } finally {
        conn.close();
    }
}
/**
 * Registers a recovery helper for the XA connection factory with the recovery module and
 * returns a {@code ConnectionFactoryProxy} around the factory.
 *
 * @param xaConnectionFactory the XA connection factory to wrap
 * @return a proxy built from the factory and a transaction helper for the transaction manager
 */
@Override
public ConnectionFactory wrapConnectionFactory(XAConnectionFactory xaConnectionFactory) {
    this.xaRecoveryModule.addXAResourceRecoveryHelper(getRecoveryHelper(xaConnectionFactory));

    TransactionHelperImpl transactionHelper = new TransactionHelperImpl(this.transactionManager);
    return new ConnectionFactoryProxy(xaConnectionFactory, transactionHelper);
}
/**
 * Creates an instance holding the given XA connection factory.
 *
 * @param connectionFactory the factory stored in the {@code connectionFactory} field
 */
XAConnectionFactoryOnly(XAConnectionFactory connectionFactory) {
    this.connectionFactory = connectionFactory;
}
/**
 * Sends one message inside an XA branch, prepares and commits the branch (two-phase), then
 * consumes from the queue on a separate non-transacted session and checks the text matches.
 */
@Test
public void testPublisherWithCommit() throws NamingException, JMSException, XAException {
    String queueName = "testPublisherWithCommit";
    String payload = "testPublisherWithCommit-Message";

    InitialContext jndiContext = initialContextBuilder.withXaConnectionFactory()
                                                      .withQueue(queueName)
                                                      .build();
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) jndiContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);

    XAConnection xaConn = xaFactory.createXAConnection();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Queue destination = jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(destination);
    xaConn.start();

    // Publish inside the branch, then prepare and commit.
    XidImpl branchXid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage(payload));
    resource.end(branchXid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(branchXid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK");

    resource.commit(branchXid, false);

    // Verify by consuming the committed message on an independent connection.
    ConnectionFactory plainFactory =
            (ConnectionFactory) jndiContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    Session receivingSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer receiver = receivingSession.createConsumer(receivingSession.createQueue(queueName));
    plainConnection.start();

    TextMessage received = (TextMessage) receiver.receive(3000);
    Assert.assertNotNull(received, "Didn't receive a message");
    Assert.assertEquals(received.getText(), payload, "Received message should match sent message");

    jmsSession.close();
    xaConn.close();
    plainConnection.close();
}
/**
 * Publishes one message outside any XA branch, receives it inside a branch that is then
 * prepared and committed, and checks through the REST client that the queue size is zero.
 */
@Test
public void testConsumerWithCommit() throws Exception {
    String queueName = "testConsumerWithCommit";
    String payload = "testConsumerWithCommit-Message";

    InitialContext jndiContext = initialContextBuilder.withXaConnectionFactory()
                                                      .withQueue(queueName)
                                                      .build();

    // Set up the XA connection, session, producer and consumer.
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) jndiContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Queue destination = jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(destination);
    MessageConsumer receiver = jmsSession.createConsumer(destination);
    xaConn.start();

    // The send happens before the branch is started.
    sender.send(jmsSession.createTextMessage(payload));

    // Receive inside the branch, then prepare and commit.
    XidImpl branchXid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(branchXid, XAResource.TMNOFLAGS);
    TextMessage received = (TextMessage) receiver.receive(2000);
    resource.end(branchXid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(branchXid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK.");

    resource.commit(branchXid, false);

    jmsSession.close();
    xaConn.close();

    Assert.assertNotNull(received, "Sent message should be consumed by the consumer.");
    Assert.assertEquals(received.getText(), payload, "Received message should match the sent message.");

    // After the committed receive the queue must be empty.
    QueueMetadata metadata = restApiClient.getQueueMetadata(queueName);
    Assert.assertEquals((int) metadata.getSize(), 0, "Queue should be empty");
}
/**
 * Publishes one message outside any XA branch, receives it inside a branch that is then
 * prepared and rolled back, and checks through the REST client that the queue size is one.
 */
@Test
public void testConsumerWithRollback() throws Exception {
    String queueName = "testConsumerWithRollback";
    String payload = "testConsumerWithRollback-Message";

    InitialContext jndiContext = initialContextBuilder.withXaConnectionFactory()
                                                      .withQueue(queueName)
                                                      .build();

    // Set up the XA connection, session, producer and consumer.
    XAConnectionFactory xaFactory =
            (XAConnectionFactory) jndiContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Queue destination = jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(destination);
    MessageConsumer receiver = jmsSession.createConsumer(destination);
    xaConn.start();

    // The send happens before the branch is started.
    sender.send(jmsSession.createTextMessage(payload));

    // Receive inside the branch, then prepare and roll back.
    XidImpl branchXid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
    resource.start(branchXid, XAResource.TMNOFLAGS);
    TextMessage received = (TextMessage) receiver.receive(2000);
    resource.end(branchXid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(branchXid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare phase should return XA_OK.");

    resource.rollback(branchXid);

    jmsSession.close();
    xaConn.close();

    Assert.assertNotNull(received, "Sent message should be consumed by the consumer.");
    Assert.assertEquals(received.getText(), payload, "Received message should match the sent message.");

    // After the rolled-back receive the message must still be on the queue.
    QueueMetadata metadata = restApiClient.getQueueMetadata(queueName);
    Assert.assertEquals((int) metadata.getSize(), 1, "Queue should be non empty");
}
/**
 * Creates a new {@code TibjmsXAConnectionFactory} for this object's broker URL.
 *
 * @param config not consulted by this implementation
 * @return a freshly created XA connection factory
 */
@Override
public XAConnectionFactory createXaConnectionFactory(JmsTopicConfig config) throws NamingException
{
    XAConnectionFactory xaFactory = new TibjmsXAConnectionFactory(this.brokerUrl);
    return xaFactory;
}
/**
 * Creates a new {@code InMemoryXaConnectionFactory} bound to this object.
 *
 * @param config not consulted by this implementation
 * @return a freshly created in-memory XA connection factory
 */
@Override
public XAConnectionFactory createXaConnectionFactory(JmsTopicConfig config) throws NamingException
{
    XAConnectionFactory xaFactory = new InMemoryXaConnectionFactory(this);
    return xaFactory;
}
/**
 * Tests that one-phase commit in distributed transactions works for message consumption.
 * Steps:
 * 1. Publish a message to a queue using a normal session.
 * 2. Receive the message in a distributed transacted session and commit in one phase.
 * 3. Consume again with the normal session and verify that no message is received.
 */
@Test(groups = { "wso2.mb", "dtx" })
public void onePhaseCommitMessageConsumptionTest() throws Exception {
    String queueName = "DtxOnePhaseCommitMessageConsumptionTest";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort()).withQueue(queueName).build();
    Destination target = (Destination) context.lookup(queueName);

    // Publish one message to the queue over a plain, auto-acknowledge session.
    ConnectionFactory plainFactory = (ConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    plainSession.createQueue(queueName);
    MessageProducer publisher = plainSession.createProducer(target);
    publisher.send(plainSession.createTextMessage("Test message consumption"));
    publisher.close();

    // Consume the message inside an XA branch and commit it in one phase.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();
    MessageConsumer xaReceiver = jmsSession.createConsumer(target);

    Xid branchXid = JMSClientHelper.getNewXid();
    resource.start(branchXid, XAResource.TMNOFLAGS);
    Message consumed = xaReceiver.receive(10000);
    resource.end(branchXid, XAResource.TMSUCCESS);

    Assert.assertNotNull(consumed, "No message received");

    resource.commit(branchXid, true);

    jmsSession.close();
    xaConn.close();

    // Subscribe with the plain session; nothing should be left to receive.
    MessageConsumer verifier = plainSession.createConsumer(target);

    // Wait up to 10 seconds.
    Message leftover = verifier.receive(10000);
    Assert.assertNull(leftover,
            "Message received. One phase commit might have failed");

    plainConnection.close();
}
/**
 * Tests that one-phase commit in distributed transactions works for message publishing.
 * Steps:
 * 1. Publish a message to a queue inside a distributed transaction and commit in one phase.
 * 2. Subscribe to that queue with a normal session and verify the message is received.
 */
@Test(groups = { "wso2.mb", "dtx" })
public void performClientQueuePublishTestCase() throws Exception {
    String queueName = "DtxOnePhaseCommitMessagePublishingTest";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort()).withQueue(queueName).build();

    // Publish inside an XA branch and commit in one phase.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    Xid branchXid = JMSClientHelper.getNewXid();
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage("Test Message publishing"));
    resource.end(branchXid, XAResource.TMSUCCESS);

    resource.commit(branchXid, true);

    jmsSession.close();
    xaConn.close();

    // Subscribe on a plain connection and check that the message arrives.
    ConnectionFactory plainFactory = (ConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer verifier = plainSession.createConsumer(target);

    // Wait up to 10 seconds.
    Message delivered = verifier.receive(10000);
    Assert.assertNotNull(delivered, "Message was not received. One-phase commit might have failed");

    plainConnection.close();
}
/**
 * Tests that committing a published message works when the initial context is built
 * without a client ID. Steps:
 * 1. Publish a message to a queue inside a distributed transaction, prepare and commit it.
 * 2. Subscribe to that queue with a normal session and verify the message is received.
 */
@Test(groups = { "wso2.mb", "dtx" })
public void performDtxClientQueuePublishTestCase()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "ClientIDNullTestCaseDtxPerformClientQueuePublishTestCase";

    InitialContext context = JMSClientHelper.createInitialContextBuilder("admin",
                                                                         "admin",
                                                                         "localhost",
                                                                         getAMQPPort())
                                            .withNoClientId()
                                            .withQueue(queueName)
                                            .build();

    // Publish inside an XA branch, then prepare and commit (two-phase).
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    Xid branchXid = JMSClientHelper.getNewXid();
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage("Test 1"));
    resource.end(branchXid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(branchXid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Dtx.prepare was not successful.");

    resource.commit(branchXid, false);

    jmsSession.close();
    xaConn.close();

    // Subscribe on a plain connection and check that the message arrives.
    ConnectionFactory plainFactory = (ConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
    Connection plainConnection = plainFactory.createConnection();
    plainConnection.start();
    Session plainSession = plainConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer verifier = plainSession.createConsumer(target);

    // Wait up to 5 seconds.
    Message delivered = verifier.receive(5000);
    Assert.assertNotNull(delivered, "Message was not received.");

    plainConnection.close();
}
/**
 * Tests that committing a DTX branch which was never started throws an
 * {@link XAException}.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutStarting()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutStarting";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and session against the broker.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    Xid branchXid = JMSClientHelper.getNewXid();

    // start(), send, end() and prepare() are deliberately skipped: the branch is never
    // started, so the commit below is expected to fail.
    resource.commit(branchXid, false);

    jmsSession.close();
    xaConn.close();
}
/**
 * Tests that committing an already prepared branch with the one-phase flag set throws an
 * {@link XAException}.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithOnephaseAfterPrepare()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCmmitDtxBranchWithOnephaseAfterPrepare";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and session against the broker.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    // Run a complete start / send / end / prepare sequence.
    Xid branchXid = JMSClientHelper.getNewXid();
    resource.start(branchXid, XAResource.TMNOFLAGS);
    sender.send(jmsSession.createTextMessage("Test 1"));
    resource.end(branchXid, XAResource.TMSUCCESS);
    resource.prepare(branchXid);

    // After prepare the one-phase flag should be false; passing true is the error under test.
    resource.commit(branchXid, true);

    jmsSession.close();
    xaConn.close();
}
/**
 * Tests that a two-phase commit of a branch which was started and ended but never prepared
 * throws an {@link XAException}.
 *
 * <p>The queue name is specific to this test; it previously reused the name belonging to
 * {@code commitDtxBranchWithoutEnding}, which made the two tests share one queue.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while committing dtx session.*")
public void commitDtxBranchWithoutPrepare()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxCommitTestCaseCommitDtxBranchWithoutPrepare";

    InitialContext initialContext = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and session against the broker.
    XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConnection = connectionFactory.createXAConnection();
    xaConnection.start();
    XASession xaSession = xaConnection.createXASession();
    XAResource xaResource = xaSession.getXAResource();
    Session session = xaSession.getSession();

    Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
    session.createQueue(queueName);
    MessageProducer producer = session.createProducer(xaTestQueue);

    Xid xid = JMSClientHelper.getNewXid();
    xaResource.start(xid, XAResource.TMNOFLAGS);
    producer.send(session.createTextMessage("Test 1"));
    xaResource.end(xid, XAResource.TMSUCCESS);

    // prepare() is deliberately skipped: a two-phase commit without it is expected to fail.
    xaResource.commit(xid, false);

    session.close();
    xaConnection.close();
}
/**
 * Tests that rolling back a DTX branch which was never started throws an
 * {@link XAException}.
 */
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class,
      expectedExceptionsMessageRegExp = ".*Error while rolling back dtx session.*")
public void rollbackDtxBranchWithoutStarting()
        throws NamingException, JMSException, XAException, XPathExpressionException {
    String queueName = "DtxRollbackTestCaseRollbackDtxBranchWithoutStarting";

    InitialContext context = JMSClientHelper
            .createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
            .withQueue(queueName)
            .build();

    // Open an XA connection and session against the broker.
    XAConnectionFactory xaFactory = (XAConnectionFactory) context
            .lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
    XAConnection xaConn = xaFactory.createXAConnection();
    xaConn.start();
    XASession xaSession = xaConn.createXASession();
    XAResource resource = xaSession.getXAResource();
    Session jmsSession = xaSession.getSession();

    Destination target = (Destination) context.lookup(queueName);
    jmsSession.createQueue(queueName);
    MessageProducer sender = jmsSession.createProducer(target);

    Xid branchXid = JMSClientHelper.getNewXid();

    // start(), send, end() and prepare() are deliberately skipped: the branch is never
    // started, so the rollback below is expected to fail.
    resource.rollback(branchXid);

    jmsSession.close();
    xaConn.close();
}