下面列出了怎么用javax.jms.XAConnection的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Create a XA connection
*
* @param userName The user name
* @param password The password
* @return The connection
* @throws JMSException Thrown if the operation fails
*/
@Override
public XAConnection createXAConnection(final String userName, final String password) throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createXAConnection(" + userName + ", ****)");
}
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_CONNECTION);
s.setUserName(userName);
s.setPassword(password);
validateUser(s);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("Created connection: " + s);
}
return s;
}
private void assertConnectionType(Connection conn, String type) {
if ("generic".equals(type) || "queue".equals(type) || "topic".equals(type)) {
//generic
Assert.assertFalse(conn instanceof XAConnection);
Assert.assertTrue(conn instanceof QueueConnection);
Assert.assertFalse(conn instanceof XAQueueConnection);
Assert.assertTrue(conn instanceof TopicConnection);
Assert.assertFalse(conn instanceof XATopicConnection);
} else if ("xa".equals(type) || "xa-queue".equals(type) || "xa-topic".equals(type)) {
Assert.assertTrue(conn instanceof XAConnection);
Assert.assertTrue(conn instanceof QueueConnection);
Assert.assertTrue(conn instanceof XAQueueConnection);
Assert.assertTrue(conn instanceof TopicConnection);
Assert.assertTrue(conn instanceof XATopicConnection);
} else {
Assert.fail("Unknown connection type: " + type);
}
}
@Test
public void testXAPrepare() throws Exception {
try {
XAConnection connection = xaFactory.createXAConnection();
XASession xasession = connection.createXASession();
Xid xid = newXID();
xasession.getXAResource().start(xid, XAResource.TMNOFLAGS);
Queue queue = xasession.createQueue(queueName);
MessageProducer producer = xasession.createProducer(queue);
producer.send(xasession.createTextMessage("hello"));
producer.send(xasession.createTextMessage("hello"));
xasession.getXAResource().end(xid, XAResource.TMSUCCESS);
xasession.getXAResource().prepare(xid);
connection.close();
System.err.println("Done!!!");
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testXAResourceCommittedRemoved() throws Exception {
Queue queue = null;
Xid xid = newXID();
try (XAConnection xaconnection = xaFactory.createXAConnection()) {
XASession session = xaconnection.createXASession();
queue = session.createQueue(queueName);
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("xa message"));
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().commit(xid, true);
}
XidImpl xid1 = new XidImpl(xid);
Transaction transaction = server.getResourceManager().getTransaction(xid1);
assertNull(transaction);
}
protected Session session() {
if (session == null) {
synchronized (this) {
if (closed) {
throw new IllegalStateRuntimeException("Context is closed");
}
if (session == null) {
try {
Connection connection = connection();
if (xa) {
session = XAConnection.class.cast(connection).createXASession();
} else {
session = connection.createSession(sessionMode);
}
} catch (final JMSException e) {
throw toRuntimeException(e);
}
}
}
}
return session;
}
@Test
public void testIsSamRM() throws Exception {
XAConnection conn = null;
conn = xacf.createXAConnection();
// Create a session
XASession sess1 = conn.createXASession();
XAResource res1 = sess1.getXAResource();
// Create a session
XASession sess2 = conn.createXASession();
XAResource res2 = sess2.getXAResource();
Assert.assertTrue(res1.isSameRM(res2));
}
public void testRollbackXaErrorCode() throws Exception {
String brokerName = "rollbackErrorCode";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.start();
broker.waitUntilStarted();
ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
XAConnection connection = (XAConnection) cf.createConnection();
connection.start();
XASession session = connection.createXASession();
XAResource resource = session.getXAResource();
Xid tid = createXid();
try {
resource.rollback(tid);
fail("Expected xa exception on no tx");
} catch (XAException expected) {
LOG.info("got expected xa", expected);
assertEquals("no tx", XAException.XAER_NOTA, expected.errorCode);
}
connection.close();
broker.stop();
}
/**
*
*/
private void checkSession() {
if (session == null) {
synchronized (this) {
if (closed)
throw new IllegalStateRuntimeException("Context is closed");
if (session == null) {
try {
if (xa) {
session = ((XAConnection) connection).createXASession();
} else {
session = connection.createSession(sessionMode);
}
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
}
}
}
}
@Override
protected XAConnection createProviderConnection(PooledConnectionKey key) throws JMSException {
if (connectionFactory instanceof XAConnectionFactory) {
if (key.getUserName() == null && key.getPassword() == null) {
return ((XAConnectionFactory) connectionFactory).createXAConnection();
} else {
return ((XAConnectionFactory) connectionFactory).createXAConnection(key.getUserName(), key.getPassword());
}
} else {
throw new IllegalStateException("connectionFactory should implement javax.jms.XAConnectionFactory");
}
}
@Test
public void testPublisherWithRollback() throws NamingException, JMSException, XAException, IOException {
String queueName = "testPublisherWithRollback";
String testMessage = "testPublisherWithRollback-Message";
InitialContext initialContext = initialContextBuilder.withXaConnectionFactory()
.withQueue(queueName)
.build();
XAConnectionFactory xaConnectionFactory =
(XAConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
XAConnection xaConnection = xaConnectionFactory.createXAConnection();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
xaConnection.start();
XidImpl xid = new XidImpl(0, "branchId_1".getBytes(), "globalId_1".getBytes());
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage(testMessage));
xaResource.end(xid, XAResource.TMSUCCESS);
int prepareOK = xaResource.prepare(xid);
Assert.assertEquals(prepareOK, XAResource.XA_OK, "Prepare phase should return XA_OK");
xaResource.rollback(xid);
// Check whether the message is published to queue.
QueueMetadata queueMetadata = restApiClient.getQueueMetadata(queueName);
Assert.assertEquals((int) queueMetadata.getSize(), 0, "Queue should be empty");
session.close();
xaConnection.close();
}
@Override
public XAConnection createConnection() throws JMSException, NamingException
{
String durableConsumerName = this.getDurableConsumerName();
if (durableConsumerName != null)
{
return createConnectionForDurable(durableConsumerName);
}
if (usePooling)
{
return getOrCreatePooledConnection();
}
return createUnpooledConnection();
}
protected XAConnection createUnpooledConnection() throws JMSException, NamingException
{
XAConnection connection = connect();
String clientId = this.clientIdPrefix + "_" + publisherId.incrementAndGet();
LOGGER.debug("Setting client id to {} for topic {} on broker {}", clientId, this.getTopicName(), this.brokerUrl);
connection.setClientID(clientId);
return connection;
}
protected XAConnection createConnectionForDurable(String durableConsumerName) throws JMSException, NamingException
{
XAConnection connection = connect();
LOGGER.debug("Setting client id to {} for topic {} on broker {}", durableConsumerName, this.getTopicName(), this.brokerUrl);
connection.setClientID(durableConsumerName);
return connection;
}
protected XAConnection getOrCreatePooledConnection() throws JMSException, NamingException
{
if (this.assignedPostfix < 0)
{
this.assignedPostfix = publisherId.get();
}
XaConnnectionKey key = makeConnectionKey();
synchronized (pool)
{
RefCountedJmsXaConnection connection = pool.get(key);
if (connection == null)
{
XAConnection underlying = connect();
String clientId = this.clientIdPrefix + "_" + this.assignedPostfix + "_" + forcedConnectionCounter.incrementAndGet();
LOGGER.info("Pooled publisher connection to broker {} for user {} with client id {}", this.brokerUrl, this.getUserName(), clientId);
underlying.setClientID(clientId);
connection = new RefCountedJmsXaConnection(underlying);
pool.put(key, connection);
}
else
{
connection.incrementCount();
}
return connection;
}
}
/**
* Create a XA connection
*
* @return The connection
* @throws JMSException Thrown if the operation fails
*/
@Override
public XAConnection createXAConnection() throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createXAConnection()");
}
ActiveMQRASessionFactoryImpl s = new ActiveMQRASessionFactoryImpl(mcf, cm, getResourceAdapter().getTM(), ActiveMQRAConnectionFactory.XA_CONNECTION);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("Created connection: " + s);
}
return s;
}
@Test
public void testGetXAResource2() throws Exception {
XAConnection conn = getXAConnectionFactory().createXAConnection();
XASession sess = conn.createXASession();
sess.getXAResource();
conn.close();
}
@Override
public XASession createXASession() throws JMSException {
XASession session = ((XAConnection) connection.getPhysicalConnection()).createXASession();
try {
OpenEJB.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
} catch (IllegalStateException | SystemException | RollbackException e) {
throw new RuntimeException(e);
}
return session;
}
@Test
public void testGetSession2() throws Exception {
deployConnectionFactory(0, JMSFactoryType.CF, "ConnectionFactory", "/ConnectionFactory");
XAConnection conn = getXAConnectionFactory().createXAConnection();
XASession sess = conn.createXASession();
sess.getSession();
conn.close();
}
/**
* Tests if preparing a DTX branch after setting fail flag in dtx.end throws an exception
*/
@Test(groups = { "wso2.mb", "dtx" }, expectedExceptions = XAException.class)
public void prepareDtxBranchAfterEndFails()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueName = "DtxPrepareTestCasePrepareDtxBranchAfterEndFails";
InitialContext initialContext = JMSClientHelper
.createInitialContextBuilder("admin", "admin", "localhost", getAMQPPort())
.withQueue(queueName)
.build();
// Publish to queue and rollback
XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
XAConnection xaConnection = connectionFactory.createXAConnection();
xaConnection.start();
XASession xaSession = xaConnection.createXASession();
XAResource xaResource = xaSession.getXAResource();
Session session = xaSession.getSession();
Destination xaTestQueue = (Destination) initialContext.lookup(queueName);
session.createQueue(queueName);
MessageProducer producer = session.createProducer(xaTestQueue);
Xid xid = JMSClientHelper.getNewXid();
// We are not starting the dtx branch
xaResource.start(xid, XAResource.TMNOFLAGS);
producer.send(session.createTextMessage("Test 1"));
xaResource.end(xid, XAResource.TMFAIL);
xaResource.prepare(xid);
xaResource.rollback(xid);
session.close();
xaConnection.close();
}
static void checkXAConnection(AssertableApplicationContext ctx) throws JMSException {
// Not using try-with-resources as that doesn't exist in JMS 1.1
XAConnection con = ctx.getBean(XAConnectionFactory.class).createXAConnection();
try {
con.setExceptionListener(exception -> {
});
assertThat(con.getExceptionListener().getClass().getName())
.startsWith("brave.jms.TracingExceptionListener");
}
finally {
con.close();
}
}
@Test
public void xaCode() throws Exception {
assertNotNull(xacf);
final Connection connection = xacf.createXAConnection();
assertThat(connection, instanceOf(XAConnection.class));
testConnection(connection);
}
private void doTestCrashServerAfterXACommit(boolean onePhase) throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
XAConnection connection = connectionFactory.createXAConnection();
try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("Queue1");
final XASession xaSession = connection.createXASession();
MessageConsumer consumer = xaSession.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello " + 1));
session.commit();
XAResource xaResource = xaSession.getXAResource();
final Xid xid = newXID();
xaResource.start(xid, XAResource.TMNOFLAGS);
connection.start();
Assert.assertNotNull(consumer.receive(5000));
xaResource.end(xid, XAResource.TMSUCCESS);
try {
xaResource.commit(xid, onePhase);
Assert.fail("didn't get expected exception!");
} catch (XAException xae) {
if (onePhase) {
//expected error code is XAER_RMFAIL
Assert.assertEquals(XAException.XAER_RMFAIL, xae.errorCode);
} else {
//expected error code is XA_RETRY
Assert.assertEquals(XAException.XA_RETRY, xae.errorCode);
}
}
} finally {
connection.close();
}
}
public void testCloseSendConnection() throws Exception {
String brokerName = "closeSend";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.start();
broker.waitUntilStarted();
ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
XAConnection connection = (XAConnection) cf.createConnection();
connection.start();
XASession session = connection.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
connection.close();
//comment out this check as it doesn't apply to artemis
//assertTransactionGoneFromBroker(tid);
broker.stop();
}
@Override
public XAConnection createXAConnection() throws JMSException {
return createXAConnection(user, password);
}
@Override
public XAConnection createXAConnection() throws JMSException {
return createProviderConnection(new PooledConnectionKey(null, null));
}
@Override
public XAConnection createXAConnection(String userName, String password) throws JMSException {
return createProviderConnection(new PooledConnectionKey(userName, password));
}
@Override
protected Session makeSession(PooledSessionKey key) throws JMSException {
return ((XAConnection) connection).createXASession();
}
@Override
public XAConnection createXAConnection() throws JMSException {
return connectionFactory.createXAConnection();
}
@Override
public XAConnection createXAConnection(String userName, String password) throws JMSException {
return connectionFactory.createXAConnection(userName, password);
}
@Override
public XASession createXASession() throws JMSException {
return addSession( ((XAConnection) connection).createXASession());
}