下面列出了怎么用javax.jms.TopicConnectionFactory的API类实例代码及写法,或者点击链接到github查看源代码。
private ConnectionFactory createConnectionFactory() {
if (this.connectionFactory != null) {
return this.connectionFactory;
}
if (ctx == null) {
return null;
}
try {
if (this.destinationType.equals(JMSConstants.JMSDestinationType.QUEUE)) {
this.connectionFactory = (QueueConnectionFactory) ctx.lookup(this.connectionFactoryString);
} else if (this.destinationType.equals(JMSConstants.JMSDestinationType.TOPIC)) {
this.connectionFactory = (TopicConnectionFactory) ctx.lookup(this.connectionFactoryString);
}
} catch (NamingException e) {
logger.error(
"Naming exception while obtaining connection factory for '" + this.connectionFactoryString + "'",
e);
}
return this.connectionFactory;
}
/**
* This implementation delegates to the {@code createTopicConnection(username, password)}
* method of the target TopicConnectionFactory, passing in the specified user credentials.
* If the specified username is empty, it will simply delegate to the standard
* {@code createTopicConnection()} method of the target ConnectionFactory.
* @param username the username to use
* @param password the password to use
* @return the Connection
* @see javax.jms.TopicConnectionFactory#createTopicConnection(String, String)
* @see javax.jms.TopicConnectionFactory#createTopicConnection()
*/
protected TopicConnection doCreateTopicConnection(
@Nullable String username, @Nullable String password) throws JMSException {
ConnectionFactory target = obtainTargetConnectionFactory();
if (!(target instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
TopicConnectionFactory queueFactory = (TopicConnectionFactory) target;
if (StringUtils.hasLength(username)) {
return queueFactory.createTopicConnection(username, password);
}
else {
return queueFactory.createTopicConnection();
}
}
@Test
public void testWithTopicConnectionFactoryAndJms11Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createConnection();
Connection con2 = scf.createConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
@Test
public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createTopicConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createTopicConnection();
Connection con2 = scf.createTopicConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
private static void publish() throws JMSException {
// get topic connection
TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
TopicConnection connection = connectionFactory.createTopicConnection();
// start connection
connection.start();
// create session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// create topic
Topic topic = session.createTopic(topicName);
// create publisher
TopicPublisher publisher = session.createPublisher(topic);
// send message
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(("hello WeEvent").getBytes(StandardCharsets.UTF_8));
publisher.publish(msg);
System.out.print("send done.");
connection.close();
}
/**
* This implementation delegates to the {@code createTopicConnection(username, password)}
* method of the target TopicConnectionFactory, passing in the specified user credentials.
* If the specified username is empty, it will simply delegate to the standard
* {@code createTopicConnection()} method of the target ConnectionFactory.
* @param username the username to use
* @param password the password to use
* @return the Connection
* @see javax.jms.TopicConnectionFactory#createTopicConnection(String, String)
* @see javax.jms.TopicConnectionFactory#createTopicConnection()
*/
protected TopicConnection doCreateTopicConnection(
@Nullable String username, @Nullable String password) throws JMSException {
ConnectionFactory target = obtainTargetConnectionFactory();
if (!(target instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
TopicConnectionFactory queueFactory = (TopicConnectionFactory) target;
if (StringUtils.hasLength(username)) {
return queueFactory.createTopicConnection(username, password);
}
else {
return queueFactory.createTopicConnection();
}
}
@Test
public void testWithTopicConnectionFactoryAndJms11Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createConnection();
Connection con2 = scf.createConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
@Test
public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createTopicConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createTopicConnection();
Connection con2 = scf.createTopicConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
@Test
public void testWithTopicConnectionFactoryAndJms11Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createConnection();
Connection con2 = scf.createConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
@Test
public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createTopicConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createTopicConnection();
Connection con2 = scf.createTopicConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
public TopicConnection getTopicConnection(TopicConnectionFactory tcf, String uniqueID )
throws JMSException {
final TopicConnection tc;
final String username = Config.parms.getString("us");
if (username != null && username.length() != 0) {
Log.logger.log(Level.INFO, "getTopicConnection(): authenticating as \"" + username + "\"");
final String password = Config.parms.getString("pw");
tc = tcf.createTopicConnection(username, password);
} else {
tc = tcf.createTopicConnection();
}
if (durable) {
// Note: change signature to match getConnection
setDurableConnectionId( tc, ((WorkerThread)Thread.currentThread()), uniqueID );
} // end if durable
return tc;
}
public JMSSink(final String tcfBindingName, final String topicBindingName, final String username,
final String password) {
try {
final Context ctx = new InitialContext();
final TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) lookup(ctx,
tcfBindingName);
final TopicConnection topicConnection =
topicConnectionFactory.createTopicConnection(username,
password);
topicConnection.start();
final TopicSession topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
final Topic topic = (Topic) ctx.lookup(topicBindingName);
final TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(this);
} catch (final Exception e) {
logger.error("Could not read JMS message.", e);
}
}
public TopicConnection createTopicConnection() throws JMSException {
try {
return ((TopicConnectionFactory) (this.connectionFactory)).createTopicConnection();
} catch (JMSException e) {
logger.error(
"JMS Exception while creating topic connection through factory '" + this.connectionFactoryString
+ "' " + e.getMessage(), e);
}
return null;
}
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
try {
return ((TopicConnectionFactory) (this.connectionFactory)).createTopicConnection(userName, password);
} catch (JMSException e) {
logger.error(
"JMS Exception while creating topic connection through factory '" + this.connectionFactoryString
+ "' " + e.getMessage(), e);
}
return null;
}
@Override
public TopicConnection createTopicConnection() throws JMSException {
ConnectionFactory target = getTargetConnectionFactory();
if (!(target instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
}
TopicConnection targetConnection = ((TopicConnectionFactory) target).createTopicConnection();
return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
}
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
ConnectionFactory target = getTargetConnectionFactory();
if (!(target instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
}
TopicConnection targetConnection = ((TopicConnectionFactory) target).createTopicConnection(username, password);
return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
}
@Override
public TopicConnection createTopicConnection() throws JMSException {
ConnectionFactory target = obtainTargetConnectionFactory();
if (target instanceof TopicConnectionFactory) {
return ((TopicConnectionFactory) target).createTopicConnection();
}
else {
Connection con = target.createConnection();
if (!(con instanceof TopicConnection)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
return (TopicConnection) con;
}
}
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
ConnectionFactory target = obtainTargetConnectionFactory();
if (target instanceof TopicConnectionFactory) {
return ((TopicConnectionFactory) target).createTopicConnection(username, password);
}
else {
Connection con = target.createConnection(username, password);
if (!(con instanceof TopicConnection)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
return (TopicConnection) con;
}
}
/**
* Create a JMS Connection via this template's ConnectionFactory.
* @return the new JMS Connection
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected Connection doCreateConnection() throws JMSException {
ConnectionFactory cf = getTargetConnectionFactory();
if (Boolean.FALSE.equals(this.pubSubMode) && cf instanceof QueueConnectionFactory) {
return ((QueueConnectionFactory) cf).createQueueConnection();
}
else if (Boolean.TRUE.equals(this.pubSubMode) && cf instanceof TopicConnectionFactory) {
return ((TopicConnectionFactory) cf).createTopicConnection();
}
else {
return obtainTargetConnectionFactory().createConnection();
}
}
@Test
public void testCachingConnectionFactoryWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
TopicSession txSession = mock(TopicSession.class);
TopicSession nonTxSession = mock(TopicSession.class);
given(cf.createTopicConnection()).willReturn(con);
given(con.createTopicSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(txSession);
given(txSession.getTransacted()).willReturn(true);
given(con.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE)).willReturn(nonTxSession);
CachingConnectionFactory scf = new CachingConnectionFactory(cf);
scf.setReconnectOnException(false);
Connection con1 = scf.createTopicConnection();
Session session1 = con1.createSession(true, Session.AUTO_ACKNOWLEDGE);
session1.getTransacted();
session1.close(); // should lead to rollback
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close();
con1.start();
TopicConnection con2 = scf.createTopicConnection();
Session session2 = con2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.getTransacted();
session2.close();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(txSession).close();
verify(nonTxSession).close();
verify(con).start();
verify(con).stop();
verify(con).close();
}
private static void subscribe() throws JMSException {
// get topic connection
TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
TopicConnection connection = connectionFactory.createTopicConnection();
// start connection
connection.start();
// create session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// create topic
Topic topic = session.createTopic(topicName);
// create subscriber
TopicSubscriber subscriber = session.createSubscriber(topic);
// create listener
subscriber.setMessageListener(message -> {
BytesMessage msg = (BytesMessage) message;
try {
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
System.out.println("received: " + new String(data, StandardCharsets.UTF_8));
} catch (JMSException e) {
e.printStackTrace();
}
});
connection.close();
}
@Override
public TopicConnection createTopicConnection() throws JMSException {
ConnectionFactory target = getTargetConnectionFactory();
if (!(target instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
}
TopicConnection targetConnection = ((TopicConnectionFactory) target).createTopicConnection();
return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
}
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
ConnectionFactory target = getTargetConnectionFactory();
if (!(target instanceof TopicConnectionFactory)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
}
TopicConnection targetConnection = ((TopicConnectionFactory) target).createTopicConnection(username, password);
return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
}
@Override
public TopicConnection createTopicConnection() throws JMSException {
ConnectionFactory target = obtainTargetConnectionFactory();
if (target instanceof TopicConnectionFactory) {
return ((TopicConnectionFactory) target).createTopicConnection();
}
else {
Connection con = target.createConnection();
if (!(con instanceof TopicConnection)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
return (TopicConnection) con;
}
}
@Override
public TopicConnection createTopicConnection(String username, String password) throws JMSException {
ConnectionFactory target = obtainTargetConnectionFactory();
if (target instanceof TopicConnectionFactory) {
return ((TopicConnectionFactory) target).createTopicConnection(username, password);
}
else {
Connection con = target.createConnection(username, password);
if (!(con instanceof TopicConnection)) {
throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a TopicConnectionFactory");
}
return (TopicConnection) con;
}
}
/**
* Create a JMS Connection via this template's ConnectionFactory.
* @return the new JMS Connection
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected Connection doCreateConnection() throws JMSException {
ConnectionFactory cf = getTargetConnectionFactory();
if (Boolean.FALSE.equals(this.pubSubMode) && cf instanceof QueueConnectionFactory) {
return ((QueueConnectionFactory) cf).createQueueConnection();
}
else if (Boolean.TRUE.equals(this.pubSubMode) && cf instanceof TopicConnectionFactory) {
return ((TopicConnectionFactory) cf).createTopicConnection();
}
else {
return obtainTargetConnectionFactory().createConnection();
}
}
@Test
public void testCachingConnectionFactoryWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
TopicSession txSession = mock(TopicSession.class);
TopicSession nonTxSession = mock(TopicSession.class);
given(cf.createTopicConnection()).willReturn(con);
given(con.createTopicSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(txSession);
given(txSession.getTransacted()).willReturn(true);
given(con.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE)).willReturn(nonTxSession);
CachingConnectionFactory scf = new CachingConnectionFactory(cf);
scf.setReconnectOnException(false);
Connection con1 = scf.createTopicConnection();
Session session1 = con1.createSession(true, Session.AUTO_ACKNOWLEDGE);
session1.getTransacted();
session1.close(); // should lead to rollback
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close();
con1.start();
TopicConnection con2 = scf.createTopicConnection();
Session session2 = con2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.getTransacted();
session2.close();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(txSession).close();
verify(nonTxSession).close();
verify(con).start();
verify(con).stop();
verify(con).close();
}
@Test(timeout = 60000)
public void testInstanceOf() throws Exception {
cf = new JmsPoolConnectionFactory();
assertTrue(cf instanceof QueueConnectionFactory);
assertTrue(cf instanceof TopicConnectionFactory);
cf.stop();
}
@Test(timeout = 60000)
public void testInstanceOf() throws Exception {
JmsPoolConnectionFactory pcf = new JmsPoolConnectionFactory();
assertTrue(pcf instanceof QueueConnectionFactory);
assertTrue(pcf instanceof TopicConnectionFactory);
pcf.stop();
}
@Test(timeout = 60000)
public void testInstanceOf() throws Exception {
JmsPoolConnectionFactory pcf = new JmsPoolConnectionFactory();
assertTrue(pcf instanceof QueueConnectionFactory);
assertTrue(pcf instanceof TopicConnectionFactory);
pcf.stop();
}