下面列出了 javax.jms.XATopicSession API 类的用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Wraps a JMS session in the managed wrapper that matches the session
 * interfaces it implements.
 *
 * <p>XA interfaces are checked before their non-XA counterparts, and within each
 * group the combined queue-and-topic case is checked before either single case.
 * A session matching none of the specialised interfaces gets a plain
 * {@code ManagedSession}.
 *
 * @param session the session to wrap
 * @return a managed wrapper around {@code session}
 */
public static ManagedSession create(Session session) {
    final boolean xaQueue = session instanceof XAQueueSession;
    final boolean xaTopic = session instanceof XATopicSession;
    if (xaQueue && xaTopic) {
        return new ManagedXAQueueTopicSession(session);
    }
    if (xaQueue) {
        return new ManagedXAQueueSession((XAQueueSession) session);
    }
    if (xaTopic) {
        return new ManagedXATopicSession((XATopicSession) session);
    }

    final boolean plainQueue = session instanceof QueueSession;
    final boolean plainTopic = session instanceof TopicSession;
    if (plainQueue && plainTopic) {
        return new ManagedQueueTopicSession(session);
    }
    if (plainQueue) {
        return new ManagedQueueSession((QueueSession) session);
    }
    if (plainTopic) {
        return new ManagedTopicSession((TopicSession) session);
    }

    return new ManagedSession(session);
}
/**
 * Creates an XA topic session.
 *
 * <p>The request is rejected when this factory's {@code type} is
 * {@code CONNECTION}, {@code QUEUE_CONNECTION} or {@code XA_QUEUE_CONNECTION};
 * for any other type the call is forwarded to {@code allocateConnection(type)}.
 *
 * @return the XA topic session
 * @throws JMSException thrown if an error occurs
 */
@Override
public XATopicSession createXATopicSession() throws JMSException {
    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("createXATopicSession()");
    }

    checkClosed();

    // Connection types from which a topic session must not be handed out.
    final boolean topicNotAllowed = type == ActiveMQRAConnectionFactory.CONNECTION
        || type == ActiveMQRAConnectionFactory.QUEUE_CONNECTION
        || type == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION;
    if (topicNotAllowed) {
        throw new IllegalStateException("Can not get a topic session from a queue connection");
    }

    return allocateConnection(type);
}
/**
 * Publishes one message inside an XA transaction, prepares it, rolls it back,
 * and then asserts that the queue backing the durable subscription
 * ({@code "carbon:" + subscriptionId}) reports a size of zero.
 */
@Test
public void testPublisherWithRollback() throws Exception {
    final String subscriptionId = "sub-testPublisherWithRollback";
    final String topicName = "testPublisherWithRollback";
    final String payload = "testPublisherWithRollback-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(topicName).build();
    Topic destination = (Topic) jndi.lookup(topicName);

    // XA side: connection, session, its XAResource and a producer on the topic.
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection connection = factory.createXATopicConnection();
    XATopicSession xaSession = connection.createXATopicSession();
    XAResource resource = xaSession.getXAResource();
    MessageProducer xaProducer = xaSession.createProducer(destination);

    // Non-transactional side: durable subscriber on the same connection.
    TopicSession plainSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    TopicSubscriber subscriber = plainSession.createDurableSubscriber(destination, subscriptionId);

    connection.start();

    // Send the message within an XA transaction, then prepare and roll back.
    XidImpl xid = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    resource.start(xid, XAResource.TMNOFLAGS);
    xaProducer.send(xaSession.createTextMessage(payload));
    resource.end(xid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(xid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare stage failed.");

    resource.rollback(xid);

    subscriber.close();
    xaSession.close();
    connection.close();

    // The rolled-back publish must not have left anything in the subscription queue.
    QueueMetadata metadata = restApiClient.getQueueMetadata("carbon:" + subscriptionId);
    Assert.assertEquals((int) metadata.getSize(), 0, "Queue is not empty");
}
/**
 * Creates an XA topic session on the delegate and wraps it with
 * {@code TracingXASession.create}.
 *
 * @return the wrapped XA topic session
 * @throws JMSException if the {@code TYPE_XA_TOPIC} bit is not set in
 *     {@code types}, or if the delegate call fails
 */
@Override
public XATopicSession createXATopicSession() throws JMSException {
    final boolean supportsXaTopic = (types & TYPE_XA_TOPIC) == TYPE_XA_TOPIC;
    if (!supportsXaTopic) {
        throw new IllegalStateException(delegate + " is not an XATopicConnection");
    }
    XATopicConnection xaTopicConnection = (XATopicConnection) delegate;
    return TracingXASession.create(xaTopicConnection.createXATopicSession(), jmsTracing);
}
/**
 * Obtains the topic session from the delegate XA topic session and wraps it
 * with {@code TracingSession.create}.
 *
 * @return the wrapped topic session
 * @throws JMSException if the {@code TYPE_XA_TOPIC} bit is not set in
 *     {@code types}, or if the delegate call fails
 */
@Override
public TopicSession getTopicSession() throws JMSException {
    final boolean supportsXaTopic = (types & TYPE_XA_TOPIC) == TYPE_XA_TOPIC;
    if (!supportsXaTopic) {
        throw new IllegalStateException(delegate + " is not an XATopicSession");
    }
    XATopicSession xaTopicSession = (XATopicSession) delegate;
    return TracingSession.create(xaTopicSession.getTopicSession(), jmsTracing);
}
/**
 * Stores the delegate and tracing component, and records as a bit mask which
 * JMS session interfaces the delegate implements.
 *
 * @param delegate the session being wrapped
 * @param jmsTracing the tracing component kept alongside the delegate
 */
TracingSession(Session delegate, JmsTracing jmsTracing) {
    this.delegate = delegate;
    this.jmsTracing = jmsTracing;

    // One flag per interface; a delegate may implement several at once.
    int mask = 0;
    if (delegate instanceof QueueSession) {
        mask |= TYPE_QUEUE;
    }
    if (delegate instanceof TopicSession) {
        mask |= TYPE_TOPIC;
    }
    if (delegate instanceof XASession) {
        mask |= TYPE_XA;
    }
    if (delegate instanceof XAQueueSession) {
        mask |= TYPE_XA_QUEUE;
    }
    if (delegate instanceof XATopicSession) {
        mask |= TYPE_XA_TOPIC;
    }
    this.types = mask;
}
/**
 * Creates an XA topic session on {@code xaTopicConnection} and passes it
 * through {@code addSession} before returning it.
 *
 * @return the result of {@code addSession} for the new XA topic session
 * @throws JMSException if the underlying connection fails to create the session
 */
@Override
public XATopicSession createXATopicSession() throws JMSException {
    final XATopicSession created = xaTopicConnection.createXATopicSession();
    return addSession(created);
}
/**
 * Creates an XA topic session on {@code connection}, which is cast to
 * {@code XATopicConnection}, and passes it through {@code addSession}.
 *
 * @return the result of {@code addSession} for the new XA topic session
 * @throws JMSException if the underlying connection fails to create the session
 */
@Override
public XATopicSession createXATopicSession() throws JMSException {
    final XATopicConnection xaTopicConnection = (XATopicConnection) connection;
    final XATopicSession created = xaTopicConnection.createXATopicSession();
    return addSession(created);
}
/**
 * Obtains the topic session from {@code session}, which is cast to
 * {@code XATopicSession}, and passes it through {@code addSession}.
 *
 * @return the result of {@code addSession} for the obtained topic session
 * @throws JMSException if the underlying session fails to provide it
 */
@Override
public TopicSession getTopicSession() throws JMSException {
    final XATopicSession xaTopicSession = (XATopicSession) session;
    final TopicSession topicSession = xaTopicSession.getTopicSession();
    return addSession(topicSession);
}
/**
 * Creates a managed wrapper around an XA topic session.
 *
 * @param session the XA topic session to wrap; it is handed to the superclass
 *     constructor and also kept in {@code xaTopicSession}
 */
public ManagedXATopicSession(final XATopicSession session) {
    super(session);
    this.xaTopicSession = session;
}
/**
 * Publishes one message inside an XA transaction, prepares and commits it,
 * and then asserts that a non-transactional durable subscriber receives a
 * message with the same text.
 */
@Test
public void testPublisherWithCommit() throws Exception {
    final String subscriptionId = "sub-testPublisherWithCommit";
    final String topicName = "testPublisherWithCommit";
    final String payload = "testPublisherWithCommit-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(topicName).build();
    Topic destination = (Topic) jndi.lookup(topicName);

    // XA side: connection, session, its XAResource and a producer on the topic.
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection connection = factory.createXATopicConnection();
    XATopicSession xaSession = connection.createXATopicSession();
    XAResource resource = xaSession.getXAResource();
    MessageProducer xaProducer = xaSession.createProducer(destination);

    // Non-transactional side: durable subscriber on the same connection.
    TopicSession plainSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    TopicSubscriber subscriber = plainSession.createDurableSubscriber(destination, subscriptionId);

    connection.start();

    // Send the message within an XA transaction, then prepare and commit.
    XidImpl xid = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    resource.start(xid, XAResource.TMNOFLAGS);
    xaProducer.send(xaSession.createTextMessage(payload));
    resource.end(xid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(xid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare stage failed.");

    resource.commit(xid, false);

    // Wait up to two seconds for the committed message.
    TextMessage received = (TextMessage) subscriber.receive(2000);
    Assert.assertNotNull(received, "Didn't receive a message");
    Assert.assertEquals(received.getText(), payload, "Received message content didn't match sent message.");

    plainSession.close();
    xaSession.close();
    connection.close();
}
/**
 * Publishes one message non-transactionally, receives it through a durable
 * subscriber inside an XA transaction that is prepared and committed, and then
 * asserts that the queue backing the subscription
 * ({@code "carbon:" + subscriptionId}) reports a size of zero.
 */
@Test
public void testSubscriberWithCommit() throws Exception {
    final String subscriptionId = "sub-testSubscriberWithCommit";
    final String topicName = "testSubscriberWithCommit";
    final String payload = "testSubscriberWithCommit-Message";

    InitialContext jndi = initialContextBuilder.withXaConnectionFactory().withTopic(topicName).build();
    Topic destination = (Topic) jndi.lookup(topicName);

    // XA side: connection, session, its XAResource and a durable subscriber.
    XATopicConnectionFactory factory =
            (XATopicConnectionFactory) jndi.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection connection = factory.createXATopicConnection();
    XATopicSession xaSession = connection.createXATopicSession();
    XAResource resource = xaSession.getXAResource();
    TopicSubscriber subscriber = xaSession.createDurableSubscriber(destination, subscriptionId);

    // Non-transactional side: producer on the same connection.
    TopicSession plainSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    MessageProducer plainProducer = plainSession.createProducer(destination);

    connection.start();

    plainProducer.send(xaSession.createTextMessage(payload));

    // Consume the message within an XA transaction, then prepare and commit.
    XidImpl xid = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    resource.start(xid, XAResource.TMNOFLAGS);
    TextMessage received = (TextMessage) subscriber.receive(2000);
    resource.end(xid, XAResource.TMSUCCESS);

    int prepareResult = resource.prepare(xid);
    Assert.assertEquals(prepareResult, XAResource.XA_OK, "Prepare stage failed.");

    resource.commit(xid, false);

    Assert.assertNotNull(received, "Didn't receive a message");
    Assert.assertEquals(received.getText(), payload, "Received message content didn't match sent message.");

    plainSession.close();
    xaSession.close();
    connection.close();

    // The committed receive must have drained the subscription queue.
    QueueMetadata metadata = restApiClient.getQueueMetadata("carbon:" + subscriptionId);
    Assert.assertEquals((int) metadata.getSize(), 0, "Queue should be empty.");
}
/**
 * Publishes one message non-transactionally, receives it through a durable
 * subscriber inside an XA transaction that is prepared and then rolled back,
 * and asserts that the queue backing the subscription
 * ({@code "carbon:" + subscriptionId}) still reports a size of one.
 */
@Test
public void testSubscriberWithRollback() throws Exception {
    String subscriptionId = "sub-testSubscriberWithRollback";
    // Topic and payload are named after this test so that it runs on its own
    // topic instead of sharing one with another test.
    String topicName = "testSubscriberWithRollback";
    String testMessage = "testSubscriberWithRollback-Message";

    InitialContext initialContext = initialContextBuilder.withXaConnectionFactory().withTopic(topicName).build();
    Topic topic = (Topic) initialContext.lookup(topicName);

    // Setup XA consumer.
    XATopicConnectionFactory xaTopicConnectionFactory =
            (XATopicConnectionFactory) initialContext.lookup(ClientHelper.XA_CONNECTION_FACTORY);
    XATopicConnection xaTopicConnection = xaTopicConnectionFactory.createXATopicConnection();
    XATopicSession xaTopicSession = xaTopicConnection.createXATopicSession();
    XAResource xaResource = xaTopicSession.getXAResource();
    TopicSubscriber durableSubscriber = xaTopicSession.createDurableSubscriber(topic, subscriptionId);

    // Setup non-transactional message publisher.
    TopicSession topicSession = xaTopicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
    MessageProducer producer = topicSession.createProducer(topic);

    xaTopicConnection.start();

    producer.send(xaTopicSession.createTextMessage(testMessage));

    // Consume the message within an XA transaction, then prepare and roll back.
    XidImpl xid = new XidImpl(0, "branchId".getBytes(), "globalId".getBytes());
    xaResource.start(xid, XAResource.TMNOFLAGS);
    TextMessage message = (TextMessage) durableSubscriber.receive(2000);
    xaResource.end(xid, XAResource.TMSUCCESS);

    int response = xaResource.prepare(xid);
    Assert.assertEquals(response, XAResource.XA_OK, "Prepare stage failed.");

    xaResource.rollback(xid);

    Assert.assertNotNull(message, "Didn't receive a message");
    Assert.assertEquals(message.getText(), testMessage, "Received message content didn't match sent message.");

    topicSession.close();
    xaTopicSession.close();
    xaTopicConnection.close();

    // The rolled-back receive must leave the message in the subscription queue.
    QueueMetadata queueMetadata = restApiClient.getQueueMetadata("carbon:" + subscriptionId);
    Assert.assertEquals((int) queueMetadata.getSize(), 1, "Queue shouldn't be empty.");
}
/**
 * Creates an XA topic session.
 *
 * <p>Calls {@code checkClosed()} first, then delegates to
 * {@code createSessionInternal}, requesting {@code Session.SESSION_TRANSACTED}
 * and the {@code ActiveMQSession.TYPE_TOPIC_SESSION} session type. The result
 * is cast to {@code XATopicSession}.
 *
 * @return the newly created session, as an {@code XATopicSession}
 * @throws JMSException if {@code checkClosed()} or session creation fails
 */
@Override
public synchronized XATopicSession createXATopicSession() throws JMSException {
    checkClosed();

    return (XATopicSession) createSessionInternal(
        isXA(),
        true,
        Session.SESSION_TRANSACTED,
        ActiveMQSession.TYPE_TOPIC_SESSION);
}