下面列出了 javax.jms.Connection#stop() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Wraps a mocked {@link Connection} in a {@code SingleConnectionFactory}, runs two
 * create/start/stop/close cycles through the factory and then destroys the factory.
 *
 * <p>The mock is expected to have seen two {@code start()} calls, two {@code stop()} calls and
 * exactly one {@code close()} call, and nothing else.
 */
@Test
public void testWithConnection() throws JMSException {
    Connection target = mock(Connection.class);
    SingleConnectionFactory factory = new SingleConnectionFactory(target);

    // Two identical borrow cycles, each on a connection freshly obtained from the factory.
    for (int cycle = 0; cycle < 2; cycle++) {
        Connection handle = factory.createConnection();
        handle.start();
        handle.stop();
        handle.close();
    }

    factory.destroy();  // should trigger actual close

    verify(target, times(2)).start();
    verify(target, times(2)).stop();
    verify(target, times(1)).close();
    verifyNoMoreInteractions(target);
}
/**
 * Creates the topic address {@code jmsTopicLarge} (resource name {@code jms-topic-large}) in the
 * tenant's namespace and runs {@code assertSendReceiveLargeMessageTopic} three times with the
 * second argument set to 1, 0.5 and 0.25.
 *
 * <p>NOTE(review): the second argument is presumably a message-size factor - confirm against the
 * helper's definition.
 *
 * @param jmsProvider JMS helper used to create the connection and passed on to the assertions
 * @throws Exception if resource creation, connection handling or an assertion helper fails
 */
@Test
@DisplayName("testLargeMessages")
void testLargeMessages(JmsProvider jmsProvider) throws Exception {
    MessagingAddress addressTopic = new MessagingAddressBuilder()
            .withNewMetadata()
            .withNamespace(tenant.getMetadata().getNamespace())
            .withName("jms-topic-large")
            .endMetadata()
            .withNewSpec()
            .editOrNewTopic()
            .endTopic()
            .withAddress("jmsTopicLarge")
            .endSpec()
            .build();
    resourceManager.createResource(addressTopic);

    Context context = createContext(jmsProvider, addressTopic);
    Connection connection = jmsProvider.createConnection(context);
    try {
        connection.start();

        assertSendReceiveLargeMessageTopic(jmsProvider, 1, addressTopic, 1);
        assertSendReceiveLargeMessageTopic(jmsProvider, 0.5, addressTopic, 1);
        assertSendReceiveLargeMessageTopic(jmsProvider, 0.25, addressTopic, 1);

        connection.stop();
    } finally {
        // Always release the connection, even when one of the assertions above fails.
        connection.close();
    }
}
/**
 * Wraps a mocked {@link Connection} in a {@code SingleConnectionFactory}, runs two
 * create/start/stop/close cycles through the factory and then destroys the factory.
 *
 * <p>The mock is expected to have seen two {@code start()} calls, two {@code stop()} calls and
 * exactly one {@code close()} call, and nothing else.
 */
@Test
public void testWithConnection() throws JMSException {
    Connection target = mock(Connection.class);
    SingleConnectionFactory factory = new SingleConnectionFactory(target);

    // Two identical borrow cycles, each on a connection freshly obtained from the factory.
    for (int cycle = 0; cycle < 2; cycle++) {
        Connection handle = factory.createConnection();
        handle.start();
        handle.stop();
        handle.close();
    }

    factory.destroy();  // should trigger actual close

    verify(target, times(2)).start();
    verify(target, times(2)).stop();
    verify(target, times(1)).close();
    verifyNoMoreInteractions(target);
}
/**
 * Wraps a mocked {@link Connection} in a {@code SingleConnectionFactory}, runs two
 * create/start/stop/close cycles through the factory and then destroys the factory.
 *
 * <p>The mock is expected to have seen two {@code start()} calls, two {@code stop()} calls and
 * exactly one {@code close()} call, and nothing else.
 */
@Test
public void testWithConnection() throws JMSException {
    Connection target = mock(Connection.class);
    SingleConnectionFactory factory = new SingleConnectionFactory(target);

    // Two identical borrow cycles, each on a connection freshly obtained from the factory.
    for (int cycle = 0; cycle < 2; cycle++) {
        Connection handle = factory.createConnection();
        handle.start();
        handle.stop();
        handle.close();
    }

    factory.destroy();  // should trigger actual close

    verify(target, times(2)).start();
    verify(target, times(2)).stop();
    verify(target, times(1)).close();
    verifyNoMoreInteractions(target);
}
/**
 * Creates the queue address {@code jmsQueueLarge} (resource name {@code jms-queue-large}) in the
 * tenant's namespace and runs {@code assertSendReceiveLargeMessageQueue} with the second argument
 * set to 1, 0.5 and 0.25 - first without an explicit delivery mode, then with
 * {@link DeliveryMode#PERSISTENT}.
 *
 * <p>NOTE(review): the second argument is presumably a message-size factor - confirm against the
 * helper's definition.
 *
 * @param jmsProvider JMS helper used to create the connection and passed on to the assertions
 * @throws Exception if resource creation, connection handling or an assertion helper fails
 */
@Test
@DisplayName("testLargeMessagesQueue")
void testLargeMessagesQueue(JmsProvider jmsProvider) throws Exception {
    MessagingAddress addressQueue = new MessagingAddressBuilder()
            .withNewMetadata()
            .withNamespace(tenant.getMetadata().getNamespace())
            .withName("jms-queue-large")
            .endMetadata()
            .withNewSpec()
            .editOrNewQueue()
            .endQueue()
            .withAddress("jmsQueueLarge")
            .endSpec()
            .build();
    resourceManager.createResource(addressQueue);

    Context context = createContext(jmsProvider, addressQueue);
    Connection connection = jmsProvider.createConnection(context);
    try {
        connection.start();

        assertSendReceiveLargeMessageQueue(jmsProvider, 1, addressQueue, 1);
        assertSendReceiveLargeMessageQueue(jmsProvider, 0.5, addressQueue, 1);
        assertSendReceiveLargeMessageQueue(jmsProvider, 0.25, addressQueue, 1);

        assertSendReceiveLargeMessageQueue(jmsProvider, 1, addressQueue, 1, DeliveryMode.PERSISTENT);
        assertSendReceiveLargeMessageQueue(jmsProvider, 0.5, addressQueue, 1, DeliveryMode.PERSISTENT);
        assertSendReceiveLargeMessageQueue(jmsProvider, 0.25, addressQueue, 1, DeliveryMode.PERSISTENT);

        connection.stop();
    } finally {
        // Always release the connection, even when one of the assertions above fails.
        connection.close();
    }
}
/**
 * Verifies that a {@link ConnectionFactory} can be configured by pointing the provider at client
 * libraries that are located at runtime. ActiveMQ is not declared in the POM; it is fetched from
 * a Maven repository by {@link TestUtils#setupActiveMqLibForTesting(boolean)}, so the machine
 * must be connected to the Internet for the test to do anything useful.
 *
 * <p>Any {@link Exception} raised along the way is logged rather than rethrown, so without
 * Internet access the test quietly logs a message instead of reporting a failure.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();

        // Register the controller service and configure it before enabling it.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip a single text message through a queue obtained from the configured factory.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(destination);
        final MessageConsumer receiver = jmsSession.createConsumer(destination);
        sender.send(jmsSession.createTextMessage("Hello"));
        final TextMessage echoed = (TextMessage) receiver.receive();
        assertEquals("Hello", echoed.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception failure) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", failure);
    }
}
/**
 * Verifies that a {@link ConnectionFactory} can be configured by pointing the provider at client
 * libraries that are located at runtime. ActiveMQ is not declared in the POM; it is fetched from
 * a Maven repository by {@link TestUtils#setupActiveMqLibForTesting(boolean)}, so the machine
 * must be connected to the Internet for the test to do anything useful.
 *
 * <p>Any {@link Exception} raised along the way is logged rather than rethrown, so without
 * Internet access the test quietly logs a message instead of reporting a failure.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JMSConnectionFactoryProvider provider = new JMSConnectionFactoryProvider();

        // Register the controller service and configure it before enabling it.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.ActiveMQConnectionFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip a single text message through a queue obtained from the configured factory.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(destination);
        final MessageConsumer receiver = jmsSession.createConsumer(destination);
        sender.send(jmsSession.createTextMessage("Hello"));
        final TextMessage echoed = (TextMessage) receiver.receive();
        assertEquals("Hello", echoed.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception failure) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", failure);
    }
}
/**
 * Verifies that a {@link ConnectionFactory} can be configured through the JNDI-based provider by
 * pointing it at client libraries that are located at runtime. ActiveMQ is not declared in the
 * POM; it is fetched from a Maven repository by
 * {@link TestUtils#setupActiveMqLibForTesting(boolean)}, so the machine must be connected to the
 * Internet for the test to do anything useful.
 *
 * <p>Any {@link Exception} raised along the way is logged rather than rethrown, so without
 * Internet access the test quietly logs a message instead of reporting a failure.
 */
@Test
public void validateFactoryCreationWithActiveMQLibraries() throws Exception {
    try {
        final String clientLibDir = TestUtils.setupActiveMqLibForTesting(true);
        final TestRunner testRunner = TestRunners.newTestRunner(mock(Processor.class));
        final JNDIConnectionFactoryProvider provider = new JNDIConnectionFactoryProvider();

        // Register the controller service and configure it before enabling it.
        testRunner.addControllerService("cfProvider", provider);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.BROKER_URI,
                "vm://localhost?broker.persistent=false");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.JNDI_CF_LOOKUP, "ConnectionFactory");
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, clientLibDir);
        testRunner.setProperty(provider, JNDIConnectionFactoryProvider.CONNECTION_FACTORY_IMPL,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        testRunner.enableControllerService(provider);
        testRunner.assertValid(provider);

        // Round-trip a single text message through a queue obtained from the configured factory.
        final Connection jmsConnection = provider.getConnectionFactory().createConnection();
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = jmsSession.createQueue("myqueue");
        final MessageProducer sender = jmsSession.createProducer(destination);
        final MessageConsumer receiver = jmsSession.createConsumer(destination);
        sender.send(jmsSession.createTextMessage("Hello"));
        final TextMessage echoed = (TextMessage) receiver.receive();
        assertEquals("Hello", echoed.getText());

        jmsConnection.stop();
        jmsConnection.close();
    } catch (Exception failure) {
        logger.error("'validateFactoryCreationWithActiveMQLibraries' failed due to ", failure);
    }
}
/**
 * Sends {@code MSG_COUNT} messages to a fresh queue, consumes them through a
 * {@code CountingMessageListener}, stops the connection once the listener's first wait returns,
 * and then restarts it.
 *
 * <p>Asserts that at least {@code MSG_COUNT / 2} messages had been received at the time of the
 * stop, and that the listener reports zero outstanding messages after the restart.
 */
@Test
public void connectionStopThenStart() throws Exception
{
    Queue testQueue = createQueue(getTestName());
    Connection con = getConnectionBuilder().setPrefetch(0).build();
    try
    {
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Utils.sendMessages(session, testQueue, MSG_COUNT);
        con.start();

        final MessageConsumer consumer = session.createConsumer(testQueue);
        final int expectedBeforeStop = MSG_COUNT / 2;
        CountingMessageListener listener = new CountingMessageListener(MSG_COUNT, expectedBeforeStop);
        consumer.setMessageListener(listener);

        // First phase: wait on the listener, then halt delivery.
        listener.awaitMessages(getReceiveTimeout());
        con.stop();
        final boolean enoughReceived = listener.getReceivedCount() >= expectedBeforeStop;
        assertTrue("Too few messages received after Connection#stop()", enoughReceived);

        // Second phase: re-arm the listener and resume delivery.
        listener.resetLatch();
        con.start();
        listener.awaitMessages(getReceiveTimeout());
        assertEquals("Unexpected number of outstanding messages", 0, listener.getOutstandingCount());
    }
    finally
    {
        con.close();
    }
}
/**
 * Sends {@code MSG_COUNT} messages to a fresh queue, consumes part of them with a first
 * {@code CountingMessageListener}, stops the connection, swaps in a second listener sized to the
 * first listener's outstanding count, and restarts the connection.
 *
 * <p>Asserts that at least {@code MSG_COUNT / 2} messages had been received by the first listener
 * at the time of the stop, and that the second listener ends with zero outstanding messages.
 */
@Test
public void connectionStopAndMessageListenerChange() throws Exception
{
    Queue testQueue = createQueue(getTestName());
    Connection con = getConnectionBuilder().setPrefetch(0).build();
    try
    {
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Utils.sendMessages(session, testQueue, MSG_COUNT);
        con.start();

        final MessageConsumer consumer = session.createConsumer(testQueue);
        final int expectedBeforeStop = MSG_COUNT / 2;
        CountingMessageListener firstListener = new CountingMessageListener(MSG_COUNT, expectedBeforeStop);
        consumer.setMessageListener(firstListener);
        firstListener.awaitMessages(getReceiveTimeout());

        con.stop();
        final boolean enoughReceived = firstListener.getReceivedCount() >= expectedBeforeStop;
        assertTrue("Too few messages received after Connection#stop()", enoughReceived);

        // Replace the listener while the connection is stopped; the new one expects the remainder.
        final int remaining = firstListener.getOutstandingCount();
        CountingMessageListener secondListener = new CountingMessageListener(remaining);
        consumer.setMessageListener(secondListener);

        con.start();
        secondListener.awaitMessages(getReceiveTimeout());
        assertEquals("Unexpected number of outstanding messages", 0, secondListener.getOutstandingCount());
    }
    finally
    {
        con.close();
    }
}
/**
 * Sends {@code MSG_COUNT} messages to a fresh queue, consumes part of them through a
 * {@code CountingMessageListener}, stops the connection and then waits a further receive timeout.
 *
 * <p>Asserts that the listener's outstanding count after that extra wait equals the count
 * captured immediately after {@code Connection#stop()}.
 */
@Test
public void connectionStopHaltsDeliveryToListener() throws Exception
{
    Queue testQueue = createQueue(getTestName());
    Connection con = getConnectionBuilder().setPrefetch(0).build();
    try
    {
        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Utils.sendMessages(session, testQueue, MSG_COUNT);
        con.start();

        final MessageConsumer consumer = session.createConsumer(testQueue);
        final int expectedBeforeStop = MSG_COUNT / 2;
        CountingMessageListener listener = new CountingMessageListener(MSG_COUNT, expectedBeforeStop);
        consumer.setMessageListener(listener);
        listener.awaitMessages(getReceiveTimeout());

        con.stop();
        final int outstandingAtStop = listener.getOutstandingCount();

        // Give any further delivery a chance to happen; none is expected while stopped.
        listener.resetLatch();
        listener.awaitMessages(getReceiveTimeout());

        final int outstandingAfterWait = listener.getOutstandingCount();
        assertEquals("Unexpected number of outstanding messages", outstandingAtStop, outstandingAfterWait);
    }
    finally
    {
        con.close();
    }
}
/**
 * Runs only when the protocol under test is AMQP 0-10. With a prefetch of 3, sends six messages,
 * receives the first one (index 0), stops the connection and then calls
 * {@code observeNextAvailableMessage(queue, 1)}.
 *
 * <p>NOTE(review): presumably the last call checks that the message with index 1 is available on
 * the queue again after the stop - confirm against the helper.
 */
@Test
public void connectionStopReleasesPrefetchedMessages() throws Exception
{
    assumeThat("Only 0-10 implements this feature", getProtocol(), is(equalTo(Protocol.AMQP_0_10)));
    Connection connection1 = getConnectionBuilder().setPrefetch(3).build();
    try
    {
        // Created inside the try block so that a failure here still closes the connection.
        Queue queue = createQueue(getTestName());

        connection1.start();
        final Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer1 = session1.createConsumer(queue);
        Utils.sendMessages(connection1, queue, 6);

        final Message receivedMessage = consumer1.receive(getReceiveTimeout());
        assertNotNull("First message was not received", receivedMessage);
        assertEquals("Received message has unexpected index", 0, receivedMessage.getIntProperty(INDEX));

        forceSync(session1);
        connection1.stop();
        observeNextAvailableMessage(queue, 1);
    }
    finally
    {
        connection1.close();
    }
}
/**
 * Stops a consumer connection while its message listener is still busy and checks delivery
 * around the stop.
 *
 * <p>The listener counts every message; on the tenth it releases a latch and sleeps 200 ms. The
 * test sends 50 messages, waits up to 500 ms for the latch and stops the consumer connection.
 * It then asserts that some messages had arrived, that the count does not grow while 50 more
 * messages are sent, and that all 100 arrive after the connection is restarted.
 *
 * <p>Cleanup is chained through nested {@code finally} blocks so that a failure closing one
 * connection does not prevent closing the other or purging the queue.
 */
@Test
public void testStopConnectionDuringOnMessage() throws Exception {
    if (log.isTraceEnabled()) {
        log.trace("testStopConnectionWhileOnMessageIsExecuting");
    }

    final AtomicInteger messagesReceived = new AtomicInteger(0);
    CountDownLatch messagesReceivedLatch = new CountDownLatch(1);

    MessageListener myListener = message -> {
        if (messagesReceived.incrementAndGet() == 10) {
            messagesReceivedLatch.countDown();
            try {
                // Keep onMessage busy so that stop() is issued while the listener is executing.
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                // Ignore
            }
        }
    };

    Connection producerConnection = null;
    Connection consumerConnection = null;

    try {
        producerConnection = createConnection();
        consumerConnection = createConnection();

        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageProducer queueProducer = producerSession.createProducer(queue1);
        MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
        queueConsumer.setMessageListener(myListener);

        log.trace("Starting consumer connection");
        consumerConnection.start();

        final int MESSAGE_COUNT = 100;

        log.trace("Sending the first batch of messages");
        for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
            queueProducer.send(producerSession.createTextMessage("Message #" + i));
        }

        // Best-effort wait; the assertion on countAfterStop below catches the case where nothing arrived.
        messagesReceivedLatch.await(500, TimeUnit.MILLISECONDS);

        log.trace("Stopping consumer connection");
        consumerConnection.stop();

        int countAfterStop = messagesReceived.get();
        ProxyAssertSupport.assertTrue("Should have received some messages before stopping", countAfterStop > 0);

        log.trace("Sending the second batch of messages");
        for (int i = MESSAGE_COUNT / 2; i < MESSAGE_COUNT; i++) {
            queueProducer.send(producerSession.createTextMessage("Message #" + i));
        }

        ProxyAssertSupport.assertFalse("Should not receive any messages after the connection has been stopped", Wait.waitFor(() -> messagesReceived.get() > countAfterStop, 2000, 1000));

        log.trace("Restarting consumer connection");
        consumerConnection.start();

        ProxyAssertSupport.assertTrue("Should have received all messages after restarting", Wait.waitFor(() -> messagesReceived.get() == MESSAGE_COUNT, 15000, 100));
    } finally {
        try {
            if (producerConnection != null) {
                producerConnection.close();
            }
        } finally {
            try {
                if (consumerConnection != null) {
                    consumerConnection.close();
                }
            } finally {
                removeAllMessages(queue1.getQueueName(), true);
            }
        }
    }
}
/**
 * (Currently disabled: "Not yet supported".) Creates a topic address and a subscription address
 * for it, opens two connections, attaches one shared durable consumer per connection under the
 * same subscription name, sends ten persistent messages and checks the combined receive count.
 *
 * <p>Both connections are closed in a {@code finally} block so that a failed assertion or JMS
 * error does not leave them open.
 *
 * @param jmsProvider JMS helper used to create connections and to generate, send and receive messages
 * @throws Exception if resource creation, any JMS operation or the assertion fails
 */
@Test
@Disabled("Not yet supported")
@DisplayName("testSharedDurableSubscription")
void testSharedDurableSubscription(JmsProvider jmsProvider) throws Exception {
    String topicAddress = "jmsTopicDurable";
    String subID = "sharedConsumerDurable123";
    MessagingAddress addressTopic = new MessagingAddressBuilder()
            .withNewMetadata()
            .withNamespace(tenant.getMetadata().getNamespace())
            .withName("jms-topic-durable")
            .endMetadata()
            .withNewSpec()
            .editOrNewTopic()
            .endTopic()
            .withAddress(topicAddress)
            .endSpec()
            .build();
    MessagingAddress addressSub1 = new MessagingAddressBuilder()
            .withNewMetadata()
            .withNamespace(tenant.getMetadata().getNamespace())
            .withName("jms-topic-durable-sub")
            .endMetadata()
            .withNewSpec()
            .editOrNewSubscription()
            .withTopic(topicAddress)
            .endSubscription()
            .withAddress(subID)
            .endSpec()
            .build();
    resourceManager.createResource(addressTopic, addressSub1);

    Connection connection1 = null;
    Connection connection2 = null;
    try {
        Context context1 = createContext(jmsProvider, addressTopic);
        connection1 = jmsProvider.createConnection(context1);
        Context context2 = createContext(jmsProvider, addressTopic);
        connection2 = jmsProvider.createConnection(context2);
        connection1.start();
        connection2.start();

        Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic testTopic = (Topic) jmsProvider.getDestination(topicAddress);

        log.info("Creating subscriber 1");
        MessageConsumer subscriber1 = session.createSharedDurableConsumer(testTopic, subID);
        log.info("Creating subscriber 2");
        MessageConsumer subscriber2 = session2.createSharedDurableConsumer(testTopic, subID);
        log.info("Creating producer");
        MessageProducer messageProducer = session.createProducer(testTopic);
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        int count = 10;
        List<javax.jms.Message> listMsgs = jmsProvider.generateMessages(session, count);
        jmsProvider.sendMessages(messageProducer, listMsgs);
        log.info("messages sent");

        List<javax.jms.Message> recvd1 = jmsProvider.receiveMessages(subscriber1, count, 1);
        List<javax.jms.Message> recvd2 = jmsProvider.receiveMessages(subscriber2, count, 1);
        log.info(subID + " :messages received");

        // NOTE(review): consumers sharing one subscription normally split the messages between
        // them, which would make the expected total `count` rather than `2 * count` - confirm
        // before enabling this test.
        assertThat("Wrong count of messages received: by both receivers",
                recvd1.size() + recvd2.size(), is(2 * count));

        subscriber1.close();
        subscriber2.close();
        // NOTE(review): the second unsubscribe targets a subscription the first call has
        // presumably already removed - verify that the provider tolerates this.
        session.unsubscribe(subID);
        session2.unsubscribe(subID);
        connection1.stop();
        connection2.stop();
        session.close();
        session2.close();
    } finally {
        try {
            if (connection1 != null) {
                connection1.close();
            }
        } finally {
            if (connection2 != null) {
                connection2.close();
            }
        }
    }
}
/**
 * Runs only when the broker admin supports restart. Installs an exception listener that calls
 * {@code Connection#stop()} from within the callback, restarts the broker to trigger the
 * listener, and then checks what happened inside it.
 *
 * <p>Inside the listener an {@code IllegalStateException} from {@code stop()} counts as a pass;
 * any other {@link Throwable} (including the failure raised when nothing is thrown) is recorded
 * and makes the final {@code assertNull} fail.
 */
@Test
public void testExceptionListenerStopsConnection_ThrowsIllegalStateException() throws Exception
{
    assumeThat(getBrokerAdmin().supportsRestart(), is(equalTo(true)));

    final Connection connection = getConnection();
    try
    {
        final CountDownLatch listenerDone = new CountDownLatch(1);
        final AtomicReference<JMSException> reported = new AtomicReference<>();
        final AtomicReference<Throwable> unexpected = new AtomicReference<>();

        connection.setExceptionListener(jmsException -> {
            reported.set(jmsException);
            try
            {
                connection.stop();
                fail("Exception not thrown");
            }
            catch (IllegalStateException expected)
            {
                // PASS
            }
            catch (Throwable other)
            {
                unexpected.set(other);
            }
            finally
            {
                listenerDone.countDown();
            }
        });

        getBrokerAdmin().restart();

        final boolean notified = listenerDone.await(getReceiveTimeout(), TimeUnit.MILLISECONDS);
        assertTrue("Exception was not propagated into exception listener in timely manner", notified);
        assertNotNull("Unexpected exception", reported.get());
        assertNull("Connection#stop() should not have thrown exception", unexpected.get());
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends one request to {@code REQUEST_TOPIC_NAME} and waits up to {@code REPLY_TIMEOUT_MS} for
 * the reply on a temporary queue, validating the reply's correlation ID.
 *
 * <p>The connection is closed in a {@code finally} block so that a timeout or an invalid reply
 * does not leave it open.
 *
 * @param args {@code args[0]} is the host, {@code args[1]} is {@code username@vpnName} and
 *             {@code args[2]} is the password
 * @throws Exception if no reply arrives in time, the reply's correlation ID is missing or does
 *                   not match, or any JMS operation fails
 */
public void run(String... args) throws Exception {
    String[] split = args[1].split("@");

    String host = args[0];
    String vpnName = split[1];
    String username = split[0];
    String password = args[2];

    System.out.printf("BasicRequestor is connecting to Solace messaging at %s...%n", host);

    // Programmatically create the connection factory using default settings
    SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setVPN(vpnName);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);

    // Create connection to the Solace router
    Connection connection = connectionFactory.createConnection();
    try {
        // Create a non-transacted, auto ACK session.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        System.out.printf("Connected to the Solace Message VPN '%s' with client username '%s'.%n", vpnName,
                username);

        // Create the request topic programmatically
        Topic requestTopic = session.createTopic(REQUEST_TOPIC_NAME);

        // Create the message producer for the created topic
        MessageProducer requestProducer = session.createProducer(requestTopic);

        // The response will be received on this temporary queue.
        TemporaryQueue replyToQueue = session.createTemporaryQueue();

        // Create consumer for receiving the request's reply
        MessageConsumer replyConsumer = session.createConsumer(replyToQueue);

        // Start receiving replies
        connection.start();

        // Create a request.
        TextMessage request = session.createTextMessage("Sample Request");
        // The application must put the destination of the reply in the replyTo field of the request
        request.setJMSReplyTo(replyToQueue);
        // The application must put a correlation ID in the request
        String correlationId = UUID.randomUUID().toString();
        request.setJMSCorrelationID(correlationId);

        System.out.printf("Sending request '%s' to topic '%s'...%n", request.getText(), requestTopic.toString());

        // Send the request
        requestProducer.send(requestTopic, request, DeliveryMode.NON_PERSISTENT,
                Message.DEFAULT_PRIORITY,
                Message.DEFAULT_TIME_TO_LIVE);

        System.out.println("Sent successfully. Waiting for reply...");

        // the main thread blocks at the next statement until a message received or the timeout occurs
        Message reply = replyConsumer.receive(REPLY_TIMEOUT_MS);

        if (reply == null) {
            throw new Exception("Failed to receive a reply in " + REPLY_TIMEOUT_MS + " msecs");
        }

        // Process the reply
        if (reply.getJMSCorrelationID() == null) {
            throw new Exception(
                    "Received a reply message with no correlationID. This field is needed for a direct request.");
        }

        // Apache Qpid JMS prefixes correlation ID with string "ID:" so remove such prefix for interoperability.
        // "ID:" is a literal, so plain replace() is used instead of the regex-based replaceAll().
        if (!reply.getJMSCorrelationID().replace("ID:", "").equals(correlationId)) {
            throw new Exception("Received invalid correlationID in reply message.");
        }

        if (reply instanceof TextMessage) {
            System.out.printf("TextMessage response received: '%s'%n", ((TextMessage) reply).getText());
            if (!reply.getBooleanProperty(SupportedProperty.SOLACE_JMS_PROP_IS_REPLY_MESSAGE)) {
                System.out.println("Warning: Received a reply message without the isReplyMsg flag set.");
            }
        } else {
            System.out.println("Message response received.");
        }

        System.out.printf("Message Content:%n%s%n", SolJmsUtility.dumpMessage(reply));

        connection.stop();
        // Close everything in the order reversed from the opening order
        replyConsumer.close();
        requestProducer.close();
        session.close();
    } finally {
        // Runs on every path; per the JMS API, closing a connection also closes any sessions,
        // producers and consumers still open on it.
        connection.close();
    }
}
/**
 * Starts a consumer connection whose listener sleeps 1 ms per message, sends 100 messages from a
 * second connection, then stops and immediately closes the consumer connection.
 *
 * <p>The test makes no explicit assertions; it passes when {@code stop()} followed by
 * {@code close()} completes without throwing.
 *
 * <p>Cleanup is chained through nested {@code finally} blocks so that a failure closing one
 * connection does not prevent closing the other or purging the queue.
 */
@Test
public void testCloseAfterStop() throws Exception {
    Connection producerConnection = null;
    Connection consumerConnection = null;

    try {
        producerConnection = createConnection();
        consumerConnection = createConnection();

        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageProducer queueProducer = producerSession.createProducer(queue1);
        MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);

        MessageListener myListener = new MessageListener() {
            @Override
            public void onMessage(final Message message) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
        };
        queueConsumer.setMessageListener(myListener);

        consumerConnection.start();

        for (int i = 0; i < 100; i++) {
            queueProducer.send(producerSession.createTextMessage("Message #" + i));
        }

        consumerConnection.stop();
        consumerConnection.close();
        // Already closed above; clear the reference so the cleanup below does not close it twice.
        consumerConnection = null;
    } finally {
        try {
            if (producerConnection != null) {
                producerConnection.close();
            }
        } finally {
            try {
                if (consumerConnection != null) {
                    consumerConnection.close();
                }
            } finally {
                removeAllMessages(queue1.getQueueName(), true);
            }
        }
    }
}
/**
 * Connects to a Solace router, creates the queue {@code QUEUE_NAME} and consumes from it
 * asynchronously, acknowledging each message by hand. The method blocks on {@code latch}, which
 * the listener counts down after each successfully processed message, and then shuts down.
 *
 * <p>The connection is closed in a {@code finally} block so that an exception or an interrupt
 * while waiting does not leave it open.
 *
 * @param args {@code args[0]} is the host, {@code args[1]} is {@code username@vpnName} and
 *             {@code args[2]} is the password
 * @throws Exception if any JMS operation fails or the wait on the latch is interrupted
 */
public void run(String... args) throws Exception {
    String[] split = args[1].split("@");

    String host = args[0];
    String vpnName = split[1];
    String username = split[0];
    String password = args[2];

    System.out.printf("QueueConsumer is connecting to Solace messaging at %s...%n", host);

    // Programmatically create the connection factory using default settings
    SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setVPN(vpnName);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);

    // Enables persistent queues or topic endpoints to be created dynamically
    // on the router, used when Session.createQueue() is called below
    connectionFactory.setDynamicDurables(true);

    // Create connection to the Solace router
    Connection connection = connectionFactory.createConnection();
    try {
        // Create a non-transacted, client ACK session.
        Session session = connection.createSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);

        System.out.printf("Connected to the Solace Message VPN '%s' with client username '%s'.%n", vpnName,
                username);

        // Create the queue programmatically and the corresponding router resource
        // will also be created dynamically because DynamicDurables is enabled.
        Queue queue = session.createQueue(QUEUE_NAME);

        // From the session, create a consumer for the destination.
        MessageConsumer messageConsumer = session.createConsumer(queue);

        // Use the anonymous inner class for receiving messages asynchronously
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        System.out.printf("TextMessage received: '%s'%n", ((TextMessage) message).getText());
                    } else {
                        System.out.println("Message received.");
                    }
                    System.out.printf("Message Content:%n%s%n", SolJmsUtility.dumpMessage(message));

                    // ACK the received message manually because of the set SupportedProperty.SOL_CLIENT_ACKNOWLEDGE above
                    message.acknowledge();

                    latch.countDown(); // unblock the main thread
                } catch (JMSException ex) {
                    System.out.println("Error processing incoming message.");
                    ex.printStackTrace();
                }
            }
        });

        // Start receiving messages
        connection.start();
        System.out.println("Awaiting message...");
        // the main thread blocks at the next statement until a message received
        latch.await();

        connection.stop();
        // Close everything in the order reversed from the opening order
        messageConsumer.close();
        session.close();
    } finally {
        // Runs on every path; per the JMS API, closing a connection also closes any sessions
        // and consumers still open on it.
        connection.close();
    }
}
/**
 * Connects to a Solace router using JNDI-administered objects: looks up the connection factory
 * ({@code CONNECTION_FACTORY_JNDI_NAME}) and the queue ({@code QUEUE_JNDI_NAME}), then consumes
 * asynchronously, acknowledging each message by hand. The method blocks on {@code latch}, which
 * the listener counts down after each successfully processed message, and then shuts down.
 *
 * <p>The connection and the initial context are released in {@code finally} blocks so that an
 * exception or an interrupt while waiting does not leave them open.
 *
 * @param args {@code args[0]} is the host, {@code args[1]} is {@code username@vpnName} and
 *             {@code args[2]} is the password
 * @throws Exception if a JNDI lookup or JMS operation fails or the wait on the latch is interrupted
 */
public void run(String... args) throws Exception {
    String[] split = args[1].split("@");

    String host = args[0];
    String vpnName = split[1];
    String username = split[0];
    String password = args[2];

    System.out.printf("QueueConsumerJNDI is connecting to Solace messaging at %s...%n", host);

    // setup environment variables for creating of the initial context
    Hashtable<String, Object> env = new Hashtable<String, Object>();
    // use the Solace JNDI initial context factory
    env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory");

    // assign Solace message router connection parameters
    env.put(InitialContext.PROVIDER_URL, host);
    env.put(Context.SECURITY_PRINCIPAL, username + '@' + vpnName); // Formatted as user@message-vpn
    env.put(Context.SECURITY_CREDENTIALS, password);

    // Create the initial context that will be used to lookup the JMS Administered Objects.
    InitialContext initialContext = new InitialContext(env);
    try {
        // Lookup the connection factory
        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_JNDI_NAME);

        // Create connection to the Solace router
        Connection connection = connectionFactory.createConnection();
        try {
            // Create a non-transacted, client ACK session.
            Session session = connection.createSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);

            System.out.printf("Connected to the Solace Message VPN '%s' with client username '%s'.%n", vpnName,
                    username);

            // Lookup the queue.
            Queue queue = (Queue) initialContext.lookup(QUEUE_JNDI_NAME);

            // From the session, create a consumer for the destination.
            MessageConsumer messageConsumer = session.createConsumer(queue);

            // Use the anonymous inner class for receiving messages asynchronously
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            System.out.printf("TextMessage received: '%s'%n", ((TextMessage) message).getText());
                        } else {
                            System.out.println("Message received.");
                        }
                        System.out.printf("Message Content:%n%s%n", SolJmsUtility.dumpMessage(message));

                        // ACK the received message manually because of the set SupportedProperty.SOL_CLIENT_ACKNOWLEDGE above
                        message.acknowledge();

                        latch.countDown(); // unblock the main thread
                    } catch (JMSException ex) {
                        System.out.println("Error processing incoming message.");
                        ex.printStackTrace();
                    }
                }
            });

            // Start receiving messages
            connection.start();
            System.out.println("Awaiting message...");
            // the main thread blocks at the next statement until a message received
            latch.await();

            connection.stop();
            // Close everything in the order reversed from the opening order
            messageConsumer.close();
            session.close();
        } finally {
            // Runs on every path; per the JMS API, closing a connection also closes any sessions
            // and consumers still open on it.
            connection.close();
        }
    } finally {
        // The initial context needs to be closed explicitly; it does not extend AutoCloseable
        initialContext.close();
    }
}
/**
 * Divert example across two brokers: sends ten text messages to {@code exampleTopic} on the
 * EU East server ({@code tcp://localhost:61617}) and consumes them from
 * {@code divertExampleTopic} on the EU West server ({@code tcp://localhost:61616}) through a
 * shared durable subscription named {@code exampleSubscription}.
 *
 * <p>Both connections are stopped and closed in {@code finally}; a failure while closing the
 * EU West connection does not prevent the EU East connection from being closed.
 *
 * @param args unused
 * @throws Exception if any JMS operation fails
 * @throws IllegalStateException if an expected message does not arrive within 5000 ms
 */
public static void main(final String[] args) throws Exception {
    Connection connectionEUWest = null;

    Connection connectionEUEast = null;

    try {
        // Step 1. Instantiate the Topic (multicast) for the producers
        Topic topic = ActiveMQJMSClient.createTopic("exampleTopic");

        // Step 2. Create a topic for the consumers
        Topic topic2 = ActiveMQJMSClient.createTopic("divertExampleTopic");

        // Step 3. Instantiate connection towards server EU West
        ConnectionFactory cfEUWest = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Step 4. Instantiate connection towards server EU East
        ConnectionFactory cfEUEast = new ActiveMQConnectionFactory("tcp://localhost:61617");

        // Step 5. We create a JMS Connection connectionEUWest which is a connection to server EU West
        connectionEUWest = cfEUWest.createConnection();

        // Step 6. We create a JMS Connection connectionEUEast which is a connection to server EU East
        connectionEUEast = cfEUEast.createConnection();

        // Step 7. We create a JMS Session on server EU West
        Session sessionEUWest = connectionEUWest.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Step 8. We create a JMS Session on server EU East
        Session sessionEUEast = connectionEUEast.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Step 9. We start the connections to ensure delivery occurs on them
        connectionEUWest.start();

        connectionEUEast.start();

        // Step 10. We create a JMS MessageProducer object on server EU East
        MessageProducer producerEUEast = sessionEUEast.createProducer(topic);

        // Step 11. We create a JMS MessageConsumer object on server EU West - Messages will be diverted to this topic
        MessageConsumer consumerEUWest = sessionEUWest.createSharedDurableConsumer(topic2, "exampleSubscription");

        // Step 12. Let a little time for everything to start and form.
        Thread.sleep(5000);

        // Step 13. We send some messages on server EU East
        final int numMessages = 10;

        for (int i = 0; i < numMessages; i++) {
            TextMessage message = sessionEUEast.createTextMessage("This is text sent from EU East, message " + i);

            producerEUEast.send(message);

            System.out.println("EU East :: Sent message: " + message.getText());
        }

        // Step 14. We now consume those messages on server EU West, a separate server from the
        // one they were sent on.
        for (int i = 0; i < numMessages; i++) {
            TextMessage messageEUWest = (TextMessage) consumerEUWest.receive(5000);

            // receive(timeout) returns null on timeout; fail with a clear message instead of an NPE.
            if (messageEUWest == null) {
                throw new IllegalStateException(
                        "EU West :: No message received within 5000 ms (message " + i + " of " + numMessages + ")");
            }

            System.out.println("EU West :: Got message: " + messageEUWest.getText());
        }
    } finally {
        // Step 15. Be sure to close our resources!
        try {
            if (connectionEUWest != null) {
                connectionEUWest.stop();
                connectionEUWest.close();
            }
        } finally {
            if (connectionEUEast != null) {
                connectionEUEast.stop();
                connectionEUEast.close();
            }
        }
    }
}