下面列出了javax.jms.Connection#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void messageListener() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Utils.sendMessages(session, queue, MSG_COUNT);
connection.start();
final MessageConsumer consumer = session.createConsumer(queue);
CountingMessageListener countingMessageListener = new CountingMessageListener(MSG_COUNT);
consumer.setMessageListener(countingMessageListener);
countingMessageListener.awaitMessages(getReceiveTimeout());
assertEquals("Unexpected number of outstanding messages", 0, countingMessageListener.getOutstandingCount());
}
finally
{
connection.close();
}
}
@Test(timeout = 20000)
public void testCloseConsumer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectReceiverAttach();
testPeer.expectLinkFlow();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
testPeer.expectDetach(true, true, true);
consumer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test
public void testWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
given(cf.createTopicConnection()).willReturn(con);
SingleConnectionFactory scf = new SingleConnectionFactory(cf);
Connection con1 = scf.createTopicConnection();
Connection con2 = scf.createTopicConnection();
con1.start();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
verify(con).start();
verify(con).stop();
verify(con).close();
verifyNoMoreInteractions(con);
}
public void submitBuild(Build build) {
try {
/*
* Getting JMS connection from the JMS server and starting it
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActiveMQUrl());
Connection connection = connectionFactory.createConnection();
connection.start();
/*
* Creating a non transactional session to send/receive JMS message.
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/*
* The queue will be created automatically on the server.
*/
Destination destination = session.createQueue(config.getActiveMQSubmitQueueName());
/*
* Destination represents here our queue 'MESSAGE_QUEUE' on the JMS server.
*
* MessageProducer is used for sending messages to the queue.
*/
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(Long.toString(build.getId()));
producer.send(message);
LOGGER.info("Build id '" + message.getText() + ", Sent Successfully to the Queue " + config.getActiveMQSubmitQueueName());
connection.close();
}catch (JMSException e) {
throw new RuntimeException(e);
}
}
@Test
public void testClientACK() throws Exception {
try {
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createTextMessage("test"));
Assert.assertNull(consumer.receive(100));
connection.start();
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
message.acknowledge();
connection.close();
System.err.println("Done!!!");
} catch (Throwable e) {
e.printStackTrace();
}
}
private void process() throws JMSException {
long end = System.currentTimeMillis() + 20000;
int transCount = 0;
LOG.info(toString() + " ONLINE.");
Connection con = openConnection();
Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sess.createDurableSubscriber(topic, subName, selector, false);
try {
do {
long max = end - System.currentTimeMillis();
if (max <= 0) {
break;
}
Message message = consumer.receive(max);
if (message == null) {
continue;
}
LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + this + ".");
} while (true);
} finally {
sess.close();
con.close();
LOG.info(toString() + " OFFLINE.");
}
}
private void testConsumerProducerWithAutoAck(String port, String queueName, int numberOfMessages) throws Exception {
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder("admin", "admin", "localhost", port)
.withQueue(queueName)
.build();
ConnectionFactory connectionFactory
= (ConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
Connection connection = connectionFactory.createConnection();
connection.start();
// publish 100 messages
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = producerSession.createQueue(queueName);
MessageProducer producer = producerSession.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
producer.send(producerSession.createTextMessage("Test message " + i));
}
producerSession.close();
// Consume published messages
Session subscriberSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination subscriberDestination = (Destination) initialContextForQueue.lookup(queueName);
MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message, "Message #" + i + " was not received");
}
connection.close();
}
public String receiveFromQueue() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(RepairnatorConfig.getInstance().getActiveMQUrl());
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(RepairnatorConfig.getInstance().getActiveMQSubmitQueueName());
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message message = consumer.receive(1000);
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
consumer.close();
session.close();
connection.close();
return text;
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception {
ConnectionFactory factory = createConnectionFactory(brokerService);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(destination).send(session.createTextMessage("Hi"));
conn.close();
}
public void test() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(true);
broker.setPlugins(new BrokerPlugin[]{new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin()});
TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0");
broker.addConnector("stomp://localhost:0");
broker.start();
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcpConnector.getConnectUri());
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination Queue
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
Message sentMessage = session.createMessage();
// Tell the producer to send the message
long beforeSend = System.currentTimeMillis();
producer.send(sentMessage);
long afterSend = System.currentTimeMillis();
// assert message timestamp is in window
assertTrue(beforeSend <= sentMessage.getJMSTimestamp() && sentMessage.getJMSTimestamp() <= afterSend);
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Wait for a message
Message receivedMessage = consumer.receive(1000);
// assert we got the same message ID we sent
assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID());
// assert message timestamp is in window
assertTrue("JMS Message Timestamp should be set during the send method: \n" + " beforeSend = " + beforeSend + "\n" + " getJMSTimestamp = " + receivedMessage.getJMSTimestamp() + "\n" + " afterSend = " + afterSend + "\n", beforeSend <= receivedMessage.getJMSTimestamp() && receivedMessage.getJMSTimestamp() <= afterSend);
// assert message timestamp is unchanged
assertEquals("JMS Message Timestamp of received message should be the same as the sent message\n ", sentMessage.getJMSTimestamp(), receivedMessage.getJMSTimestamp());
// Clean up
producer.close();
consumer.close();
session.close();
connection.close();
}
@Test
public void testPriority() throws Exception
{
final int priorities = 10;
final Queue queue = createPriorityQueue(getTestName(), priorities);
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
final MessageProducer producer = producerSession.createProducer(queue);
for (int msg = 0; msg < MSG_COUNT; msg++)
{
producer.setPriority(msg % priorities);
producer.send(nextMessage(producerSession, msg));
}
producerSession.commit();
}
finally
{
producerConnection.close();
}
final Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message previous = null;
for (int messageCount = 0, expectedPriority = priorities - 1; messageCount < MSG_COUNT; messageCount++)
{
Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message '%d' is not received", messageCount), received);
assertEquals(String.format("Unexpected message '%d' priority", messageCount),
expectedPriority,
received.getJMSPriority());
if (previous != null)
{
assertTrue(String.format(
"Messages '%d' arrived in unexpected order : previous message '%d' priority is '%d', received message '%d' priority is '%d'",
messageCount,
previous.getIntProperty("msg"),
previous.getJMSPriority(),
received.getIntProperty("msg"),
received.getJMSPriority()),
previous.getJMSPriority() > received.getJMSPriority()
|| (previous.getJMSPriority() == received.getJMSPriority()
&& previous.getIntProperty("msg") < received.getIntProperty("msg")));
}
previous = received;
if (messageCount > 0 && (messageCount + 1) % (MSG_COUNT / priorities) == 0)
{
expectedPriority--;
}
}
}
finally
{
consumerConnection.close();
}
}
private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer, final Queue queue) throws Exception
{
producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>();
Connection consumerConnection = getConnectionBuilder().setPrefetch(1).build();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
LOGGER.info("Starting to receive");
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message message;
int numberOfShutdownsReceived = 0;
int numberOfMessagesReceived = 0;
while (numberOfShutdownsReceived < 2)
{
message = consumer.receive(getReceiveTimeout());
assertNotNull("null received after "
+ numberOfMessagesReceived
+ " messages and "
+ numberOfShutdownsReceived
+ " shutdowns", message);
if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
{
numberOfShutdownsReceived++;
}
else
{
numberOfMessagesReceived++;
putMessageInMap(message, messageSequenceNumbersByKey);
}
}
LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
}
finally
{
consumerConnection.close();
}
return messageSequenceNumbersByKey;
}
@Test
public void testExceptionListenerClosesConnectionIsAllowed() throws Exception
{
assumeThat(getBrokerAdmin().supportsRestart(), is(equalTo(true)));
final Connection connection = getConnection();
try
{
final CountDownLatch exceptionReceivedLatch = new CountDownLatch(1);
final AtomicReference<JMSException> exceptionHolder = new AtomicReference<>();
final AtomicReference<Throwable> unexpectedExceptionHolder = new AtomicReference<>();
final ExceptionListener listener = exception -> {
exceptionHolder.set(exception);
try
{
connection.close();
// PASS
}
catch (Throwable t)
{
unexpectedExceptionHolder.set(t);
}
finally
{
exceptionReceivedLatch.countDown();
}
};
connection.setExceptionListener(listener);
getBrokerAdmin().restart();
assertTrue("Exception was not propagated into exception listener in timely manner",
exceptionReceivedLatch.await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
assertNotNull("Unexpected exception", exceptionHolder.get());
assertNull("Connection#close() should not have thrown exception", unexpectedExceptionHolder.get());
}
finally
{
connection.close();
}
}
@Test
public void testAMQ1925_TXBegin() throws Exception {
Connection connection = cf.createConnection();
connection.start();
connection.setExceptionListener(this);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
boolean restartDone = false;
try {
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(5000);
Assert.assertNotNull(message);
if (i == 222 && !restartDone) {
// Simulate broker failure & restart
bs.stop();
bs = createNewServer();
bs.start();
restartDone = true;
}
Assert.assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
try {
session.commit();
} catch (TransactionRolledBackException expectedOnOccasion) {
log.info("got rollback: " + expectedOnOccasion);
i--;
}
}
Assert.assertNull(consumer.receive(500));
} catch (Exception eee) {
log.error("got exception", eee);
throw eee;
} finally {
consumer.close();
session.close();
connection.close();
}
assertQueueEmpty();
Assert.assertNull("no exception on connection listener: " + exception, exception);
}
public void run(String... args) throws Exception {
String[] split = args[1].split("@");
String host = args[0];
String vpnName = split[1];
String username = split[0];
String password = args[2];
System.out.printf("QueueConsumerJNDI is connecting to Solace messaging at %s...%n", host);
// setup environment variables for creating of the initial context
Hashtable<String, Object> env = new Hashtable<String, Object>();
// use the Solace JNDI initial context factory
env.put(InitialContext.INITIAL_CONTEXT_FACTORY, "com.solacesystems.jndi.SolJNDIInitialContextFactory");
// assign Solace message router connection parameters
env.put(InitialContext.PROVIDER_URL, host);
env.put(Context.SECURITY_PRINCIPAL, username + '@' + vpnName); // Formatted as [email protected]
env.put(Context.SECURITY_CREDENTIALS, password);
// Create the initial context that will be used to lookup the JMS Administered Objects.
InitialContext initialContext = new InitialContext(env);
// Lookup the connection factory
ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup(CONNECTION_FACTORY_JNDI_NAME);
// Create connection to the Solace router
Connection connection = connectionFactory.createConnection();
// Create a non-transacted, client ACK session.
Session session = connection.createSession(false, SupportedProperty.SOL_CLIENT_ACKNOWLEDGE);
System.out.printf("Connected to the Solace Message VPN '%s' with client username '%s'.%n", vpnName,
username);
// Lookup the queue.
Queue queue = (Queue) initialContext.lookup(QUEUE_JNDI_NAME);
// From the session, create a consumer for the destination.
MessageConsumer messageConsumer = session.createConsumer(queue);
// Use the anonymous inner class for receiving messages asynchronously
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
System.out.printf("TextMessage received: '%s'%n", ((TextMessage) message).getText());
} else {
System.out.println("Message received.");
}
System.out.printf("Message Content:%n%s%n", SolJmsUtility.dumpMessage(message));
// ACK the received message manually because of the set SupportedProperty.SOL_CLIENT_ACKNOWLEDGE above
message.acknowledge();
latch.countDown(); // unblock the main thread
} catch (JMSException ex) {
System.out.println("Error processing incoming message.");
ex.printStackTrace();
}
}
});
// Start receiving messages
connection.start();
System.out.println("Awaiting message...");
// the main thread blocks at the next statement until a message received
latch.await();
connection.stop();
// Close everything in the order reversed from the opening order
// NOTE: as the interfaces below extend AutoCloseable,
// with them it's possible to use the "try-with-resources" Java statement
// see details at https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html
messageConsumer.close();
session.close();
connection.close();
// The initial context needs to be close; it does not extend AutoCloseable
initialContext.close();
}
public static void main(final String[] args) throws Exception {
Connection connection = null;
try {
// Step 1. Directly instantiate the JMS Queue object.
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
// Starting with Artemis 1.0.1 you can just use the URI to instantiate the object directly
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Step 4.Create a JMS Connection
connection = cf.createConnection();
// Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);
// Step 7. Create a Text Message
TextMessage message = session.createTextMessage("This is a text message");
System.out.println("Sent message: " + message.getText());
// Step 8. Send the Message
producer.send(message);
// Step 9. Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(queue);
// Step 10. Start the Connection
connection.start();
// Step 11. Receive the message
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
System.out.println("Received message: " + messageReceived.getText());
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test(timeout = 60000)
@BMRules(
rules = {@BMRule(
name = "Corrupt Decoding",
targetClass = "org.apache.activemq.artemis.core.protocol.ClientPacketDecoder",
targetMethod = "decode(org.apache.activemq.artemis.api.core.ActiveMQBuffer)",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.DisconnectOnCriticalFailureTest.doThrow($1);")})
public void testClientDisconnectLarge() throws Exception {
Queue q1 = createQueue("queue1");
final Connection connection = nettyCf.createConnection();
final CountDownLatch latch = new CountDownLatch(1);
ServerLocator locator = ((ActiveMQConnectionFactory)nettyCf).getServerLocator();
int minSize = locator.getMinLargeMessageSize();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < minSize; i++) {
builder.append("a");
}
try {
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException e) {
latch.countDown();
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(q1);
TextMessage m = session.createTextMessage(builder.toString());
producer.send(m);
connection.start();
corruptPacket.set(true);
MessageConsumer consumer = session.createConsumer(q1);
Message lm = consumer.receive(2000);
//first receive won't crash because the packet
//is SESS_RECEIVE_LARGE_MSG
assertNotNull(lm);
//second receive will force server to send a
//"forced delivery" message, and will cause
//the exception to be thrown.
lm = consumer.receive(5000);
assertNull(lm);
assertTrue(latch.await(5, TimeUnit.SECONDS));
} finally {
corruptPacket.set(false);
if (connection != null) {
connection.close();
}
}
}
/**
* 1. Consumer from testtenant1 listens to messages from "testtenant1.com/tenant1queue" destination.
* 2. Publish messages to "testtenant1.com/www" by a tenant user from testtenant2.
* 3. No messages should be received by the consumer.
*
* @throws javax.jms.JMSException
* @throws javax.naming.NamingException
*/
@Test(groups = "wso2.mb", description = "Inter tenant queue publish test case")
public void performSingleQueueSendReceiveTestCase()
throws NamingException, JMSException, XPathExpressionException {
String queueName = "testtenant1.com/tenant1queue";
InitialContext subscriberInitialContext = JMSClientHelper
.getInitialContextForQueue("tenant1user1!testtenant1.com", "tenant1user1",
"localhost", getAMQPPort().toString(), queueName);
InitialContext publisherInitialContext = JMSClientHelper
.getInitialContextForQueue("tenant2user1!testtenant2.com", "tenant2user1",
"localhost", getAMQPPort().toString(), queueName);
// Initialize subscriber
ConnectionFactory subscriberConnectionFactory = (ConnectionFactory) subscriberInitialContext.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection subscriberConnection = subscriberConnectionFactory.createConnection();
subscriberConnection.start();
Session subscriberSession = subscriberConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination subscriberDestination = (Destination) subscriberInitialContext.lookup(queueName);
MessageConsumer consumer = subscriberSession.createConsumer(subscriberDestination);
// Initialize publisher
ConnectionFactory publisherConnectionFactory = (ConnectionFactory) publisherInitialContext.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection publisherConnection = publisherConnectionFactory.createConnection();
publisherConnection.start();
Session publisherSession = publisherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination publisherDestination = (Destination) publisherInitialContext.lookup(queueName);
MessageProducer producer = publisherSession.createProducer(publisherDestination);
producer.send(publisherSession.createTextMessage("Test"));
// Assuming latency is less than 5 seconds
Message message = consumer.receive(5000);
assertNull(message, "Publisher was able to publish from a different domain");
// Close all connections
subscriberConnection.close();
// publisher session will be closed by the server since it didn't had permissions
}
@Test
public void testCloseTwice() throws Exception {
Connection conn = createConnection();
conn.close();
conn.close();
}
private static void stopStartAcceptor(final boolean stop) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61617");
Connection connection = null;
try {
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue managementQueue = ActiveMQJMSClient.createQueue("activemq.management");
MessageProducer producer = session.createProducer(managementQueue);
connection.start();
Message m = session.createMessage();
String oper = stop ? "stop" : "start";
JMSManagementHelper.putOperationInvocation(m, "core.acceptor.netty-acceptor", oper);
producer.send(m);
} finally {
if (connection != null) {
connection.close();
}
}
}