下面列出了javax.jms.Connection#start ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testGetBook() throws Exception {
Context ctx = getContext();
ConnectionFactory factory = (ConnectionFactory)ctx.lookup("ConnectionFactory");
Destination destination = (Destination)ctx.lookup("dynamicQueues/test.jmstransport.text");
Destination replyToDestination = (Destination)ctx.lookup("dynamicQueues/test.jmstransport.response");
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
postGetMessage(session, destination, replyToDestination);
checkBookInResponse(session, replyToDestination, 123L, "CXF JMS Rocks");
session.close();
} finally {
close(connection);
}
}
@Test
public void testAutoCreateOnSendToFQQN() throws Exception {
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = RandomUtil.randomString();
String addressName = RandomUtil.randomString();
javax.jms.Queue queue = ActiveMQJMSClient.createQueue(CompositeAddress.toFullyQualified(addressName, queueName));
MessageProducer producer = session.createProducer(queue);
final int numMessages = 100;
for (int i = 0; i < numMessages; i++) {
TextMessage mess = session.createTextMessage("msg" + i);
producer.send(mess);
}
producer.close();
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < numMessages; i++) {
Message m = messageConsumer.receive(5000);
Assert.assertNotNull(m);
}
// make sure the JMX control was created for the address and queue
assertNotNull(server.getManagementService().getResource(ADDRESS + addressName));
assertNotNull(server.getManagementService().getResource(QUEUE + queueName));
connection.close();
}
@Test
public void testSetObjectPropertyForDouble() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ObjectMessage msg = session.createObjectMessage("test");
msg.setObjectProperty("TestDoubleProperty", Double.MAX_VALUE);
assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty"));
MessageProducer producer = session.createProducer(queue);
producer.send(msg);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertTrue("ObjectMessage should be received", receivedMessage instanceof ObjectMessage);
assertEquals("Unexpected received property",
Double.MAX_VALUE,
receivedMessage.getObjectProperty("TestDoubleProperty"));
}
finally
{
connection.close();
}
}
@Test
public void testSubscribeTopicToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
// Create 2 Queues: address1::queue1, address2::queue2
String address1 = "a1";
String address2 = "a2";
String queue1 = "q1";
String queue2 = "q2";
server.createQueue(new QueueConfiguration(queue1).setAddress(address1));
server.createQueue(new QueueConfiguration(queue2).setAddress(address2));
Exception e = null;
// Wrong FQQN. Attempt to subscribe to a queue belonging to a different address than given in the FQQN.
String wrongFQQN = address1 + "::" + queue2;
Connection connection = createConnection(false);
try {
connection.setClientID("FQQNconn");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(wrongFQQN);
session.createConsumer(topic);
} catch (InvalidDestinationException ide) {
e = ide;
} finally {
connection.close();
}
assertNotNull(e);
assertTrue(e.getMessage().contains("Queue: '" + queue2 + "' does not exist for address '" + address1 + "'"));
}
@Test
public void connectionStopThenStart() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnectionBuilder().setPrefetch(0).build();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Utils.sendMessages(session, queue, MSG_COUNT);
connection.start();
final MessageConsumer consumer = session.createConsumer(queue);
final int messageToReceivedBeforeConnectionStop = MSG_COUNT / 2;
CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT, messageToReceivedBeforeConnectionStop);
consumer.setMessageListener(countingMessageListener);
countingMessageListener.awaitMessages(getReceiveTimeout());
connection.stop();
assertTrue("Too few messages received after Connection#stop()", countingMessageListener.getReceivedCount() >= messageToReceivedBeforeConnectionStop);
countingMessageListener.resetLatch();
connection.start();
countingMessageListener.awaitMessages(getReceiveTimeout());
assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount());
}
finally
{
connection.close();
}
}
/**
* TEST TEMPORARY TOPICS
*/
public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
Connection conn;
Session sess;
Destination cons_dest;
int num_msg;
num_msg = 5;
LOG.debug("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
//
// Connect to the bus.
//
conn = createConnection(cons_broker_url);
conn.start();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//
// Create the destination on which messages are being tested.
//
LOG.trace("Creating destination");
cons_dest = sess.createTemporaryTopic();
testOneDest(conn, sess, cons_dest, num_msg);
//
// Cleanup
//
sess.close();
conn.close();
}
@Test(timeout = 30000)
public void testDurableConsumerLarge() throws Exception {
String durableClientId = getTopicName() + "-ClientId";
Connection connection = createConnection(durableClientId);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
final MessageConsumer consumer1 = session.createDurableSubscriber(topic, "DurbaleSub1");
final MessageConsumer consumer2 = session.createDurableSubscriber(topic, "DurbaleSub2");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
ObjectMessage objMessage = session.createObjectMessage();
BigObject bigObject = new BigObject(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
objMessage.setObject(bigObject);
producer.send(objMessage);
ObjectMessage msg1 = (ObjectMessage)consumer1.receive(5000);
Assert.assertNotNull(msg1);
assertTrue("Should be an instance of TextMessage", msg1 instanceof ObjectMessage);
ObjectMessage msg2 = (ObjectMessage)consumer2.receive(5000);
assertNotNull("Should have received a message by now.", msg2);
assertTrue("Should be an instance of TextMessage", msg2 instanceof ObjectMessage);
} finally {
connection.close();
}
}
@Parameters({"broker-hostname", "broker-port", "admin-username", "admin-password"})
@Test(priority = 1,
description = "create and publish to a queue by a user who has queues:create and queues:publish scopes")
public void testCreateAndPublishByAdminUser(String brokerHostname,
String port,
String adminUsername,
String adminPassword) throws NamingException, JMSException {
String queueName = "testCreateAndPublishScopeByAdminUser";
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
.withQueue(queueName)
.build();
ConnectionFactory connectionFactory
= (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
Connection connection = connectionFactory.createConnection();
connection.start();
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = producerSession.createQueue(queueName);
MessageProducer producer = producerSession.createProducer(queue);
int numberOfMessages = 1;
for (int i = 0; i < numberOfMessages; i++) {
producer.send(producerSession.createTextMessage("Test message " + i));
}
producerSession.close();
connection.close();
}
@Test
public void testAutoDelete() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=true");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
final MessageConsumer consumer1 = session.createConsumer(queue);
assertEquals(testQueueName, queue.getQueueName());
assertEquals(true, activeMQDestination.getQueueAttributes().getAutoDelete());
assertEquals(true, activeMQDestination.getQueueConfiguration().isAutoDelete());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertTrue(queueBinding.getQueue().isAutoDelete());
Wait.assertEquals(2, queueBinding.getQueue()::getMessageCount);
Message message = consumer1.receive(5000);
assertNotNull(message);
message.acknowledge();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
Wait.assertEquals(1, queueBinding.getQueue()::getMessageCount);
final MessageConsumer consumer2 = session.createConsumer(queue);
consumer1.close();
message = consumer2.receive(5000);
assertNotNull(message);
message.acknowledge();
consumer2.close();
Wait.assertTrue(() -> server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName)) == null, 5000, 10);
} finally {
connection.close();
}
}
public Connection createConnection() throws JMSException {
Connection connection = connectionPool.createConnection(username, password);
connection.start();
return connection;
}
/**
* Test that a message created from the same connection than a nolocal consumer
* can be sent by *another* connection and will be received by the nolocal consumer
*/
@Test
public void testNoLocal() throws Exception {
Connection defaultConn = null;
Connection newConn = null;
try {
Topic topic1 = createTopic("topic1");
defaultConn = cf.createConnection();
Session defaultSess = defaultConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer defaultConsumer = defaultSess.createConsumer(topic1);
MessageConsumer noLocalConsumer = defaultSess.createConsumer(topic1, null, true);
MessageProducer defaultProd = defaultSess.createProducer(topic1);
defaultConn.start();
String text = RandomUtil.randomString();
// message is created only once from the same connection than the noLocalConsumer
TextMessage messageSent = defaultSess.createTextMessage(text);
for (int i = 0; i < 10; i++) {
defaultProd.send(messageSent);
}
Message received = null;
for (int i = 0; i < 10; i++) {
received = defaultConsumer.receive(5000);
assertNotNull(received);
assertEquals(text, ((TextMessage) received).getText());
}
newConn = cf.createConnection();
Session newSession = newConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer newProd = newSession.createProducer(topic1);
MessageConsumer newConsumer = newSession.createConsumer(topic1);
newConn.start();
text = RandomUtil.randomString();
messageSent.setText(text);
defaultProd.send(messageSent);
received = newConsumer.receive(5000);
assertNotNull(received);
assertEquals(text, ((TextMessage) received).getText());
text = RandomUtil.randomString();
messageSent.setText(text);
// we send the message created at the start of the test but on the *newConn* this time
newProd.send(messageSent);
newConn.close();
received = noLocalConsumer.receive(5000);
assertNotNull("nolocal consumer did not get message", received);
assertEquals(text, ((TextMessage) received).getText());
} finally {
if (defaultConn != null) {
defaultConn.close();
}
if (newConn != null) {
newConn.close();
}
}
}
public static void main(String[] args) throws Exception {
final Map<String, String> messageReceiverMap = new ConcurrentHashMap<>();
Connection connection = null;
try {
//Step 2. Perform a lookup on the queue
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
//Step 3. Perform a lookup on the Connection Factory
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?groupID=Group-0");
//Step 4. Create a JMS Connection
connection = cf.createConnection();
//Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Step 6. Create 2 JMS Message Producers
MessageProducer producer1 = session.createProducer(queue);
MessageProducer producer2 = session.createProducer(queue);
//Step 7. Create two consumers
MessageConsumer consumer1 = session.createConsumer(queue);
consumer1.setMessageListener(new SimpleMessageListener("consumer-1", messageReceiverMap));
MessageConsumer consumer2 = session.createConsumer(queue);
consumer2.setMessageListener(new SimpleMessageListener("consumer-2", messageReceiverMap));
//Step 8. Create and send 10 text messages with each producer
int msgCount = 10;
for (int i = 0; i < msgCount; i++) {
TextMessage m = session.createTextMessage("producer1 message " + i);
producer1.send(m);
System.out.println("Sent message: " + m.getText());
TextMessage m2 = session.createTextMessage("producer2 message " + i);
producer2.send(m2);
System.out.println("Sent message: " + m2.getText());
}
System.out.println("all messages are sent");
//Step 9. Start the connection
connection.start();
Thread.sleep(2000);
//Step 10. check the group messages are received by only one consumer
String trueReceiver = messageReceiverMap.get("producer1 message " + 0);
for (int i = 0; i < msgCount; i++) {
String receiver = messageReceiverMap.get("producer1 message " + i);
if (!trueReceiver.equals(receiver)) {
throw new IllegalStateException("Group message [producer1 message " + i + "] went to wrong receiver: " + receiver);
}
receiver = messageReceiverMap.get("producer2 message " + i);
if (!trueReceiver.equals(receiver)) {
throw new IllegalStateException("Group message [producer2 message " + i + "] went to wrong receiver: " + receiver);
}
}
} finally {
//Step 11. Be sure to close our JMS resources!
if (connection != null) {
connection.close();
}
}
}
@Test
public void testSendForeignWithForeignDestinationSet() throws Exception {
Connection conn = createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer p = sess.createProducer(queue1);
MessageConsumer c = sess.createConsumer(queue1);
conn.start();
Message foreign = new SimpleJMSMessage(new SimpleDestination());
foreign.setJMSDestination(new SimpleDestination());
// the producer destination should override the foreign destination and the send should succeed
p.send(foreign);
Message m = c.receive(1000);
ProxyAssertSupport.assertNotNull(m);
}
@Test(timeout = 60000)
public void testCanExhaustSessions() throws Exception {
final int totalMessagesExpected = NUM_MESSAGES * 2;
final CountDownLatch latch = new CountDownLatch(2);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionURI);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < totalMessagesExpected; ++i) {
Message msg = consumer.receive(5000);
if (msg == null) {
return;
}
numReceived++;
if (numReceived % 20 == 0) {
LOG.debug("received " + numReceived + " messages ");
System.runFinalization();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException ex) {}
}
}
}
});
thread.start();
ExecutorService threads = Executors.newFixedThreadPool(2);
final CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
LOG.trace("Starting threads to send messages!");
}
});
threads.execute(new TestRunner(barrier, latch));
threads.execute(new TestRunner(barrier, latch));
latch.await(2, TimeUnit.SECONDS);
thread.join();
assertEquals(totalMessagesExpected, numReceived);
}
@Test
public void testConflation() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
final Connection producerConnection = getConnection();
try
{
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
Message message = producerSession.createMessage();
message.setStringProperty(KEY_PROPERTY, "A");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 1);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "B");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 2);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "A");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 3);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "B");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 4);
producer.send(message);
}
finally
{
producerConnection.close();
}
Connection consumerConnection = getConnection();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received1 = consumer.receive(getReceiveTimeout());
assertNotNull("First message is not received", received1);
assertEquals("Unexpected key property value", "A", received1.getStringProperty(KEY_PROPERTY));
assertEquals("Unexpected sequence property value",
3,
received1.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
Message received2 = consumer.receive(getReceiveTimeout());
assertNotNull("Second message is not received", received2);
assertEquals("Unexpected key property value", "B", received2.getStringProperty(KEY_PROPERTY));
assertEquals("Unexpected sequence property value",
4,
received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
assertNull("Unexpected message is received", consumer.receive(getReceiveTimeout() / 4));
}
finally
{
consumerConnection.close();
}
}
@Test
public void testAutoDeleteDelay() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
Queue queue = session.createQueue(testQueueName + "?auto-delete=true&auto-delete-delay=100");
ActiveMQDestination activeMQDestination = (ActiveMQDestination) queue;
assertEquals(testQueueName, queue.getQueueName());
assertEquals(Long.valueOf(100), activeMQDestination.getQueueAttributes().getAutoDeleteDelay());
assertEquals(Long.valueOf(100), activeMQDestination.getQueueConfiguration().getAutoDeleteDelay());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertTrue(queueBinding.getQueue().isAutoDelete());
assertEquals(100, queueBinding.getQueue().getAutoDeleteDelay());
assertEquals(2, queueBinding.getQueue().getMessageCount());
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(5000);
assertNotNull(message);
message.acknowledge();
consumer.close();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
Wait.assertEquals(1, queueBinding.getQueue()::getMessageCount);
consumer = session.createConsumer(queue);
message = consumer.receive(5000);
assertNotNull(message);
message.acknowledge();
consumer.close();
//Wait longer than scan period, but less than delay
Thread.sleep(50);
//Check the queue has not been removed.
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
assertNotNull(queueBinding);
Wait.assertTrue(() -> server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName)) == null, 5000, 10);
} finally {
connection.close();
}
}
@Test
public void testCapacityExceededCausesBlock() throws Exception
{
String queueName = getTestName();
int messageSize = evaluateMessageSize();
int capacity = messageSize * 3 + messageSize / 2;
int resumeCapacity = messageSize * 2;
Queue queue = createAndBindQueueWithFlowControlEnabled(queueName, capacity, resumeCapacity);
Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build();
try
{
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
// try to send 5 messages (should block after 4)
MessageSender messageSender = sendMessagesAsync(producer, producerSession, 5);
assertTrue("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", true, 5000));
assertEquals("Incorrect number of message sent before blocking",
4,
messageSender.getNumberOfSentMessages());
Connection consumerConnection = getConnection();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message message = consumer.receive(getReceiveTimeout());
assertNotNull("Message is not received", message);
assertFalse("Flow is not stopped", awaitAttributeValue(queueName, "queueFlowStopped", false, 1000));
assertEquals("Message incorrectly sent after one message received",
4,
messageSender.getNumberOfSentMessages());
Message message2 = consumer.receive(getReceiveTimeout());
assertNotNull("Message is not received", message2);
assertTrue("Message sending is not finished", messageSender.getSendLatch()
.await(1000, TimeUnit.MILLISECONDS));
assertEquals("Message not sent after two messages received",
5,
messageSender.getNumberOfSentMessages());
}
finally
{
consumerConnection.close();
}
}
finally
{
producerConnection.close();
}
}
public void run() {
try {
running = true;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
}
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic) destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}
if (maxiumMessages > 0) {
consumeMessagesAndClose(connection, session, consumer);
} else {
if (receiveTimeOut == 0) {
consumer.setMessageListener(this);
} else {
consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
}
}
} catch (Exception e) {
System.out.println("[" + this.getName() + "] Caught: " + e);
e.printStackTrace();
}
}
@Test
public void testFQNConsumer() throws Exception {
Connection exConn = null;
SimpleString durableQueue = new SimpleString("myqueue");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(durableQueue.toString());
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("This is a text message");
producer.send(message);
Destination destinationFQN = session.createQueue(CompositeAddress.toFullyQualified(durableQueue, durableQueue).toString());
MessageConsumer messageConsumer = session.createConsumer(destinationFQN);
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
assertEquals("This is a text message", messageReceived.getText());
} finally {
if (exConn != null) {
exConn.close();
}
}
}
@Test
public void testSendAndReceivePersistentDifferentConnections() throws Exception {
Connection connSend = null;
Connection connReceive = null;
try {
connSend = createConnection();
connSend.start();
Session sessSend = connSend.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sessSend.createProducer(null);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = sessSend.createTextMessage("hello");
prod.send(queue1, m);
sessSend.commit();
connReceive = createConnection();
connReceive.start();
Session sessReceive = connReceive.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = sessReceive.createConsumer(queue1);
TextMessage m2 = (TextMessage) cons.receive(1500);
ProxyAssertSupport.assertNotNull(m2);
ProxyAssertSupport.assertEquals("hello", m2.getText());
sessReceive.commit();
cons.close();
connReceive.close();
connReceive = createConnection();
connReceive.start();
sessReceive = connReceive.createSession(true, Session.SESSION_TRANSACTED);
cons = sessReceive.createConsumer(queue1);
} finally {
if (connSend != null) {
connSend.close();
}
if (connReceive != null) {
connReceive.close();
}
checkEmpty(queue1);
}
}