类javax.jms.ResourceAllocationException源码实例Demo

下面列出了怎么用javax.jms.ResourceAllocationException的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: activemq-artemis   文件: AmqpFlowControlTest.java
@Test(timeout = 60000)
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
   Connection connection = createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Destination d = session.createQueue(getQueueName());
   MessageProducer p = session.createProducer(d);

   fillAddress(getQueueName());

   Exception e = null;
   try {
      p.send(session.createBytesMessage());
   } catch (ResourceAllocationException rae) {
      e = rae;
   }
   assertTrue(e instanceof ResourceAllocationException);
   assertTrue(e.getMessage().contains("resource-limit-exceeded"));

   long addressSize = server.getPagingManager().getPageStore(new SimpleString(getQueueName())).getAddressSize();
   assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
}
 
protected ConnectionFactory getConnectionFactory() throws Exception {
   factory.setExceptionListener(new ExceptionListener() {
      @Override
      public void onException(JMSException arg0) {
         if (arg0 instanceof ResourceAllocationException) {
            gotResourceException.set(true);
         }
      }
   });
   return factory;
}
 
源代码3 项目: activemq-artemis   文件: JmsExceptionUtils.java
/**
 * Converts instances of sub-classes of {@link JMSException} into the corresponding sub-class of
 * {@link JMSRuntimeException}.
 *
 * @param e
 * @return
 */
public static JMSRuntimeException convertToRuntimeException(JMSException e) {
   if (e instanceof javax.jms.IllegalStateException) {
      return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof InvalidClientIDException) {
      return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof InvalidDestinationException) {
      return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof InvalidSelectorException) {
      return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof JMSSecurityException) {
      return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof MessageFormatException) {
      return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof MessageNotWriteableException) {
      return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof ResourceAllocationException) {
      return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof TransactionInProgressException) {
      return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   if (e instanceof TransactionRolledBackException) {
      return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
   }
   return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
 
@Override
public ResourceAllocationException toJMSException() {
    ResourceAllocationException jmsEx = new ResourceAllocationException(getMessage());
    jmsEx.initCause(this);
    jmsEx.setLinkedException(this);
    return jmsEx;
}
 
@Override
public ResourceAllocationException toJMSException() {
    ResourceAllocationException jmsEx = new ResourceAllocationException(getMessage());
    jmsEx.initCause(this);
    jmsEx.setLinkedException(this);
    return jmsEx;
}
 
源代码6 项目: qpid-jms   文件: ConnectionIntegrationTest.java
@Test(timeout = 20000)
public void  testRemotelyEndConnectionWithSessionWithProducerWithSendWaitingOnCredit() throws Exception {
    final String BREAD_CRUMB = "ErrorMessageBreadCrumb";

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Expect producer creation, don't give it credit.
        testPeer.expectSenderAttachWithoutGrantingCredit();

        // Producer has no credit so the send should block waiting for it.
        testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);

        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        Message message = session.createTextMessage("myMessage");

        try {
            producer.send(message);
            fail("Expected exception to be thrown");
        } catch (ResourceAllocationException jmse) {
            // Expected
            assertNotNull("Expected exception to have a message", jmse.getMessage());
            assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
        } catch (Throwable t) {
            fail("Caught unexpected exception: " + t);
        }

        connection.close();

        testPeer.waitForAllHandlersToComplete(3000);
    }
}
 
源代码7 项目: qpid-jms   文件: ProducerIntegrationTest.java
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseProducerWithSendWaitingForCredit() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Expect producer creation, don't give it credit.
        testPeer.expectSenderAttachWithoutGrantingCredit();

        // Producer has no credit so the send should block waiting for it, then fail when the remote close occurs
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50);
        testPeer.expectClose();

        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(queue);

        Message message = session.createTextMessage("myMessage");

        try {
            producer.send(message);
            fail("Expected exception to be thrown due to close of producer");
        } catch (ResourceAllocationException rae) {
            // Expected if remote close beat the send to the provider
        } catch (IllegalStateException ise) {
            // Can happen if send fires before remote close if processed.
        }

        connection.close();

        testPeer.waitForAllHandlersToComplete(3000);
    }
}
 
private void doTestRemotelyCloseProducerWithSendWaitingForCredit(int cacheSize) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // Use a long timeout to ensure no early evictions in this test.
        Connection connection = testFixture.establishConnecton(testPeer,
            "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Expect producer creation, don't give it credit.
        testPeer.expectSenderAttachWithoutGrantingCredit();

        // Producer has no credit so the send should block waiting for it, then fail when the remote close occurs
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50);
        testPeer.expectClose();

        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(null);

        Message message = session.createTextMessage("myMessage");

        try {
            producer.send(queue, message);
            fail("Expected exception to be thrown due to close of producer");
        } catch (ResourceAllocationException rae) {
            // Expected if remote close beat the send to the provider
        } catch (IllegalStateException ise) {
            // Can happen if send fires before remote close if processed.
        }

        connection.close();

        testPeer.waitForAllHandlersToComplete(3000);
    }
}
 
源代码9 项目: tomee   文件: JMS2.java
public static JMSRuntimeException toRuntimeException(final JMSException e) {
    if (e instanceof javax.jms.IllegalStateException) {
        return new IllegalStateRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof InvalidClientIDException) {
        return new InvalidClientIDRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof InvalidDestinationException) {
        return new InvalidDestinationRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof InvalidSelectorException) {
        return new InvalidSelectorRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof JMSSecurityException) {
        return new JMSSecurityRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof MessageFormatException) {
        return new MessageFormatRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof MessageNotWriteableException) {
        return new MessageNotWriteableRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof ResourceAllocationException) {
        return new ResourceAllocationRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof TransactionInProgressException) {
        return new TransactionInProgressRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    if (e instanceof TransactionRolledBackException) {
        return new TransactionRolledBackRuntimeException(e.getMessage(), e.getErrorCode(), e);
    }
    return new JMSRuntimeException(e.getMessage(), e.getErrorCode(), e);
}
 
源代码10 项目: pooled-jms   文件: JMSExceptionSupportTest.java
@Test(expected = ResourceAllocationRuntimeException.class)
public void testConvertsResourceAllocationExceptionToResourceAllocationRuntimeException() {
    throw JMSExceptionSupport.createRuntimeException(new ResourceAllocationException("error"));
}
 
private void doTestRemotelyCloseProducerDuringSyncSend(int cacheSize) throws Exception {
    final String BREAD_CRUMB = "ErrorMessageBreadCrumb";

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

        // Use a long timeout to ensure no early evictions in this test.
        Connection connection = testFixture.establishConnecton(testPeer,
            "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Expect producer creation, give it credit.
        testPeer.expectSenderAttach();

        String text = "myMessage";
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));

        // Expect a message to be sent, but don't send a disposition in
        // response, simply remotely close the producer instead.
        testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
        testPeer.expectClose();

        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(null);

        Message message = session.createTextMessage(text);

        try {
            producer.send(queue, message);
            fail("Expected exception to be thrown");
        } catch (JMSException jmse) {
            LOG.trace("JMSException thrown from send: ", jmse);
            // Expected but requires some context to be correct.
            assertTrue(jmse instanceof ResourceAllocationException);
            assertNotNull("Expected exception to have a message", jmse.getMessage());
            assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
        }

        connection.close();

        testPeer.waitForAllHandlersToComplete(3000);
    }
}
 
private void doTestRemotelyCloseConnectionDuringSyncSend(int cacheSize) throws Exception {
    final String BREAD_CRUMB = "ErrorMessageBreadCrumb";

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        // Use a long timeout to ensure no early evictions in this test.
        Connection connection = testFixture.establishConnecton(testPeer,
            "?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Expect producer creation, give it credit.
        testPeer.expectSenderAttach();

        String text = "myMessage";
        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
        messageMatcher.setPropertiesMatcher(propsMatcher);
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));

        // Expect a message to be sent, but don't send a disposition in
        // response, simply remotely close the connection instead.
        testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
        testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);

        Queue queue = session.createQueue("myQueue");
        final MessageProducer producer = session.createProducer(null);

        Message message = session.createTextMessage(text);

        try {
            producer.send(queue, message);
            fail("Expected exception to be thrown");
        } catch (JMSException jmse) {
            // Expected exception with specific context
            assertTrue(jmse instanceof ResourceAllocationException);
            assertNotNull("Expected exception to have a message", jmse.getMessage());
            assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
        }

        testPeer.waitForAllHandlersToComplete(3000);

        connection.close();
    }
}
 
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncSendFailureHandled() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {

        final CountDownLatch sendFailureReportedToListener = new CountDownLatch(1);
        final AtomicReference<Throwable> sendFailureError = new AtomicReference<>();

        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
            "?jms.forceAsyncSend=true&amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");

        connection.setExceptionListener((error) -> {
            sendFailureError.compareAndSet(null, error);
            sendFailureReportedToListener.countDown();
        });

        connection.start();

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);

        // Expect no AMQP traffic when we create the anonymous producer, as it will wait
        // for an actual send to occur on the producer before anything occurs on the wire

        //Create an anonymous producer
        MessageProducer producer = session.createProducer(null);
        assertNotNull("Producer object was null", producer);

        // Expect a new message sent by the above producer to cause creation of a new
        // sender link to the given destination, then closing the link after the message is sent.
        TargetMatcher targetMatcher = new TargetMatcher();
        targetMatcher.withAddress(equalTo(topicName));
        targetMatcher.withDynamic(equalTo(false));
        targetMatcher.withDurable(equalTo(TerminusDurability.NONE));

        MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
        MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(headersMatcher);
        messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);

        final String BREAD_CRUMB = "SEND FAILURE EXPECTED";

        org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
        rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
        rejectError.setDescription(BREAD_CRUMB);

        testPeer.expectSenderAttach(targetMatcher, false, false);
        testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true);
        testPeer.expectDetach(true, true, true);

        // Producer should act as synchronous regardless of asynchronous send setting.
        Message message = session.createMessage();
        try {
            producer.send(dest, message);
        } catch (JMSException jmsEx) {
            LOG.debug("Caught expected error from failed send.");
            fail("Send should not fail as it should have fired asynchronously");
        }

        // Repeat the send and observe another attach->transfer->detach.
        testPeer.expectSenderAttach(targetMatcher, false, false);
        testPeer.expectTransfer(messageMatcher);
        testPeer.expectDetach(true, true, true);

        assertTrue("Send failure not reported to exception handler", sendFailureReportedToListener.await(5, TimeUnit.SECONDS));
        assertNotNull(sendFailureError.get());
        assertTrue(sendFailureError.get() instanceof ResourceAllocationException);
        assertTrue(sendFailureError.get().getMessage().contains(BREAD_CRUMB));

        producer.send(dest, message);

        // Send here is asynchronous so we need to wait for disposition to arrive and detach to happen
        testPeer.waitForAllHandlersToComplete(1000);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncCompletionListenerSendFailureHandled() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
            "?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");

        connection.start();

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);

        // Expect no AMQP traffic when we create the anonymous producer, as it will wait
        // for an actual send to occur on the producer before anything occurs on the wire

        //Create an anonymous producer
        MessageProducer producer = session.createProducer(null);
        assertNotNull("Producer object was null", producer);

        // Expect a new message sent by the above producer to cause creation of a new
        // sender link to the given destination, then closing the link after the message is sent.
        TargetMatcher targetMatcher = new TargetMatcher();
        targetMatcher.withAddress(equalTo(topicName));
        targetMatcher.withDynamic(equalTo(false));
        targetMatcher.withDurable(equalTo(TerminusDurability.NONE));

        String content = "testContent";
        TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
        messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));

        TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
        Message message = session.createTextMessage(content);

        final String BREAD_CRUMB = "SEND FAILURE EXPECTED";

        org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
        rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
        rejectError.setDescription(BREAD_CRUMB);

        testPeer.expectSenderAttach(targetMatcher, false, false);
        testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true);
        testPeer.expectDetach(true, true, true);

        // The fallback producer acts as synchronous regardless of the completion listener,
        // so exceptions are thrown from send. Only onComplete uses the listener.
        try {
            producer.send(dest, message, completionListener);
        } catch (JMSException jmsEx) {
            LOG.debug("Caught unexpected error from failed send.");
            fail("Send should not fail for asychrnous completion sends");
        }

        // Repeat the send (but accept this time) and observe another attach->transfer->detach.
        testPeer.expectSenderAttach(targetMatcher, false, false);
        testPeer.expectTransfer(messageMatcher);
        testPeer.expectDetach(true, true, true);

        assertTrue("Send failure not reported to exception handler", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
        assertNotNull(completionListener.exception);
        assertTrue(completionListener.exception instanceof ResourceAllocationException);
        assertTrue(completionListener.exception.getMessage().contains(BREAD_CRUMB));

        TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();

        producer.send(dest, message, completionListener2);

        assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
        assertNull(completionListener2.exception);
        Message receivedMessage2 = completionListener2.message;
        assertNotNull(receivedMessage2);
        assertTrue(receivedMessage2 instanceof TextMessage);
        assertEquals(content, ((TextMessage) receivedMessage2).getText());

        // Asynchronous send requires a wait otherwise we can close before the detach which we are testing for.
        testPeer.waitForAllHandlersToComplete(1000);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
源代码15 项目: qpid-jms   文件: JmsExceptionSupportTest.java
@Test(expected = ResourceAllocationRuntimeException.class)
public void testConvertsResourceAllocationExceptionToResourceAllocationRuntimeException() {
    throw JmsExceptionSupport.createRuntimeException(new ResourceAllocationException("error"));
}
 
源代码16 项目: qpid-jms   文件: FailoverIntegrationTest.java
private void doFailoverPassthroughOfFailingSyncSendTestImpl(ListDescribedType failingState, boolean inspectException) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final Connection connection = establishAnonymousConnecton(testPeer);

        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();
        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("myQueue");

        MessageProducer producer = session.createProducer(queue);

        //Do a warmup that succeeds
        String messageContent1 = "myMessage1";
        TransferPayloadCompositeMatcher messageMatcher1 = new TransferPayloadCompositeMatcher();
        messageMatcher1.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        messageMatcher1.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        messageMatcher1.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        messageMatcher1.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent1));

        testPeer.expectTransfer(messageMatcher1);

        TextMessage message1 = session.createTextMessage(messageContent1);
        producer.send(message1);

        testPeer.waitForAllHandlersToComplete(1000);

        // Create and send a new message, which fails as it is not accepted
        assertFalse(failingState instanceof Accepted);

        String messageContent2 = "myMessage2";
        TransferPayloadCompositeMatcher messageMatcher2 = new TransferPayloadCompositeMatcher();
        messageMatcher2.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        messageMatcher2.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        messageMatcher2.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        messageMatcher2.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent2));

        long delay = 15;
        testPeer.expectTransfer(messageMatcher2, nullValue(), false, true, failingState, true, 0, delay);

        TextMessage message2 = session.createTextMessage(messageContent2);

        long start = System.currentTimeMillis();
        try {
            producer.send(message2);
            fail("Expected an exception for this send.");
        } catch (JMSException jmse) {
            //Expected
            long elapsed = System.currentTimeMillis() - start;
            MatcherAssert.assertThat("Send call should have taken at least the disposition delay", elapsed, Matchers.greaterThanOrEqualTo(delay));

            if (inspectException) {
                assertTrue(jmse instanceof ResourceAllocationException);
                assertTrue(jmse.getMessage().contains("RLE description"));
            }
        }

        testPeer.waitForAllHandlersToComplete(1000);

        //Do a final send that succeeds
        String messageContent3 = "myMessage3";
        TransferPayloadCompositeMatcher messageMatcher3 = new TransferPayloadCompositeMatcher();
        messageMatcher3.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        messageMatcher3.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
        messageMatcher3.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        messageMatcher3.setMessageContentMatcher(new EncodedAmqpValueMatcher(messageContent3));

        testPeer.expectTransfer(messageMatcher3);

        TextMessage message3 = session.createTextMessage(messageContent3);
        producer.send(message3);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
 
 类所在包
 类方法
 同包方法