下面列出了 javax.jms.MessageProducer#send() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Publishes a serializable payload to the named destination as a JMS object message.
 *
 * <p>A connection is obtained from {@code connectionFactory} for each call and is closed
 * before the method returns, whether or not the send succeeded.
 *
 * @param dest        destination name, resolved through {@code createDestination}
 * @param object      payload stored as the body of the object message
 * @param contentType value for the {@code HttpHeaderProperty.CONTENT_TYPE} string property;
 *                    the property is not set when this is {@code null}
 * @param tag         value for the {@code "MyTag"} string property; the property is not set
 *                    when this is {@code null}
 * @throws Exception if any JMS operation fails
 */
public static void publish(String dest, Serializable object, String contentType, String tag) throws Exception {
    final Connection jmsConnection = connectionFactory.createConnection();
    try {
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer sender = jmsSession.createProducer(createDestination(dest));
        final ObjectMessage outgoing = jmsSession.createObjectMessage();

        // Optional headers are only attached when the caller supplied them.
        if (null != contentType) {
            outgoing.setStringProperty(HttpHeaderProperty.CONTENT_TYPE, contentType);
        }
        if (null != tag) {
            outgoing.setStringProperty("MyTag", tag);
        }

        outgoing.setObject(object);
        sender.send(outgoing);
    } finally {
        jmsConnection.close();
    }
}
/**
 * Sends one message through an anonymous producer (created with a {@code null} destination)
 * by naming the target queue on the {@code send} call, then checks through the queue's
 * management view that the queue size is 1.
 */
@Test(timeout = 60000)
public void testAnonymousSend() throws Exception {
    connection = createAmqpConnection();
    assertNotNull(connection);
    connection.start();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Verify the session before it is dereferenced; checking afterwards could never fail usefully.
    assertNotNull(session);
    Queue queue = session.createQueue(name.getMethodName());

    // A null destination yields an anonymous producer; the destination is supplied per send.
    MessageProducer producer = session.createProducer(null);
    Message message = session.createMessage();
    producer.send(queue, message);

    QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
    assertEquals(1, proxy.getQueueSize());
}
/**
 * Logs in with a valid user and password, then tries to send to a queue the user is not
 * authorised to send to. The send must be refused with a {@code JMSSecurityException}.
 *
 * <p>The queue {@code guest.cannot.send} is created on the server first if it does not
 * already exist. The connection is always closed, including when the expected exception
 * is not thrown and the test fails.
 */
@Test
public void testLoginValidUserAndPasswordButNotAuthorisedToSend() throws Exception {
    SimpleString queueName = SimpleString.toSimpleString("guest.cannot.send");
    if (getJmsServer().locateQueue(queueName) == null) {
        getJmsServer().createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
    }

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection connection = connectionFactory.createConnection("guest", "guest");
    try {
        Session session = connection.createSession();
        Destination destination = session.createQueue(queueName.toString());
        MessageProducer messageProducer = session.createProducer(destination);
        try {
            messageProducer.send(session.createTextMessage("hello"));
            fail("JMSSecurityException expected as guest is not allowed to send");
        } catch (JMSSecurityException expected) {
            // pass: the broker rejected the unauthorised send
        }
    } finally {
        // Close even when fail() or an unexpected exception aborts the test.
        connection.close();
    }
}
/**
 * Fills the test destination with 30000 messages, each carrying a unique integer {@code id}
 * property, then removes the last one by selector and checks that exactly one message was
 * removed and that the removal took less than 20 seconds.
 */
public void testRemoveMessages() throws Exception {
    final int messageCount = 30000;
    final long maxRemovalMillis = 20000;

    // Load the queue inside a single transaction so all messages become visible together.
    Connection producerConnection = createConnection();
    try {
        producerConnection.start();
        Session txSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer sender = txSession.createProducer(destination);
        int nextId = 0;
        while (nextId < messageCount) {
            Message tagged = txSession.createMessage();
            tagged.setIntProperty("id", nextId);
            sender.send(tagged);
            nextId++;
        }
        txSession.commit();
    } finally {
        producerConnection.close();
    }

    // Reach into the broker for the queue implementation behind the test destination.
    Queue brokerQueue = (Queue) broker.getRegionBroker().getDestinationMap().get(destination);

    // NOTE(review): this context is configured but never passed to the removal call below.
    ConnectionContext connectionContext = new ConnectionContext(new NonCachedMessageEvaluationContext());
    connectionContext.setBroker(broker.getBroker());
    connectionContext.getMessageEvaluationContext().setDestination(destination);

    // Time the removal of the message with the highest id.
    long removalStarted = System.currentTimeMillis();
    Assert.assertEquals(1, brokerQueue.removeMatchingMessages("id=" + (messageCount - 1)));
    long elapsedMillis = System.currentTimeMillis() - removalStarted;

    LOG.info("It took " + elapsedMillis + "ms to remove the last message from a queue a " + messageCount + " messages.");
    Assert.assertTrue("Removing the message took too long: " + elapsedMillis + "ms", elapsedMillis < maxRemovalMillis);
}
/**
 * Sends text message "A" before a consumer exists and text message "B" afterwards, on a queue
 * addressed through {@code LEGACY_BINDING_URL}, and expects the consumer to receive "A" and
 * then "B".
 *
 * <p>The test is skipped when the protocol is AMQP 1.0.
 */
@Test
public void testConsumerFilterArrivalTime1000() throws Exception
{
    assumeThat("Only legacy client implements this feature", getProtocol(), is(not(equalTo(Protocol.AMQP_1_0))));

    final String testQueueName = getTestName();
    createQueue(testQueueName);

    final Connection jmsConnection = getConnection();
    try
    {
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final String bindingUrl =
                String.format(LEGACY_BINDING_URL, testQueueName, testQueueName, getReceiveTimeout());
        final Queue boundQueue = jmsSession.createQueue(bindingUrl);
        jmsConnection.start();

        final MessageProducer sender = jmsSession.createProducer(boundQueue);
        sender.send(jmsSession.createTextMessage("A"));

        // Let a quarter of the receive timeout pass before the consumer is attached.
        Thread.sleep(getReceiveTimeout() / 4);

        final MessageConsumer receiver = jmsSession.createConsumer(boundQueue);

        final Message first = receiver.receive(getReceiveTimeout());
        assertNotNull("Message A should be received", first);
        assertTrue("Unexpected message type", first instanceof TextMessage);
        assertEquals("Unexpected message", "A", ((TextMessage) first).getText());

        sender.send(jmsSession.createTextMessage("B"));

        final Message second = receiver.receive(getReceiveTimeout());
        assertNotNull("Message B should be received", second);
        assertTrue("Unexpected message type", second instanceof TextMessage);
        assertEquals("Unexpected message", "B", ((TextMessage) second).getText());
    }
    finally
    {
        jmsConnection.close();
    }
}
/**
 * Sends a map message and then a JSON text message to the {@code decanter} queue and checks,
 * after a short pause for each, that the dispatcher has recorded one more posted event whose
 * {@code message} property matches the payload and whose {@code type} property is {@code jms}.
 */
@Test
public void test() throws Exception {
    Connection jmsConnection = null;
    Session jmsSession = null;
    try {
        jmsConnection = connectionFactory.createConnection();
        jmsConnection.start();
        jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer sender = jmsSession.createProducer(jmsSession.createQueue("decanter"));

        // First payload: a map message with a single "message" entry.
        ActiveMQMapMessage mapPayload = new ActiveMQMapMessage();
        mapPayload.setString("message", "map");
        sender.send(mapPayload);

        // Give the asynchronous collector time to hand the event to the dispatcher.
        Thread.sleep(200L);

        Assert.assertEquals(1, dispatcher.getPostEvents().size());
        Event posted = dispatcher.getPostEvents().get(0);
        Assert.assertEquals("map", posted.getProperty("message"));
        Assert.assertEquals("jms", posted.getProperty("type"));

        // Second payload: a text message whose body is a JSON object.
        ActiveMQTextMessage textPayload = new ActiveMQTextMessage();
        textPayload.setText("{ \"message\" : \"text\" }");
        sender.send(textPayload);

        Thread.sleep(200L);

        Assert.assertEquals(2, dispatcher.getPostEvents().size());
        posted = dispatcher.getPostEvents().get(1);
        Assert.assertEquals("text", posted.getProperty("message"));
        Assert.assertEquals("jms", posted.getProperty("type"));
    } finally {
        if (null != jmsSession) {
            jmsSession.close();
        }
        if (null != jmsConnection) {
            jmsConnection.close();
        }
    }
}
/**
 * Sends a report to the client as a JSON text message on the reply destination.
 *
 * <p>The message is sent non-persistently with a time-to-live of {@code defaultReplyTTL}.
 * Sending is best effort: if no reply destination is known the call only logs an error, and
 * any failure while sending is logged and not propagated to the caller.
 *
 * @param jsonResponse the message to be sent
 */
private void sendJsonResponse(final String jsonResponse) {
    if (replyDestination == null) {
        log.error("sendJsonResponse() : JMSReplyTo destination is null - cannot send reply.");
        return;
    }
    MessageProducer messageProducer = null;
    try {
        messageProducer = session.createProducer(replyDestination);
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        messageProducer.setTimeToLive(defaultReplyTTL);
        // Send response as Json message
        Message replyMessage = session.createTextMessage(jsonResponse);
        messageProducer.send(replyMessage);
        log.debug("sendJsonResponse() : Report sent.");
    } catch (Throwable e) {
        // Deliberately broad: a failed reply must never break the caller.
        log.warn("sendJsonResponse() : Failed to send report :" + e.getMessage(), e);
    } finally {
        if (messageProducer != null) {
            try {
                messageProducer.close();
            } catch (JMSException ignored) {
                // Nothing useful can be done if the one-shot producer fails to close.
            }
        }
    }
}
/**
 * Sends non-persistent text messages named {@code Message_0}, {@code Message_1}, ... to the
 * given queue for as long as the {@code running} flag is set, calling {@code beforeSend()}
 * ahead of every send.
 *
 * <p>The session created for sending is closed when the loop ends or a send fails; the
 * connection itself belongs to the caller and is left open.
 *
 * @param c     connection on which the sending session is created
 * @param qName name of the queue to send to
 * @throws JMSException if creating the session or producer, sending, or closing the session fails
 */
protected void sendMessages(Connection c, String qName) throws JMSException {
    Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        LOGGER.info("Sender: Using AUTO-ACK session");
        Queue q = s.createQueue(qName);
        // Anonymous producer: the destination is passed on each send.
        MessageProducer producer = s.createProducer(null);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        long sent = 0;
        while (running.get()) {
            beforeSend();
            producer.send(q, s.createTextMessage("Message_" + (sent++)));
        }
    } finally {
        // Closing the session also releases the producer created from it.
        s.close();
    }
}
/**
 * Sends the given text as a non-persistent JMS text message to {@code chatQueue}.
 *
 * <p>A connection and session are created for the call and released afterwards. The
 * connection is closed even when closing the session fails.
 *
 * @param text body of the text message
 * @throws JMSException if connecting, sending or closing fails
 */
public void sendMessage(String text) throws JMSException {
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            // Create a MessageProducer from the Session to the Topic or Queue
            MessageProducer producer = session.createProducer(chatQueue);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // Create a message
            TextMessage message = session.createTextMessage(text);
            // Tell the producer to send the message
            producer.send(message);
        } finally {
            session.close();
        }
    } finally {
        // Nested finally: runs even if session.close() above threw.
        connection.close();
    }
}
/**
 * Sends one text message ({@code TEXT}) to {@code destination} over the given connection and
 * asserts that {@code Listener.sync()} reports success.
 *
 * <p>The connection is always closed afterwards; a failure to close is ignored.
 *
 * @param connection connection to exercise; closed by this method
 * @throws JMSException         if creating the session or producer, or sending, fails
 * @throws InterruptedException declared for the wait performed by {@code Listener.sync()}
 */
private void testConnection(final Connection connection) throws JMSException, InterruptedException {
    try {
        final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer sender = jmsSession.createProducer(destination);
        sender.send(jmsSession.createTextMessage(TEXT));

        assertTrue(Listener.sync());
    } finally {
        try {
            connection.close();
        } catch (final JMSException closeFailure) {
            // no-op: a close failure must not hide the test outcome
        }
    }
}
/**
 * Sends a message with a time-to-live of 1 ms and expects it to show up on the dead letter
 * address within 10 seconds, while the original queue yields nothing.
 */
@Test(timeout = 60000)
public void testSendWithTimeToLiveExpiresToDLQ() throws Exception {
    Connection jmsConnection = createConnection();
    try {
        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue sourceQueue = jmsSession.createQueue(getQueueName());

        MessageProducer shortLivedSender = jmsSession.createProducer(sourceQueue);
        shortLivedSender.setTimeToLive(1);
        Message shortLived = jmsSession.createMessage();
        shortLivedSender.send(shortLived);

        jmsConnection.start();

        // The expired message must be readable from the dead letter address.
        MessageConsumer dlqConsumer = jmsSession.createConsumer(jmsSession.createQueue(getDeadLetterAddress()));
        Message fromDlq = dlqConsumer.receive(10000);
        assertNotNull(fromDlq);
        dlqConsumer.close();

        // And it must no longer be available on the queue it was sent to.
        MessageConsumer sourceConsumer = jmsSession.createConsumer(sourceQueue);
        Message fromSource = sourceConsumer.receiveNoWait();
        assertNull(fromSource);
        sourceConsumer.close();
    } finally {
        if (null != jmsConnection) {
            jmsConnection.close();
        }
    }
}
/**
 * Checks that a transacted session can still send after the peer rejects a commit.
 *
 * <p>Scripted exchange: a message is sent under the first transaction ({@code txnId1}) and
 * accepted; the discharge for {@code session.commit()} is answered with {@code Rejected},
 * which must surface as a {@code TransactionRolledBackException}; a second transaction
 * ({@code txnId2}) is declared and another message is sent under it.
 */
@Test(timeout=20000)
public void testProducedMessagesAfterCommitOfSentMessagesFails() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
testPeer.expectDeclare(txnId1);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
MessageProducer producer = session.createProducer(queue);
// Expect the message which was sent under the first transaction. Check it carries
// TransactionalState with txnId1 but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId1));
stateMatcher.withOutcome(nullValue());
TransactionalState txState = new TransactionalState();
txState.setTxnId(txnId1);
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
producer.send(session.createMessage());
// Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
// and reply with rejected and settled disposition to indicate the commit failed
Rejected commitFailure = new Rejected(new Error(Symbol.valueOf("failed"), "Unknown error"));
testPeer.expectDischarge(txnId1, false, commitFailure);
// Then expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the new txnId (txnId2).
Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId2);
try {
session.commit();
fail("Commit operation should have failed.");
} catch (TransactionRolledBackException jmsTxRb) {
// Expected: the peer rejected the discharge, so the commit is reported as rolled back.
}
// Expect the message sent after the failed commit. Check it carries
// TransactionalState with txnId2 but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId2));
stateMatcher.withOutcome(nullValue());
txState = new TransactionalState();
txState.setTxnId(txnId2);
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
// NOTE(review): presumably the rollback discharge issued when the connection is closed
// without committing the second transaction - confirm against TestAmqpPeer.
testPeer.expectDischarge(txnId2, true);
producer.send(session.createMessage());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Publishes one text message to the composite topic
 * {@code VirtualTopic.Orders.A,VirtualTopic.Orders.B} and expects a consumer on the queue
 * {@code Consumer.A.VirtualTopic.Orders.*} to receive two messages, the first of which
 * carries the published text.
 *
 * <p>Auto-creation of queues and addresses is switched on for {@code VirtualTopic.#} first.
 */
@Test
public void testAutoVirtualTopicWildcardStarFQQN() throws Exception {
    Connection jmsConnection = null;

    SimpleString ordersA = new SimpleString("VirtualTopic.Orders.A");
    SimpleString ordersB = new SimpleString("VirtualTopic.Orders.B");
    SimpleString ordersWildcard = new SimpleString("VirtualTopic.Orders.*");

    this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateQueues(true);
    this.server.getAddressSettingsRepository().getMatch("VirtualTopic.#").setAutoCreateAddresses(true);

    try {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
        factory.setWatchTopicAdvisories(false);
        jmsConnection = factory.createConnection();
        jmsConnection.start();

        Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Comma-separated names address both topics with a single send.
        Destination compositeTopic = jmsSession.createTopic(ordersA + "," + ordersB);
        MessageConsumer wildcardConsumer =
                jmsSession.createConsumer(jmsSession.createQueue("Consumer.A." + ordersWildcard));

        MessageProducer sender = jmsSession.createProducer(compositeTopic);
        TextMessage outgoing = jmsSession.createTextMessage("This is a text message");
        sender.send(outgoing);

        // One copy is expected per topic matched by the wildcard.
        TextMessage firstCopy = (TextMessage) wildcardConsumer.receive(2000);
        TextMessage secondCopy = (TextMessage) wildcardConsumer.receive(2000);
        boolean bothArrived = firstCopy != null && secondCopy != null;
        assertTrue(bothArrived);

        assertEquals("This is a text message", firstCopy.getText());
        wildcardConsumer.close();
    } finally {
        if (null != jmsConnection) {
            jmsConnection.close();
        }
    }
}
/**
 * Verifies that a durable subscriber on a second connection still receives messages
 * published on the first connection, even though the first connection's own durable
 * subscriber was created with no-local set.
 *
 * <p>"Message1" is published on the no-local connection; "Message2" and "Message3" on the
 * other. The no-local subscriber must see "Message2" first and the other subscriber must
 * see "Message1" first. Both subscriptions are unsubscribed before the connections close.
 */
@Test
public void testNoLocalSubscriberAndSubscriberOnSeparateConnection() throws Exception
{
    final String noLocalSubName = getTestName() + "_no_local_sub";
    final String plainSubName = getTestName() + "_sub";
    final Topic testTopic = createTopic(getTestName());

    Connection noLocalConnection = getConnectionBuilder().setClientId("clientId").build();
    try
    {
        Connection otherConnection = getConnection();
        try
        {
            Session noLocalSession = noLocalConnection.createSession(true, Session.SESSION_TRANSACTED);
            Session otherSession = otherConnection.createSession(true, Session.SESSION_TRANSACTED);

            MessageProducer noLocalProducer = noLocalSession.createProducer(testTopic);
            MessageProducer otherProducer = otherSession.createProducer(testTopic);

            try
            {
                TopicSubscriber noLocalSubscriber =
                        noLocalSession.createDurableSubscriber(testTopic, noLocalSubName, null, true);
                TopicSubscriber plainSubscriber =
                        otherSession.createDurableSubscriber(testTopic, plainSubName, null, false);

                noLocalConnection.start();
                otherConnection.start();

                // One message from the no-local connection, two from the other.
                noLocalProducer.send(noLocalSession.createTextMessage("Message1"));
                noLocalSession.commit();
                otherProducer.send(otherSession.createTextMessage("Message2"));
                otherProducer.send(otherSession.createTextMessage("Message3"));
                otherSession.commit();

                // The no-local subscriber must skip its own connection's Message1.
                Message seenByNoLocal = noLocalSubscriber.receive(getReceiveTimeout());
                assertTrue(seenByNoLocal instanceof TextMessage);
                assertEquals("Unexpected local message received",
                             "Message2",
                             ((TextMessage) seenByNoLocal).getText());
                noLocalSession.commit();

                // The subscriber on the other connection gets Message1 as usual.
                Message seenByPlain = plainSubscriber.receive(getReceiveTimeout());
                assertTrue(seenByPlain instanceof TextMessage);
                assertEquals("Unexpected message received",
                             "Message1",
                             ((TextMessage) seenByPlain).getText());
                otherSession.commit();

                noLocalSubscriber.close();
                plainSubscriber.close();
            }
            finally
            {
                noLocalSession.unsubscribe(noLocalSubName);
                otherSession.unsubscribe(plainSubName);
            }
        }
        finally
        {
            otherConnection.close();
        }
    }
    finally
    {
        noLocalConnection.close();
    }
}
/**
 * Checks that an asynchronous send whose transfer is never answered by the peer completes
 * through the completion listener with a {@code JmsSendTimedOutException}.
 *
 * <p>The connection's send timeout is set to 500. The {@code send} call itself must not
 * throw; the listener must be called back within 5 seconds with both an exception and the
 * message set.
 */
@Test(timeout = 20000)
public void testAsyncCompletionGetsTimedOutErrorWhenNoDispostionArrives() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(500);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
Message message = session.createTextMessage("text");
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
// Expect the producer to attach and grant it some credit. It should then send a
// transfer, to which the peer sends no response, causing the send operation to
// time out.
testPeer.expectSenderAttach();
testPeer.expectTransferButDoNotRespond(messageMatcher);
testPeer.expectClose();
MessageProducer producer = session.createProducer(queue);
TestJmsCompletionListener listener = new TestJmsCompletionListener();
try {
producer.send(message, listener);
} catch (Throwable error) {
// An asynchronous send must report failure through the listener, not by throwing.
LOG.info("Caught unexpected error: {}", error.getMessage());
fail("Send should not fail for async.");
}
assertTrue("Did not get async callback", listener.awaitCompletion(5, TimeUnit.SECONDS));
assertNotNull(listener.exception);
assertTrue(listener.exception instanceof JmsSendTimedOutException);
assertNotNull(listener.message);
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Checks that a single {@code Message} instance can be sent repeatedly on a transacted
 * session.
 *
 * <p>The same message object is sent three times, with its {@code sequence} int property
 * updated before each send. Every transfer is expected under the same transaction id, and
 * the transaction is discharged when the connection is closed without a commit.
 */
@Test(timeout=20000)
public void testProducedMessagesOnTransactedSessionCanBeReused() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a Declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
testPeer.expectDeclare(txnId);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
// Create a producer to use in provoking creation of the AMQP transaction
testPeer.expectSenderAttach();
MessageProducer producer = session.createProducer(queue);
// The one message instance that is re-sent on every loop iteration below.
Message message = session.createMessage();
for(int i = 0; i < 3; ++i) {
// Expect the message which was sent under the current transaction. Check it carries
// TransactionalState with the above txnId but has no outcome. Respond with a
// TransactionalState with Accepted outcome.
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
stateMatcher.withTxnId(equalTo(txnId));
stateMatcher.withOutcome(nullValue());
TransactionalState txState = new TransactionalState();
txState.setTxnId(txnId);
txState.setOutcome(new Accepted());
testPeer.expectTransfer(messageMatcher, stateMatcher, txState, true);
// Mutate and re-send the same message object.
message.setIntProperty("sequence", i);
producer.send(message);
}
// Expect rollback on close without a commit call.
testPeer.expectDischarge(txnId, true);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Publishes one message, receives it on a {@code CLIENT_ACKNOWLEDGE} session without
 * acknowledging it, and expects the same queue to deliver a message again with the
 * redelivered flag set. The second delivery is then acknowledged.
 *
 * <p>The {@code AndesAckWaitTimeOut} system property is set to 5000 before connecting.
 * The connection is closed even when an assertion fails.
 *
 * @param port           broker port
 * @param adminUsername  user name used to build the initial context
 * @param adminPassword  password used to build the initial context
 * @param brokerHostname broker host name
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
@Parameters({ "broker-port", "admin-username", "admin-password", "broker-hostname" })
@Test
public void testConsumerWithBasicReject(String port,
                                        String adminUsername,
                                        String adminPassword,
                                        String brokerHostname) throws Exception {
    System.setProperty("AndesAckWaitTimeOut", "5000");
    String queueName = "testConsumerWithBasicReject";
    InitialContext initialContextForQueue = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withQueue(queueName)
            .build();
    ConnectionFactory connectionFactory
            = (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();

        // publish message
        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = producerSession.createQueue(queueName);
        MessageProducer producer = producerSession.createProducer(queue);
        producer.send(producerSession.createTextMessage("Test message for reject test"));
        producerSession.close();

        // Consume published messages
        Session subscriberSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination subscriberDestination = (Destination) initialContextForQueue.lookup(queueName);
        MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);

        // First delivery is intentionally left unacknowledged.
        Message message = consumer.receive(5000);
        Assert.assertNotNull(message, "Message was not received");

        // The unacknowledged message should come back, flagged as redelivered.
        message = consumer.receive(10000);
        Assert.assertNotNull(message, "Requeued Message was not received");
        Assert.assertTrue(message.getJMSRedelivered(), "Redelivered flag was not set");
        message.acknowledge();
    } finally {
        // Release the connection even when one of the assertions above fails.
        connection.close();
    }
}
/**
 * Sends a message asynchronously with a {@code CloseCompletionListener} bound to the sending
 * producer, and expects the listener to have recorded a
 * {@code javax.jms.IllegalStateException} by the time its latch is released (within 5 seconds).
 *
 * <p>NOTE(review): the listener presumably tries to close the producer from inside the
 * completion callback - confirm against {@code CloseCompletionListener}.
 */
@Test
public void testProducerCloseInCompletionListener() throws Exception {
    Connection producerConnection = createConnection();
    Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageProducer topicProducer = producerSession.createProducer(ActiveMQServerTestCase.topic1);

    CountDownLatch completed = new CountDownLatch(1);
    CloseCompletionListener closingListener = new CloseCompletionListener(topicProducer, completed);

    topicProducer.send(
            producerSession.createMessage(),
            DeliveryMode.NON_PERSISTENT,
            Message.DEFAULT_PRIORITY,
            0L,
            closingListener);

    ProxyAssertSupport.assertTrue(completed.await(5, TimeUnit.SECONDS));
    ProxyAssertSupport.assertNotNull(closingListener.exception);
    ProxyAssertSupport.assertTrue(closingListener.exception instanceof javax.jms.IllegalStateException);
}
/**
 * Re-creates a durable subscription under the same name and client id but with the no-local
 * flag changed from {@code false} to {@code true}, and checks what the new subscriber receives.
 *
 * <p>Phase one: a durable subscriber (no-local {@code false}) is created, "A" and "B" are
 * published and committed, "A" is received and committed, and the connection is closed.
 * Phase two: a new connection with the same client id creates the subscriber again with
 * no-local {@code true}; a separate connection publishes "C", and the next message the
 * subscriber receives must be "C".
 *
 * <p>The test only runs when the protocol is AMQP 1.0 (assumption labelled "QPID-8068").
 */
@Test
public void testResubscribeWithChangedNoLocal() throws Exception
{
assumeThat("QPID-8068", getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
String subscriptionName = getTestName() + "_sub";
Topic topic = createTopic(getTestName());
String clientId = "testClientId";
Connection connection = getConnectionBuilder().setClientId(clientId).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// Initial subscription: no-local is false.
TopicSubscriber durableSubscriber =
session.createDurableSubscriber(topic, subscriptionName, null, false);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("A"));
producer.send(session.createTextMessage("B"));
session.commit();
connection.start();
// Only "A" is consumed here; "B" has not been received when the connection closes.
Message receivedMessage = durableSubscriber.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", receivedMessage instanceof TextMessage);
assertEquals("Unexpected message received", "A", ((TextMessage)receivedMessage).getText());
session.commit();
}
finally
{
connection.close();
}
// Reconnect with the same client id and the same subscription name.
connection = getConnectionBuilder().setClientId(clientId).build();
try
{
connection.start();
Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
// Same subscription name, but no-local is now true.
TopicSubscriber noLocalSubscriber2 = session2.createDurableSubscriber(topic, subscriptionName, null, true);
// Publish "C" from a different connection so that no-local does not filter it out.
Connection secondConnection = getConnectionBuilder().setClientId("secondConnection").build();
try
{
Session secondSession = secondConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer secondProducer = secondSession.createProducer(topic);
secondProducer.send(secondSession.createTextMessage("C"));
}
finally
{
secondConnection.close();
}
// NOTE(review): "C" arriving first implies the leftover "B" is not delivered, presumably
// because changing no-local replaces the old subscription - confirm against the JMS spec.
Message noLocalSubscriberMessage = noLocalSubscriber2.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", noLocalSubscriberMessage instanceof TextMessage);
assertEquals("Unexpected message received", "C", ((TextMessage)noLocalSubscriberMessage).getText());
}
finally
{
connection.close();
}
}
/**
 * Repeats 1000 times: open a connection, send 100 persistent text messages
 * ({@code Message0} .. {@code Message99}) to {@code queue}, then read all 100 back with
 * {@code receiveNoWait()}, checking order and acknowledging each one.
 *
 * <p>A {@code null} from {@code receiveNoWait()} is treated as a failure. Each iteration's
 * connection is closed even when that iteration fails, so a failure does not leak it.
 */
@Test
public void testReceiveNoWait() throws Exception {
    assertNotNull(queue);
    for (int i = 0; i < 1000; i++) {
        Connection connection = cf.createConnection();
        try {
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int j = 0; j < 100; j++) {
                String text = "Message" + j;
                TextMessage message = session.createTextMessage();
                message.setText(text);
                producer.send(message);
            }

            connection.start();
            MessageConsumer consumer = session.createConsumer(queue);
            for (int j = 0; j < 100; j++) {
                // All messages were already sent, so none of these reads may come back empty.
                TextMessage m = (TextMessage) consumer.receiveNoWait();
                if (m == null) {
                    throw new IllegalStateException("msg null");
                }
                assertEquals("Message" + j, m.getText());
                m.acknowledge();
            }
        } finally {
            connection.close();
        }
    }
}