下面列出了怎么用javax.jms.ExceptionListener的API类实例代码及写法,或者点击链接到github查看源代码。
private Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException {
JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
public void testSetExceptionListener() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
connection = (ActiveMQConnection) cf.createConnection();
assertNull(connection.getExceptionListener());
ExceptionListener exListener = new ExceptionListener() {
@Override
public void onException(JMSException arg0) {
}
};
cf.setExceptionListener(exListener);
connection.close();
connection = (ActiveMQConnection) cf.createConnection();
assertNotNull(connection.getExceptionListener());
assertEquals(exListener, connection.getExceptionListener());
connection.close();
connection = (ActiveMQConnection) cf.createConnection();
assertEquals(exListener, connection.getExceptionListener());
assertEquals(exListener, cf.getExceptionListener());
connection.close();
}
@Test
public void testWithJTA() throws JMSException, XAException, InterruptedException {
TransactionManager transactionManager = new GeronimoTransactionManager();
Connection connection = createXAConnection("brokerJTA", transactionManager);
Queue dest = JMSUtil.createQueue(connection, "test");
MessageListener listenerHandler = new TestMessageListener();
ExceptionListener exListener = new TestExceptionListener();
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest,
listenerHandler, exListener);
container.setTransacted(false);
container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
container.setTransactionManager(transactionManager);
container.start();
testTransactionalBehaviour(connection, dest);
container.stop();
connection.close();
}
/**
* Prepare the given Connection before it is exposed.
* <p>The default implementation applies ExceptionListener and client id.
* Can be overridden in subclasses.
* @param con the Connection to prepare
* @throws JMSException if thrown by JMS API methods
* @see #setExceptionListener
* @see #setReconnectOnException
*/
protected void prepareConnection(Connection con) throws JMSException {
if (getClientId() != null) {
con.setClientID(getClientId());
}
if (this.aggregatedExceptionListener != null) {
con.setExceptionListener(this.aggregatedExceptionListener);
}
else if (getExceptionListener() != null || isReconnectOnException()) {
ExceptionListener listenerToUse = getExceptionListener();
if (isReconnectOnException()) {
this.aggregatedExceptionListener = new AggregatedExceptionListener();
this.aggregatedExceptionListener.delegates.add(this);
if (listenerToUse != null) {
this.aggregatedExceptionListener.delegates.add(listenerToUse);
}
listenerToUse = this.aggregatedExceptionListener;
}
con.setExceptionListener(listenerToUse);
}
}
@Test(timeout=60000)
public void testConnectionExceptionBrokerStop() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
connection = createAmqpConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
latch.countDown();
}
});
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
stopPrimaryBroker();
assertTrue(latch.await(10, TimeUnit.SECONDS));
connection.close();
}
/**
* Verify that the 'global' exception listener set on the connection factory
* is ignored when the factory gets serialized.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testSerializeThenDeserializeIgnoresGlobalExceptionListener() throws Exception {
String uri = "amqp://localhost:1234";
JmsConnectionFactory cf = new JmsConnectionFactory(uri);
cf.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
// Nothing
}
});
Map<String, String> props = cf.getProperties();
Object roundTripped = roundTripSerialize(cf);
assertNotNull("Null object returned", roundTripped);
assertEquals("Unexpected type", JmsConnectionFactory.class, roundTripped.getClass());
assertEquals("Unexpected uri", uri, ((JmsConnectionFactory)roundTripped).getRemoteURI());
Map<String, String> props2 = ((JmsConnectionFactory)roundTripped).getProperties();
assertFalse("Properties map should not contain ExceptionListener", props.containsKey("exceptionListener"));
assertEquals("Properties were not equal", props, props2);
}
@Override
public JmsConnection createConnection(final Connection connection, final ExceptionListener exceptionListener)
throws JMSException, NamingException {
checkNotNull(connection, "Connection");
checkNotNull(exceptionListener, "Exception Listener");
final Context ctx = createContext(connection);
final org.apache.qpid.jms.JmsConnectionFactory cf =
(org.apache.qpid.jms.JmsConnectionFactory) ctx.lookup(connection.getId().toString());
if (isSecuredConnection(connection) && connection.isValidateCertificates()) {
cf.setSslContext(SSLContextCreator.fromConnection(connection, null).withoutClientCertificate());
}
@SuppressWarnings("squid:S2095") final JmsConnection jmsConnection = (JmsConnection) cf.createConnection();
jmsConnection.setExceptionListener(exceptionListener);
return jmsConnection;
}
@Override
protected void createTestResources() throws Exception {
connection = createConnectionToMockProvider();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
}
});
Queue destination = session.createQueue(_testName.getMethodName());
sender = session.createProducer(destination);
receiver = session.createConsumer(destination);
connection.start();
providerListener.onConnectionFailure(new ProviderException("Something went wrong"));
final JmsConnection jmsConnection = connection;
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return !jmsConnection.isConnected();
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(2)));
}
@Override
public ExceptionListener getExceptionListener() {
try {
return connection.getExceptionListener();
} catch (JMSException jmse) {
throw JmsExceptionSupport.createRuntimeException(jmse);
}
}
@Test
public void testWithConnectionFactoryAndExceptionListener() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class);
Connection con = mock(Connection.class);
ExceptionListener listener = new ChainedExceptionListener();
given(cf.createConnection()).willReturn(con);
given(con.getExceptionListener()).willReturn(listener);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
scf.setExceptionListener(listener);
Connection con1 = scf.createConnection();
assertEquals(listener, con1.getExceptionListener());
con1.start();
con1.stop();
con1.close();
Connection con2 = scf.createConnection();
con2.start();
con2.stop();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).setExceptionListener(listener);
verify(con, times(2)).start();
verify(con, times(2)).stop();
verify(con).close();
}
/**
* Invoke the registered JMS ExceptionListener, if any.
* @param ex the exception that arose during JMS processing
* @see #setExceptionListener
*/
protected void invokeExceptionListener(JMSException ex) {
ExceptionListener exceptionListener = getExceptionListener();
if (exceptionListener != null) {
exceptionListener.onException(ex);
}
}
@Override
public ExceptionListener getExceptionListener() {
try {
return connection.getExceptionListener();
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
}
@Override
public void onException(JMSException ex) {
// Iterate over temporary copy in order to avoid ConcurrentModificationException,
// since listener invocations may in turn trigger registration of listeners...
Set<ExceptionListener> copy;
synchronized (connectionMonitor) {
copy = new LinkedHashSet<>(this.delegates);
}
for (ExceptionListener listener : copy) {
listener.onException(ex);
}
}
@Override
public ExceptionListener getExceptionListener() {
try {
return connection.getExceptionListener();
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
@Override
public void setExceptionListener(ExceptionListener listener) {
try {
connection.setExceptionListener(listener);
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
/**
* Get the exception listener -- throws IllegalStateException
*
* @return The exception listener
* @throws JMSException Thrown if an error occurs
*/
@Override
public ExceptionListener getExceptionListener() throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("getExceptionListener()");
}
throw new IllegalStateException(ISE);
}
@Override
public ExceptionListener getExceptionListener() {
try {
return connection.getExceptionListener();
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
@Override
public void setExceptionListener(ExceptionListener listener) {
try {
connection.setExceptionListener(listener);
} catch (JMSException jmse) {
throw JMSExceptionSupport.createRuntimeException(jmse);
}
}
@Test(timeout = 60000)
public void testSetGetExceptionListener() throws Exception {
pcf = createPooledConnectionFactory();
connection = (TopicConnection) pcf.createConnection();
ExceptionListener listener = new ExceptionListener() {
@Override
public void onException(JMSException exception) {
}
};
connection.setExceptionListener(listener);
assertEquals(listener, connection.getExceptionListener());
}
@Override
public void setExceptionListener(ExceptionListener listener) {
if (xa) {
throw new JMSRuntimeException("Illegal call to setExceptionListener");
}
try {
connection().setExceptionListener(listener);
} catch (final JMSException e) {
throw toRuntimeException(e);
}
}
@SuppressWarnings("unused")
private JMSConnectionHandlingActor(final Connection connection, final ExceptionListener exceptionListener,
final JmsConnectionFactory jmsConnectionFactory) {
this.connection = checkNotNull(connection, "connection");
this.exceptionListener = exceptionListener;
this.jmsConnectionFactory = jmsConnectionFactory;
}
/**
* Set the exception listener -- throws IllegalStateException
*
* @param listener The exception listener
* @throws JMSException Thrown if an error occurs
*/
@Override
public void setExceptionListener(final ExceptionListener listener) throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("setExceptionListener(" + listener + ")");
}
throw new IllegalStateException(ISE);
}
@Test(timeout=30000)
public void testGetExceptionListener() throws JMSException {
connection = new JmsConnection(connectionInfo, provider);
assertNull(connection.getExceptionListener());
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
}
});
assertNotNull(connection.getExceptionListener());
}
@Override
public void setExceptionListener(ExceptionListener listener) {
try {
connection.setExceptionListener(listener);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
}
@Override
public ExceptionListener getExceptionListener() {
try {
return connection().getExceptionListener();
} catch (final JMSException e) {
throw toRuntimeException(e);
}
}
public PollingMessageListenerContainer(Connection connection, Destination destination,
MessageListener listenerHandler, ExceptionListener exceptionListener) {
this.connection = connection;
this.destination = destination;
this.listenerHandler = listenerHandler;
this.exceptionListener = exceptionListener;
}
@Test(timeout = 20000)
public void testOnExceptionFiredOnSessionPoolFailure() throws Exception {
final CountDownLatch exceptionFired = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exceptionFired.countDown();
}
});
connection.start();
JmsFailingServerSessionPool sessionPool = new JmsFailingServerSessionPool();
// Now the Connection consumer arrives and we give it a message
// to be dispatched to the server session.
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
Queue queue = new JmsQueue("myQueue");
ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS));
testPeer.expectDetach(true, true, true);
testPeer.expectDispositionThatIsReleasedAndSettled();
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Override
public void onException(JMSException ex) {
synchronized (connectionMonitor) {
// Iterate over temporary copy in order to avoid ConcurrentModificationException,
// since listener invocations may in turn trigger registration of listeners...
for (ExceptionListener listener : new LinkedHashSet<ExceptionListener>(this.delegates)) {
listener.onException(ex);
}
}
}
@Test
public void testWithConnectionFactoryAndExceptionListener() throws JMSException {
ConnectionFactory cf = mock(ConnectionFactory.class);
Connection con = mock(Connection.class);
ExceptionListener listener = new ChainedExceptionListener();
given(cf.createConnection()).willReturn(con);
given(con.getExceptionListener()).willReturn(listener);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
scf.setExceptionListener(listener);
Connection con1 = scf.createConnection();
assertEquals(listener, con1.getExceptionListener());
con1.start();
con1.stop();
con1.close();
Connection con2 = scf.createConnection();
con2.start();
con2.stop();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).setExceptionListener(listener);
verify(con, times(2)).start();
verify(con, times(2)).stop();
verify(con).close();
}
@Test
public void testRuntimeExceptionOnSetExceptionListenerFailure() throws JMSException {
JmsConnection connection = Mockito.mock(JmsConnection.class);
JmsContext context = new JmsContext(connection, JMSContext.CLIENT_ACKNOWLEDGE);
Mockito.doThrow(IllegalStateException.class).when(connection).setExceptionListener(nullable(ExceptionListener.class));
try {
context.setExceptionListener(null);
fail("Should throw ISRE");
} catch (IllegalStateRuntimeException isre) {
} finally {
context.close();
}
}