下面列出了javax.jms.XASession#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public void testConsumerCloseTransactionalSendReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection) cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
producer.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
consumer.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testSessionCloseTransactionalSendReceive() throws Exception {
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection) cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
session.close();
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
public void testReadonlyNoLeak() throws Exception {
final String brokerName = "readOnlyNoLeak";
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
broker.setPersistent(false);
broker.start();
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
cf1.setStatsEnabled(true);
ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) cf1.createConnection();
xaConnection.start();
XASession session = xaConnection.createXASession();
XAResource resource = session.getXAResource();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
//not apply to artemis
//assertTransactionGoneFromBroker(tid);
//assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
assertSessionGone(xaConnection, session);
assertTransactionGoneFromFailoverState(xaConnection, tid);
// two phase
session = xaConnection.createXASession();
resource = session.getXAResource();
tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));
// no need for a commit on read only
//assertTransactionGoneFromBroker(tid);
//assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
assertSessionGone(xaConnection, session);
assertTransactionGoneFromFailoverState(xaConnection, tid);
xaConnection.close();
broker.stop();
}