下面列出了 javax.jms.QueueSession 这个 API 类的使用实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies that requesting a connection with a wrong password is rejected.
 * <p>
 * A first connection is opened without explicit credentials; the managed connection
 * behind its session is captured, then the connection is closed and the managed
 * connection destroyed. A second connection is then requested with
 * {@code "testuser"} / {@code "testwrongpassword"}, and creating a session on it is
 * expected to raise a {@link JMSException}.
 *
 * @throws Exception if any set-up step fails unexpectedly
 */
@Test
public void testConnectionCredentialsFail() throws Exception {
    resourceAdapter = newResourceAdapter();
    MyBootstrapContext ctx = new MyBootstrapContext();
    resourceAdapter.start(ctx);
    ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
    mcf.setResourceAdapter(resourceAdapter);
    ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);

    // First round: create and tear down a connection that uses no explicit credentials.
    QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
    QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    ManagedConnection mc = ((ActiveMQRASession) session).getManagedConnection();
    queueConnection.close();
    mc.destroy();

    // Second round: the wrong password must surface as a JMSException.
    try {
        queueConnection = qraConnectionFactory.createQueueConnection("testuser", "testwrongpassword");
        queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
        fail("should throw exception");
    } catch (JMSException expected) {
        // pass: the bad credentials were rejected
    }
}
/**
 * Creates a receiver for the given queue on the underlying queue session.
 * <p>
 * The receiver obtained from the session is wrapped in an
 * {@code ActiveMQRAQueueReceiver} bound to this session and registered through
 * {@code addConsumer} before being returned. The whole operation runs between
 * {@code lock()} and {@code unlock()}.
 *
 * @param queue The queue to receive from
 * @return The wrapped queue receiver
 * @throws JMSException Thrown if an error occurs
 */
@Override
public QueueReceiver createReceiver(final Queue queue) throws JMSException {
    lock();
    try {
        final QueueSession queueSession = getQueueSessionInternal();

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createReceiver " + queueSession + " queue=" + queue);
        }

        // Wrap the raw receiver so it is tied to this resource-adapter session.
        final QueueReceiver wrapped = new ActiveMQRAQueueReceiver(queueSession.createReceiver(queue), this);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createdReceiver " + queueSession + " receiver=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Verifies that a pooled queue receiver reports the queue it was created for and
 * that {@code getQueue()} throws {@code IllegalStateException} once the receiver
 * has been closed.
 *
 * @throws JMSException if the set-up against the connection factory fails
 */
@Test
public void testGetQueue() throws JMSException {
    JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
    QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createTemporaryQueue();
    QueueReceiver receiver = session.createReceiver(queue);

    assertNotNull(receiver.getQueue());
    assertSame(queue, receiver.getQueue());

    receiver.close();

    try {
        receiver.getQueue();
        // The message previously said "topic", which is misleading in a queue test.
        fail("Cannot read queue on closed receiver");
    } catch (IllegalStateException expected) {
        // expected: a closed receiver must refuse the call
    }
}
/**
 * Verifies that a pooled queue receiver exposes its wrapped receiver (a
 * {@code MockJMSQueueReceiver}) while open, and that {@code getQueueReceiver()}
 * throws {@code IllegalStateException} after the receiver is closed.
 * <p>
 * NOTE(review): the method name mentions a topic subscriber, but the body
 * exercises a queue receiver.
 *
 * @throws JMSException if the set-up against the connection factory fails
 */
@Test
public void testGetTopicSubscriber() throws JMSException {
    final JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createQueueConnection();
    final QueueSession queueSession = pooledConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final JmsPoolQueueReceiver pooledReceiver =
        (JmsPoolQueueReceiver) queueSession.createReceiver(queueSession.createTemporaryQueue());

    assertNotNull(pooledReceiver.getQueueReceiver());
    assertTrue(pooledReceiver.getQueueReceiver() instanceof MockJMSQueueReceiver);

    pooledReceiver.close();

    try {
        pooledReceiver.getQueueReceiver();
        fail("Cannot read state on closed receiver");
    } catch (IllegalStateException expected) {
        // expected: closed receivers reject state access
    }
}
/**
 * Creates two queue senders on the same session and checks whether they share
 * one underlying message producer.
 * <p>
 * With anonymous producers enabled both senders are expected to use the same
 * producer instance; with them disabled each sender is expected to have its own.
 * The connection is closed in a {@code finally} block so that a failing assertion
 * or cast does not leave it open.
 *
 * @param useAnonymousProducers value passed to {@code cf.setUseAnonymousProducers}
 * @throws JMSException if any JMS call fails
 */
private void doTestCreateQueueSender(boolean useAnonymousProducers) throws JMSException {
    cf.setUseAnonymousProducers(useAnonymousProducers);

    JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection();
    try {
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue1 = session.createTemporaryQueue();
        Queue queue2 = session.createTemporaryQueue();

        JmsPoolQueueSender sender1 = (JmsPoolQueueSender) session.createSender(queue1);
        JmsPoolQueueSender sender2 = (JmsPoolQueueSender) session.createSender(queue2);

        if (useAnonymousProducers) {
            assertSame(sender1.getMessageProducer(), sender2.getMessageProducer());
        } else {
            assertNotSame(sender1.getMessageProducer(), sender2.getMessageProducer());
        }
    } finally {
        connection.close();
    }
}
/**
 * Verifies that a pooled queue sender exposes its wrapped sender (a
 * {@code MockJMSQueueSender}) while open, and that {@code getQueueSender()}
 * throws {@code IllegalStateException} after the sender is closed.
 * <p>
 * NOTE(review): the method name mentions a topic subscriber, but the body
 * exercises a queue sender.
 *
 * @throws JMSException if the set-up against the connection factory fails
 */
@Test
public void testGetTopicSubscriber() throws JMSException {
    final JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createQueueConnection();
    final QueueSession queueSession = pooledConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final JmsPoolQueueSender pooledSender =
        (JmsPoolQueueSender) queueSession.createSender(queueSession.createTemporaryQueue());

    assertNotNull(pooledSender.getQueueSender());
    assertTrue(pooledSender.getQueueSender() instanceof MockJMSQueueSender);

    pooledSender.close();

    try {
        pooledSender.getQueueSender();
        fail("Cannot read state on closed sender");
    } catch (IllegalStateException expected) {
        // expected: closed senders reject state access
    }
}
/**
 * Verifies that a queue browser reports its queue while open and that
 * {@code getQueue()} throws {@code IllegalStateException} after the browser has
 * been closed. The browser is closed twice, so a repeated close must not fail.
 *
 * @throws JMSException if the set-up against the connection factory fails
 */
@Test
public void testGetQueue() throws JMSException {
    final JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createQueueConnection();
    final QueueSession queueSession = pooledConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final QueueBrowser queueBrowser = queueSession.createBrowser(queueSession.createTemporaryQueue());

    assertNotNull(queueBrowser.getQueue());

    // Second close is deliberate: closing an already closed browser must be harmless.
    queueBrowser.close();
    queueBrowser.close();

    try {
        queueBrowser.getQueue();
        fail("Should not be able to use a closed browser");
    } catch (IllegalStateException expected) {
        // expected
    }
}
/**
 * Verifies that a pooled queue browser exposes its wrapped browser while open and
 * that {@code getQueueBrowser()} throws {@code IllegalStateException} after close.
 *
 * @throws JMSException if the set-up against the connection factory fails
 */
@Test
public void testGetQueueBrowser() throws JMSException {
    final JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createQueueConnection();
    final QueueSession queueSession = pooledConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final JmsPoolQueueBrowser pooledBrowser =
        (JmsPoolQueueBrowser) queueSession.createBrowser(queueSession.createTemporaryQueue());

    assertNotNull(pooledBrowser.getQueueBrowser());

    pooledBrowser.close();

    try {
        pooledBrowser.getQueueBrowser();
        fail("Should not be able to use a closed browser");
    } catch (IllegalStateException expected) {
        // expected
    }
}
/**
 * Verifies that a queue browser hands out an enumeration while open and that
 * {@code getEnumeration()} throws {@code IllegalStateException} after close.
 *
 * @throws JMSException if the set-up against the connection factory fails
 */
@Test
public void testGetEnumeration() throws JMSException {
    final JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createQueueConnection();
    final QueueSession queueSession = pooledConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    final QueueBrowser queueBrowser = queueSession.createBrowser(queueSession.createTemporaryQueue());

    assertNotNull(queueBrowser.getEnumeration());

    queueBrowser.close();

    try {
        queueBrowser.getEnumeration();
        fail("Should not be able to use a closed browser");
    } catch (IllegalStateException expected) {
        // expected
    }
}
/**
 * Creates a sender for the given queue on the underlying queue session.
 * <p>
 * The sender obtained from the session is wrapped in an
 * {@code ActiveMQRAQueueSender} bound to this session and registered through
 * {@code addProducer} before being returned. The whole operation runs between
 * {@code lock()} and {@code unlock()}.
 *
 * @param queue The queue to send to
 * @return The wrapped queue sender
 * @throws JMSException Thrown if an error occurs
 */
@Override
public QueueSender createSender(final Queue queue) throws JMSException {
    lock();
    try {
        final QueueSession queueSession = getQueueSessionInternal();

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createSender " + queueSession + " queue=" + queue);
        }

        // Wrap the raw sender so it is tied to this resource-adapter session.
        final QueueSender wrapped = new ActiveMQRAQueueSender(queueSession.createSender(queue), this);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createdSender " + queueSession + " sender=" + wrapped);
        }

        addProducer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Prepares the fixture: builds the application context and connections, then sets
 * up the "server" side (a session with a consumer on {@code QUEUE_NAME} and a
 * producer created with a {@code null} destination) and the "client" side (a
 * {@code QueueRequestor} on its own session against the same queue).
 *
 * @throws Exception if any part of the set-up fails
 */
@Override
protected void setUp() throws Exception {
    super.setUp();

    context = createApplicationContext();
    createConnections();

    // Server side: consumes requests from the queue and replies via a destination-less producer.
    requestServerSession = localConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    theQueue = requestServerSession.createQueue(QUEUE_NAME);
    requestServerConsumer = requestServerSession.createConsumer(theQueue);
    requestServerProducer = requestServerSession.createProducer(null);

    // Client side: the requestor gets a dedicated session on the same connection.
    requestor = new QueueRequestor(
        localConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE), theQueue);
}
/**
 * Wraps the given session in the most specific {@code ManagedSession} variant.
 * <p>
 * XA interfaces are checked before the plain ones, and within each group a
 * session implementing both the queue and the topic interface takes precedence
 * over one implementing only a single interface. A session matching none of the
 * interfaces is wrapped in a plain {@code ManagedSession}.
 *
 * @param session the session to wrap
 * @return the managed wrapper for {@code session}
 */
public static ManagedSession create( Session session ) {
    final boolean xaQueue = session instanceof XAQueueSession;
    final boolean xaTopic = session instanceof XATopicSession;
    if (xaQueue && xaTopic) {
        return new ManagedXAQueueTopicSession(session);
    }
    if (xaQueue) {
        return new ManagedXAQueueSession((XAQueueSession) session);
    }
    if (xaTopic) {
        return new ManagedXATopicSession((XATopicSession) session);
    }

    final boolean plainQueue = session instanceof QueueSession;
    final boolean plainTopic = session instanceof TopicSession;
    if (plainQueue && plainTopic) {
        return new ManagedQueueTopicSession(session);
    }
    if (plainQueue) {
        return new ManagedQueueSession((QueueSession) session);
    }
    if (plainTopic) {
        return new ManagedTopicSession((TopicSession) session);
    }

    return new ManagedSession(session);
}
/**
 * Creates a destination with the given name using a temporary session.
 * <p>
 * A topic is created when the session is a {@code TopicSession}; otherwise the
 * session is cast to {@code QueueSession} and a queue is created. The session is
 * always handed to {@code releaseSession}, even when creation fails.
 *
 * @param destinationName name of the topic or queue to create
 * @return the created destination
 * @throws JmsException wrapping any exception raised while creating the session or destination
 */
public Destination lookupDestination(String destinationName) throws JmsException {
    Session session = null;
    try {
        session = createSession(false, Session.AUTO_ACKNOWLEDGE);
        log.debug("Session class ["+session.getClass().getName()+"]");

        // Pick the creation call that matches the kind of session we were given.
        if (session instanceof TopicSession) {
            return ((TopicSession) session).createTopic(destinationName);
        }
        return ((QueueSession) session).createQueue(destinationName);
    } catch (Exception e) {
        throw new JmsException("cannot create destination", e);
    } finally {
        releaseSession(session);
    }
}
/**
 * Creates and starts a queue connection and opens the requested number of
 * sessions (channels) on it.
 * <p>
 * The credentials, host and port are used to build the JNDI initial context from
 * which the connection factory is looked up.
 *
 * @param numberOfChannels number of sessions to create on the connection
 * @param userName         user name passed to the initial-context builder
 * @param password         password passed to the initial-context builder
 * @param hostName         broker host passed to the initial-context builder
 * @param port             broker port passed to the initial-context builder
 * @return the started JMS connection
 * @throws NamingException if building the context or looking up the connection factory fails
 * @throws JMSException    if creating/starting the connection, a session or a receiver fails
 */
private Connection createConnection(int numberOfChannels, String userName, String password, String hostName,
                                    String port) throws NamingException, JMSException {
    InitialContext jndiContext
            = ClientHelper.getInitialContextBuilder(userName, password, hostName, port).build();
    QueueConnectionFactory factory
            = (QueueConnectionFactory) jndiContext.lookup(ClientHelper.CONNECTION_FACTORY);

    QueueConnection queueConnection = factory.createQueueConnection();
    queueConnection.start();

    int channelIndex = 0;
    while (channelIndex < numberOfChannels) {
        QueueSession channel = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        /*
          Each session gets as many receivers on "queue" as its zero-based index:
          the first session has none, the second has one, the third has two, and so on.
         */
        for (int receiverIndex = 0; receiverIndex < channelIndex; receiverIndex++) {
            channel.createReceiver(channel.createQueue("queue"));
        }
        channelIndex++;
    }

    return queueConnection;
}
/**
 * Creates a receiver for the given queue that only sees messages matching the
 * selector.
 * <p>
 * The receiver obtained from the underlying session is wrapped in an
 * {@code ActiveMQRAQueueReceiver} bound to this session and registered through
 * {@code addConsumer} before being returned. The whole operation runs between
 * {@code lock()} and {@code unlock()}.
 *
 * @param queue           The queue to receive from
 * @param messageSelector The selector expression handed to the underlying session
 * @return The wrapped queue receiver
 * @throws JMSException Thrown if an error occurs
 */
@Override
public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException {
    lock();
    try {
        final QueueSession queueSession = getQueueSessionInternal();

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createReceiver " + queueSession + " queue=" + queue + " selector=" + messageSelector);
        }

        // Wrap the raw receiver so it is tied to this resource-adapter session.
        final QueueReceiver wrapped =
            new ActiveMQRAQueueReceiver(queueSession.createReceiver(queue, messageSelector), this);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createdReceiver " + queueSession + " receiver=" + wrapped);
        }

        addConsumer(wrapped);
        return wrapped;
    } finally {
        unlock();
    }
}
/**
 * Receives {@code num} messages from the named queue over a fresh connection.
 * <p>
 * The connection gets the client id {@code "clientId2" + queueName}. Each
 * {@code receive()} call has no timeout. The consumer is closed after the last
 * message; the connection is closed in all cases.
 *
 * @param queueName name of the queue to drain from
 * @param num       number of messages to receive
 * @throws Exception if any JMS call fails
 */
protected void consumeTestQueueMessages(String queueName, int num) throws Exception {
    final Connection connection = cf.createConnection();
    connection.setClientID("clientId2" + queueName);
    connection.start();

    final Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
    final Queue queue = session.createQueue(queueName);

    try {
        final MessageConsumer consumer = session.createConsumer(queue);
        int received = 0;
        while (received < num) {
            consumer.receive();
            received++;
        }
        consumer.close();
    } finally {
        connection.close();
    }
}
/**
 * Verifies that calling {@code createTopic} on a {@code QueueSession} is rejected
 * with {@code javax.jms.IllegalStateException}.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void testQueueSessionCannotCreateTopics() throws Exception
{
    final QueueConnection connection = getQueueConnection();
    try
    {
        final QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        try
        {
            session.createTopic("abc");
            fail("expected exception did not occur");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS: topic operations are not allowed on a queue session
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Verifies that calling {@code createDurableSubscriber} on a {@code QueueSession}
 * is rejected with {@code javax.jms.IllegalStateException}.
 *
 * @throws Exception if the topic, connection or session cannot be created
 */
@Test
public void testQueueSessionCannotCreateDurableSubscriber() throws Exception
{
    final Topic testTopic = createTopic(getTestName());
    final QueueConnection connection = getQueueConnection();
    try
    {
        final QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        try
        {
            session.createDurableSubscriber(testTopic, "abc");
            fail("expected exception did not occur");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS: durable subscriptions are not allowed on a queue session
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Builds the per-thread context map.
 * <p>
 * When sessions are not pooled, a session and a service receiver are created and
 * stored under {@code THREAD_CONTEXT_SESSION_KEY} and
 * {@code THREAD_CONTEXT_RECEIVER_KEY}. When sessions are pooled the returned map
 * is empty.
 *
 * @return the thread context map
 * @throws ListenerException wrapping any {@code IfsaException} raised while
 *         creating the session or receiver
 */
@Override
public Map openThread() throws ListenerException {
    Map threadContext = new HashMap();
    try {
        if (isSessionsArePooled()) {
            // Pooled sessions are obtained elsewhere; nothing to store per thread.
            return threadContext;
        }

        QueueSession threadSession = createSession();
        threadContext.put(THREAD_CONTEXT_SESSION_KEY, threadSession);

        QueueReceiver serviceReceiver = getServiceReceiver(threadSession);
        threadContext.put(THREAD_CONTEXT_RECEIVER_KEY, serviceReceiver);

        return threadContext;
    } catch (IfsaException e) {
        throw new ListenerException(getLogPrefix()+"exception in openThread()", e);
    }
}
/**
 * Verifies that sending through an anonymous sender (created with a {@code null}
 * queue) to a queue named {@code "unknown"} raises
 * {@code InvalidDestinationException}. The connection is built with sync publish
 * enabled.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void anonymousSenderSendToUnknownQueue() throws Exception
{
    final QueueConnection queueConnection =
            ((QueueConnection) getConnectionBuilder().setSyncPublish(true).build());
    try
    {
        final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue unknownQueue = queueSession.createQueue("unknown");
        try
        {
            final QueueSender anonymousSender = queueSession.createSender(null);
            anonymousSender.send(unknownQueue, queueSession.createMessage());
            fail("Exception not thrown");
        }
        catch (InvalidDestinationException expected)
        {
            // PASS: the destination was reported as invalid
        }
    }
    finally
    {
        queueConnection.close();
    }
}
/**
 * Receives {@code size} messages through a durable subscription on the given topic.
 * <p>
 * When {@code publishedMessageSize} is non-null, the encode size of each received
 * message's core message is subtracted from it. Each {@code receive()} call has no
 * timeout. The session is closed in all cases.
 *
 * @param connection           connection on which the session is created
 * @param sub                  name of the durable subscription
 * @param size                 number of messages to receive
 * @param topicName            name of the topic to subscribe to
 * @param publishedMessageSize running byte counter to decrement, or {@code null} to skip accounting
 * @throws Exception if any JMS call fails
 */
protected void consumeDurableTestMessages(Connection connection, String sub, int size, String topicName,
                                          AtomicLong publishedMessageSize) throws Exception {
    final Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
    final Topic topic = session.createTopic(topicName);

    try {
        final TopicSubscriber subscriber = session.createDurableSubscriber(topic, sub);
        int received = 0;
        while (received < size) {
            final ActiveMQMessage message = (ActiveMQMessage) subscriber.receive();
            if (publishedMessageSize != null) {
                publishedMessageSize.addAndGet(-message.getCoreMessage().getEncodeSize());
            }
            received++;
        }
    } finally {
        session.close();
    }
}
/**
 * Creates a {@code MessageConsumer} for the destination, applying the given selector.
 * <p>
 * Four paths exist, chosen by {@code useTopicFunctions} and {@code useJms102()}:
 * topic destinations go through one of the {@code getTopicSubscriber} overloads
 * (with the session cast to {@code TopicSession} for JMS 1.0.2), queue destinations
 * go through {@code getQueueReceiver} for JMS 1.0.2 and through
 * {@code session.createConsumer} otherwise.
 * <p>
 * Use this overload when the selector must be honoured as given; when a
 * correlation id should be part of the filter,
 * <code>getMessageConsumerForCorrelationId</code> is the intended entry point.
 *
 * @param session the Session
 * @param destination the Destination
 * @param selector the MessageSelector
 * @return MessageConsumer
 * @throws NamingException declared by this method; presumably raised by the helper methods - TODO confirm
 * @throws JMSException if the consumer cannot be created
 */
public MessageConsumer getMessageConsumer(Session session, Destination destination, String selector) throws NamingException, JMSException {
    final boolean legacyApi = useJms102();

    if (useTopicFunctions) {
        if (legacyApi) {
            return getTopicSubscriber((TopicSession)session, (Topic)destination, selector);
        }
        return getTopicSubscriber(session, (Topic)destination, selector);
    }

    if (legacyApi) {
        return getQueueReceiver((QueueSession)session, (Queue)destination, selector);
    }
    return session.createConsumer(destination, selector);
}
/**
 * Sends one text message, carrying a {@code getQuote} XML payload, to the queue
 * named by {@code QUEUE_NAME}. The message's JMS type is set to
 * {@code "incorrecttype"}.
 * <p>
 * The session and sender are created inside the {@code try} block so that the
 * connection is closed even when one of those calls fails.
 *
 * @throws Exception if the JNDI lookup or any JMS call fails
 */
private void sendMessage() throws Exception {
    InitialContext initialContext = JmsClientHelper.getActiveMqInitialContext();
    QueueConnectionFactory connectionFactory
            = (QueueConnectionFactory) initialContext.lookup(JmsClientHelper.QUEUE_CONNECTION_FACTORY);

    String message = "<?xml version='1.0' encoding='UTF-8'?>" +
            " <ser:getQuote xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\"> " +
            " <ser:request>" +
            " <xsd:symbol>IBM</xsd:symbol>" +
            " </ser:request>" +
            " </ser:getQuote>";

    QueueConnection queueConnection = connectionFactory.createQueueConnection();
    try {
        QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        QueueSender sender = queueSession.createSender(queueSession.createQueue(QUEUE_NAME));

        TextMessage jmsMessage = queueSession.createTextMessage(message);
        jmsMessage.setJMSType("incorrecttype");
        sender.send(jmsMessage);
    } finally {
        // Closing the connection also releases the session and sender created from it.
        queueConnection.close();
    }
}
/**
 * Creates a consumer on the given session and destination according to the
 * configured JMS spec version and subscription settings.
 * <ul>
 *   <li>JMS 2.0 with shared subscription: shared (durable) consumer on the topic.</li>
 *   <li>JMS 1.1, or JMS 2.0 without shared subscription: durable subscriber when
 *       {@code isDurable}, otherwise a plain consumer.</li>
 *   <li>Any other spec value: the domain-specific API is used, i.e. a queue
 *       receiver, a durable topic subscriber or a plain topic subscriber.</li>
 * </ul>
 * A {@code JMSException} is logged and not propagated.
 *
 * @param session     session used to create the consumer
 * @param destination queue or topic to consume from
 * @return the created consumer, or {@code null} if a {@code JMSException} occurred
 */
public MessageConsumer createMessageConsumer(Session session, Destination destination) {
    try {
        final boolean jms20 = JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec);

        if (jms20 && isSharedSubscription) {
            if (isDurable) {
                return session.createSharedDurableConsumer((Topic) destination, subscriptionName, messageSelector);
            }
            return session.createSharedConsumer((Topic) destination, subscriptionName, messageSelector);
        }

        final boolean jms11 = JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec);

        if (jms11 || (jms20 && !isSharedSubscription)) {
            if (isDurable) {
                return session.createDurableSubscriber((Topic) destination, subscriptionName, messageSelector,
                                                       noPubSubLocal);
            }
            return session.createConsumer(destination, messageSelector);
        }

        // Remaining spec values fall back to the domain-specific (queue/topic) session API.
        if (this.destinationType.equals(JMSConstants.JMSDestinationType.QUEUE)) {
            return ((QueueSession) session).createReceiver((Queue) destination, messageSelector);
        }
        if (isDurable) {
            return ((TopicSession) session)
                    .createDurableSubscriber((Topic) destination, subscriptionName, messageSelector,
                                             noPubSubLocal);
        }
        return ((TopicSession) session).createSubscriber((Topic) destination, messageSelector, false);
    } catch (JMSException e) {
        logger.error("JMS Exception while creating consumer. " + e.getMessage(), e);
    }
    return null;
}
/**
 * Returns the queue session of the wrapped XA queue session, itself wrapped for
 * tracing.
 *
 * @return the traced queue session
 * @throws JMSException if the delegate was not registered as an XA queue session
 *         (signalled with an {@code IllegalStateException}), or if the delegate fails
 */
@Override public QueueSession getQueueSession() throws JMSException {
    final boolean wrapsXaQueueSession = (types & TYPE_XA_QUEUE) == TYPE_XA_QUEUE;
    if (!wrapsXaQueueSession) {
        throw new IllegalStateException(delegate + " is not an XAQueueSession");
    }
    return TracingSession.create(((XAQueueSession) delegate).getQueueSession(), jmsTracing);
}
/**
 * Wraps the given Session in a JDK dynamic proxy backed by a
 * {@code CloseSuppressingSessionInvocationHandler}.
 * <p>
 * The proxy always implements {@code SessionProxy}; it additionally implements
 * {@code QueueSession} and/or {@code TopicSession} when the target does.
 *
 * @param target the original Session to wrap
 * @return the wrapped Session
 */
private Session getCloseSuppressingSessionProxy(Session target) {
    final List<Class<?>> proxyInterfaces = new ArrayList<>(3);
    proxyInterfaces.add(SessionProxy.class);

    // Mirror the domain-specific interfaces of the target on the proxy.
    if (target instanceof QueueSession) {
        proxyInterfaces.add(QueueSession.class);
    }
    if (target instanceof TopicSession) {
        proxyInterfaces.add(TopicSession.class);
    }

    final ClassLoader loader = SessionProxy.class.getClassLoader();
    final Class<?>[] interfaces = ClassUtils.toClassArray(proxyInterfaces);
    return (Session) Proxy.newProxyInstance(
            loader, interfaces, new CloseSuppressingSessionInvocationHandler(target));
}
/**
 * Wrap the given Session in a JDK dynamic proxy backed by a
 * {@code CachedSessionInvocationHandler}, so that application code can treat the
 * framework-managed Session like an ordinary one.
 * <p>
 * The proxy always implements {@code SessionProxy}; it additionally implements
 * {@code QueueSession} and/or {@code TopicSession} when the target does.
 * @param target the original Session to wrap
 * @param sessionList the List of cached Sessions that the given Session belongs to
 * @return the wrapped Session
 */
protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
    final List<Class<?>> proxyInterfaces = new ArrayList<>(3);
    proxyInterfaces.add(SessionProxy.class);

    // Mirror the domain-specific interfaces of the target on the proxy.
    if (target instanceof QueueSession) {
        proxyInterfaces.add(QueueSession.class);
    }
    if (target instanceof TopicSession) {
        proxyInterfaces.add(TopicSession.class);
    }

    final ClassLoader loader = SessionProxy.class.getClassLoader();
    final Class<?>[] interfaces = ClassUtils.toClassArray(proxyInterfaces);
    return (Session) Proxy.newProxyInstance(
            loader, interfaces, new CachedSessionInvocationHandler(target, sessionList));
}
/**
 * Dynamically creates a topic by sending a management request to the
 * {@code "hornetq.management"} queue. This goes against the normal idea that JMS
 * queues and topics should be managed administratively, using management tools.
 * But for some applications this would be too burdensome: the user would
 * additionally have to know about the administration tools, and given that quite
 * a few AVL feeds might be created, each one being a separate topic, this could
 * be a real nuisance.
 * <p>
 * The connection opened for the request is closed before returning, whether or
 * not the request succeeds, so repeated calls do not accumulate open connections.
 *
 * @param topicName name of the topic; also used as its JNDI name
 * @return true if topic created successfully
 * @throws JMSException if the connection, session or management request fails
 */
private boolean createTopic(String topicName) throws JMSException {
    QueueConnectionFactory queueConnectionFactory =
            (QueueConnectionFactory) connectionFactory;
    QueueConnection connection = queueConnectionFactory.createQueueConnection();
    try {
        Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();

        Message message = session.createMessage();
        JMSManagementHelper.putOperationInvocation(message,
                "jms.server",
                "createTopic", // management command
                topicName,     // Name in hornetq
                topicName);    // JNDI name. This peculiar seemingly undocumented
                               // parameter is needed so that can use JNDI to access
                               // the dynamically created topic. Found info on doing
                               // this at https://community.jboss.org/thread/165355 .

        QueueRequestor requestor = new QueueRequestor(session, managementQueue);

        // Determine if was successful
        Message reply = requestor.request(message);
        boolean topicCreated = JMSManagementHelper.hasOperationSucceeded(reply);
        if (topicCreated) {
            logger.info("Dynamically created topic \"" + topicName + "\"");
        } else {
            logger.error("Failed to dynamically created topic \"" + topicName + "\"");
        }

        // Return whether successful
        return topicCreated;
    } finally {
        // Closing the connection also closes the session and requestor created from it.
        connection.close();
    }
}
/**
 * Verifies that when the delegate returns a response and sending that response
 * fails with a {@code JMSException}, the adapter raises a
 * {@code ReplyFailureException} whose cause is that {@code JMSException}.
 * <p>
 * Also checks that the correlation id was copied onto the response, that the
 * producer was closed despite the failure, and that the delegate was invoked.
 *
 * @throws Exception if mock set-up fails
 */
@Test
public void testWithResponsiveMessageDelegateNoDefaultDestination_SendsReturnTextMessageWhenSessionSupplied_AndSendingThrowsJMSException() throws Exception {
    Queue destination = mock(Queue.class);

    final TextMessage sentTextMessage = mock(TextMessage.class);
    // correlation ID is queried when response is being created...
    given(sentTextMessage.getJMSCorrelationID()).willReturn(CORRELATION_ID);
    // Reply-To is queried when response is being created...
    given(sentTextMessage.getJMSReplyTo()).willReturn(destination);

    TextMessage responseTextMessage = mock(TextMessage.class);
    MessageProducer messageProducer = mock(MessageProducer.class);
    // Sending the reply is what fails in this scenario.
    willThrow(new JMSException("Doe!")).given(messageProducer).send(responseTextMessage);

    final QueueSession session = mock(QueueSession.class);
    given(session.createTextMessage(RESPONSE_TEXT)).willReturn(responseTextMessage);
    given(session.createProducer(destination)).willReturn(messageProducer);

    ResponsiveMessageDelegate delegate = mock(ResponsiveMessageDelegate.class);
    given(delegate.handleMessage(sentTextMessage)).willReturn(RESPONSE_TEXT);

    // Hand the raw message to the delegate instead of its extracted payload.
    final MessageListenerAdapter adapter = new MessageListenerAdapter(delegate) {
        @Override
        protected Object extractMessage(Message message) {
            return message;
        }
    };

    try {
        adapter.onMessage(sentTextMessage, session);
        // Message now names the exception type that is actually caught below.
        fail("expected ReplyFailureException with JMSException");
    }
    catch (ReplyFailureException ex) {
        assertEquals(JMSException.class, ex.getCause().getClass());
    }

    verify(responseTextMessage).setJMSCorrelationID(CORRELATION_ID);
    verify(messageProducer).close();
    verify(delegate).handleMessage(sentTextMessage);
}
/**
 * Creates the JMS mocks shared by the tests and wires them together: the mock
 * connection factory hands out the mock connection, which in turn hands out the
 * mock session for a non-transacted, auto-acknowledge {@code createSession} call.
 *
 * @throws Exception declared because the stubbed JMS methods declare checked exceptions
 */
@Before
public void setUpMocks() throws Exception {
    // Leaf mocks first, then the objects that hand them out.
    mockQueue = mock(Queue.class);
    mockSession = mock(QueueSession.class);
    mockConnection = mock(QueueConnection.class);
    mockConnectionFactory = mock(QueueConnectionFactory.class);

    given(mockConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(mockSession);
    given(mockConnectionFactory.createConnection()).willReturn(mockConnection);
}