下面列出了javax.jms.QueueConnection#createQueueSession ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
JmsPoolXAConnectionFactory pcf = createXAPooledConnectionFactory();
QueueConnection connection = pcf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = session.createSender(session.createQueue("AA"));
assertNotNull(sender.getQueue().getQueueName());
connection.close();
TopicConnection topicConnection = pcf.createTopicConnection();
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA"));
assertNotNull(topicPublisher.getTopic().getTopicName());
topicConnection.close();
pcf.stop();
}
/**
* Send a message to testInboundQueue queue
*
* @throws Exception
*/
private void sendMessage() throws Exception {
InitialContext initialContext = JmsClientHelper.getActiveMqInitialContext();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContext.lookup(JmsClientHelper.QUEUE_CONNECTION_FACTORY);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
QueueSender sender = queueSession.createSender(queueSession.createQueue(QUEUE_NAME));
String message = "<?xml version='1.0' encoding='UTF-8'?>" +
" <ser:getQuote xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\"> " +
" <ser:request>" +
" <xsd:symbol>IBM</xsd:symbol>" +
" </ser:request>" +
" </ser:getQuote>";
try {
TextMessage jmsMessage = queueSession.createTextMessage(message);
jmsMessage.setJMSType("incorrecttype");
sender.send(jmsMessage);
} finally {
queueConnection.close();
}
}
@Test
public void createReceiver() throws Exception
{
Queue queue = createQueue(getTestName());
QueueConnection queueConnection = getQueueConnection();
try
{
queueConnection.start();
Utils.sendMessages(queueConnection, queue, 3);
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = session.createReceiver(queue, String.format("%s=2", INDEX));
assertEquals("Queue names should match from QueueReceiver", queue.getQueueName(), receiver.getQueue().getQueueName());
Message received = receiver.receive(getReceiveTimeout());
assertNotNull("Message is not received", received);
assertEquals("Unexpected message is received", 2, received.getIntProperty(INDEX));
}
finally
{
queueConnection.close();
}
}
@Test
public void testQueueSessionCannotCreateDurableSubscriber() throws Exception
{
Topic topic = createTopic(getTestName());
QueueConnection queueConnection = getQueueConnection();
try
{
QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
try
{
queueSession.createDurableSubscriber(topic, "abc");
fail("expected exception did not occur");
}
catch (javax.jms.IllegalStateException s)
{
// PASS
}
}
finally
{
queueConnection.close();
}
}
@Test
public void testCreateQueueSender() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
QueueConnection connection = factory.createQueueConnection();
assertNotNull(connection);
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(name.getMethodName());
QueueSender sender = session.createSender(queue);
assertNotNull(sender);
QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
assertEquals(0, proxy.getQueueSize());
connection.close();
}
@Test
public void anonymousSenderSendToUnknownQueue() throws Exception
{
QueueConnection connection = ((QueueConnection) getConnectionBuilder().setSyncPublish(true).build());
try
{
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue invalidDestination = session.createQueue("unknown");
try
{
QueueSender sender = session.createSender(null);
sender.send(invalidDestination, session.createMessage());
fail("Exception not thrown");
}
catch (InvalidDestinationException e)
{
//PASS
}
}
finally
{
connection.close();
}
}
@Test
public void testConnectionCredentialsFail() throws Exception {
resourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
resourceAdapter.start(ctx);
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
ManagedConnection mc = ((ActiveMQRASession) session).getManagedConnection();
queueConnection.close();
mc.destroy();
try {
queueConnection = qraConnectionFactory.createQueueConnection("testuser", "testwrongpassword");
queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
fail("should throw esxception");
} catch (JMSException e) {
//pass
}
}
private void sendBytesMessage(String destName, byte[] buffer) throws Exception {
InitialContext ic = getInitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ic.lookup("ConnectionFactory");
QueueConnection connection = queueConnectionFactory.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BytesMessage bm = session.createBytesMessage();
bm.writeBytes(buffer);
QueueSender sender = session.createSender((Queue) ic.lookup(destName));
sender.send(bm);
sender.close();
session.close();
connection.close();
}
public void testTryToReproduceNullPointerBug() throws Exception {
String url = connectionUri;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
QueueConnection queueConnection = factory.createQueueConnection();
this.connection = queueConnection;
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
session.createSender(null); // Unidentified
Queue receiverQueue = session.createTemporaryQueue();
session.createReceiver(receiverQueue);
queueConnection.start();
}
@Test
public void testCreateTopicOnAQueueSession() throws Exception {
QueueConnection c = (QueueConnection) getConnectionFactory().createConnection();
QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
try {
s.createTopic("TestTopic");
ProxyAssertSupport.fail("should throw IllegalStateException");
} catch (javax.jms.IllegalStateException e) {
// OK
}
c.close();
}
/**
* Creates a queue connection, session and receiver.
*
* @throws NamingException
* @throws JMSException
*/
private void createQueueConnection() throws NamingException, JMSException {
// Creates a queue connection, sessions and receiver
QueueConnectionFactory connFactory = (QueueConnectionFactory) super.getInitialContext()
.lookup(AndesClientConstants.CF_NAME);
QueueConnection queueConnection = connFactory.createQueueConnection();
queueConnection.start();
QueueSession queueSession;
// Sets acknowledgement mode
if (QueueSession.SESSION_TRANSACTED == this.consumerConfig.getAcknowledgeMode().getType()) {
queueSession = queueConnection
.createQueueSession(true, this.consumerConfig.getAcknowledgeMode().getType());
} else {
queueSession = queueConnection
.createQueueSession(false, this.consumerConfig.getAcknowledgeMode().getType());
}
Queue queue =
(Queue) super.getInitialContext().lookup(this.consumerConfig.getDestinationName());
connection = queueConnection;
session = queueSession;
// If selectors exists
if (null != this.consumerConfig.getSelectors()) {
receiver = queueSession.createReceiver(queue, this.consumerConfig.getSelectors());
} else {
receiver = queueSession.createReceiver(queue);
}
}
@Test
public void testConnectionCredentialsOKRecovery() throws Exception {
resourceAdapter = newResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
resourceAdapter.start(ctx);
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(mcf.getResourceRecovery());
}
/**
* Test creation of QueueSession
*/
@Test
public void testQueueConnection1() throws Exception {
QueueConnection qc = queueCf.createQueueConnection();
qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
qc.close();
}
@Test
public void sendToUnknownQueue() throws Exception
{
QueueConnection connection = ((QueueConnection) getConnectionBuilder().build());
try
{
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue invalidDestination = session.createQueue("unknown");
try
{
QueueSender sender = session.createSender(invalidDestination);
sender.send(session.createMessage());
fail("Exception not thrown");
}
catch (InvalidDestinationException e)
{
//PASS
}
}
finally
{
connection.close();
}
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testSpecificQueueRetrieval(String username, String password, String hostname, String port)
throws JMSException, NamingException, IOException {
String queueName = "testSpecificQueueRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver = queueSession.createReceiver(queue);
// Test queue retrieval through REST API
HttpGet httpGet = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH + "/" + queueName);
ClientHelper.setAuthHeader(httpGet, username, password);
CloseableHttpResponse response = client.execute(httpGet);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK, "Incorrect status code.");
String body = EntityUtils.toString(response.getEntity());
QueueMetadata queueMetadata = objectMapper.readValue(body, QueueMetadata.class);
Assert.assertEquals(queueMetadata.getName(), queueName, "Incorrect queue name.");
Assert.assertEquals(queueMetadata.getConsumerCount().intValue(), 1, "JMS consumer should be present.");
Assert.assertTrue(queueMetadata.isDurable());
Assert.assertEquals(queueMetadata.getSize().intValue(), 0, "Queue should be empty.");
Assert.assertFalse(queueMetadata.isAutoDelete());
receiver.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testRetrieveConsumerList(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testSpecificQueueRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver1 = queueSession.createReceiver(queue);
QueueReceiver receiver2 = queueSession.createReceiver(queue);
HttpGet httpGet = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(httpGet, username, password);
CloseableHttpResponse response = client.execute(httpGet);
String body = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(body, ConsumerMetadata[].class);
for (ConsumerMetadata consumerMetadata : consumers) {
validateTransportPropertyExistence(consumerMetadata);
}
Assert.assertEquals(consumers.length, 2, "Number of consumers returned is incorrect.");
receiver1.close();
receiver2.close();
queueSession.close();
connection.close();
}
@Parameters({"admin-username", "admin-password", "broker-hostname", "broker-port"})
@Test
public void testSpecificConsumerRetrieval(String username, String password,
String hostname, String port) throws Exception {
String queueName = "testSpecificConsumerRetrieval";
// Create a durable queue using a JMS client
InitialContext initialContextForQueue = ClientHelper
.getInitialContextBuilder(username, password, hostname, port)
.withQueue(queueName)
.build();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContextForQueue.lookup(ClientHelper.CONNECTION_FACTORY);
QueueConnection connection = connectionFactory.createQueueConnection();
connection.start();
QueueSession queueSession = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(queueName);
QueueReceiver receiver = queueSession.createReceiver(queue);
HttpGet getAllConsumers = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH
+ "/" + queueName + "/consumers");
ClientHelper.setAuthHeader(getAllConsumers, username, password);
CloseableHttpResponse response = client.execute(getAllConsumers);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
String body = EntityUtils.toString(response.getEntity());
ConsumerMetadata[] consumers = objectMapper.readValue(body, ConsumerMetadata[].class);
Assert.assertTrue(consumers.length > 0, "Number of consumers returned is incorrect.");
int id = consumers[0].getId();
validateTransportPropertyExistence(consumers[0]);
HttpGet getConsumer = new HttpGet(apiBasePath + QueuesApiDelegate.QUEUES_API_PATH + "/"
+ queueName + "/consumers/" + id);
ClientHelper.setAuthHeader(getConsumer, username, password);
response = client.execute(getConsumer);
Assert.assertEquals(response.getStatusLine().getStatusCode(), HttpStatus.SC_OK);
String consumerString = EntityUtils.toString(response.getEntity());
ConsumerMetadata consumerMetadata = objectMapper.readValue(consumerString, ConsumerMetadata.class);
Assert.assertEquals(consumerMetadata.getId().intValue(), id, "incorrect message id");
receiver.close();
queueSession.close();
connection.close();
}
public void run() {
QueueConnection connection = null;
StandardLogger logger = LoggerUtil.getStandardLogger();
try {
String txt = message.getText();
if (logger.isDebugEnabled()) {
logger.debug("JMS Listener receives request: " + txt);
}
String resp;
ListenerHelper helper = new ListenerHelper();
Map<String, String> metaInfo = new HashMap<>();
metaInfo.put(Listener.METAINFO_PROTOCOL, Listener.METAINFO_PROTOCOL_JMS);
metaInfo.put(Listener.METAINFO_REQUEST_PATH, getQueueName());
metaInfo.put(Listener.METAINFO_SERVICE_CLASS, this.getClass().getName());
metaInfo.put(Listener.METAINFO_REQUEST_ID, message.getJMSMessageID());
metaInfo.put(Listener.METAINFO_CORRELATION_ID, message.getJMSCorrelationID());
if (message.getJMSReplyTo() != null)
metaInfo.put("ReplyTo", message.getJMSReplyTo().toString());
resp = helper.processRequest(txt, metaInfo);
Queue respQueue = (Queue) message.getJMSReplyTo();
String correlId = message.getJMSCorrelationID();
if (resp != null && respQueue != null) {
// String msgId = jmsMessage.getJMSMessageID();
QueueConnectionFactory qcf
= JMSServices.getInstance().getQueueConnectionFactory(null);
connection = qcf.createQueueConnection();
Message respMsg;
try (QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)) {
try (QueueSender sender = session.createSender(respQueue)) {
respMsg = session.createTextMessage(resp);
respMsg.setJMSCorrelationID(correlId);
sender.send(respMsg);
}
}
if (logger.isDebugEnabled()) {
logger.debug("JMS Listener sends response (corr id='" +
correlId + "'): " + resp);
}
}
}
catch (Throwable ex) {
logger.error(ex.getMessage(), ex);
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
}
}
private void sendMessageToServer() throws JMSException, IOException {
QueueConnection queueConn = (QueueConnection) connectionFactory.createConnection();
queueConn.start();
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Destination destination = queueSession.createQueue(jmsQueue);
MessageProducer queueSender = queueSession.createProducer(destination);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message message = queueSession.createTextMessage(createMessage());
queueSender.send(message);
}
public void startListner() throws JMSException {
QueueConnection queueConn = (QueueConnection) connectionFactory.createConnection();
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Destination destination = queueSession.createQueue("TESTQUEUE");
MessageConsumer consumer = queueSession.createConsumer(destination);
azureServiceBusEventsListner.setAzureEventsHandler(azureEventsHandler);
consumer.setMessageListener(azureServiceBusEventsListner);
queueConn.start();
}