下面列出了javax.jms.ServerSessionPool#org.apache.qpid.jms.JmsConnection 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Close the JMS connection known to this actor in an isolated dispatcher because it is blocking.
*
* @return future where the closing operation executes.
*/
@SuppressWarnings("UnusedReturnValue")
private CompletableFuture<Void> ensureJmsConnectionClosed() {
if (jmsConnection != null) {
final JmsConnection jmsConnectionToClose = jmsConnection;
final Runnable closeJmsConnectionRunnable = () -> {
try {
jmsConnectionToClose.close();
} catch (final Throwable error) {
// 'log' is final. It is okay to use it in a future.
log.error(error, "RESOURCE-LEAK: failed to close JMSConnection");
throw new RuntimeException(error);
}
};
return CompletableFuture.runAsync(closeJmsConnectionRunnable,
JMSConnectionHandlingActor.getOwnDispatcher(getContext().system()));
} else {
return CompletableFuture.completedFuture(null);
}
}
@Test(timeout = 30000)
public void testSendReceiveOverWS() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
JmsConnection connection = (JmsConnection) factory.createConnection();
try {
Session session = connection.createSession();
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
producer.close();
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
assertNotNull(message);
} finally {
connection.close();
}
}
@Test(timeout = 30000)
public void testSendLargeMessageToClientFromOpenWire() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
JmsConnection connection = (JmsConnection) factory.createConnection();
sendLargeMessageViaOpenWire();
try {
Session session = connection.createSession();
Queue queue = session.createQueue(getQueueName());
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
assertNotNull(message);
assertTrue(message instanceof BytesMessage);
} finally {
connection.close();
}
}
@Ignore("Broker can't accept messages over 65535 right now")
@Test(timeout = 30000)
public void testSendLargeMessageToClientFromAMQP() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
JmsConnection connection = (JmsConnection) factory.createConnection();
sendLargeMessageViaAMQP();
try {
Session session = connection.createSession();
Queue queue = session.createQueue(getQueueName());
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(1000);
assertNotNull(message);
assertTrue(message instanceof BytesMessage);
} finally {
connection.close();
}
}
@Test(timeout=20000)
public void testProducerCloseDoesNotBlock() throws Exception {
connection = (JmsConnection) factory.createConnection();
connection.addConnectionListener(new ConnectionInterruptionListener());
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(_testName.getMethodName());
MessageProducer producer = session.createProducer(queue);
mockPeer.shutdown();
connectionInterrupted.await(9, TimeUnit.SECONDS);
producer.close();
connection.close();
}
@Test(timeout=20000)
public void testSessionCloseWithOpenResourcesDoesNotBlock() throws Exception {
connection = (JmsConnection) factory.createConnection();
connection.addConnectionListener(new ConnectionInterruptionListener());
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(_testName.getMethodName());
session.createConsumer(queue);
session.createProducer(queue);
mockPeer.shutdown();
connectionInterrupted.await(9, TimeUnit.SECONDS);
session.close();
connection.close();
}
@Test(timeout = 20000)
public void testCloseSessionTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setCloseTimeout(500);
testPeer.expectBegin();
testPeer.expectEnd(false);
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull("Session should not be null", session);
try {
session.close();
fail("Should have thrown an timed out exception");
} catch (JmsOperationTimedOutException jmsEx) {
LOG.info("Caught exception: {}", jmsEx.getMessage());
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testCreateTemporaryQueueTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setRequestTimeout(500);
connection.start();
testPeer.expectBegin();
testPeer.expectTempQueueCreationAttach(null, false);
testPeer.expectDetach(true, false, true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
session.createTemporaryQueue();
fail("Should have timed out on create.");
} catch (JmsOperationTimedOutException jmsEx) {
LOG.info("Caught expected exception: {}", jmsEx.getMessage());
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testCreateTemporaryTopicTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setRequestTimeout(500);
connection.start();
testPeer.expectBegin();
testPeer.expectTempTopicCreationAttach(null, false);
testPeer.expectDetach(true, false, true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
session.createTemporaryTopic();
fail("Should have timed out on create.");
} catch (JmsOperationTimedOutException jmsEx) {
LOG.info("Caught expected exception: {}", jmsEx.getMessage());
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test
public void testHandleSendFailIfNoSpaceSync() throws Exception {
connection = createAmqpConnection();
JmsConnection jmsConnection = (JmsConnection) connection;
jmsConnection.setForceSyncSend(true);
connection.setExceptionListener(new TestExceptionListener());
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Message:1"));
try {
producer.send(session.createTextMessage("Message:2"));
fail("Should have failed to send message two");
} catch (JMSException ex) {
LOG.debug("Caught expected exception");
}
connection.close();
}
@Test(timeout = 20000)
public void testSessionSnapshotsPolicyObjects() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotSame(session.getMessageIDPolicy(), connection.getMessageIDPolicy());
assertNotSame(session.getPrefetchPolicy(), connection.getPrefetchPolicy());
assertNotSame(session.getPresettlePolicy(), connection.getPresettlePolicy());
assertNotSame(session.getRedeliveryPolicy(), connection.getRedeliveryPolicy());
testPeer.expectClose();
connection.close();
}
}
@Override
protected void createTestResources() throws Exception {
connection = createConnectionToMockProvider();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
}
});
Queue destination = session.createQueue(_testName.getMethodName());
sender = session.createProducer(destination);
receiver = session.createConsumer(destination);
connection.start();
providerListener.onConnectionFailure(new ProviderException("Something went wrong"));
final JmsConnection jmsConnection = connection;
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return !jmsConnection.isConnected();
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(2)));
}
private void publishMessages(AtomicLong count) throws Exception {
JmsConnection connection = (JmsConnection) factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while (count.getAndDecrement() > 0) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.send(message);
if ((count.get() % 10000) == 0) {
LOG.info("Sent message: {}", NUM_SENDS - count.get());
}
}
producer.close();
connection.close();
}
@Test(timeout=20000)
public void testConsumerCloseDoesNotBlock() throws Exception {
connection = (JmsConnection) factory.createConnection();
connection.addConnectionListener(new ConnectionInterruptionListener());
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(_testName.getMethodName());
MessageConsumer consumer = session.createConsumer(queue);
mockPeer.shutdown();
connectionInterrupted.await(9, TimeUnit.SECONDS);
consumer.close();
connection.close();
}
private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws JMSException {
if (peers.length == 0) {
throw new IllegalArgumentException("No test peers were given, at least 1 required");
}
String remoteURI = "failover:(";
boolean first = true;
for (TestAmqpPeer peer : peers) {
if (!first) {
remoteURI += ",";
}
remoteURI += createPeerURI(peer, connectionParams);
first = false;
}
if (failoverParams == null) {
remoteURI += ")?failover.maxReconnectAttempts=10";
} else {
remoteURI += ")?" + failoverParams;
}
ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = factory.createConnection();
return (JmsConnection) connection;
}
@Test(timeout=20000)
public void testTransactionRollbackSucceeds() throws Exception {
connection = (JmsConnection) factory.createConnection();
connection.addConnectionListener(new ConnectionInterruptionListener());
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(_testName.getMethodName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createMessage());
mockPeer.shutdown();
connectionInterrupted.await(9, TimeUnit.SECONDS);
try {
session.rollback();
} catch (TransactionRolledBackException ex) {
fail("Should allow a rollback while offline.");
}
connection.close();
}
@Override
protected MessageConsumer createConsumer() throws Exception {
connection = createConnectionToMockProvider();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(_testName.getMethodName());
MessageConsumer consumer = session.createConsumer(destination);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
}
});
connection.start();
providerListener.onConnectionFailure(new ProviderException("Something went wrong"));
final JmsConnection jmsConnection = connection;
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return !jmsConnection.isConnected();
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(2)));
return consumer;
}
@Test(timeout = 60000)
public void testBrowseAllInQueueSmallPrefetch() throws Exception {
connection = createAmqpConnection();
((JmsDefaultPrefetchPolicy) ((JmsConnection) connection).getPrefetchPolicy()).setQueueBrowserPrefetch(1);
connection.start();
final int MSG_COUNT = 30;
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendToAmqQueue(MSG_COUNT);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(MSG_COUNT, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
assertFalse(enumeration.hasMoreElements());
assertEquals(MSG_COUNT, count);
}
/**
* @return The JmsConnection
*/
private JmsConnection createJmsConnection() {
return safelyExecuteJmsOperation(null, "create JMS connection", () -> {
if (log.isDebugEnabled()) {
log.debug("Attempt to create connection {} for URI [{}]", connection.getId(),
ConnectionBasedJmsConnectionFactory.buildAmqpConnectionUriFromConnection(connection));
}
return jmsConnectionFactory.createConnection(connection, exceptionListener);
});
}
@Override
protected CompletionStage<Status.Status> doTestConnection(final Connection connection) {
// delegate to child actor because the QPID JMS client is blocking until connection is opened/closed
return Patterns.ask(getTestConnectionHandler(connection),
new JmsConnect(getSender()), clientConfig.getTestingTimeout())
// compose the disconnect because otherwise the actor hierarchy might be stopped too fast
.thenCompose(response -> {
log.debug("Closing JMS connection after testing connection.");
if (response instanceof JmsConnected) {
final JmsConnection jmsConnection = ((JmsConnected) response).connection;
final JmsDisconnect jmsDisconnect = new JmsDisconnect(ActorRef.noSender(), jmsConnection);
return Patterns.ask(getDisconnectConnectionHandler(connection), jmsDisconnect,
clientConfig.getTestingTimeout())
// replace jmsDisconnected message with original response
.thenApply(jmsDisconnected -> response);
} else {
return CompletableFuture.completedFuture(response);
}
})
.handle((response, throwable) -> {
if (throwable != null || response instanceof Status.Failure || response instanceof Throwable) {
final Throwable ex =
response instanceof Status.Failure ? ((Status.Failure) response).cause() :
response instanceof Throwable ? (Throwable) response : throwable;
final ConnectionFailedException failedException =
ConnectionFailedException.newBuilder(connectionId())
.description("The requested Connection could not be connected due to '" +
ex.getClass().getSimpleName() + ": " + ex.getMessage() + "'")
.cause(ex).build();
return new Status.Failure(failedException);
} else if (response instanceof ConnectionFailure) {
return ((ConnectionFailure) response).getFailure();
} else {
return new Status.Success(response);
}
});
}
protected Connection createFailingConnection() throws JMSException {
String discoveryPrefix = DiscoveryProviderFactory.DISCOVERY_OPTION_PREFIX;
JmsConnectionFactory factory = new JmsConnectionFactory(
"discovery:(multicast://default?group=altGroup)?" + discoveryPrefix + "startupMaxReconnectAttempts=10" + "&" + discoveryPrefix +"maxReconnectDelay=100");
connection = factory.createConnection();
jmsConnection = (JmsConnection) connection;
jmsConnection.addConnectionListener(this);
jmsConnection.start();
return connection;
}
@Test(timeout = 60000)
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
final int MSG_COUNT = 5;
JmsConnection connection = (JmsConnection) createConnection();
((JmsDefaultPrefetchPolicy) connection.getPrefetchPolicy()).setAll(0);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
javax.jms.Queue queue = session.createQueue(getQueueName());
sendMessages(name.getMethodName(), MSG_COUNT, false);
Queue queueView = getProxyToQueue(getQueueName());
Wait.assertEquals(MSG_COUNT, queueView::getMessageCount);
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (count < MSG_COUNT && enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
LOG.debug("Received all expected message, checking that hasMoreElements returns false");
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
private void doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(boolean topic, boolean deferAttachResponseWrite) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
testPeer.expectSaslAnonymous();
testPeer.expectOpen();
testPeer.expectBegin();
testPeer.expectBegin();
JmsConnection connection = establishAnonymousConnecton(testPeer);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
if (topic) {
testPeer.expectAndRefuseTempTopicCreationAttach(AmqpError.UNAUTHORIZED_ACCESS, "Not Authorized to create temp topics.", false);
//Expect the detach response to the test peer after refusal.
testPeer.expectDetach(true, false, false);
session.createTemporaryTopic();
} else {
testPeer.expectAndRefuseTempQueueCreationAttach(AmqpError.UNAUTHORIZED_ACCESS, "Not Authorized to create temp queues.", false);
//Expect the detach response to the test peer after refusal.
testPeer.expectDetach(true, false, false);
session.createTemporaryQueue();
}
fail("Should have thrown security exception");
} catch (JMSSecurityException jmsse) {
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 30000)
public void testFailoverInitialReconnectDelayDoesNotApplyToInitialConnect() throws Exception {
try (TestAmqpPeer originalPeer = new TestAmqpPeer();) {
// Create a peer to connect to
final String originalURI = createPeerURI(originalPeer);
LOG.info("Original peer is at: {}", originalURI);
// Connect to the first peer
originalPeer.expectSaslAnonymous();
originalPeer.expectOpen();
originalPeer.expectBegin();
int delay = 20000;
StopWatch watch = new StopWatch();
JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=" + delay + "&failover.maxReconnectAttempts=1", originalPeer);
connection.start();
long taken = watch.taken();
String message = "Initial connect should not have delayed for the specified initialReconnectDelay." + "Elapsed=" + taken + ", delay=" + delay;
assertTrue(message, taken < delay);
assertTrue("Connection took longer than reasonable: " + taken, taken < 5000);
// Shut it down
originalPeer.expectClose();
connection.close();
originalPeer.waitForAllHandlersToComplete(2000);
}
}
@Test(timeout=20000)
public void testConnectionCloseDoesNotBlock() throws Exception {
connection = (JmsConnection) factory.createConnection();
connection.addConnectionListener(new ConnectionInterruptionListener());
connection.start();
mockPeer.shutdown();
connectionInterrupted.await(9, TimeUnit.SECONDS);
connection.close();
}
@Override
protected MessageProducer createProducer() throws Exception {
connection = createConnectionToMockProvider();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
message = session.createMessage();
destination = session.createQueue("test");
MessageProducer producer = session.createProducer(destination);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
}
});
connection.start();
providerListener.onConnectionFailure(new ProviderException("Something went wrong"));
final JmsConnection jmsConnection = connection;
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return !jmsConnection.isConnected();
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(2)));
return producer;
}
@Test(timeout = 20000)
public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
((JmsConnection) connection).setRequestTimeout(500);
testPeer.expectBegin();
// Expect the session, with an immediate link to the transaction coordinator
// using a target with the expected capabilities only.
CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null);
testPeer.expectDetach(true, false, false);
// Expect the AMQP session to be closed due to the JMS session creation failure.
testPeer.expectEnd();
try {
connection.createSession(true, Session.SESSION_TRANSACTED);
fail("Session create should have failed.");
} catch (JMSException ex) {
// Expected
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testCloseSenderTimesOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setCloseTimeout(500);
testPeer.expectBegin();
testPeer.expectSenderAttach();
testPeer.expectDetach(true, false, true);
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
try {
producer.close();
fail("Should have thrown a timed out exception");
} catch (JmsOperationTimedOutException jmsEx) {
LOG.info("Caught excpected exception", jmsEx);
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testSendWhenLinkCreditIsZeroAndTimeout() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(500);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
Message message = session.createTextMessage("text");
// Expect the producer to attach. Don't send any credit so that the client will
// block on a send and we can test our timeouts.
testPeer.expectSenderAttachWithoutGrantingCredit();
testPeer.expectClose();
MessageProducer producer = session.createProducer(queue);
try {
producer.send(message);
fail("Send should time out.");
} catch (JmsSendTimedOutException jmsEx) {
LOG.info("Caught expected error: {}", jmsEx.getMessage());
} catch (Throwable error) {
fail("Send should time out, but got: " + error.getMessage());
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test(timeout = 20000)
public void testSendTimesOutWhenNoDispostionArrives() throws Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
connection.setSendTimeout(500);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
Message message = session.createTextMessage("text");
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
// Expect the producer to attach and grant it some credit, it should send
// a transfer which we will not send any response for which should cause the
// send operation to time out.
testPeer.expectSenderAttach();
testPeer.expectTransferButDoNotRespond(messageMatcher);
testPeer.expectClose();
MessageProducer producer = session.createProducer(queue);
try {
producer.send(message);
fail("Send should time out.");
} catch (JmsSendTimedOutException jmsEx) {
LOG.info("Caught expected error: {}", jmsEx.getMessage());
} catch (Throwable error) {
fail("Send should time out, but got: " + error.getMessage());
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}