下面列出了javax.jms.MessageConsumer#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testAbortAlreadyClosingConsumers() throws Exception {
consumerCount = 1;
startConsumers(withPrefetch(2, destination));
for (MessageIdList list : consumers.values()) {
list.setProcessingDelay(6 * 1000);
}
for (Connection c : connections) {
c.setExceptionListener(this);
}
startProducers(destination, 100);
allMessagesList.waitForMessagesToArrive(consumerCount);
for (MessageConsumer consumer : consumers.keySet()) {
LOG.info("closing consumer: " + consumer);
/// will block waiting for on message till 6secs expire
consumer.close();
}
}
public void testDeleteTempTopicDeletesAvisoryTopics() throws Exception {
topicConsumerEventSource.start();
MessageConsumer consumer = createConsumer(tempTopic);
assertConsumerEvent(1, true);
Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(tempTopic);
assertTrue(destinationExists(advisoryTopic));
consumer.close();
// Once we delete the topic, the advisory topic for the destination
// should also be deleted.
tempTopic.delete();
assertFalse(destinationExists(advisoryTopic));
}
@Test
public void testRemoveQueueAndProduceAfterNewConsumerAdded() throws Exception {
MessageConsumer firstConsumer = registerConsumer();
produceMessage();
Message message = firstConsumer.receive(5000);
LOG.info("Received message " + message);
assertEquals(1, numberOfMessages());
firstConsumer.close();
session.commit();
Thread.sleep(1000);
removeQueue();
Thread.sleep(1000);
MessageConsumer secondConsumer = registerConsumer();
produceMessage();
message = secondConsumer.receive(5000);
LOG.debug("Received message " + message);
assertEquals(1, numberOfMessages());
secondConsumer.close();
}
@Test
public void testGetTopicOnClosedConsumer() throws Exception {
Connection consumerConnection = null;
try {
consumerConnection = createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer topicConsumer = consumerSession.createConsumer(ActiveMQServerTestCase.topic1);
topicConsumer.close();
try {
((TopicSubscriber) topicConsumer).getTopic();
Assert.fail("must throw a JMS IllegalStateException");
} catch (javax.jms.IllegalStateException e) {
// OK
}
} finally {
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception {
connection.setClientID(getName());
connection.start();
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = createProducer(session, queue);
producer.send(createTextMessage(session));
session.commit();
TimeUnit.SECONDS.sleep(1);
consumer.close();
consumer = session.createConsumer(queue);
Message msg = consumer.receive(1000);
assertNotNull(msg);
assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
session.commit();
session.close();
}
/**
* Function to retrieve message from specified message queue
*
* @param queueName Name of the queue
* @param timeout Timeout value (in milliseconds)
* @return Retrieved message from the queue
* @throws JMSException if error occurred
*/
public Message consumeMessageFromQueue(String queueName, long timeout) throws JMSException {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// Create a Connection
connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(queueName);
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session.createConsumer(destination);
// Wait for a message
return consumer.receive(timeout);
} finally {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public boolean closeConsumer(MessageConsumer messageConsumer, boolean forcefully) {
try {
if (this.cacheLevel < JMSConstants.CACHE_CONSUMER || forcefully) {
messageConsumer.close();
cachedMessageConsumer = null;
}
} catch (JMSException e) {
logger.error("JMS Exception while closing the consumer.", e);
}
return false;
}
protected void useConnection(Connection connection) throws Exception {
connection.setClientID("foo");
connection.start();
Session session = connection.createSession(transacted, authMode);
Destination destination = createDestination();
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
Message message = session.createTextMessage("Hello World");
producer.send(message);
Thread.sleep(1000);
consumer.close();
}
private void testFederationStreamConsumerAddress(String address) throws Exception {
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection1.start();
connection0.start();
Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Topic topic0 = session0.createTopic(address);
Topic topic1 = session1.createTopic(address);
MessageConsumer consumer0 = session0.createConsumer(topic0);
MessageProducer producer1 = session1.createProducer(topic1);
assertTrue(Wait.waitFor(() -> getServer(1).getPostOffice().getBindingsForAddress(
SimpleString.toSimpleString(address)).getBindings().size() == 1, 5000, 500));
verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER,
AFTER_CREATE_FEDERATED_QUEUE_CONSUMER, FEDERATED_ADDRESS_CONDITIONAL_CREATE_CONSUMER);
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
producer1.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(5000));
consumer0.close();
verifier0.validatePluginMethodsEquals(1, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER, BEFORE_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED,
AFTER_FEDERATED_QUEUE_CONSUMER_MESSAGE_HANDLED);
}
}
@AfterClass
private void destroyJmsResources() throws JMSException {
for (MessageConsumer messageConsumer : messageConsumers) {
if (Objects.nonNull(messageConsumer)) {
messageConsumer.close();
}
}
if (Objects.nonNull(session)) {
session.close();
}
if (Objects.nonNull(connection)) {
connection.close();
}
}
private void testFederatedQueueBiDirectional(String queueName, boolean shared) throws Exception {
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection0.start();
Session session0 = connection0.createSession();
Queue queue0 = session0.createQueue(queueName);
MessageProducer producer0 = session0.createProducer(queue0);
connection1.start();
Session session1 = connection1.createSession();
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer1 = session1.createProducer(queue1);
MessageConsumer consumer0 = session0.createConsumer(queue0);
//Test producers being on broker 0 and broker 1 and consumer on broker 0.
producer0.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(1000));
producer1.send(session1.createTextMessage("hello"));
assertNotNull(consumer0.receive(1000));
Wait.assertTrue(() -> getServer(0).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
Wait.assertTrue(() -> getServer(1).getPostOffice().getBinding(SimpleString.toSimpleString(queueName)) != null);
//Wait to see if extra consumers are created - this tests to make sure there is no loop and tests the FederatedQueue metaDataFilterString
//is working properly - should only be 1 consumer on each (1 for the local consumer on broker0 and 1 for the federated consumer on broker1)
assertFalse(Wait.waitFor(() -> getServer(0).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() > 1, 500, 100));
assertFalse(Wait.waitFor(() -> getServer(1).locateQueue(SimpleString.toSimpleString(queueName)).getConsumerCount() > 1, 500, 100));
//Test consumer move from broker 0, to broker 1
final int server1ConsumerCount = getServer(1).getConnectionCount();
consumer0.close();
Wait.waitFor(() -> ((QueueBinding) getServer(0).getPostOffice().getBinding(SimpleString.toSimpleString(queueName))).consumerCount() == 0, 1000);
//Make sure we don't drop connection if shared
if (shared) {
assertFalse(Wait.waitFor(() -> getServer(1).getConnectionCount() == server1ConsumerCount - 1,
500, 100));
assertTrue(server1ConsumerCount == getServer(1).getConnectionCount());
}
MessageConsumer consumer1 = session1.createConsumer(queue1);
producer0.send(session1.createTextMessage("hello"));
assertNotNull(consumer1.receive(1000));
producer1.send(session1.createTextMessage("hello"));
assertNotNull(consumer1.receive(1000));
//Test consumers on both broker 0, and broker 1 that messages route to consumers on same broker
consumer0 = session0.createConsumer(queue0);
producer0.send(session1.createTextMessage("produce0"));
producer1.send(session1.createTextMessage("produce1"));
Message message0 = consumer0.receive(1000);
assertNotNull(message0);
assertEquals("produce0", ((TextMessage) message0).getText());
Message message1 = consumer1.receive(1000);
assertNotNull(message1);
assertEquals("produce1", ((TextMessage) message1).getText());
}
}
@Test
public void testDurableSubscriptionPersistence_1() throws Exception {
Connection conn = null;
try {
conn = createConnection();
conn.setClientID("five");
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer ds = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "sub", null, false);
MessageProducer p = s.createProducer(ActiveMQServerTestCase.topic1);
p.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage tm = s.createTextMessage("thebody");
p.send(tm);
log.debug("message sent");
conn.close();
stop();
startNoDelete();
deployAndLookupAdministeredObjects();
conn = createConnection();
conn.setClientID("five");
s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
ds = s.createDurableSubscriber(ActiveMQServerTestCase.topic1, "sub", null, false);
TextMessage rm = (TextMessage) ds.receive(3000);
ProxyAssertSupport.assertNotNull(rm);
ProxyAssertSupport.assertEquals("thebody", rm.getText());
ds.close();
s.unsubscribe("sub");
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testFederatedQueueConditional() throws Exception {
String queueName = getName();
getServer(0).registerBrokerPlugin(new ActiveMQServerFederationPlugin() {
@Override
public boolean federatedQueueConditionalCreateConsumer(ServerConsumer consumer) {
//always return false for test
return false;
}
});
FederationConfiguration federationConfiguration = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
getServer(0).getConfiguration().getFederationConfigurations().add(federationConfiguration);
getServer(0).getFederationManager().deploy();
ConnectionFactory cf1 = getCF(1);
ConnectionFactory cf0 = getCF(0);
try (Connection connection1 = cf1.createConnection(); Connection connection0 = cf0.createConnection()) {
connection0.start();
connection1.start();
Session session0 = connection0.createSession();
Session session1 = connection1.createSession();
Queue queue0 = session0.createQueue(queueName);
Queue queue1 = session1.createQueue(queueName);
MessageProducer producer1 = session1.createProducer(queue1);
producer1.send(session1.createTextMessage("hello"));
MessageConsumer consumer0 = session0.createConsumer(queue0);
assertNull(consumer0.receive(1000));
verifier0.validatePluginMethodsEquals(1, 5000, 500, FEDERATED_QUEUE_CONDITIONAL_CREATE_CONSUMER);
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CREATE_FEDERATED_QUEUE_CONSUMER,
AFTER_CREATE_FEDERATED_QUEUE_CONSUMER);
consumer0.close();
verifier0.validatePluginMethodsEquals(0, 5000, 500, BEFORE_CLOSE_FEDERATED_QUEUE_CONSUMER,
AFTER_CLOSE_FEDERATED_QUEUE_CONSUMER);
}
}
@Test
public void testVirtualTopicFQQNConsumerAutoCreateQAndAddress() throws Exception {
Connection exConn = null;
SimpleString topic = new SimpleString("VirtualTopic.Orders");
SimpleString subscriptionQ = new SimpleString("Consumer.A");
// defaults are false via test setUp
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(topic.toString());
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(topic, subscriptionQ).toString());
MessageConsumer messageConsumerA = session.createConsumer(destinationFQN);
MessageConsumer messageConsumerB = session.createConsumer(destinationFQN);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
// only one consumer should get the message
TextMessage messageReceivedA = (TextMessage) messageConsumerA.receive(2000);
TextMessage messageReceivedB = (TextMessage) messageConsumerB.receive(2000);
assertTrue((messageReceivedA == null || messageReceivedB == null));
String text = messageReceivedA != null ? messageReceivedA.getText() : messageReceivedB.getText();
assertEquals("This is a text message", text);
messageConsumerA.close();
messageConsumerB.close();
} finally {
if (exConn != null) {
exConn.close();
}
}
}
public static void main(final String[] args) throws Exception {
// Step 1. Create a JMS Connection factory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Step 2. Create a JMS Connection
try (Connection connection = connectionFactory.createConnection()) {
//Step 3. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Step 4. Create a Queue Object
Queue queue = session.createQueue("client.side.exclusive.queue?exclusive=true");
//Step 5. Create a JMS producer
MessageProducer producer = session.createProducer(queue);
//Step 6. Create 2 consumers on the queue
MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
MessageConsumer consumer3 = session.createConsumer(queue);
//Step 7. Start the connection
connection.start();
//Step 8. send 30 text messages
Message message = session.createTextMessage("My Message");
for (int i = 0; i < 30; i++) {
producer.send(message);
}
//Step 9. ensure consumer1 gets first 20
for (int i = 0; i < 20; i++) {
Message consumer1Message = consumer1.receive(1000);
if (consumer1Message == null) {
throw new RuntimeException("Example FAILED - 'consumer1' should have received 20 messages");
}
}
System.out.println(ExclusiveQueueClientSideExample.class.getName() + " 'consumer1' received 20 messages as expected");
//Step 10. ensure consumer2 gets no messages yet!
Message consumer2Message = consumer2.receive(1000);
if (consumer2Message != null) {
throw new RuntimeException("Example FAILED - 'consumer2' should have not received any Messages yet!");
}
//Step 11. close consumer1
consumer1.close();
//Step 12. ensure consumer2 receives remaining messages
for (int i = 0; i < 10; i++) {
consumer2Message = consumer2.receive(500);
if (consumer2Message == null) {
throw new RuntimeException("Example FAILED - 'consumer2' should have received 10 messages" + "after consumer1 has been closed");
}
}
System.out.println(ExclusiveQueueClientSideExample.class.getName() + " 'consumer2' received 10 messages " + "as expected, after 'consumer1' has been closed");
//Step 13. ensure consumer3 gets no messages yet!
Message consumer3Message = consumer3.receive(500);
if (consumer3Message != null) {
throw new RuntimeException("Example FAILED - 'consumer3' should have not received any Messages yet!");
}
System.out.println(ExclusiveQueueClientSideExample.class.getName() + " 'consumer3' received 0 messages " + "as expected");
}
}
@Test(timeout = 60000)
public void testPurgeQueueCoreRollback() throws Exception {
String queue = "purgeQueue";
SimpleString ssQueue = new SimpleString(queue);
server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(ssQueue).setRoutingType(RoutingType.ANYCAST).setMaxConsumers(1).setPurgeOnNoConsumers(true).setAutoCreateAddress(false));
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5672");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("purgeQueue"));
javax.jms.Queue jmsQueue = session.createQueue(queue);
MessageConsumer consumer = session.createConsumer(jmsQueue);
for (int i = 0; i < 10; i++) {
Message message = session.createTextMessage("hello " + i);
producer.send(message);
}
session.commit();
QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
Wait.assertEquals(10, queueView::getMessageCount);
connection.start();
for (int i = 0; i < 10; i++) {
TextMessage txt = (TextMessage)consumer.receive(1000);
assertNotNull(txt);
assertEquals("hello " + i, txt.getText());
}
consumer.close();
session.rollback();
connection.close();
Wait.assertEquals(0, queueView::getMessageCount);
server.stop();
server.start();
queueView = (QueueImpl)getProxyToQueue(queue);
assertEquals(0, queueView.getMessageCount());
assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
}
@Test
public void testAutoDeleteTopicDefaultDurableSubscriptionQueue() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
String sub = testQueueName + "/mysub";
Topic topic = session.createTopic(testQueueName);
assertEquals(testQueueName, topic.getTopicName());
MessageConsumer consumer = session.createSharedDurableConsumer(topic, sub);
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
assertFalse(queueBinding.getQueue().isAutoDelete());
Wait.assertEquals(0, queueBinding.getQueue()::getMessageCount);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("hello1", ((TextMessage)message).getText());
message.acknowledge();
consumer.close();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
assertNotNull(queueBinding);
consumer = session.createSharedDurableConsumer(topic, sub);
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("hello2", ((TextMessage)message).getText());
message.acknowledge();
consumer.close();
//Wait longer than scan period.
Thread.sleep(20);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
assertNotNull(queueBinding);
} finally {
connection.close();
}
}
@Test
public void testMessageDurableSubscription() throws Exception {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
Connection connection = connectionFactory.createConnection();
connection.start();
instanceLog.debug("testMessageDurableSubscription");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic testTopic = session.createTopic("jmsTopic");
String sub1ID = "sub1DurSub";
String sub2ID = "sub2DurSub";
MessageConsumer subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
MessageConsumer subscriber2 = session.createDurableSubscriber(testTopic, sub2ID);
MessageProducer messageProducer = session.createProducer(testTopic);
int count = 100;
String batchPrefix = "First";
List<Message> listMsgs = generateMessages(session, batchPrefix, count);
sendMessages(messageProducer, listMsgs);
instanceLog.debug("First batch messages sent");
List<Message> recvd1 = receiveMessages(subscriber1, count);
List<Message> recvd2 = receiveMessages(subscriber2, count);
assertThat(recvd1.size(), is(count));
assertMessageContent(recvd1, batchPrefix);
instanceLog.debug(sub1ID + " :First batch messages received");
assertThat(recvd2.size(), is(count));
assertMessageContent(recvd2, batchPrefix);
instanceLog.debug(sub2ID + " :First batch messages received");
subscriber1.close();
instanceLog.debug(sub1ID + " : closed");
batchPrefix = "Second";
listMsgs = generateMessages(session, batchPrefix, count);
sendMessages(messageProducer, listMsgs);
instanceLog.debug("Second batch messages sent");
recvd2 = receiveMessages(subscriber2, count);
assertThat(recvd2.size(), is(count));
assertMessageContent(recvd2, batchPrefix);
instanceLog.debug(sub2ID + " :Second batch messages received");
subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
instanceLog.debug(sub1ID + " :connected");
recvd1 = receiveMessages(subscriber1, count);
assertThat(recvd1.size(), is(count));
assertMessageContent(recvd1, batchPrefix);
instanceLog.debug(sub1ID + " :Second batch messages received");
subscriber1.close();
subscriber2.close();
session.unsubscribe(sub1ID);
session.unsubscribe(sub2ID);
}
@Test
public void testClientAcknowledgmentOnClosedConsumer() throws Exception {
Connection producerConnection = null;
Connection consumerConnection = null;
try {
producerConnection = createConnection();
consumerConnection = createConnection();
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer queueProducer = producerSession.createProducer(queue1);
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
TextMessage tm = producerSession.createTextMessage();
tm.setText("One");
queueProducer.send(tm);
consumerConnection.start();
TextMessage m = (TextMessage) queueConsumer.receive(1500);
ProxyAssertSupport.assertEquals(m.getText(), "One");
queueConsumer.close();
m.acknowledge();
try {
queueConsumer.receive(2000);
ProxyAssertSupport.fail("should throw exception");
} catch (javax.jms.IllegalStateException e) {
// OK
}
} finally {
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
@Test
public void testAckAfterConsumerClosed() throws Exception {
Connection connSend = null;
Connection connReceive = null;
try {
connSend = createConnection();
connSend.start();
Session sessSend = connSend.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer prod = sessSend.createProducer(queue1);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = sessSend.createTextMessage("hello");
prod.send(m);
sessSend.commit();
connReceive = createConnection();
connReceive.start();
Session sessReceive = connReceive.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = sessReceive.createConsumer(queue1);
TextMessage m2 = (TextMessage) cons.receive(1500);
ProxyAssertSupport.assertNotNull(m2);
ProxyAssertSupport.assertEquals("hello", m2.getText());
// It is legal to close the consumer before committing the tx which is when
// the acks are sent
cons.close();
sessReceive.commit();
connReceive.close();
log.trace("Done test");
} finally {
if (connSend != null) {
connSend.close();
}
if (connReceive != null) {
connReceive.close();
}
}
}