下面列出了javax.jms.QueueSession#createReceiver ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testGetQueue() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueReceiver receiver = session.createReceiver(queue);
assertNotNull(receiver.getQueue());
assertSame(queue, receiver.getQueue());
receiver.close();
try {
receiver.getQueue();
fail("Cannot read topic on closed receiver");
} catch (IllegalStateException ise) {}
}
@Test
public void testGetTopicSubscriber() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
JmsPoolQueueReceiver receiver = (JmsPoolQueueReceiver) session.createReceiver(queue);
assertNotNull(receiver.getQueueReceiver());
assertTrue(receiver.getQueueReceiver() instanceof MockJMSQueueReceiver);
receiver.close();
try {
receiver.getQueueReceiver();
fail("Cannot read state on closed receiver");
} catch (IllegalStateException ise) {}
}
/**
* Creates a AMQP connection with the number of channels specified, registered on top of it.
*
* @param numberOfChannels number of channels to be created using the connection
* @param userName admin user
* @param password admin password
* @param hostName localhost
* @param port the AMQP port for which the broker listens to
* @return the created JMS connection
* @throws NamingException if an error occurs while creating the context/connection factory using given properties.
* @throws JMSException if an error occurs while creating/starting the connection/session
*/
private Connection createConnection(int numberOfChannels, String userName, String password, String hostName,
String port) throws NamingException, JMSException {
InitialContext initialContext
= ClientHelper.getInitialContextBuilder(userName, password, hostName, port).build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
for (int i = 0; i < numberOfChannels; i++) {
QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
/*
For each channel, create a number of consumers that is equal to the channel number.
e.g. if the channel count is 3, channel1 has 1 consumer, channel2 has 2 consumers and channel3 has 3
consumers
*/
for (int j = 0; j < i; j++) {
Queue queue = session.createQueue("queue");
session.createReceiver(queue);
}
}
return connection;
}
/**
* Creates a AMQP connection with the number of channels specified, registered on top of it.
*
* @param numberOfChannels number of channels to be created using the connection
* @param userName admin user
* @param password admin password
* @param hostName localhost
* @param port the AMQP port for which the broker listens to
* @return the created JMS connection
* @throws NamingException if an error occurs while creating the context/connection factory using given properties.
* @throws JMSException if an error occurs while creating/starting the connection/session
*/
private Connection createConnection(int numberOfChannels, String userName, String password, String hostName,
String port) throws NamingException, JMSException {
InitialContext initialContext
= ClientHelper.getInitialContextBuilder(userName, password, hostName, port).build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
for (int i = 0; i < numberOfChannels; i++) {
QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
/*
For each channel, create a number of consumers that is equal to the channel number.
e.g. if the channel count is 3, channel1 has 1 consumer, channel2 has 2 consumers and channel3 has 3
consumers
*/
for (int j = 0; j < i; j++) {
Queue queue = session.createQueue("queue");
session.createReceiver(queue);
}
}
return connection;
}
@Test
public void createReceiver() throws Exception
{
Queue queue = createQueue(getTestName());
QueueConnection queueConnection = getQueueConnection();
try
{
queueConnection.start();
Utils.sendMessages(queueConnection, queue, 3);
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = session.createReceiver(queue, String.format("%s=2", INDEX));
assertEquals("Queue names should match from QueueReceiver", queue.getQueueName(), receiver.getQueue().getQueueName());
Message received = receiver.receive(getReceiveTimeout());
assertNotNull("Message is not received", received);
assertEquals("Unexpected message is received", 2, received.getIntProperty(INDEX));
}
finally
{
queueConnection.close();
}
}
/**
* Create a queue receiver
*
* @param queue The queue
* @return The queue receiver
* @throws JMSException Thrown if an error occurs
*/
@Override
public QueueReceiver createReceiver(final Queue queue) throws JMSException {
lock();
try {
QueueSession session = getQueueSessionInternal();
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " queue=" + queue);
}
QueueReceiver result = session.createReceiver(queue);
result = new ActiveMQRAQueueReceiver(result, this);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " receiver=" + result);
}
addConsumer(result);
return result;
} finally {
unlock();
}
}
/**
* Create a queue receiver
*
* @param queue The queue
* @param messageSelector
* @return The queue receiver
* @throws JMSException Thrown if an error occurs
*/
@Override
public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException {
lock();
try {
QueueSession session = getQueueSessionInternal();
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
}
QueueReceiver result = session.createReceiver(queue, messageSelector);
result = new ActiveMQRAQueueReceiver(result, this);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " receiver=" + result);
}
addConsumer(result);
return result;
} finally {
unlock();
}
}
@Test
public void testToString() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueReceiver receiver = session.createReceiver(queue);
assertNotNull(receiver.toString());
}
/**
* Creates a queue connection, session and receiver.
*
* @throws NamingException
* @throws JMSException
*/
private void createQueueConnection() throws NamingException, JMSException {
// Creates a queue connection, sessions and receiver
QueueConnectionFactory connFactory = (QueueConnectionFactory) super.getInitialContext()
.lookup(AndesClientConstants.CF_NAME);
QueueConnection queueConnection = connFactory.createQueueConnection();
queueConnection.start();
QueueSession queueSession;
// Sets acknowledgement mode
if (QueueSession.SESSION_TRANSACTED == this.consumerConfig.getAcknowledgeMode().getType()) {
queueSession = queueConnection
.createQueueSession(true, this.consumerConfig.getAcknowledgeMode().getType());
} else {
queueSession = queueConnection
.createQueueSession(false, this.consumerConfig.getAcknowledgeMode().getType());
}
Queue queue =
(Queue) super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
connection = queueConnection;
session = queueSession;
// If selectors exists
if (null != this.consumerConfig.getSelectors()) {
receiver = queueSession.createReceiver(queue, this.consumerConfig.getSelectors());
} else {
receiver = queueSession.createReceiver(queue);
}
}
@Test
public void testTempQueueDelete() throws Exception {
connection.start();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = queueSession.createTemporaryQueue();
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
try {
QueueSession newQueueSession = newConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = newQueueSession.createSender(tempQueue);
Message msg = queueSession.createMessage();
queueSender.send(msg);
try {
QueueReceiver consumer = newQueueSession.createReceiver(tempQueue);
fail("should have gotten exception but got consumer: " + consumer);
} catch (JMSException ex) {
//correct
}
connection.close();
try {
Message newMsg = newQueueSession.createMessage();
queueSender.send(newMsg);
} catch (JMSException e) {
//ok
}
} finally {
newConn.close();
}
}
public void testTryToReproduceNullPointerBug() throws Exception {
String url = connectionUri;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
QueueConnection queueConnection = factory.createQueueConnection();
this.connection = queueConnection;
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
session.createSender(null); // Unidentified
Queue receiverQueue = session.createTemporaryQueue();
session.createReceiver(receiverQueue);
queueConnection.start();
}
/**
* com.sun.ts.tests.jms.ee.all.queueconn.QueueConnTest line 171
*/
@Test
public void testCreateReceiverWithMessageSelector() throws Exception {
QueueConnection qc = null;
try {
qc = createQueueConnection();
QueueSession qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver qreceiver = qs.createReceiver(queue1, "targetMessage = TRUE");
qc.start();
TextMessage m = qs.createTextMessage();
m.setText("one");
m.setBooleanProperty("targetMessage", false);
QueueSender qsender = qs.createSender(queue1);
qsender.send(m);
m.setText("two");
m.setBooleanProperty("targetMessage", true);
qsender.send(m);
TextMessage rm = (TextMessage) qreceiver.receive(1000);
ProxyAssertSupport.assertEquals("two", rm.getText());
} finally {
if (qc != null) {
qc.close();
}
Thread.sleep(2000);
removeAllMessages(queue1.getQueueName(), true);
checkEmpty(queue1);
}
}
protected void createTestResources() throws Exception {
connection = createQueueConnectionToMockProvider();
QueueSession session = ((QueueConnection) connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(_testName.getMethodName());
receiver = session.createReceiver(destination);
receiver.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testSpecificQueueRetrieval(String username, String password, String hostname, String port)
throws JMSException, NamingException, IOException {
String queueName = "testSpecificQueueRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver = queueSession.createReceiver(queue);
// Test queue retrieval through REST API
HttpGet httpGet = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH + "/" + queueName);
ClientHelper.setAuthHeader(httpGet, username, password);
CloseableHttpResponse response = client.execute(httpGet);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK, "Incorrect status code.");
String body = EntityUtils.toString(response.getEntity());
QueueMetadata queueMetadata = objectMapper.readValue(body, QueueMetadata.class);
Assert.assertEquals(queueMetadata.getName(), queueName, "Incorrect queue name.");
Assert.assertEquals(queueMetadata.getConsumerCount().intValue(), 1, "JMS consumer should be present.");
Assert.assertTrue(queueMetadata.isDurable());
Assert.assertEquals(queueMetadata.getSize().intValue(), 0, "Queue should be empty.");
Assert.assertFalse(queueMetadata.isAutoDelete());
receiver.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testRetrieveConsumerList(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testSpecificQueueRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver1 = queueSession.createReceiver(queue);
QueueReceiver receiver2 = queueSession.createReceiver(queue);
HttpGet httpGet = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(httpGet, username, password);
CloseableHttpResponse response = client.execute(httpGet);
String body = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(body, ConsumerMetadata[].class);
for (ConsumerMetadata consumerMetadata : consumers) {
validateTransportPropertyExistence(consumerMetadata);
}
Assert.assertEquals(consumers.length, 2, "Number of consumers returned is incorrect.");
receiver1.close();
receiver2.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testSpecificConsumerRetrieval(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testSpecificConsumerRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver = queueSession.createReceiver(queue);
HttpGet getAllConsumers = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(getAllConsumers, username, password);
CloseableHttpResponse response = client.execute(getAllConsumers);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
String body = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(body, ConsumerMetadata[].class);
Assert.assertTrue(consumers.length > 0, "Number of consumers returned is incorrect.");
int id = consumers[0].getId();
validateTransportPropertyExistence(consumers[0]);
HttpGet getConsumer = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH + "/"
+ queueName + "/consumers/" + id);
ClientHelper.setAuthHeader(getConsumer, username, password);
response = client.execute(getConsumer);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
String consumerString = EntityUtils.toString(response.getEntity());
ConsumerMetadata consumerMetadata = objectMapper.readValue(consumerString, ConsumerMetadata.class);
Assert.assertEquals(consumerMetadata.getId().intValue(), id, "incorrect message id");
receiver.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testNonExistingConsumer(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testNonExistingConsumer";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver1 = queueSession.createReceiver(queue);
HttpGet getAllConsumers = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(getAllConsumers, username, password);
CloseableHttpResponse response = client.execute(getAllConsumers);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK,
"Incorrect status code");
String consumerArray = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(consumerArray, ConsumerMetadata[].class);
Assert.assertEquals(consumers.length, 1, "There should be a single consumer");
int id = consumers[0].getId();
receiver1.close();
HttpGet getConsumer = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers/" + String.valueOf(id));
ClientHelper.setAuthHeader(getConsumer, username, password);
response = client.execute(getConsumer);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_NOT_FOUND);
String errorMessage = EntityUtils.toString(response.getEntity());
Error error = objectMapper.readValue(errorMessage, Error.class);
Assert.assertFalse(error.getMessage().isEmpty(), "Error message should be non empty.");
queueSession.close();
connection.close();
}
@Test
public void testSendEmptyMessages() throws Exception {
Queue dest = new ActiveMQQueue(queueName);
QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender defaultSender = defaultQueueSession.createSender(dest);
defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
Message msg = defaultQueueSession.createMessage();
msg.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(msg);
QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//bytes
BytesMessage bytesMessage = defaultQueueSession.createBytesMessage();
bytesMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(bytesMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//map
MapMessage mapMessage = defaultQueueSession.createMapMessage();
mapMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(mapMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//object
ObjectMessage objMessage = defaultQueueSession.createObjectMessage();
objMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(objMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//stream
StreamMessage streamMessage = defaultQueueSession.createStreamMessage();
streamMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(streamMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//text
TextMessage textMessage = defaultQueueSession.createTextMessage();
textMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(textMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
}
/**
* Gets a queueReceiver value
* @see QueueReceiver
*/
private QueueReceiver getQueueReceiver(QueueSession session, Queue destination, String selector) throws NamingException, JMSException {
QueueReceiver queueReceiver = session.createReceiver(destination, selector);
return queueReceiver;
}