下面列出了javax.jms.QueueSession#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void sendBytesMessage(String destName, byte[] buffer) throws Exception {
InitialContext ic = getInitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ic.lookup("ConnectionFactory");
QueueConnection connection = queueConnectionFactory.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BytesMessage bm = session.createBytesMessage();
bm.writeBytes(buffer);
QueueSender sender = session.createSender((Queue) ic.lookup(destName));
sender.send(bm);
sender.close();
session.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testSpecificQueueRetrieval(String username, String password, String hostname, String port)
throws JMSException, NamingException, IOException {
String queueName = "testSpecificQueueRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver = queueSession.createReceiver(queue);
// Test queue retrieval through REST API
HttpGet httpGet = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH + "/" + queueName);
ClientHelper.setAuthHeader(httpGet, username, password);
CloseableHttpResponse response = client.execute(httpGet);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK, "Incorrect status code.");
String body = EntityUtils.toString(response.getEntity());
QueueMetadata queueMetadata = objectMapper.readValue(body, QueueMetadata.class);
Assert.assertEquals(queueMetadata.getName(), queueName, "Incorrect queue name.");
Assert.assertEquals(queueMetadata.getConsumerCount().intValue(), 1, "JMS consumer should be present.");
Assert.assertTrue(queueMetadata.isDurable());
Assert.assertEquals(queueMetadata.getSize().intValue(), 0, "Queue should be empty.");
Assert.assertFalse(queueMetadata.isAutoDelete());
receiver.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testRetrieveConsumerList(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testSpecificQueueRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver1 = queueSession.createReceiver(queue);
QueueReceiver receiver2 = queueSession.createReceiver(queue);
HttpGet httpGet = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(httpGet, username, password);
CloseableHttpResponse response = client.execute(httpGet);
String body = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(body, ConsumerMetadata[].class);
for (ConsumerMetadata consumerMetadata : consumers) {
validateTransportPropertyExistence(consumerMetadata);
}
Assert.assertEquals(consumers.length, 2, "Number of consumers returned is incorrect.");
receiver1.close();
receiver2.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testSpecificConsumerRetrieval(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testSpecificConsumerRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver = queueSession.createReceiver(queue);
HttpGet getAllConsumers = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(getAllConsumers, username, password);
CloseableHttpResponse response = client.execute(getAllConsumers);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
String body = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(body, ConsumerMetadata[].class);
Assert.assertTrue(consumers.length > 0, "Number of consumers returned is incorrect.");
int id = consumers[0].getId();
validateTransportPropertyExistence(consumers[0]);
HttpGet getConsumer = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH + "/"
+ queueName + "/consumers/" + id);
ClientHelper.setAuthHeader(getConsumer, username, password);
response = client.execute(getConsumer);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
String consumerString = EntityUtils.toString(response.getEntity());
ConsumerMetadata consumerMetadata = objectMapper.readValue(consumerString, ConsumerMetadata.class);
Assert.assertEquals(consumerMetadata.getId().intValue(), id, "incorrect message id");
receiver.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testNonExistingConsumer(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testNonExistingConsumer";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver1 = queueSession.createReceiver(queue);
HttpGet getAllConsumers = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(getAllConsumers, username, password);
CloseableHttpResponse response = client.execute(getAllConsumers);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK,
"Incorrect status code");
String consumerArray = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(consumerArray, ConsumerMetadata[].class);
Assert.assertEquals(consumers.length, 1, "There should be a single consumer");
int id = consumers[0].getId();
receiver1.close();
HttpGet getConsumer = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers/" + String.valueOf(id));
ClientHelper.setAuthHeader(getConsumer, username, password);
response = client.execute(getConsumer);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_NOT_FOUND);
String errorMessage = EntityUtils.toString(response.getEntity());
Error error = objectMapper.readValue(errorMessage, Error.class);
Assert.assertFalse(error.getMessage().isEmpty(), "Error message should be non empty.");
queueSession.close();
connection.close();
}
public void stopClientSync(){
if (null != connection && null != session && null != receiver) {
try {
log.info("Closing Consumer");
if (ExchangeType.TOPIC == consumerConfig.getExchangeType()) {
if (null != receiver) {
TopicSubscriber topicSubscriber = (TopicSubscriber) receiver;
topicSubscriber.close();
}
if (null != session) {
TopicSession topicSession = (TopicSession) session;
topicSession.close();
}
if (null != connection) {
TopicConnection topicConnection = (TopicConnection) connection;
topicConnection.close();
}
} else if (ExchangeType.QUEUE == consumerConfig.getExchangeType()) {
if (null != receiver) {
QueueReceiver queueReceiver = (QueueReceiver) receiver;
queueReceiver.close();
}
if (null != session) {
QueueSession queueSession = (QueueSession) session;
queueSession.close();
}
if (null != connection) {
QueueConnection queueConnection = (QueueConnection) connection;
queueConnection.stop();
queueConnection.close();
}
}
receiver = null;
session = null;
connection = null;
log.info("Consumer Closed");
} catch (JMSException e) {
log.error("Error in stopping client.", e);
throw new RuntimeException("Error in stopping client.", e);
}
}
}
@Test
public void testTXTypeInvalid() throws Exception {
conn = cf.createConnection();
Session sess = conn.createSession(false, Session.SESSION_TRANSACTED);
assertEquals(Session.AUTO_ACKNOWLEDGE, sess.getAcknowledgeMode());
sess.close();
TopicSession tpSess = ((TopicConnection) conn).createTopicSession(false, Session.SESSION_TRANSACTED);
assertEquals(Session.AUTO_ACKNOWLEDGE, tpSess.getAcknowledgeMode());
tpSess.close();
QueueSession qSess = ((QueueConnection) conn).createQueueSession(false, Session.SESSION_TRANSACTED);
assertEquals(Session.AUTO_ACKNOWLEDGE, qSess.getAcknowledgeMode());
qSess.close();
}