下面列出了怎么用javax.jms.ResourceAllocationException的API类实例代码及写法,或者点击链接到github查看源代码。
@Test(timeout = 60000)
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination d = session.createQueue(getQueueName());
MessageProducer p = session.createProducer(d);
fillAddress(getQueueName());
Exception e = null;
try {
p.send(session.createBytesMessage());
} catch (ResourceAllocationException rae) {
e = rae;
}
assertTrue(e instanceof ResourceAllocationException);
assertTrue(e.getMessage().contains("resource-limit-exceeded"));
long addressSize = server.getPagingManager().getPageStore(new SimpleString(getQueueName())).getAddressSize();
assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
}
protected ConnectionFactory getConnectionFactory() throws Exception {
factory.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException arg0) {
if (arg0 instanceof ResourceAllocationException) {
gotResourceException.set(true);
}
}
});
return factory;
}
/**
* Converts instances of sub-classes of {@link JMSException} into the corresponding sub-class of
* {@link JMSRuntimeException}.
*
* @param e
* @return
*/
public static JMSRuntimeException convertToRuntimeException(JMSException e) {
if (e instanceof javax.jms.IllegalStateException) {
return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof InvalidClientIDException) {
return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof InvalidDestinationException) {
return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof InvalidSelectorException) {
return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof JMSSecurityException) {
return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof MessageFormatException) {
return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof MessageNotWriteableException) {
return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof ResourceAllocationException) {
return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof TransactionInProgressException) {
return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof TransactionRolledBackException) {
return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
@Override
public ResourceAllocationException toJMSException() {
ResourceAllocationException jmsEx = new ResourceAllocationException(getMessage());
jmsEx.initCause(this);
jmsEx.setLinkedException(this);
return jmsEx;
}
@Override
public ResourceAllocationException toJMSException() {
ResourceAllocationException jmsEx = new ResourceAllocationException(getMessage());
jmsEx.initCause(this);
jmsEx.setLinkedException(this);
return jmsEx;
}
@Test(timeout = 20000)
public void testRemotelyEndConnectionWithSessionWithProducerWithSendWaitingOnCredit() throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, don't give it credit.
testPeer.expectSenderAttachWithoutGrantingCredit();
// Producer has no credit so the send should block waiting for it.
testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("myMessage");
try {
producer.send(message);
fail("Expected exception to be thrown");
} catch (ResourceAllocationException jmse) {
// Expected
assertNotNull("Expected exception to have a message", jmse.getMessage());
assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
} catch (Throwable t) {
fail("Caught unexpected exception: " + t);
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseProducerWithSendWaitingForCredit() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, don't give it credit.
testPeer.expectSenderAttachWithoutGrantingCredit();
// Producer has no credit so the send should block waiting for it, then fail when the remote close occurs
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50);
testPeer.expectClose();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(queue);
Message message = session.createTextMessage("myMessage");
try {
producer.send(message);
fail("Expected exception to be thrown due to close of producer");
} catch (ResourceAllocationException rae) {
// Expected if remote close beat the send to the provider
} catch (IllegalStateException ise) {
// Can happen if send fires before remote close if processed.
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
private void doTestRemotelyCloseProducerWithSendWaitingForCredit(int cacheSize) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
Connection connection = testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, don't give it credit.
testPeer.expectSenderAttachWithoutGrantingCredit();
// Producer has no credit so the send should block waiting for it, then fail when the remote close occurs
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50);
testPeer.expectClose();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(null);
Message message = session.createTextMessage("myMessage");
try {
producer.send(queue, message);
fail("Expected exception to be thrown due to close of producer");
} catch (ResourceAllocationException rae) {
// Expected if remote close beat the send to the provider
} catch (IllegalStateException ise) {
// Can happen if send fires before remote close if processed.
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
public static JMSRuntimeException toRuntimeException(final JMSException e) {
if (e instanceof javax.jms.IllegalStateException) {
return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof InvalidClientIDException) {
return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof InvalidDestinationException) {
return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof InvalidSelectorException) {
return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof JMSSecurityException) {
return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof MessageFormatException) {
return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof MessageNotWriteableException) {
return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof ResourceAllocationException) {
return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof TransactionInProgressException) {
return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
if (e instanceof TransactionRolledBackException) {
return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
@Test(expected = ResourceAllocationRuntimeException.class)
public void testConvertsResourceAllocationExceptionToResourceAllocationRuntimeException() {
throw JMSExceptionSupport.createRuntimeException(new ResourceAllocationException("error"));
}
private void doTestRemotelyCloseProducerDuringSyncSend(int cacheSize) throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
Connection connection = testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, give it credit.
testPeer.expectSenderAttach();
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
// Expect a message to be sent, but don't send a disposition in
// response, simply remotely close the producer instead.
testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
testPeer.expectClose();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(null);
Message message = session.createTextMessage(text);
try {
producer.send(queue, message);
fail("Expected exception to be thrown");
} catch (JMSException jmse) {
LOG.trace("JMSException thrown from send: ", jmse);
// Expected but requires some context to be correct.
assertTrue(jmse instanceof ResourceAllocationException);
assertNotNull("Expected exception to have a message", jmse.getMessage());
assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
private void doTestRemotelyCloseConnectionDuringSyncSend(int cacheSize) throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
Connection connection = testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, give it credit.
testPeer.expectSenderAttach();
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
// Expect a message to be sent, but don't send a disposition in
// response, simply remotely close the connection instead.
testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(null);
Message message = session.createTextMessage(text);
try {
producer.send(queue, message);
fail("Expected exception to be thrown");
} catch (JMSException jmse) {
// Expected exception with specific context
assertTrue(jmse instanceof ResourceAllocationException);
assertNotNull("Expected exception to have a message", jmse.getMessage());
assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
}
testPeer.waitForAllHandlersToComplete(3000);
connection.close();
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncSendFailureHandled() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch sendFailureReportedToListener = new CountDownLatch(1);
final AtomicReference<Throwable> sendFailureError = new AtomicReference<>();
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?jms.forceAsyncSend=true&amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.setExceptionListener((error) -> {
sendFailureError.compareAndSet(null, error);
sendFailureReportedToListener.countDown();
});
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
//Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
final String BREAD_CRUMB = "SEND FAILURE EXPECTED";
org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
rejectError.setDescription(BREAD_CRUMB);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true);
testPeer.expectDetach(true, true, true);
// Producer should act as synchronous regardless of asynchronous send setting.
Message message = session.createMessage();
try {
producer.send(dest, message);
} catch (JMSException jmsEx) {
LOG.debug("Caught expected error from failed send.");
fail("Send should not fail as it should have fired asynchronously");
}
// Repeat the send and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
assertTrue("Send failure not reported to exception handler", sendFailureReportedToListener.await(5, TimeUnit.SECONDS));
assertNotNull(sendFailureError.get());
assertTrue(sendFailureError.get() instanceof ResourceAllocationException);
assertTrue(sendFailureError.get().getMessage().contains(BREAD_CRUMB));
producer.send(dest, message);
// Send here is asynchronous so we need to wait for disposition to arrive and detach to happen
testPeer.waitForAllHandlersToComplete(1000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncCompletionListenerSendFailureHandled() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
//Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
String content = "testContent";
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
Message message = session.createTextMessage(content);
final String BREAD_CRUMB = "SEND FAILURE EXPECTED";
org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
rejectError.setDescription(BREAD_CRUMB);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true);
testPeer.expectDetach(true, true, true);
// The fallback producer acts as synchronous regardless of the completion listener,
// so exceptions are thrown from send. Only onComplete uses the listener.
try {
producer.send(dest, message, completionListener);
} catch (JMSException jmsEx) {
LOG.debug("Caught unexpected error from failed send.");
fail("Send should not fail for asychrnous completion sends");
}
// Repeat the send (but accept this time) and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
assertTrue("Send failure not reported to exception handler", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
assertNotNull(completionListener.exception);
assertTrue(completionListener.exception instanceof ResourceAllocationException);
assertTrue(completionListener.exception.getMessage().contains(BREAD_CRUMB));
TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
producer.send(dest, message, completionListener2);
assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
assertNull(completionListener2.exception);
Message receivedMessage2 = completionListener2.message;
assertNotNull(receivedMessage2);
assertTrue(receivedMessage2 instanceof TextMessage);
assertEquals(content, ((TextMessage) receivedMessage2).getText());
// Asynchronous send requires a wait otherwise we can close before the detach which we are testing for.
testPeer.waitForAllHandlersToComplete(1000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(expected = ResourceAllocationRuntimeException.class)
public void testConvertsResourceAllocationExceptionToResourceAllocationRuntimeException() {
throw JmsExceptionSupport.createRuntimeException(new ResourceAllocationException("error"));
}
private void doFailoverPassthroughOfFailingSyncSendTestImpl(ListDescribedType failingState, boolean inspectException) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final Connection connection = establishAnonymousConnecton(testPeer);
testPeer.expectSaslAnonymous();
testPeer.expectOpen();
testPeer.expectBegin();
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
//Do a warmup that succeeds
String messageContent1 = "myMessage1";
TransferPayloadCompositeMatcher messageMatcher1 = new TransferPayloadCompositeMatcher();
messageMatcher1.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher1.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher1.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher1.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent1));
testPeer.expectTransfer(messageMatcher1);
TextMessage message1 = session.createTextMessage(messageContent1);
producer.send(message1);
testPeer.waitForAllHandlersToComplete(1000);
// Create and send a new message, which fails as it is not accepted
assertFalse(failingState instanceof Accepted);
String messageContent2 = "myMessage2";
TransferPayloadCompositeMatcher messageMatcher2 = new TransferPayloadCompositeMatcher();
messageMatcher2.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher2.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher2.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher2.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent2));
long delay = 15;
testPeer.expectTransfer(messageMatcher2, nullValue(), false, true, failingState, true, 0, delay);
TextMessage message2 = session.createTextMessage(messageContent2);
long start = System.currentTimeMillis();
try {
producer.send(message2);
fail("Expected an exception for this send.");
} catch (JMSException jmse) {
//Expected
long elapsed = System.currentTimeMillis() - start;
MatcherAssert.assertThat("Send call should have taken at least the disposition delay", elapsed, Matchers.greaterThanOrEqualTo(delay));
if (inspectException) {
assertTrue(jmse instanceof ResourceAllocationException);
assertTrue(jmse.getMessage().contains("RLE description"));
}
}
testPeer.waitForAllHandlersToComplete(1000);
//Do a final send that succeeds
String messageContent3 = "myMessage3";
TransferPayloadCompositeMatcher messageMatcher3 = new TransferPayloadCompositeMatcher();
messageMatcher3.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher3.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher3.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher3.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent3));
testPeer.expectTransfer(messageMatcher3);
TextMessage message3 = session.createTextMessage(messageContent3);
producer.send(message3);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}