下面列出了 javax.jms.TopicConnection 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a JMS session on the given connection.
 * <p>
 * For JMS 1.1 and 2.0 the unified {@code createSession} call is used. For any other spec
 * version the connection is cast to {@code QueueConnection} or {@code TopicConnection},
 * selected by the configured destination type.
 *
 * @param connection the connection to create the session on
 * @return the new session, or {@code null} when a {@code JMSException} was raised (it is
 *         logged) or when the destination type is neither QUEUE nor TOPIC
 */
protected Session createSession(Connection connection) {
    try {
        if (JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)
                || JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec)) {
            return connection.createSession(transactedSession, sessionAckMode);
        }
        // Constant-first comparison: an unset destination type falls through to the
        // null return below instead of raising a NullPointerException.
        if (JMSConstants.JMSDestinationType.QUEUE.equals(this.destinationType)) {
            return ((QueueConnection) connection).createQueueSession(transactedSession, sessionAckMode);
        }
        if (JMSConstants.JMSDestinationType.TOPIC.equals(this.destinationType)) {
            return ((TopicConnection) connection).createTopicSession(transactedSession, sessionAckMode);
        }
    } catch (JMSException e) {
        logger.error("JMS Exception while obtaining session for factory '" + this.connectionFactoryString + "' " + e
                .getMessage(), e);
    }
    return null;
}
/**
 * Open a Session on the given Connection.
 * <p>When {@code pubSubMode} is {@code FALSE} and the Connection is a QueueConnection,
 * the JMS 1.0.2 {@code createQueueSession} call is used; when it is {@code TRUE} and the
 * Connection is a TopicConnection, {@code createTopicSession} is used; in every other
 * case the unified {@code createSession} call is used.
 * @param con the JMS Connection to create the Session on
 * @param mode the Session acknowledgement mode
 * ({@code Session.SESSION_TRANSACTED} or one of the acknowledge modes)
 * @return the newly created Session
 * @throws JMSException if thrown by the JMS API
 */
protected Session createSession(Connection con, Integer mode) throws JMSException {
    // Translate the single mode value into the (transacted, acknowledgeMode) argument pair;
    // AUTO_ACKNOWLEDGE is substituted as the acknowledge mode for a transacted Session.
    final boolean transacted = (mode == Session.SESSION_TRANSACTED);
    final int ackMode;
    if (transacted) {
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }
    else {
        ackMode = mode;
    }

    // Domain-specific factory methods first, unified API as the fallback.
    if (con instanceof QueueConnection && Boolean.FALSE.equals(this.pubSubMode)) {
        return ((QueueConnection) con).createQueueSession(transacted, ackMode);
    }
    if (con instanceof TopicConnection && Boolean.TRUE.equals(this.pubSubMode)) {
        return ((TopicConnection) con).createTopicSession(transacted, ackMode);
    }
    return con.createSession(transacted, ackMode);
}
/**
 * Create a TopicConnection through the target factory, which must be a
 * {@code TopicConnectionFactory}.
 * <p>If the given username has length, {@code createTopicConnection(username, password)}
 * is called with the supplied credentials; otherwise the no-argument
 * {@code createTopicConnection()} is called.
 * @param username the username to use
 * @param password the password to use
 * @return the Connection
 * @throws JMSException if thrown by the JMS API, or (as {@code javax.jms.IllegalStateException})
 * if the target factory is not a TopicConnectionFactory
 * @see javax.jms.TopicConnectionFactory#createTopicConnection(String, String)
 * @see javax.jms.TopicConnectionFactory#createTopicConnection()
 */
protected TopicConnection doCreateTopicConnection(
        @Nullable String username, @Nullable String password) throws JMSException {

    ConnectionFactory target = obtainTargetConnectionFactory();
    if (target instanceof TopicConnectionFactory) {
        TopicConnectionFactory topicFactory = (TopicConnectionFactory) target;
        return (StringUtils.hasLength(username) ?
                topicFactory.createTopicConnection(username, password) :
                topicFactory.createTopicConnection());
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
/**
 * Wraps a single mocked TopicConnection and checks that start/stop calls on two handles
 * reach the target twice each, while the target is closed exactly once.
 */
@Test
public void testWithTopicConnection() throws JMSException {
    Connection target = mock(TopicConnection.class);
    SingleConnectionFactory scf = new SingleConnectionFactory(target);

    // First handle obtained from the factory.
    TopicConnection first = scf.createTopicConnection();
    first.start();
    first.stop();
    first.close();

    // Second handle obtained from the same factory.
    TopicConnection second = scf.createTopicConnection();
    second.start();
    second.stop();
    second.close();

    scf.destroy(); // should trigger actual close

    verify(target, times(2)).start();
    verify(target, times(2)).stop();
    verify(target).close();
    verifyNoMoreInteractions(target);
}
/**
 * Uses a mocked TopicConnectionFactory through the JMS 1.1 style {@code createConnection()}
 * call and expects exactly one start, one stop and one close on the target connection.
 */
@Test
public void testWithTopicConnectionFactoryAndJms11Usage() throws JMSException {
    TopicConnectionFactory targetFactory = mock(TopicConnectionFactory.class);
    TopicConnection target = mock(TopicConnection.class);
    given(targetFactory.createConnection()).willReturn(target);

    SingleConnectionFactory scf = new SingleConnectionFactory(targetFactory);
    Connection first = scf.createConnection();
    Connection second = scf.createConnection();
    first.start();
    second.start();
    first.close();
    second.close();
    scf.destroy(); // should trigger actual close

    verify(target).start();
    verify(target).stop();
    verify(target).close();
    verifyNoMoreInteractions(target);
}
/**
 * Uses a mocked TopicConnectionFactory through the JMS 1.0.2 style
 * {@code createTopicConnection()} call and expects exactly one start, one stop and one
 * close on the target connection.
 */
@Test
public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
    TopicConnectionFactory targetFactory = mock(TopicConnectionFactory.class);
    TopicConnection target = mock(TopicConnection.class);
    given(targetFactory.createTopicConnection()).willReturn(target);

    SingleConnectionFactory scf = new SingleConnectionFactory(targetFactory);
    Connection first = scf.createTopicConnection();
    Connection second = scf.createTopicConnection();
    first.start();
    second.start();
    first.close();
    second.close();
    scf.destroy(); // should trigger actual close

    verify(target).start();
    verify(target).stop();
    verify(target).close();
    verifyNoMoreInteractions(target);
}
/**
 * Publishes one bytes message containing the UTF-8 text "hello WeEvent" to
 * {@code topicName} on the broker at {@code defaultBrokerUrl}.
 * <p>
 * The connection is closed in a {@code finally} block, so it is released even when
 * one of the JMS calls throws.
 *
 * @throws JMSException if any JMS call fails
 */
private static void publish() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        // start connection
        connection.start();
        // create session
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        // create topic
        Topic topic = session.createTopic(topicName);
        // create publisher
        TopicPublisher publisher = session.createPublisher(topic);
        // send message
        BytesMessage msg = session.createBytesMessage();
        msg.writeBytes(("hello WeEvent").getBytes(StandardCharsets.UTF_8));
        publisher.publish(msg);
        System.out.print("send done.");
    } finally {
        // Always release the connection, including on failure paths.
        connection.close();
    }
}
/**
 * Open a Session on the given Connection.
 * <p>When {@code pubSubMode} is {@code FALSE} and the Connection is a QueueConnection,
 * the JMS 1.0.2 {@code createQueueSession} call is used; when it is {@code TRUE} and the
 * Connection is a TopicConnection, {@code createTopicSession} is used; in every other
 * case the unified {@code createSession} call is used.
 * @param con the JMS Connection to create the Session on
 * @param mode the Session acknowledgement mode
 * ({@code Session.SESSION_TRANSACTED} or one of the acknowledge modes)
 * @return the newly created Session
 * @throws JMSException if thrown by the JMS API
 */
protected Session createSession(Connection con, Integer mode) throws JMSException {
    // Translate the single mode value into the (transacted, acknowledgeMode) argument pair;
    // AUTO_ACKNOWLEDGE is substituted as the acknowledge mode for a transacted Session.
    final boolean transacted = (mode == Session.SESSION_TRANSACTED);
    final int ackMode;
    if (transacted) {
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }
    else {
        ackMode = mode;
    }

    // Domain-specific factory methods first, unified API as the fallback.
    if (con instanceof QueueConnection && Boolean.FALSE.equals(this.pubSubMode)) {
        return ((QueueConnection) con).createQueueSession(transacted, ackMode);
    }
    if (con instanceof TopicConnection && Boolean.TRUE.equals(this.pubSubMode)) {
        return ((TopicConnection) con).createTopicSession(transacted, ackMode);
    }
    return con.createSession(transacted, ackMode);
}
/**
 * Create a TopicConnection through the target factory, which must be a
 * {@code TopicConnectionFactory}.
 * <p>If the given username has length, {@code createTopicConnection(username, password)}
 * is called with the supplied credentials; otherwise the no-argument
 * {@code createTopicConnection()} is called.
 * @param username the username to use
 * @param password the password to use
 * @return the Connection
 * @throws JMSException if thrown by the JMS API, or (as {@code javax.jms.IllegalStateException})
 * if the target factory is not a TopicConnectionFactory
 * @see javax.jms.TopicConnectionFactory#createTopicConnection(String, String)
 * @see javax.jms.TopicConnectionFactory#createTopicConnection()
 */
protected TopicConnection doCreateTopicConnection(
        @Nullable String username, @Nullable String password) throws JMSException {

    ConnectionFactory target = obtainTargetConnectionFactory();
    if (target instanceof TopicConnectionFactory) {
        TopicConnectionFactory topicFactory = (TopicConnectionFactory) target;
        return (StringUtils.hasLength(username) ?
                topicFactory.createTopicConnection(username, password) :
                topicFactory.createTopicConnection());
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
/**
 * Builds a topic connection for the given credentials, attaches the configured exception
 * listener and connects it.
 * <p>
 * If any step fails, a connection that was already constructed is closed (errors from that
 * close are ignored) and the failure is rethrown via {@code JmsExceptionSupport.create}.
 *
 * @param username the user name passed to {@code configureConnectionInfo}
 * @param password the password passed to {@code configureConnectionInfo}
 * @return the connected topic connection
 * @throws JMSException if configuring, creating or connecting fails
 */
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
    JmsTopicConnection topicConnection = null;
    try {
        JmsConnectionInfo info = configureConnectionInfo(username, password);
        Provider remoteProvider = createProvider(remoteURI);
        topicConnection = new JmsTopicConnection(info, remoteProvider);
        topicConnection.setExceptionListener(exceptionListener);
        topicConnection.connect();
        return topicConnection;
    } catch (Exception cause) {
        if (topicConnection != null) {
            try {
                topicConnection.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup; the original failure is what gets reported.
            }
        }
        throw JmsExceptionSupport.create(cause);
    }
}
/**
 * Uses a mocked TopicConnectionFactory through the JMS 1.0.2 style
 * {@code createTopicConnection()} call and expects exactly one start, one stop and one
 * close on the target connection.
 */
@Test
public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
    TopicConnectionFactory targetFactory = mock(TopicConnectionFactory.class);
    TopicConnection target = mock(TopicConnection.class);
    given(targetFactory.createTopicConnection()).willReturn(target);

    SingleConnectionFactory scf = new SingleConnectionFactory(targetFactory);
    Connection first = scf.createTopicConnection();
    Connection second = scf.createTopicConnection();
    first.start();
    second.start();
    first.close();
    second.close();
    scf.destroy(); // should trigger actual close

    verify(target).start();
    verify(target).stop();
    verify(target).close();
    verifyNoMoreInteractions(target);
}
/**
 * With the factory limited to one connection, publishes a message on a topic and asserts
 * that the factory reports exactly one connection.
 */
@Test(timeout = 60000)
public void testTopicMessageSend() throws Exception {
    cf.setMaxConnections(1);

    TopicConnection topicConnection = cf.createTopicConnection();
    try {
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher publisher = session.createPublisher(session.createTopic(getTestName()));
        publisher.send(session.createMessage());

        assertEquals(1, cf.getNumConnections());
    } finally {
        // Release the connection first, then shut the factory down.
        topicConnection.close();
        cf.stop();
    }
}
/**
 * Checks that a sender and a publisher created from the XA pooled factory expose a
 * non-null queue name and topic name respectively.
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory xaFactory = createXAPooledConnectionFactory();

    // Queue domain.
    QueueConnection queueConnection = xaFactory.createQueueConnection();
    QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueSender queueSender = queueSession.createSender(queueSession.createQueue("AA"));
    assertNotNull(queueSender.getQueue().getQueueName());
    queueConnection.close();

    // Topic domain.
    TopicConnection topicConnection = xaFactory.createTopicConnection();
    TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA"));
    assertNotNull(topicPublisher.getTopic().getTopicName());
    topicConnection.close();

    xaFactory.stop();
}
/**
 * Wraps the given connection in the managed wrapper that matches the interfaces it
 * implements.
 * <p>
 * XA variants are checked before non-XA ones, and a connection implementing both the queue
 * and the topic interface gets the combined wrapper. A connection matching none of these
 * interfaces is wrapped in a plain {@code ManagedConnection}.
 *
 * @param connection the connection to wrap
 * @return the managed wrapper for the connection
 */
public static ManagedConnection create(final Connection connection) {
    final boolean xaQueue = connection instanceof XAQueueConnection;
    final boolean xaTopic = connection instanceof XATopicConnection;
    if (xaQueue && xaTopic) {
        return new ManagedXAQueueTopicConnection(connection);
    }
    if (xaQueue) {
        return new ManagedXAQueueConnection((XAQueueConnection) connection);
    }
    if (xaTopic) {
        return new ManagedXATopicConnection((XATopicConnection) connection);
    }

    final boolean queue = connection instanceof QueueConnection;
    final boolean topic = connection instanceof TopicConnection;
    if (queue && topic) {
        return new ManagedQueueTopicConnection(connection);
    }
    if (queue) {
        return new ManagedQueueConnection((QueueConnection) connection);
    }
    if (topic) {
        return new ManagedTopicConnection((TopicConnection) connection);
    }
    return new ManagedConnection(connection);
}
/**
 * Subscribes to the "activemq.notifications" topic and asserts that at least one message
 * arrives and that every received message carries the "_AMQ_NotifType" string property.
 */
@Test
public void testNotificationProperties() throws Exception {
    try (TopicConnection connection = factory.createTopicConnection()) {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic notifications = session.createTopic("activemq.notifications");
        TopicSubscriber notificationSubscriber = session.createSubscriber(notifications);

        // The listener only collects messages; assertions run once something has arrived.
        List<Message> received = new CopyOnWriteArrayList<>();
        notificationSubscriber.setMessageListener(received::add);
        connection.start();

        Wait.waitFor(() -> !received.isEmpty());
        Assert.assertTrue(!received.isEmpty());
        for (Message notification : received) {
            assertNotNull(notification);
            assertNotNull(notification.getStringProperty("_AMQ_NotifType"));
        }
    }
}
/**
 * Wraps a single mocked TopicConnection and checks that start/stop calls on two handles
 * reach the target twice each, while the target is closed exactly once.
 */
@Test
public void testWithTopicConnection() throws JMSException {
    Connection target = mock(TopicConnection.class);
    SingleConnectionFactory scf = new SingleConnectionFactory(target);

    // First handle obtained from the factory.
    TopicConnection first = scf.createTopicConnection();
    first.start();
    first.stop();
    first.close();

    // Second handle obtained from the same factory.
    TopicConnection second = scf.createTopicConnection();
    second.start();
    second.stop();
    second.close();

    scf.destroy(); // should trigger actual close

    verify(target, times(2)).start();
    verify(target, times(2)).stop();
    verify(target).close();
    verifyNoMoreInteractions(target);
}
/**
 * Open a Session on the given Connection.
 * <p>When {@code pubSubMode} is {@code FALSE} and the Connection is a QueueConnection,
 * the JMS 1.0.2 {@code createQueueSession} call is used; when it is {@code TRUE} and the
 * Connection is a TopicConnection, {@code createTopicSession} is used; in every other
 * case the unified {@code createSession} call is used.
 * @param con the JMS Connection to create the Session on
 * @param mode the Session acknowledgement mode
 * ({@code Session.SESSION_TRANSACTED} or one of the acknowledge modes)
 * @return the newly created Session
 * @throws JMSException if thrown by the JMS API
 */
protected Session createSession(Connection con, Integer mode) throws JMSException {
    // Translate the single mode value into the (transacted, acknowledgeMode) argument pair;
    // AUTO_ACKNOWLEDGE is substituted as the acknowledge mode for a transacted Session.
    final boolean transacted = (mode == Session.SESSION_TRANSACTED);
    final int ackMode;
    if (transacted) {
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }
    else {
        ackMode = mode;
    }

    // Domain-specific factory methods first, unified API as the fallback.
    if (con instanceof QueueConnection && Boolean.FALSE.equals(this.pubSubMode)) {
        return ((QueueConnection) con).createQueueSession(transacted, ackMode);
    }
    if (con instanceof TopicConnection && Boolean.TRUE.equals(this.pubSubMode)) {
        return ((TopicConnection) con).createTopicSession(transacted, ackMode);
    }
    return con.createSession(transacted, ackMode);
}
/**
 * Expects {@code createQueue} on a TopicSession to be rejected with
 * {@code javax.jms.IllegalStateException}.
 */
@Test
public void testTopicSessionCannotCreateQueues() throws Exception
{
    TopicConnection connection = getTopicConnection();
    try
    {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createQueue("abc");
        fail("Expected exception was not thrown");
    }
    catch (javax.jms.IllegalStateException expected)
    {
        // PASS
    }
    finally
    {
        // Close the connection whether or not the expectation held.
        connection.close();
    }
}
/**
 * Expects {@code createTemporaryQueue} on a TopicSession to be rejected with
 * {@code javax.jms.IllegalStateException}.
 */
@Test
public void testTopicSessionCannotCreateTemporaryQueues() throws Exception
{
    TopicConnection connection = getTopicConnection();
    try
    {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createTemporaryQueue();
        fail("Expected exception was not thrown");
    }
    catch (javax.jms.IllegalStateException expected)
    {
        // PASS
    }
    finally
    {
        // Close the connection whether or not the expectation held.
        connection.close();
    }
}
/**
 * Expects {@code createQueue} on a TopicSession to be rejected with
 * {@code javax.jms.IllegalStateException}.
 * <p>
 * The connection is closed in a {@code finally} block so it is released even when
 * session creation fails or the expected exception is not thrown.
 */
@Test
public void testCreateQueueOnATopicSession() throws Exception {
    TopicConnection c = (TopicConnection) getConnectionFactory().createConnection();
    try {
        TopicSession s = c.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            s.createQueue("TestQueue");
            ProxyAssertSupport.fail("should throw IllegalStateException");
        } catch (javax.jms.IllegalStateException e) {
            // OK
        }
    } finally {
        c.close();
    }
}
/**
 * Creates a topic connection with the given credentials through the configured connection
 * factory, which is cast to {@code TopicConnectionFactory}.
 * <p>
 * A {@code JMSException} raised by the factory is logged and not rethrown; the method then
 * returns {@code null}.
 *
 * @param userName the user name to connect with
 * @param password the password to connect with
 * @return the new topic connection, or {@code null} if the factory raised a {@code JMSException}
 * @throws JMSException declared by the signature; the visible body catches it instead
 */
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
    TopicConnection created = null;
    try {
        TopicConnectionFactory topicFactory = (TopicConnectionFactory) (this.connectionFactory);
        created = topicFactory.createTopicConnection(userName, password);
    } catch (JMSException e) {
        logger.error(
                "JMS Exception while creating topic connection through factory '" + this.connectionFactoryString
                        + "' " + e.getMessage(), e);
    }
    return created;
}
/**
 * Sets up the fixture fields used by the tests: a topic connection to the mock provider,
 * a non-transacted auto-acknowledge topic session, a topic named after the running test
 * method, and a publisher and subscriber on that topic.
 * <p>
 * The session is closed at the end, while the publisher, subscriber and connection
 * are not closed here.
 *
 * @throws Exception if any of the setup calls fails
 */
protected void createTestResources() throws Exception {
connection = createTopicConnectionToMockProvider();
// The cast gives access to the topic-specific createTopicSession call.
session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// The topic name is taken from the current test method name.
destination = session.createTopic(_testName.getMethodName());
publisher = session.createPublisher(destination);
subscriber = session.createSubscriber(destination);
// Close the session explicitly, without closing the above.
session.close();
}
/**
 * Creates a TopicConnection with the given credentials through the target factory and
 * returns it wrapped by {@code getTransactionAwareConnectionProxy}.
 * @param username the username to use
 * @param password the password to use
 * @return the proxied TopicConnection
 * @throws JMSException if thrown by the JMS API, or (as {@code javax.jms.IllegalStateException})
 * if the target factory is not a TopicConnectionFactory
 */
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
    ConnectionFactory target = getTargetConnectionFactory();
    if (target instanceof TopicConnectionFactory) {
        TopicConnectionFactory topicFactory = (TopicConnectionFactory) target;
        TopicConnection rawConnection = topicFactory.createTopicConnection(username, password);
        return (TopicConnection) getTransactionAwareConnectionProxy(rawConnection);
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
}
/**
 * Create a JDK dynamic proxy around the given Connection, with a
 * {@code TransactionAwareConnectionInvocationHandler} handling its method calls.
 * <p>The proxy always implements {@code Connection}, and additionally
 * {@code QueueConnection} and/or {@code TopicConnection} when the target implements them.
 * @param target the original Connection to wrap
 * @return the wrapped Connection
 */
protected Connection getTransactionAwareConnectionProxy(Connection target) {
    // Collect the interfaces the proxy has to expose, based on what the target supports.
    List<Class<?>> proxiedInterfaces = new ArrayList<>(3);
    proxiedInterfaces.add(Connection.class);
    if (target instanceof QueueConnection) {
        proxiedInterfaces.add(QueueConnection.class);
    }
    if (target instanceof TopicConnection) {
        proxiedInterfaces.add(TopicConnection.class);
    }

    TransactionAwareConnectionInvocationHandler handler =
            new TransactionAwareConnectionInvocationHandler(target);
    return (Connection) Proxy.newProxyInstance(
            Connection.class.getClassLoader(), ClassUtils.toClassArray(proxiedInterfaces), handler);
}
/**
 * Creates a TopicConnection through the target factory.
 * <p>If the target is a {@code TopicConnectionFactory}, its {@code createTopicConnection()}
 * is used. Otherwise a generic Connection is created and returned if it happens to
 * implement {@code TopicConnection}; if it does not, that Connection is closed again
 * before the exception is thrown so that it is not leaked.
 * @return the TopicConnection
 * @throws JMSException if thrown by the JMS API, or (as {@code javax.jms.IllegalStateException})
 * if the target cannot provide a TopicConnection
 */
@Override
public TopicConnection createTopicConnection() throws JMSException {
    ConnectionFactory target = obtainTargetConnectionFactory();
    if (target instanceof TopicConnectionFactory) {
        return ((TopicConnectionFactory) target).createTopicConnection();
    }
    else {
        Connection con = target.createConnection();
        if (!(con instanceof TopicConnection)) {
            if (con != null) {
                try {
                    // Release the unusable Connection instead of abandoning it.
                    con.close();
                }
                catch (JMSException ignored) {
                    // The IllegalStateException below is the failure the caller needs to see.
                }
            }
            throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
        }
        return (TopicConnection) con;
    }
}
/**
 * Creates a TopicConnection with the given credentials through the target factory.
 * <p>If the target is a {@code TopicConnectionFactory}, its
 * {@code createTopicConnection(username, password)} is used. Otherwise a generic Connection
 * is created with the same credentials and returned if it happens to implement
 * {@code TopicConnection}; if it does not, that Connection is closed again before the
 * exception is thrown so that it is not leaked.
 * @param username the username to use
 * @param password the password to use
 * @return the TopicConnection
 * @throws JMSException if thrown by the JMS API, or (as {@code javax.jms.IllegalStateException})
 * if the target cannot provide a TopicConnection
 */
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
    ConnectionFactory target = obtainTargetConnectionFactory();
    if (target instanceof TopicConnectionFactory) {
        return ((TopicConnectionFactory) target).createTopicConnection(username, password);
    }
    else {
        Connection con = target.createConnection(username, password);
        if (!(con instanceof TopicConnection)) {
            if (con != null) {
                try {
                    // Release the unusable Connection instead of abandoning it.
                    con.close();
                }
                catch (JMSException ignored) {
                    // The IllegalStateException below is the failure the caller needs to see.
                }
            }
            throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
        }
        return (TopicConnection) con;
    }
}
/**
 * Test introduced as a result of a TCK failure: unsubscribing with an unknown
 * subscription name is expected to raise {@code InvalidDestinationException}.
 */
@Test
public void testNonDurableSubscriberInvalidUnsubscribe() throws Exception {
    // NOTE(review): the connection is not closed in this method — presumably the test
    // fixture cleans it up; confirm.
    TopicConnection connection = createTopicConnection();
    connection.setClientID("clientIDxyz123");

    TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        session.unsubscribe("invalid-subscription-name");
        ProxyAssertSupport.fail("this should fail");
    } catch (javax.jms.InvalidDestinationException expected) {
        // OK
    }
}
/**
 * Subscribes to {@code topicName} on the broker at {@code defaultBrokerUrl} and registers
 * a listener that prints the body of each received bytes message as UTF-8 text.
 * <p>
 * The connection is closed in a {@code finally} block, so it is released even when one of
 * the setup calls throws.
 *
 * @throws JMSException if any JMS call fails
 */
private static void subscribe() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        // start connection
        connection.start();
        // create session
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        // create topic
        Topic topic = session.createTopic(topicName);
        // create subscriber
        TopicSubscriber subscriber = session.createSubscriber(topic);
        // create listener
        subscriber.setMessageListener(message -> {
            // NOTE(review): assumes every delivered message is a BytesMessage — confirm
            // against the publishing side.
            BytesMessage msg = (BytesMessage) message;
            try {
                byte[] data = new byte[(int) msg.getBodyLength()];
                msg.readBytes(data);
                System.out.println("received: " + new String(data, StandardCharsets.UTF_8));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } finally {
        // NOTE(review): the connection is closed right after the listener is registered,
        // as in the original flow; confirm that this leaves enough time for delivery.
        connection.close();
    }
}
/**
 * Creates a TopicConnection through the target factory and returns it wrapped by
 * {@code getTransactionAwareConnectionProxy}.
 * @return the proxied TopicConnection
 * @throws JMSException if thrown by the JMS API, or (as {@code javax.jms.IllegalStateException})
 * if the target factory is not a TopicConnectionFactory
 */
@Override
public TopicConnection createTopicConnection() throws JMSException {
    ConnectionFactory target = getTargetConnectionFactory();
    if (target instanceof TopicConnectionFactory) {
        TopicConnectionFactory topicFactory = (TopicConnectionFactory) target;
        TopicConnection rawConnection = topicFactory.createTopicConnection();
        return (TopicConnection) getTransactionAwareConnectionProxy(rawConnection);
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
}
/**
 * Create a JDK dynamic proxy around the given Connection, with a
 * {@code TransactionAwareConnectionInvocationHandler} handling its method calls.
 * <p>The proxy always implements {@code Connection}, and additionally
 * {@code QueueConnection} and/or {@code TopicConnection} when the target implements them.
 * @param target the original Connection to wrap
 * @return the wrapped Connection
 */
protected Connection getTransactionAwareConnectionProxy(Connection target) {
    // Collect the interfaces the proxy has to expose, based on what the target supports.
    List<Class<?>> proxiedInterfaces = new ArrayList<>(3);
    proxiedInterfaces.add(Connection.class);
    if (target instanceof QueueConnection) {
        proxiedInterfaces.add(QueueConnection.class);
    }
    if (target instanceof TopicConnection) {
        proxiedInterfaces.add(TopicConnection.class);
    }

    TransactionAwareConnectionInvocationHandler handler =
            new TransactionAwareConnectionInvocationHandler(target);
    return (Connection) Proxy.newProxyInstance(
            Connection.class.getClassLoader(), ClassUtils.toClassArray(proxiedInterfaces), handler);
}