下面列出了怎么用javax.jms.MessageConsumer的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testAckedMessageAreConsumed() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
// Consume the message...
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(this);
latch.await(10, TimeUnit.SECONDS);
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Attempt to Consume the message...check if message was acknowledge
consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNull(msg);
session.close();
}
@Test
public void testGetQueue() throws Exception {
Connection consumerConnection = null;
try {
consumerConnection = createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
Queue q = ((QueueReceiver) queueConsumer).getQueue();
ProxyAssertSupport.assertEquals(queue1, q);
} finally {
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
String id = message.getJMSMessageID();
assertNotNull(message);
LOG.info("got message " + message);
consumer.close();
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination);
message = consumer.receive(1000);
session.commit();
assertNotNull(message);
assertEquals("redelivered message", id, message.getJMSMessageID());
assertEquals(2, message.getLongProperty("JMSXDeliveryCount"));
}
@Test
public void testRemoveQueueAndProduceBeforeNewConsumerAdded() throws Exception {
MessageConsumer firstConsumer = registerConsumer();
produceMessage();
Message message = firstConsumer.receive(5000);
LOG.info("Received message " + message);
assertEquals(1, numberOfMessages());
firstConsumer.close();
session.commit();
Thread.sleep(1000);
removeQueue();
Thread.sleep(1000);
produceMessage();
MessageConsumer secondConsumer = registerConsumer();
message = secondConsumer.receive(5000);
LOG.debug("Received message " + message);
assertEquals(1, numberOfMessages());
secondConsumer.close();
}
public void testAddingConsumer() throws Exception {
ActiveMQDestination destination = new ActiveMQQueue("TEST");
// Setup a first connection
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
//MessageConsumer consumer = session.createConsumer(destination);
TextMessage message = session.createTextMessage("message");
message.setStringProperty("JMSXGroupID", "TEST-GROUP");
LOG.info("sending message: " + message);
producer.send(message);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive();
assertNotNull(msg);
boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
assertTrue(first);
}
@Test
public void testGetSelector() throws Exception {
Connection consumerConnection = null;
try {
consumerConnection = createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String selector = "JMSType = 'something'";
MessageConsumer topicConsumer = consumerSession.createConsumer(ActiveMQServerTestCase.topic1, selector);
ProxyAssertSupport.assertEquals(selector, topicConsumer.getMessageSelector());
} finally {
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
@Test
public void testDupsOkConsumer() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
MessageConsumer consumer = session.createConsumer(destination);
// Send the messages
sendMessages(session, destination, 4);
// Make sure only 4 message are delivered.
for (int i = 0; i < 4; i++) {
Message m = consumer.receive(1000);
assertNotNull(m);
}
assertNull(consumer.receive(1000));
// Close out the consumer.. no other messages should be left on the queue.
consumer.close();
consumer = session.createConsumer(destination);
assertNull(consumer.receive(1000));
}
/**
* Execute the listener for a message received from the given consumer,
* wrapping the entire operation in an external transaction if demanded.
* @param session the JMS Session to work on
* @param consumer the MessageConsumer to work on
* @return whether a message has been received
* @throws JMSException if thrown by JMS methods
* @see #doReceiveAndExecute
*/
protected boolean receiveAndExecute(
Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer)
throws JMSException {
if (this.transactionManager != null) {
// Execute receive within transaction.
TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
boolean messageReceived;
try {
messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnException(this.transactionManager, status, ex);
throw ex;
}
this.transactionManager.commit(status);
return messageReceived;
}
else {
// Execute receive outside of transaction.
return doReceiveAndExecute(invoker, session, consumer, null);
}
}
protected String consume() throws Exception {
Connection con = null;
MessageConsumer c = consumer;
if (connectionPerMessage) {
con = factory.createConnection();
con.start();
Session s = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
c = s.createConsumer(getConsumeDestination());
}
TextMessage result = (TextMessage) c.receive(timeout);
if (result != null) {
if (audit.isDuplicate(result.getJMSMessageID())) {
throw new JMSException("Received duplicate " + result.getText());
}
if (!audit.isInOrder(result.getJMSMessageID())) {
throw new JMSException("Out of order " + result.getText());
}
if (connectionPerMessage) {
Thread.sleep(SLEEP_TIME);//give the broker a chance
con.close();
}
}
return result != null ? result.getText() : null;
}
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
final String jmsxGroupID = "JMSXGroupID";
MessageConsumer consumer = session.createConsumer(queue);
conn.connect(defUser, defPass);
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
.addHeader("JMSXGroupID", jmsxGroupID)
.setBody("Hello World");
conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
// differ from StompConnect
Assert.assertEquals(jmsxGroupID, message.getStringProperty("JMSXGroupID"));
}
protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
AtomicInteger totalCount = new AtomicInteger(count);
List<CompletableFuture<List<Message>>> resultsList = new ArrayList<>();
List<List<Message>> receivedResList = new ArrayList<>();
for (int i = 0; i < consumer.length; i++) {
final int index = i;
resultsList.add(new CompletableFuture<>());
receivedResList.add(new ArrayList<>());
MessageListener myListener = message -> {
instanceLog.debug("Mesages received" + message + " count: " + totalCount.get());
receivedResList.get(index).add(message);
if (totalCount.decrementAndGet() == 0) {
for (int j = 0; j < consumer.length; j++) {
resultsList.get(j).complete(receivedResList.get(j));
}
}
};
consumer[i].setMessageListener(myListener);
}
return resultsList;
}
@Test
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
conn.connect(defUser, defPass);
send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
// Assert default priority 4 is used when priority header is not set
Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
/**
* Make sure that a temp queue does not drop message if there are no active
* consumers.
*
* @throws JMSException
*/
@Test
public void testTempQueueHoldsMessagesWithoutConsumers() throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage("Hello");
producer.send(message);
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message2 = consumer.receive(3000);
Assert.assertNotNull(message2);
Assert.assertTrue("Expected message to be a TextMessage", message2 instanceof TextMessage);
Assert.assertTrue("Expected message to be a '" + message.getText() + "'", ((TextMessage) message2).getText().equals(message.getText()));
}
@Test
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
conn.connect(defUser, defPass);
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
.addHeader("foo", "abc")
.addHeader("bar", "123")
.setBody("Hello World");
conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
}
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
Destination destination = ActiveMQDestination.fromPrefixedName("queue://shipping");
try (Connection conn = factory.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("Received Message: ");
Order order = Jms.getEntity(message, Order.class);
System.out.println(order);
}
});
conn.start();
Thread.sleep(1000000);
}
}
public void testDestinationGCWithActiveConsumers() throws Exception {
assertEquals(1, broker.getAdminView().getQueues().length);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(otherQueue).close();
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
connection.start();
TimeUnit.SECONDS.sleep(5);
assertTrue("After GC runs there should be one Queue.", Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getAdminView().getQueues().length == 1;
}
}));
connection.close();
}
/**
* BrokerA <- BrokerB -> BrokerC
*/
public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();
waitForBridgeFormation();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest, "broker = 'BROKER_A'");
MessageConsumer clientC = createConsumer("BrokerC", dest, "broker = 'BROKER_C'");
Thread.sleep(2000); //et subscriptions get propagated
// Send messages for broker A
HashMap<String, Object> props = new HashMap<>();
props.put("broker", "BROKER_A");
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
//Send messages for broker C
props.clear();
props.put("broker", "BROKER_C");
sendMessages("BrokerB", dest, MESSAGE_COUNT, props);
// Let's try to wait for any messages.
Thread.sleep(1000);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
// Total received should be 100
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
}
private void testSharedDurableConsumer(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session1.createTopic(getTopicName());
Topic topic2 = session2.createTopic(getTopicName());
final MessageConsumer consumer1 = session1.createSharedDurableConsumer(topic, "SharedConsumer");
final MessageConsumer consumer2 = session2.createSharedDurableConsumer(topic2, "SharedConsumer");
MessageProducer producer = session1.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message message1 = consumer1.receive(100);
Message message2 = consumer2.receive(100);
Message received = null;
if (message1 != null) {
assertNull("Message should only be delivered once per subscribtion but see twice", message2);
received = message1;
} else {
received = message2;
}
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
consumer1.close();
consumer2.close();
session1.unsubscribe("SharedConsumer");
} finally {
connection1.close();
connection2.close();
}
}
@Test
public void testAsyncReceiveWithoutExpirationChecks() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
factory.setConsumerExpiryCheckEnabled(false);
final CountDownLatch received = new CountDownLatch(1);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(name.getMethodName());
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
received.countDown();
}
});
MessageProducer producer = session.createProducer(destination);
producer.setTimeToLive(TimeUnit.SECONDS.toMillis(2));
producer.send(session.createTextMessage("test"));
// Allow message to expire in the prefetch buffer
TimeUnit.SECONDS.sleep(4);
connection.start();
assertTrue(received.await(5, TimeUnit.SECONDS));
connection.close();
}
private static void checkUserReceiveNoSend(final Topic topic,
final Connection connection,
final String user,
final Connection sendingConn) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic);
MessageConsumer consumer = session.createConsumer(topic);
TextMessage msg = session.createTextMessage("hello-world-1");
try {
producer.send(msg);
throw new IllegalStateException("Security setting is broken! User " + user +
" can send message [" +
msg.getText() +
"] to topic " +
topic);
} catch (JMSException e) {
System.out.println("User " + user + " cannot send message [" + msg.getText() + "] to topic: " + topic);
}
// Now send a good message
Session session1 = sendingConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session1.createProducer(topic);
producer.send(msg);
TextMessage receivedMsg = (TextMessage) consumer.receive(2000);
if (receivedMsg != null) {
System.out.println("User " + user + " can receive message [" + receivedMsg.getText() + "] from topic " + topic);
} else {
throw new IllegalStateException("Security setting is broken! User " + user + " cannot receive message from topic " + topic);
}
session1.close();
session.close();
}
/**
* BrokerA -> BrokerB <- BrokerC
*/
public void testABandCBbrokerNetwork() throws Exception {
// Setup broker networks
bridgeBrokers("BrokerA", "BrokerB");
bridgeBrokers("BrokerC", "BrokerB");
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", true);
// Setup consumers
MessageConsumer clientA = createConsumer("BrokerA", dest);
MessageConsumer clientB = createConsumer("BrokerB", dest);
MessageConsumer clientC = createConsumer("BrokerC", dest);
//let consumers propagate around the network
Thread.sleep(2000);
// Send messages
sendMessages("BrokerA", dest, MESSAGE_COUNT);
sendMessages("BrokerB", dest, MESSAGE_COUNT);
sendMessages("BrokerC", dest, MESSAGE_COUNT);
// Get message count
MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
msgsC.waitForMessagesToArrive(MESSAGE_COUNT);
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT, msgsC.getMessageCount());
}
private void process() throws JMSException {
long end = System.currentTimeMillis() + 20000;
int transCount = 0;
LOG.info(toString() + " ONLINE.");
Connection con = openConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sess.createDurableSubscriber(topic, subName, selector, false);
try {
do {
long max = end - System.currentTimeMillis();
if (max <= 0) {
break;
}
Message message = consumer.receive(max);
if (message == null) {
continue;
}
LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + this + ".");
} while (true);
} finally {
sess.close();
con.close();
LOG.info(toString() + " OFFLINE.");
}
}
protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
MessageConsumer consumer;
for (int i = 0; i < consumerCount; i++) {
TimedMessageListener list = new TimedMessageListener();
consumer = createConsumer(factory.createConnection(), dest);
consumer.setMessageListener(list);
consumers.put(consumer, list);
}
}
@Test
public void runtimeSelectorError() throws Exception
{
Connection connection = getConnection();
Queue queue = createQueue(getTestName());
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue , "testproperty % 5 = 1");
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setIntProperty("testproperty", 1); // 1 % 5
producer.send(message);
connection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull("Message matching selector should be received", receivedMessage);
message.setStringProperty("testproperty", "hello"); // "hello" % 5 would cause a runtime error
producer.send(message);
receivedMessage = consumer.receive(getReceiveTimeout());
assertNull("Message causing runtime selector error should not be received", receivedMessage);
MessageConsumer consumerWithoutSelector = session.createConsumer(queue);
receivedMessage = consumerWithoutSelector.receive(getReceiveTimeout());
assertNotNull("Message that previously caused a runtime error should be consumable by another consumer", receivedMessage);
}
finally
{
connection.close();
}
}
public void testReceiveWildcardTopicEndAsterisk() throws Exception {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination1 = (ActiveMQDestination) session.createTopic(destination1String);
ActiveMQDestination destination3 = (ActiveMQDestination) session.createTopic(destination3String);
Message m = null;
MessageConsumer consumer = null;
String text = null;
ActiveMQDestination destination6 = (ActiveMQDestination) session.createTopic("TEST.ONE.*");
consumer = session.createConsumer(destination6);
sendMessage(session, destination1, destination1String);
sendMessage(session, destination3, destination3String);
m = consumer.receive(1000);
assertNotNull(m);
text = ((TextMessage) m).getText();
if (!(text.equals(destination1String) || text.equals(destination3String))) {
fail("unexpected message:" + text);
}
m = consumer.receive(1000);
assertNotNull(m);
text = ((TextMessage) m).getText();
if (!(text.equals(destination1String) || text.equals(destination3String))) {
fail("unexpected message:" + text);
}
assertNull(consumer.receiveNoWait());
}
public void testLoadTempAdvisoryQueues() throws Exception {
for (int i = 0; i < MESSAGE_COUNT; i++) {
TemporaryQueue tempQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(tempQueue);
MessageProducer producer = session.createProducer(tempQueue);
consumer.close();
producer.close();
tempQueue.delete();
}
AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class);
assertTrue(ab.getAdvisoryDestinations().size() == 0);
assertTrue(ab.getAdvisoryConsumers().size() == 0);
assertTrue(ab.getAdvisoryProducers().size() == 0);
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
for (Destination dest : rb.getDestinationMap().values()) {
LOG.debug("Destination: {}", dest);
}
// there should be at least 2 destinations - advisories -
// 1 for the connection + 1 generic ones
assertTrue("Should be at least 2 destinations", rb.getDestinationMap().size() > 2);
}
@Test
public void testAckModeClient2() throws Exception {
conn.connect(defUser, defPass);
subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
Thread.sleep(1000);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
this.sendJmsMessage("client-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
frame = conn.receiveFrame();
assertNotNull(frame);
//ack the 49th
if (i == num - 2) {
ack(conn, "sub1", frame);
}
}
unsubscribe(conn, "sub1");
conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(10000);
Assert.assertNotNull(message);
message = consumer.receive(1000);
Assert.assertNull(message);
}
protected MessageConsumer createConsumerForLocalBroker() throws JMSException {
Connection connection = localConnectionFactory.createConnection();
connections.add(connection);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
return session.createConsumer(inbound);
}
private void doTestCrashServerAfterXACommit(boolean onePhase) throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
XAConnection connection = connectionFactory.createXAConnection();
try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("Queue1");
final XASession xaSession = connection.createXASession();
MessageConsumer consumer = xaSession.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello " + 1));
session.commit();
XAResource xaResource = xaSession.getXAResource();
final Xid xid = newXID();
xaResource.start(xid, XAResource.TMNOFLAGS);
connection.start();
Assert.assertNotNull(consumer.receive(5000));
xaResource.end(xid, XAResource.TMSUCCESS);
try {
xaResource.commit(xid, onePhase);
Assert.fail("didn't get expected exception!");
} catch (XAException xae) {
if (onePhase) {
//expected error code is XAER_RMFAIL
Assert.assertEquals(XAException.XAER_RMFAIL, xae.errorCode);
} else {
//expected error code is XA_RETRY
Assert.assertEquals(XAException.XA_RETRY, xae.errorCode);
}
}
} finally {
connection.close();
}
}
public MessageConsumer createDurableSubscriber(Topic dest, Session sess, String name) throws Exception {
MessageConsumer client = sess.createDurableSubscriber(dest, name);
MessageIdList messageIdList = new MessageIdList();
messageIdList.setParent(allMessages);
client.setMessageListener(messageIdList);
consumers.put(client, messageIdList);
return client;
}