下面列出了 javax.jms.Connection#setExceptionListener() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Configure the given Connection before it is handed out.
 * <p>This implementation sets the client id when one is configured and then
 * registers an ExceptionListener: an already created aggregated listener is
 * reused; otherwise the configured listener is applied, combined with this
 * object in a new aggregated listener if reconnect-on-exception is enabled.
 * Can be overridden in subclasses.
 * @param con the Connection to prepare
 * @throws JMSException if thrown by JMS API methods
 * @see #setExceptionListener
 * @see #setReconnectOnException
 */
protected void prepareConnection(Connection con) throws JMSException {
    if (getClientId() != null) {
        con.setClientID(getClientId());
    }

    // An aggregated listener built by an earlier call is reused as-is.
    if (this.aggregatedExceptionListener != null) {
        con.setExceptionListener(this.aggregatedExceptionListener);
        return;
    }

    // Neither a listener nor reconnect handling is configured: nothing to register.
    if (getExceptionListener() == null && !isReconnectOnException()) {
        return;
    }

    ExceptionListener configured = getExceptionListener();
    if (!isReconnectOnException()) {
        con.setExceptionListener(configured);
        return;
    }

    // Reconnect handling: this object is always a delegate, followed by the
    // configured listener when there is one.
    this.aggregatedExceptionListener = new AggregatedExceptionListener();
    this.aggregatedExceptionListener.delegates.add(this);
    if (configured != null) {
        this.aggregatedExceptionListener.delegates.add(configured);
    }
    con.setExceptionListener(this.aggregatedExceptionListener);
}
/**
 * Prepare the given Connection before it is exposed.
 * <p>Applies the configured client id, if any, and selects the
 * ExceptionListener to register: the existing aggregated listener, a newly
 * created aggregated listener (when reconnect-on-exception is enabled), or
 * the plain configured listener. When none of these applies, no listener is
 * registered. Can be overridden in subclasses.
 * @param con the Connection to prepare
 * @throws JMSException if thrown by JMS API methods
 * @see #setExceptionListener
 * @see #setReconnectOnException
 */
protected void prepareConnection(Connection con) throws JMSException {
    if (getClientId() != null) {
        con.setClientID(getClientId());
    }

    final ExceptionListener effective;
    if (this.aggregatedExceptionListener != null) {
        // Reuse the aggregate created by a previous call.
        effective = this.aggregatedExceptionListener;
    }
    else if (getExceptionListener() != null || isReconnectOnException()) {
        ExceptionListener userListener = getExceptionListener();
        if (isReconnectOnException()) {
            // This object goes first in the delegate list, then the user listener.
            this.aggregatedExceptionListener = new AggregatedExceptionListener();
            this.aggregatedExceptionListener.delegates.add(this);
            if (userListener != null) {
                this.aggregatedExceptionListener.delegates.add(userListener);
            }
            effective = this.aggregatedExceptionListener;
        }
        else {
            effective = userListener;
        }
    }
    else {
        // Nothing configured: leave the Connection without a listener.
        return;
    }

    con.setExceptionListener(effective);
}
/**
 * Creates {@code numConnections} connections from the given factory, starts
 * each one and creates {@code numSessions} sessions on it.
 * <p>Each connection gets an exception listener that logs the error and
 * terminates the JVM with exit status 1.
 *
 * @param numConnections number of connections to create
 * @param numSessions number of sessions to create per connection
 * @param transacted whether sessions are created transacted (otherwise auto-acknowledge)
 * @param conFac factory used to create the connections
 * @return a map from each created connection to the sessions created on it
 * @throws JMSException if thrown by JMS API methods
 */
private Map<Connection, List<Session>> openConnectionsAndSessions(int numConnections, int numSessions, boolean transacted, ConnectionFactory conFac) throws JMSException
{
    final Map<Connection, List<Session>> sessionsByConnection = new HashMap<>();
    // The acknowledge mode depends only on the transacted flag, so compute it once.
    final int acknowledgeMode = transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;

    for (int connectionIndex = 0; connectionIndex < numConnections; connectionIndex++)
    {
        final Connection opened = conFac.createConnection();
        opened.setExceptionListener(jmse -> {
            LOGGER.error("The sample received an exception through the ExceptionListener", jmse);
            System.exit(1);
        });

        // Register the (still empty) session list before starting the connection.
        final List<Session> openedSessions = new ArrayList<>();
        sessionsByConnection.put(opened, openedSessions);
        opened.start();

        for (int sessionIndex = 0; sessionIndex < numSessions; sessionIndex++)
        {
            openedSessions.add(opened.createSession(transacted, acknowledgeMode));
        }
    }
    return sessionsByConnection;
}
/**
 * Creates a consumer on a fresh connection, calls {@code startProducers},
 * closes the connection after one second and checks five seconds later
 * that {@code exceptions} is empty.
 */
@Test(timeout = 60 * 1000)
public void testAbortAlreadyClosedConnection() throws Exception {
    // This test instance is registered as the connection's ExceptionListener.
    final Connection connection = createConnectionFactory().createConnection();
    connection.setExceptionListener(this);

    final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    session.createConsumer(destination);
    connection.start();

    startProducers(destination, 20);
    TimeUnit.SECONDS.sleep(1);

    LOG.info("closing connection: " + connection);
    connection.close();

    // Wait before checking what was recorded.
    TimeUnit.SECONDS.sleep(5);
    assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
}
/**
 * Creates a consumer on a fresh connection, calls {@code startProducers},
 * closes the consumer after one second and checks five seconds later that
 * {@code exceptions} is empty. The connection is added to
 * {@code connections} and is not closed by this method.
 */
@Test(timeout = 60 * 1000)
public void testAbortAlreadyClosedConsumers() throws Exception {
    // This test instance is registered as the connection's ExceptionListener.
    final Connection connection = createConnectionFactory().createConnection();
    connection.setExceptionListener(this);
    connections.add(connection);

    final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    final MessageConsumer closedEarly = session.createConsumer(destination);
    connection.start();

    startProducers(destination, 20);
    TimeUnit.SECONDS.sleep(1);

    LOG.info("closing consumer: " + closedEarly);
    closedEarly.close();

    // Wait before checking what was recorded.
    TimeUnit.SECONDS.sleep(5);
    assertTrue("no exceptions : " + exceptions, exceptions.isEmpty());
}
/**
 * Creates a connection to {@code remoteURI} via a {@code JmsConnectionFactory},
 * passes it through {@code trackJMSConnection} and installs an exception
 * listener that prints the stack trace of any reported exception.
 *
 * @param remoteURI URI handed to the connection factory
 * @param username user name used to create the connection
 * @param password password used to create the connection
 * @param clientId client id to set; ignored when {@code null} or empty
 * @param start whether to start the connection before returning it
 * @return the created connection
 * @throws JMSException if thrown by JMS API methods
 */
private Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException {
    final JmsConnectionFactory connectionFactory = new JmsConnectionFactory(remoteURI);
    final Connection created = trackJMSConnection(connectionFactory.createConnection(username, password));

    final ExceptionListener stackTracePrinter = new ExceptionListener() {
        @Override
        public void onException(JMSException exception) {
            exception.printStackTrace();
        }
    };
    created.setExceptionListener(stackTracePrinter);

    final boolean hasClientId = clientId != null && !clientId.isEmpty();
    if (hasClientId) {
        created.setClientID(clientId);
    }

    if (start) {
        created.start();
    }
    return created;
}
/**
 * Builds a connection with failover disabled, restarts the broker through
 * the broker admin and delegates the verification to
 * {@code assertClientConnectionClosed}. The connection is always closed at
 * the end.
 */
@Test
public void brokerRestartConnectionClose() throws Exception
{
    final Connection connection = getConnectionBuilder().setFailover(false).build();
    try
    {
        // Exceptions reported on the connection are handed to _connectionCloseFuture.
        connection.setExceptionListener(_connectionCloseFuture::set);

        getBrokerAdmin().restart();

        assertClientConnectionClosed(connection);
    }
    finally
    {
        connection.close();
    }
}
/**
 * Creates a connection consumer backed by a session pool that wraps a
 * {@code JmsFailingServerSession} and expects the connection's
 * ExceptionListener to be invoked within five seconds.
 */
@Test(timeout = 20000)
public void testOnExceptionFiredOnServerSessionFailure() throws Exception {
    final CountDownLatch listenerInvoked = new CountDownLatch(1);

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final Connection connection = testFixture.establishConnecton(testPeer);

        final ExceptionListener failureSignal = new ExceptionListener() {
            @Override
            public void onException(JMSException exception) {
                listenerInvoked.countDown();
            }
        };
        connection.setExceptionListener(failureSignal);
        connection.start();

        final JmsServerSessionPool failingPool = new JmsServerSessionPool(new JmsFailingServerSession());

        // Now the Connection consumer arrives and we give it a message
        // to be dispatched to the server session.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer = connection.createConnectionConsumer(destination, null, failingPool, 100);

        assertTrue("Exception should have been fired", listenerInvoked.await(5, TimeUnit.SECONDS));

        // Shut down the consumer first, then the connection.
        testPeer.expectDetach(true, true, true);
        testPeer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Looks up a connection factory and a queue through JNDI, sends one
 * non-persistent "Hello world!" text message to the queue and prints the
 * text of the message received back, or a notice if none arrives within
 * two seconds. Any exception is printed to standard output and the JVM
 * exits with status 1.
 *
 * @param args command line arguments (not used)
 */
public static void main(String[] args) throws Exception {
    try {
        // The InitialContext constructor automatically picks up the Qpid
        // InitialContextFactory configuration from the jndi.properties file
        // supplied on the classpath.
        final Context jndi = new InitialContext();

        final ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup("myFactoryLookup");
        final Destination destination = (Destination) jndi.lookup("myQueueLookup");

        // Credentials come from the USER and PASSWORD system properties.
        final String user = System.getProperty("USER");
        final String password = System.getProperty("PASSWORD");
        final Connection connection = connectionFactory.createConnection(user, password);
        connection.setExceptionListener(new MyExceptionListener());
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(destination);
        final MessageConsumer consumer = session.createConsumer(destination);

        final TextMessage outgoing = session.createTextMessage("Hello world!");
        producer.send(outgoing, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

        final TextMessage incoming = (TextMessage) consumer.receive(2000L);
        final String report = (incoming != null)
                ? incoming.getText()
                : "No message received within the given timeout!";
        System.out.println(report);

        connection.close();
    } catch (Exception exp) {
        System.out.println("Caught exception, exiting.");
        exp.printStackTrace(System.out);
        System.exit(1);
    }
}
/**
 * With the Byteman rule below installed at the entry of
 * {@code PacketDecoder.decode(byte)}, switches {@code corruptPacket} on,
 * creates a session and expects the connection's ExceptionListener to be
 * invoked within five seconds.
 */
@Test
@BMRules(
    rules = {@BMRule(
        name = "Corrupt Decoding",
        targetClass = "org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder",
        targetMethod = "decode(byte)",
        targetLocation = "ENTRY",
        action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow();")})
public void testSendDisconnect() throws Exception {
    createQueue("queue1");
    final Connection producerConnection = nettyCf.createConnection();
    final CountDownLatch listenerInvoked = new CountDownLatch(1);

    try {
        final ExceptionListener failureSignal = new ExceptionListener() {
            @Override
            public void onException(JMSException e) {
                listenerInvoked.countDown();
            }
        };
        producerConnection.setExceptionListener(failureSignal);

        // Switch the flag on only after the listener is in place.
        corruptPacket.set(true);
        producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        assertTrue(listenerInvoked.await(5, TimeUnit.SECONDS));
    } finally {
        // Always reset the flag, whatever the outcome.
        corruptPacket.set(false);

        if (producerConnection != null) {
            producerConnection.close();
        }
    }
}
/**
 * Creates a connection consumer backed by a
 * {@code JmsFailingServerSessionPool} and expects the connection's
 * ExceptionListener to be invoked within five seconds.
 */
@Test(timeout = 20000)
public void testOnExceptionFiredOnSessionPoolFailure() throws Exception {
    final CountDownLatch listenerInvoked = new CountDownLatch(1);

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final Connection connection = testFixture.establishConnecton(testPeer);

        final ExceptionListener failureSignal = new ExceptionListener() {
            @Override
            public void onException(JMSException exception) {
                listenerInvoked.countDown();
            }
        };
        connection.setExceptionListener(failureSignal);
        connection.start();

        final JmsFailingServerSessionPool failingPool = new JmsFailingServerSessionPool();

        // Now the Connection consumer arrives and we give it a message
        // to be dispatched to the server session.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue destination = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer = connection.createConnectionConsumer(destination, null, failingPool, 100);

        assertTrue("Exception should have been fired", listenerInvoked.await(5, TimeUnit.SECONDS));

        // Shut down the consumer first, then the connection.
        testPeer.expectDetach(true, true, true);
        testPeer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Builds a connection that goes through the local port of
 * {@code _tcpTunneler} and registers the given monitor as a client listener
 * on the tunneler.
 * <p>When the connection reports an exception, the tunneler is asked to
 * disconnect the address returned by {@code clientMonitor.getClientAddress()}.
 * The reported exception itself is not retained: the previous implementation
 * stored it in a local holder that nothing ever read.
 *
 * @param clientMonitor monitor registered with the tunneler; also supplies
 *                      the client address to disconnect on failure
 * @return the connection built against the tunneler's local port
 * @throws Exception if building the connection or registering the listener fails
 */
private Connection createTunneledConnection(final ClientMonitor clientMonitor) throws Exception
{
    final int localPort = _tcpTunneler.getLocalPort();
    Connection tunneledConnection = getConnectionBuilder().setPort(localPort).build();
    _tcpTunneler.addClientListener(clientMonitor);
    // The exception argument is intentionally unused; only the disconnect matters here.
    tunneledConnection.setExceptionListener(
            exception -> _tcpTunneler.disconnect(clientMonitor.getClientAddress()));
    return tunneledConnection;
}
/**
 * Sends a message through a connection factory configured with the
 * {@code OutBoundPacketCapture} outgoing interceptor, the
 * {@code SessSendCauseResponseTimeout} incoming interceptor and a 500 ms
 * call timeout, and expects the send to fail.
 * <p>The failure must be a {@link JMSException} caused by
 * {@code ActiveMQConnectionTimedOutException}, and the connection's
 * ExceptionListener must receive an exception with the same kind of cause
 * within two seconds.
 * <p>The never-assigned {@code connection} local and its dead close branch
 * in {@code finally} have been removed.
 *
 * @param blockOnFailover value stored in {@code mayBlock}; when {@code true}
 *                        the test waits for {@code blocked} and then calls
 *                        {@code unblock()} before checking the failure
 */
public void testOnSend(boolean blockOnFailover) throws Exception {
    mayBlock.set(blockOnFailover);
    Connection sendConnection = null;
    final AtomicReference<JMSException> exceptionOnConnection = new AtomicReference<>();
    try {
        ((ActiveMQConnectionFactory) cf).setOutgoingInterceptorList(OutBoundPacketCapture.class.getName());
        ((ActiveMQConnectionFactory) cf).setIncomingInterceptorList(SessSendCauseResponseTimeout.class.getName());
        ((ActiveMQConnectionFactory) cf).setCallTimeout(500);

        sendConnection = cf.createConnection();
        sendConnection.setExceptionListener(exceptionOnConnection::set);
        final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = sendSession.createProducer(queue);
        TextMessage message = sendSession.createTextMessage();
        message.setText("Message");
        producer.send(message);
        // fail() raises an AssertionError, which the JMSException handler below does not catch.
        fail("JMSException expected");
    } catch (JMSException e) {
        if (blockOnFailover) {
            Wait.assertTrue(blocked::get);
            unblock();
        }
        assertTrue(e.getCause() instanceof ActiveMQConnectionTimedOutException);
        //Ensure JMS Connection ExceptionListener was also invoked
        assertTrue(Wait.waitFor(() -> exceptionOnConnection.get() != null, 2000, 100));
        assertTrue(exceptionOnConnection.get().getCause() instanceof ActiveMQConnectionTimedOutException);
    } finally {
        if (sendConnection != null) {
            sendConnection.close();
        }
    }
}
/**
 * Looks up a connection factory and a request queue through JNDI, then
 * sends a fixed list of text requests, each with a temporary queue as its
 * reply-to destination, and prints the response received for each one.
 * Stops at the first request whose response does not arrive within two
 * seconds. Any exception is printed to standard output and the JVM exits
 * with status 1.
 *
 * @param args command line arguments (not used)
 */
public static void main(String[] args) throws Exception {
    try {
        // The InitialContext constructor automatically picks up the Qpid
        // InitialContextFactory configuration from the jndi.properties file
        // supplied on the classpath.
        final Context jndi = new InitialContext();

        final ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup("myFactoryLookup");
        final Destination requestQueue = (Destination) jndi.lookup("myQueueLookup");

        // Credentials come from the USER and PASSWORD system properties.
        final String user = System.getProperty("USER");
        final String password = System.getProperty("PASSWORD");
        final Connection connection = connectionFactory.createConnection(user, password);
        connection.setExceptionListener(new MyExceptionListener());
        connection.start();

        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Responses arrive on a temporary queue; requests go to the looked-up queue.
        final TemporaryQueue responseQueue = session.createTemporaryQueue();
        final MessageConsumer responseConsumer = session.createConsumer(responseQueue);
        final MessageProducer requestProducer = session.createProducer(requestQueue);

        // Send the requests one at a time, waiting for each response in turn.
        final String[] requests = {
            "Twas brillig, and the slithy toves",
            "Did gire and gymble in the wabe.",
            "All mimsy were the borogroves,",
            "And the mome raths outgrabe."
        };

        for (String request : requests) {
            final TextMessage outgoing = session.createTextMessage(request);
            outgoing.setJMSReplyTo(responseQueue);
            requestProducer.send(outgoing, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

            final TextMessage reply = (TextMessage) responseConsumer.receive(2000);
            if (reply == null) {
                System.out.println("[CLIENT] Response for '" + request +"' was not received within the timeout, exiting.");
                break;
            }
            System.out.println("[CLIENT] " + request + " ---> " + reply.getText());
        }

        connection.close();
    } catch (Exception exp) {
        System.out.println("[CLIENT] Caught exception, exiting.");
        exp.printStackTrace(System.out);
        System.exit(1);
    }
}
/**
* Registers this listener container as JMS ExceptionListener on the shared connection.
* <p>Delegates to the superclass implementation first, then sets this object
* as the connection's ExceptionListener.
* @param connection the shared Connection to prepare
* @throws JMSException if thrown by the superclass preparation or by
* {@code setExceptionListener}
*/
@Override
protected void prepareSharedConnection(Connection connection) throws JMSException {
super.prepareSharedConnection(connection);
// Applied after the superclass preparation has run.
connection.setExceptionListener(this);
}
/**
 * Fails the core connection underneath a JMS connection and verifies that
 * the registered listener was called exactly once.
 * <p>The result of waiting on the latch is asserted, so a timeout is
 * reported as such, and the connection is closed in a {@code finally} block
 * so that it is not leaked when an assertion fails.
 */
@Test
public void testListenerCalledForOneConnection() throws Exception {
    Connection conn = cf.createConnection();
    try {
        CountDownLatch latch = new CountDownLatch(1);
        MyExceptionListener listener = new MyExceptionListener(latch);
        conn.setExceptionListener(listener);

        // Fail the connection of the JMS connection's initial core session.
        ClientSessionInternal coreSession = (ClientSessionInternal) ((ActiveMQConnection) conn).getInitialSession();
        coreSession.getConnection().fail(new ActiveMQInternalErrorException("blah"));

        // NOTE(review): assumes MyExceptionListener counts the latch down when invoked - confirm.
        Assert.assertTrue("ExceptionListener was not invoked within 5 seconds", latch.await(5, TimeUnit.SECONDS));
        Assert.assertEquals(1, listener.numCalls);
    } finally {
        conn.close();
    }
}
/**
 * Has the test peer remotely close the connection with a
 * {@code ConnectionError.REDIRECT} error carrying a vhost, network host and
 * port, and verifies that the ExceptionListener receives a JMSException
 * whose cause is a {@code ProviderConnectionRedirectedException} with a
 * redirection URI reflecting those values.
 * <p>The port placed in the error info is taken from {@code redirectPort},
 * the same constant the final assertion checks, instead of a duplicated
 * literal.
 */
@Test(timeout = 20000)
public void testRemotelyEndConnectionWithRedirect() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<JMSException> asyncError = new AtomicReference<JMSException>();

        final String redirectVhost = "vhost";
        final String redirectNetworkHost = "localhost";
        final int redirectPort = 5677;

        // Don't set a ClientId, so that the underlying AMQP connection isn't established yet
        Connection connection = testFixture.establishConnecton(testPeer, false, null, null, null, false);

        // Tell the test peer to close the connection when executing its last handler
        Map<Symbol, Object> errorInfo = new HashMap<Symbol, Object>();
        errorInfo.put(OPEN_HOSTNAME, redirectVhost);
        errorInfo.put(NETWORK_HOST, redirectNetworkHost);
        errorInfo.put(PORT, redirectPort);

        testPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Connection redirected", errorInfo);

        // Add the exception listener
        connection.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException exception) {
                asyncError.set(exception);
                done.countDown();
            }
        });

        // Trigger the underlying AMQP connection
        connection.start();

        assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS));

        assertTrue(asyncError.get() instanceof JMSException);
        assertTrue(asyncError.get().getCause() instanceof ProviderConnectionRedirectedException);

        ProviderConnectionRedirectedException redirect = (ProviderConnectionRedirectedException) asyncError.get().getCause();
        URI redirectionURI = redirect.getRedirectionURI();

        assertNotNull(redirectionURI);
        assertTrue(redirectVhost, redirectionURI.getQuery().contains("amqp.vhost=" + redirectVhost));
        assertEquals(redirectNetworkHost, redirectionURI.getHost());
        assertEquals(redirectPort, redirectionURI.getPort());

        testPeer.waitForAllHandlersToComplete(1000);

        connection.close();
    }
}
/**
 * Injects a failure into the mock connection underneath a pooled
 * connection, expects the ExceptionListener to fire and a subsequent send
 * to throw, then checks that the factory hands out a different connection
 * on which a session can be created.
 */
@Test(timeout = 60000)
public void testConnectionCanBeCreatedAfterFailure() throws Exception {
    final CountDownLatch failureSeen = new CountDownLatch(1);

    final Connection firstConnection = cf.createConnection();
    LOG.info("Fetched new connection from the pool: {}", firstConnection);

    final ExceptionListener failureListener = new ExceptionListener() {
        @Override
        public void onException(JMSException exception) {
            LOG.info("Pooled Connection failed");
            failureSeen.countDown();
        }
    };
    firstConnection.setExceptionListener(failureListener);

    final Session firstSession = firstConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue testQueue = firstSession.createQueue("test");
    final MessageProducer producer = firstSession.createProducer(testQueue);

    // Reach through the pooled wrapper to the mock connection and break it.
    final JmsPoolConnection pooled = (JmsPoolConnection) firstConnection;
    final MockJMSConnection underlying = (MockJMSConnection) pooled.getConnection();
    underlying.injectConnectionFailure(new IOException("Lost connection"));
    assertTrue(failureSeen.await(15, TimeUnit.SECONDS));

    try {
        producer.send(firstSession.createMessage());
        fail("Should be disconnected");
    } catch (JMSException expected) {
        LOG.info("Producer failed as expected: {}", expected.getMessage());
    }

    final Connection secondConnection = cf.createConnection();
    assertNotSame(firstConnection, secondConnection);
    LOG.info("Fetched new connection from the pool: {}", secondConnection);

    // Creating a session must succeed on the new connection; the session itself is not used.
    secondConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    secondConnection.close();

    cf.stop();
}
/**
 * With the Byteman rule below installed at the entry of
 * {@code ClientPacketDecoder.decode(ActiveMQBuffer)}, sends one message,
 * switches {@code corruptPacket} on, attempts to receive the message and
 * expects the connection's ExceptionListener to be invoked within five
 * seconds.
 */
@Test
@BMRules(
    rules = {@BMRule(
        name = "Corrupt Decoding",
        targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
        targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
        targetLocation = "ENTRY",
        action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
public void testClientDisconnect() throws Exception {
    final Queue targetQueue = createQueue("queue1");
    final Connection connection = nettyCf.createConnection();
    final CountDownLatch listenerInvoked = new CountDownLatch(1);

    try {
        final ExceptionListener failureSignal = new ExceptionListener() {
            @Override
            public void onException(JMSException e) {
                listenerInvoked.countDown();
            }
        };
        connection.setExceptionListener(failureSignal);

        // Send one message while the flag is still off.
        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(targetQueue);
        final TextMessage hello = session.createTextMessage("hello");
        producer.send(hello);

        connection.start();

        // Switch the flag on before the consumer is created.
        corruptPacket.set(true);
        final MessageConsumer consumer = session.createConsumer(targetQueue);
        consumer.receive(2000);

        assertTrue(listenerInvoked.await(5, TimeUnit.SECONDS));
    } finally {
        // Always reset the flag, whatever the outcome.
        corruptPacket.set(false);

        if (connection != null) {
            connection.close();
        }
    }
}
/**
 * Creates a JMS connection from the factory supplied by {@code cff}.
 * <p>An {@code ActiveMQConnectionFactory} is registered with the recovery
 * registry when one is present. An XA or plain connection is created, with
 * or without credentials; the client id is applied when given; and a
 * {@code BridgeExceptionListener} is installed. For an
 * {@code ActiveMQConnection} whose factory reports HA, a
 * {@code BridgeFailoverListener} is installed as well.
 * <p>If a {@link JMSException} occurs, any connection created so far is
 * closed on a best-effort basis and the exception is rethrown.
 *
 * @param username user name, or {@code null} to connect without credentials
 * @param password password used together with {@code username}
 * @param cff supplier of the connection factory
 * @param clientID client id to set on the connection, or {@code null} for none
 * @param isXA whether to create an XA connection
 * @param isSource passed on to the failover and exception listeners
 * @return the prepared connection
 * @throws IllegalArgumentException if the quality of service mode is
 *         {@code ONCE_AND_ONLY_ONCE} and the factory is not an {@code XAConnectionFactory}
 * @throws Exception if creating or configuring the connection fails
 */
private Connection createConnection(final String username,
                                    final String password,
                                    final ConnectionFactoryFactory cff,
                                    final String clientID,
                                    final boolean isXA,
                                    boolean isSource) throws Exception {
    Connection conn = null;
    try {
        final Object cf = cff.createConnectionFactory();

        if (registry != null && cf instanceof ActiveMQConnectionFactory) {
            registry.register(XARecoveryConfig.newConfig((ActiveMQConnectionFactory) cf, username, password, null));
        }

        final boolean xaFactoryRequired = qualityOfServiceMode == QualityOfServiceMode.ONCE_AND_ONLY_ONCE;
        if (xaFactoryRequired && !(cf instanceof XAConnectionFactory)) {
            throw new IllegalArgumentException("Connection factory must be XAConnectionFactory");
        }

        // Pick the factory flavour first, then decide whether credentials are passed.
        final boolean withoutCredentials = username == null;
        if (isXA) {
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
                ActiveMQJMSBridgeLogger.LOGGER.trace("Creating an XA connection");
            }
            final XAConnectionFactory xaFactory = (XAConnectionFactory) cf;
            conn = withoutCredentials
                    ? xaFactory.createXAConnection()
                    : xaFactory.createXAConnection(username, password);
        } else {
            if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
                ActiveMQJMSBridgeLogger.LOGGER.trace("Creating a non XA connection");
            }
            final ConnectionFactory plainFactory = (ConnectionFactory) cf;
            conn = withoutCredentials
                    ? plainFactory.createConnection()
                    : plainFactory.createConnection(username, password);
        }

        if (clientID != null) {
            conn.setClientID(clientID);
        }

        // HA is only consulted for ActiveMQ connections; everything else is treated as non-HA.
        final boolean ha = conn instanceof ActiveMQConnection && ((ActiveMQConnectionFactory) cf).isHA();
        BridgeFailoverListener failoverListener = null;
        if (ha) {
            failoverListener = new BridgeFailoverListener(isSource);
            ((ActiveMQConnection) conn).setFailoverListener(failoverListener);
        }

        conn.setExceptionListener(new BridgeExceptionListener(ha, failoverListener, isSource));
        return conn;
    } catch (JMSException e) {
        if (conn != null) {
            try {
                conn.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup: the original JMSException is the one to report.
            }
        }
        throw e;
    }
}