javax.jms.XASession#close ( )源码实例Demo

下面列出了javax.jms.XASession#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

public void testConsumerCloseTransactionalSendReceive() throws Exception {

      ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
      XAConnection connection1 = (XAConnection) cf1.createConnection();
      connection1.start();
      XASession session = connection1.createXASession();
      XAResource resource = session.getXAResource();
      Destination dest = new ActiveMQQueue(getName());

      // publish a message
      Xid tid = createXid();
      resource.start(tid, XAResource.TMNOFLAGS);
      MessageProducer producer = session.createProducer(dest);
      ActiveMQTextMessage message = new ActiveMQTextMessage();
      message.setText(getName());
      producer.send(message);
      producer.close();
      resource.end(tid, XAResource.TMSUCCESS);
      resource.commit(tid, true);
      session.close();

      session = connection1.createXASession();
      MessageConsumer consumer = session.createConsumer(dest);
      tid = createXid();
      resource = session.getXAResource();
      resource.start(tid, XAResource.TMNOFLAGS);
      TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
      consumer.close();
      assertNotNull(receivedMessage);
      assertEquals(getName(), receivedMessage.getText());
      resource.end(tid, XAResource.TMSUCCESS);
      resource.commit(tid, true);

      session = connection1.createXASession();
      consumer = session.createConsumer(dest);
      tid = createXid();
      resource = session.getXAResource();
      resource.start(tid, XAResource.TMNOFLAGS);
      assertNull(consumer.receive(1000));
      resource.end(tid, XAResource.TMSUCCESS);
      resource.commit(tid, true);

   }
 
public void testSessionCloseTransactionalSendReceive() throws Exception {

      ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
      XAConnection connection1 = (XAConnection) cf1.createConnection();
      connection1.start();
      XASession session = connection1.createXASession();
      XAResource resource = session.getXAResource();
      Destination dest = new ActiveMQQueue(getName());

      // publish a message
      Xid tid = createXid();
      resource.start(tid, XAResource.TMNOFLAGS);
      MessageProducer producer = session.createProducer(dest);
      ActiveMQTextMessage message = new ActiveMQTextMessage();
      message.setText(getName());
      producer.send(message);
      session.close();
      resource.end(tid, XAResource.TMSUCCESS);
      resource.commit(tid, true);

      session = connection1.createXASession();
      MessageConsumer consumer = session.createConsumer(dest);
      tid = createXid();
      resource = session.getXAResource();
      resource.start(tid, XAResource.TMNOFLAGS);
      TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
      session.close();
      assertNotNull(receivedMessage);
      assertEquals(getName(), receivedMessage.getText());
      resource.end(tid, XAResource.TMSUCCESS);
      resource.commit(tid, true);

      session = connection1.createXASession();
      consumer = session.createConsumer(dest);
      tid = createXid();
      resource = session.getXAResource();
      resource.start(tid, XAResource.TMNOFLAGS);
      assertNull(consumer.receive(1000));
      resource.end(tid, XAResource.TMSUCCESS);
      resource.commit(tid, true);
   }
 
public void testReadonlyNoLeak() throws Exception {
   final String brokerName = "readOnlyNoLeak";
   BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
   broker.setPersistent(false);
   broker.start();
   ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
   cf1.setStatsEnabled(true);
   ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) cf1.createConnection();
   xaConnection.start();
   XASession session = xaConnection.createXASession();
   XAResource resource = session.getXAResource();
   Xid tid = createXid();
   resource.start(tid, XAResource.TMNOFLAGS);
   session.close();
   resource.end(tid, XAResource.TMSUCCESS);
   resource.commit(tid, true);

   //not apply to artemis
   //assertTransactionGoneFromBroker(tid);
   //assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
   assertSessionGone(xaConnection, session);
   assertTransactionGoneFromFailoverState(xaConnection, tid);

   // two phase
   session = xaConnection.createXASession();
   resource = session.getXAResource();
   tid = createXid();
   resource.start(tid, XAResource.TMNOFLAGS);
   session.close();
   resource.end(tid, XAResource.TMSUCCESS);
   assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));

   // no need for a commit on read only
   //assertTransactionGoneFromBroker(tid);
   //assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
   assertSessionGone(xaConnection, session);
   assertTransactionGoneFromFailoverState(xaConnection, tid);

   xaConnection.close();
   broker.stop();

}