下面列出了怎么用javax.jms.ConnectionFactory的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Publishes a revocation message to the {@code jwtRevocation} topic.
 * <p>
 * The message is a {@link MapMessage} with two string entries: {@code revokedToken}
 * (the {@code jti} value in scope) and {@code ttl} (fixed to {@code "3600"}).
 *
 * @throws NamingException Error thrown while handling initial context
 * @throws JMSException    Error thrown while creating JMS connection
 */
private void publishMessage() throws NamingException, JMSException {
    String topicName = "jwtRevocation";
    InitialContext initialContext = ClientHelper.getInitialContextBuilder("admin", "admin",
            "localhost", "5672").withTopic(topicName).build();
    ConnectionFactory connectionFactory = (ConnectionFactory) initialContext
            .lookup(ClientHelper.CONNECTION_FACTORY);
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(topicName);
        MessageProducer producer = session.createProducer(topic);
        MapMessage message = session.createMapMessage();
        message.setString("revokedToken", jti);
        message.setString("ttl", "3600");
        producer.send(message);
    } finally {
        // Always release the connection, even when the lookup or the send fails.
        // Per the JMS API, closing a connection also closes its sessions and producers.
        connection.close();
    }
}
/**
 * Opens a connection to the default ActiveMQ broker URL under the given client id and
 * creates a consumer on the named topic.
 *
 * @param clientId  client identifier stored on this instance and set on the connection
 * @param topicName name of the topic to consume from
 * @throws JMSException if any JMS call fails
 */
public void create(String clientId, String topicName)
        throws JMSException {
    this.clientId = clientId;

    // connect to the default broker URL and tag the connection with the client id
    connection = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL)
            .createConnection();
    connection.setClientID(clientId);

    // non-transacted session with automatic acknowledgement
    final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // consumer bound to the requested topic
    messageConsumer = jmsSession.createConsumer(jmsSession.createTopic(topicName));

    // start the connection last so that messages can be received
    connection.start();
}
/**
 * Sends a "Request" text message to {@code serviceQueue} with a temporary queue as its
 * reply-to destination, then waits (no timeout) for a reply on that temporary queue and
 * asserts that one was received.
 *
 * @param cf           factory used to open the connection
 * @param serviceQueue name of the queue the request is sent to
 */
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException {
    final Connection conn = cf.createConnection();
    try {
        conn.start();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // request carrying the temporary reply destination
        final TemporaryQueue replyQueue = jmsSession.createTemporaryQueue();
        final TextMessage request = jmsSession.createTextMessage("Request");
        request.setJMSReplyTo(replyQueue);

        final MessageProducer requestProducer = jmsSession.createProducer(jmsSession.createQueue(serviceQueue));
        requestProducer.send(request);

        // wait for the reply on the temporary queue
        final MessageConsumer replyConsumer = jmsSession.createConsumer(replyQueue);
        final Message reply = replyConsumer.receive();
        assertNotNull(reply);
        LOG.debug("Reply message: {}", reply);

        replyConsumer.close();
        requestProducer.close();
        jmsSession.close();
    } finally {
        conn.close();
    }
}
/**
 * Ensure no blocking calls in acknowledge flow when block on acknowledge = false.
 * This is done by checking that consuming with blockOnAcknowledge=false takes less than
 * half the time of consuming with blockOnAcknowledge=true.
 */
@Test
public void testNonBlockingAckPerf() throws Exception {
    final int messageCount = 100;
    final ConnectionFactory nonBlockingCf = ActiveMQJMSClient.createConnectionFactory("tcp://127.0.0.1:61616?blockOnNonDurableSend=true&blockOnAcknowledge=false", "testsuitecf1");
    final ConnectionFactory blockingCf = ActiveMQJMSClient.createConnectionFactory("tcp://127.0.0.1:61616?blockOnNonDurableSend=true&blockOnAcknowledge=true", "testsuitecf2");

    // fill both queues first; the send durations are not part of the comparison
    send(nonBlockingCf, queue1, messageCount);
    send(blockingCf, queue2, messageCount);

    final long nonBlockingTime = consume(nonBlockingCf, queue1, messageCount);
    final long blockingTime = consume(blockingCf, queue2, messageCount);

    log.debug("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume=" + nonBlockingTime);
    log.debug("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume=" + blockingTime);

    Assert.assertTrue(nonBlockingTime < (blockingTime / 2));
}
/**
 * Starts a Camel context with a route from an FTP endpoint to the {@code incomingOrders}
 * JMS queue, lets it run for ten seconds, then stops it.
 */
public static void main(String args[]) throws Exception {
    final CamelContext camel = new DefaultCamelContext();

    // register a "jms" component backed by the vm://localhost ActiveMQ factory
    final ConnectionFactory brokerFactory = new ActiveMQConnectionFactory("vm://localhost");
    camel.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(brokerFactory));

    // route definition: FTP orders directory -> JMS queue
    final RouteBuilder ftpToJms = new RouteBuilder() {
        @Override
        public void configure() {
            from("ftp://rider.com/orders?username=rider&password=secret").to("jms:incomingOrders");
        }
    };
    camel.addRoutes(ftpToJms);

    // run the route for a fixed window, then shut down
    camel.start();
    Thread.sleep(10000);
    camel.stop();
}
/**
 * Builds an ActiveMQ connection factory pointing at the connect URI of the broker's first
 * transport connector, with fixed dispatch, send, acknowledge and prefetch settings.
 *
 * @param broker broker whose first transport connector supplies the URL
 * @return the configured factory
 */
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
    final String connectUri = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
    final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectUri);

    // dispatch, send and acknowledge flags
    factory.setOptimizedMessageDispatch(true);
    factory.setCopyMessageOnSend(false);
    factory.setUseCompression(false);
    factory.setDispatchAsync(false);
    factory.setUseAsyncSend(false);
    factory.setOptimizeAcknowledge(false);
    factory.setWatchTopicAdvisories(true);

    // prefetch limits: 100 for queues, 1000 for topics
    final ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
    prefetch.setQueuePrefetch(100);
    prefetch.setTopicPrefetch(1000);
    factory.setPrefetchPolicy(prefetch);

    factory.setAlwaysSyncSend(true);
    return factory;
}
/**
 * With lazy resource retrieval enabled, a transaction is begun before the mocks are
 * stubbed; the template callback must still see the stubbed session, and commit must
 * commit and close the session and close the connection.
 */
@Test
public void testLazyTransactionalSession() throws JMSException {
    final ConnectionFactory factory = mock(ConnectionFactory.class);
    final Connection connection = mock(Connection.class);
    final Session session = mock(Session.class);

    final JmsTransactionManager txManager = new JmsTransactionManager(factory);
    txManager.setLazyResourceRetrieval(true);
    final TransactionStatus status = txManager.getTransaction(new DefaultTransactionDefinition());

    // stubbing happens only after the transaction has been started
    given(factory.createConnection()).willReturn(connection);
    given(connection.createSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(session);

    final JmsTemplate template = new JmsTemplate(factory);
    template.execute((SessionCallback<Void>) callbackSession -> {
        assertSame(callbackSession, session);
        return null;
    });
    txManager.commit(status);

    verify(session).commit();
    verify(session).close();
    verify(connection).close();
}
/**
 * Sends {@code nMsgs} text messages to the test queue. Every message has a body of
 * {@code PAYLOAD} 'A' characters and an int property {@code "i"} holding its index.
 *
 * @param nMsgs   number of messages to send
 * @param factory factory used to open the connection
 */
private void sendTextMessages(int nMsgs, ConnectionFactory factory) throws Exception {
    try (Connection connection = factory.createConnection()) {
        final Session session = connection.createSession();
        final MessageProducer producer = session.createProducer(session.createQueue(testQueueName));

        // build the fixed-size body once
        final StringBuilder body = new StringBuilder();
        for (int written = 0; written < PAYLOAD; written++) {
            body.append("A");
        }

        // the same message object is re-sent with an updated index property
        final TextMessage msg = session.createTextMessage();
        msg.setText(body.toString());
        for (int index = 0; index < nMsgs; index++) {
            msg.setIntProperty("i", index);
            producer.send(msg);
        }
    }
}
/**
 * Expects {@code createConnection} to throw a {@link JMSSecurityException} when the test
 * peer fails the SASL exchange with the given code.
 *
 * @param clientID        {@code true} to set an explicit client ID in the URI options,
 *                        {@code false} to set {@code jms.awaitClientID=false} instead
 * @param saslFailureCode SASL outcome code the peer responds with
 */
private void doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean clientID, UnsignedByte saslFailureCode) throws Exception {
    final String optionString = clientID ? "?jms.clientID=myClientID" : "?jms.awaitClientID=false";

    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode);

        final String remoteUri = "failover:(amqp://localhost:" + testPeer.getServerPort() + ")" + optionString;
        final ConnectionFactory factory = new JmsConnectionFactory(remoteUri);

        try {
            factory.createConnection("username", "password");
            fail("Excepted exception to be thrown");
        } catch (JMSSecurityException jmsse) {
            LOG.info("Caught expected security exception: {}", jmsse.getMessage());
        }

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Container configuration: an ActiveMQ resource adapter on an in-VM broker, a queue,
 * a message-driven bean container, a connection factory bound to the adapter and an
 * XA connection factory pointing at the same broker URL.
 */
@Configuration
public Properties config() {
    final String connectionFactoryType = ConnectionFactory.class.getName();
    final String xaFactoryClass = ActiveMQXAConnectionFactory.class.getName();

    return new PropertiesBuilder()
            // resource adapter and broker
            .p("amq", "new://Resource?type=ActiveMQResourceAdapter")
            .p("amq.DataSource", "")
            .p("amq.BrokerXmlConfig", "broker:(vm://localhost)")
            // destination
            .p("target", "new://Resource?type=Queue")
            // MDB container
            .p("mdbs", "new://Container?type=MESSAGE")
            .p("mdbs.ResourceAdapter", "amq")
            // connection factories
            .p("cf", "new://Resource?type=" + connectionFactoryType)
            .p("cf.ResourceAdapter", "amq")
            .p("xaCf", "new://Resource?class-name=" + xaFactoryClass)
            .p("xaCf.BrokerURL", "vm://localhost")
            .build();
}
/**
 * Runs {@code PublishJMS} against a bare mocked {@link ConnectionFactory} and checks that
 * the enqueued flow file lands on the failure relationship and nothing on success.
 */
@Test
public void validateFailedPublishAndTransferToFailure() throws Exception {
    final ConnectionFactory mockFactory = mock(ConnectionFactory.class);

    final PublishJMS processor = new PublishJMS();
    final TestRunner testRunner = TestRunners.newTestRunner(processor);

    // controller service that hands out the mocked factory
    final JMSConnectionFactoryProviderDefinition provider = mock(JMSConnectionFactoryProviderDefinition.class);
    when(provider.getIdentifier()).thenReturn("cfProvider");
    when(provider.getConnectionFactory()).thenReturn(mockFactory);
    testRunner.addControllerService("cfProvider", provider);
    testRunner.enableControllerService(provider);

    testRunner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
    testRunner.setProperty(PublishJMS.DESTINATION, "fooQueue");

    testRunner.enqueue("Hello Joe".getBytes());
    testRunner.run();
    Thread.sleep(200);

    assertTrue(testRunner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).isEmpty());
    assertNotNull(testRunner.getFlowFilesForRelationship(PublishJMS.REL_FAILURE).get(0));
}
/**
 * Sends a single text message with the body "Message" to the given queue over a fresh
 * connection, which is closed afterwards.
 *
 * @param queue name of the target queue
 */
public void sendMessage(SimpleString queue) throws Exception {
    final Connection conn = getCF().createConnection();
    try {
        final Session jmsSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        conn.start();

        final MessageProducer sender = jmsSession.createProducer(jmsSession.createQueue(queue.toString()));
        final TextMessage text = jmsSession.createTextMessage();
        text.setText("Message");
        sender.send(text);
    } finally {
        conn.close();
    }
}
/**
 * Wraps the given connection factory in a {@code JmsPoolConnectionFactory} configured from
 * the pool properties. Timeout and interval settings are applied only when the
 * corresponding property is non-null.
 *
 * @param connectionFactory factory the pool delegates to
 * @param poolProperties    source of the pool settings
 * @return the configured pooled factory
 */
private JmsPoolConnectionFactory create(ConnectionFactory connectionFactory, JmsPoolConnectionFactoryProperties poolProperties) {
    final JmsPoolConnectionFactory pool = new JmsPoolConnectionFactory();
    pool.setConnectionFactory(connectionFactory);

    // blocking behaviour when the session pool is exhausted
    pool.setBlockIfSessionPoolIsFull(poolProperties.isBlockIfFull());
    if (null != poolProperties.getBlockIfFullTimeout()) {
        pool.setBlockIfSessionPoolIsFullTimeout(poolProperties.getBlockIfFullTimeout().toMillis());
    }

    // idle timeout is passed on as an int number of milliseconds
    if (null != poolProperties.getIdleTimeout()) {
        pool.setConnectionIdleTimeout((int) poolProperties.getIdleTimeout().toMillis());
    }

    // sizing
    pool.setMaxConnections(poolProperties.getMaxConnections());
    pool.setMaxSessionsPerConnection(poolProperties.getMaxSessionsPerConnection());

    if (null != poolProperties.getTimeBetweenExpirationCheck()) {
        pool.setConnectionCheckInterval(poolProperties.getTimeBetweenExpirationCheck().toMillis());
    }

    pool.setUseAnonymousProducers(poolProperties.isUseAnonymousProducers());
    return pool;
}
/**
 * Creates a connection without any client ID option and expects {@code start()} to throw
 * a {@link JMSSecurityException} when the test peer fails the SASL exchange with the
 * given code.
 *
 * @param saslFailureCode SASL outcome code the peer responds with
 */
private void doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(UnsignedByte saslFailureCode) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode);

        final String remoteUri = "failover:(amqp://localhost:" + testPeer.getServerPort() + ")";
        final ConnectionFactory factory = new JmsConnectionFactory(remoteUri);
        final Connection conn = factory.createConnection("username", "password");

        try {
            conn.start();
            fail("Excepted exception to be thrown");
        } catch (JMSSecurityException jmsse) {
            LOG.info("Caught expected security exception: {}", jmsse.getMessage());
        }

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Obtains two connections from a {@code SingleConnectionFactory}, starts and closes both,
 * destroys the factory, and verifies the underlying mocked connection saw exactly one
 * start, one stop and one close.
 */
@Test
public void testWithConnectionFactory() throws JMSException {
    final ConnectionFactory targetFactory = mock(ConnectionFactory.class);
    final Connection targetConnection = mock(Connection.class);
    given(targetFactory.createConnection()).willReturn(targetConnection);

    final SingleConnectionFactory singleFactory = new SingleConnectionFactory(targetFactory);
    final Connection first = singleFactory.createConnection();
    final Connection second = singleFactory.createConnection();

    first.start();
    second.start();
    first.close();
    second.close();
    singleFactory.destroy();  // should trigger actual close

    verify(targetConnection).start();
    verify(targetConnection).stop();
    verify(targetConnection).close();
    verifyNoMoreInteractions(targetConnection);
}
/**
 * Runs a template callback inside a transaction and rolls it back; the session must be
 * rolled back and closed, and the connection closed.
 */
@Test
public void testTransactionRollback() throws JMSException {
    final ConnectionFactory factory = mock(ConnectionFactory.class);
    final Connection connection = mock(Connection.class);
    final Session session = mock(Session.class);
    given(factory.createConnection()).willReturn(connection);
    given(connection.createSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(session);

    final JmsTransactionManager txManager = new JmsTransactionManager(factory);
    final TransactionStatus status = txManager.getTransaction(new DefaultTransactionDefinition());

    final JmsTemplate template = new JmsTemplate(factory);
    template.execute((SessionCallback<Void>) callbackSession -> {
        assertSame(callbackSession, session);
        return null;
    });
    txManager.rollback(status);

    verify(session).rollback();
    verify(session).close();
    verify(connection).close();
}
/**
 * Sets the ConnectionFactory used to create new pooled Connections.
 * <p>
 * Updates to this value do not affect Connections that were previously created and placed
 * into the pool. In order to allocate new Connections based off this new ConnectionFactory
 * it is first necessary to {@link #clear} the pooled Connections.
 * <p>
 * The method also probes reflectively for a {@code createContext(int)} method, first on
 * the {@code ConnectionFactory} interface and then on the provided implementation, logs
 * the outcome, and marks JMS context support when both lookups succeed.
 *
 * @param factory
 *      The factory to use to create pooled Connections.
 * @throws IllegalArgumentException
 *      if the given object is not a {@code ConnectionFactory}.
 */
public void setConnectionFactory(final Object factory) {
    if (!(factory instanceof ConnectionFactory)) {
        throw new IllegalArgumentException("connectionFactory should implement javax.jms.ConnectionFactory");
    }

    // The message is advanced before each lookup so that, whichever lookup fails,
    // it describes how far the capability probe got.
    String logMessage = "JMS ConnectionFactory on classpath is not a JMS 2.0+ version.";
    try {
        ConnectionFactory.class.getMethod("createContext", int.class);
        logMessage = "Provided ConnectionFactory implementation is not JMS 2.0+ capable.";
        factory.getClass().getMethod("createContext", int.class);
        logMessage = "Provided ConnectionFactory implementation is JMS 2.0+ capable.";
        jmsContextSupported = true;
    } catch (NoSuchMethodException | SecurityException ignored) {
        // Nothing to do: logMessage already records which lookup failed.
    } finally {
        LOG.info(logMessage);
    }

    this.connectionFactory = factory;
}
/**
 * Extends the parent's Camel context with a "jms" component in auto-acknowledge mode,
 * backed by an ActiveMQ connection factory for {@code vm://localhost}.
 */
@Override
protected CamelContext createCamelContext() throws Exception {
    final CamelContext ctx = super.createCamelContext();

    // auto-acknowledge JMS component on the vm://localhost broker URL
    ctx.addComponent("jms",
            JmsComponent.jmsComponentAutoAcknowledge(new ActiveMQConnectionFactory("vm://localhost")));

    return ctx;
}
/**
 * Round-trips a connection factory through Java serialization.
 *
 * @param fact the factory to serialize
 * @return the copy read back from the serialized bytes
 * @throws Exception if writing or reading the object fails, or the deserialized object
 *                   is not a {@code ConnectionFactory}
 */
private ConnectionFactory getCFThruSerialization(ConnectionFactory fact) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();

    // Serialize the factory that was passed in; previously the parameter was ignored
    // and the enclosing `cf` field was written instead.
    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(fact);
    }

    // try-with-resources closes the streams even when (de)serialization throws.
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        return (ConnectionFactory) ois.readObject();
    }
}
/**
 * Sends one text message ("test") to the {@code test} queue inside a transacted session,
 * commits it, then starts a {@code ConsumerThread}.
 * <p>
 * Failures while sending are logged rather than propagated; the method returns
 * {@code SUCCESS} either way.
 *
 * @return the {@code SUCCESS} constant
 */
@RequestMapping("/activemq")
@ResponseBody
public String testcase() {
    Session session = null;
    Connection connection = null;
    try {
        ConnectionFactory factory = new ActiveMQConnectionFactory(USER_NAME, PASSWORD, brokenUrl);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test");
        MessageProducer messageProducer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test");
        messageProducer.send(message);
        session.commit();
    } catch (Exception ex) {
        logger.error(ex);
    } finally {
        // Either reference is still null when the failure happened before it was
        // assigned, so guard each close. The two closes are independent so that a
        // failing session close cannot prevent the connection from being closed.
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                logger.error(e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                logger.error(e);
            }
        }
    }
    new ConsumerThread().start();
    return SUCCESS;
}
/**
 * Exposes an ActiveMQ connection factory for the broker at {@code tcp://localhost:61616}.
 */
@Bean
public ConnectionFactory nativeConnectionFactory(){
    final ActiveMQConnectionFactory brokerFactory = new ActiveMQConnectionFactory();
    brokerFactory.setBrokerURL("tcp://localhost:61616");
    // NOTE(review): trust-all-packages is enabled here; confirm this is acceptable for the
    // payloads this application receives.
    brokerFactory.setTrustAllPackages(true);
    return brokerFactory;
}
/**
 * Creates the broker if none exists yet, opens and starts a connection, then delegates
 * to the parent's {@code setUp()}.
 */
@Override
protected void setUp() throws Exception {
    // the broker is only created when one has not been supplied already
    if (null == broker) {
        broker = createBroker();
    }

    connection = createConnectionFactory().createConnection();
    connection.start();

    super.setUp();
}
/**
 * Creates a connection to the default ActiveMQ broker URL and stores it in
 * {@code connection}. The connection is not started here.
 *
 * @throws JMSException if the connection cannot be created
 */
public void openConnection() throws JMSException {
    connection = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL)
            .createConnection();
}
/**
 * Registers the listener container in an application context and verifies that, after
 * the context is refreshed, the container has set itself as exception listener on the
 * connection and started it.
 */
@Test
public void testContextRefreshedEventStartsTheConnectionByDefault() throws Exception {
    final MessageConsumer consumer = mock(MessageConsumer.class);
    final Session jmsSession = mock(Session.class);
    // the session is first asked for the queue ...
    given(jmsSession.createQueue(DESTINATION_NAME)).willReturn(QUEUE_DESTINATION);
    // ... and then for a consumer on it, with no message selector
    given(jmsSession.createConsumer(QUEUE_DESTINATION, null)).willReturn(consumer);

    // the connection hands out the session using the container's own settings
    final Connection jmsConnection = mock(Connection.class);
    given(jmsConnection.createSession(this.container.isSessionTransacted(),
            this.container.getSessionAcknowledgeMode())).willReturn(jmsSession);

    final ConnectionFactory factory = mock(ConnectionFactory.class);
    given(factory.createConnection()).willReturn(jmsConnection);

    this.container.setConnectionFactory(factory);
    this.container.setDestinationName(DESTINATION_NAME);
    this.container.setMessageListener(new TestMessageListener());
    this.container.afterPropertiesSet();

    final GenericApplicationContext appContext = new GenericApplicationContext();
    appContext.getBeanFactory().registerSingleton("messageListenerContainer", this.container);
    appContext.refresh();

    verify(jmsConnection).setExceptionListener(this.container);
    verify(jmsConnection).start();
}
/**
 * Creates the delegate: looks up the connection factory and controller queue in the given
 * JNDI context, opens and starts the controller connection with two sessions and a
 * producer on the controller queue, assigns a random client name and initialises the
 * bookkeeping maps.
 *
 * @param context JNDI context used for the lookups
 * @throws DistributedTestException if a JNDI lookup or a JMS call fails
 */
public ClientJmsDelegate(final Context context)
{
    try
    {
        _context = context;

        // controller connection and destination
        final ConnectionFactory factory = (ConnectionFactory) _context.lookup("connectionfactory");
        _controllerConnection = factory.createConnection();
        _controllerConnection.start();
        _controllerQueue = (Destination) context.lookup(DistributedTestConstants.CONTROLLER_QUEUE_JNDI_NAME);

        // two non-transacted, auto-acknowledging sessions on the controller connection
        _instructionListenerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        _controllerSession = _controllerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        _controlQueueProducer = _controllerSession.createProducer(_controllerQueue);

        _clientName = UUID.randomUUID().toString();

        // registries for the JMS objects created during a test
        _testConnections = new ConcurrentHashMap<>();
        _testSessions = new ConcurrentHashMap<>();
        _testProducers = new ConcurrentHashMap<>();
        _testConsumers = new ConcurrentHashMap<>();
        _testSubscriptions = new ConcurrentHashMap<>();
        _testMessageProviders = new ConcurrentHashMap<>();
        _defaultMessageProvider = new MessageProvider(null);
        _testSessionToConnections = new ConcurrentHashMap<>();
        _queueCreator = QpidQueueCreatorFactory.createInstance();
    }
    catch (final NamingException | JMSException e)
    {
        throw new DistributedTestException("Unable to create client jms delegate", e);
    }
}
/**
 * Extends the parent's Camel context with a "jms" component in client-acknowledge mode,
 * backed by an ActiveMQ connection factory for
 * {@code vm://localhost?broker.persistent=false}.
 */
@Override
protected CamelContext createCamelContext() throws Exception {
    final CamelContext ctx = super.createCamelContext();
    ctx.addComponent("jms",
            jmsComponentClientAcknowledge(new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false")));
    return ctx;
}
/**
 * Creates a {@code JmsTemplate} on the given connection factory with the pub/sub domain
 * flag explicitly set to {@code false}.
 *
 * @param connectionFactory factory the template sends through
 * @return the configured template
 */
@Bean
public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
    final JmsTemplate queueTemplate = new JmsTemplate(connectionFactory);
    queueTemplate.setPubSubDomain(false);
    return queueTemplate;
}
/**
 * Set the JMS ConnectionFactory that this instance should manage transactions for.
 * <p>
 * A {@code TransactionAwareConnectionFactoryProxy} is unwrapped to its target
 * ConnectionFactory; any other factory is stored as given.
 */
public void setConnectionFactory(ConnectionFactory cf) {
    // If we got a TransactionAwareConnectionFactoryProxy, we need to perform transactions
    // for its underlying target ConnectionFactory, else JMS access code won't see
    // properly exposed transactions (i.e. transactions for the target ConnectionFactory).
    this.connectionFactory = (cf instanceof TransactionAwareConnectionFactoryProxy
            ? ((TransactionAwareConnectionFactoryProxy) cf).getTargetConnectionFactory()
            : cf);
}
/**
 * Opens and starts a connection, creates a non-transacted auto-acknowledging session on
 * it, and creates a non-persistent producer for the request queue.
 *
 * @param connectionFactory factory used to open the connection
 * @param requestQueue      destination that requests are sent to
 * @throws JMSException if any JMS call fails
 */
public MdbInvocationHandler(final ConnectionFactory connectionFactory, final Destination requestQueue) throws JMSException {
    this.requestQueue = requestQueue;

    // connection, started immediately
    this.connection = connectionFactory.createConnection();
    this.connection.start();

    // non-transacted session with automatic acknowledgement
    this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // request producer; requests are sent non-persistently
    this.producer = this.session.createProducer(requestQueue);
    this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
/**
 * Sends {@code count} messages to the destination over a fresh connection by delegating
 * to {@code sendMessages(Connection, Destination, int)}.
 *
 * @param destination target destination
 * @param count       number of messages to send
 * @throws Exception if creating the connection or sending fails
 */
protected void sendMessages(Destination destination, int count) throws Exception {
    ConnectionFactory factory = createConnectionFactory();
    Connection connection = factory.createConnection();
    try {
        connection.start();
        sendMessages(connection, destination, count);
    } finally {
        // Close the connection even when starting or sending fails, so it is not leaked.
        connection.close();
    }
}