下面列出了怎么用javax.jms.QueueReceiver的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testGetQueue() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueReceiver receiver = session.createReceiver(queue);
assertNotNull(receiver.getQueue());
assertSame(queue, receiver.getQueue());
receiver.close();
try {
receiver.getQueue();
fail("Cannot read topic on closed receiver");
} catch (IllegalStateException ise) {}
}
@Test
public void createReceiver() throws Exception
{
Queue queue = createQueue(getTestName());
QueueConnection queueConnection = getQueueConnection();
try
{
queueConnection.start();
Utils.sendMessages(queueConnection, queue, 3);
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = session.createReceiver(queue, String.format("%s=2", INDEX));
assertEquals("Queue names should match from QueueReceiver", queue.getQueueName(), receiver.getQueue().getQueueName());
Message received = receiver.receive(getReceiveTimeout());
assertNotNull("Message is not received", received);
assertEquals("Unexpected message is received", 2, received.getIntProperty(INDEX));
}
finally
{
queueConnection.close();
}
}
@Test
public void testGetQueue() throws Exception {
Connection consumerConnection = null;
try {
consumerConnection = createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
Queue q = ((QueueReceiver) queueConsumer).getQueue();
ProxyAssertSupport.assertEquals(queue1, q);
} finally {
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
@Test
public void testGetQueueOnClosedConsumer() throws Exception {
Connection consumerConnection = null;
try {
consumerConnection = createConnection();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer queueConsumer = consumerSession.createConsumer(queue1);
queueConsumer.close();
try {
((QueueReceiver) queueConsumer).getQueue();
Assert.fail("must throw a JMS IllegalStateException");
} catch (javax.jms.IllegalStateException e) {
// OK
}
} finally {
if (consumerConnection != null) {
consumerConnection.close();
}
}
}
/**
* Create a queue receiver
*
* @param queue The queue
* @return The queue receiver
* @throws JMSException Thrown if an error occurs
*/
@Override
public QueueReceiver createReceiver(final Queue queue) throws JMSException {
lock();
try {
QueueSession session = getQueueSessionInternal();
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " queue=" + queue);
}
QueueReceiver result = session.createReceiver(queue);
result = new ActiveMQRAQueueReceiver(result, this);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " receiver=" + result);
}
addConsumer(result);
return result;
} finally {
unlock();
}
}
/**
* Create a queue receiver
*
* @param queue The queue
* @param messageSelector
* @return The queue receiver
* @throws JMSException Thrown if an error occurs
*/
@Override
public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException {
lock();
try {
QueueSession session = getQueueSessionInternal();
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector);
}
QueueReceiver result = session.createReceiver(queue, messageSelector);
result = new ActiveMQRAQueueReceiver(result, this);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " receiver=" + result);
}
addConsumer(result);
return result;
} finally {
unlock();
}
}
@Override
public Map openThread() throws ListenerException {
Map threadContext = new HashMap();
try {
if (!isSessionsArePooled()) {
QueueSession session = createSession();
threadContext.put(THREAD_CONTEXT_SESSION_KEY, session);
QueueReceiver receiver;
receiver = getServiceReceiver(session);
threadContext.put(THREAD_CONTEXT_RECEIVER_KEY, receiver);
}
return threadContext;
} catch (IfsaException e) {
throw new ListenerException(getLogPrefix()+"exception in openThread()", e);
}
}
protected synchronized String getProviderSelector() {
if (providerSelector==null && useSelectorsForProviders()) {
try {
providerSelector=""; // set default, also to avoid re-evaluation time and time again for lower ifsa-versions.
if (messageProtocol.equals(IfsaMessageProtocolEnum.REQUEST_REPLY)) {
providerSelector=IFSAConstants.QueueReceiver.SELECTOR_RR;
}
if (messageProtocol.equals(IfsaMessageProtocolEnum.FIRE_AND_FORGET)) {
providerSelector=IFSAConstants.QueueReceiver.SELECTOR_FF;
}
} catch (Throwable t) {
log.debug(getLogPrefix()+"exception determining selector, probably lower ifsa version, ignoring");
}
}
return providerSelector;
}
public InitConfServer() {
try {
connection = createSharedConnection();
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(QUEUE);
QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(this);
} catch (Exception e) {
LOGGER.error("init error", e);
}
}
@Test
public void testToString() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueReceiver receiver = session.createReceiver(queue);
assertNotNull(receiver.toString());
}
@Override
public QueueReceiver createReceiver(
Queue queue,
String messageSelector ) throws JMSException {
return addConsumer( ((QueueSession) session).createReceiver(queue, messageSelector));
}
@Override
public QueueReceiver createReceiver(
Queue queue,
String messageSelector ) throws JMSException {
return addConsumer(queueSession.createReceiver(queue, messageSelector));
}
public synchronized void load() throws GenericServiceException {
try {
InitialContext jndi = JNDIContextFactory.getInitialContext(jndiServer);
QueueConnectionFactory factory = (QueueConnectionFactory) jndi.lookup(jndiName);
if (factory != null) {
con = factory.createQueueConnection(userName, password);
con.setExceptionListener(this);
session = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) jndi.lookup(queueName);
if (queue != null) {
QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(this);
con.start();
this.setConnected(true);
Debug.logInfo("Listening to queue [" + queueName + "]...", module);
} else {
throw new GenericServiceException("Queue lookup failed.");
}
} else {
throw new GenericServiceException("Factory (broker) lookup failed.");
}
} catch (NamingException ne) {
throw new GenericServiceException("JNDI lookup problems; listener not running.", ne);
} catch (JMSException je) {
throw new GenericServiceException("JMS internal error; listener not running.", je);
} catch (GeneralException ge) {
throw new GenericServiceException("Problems with InitialContext; listener not running.", ge);
}
}
@Test
public void testTempQueueDelete() throws Exception {
connection.start();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = queueSession.createTemporaryQueue();
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
try {
QueueSession newQueueSession = newConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = newQueueSession.createSender(tempQueue);
Message msg = queueSession.createMessage();
queueSender.send(msg);
try {
QueueReceiver consumer = newQueueSession.createReceiver(tempQueue);
fail("should have gotten exception but got consumer: " + consumer);
} catch (JMSException ex) {
//correct
}
connection.close();
try {
Message newMsg = newQueueSession.createMessage();
queueSender.send(newMsg);
} catch (JMSException e) {
//ok
}
} finally {
newConn.close();
}
}
/**
* com.sun.ts.tests.jms.ee.all.queueconn.QueueConnTest line 171
*/
@Test
public void testCreateReceiverWithMessageSelector() throws Exception {
QueueConnection qc = null;
try {
qc = createQueueConnection();
QueueSession qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver qreceiver = qs.createReceiver(queue1, "targetMessage = TRUE");
qc.start();
TextMessage m = qs.createTextMessage();
m.setText("one");
m.setBooleanProperty("targetMessage", false);
QueueSender qsender = qs.createSender(queue1);
qsender.send(m);
m.setText("two");
m.setBooleanProperty("targetMessage", true);
qsender.send(m);
TextMessage rm = (TextMessage) qreceiver.receive(1000);
ProxyAssertSupport.assertEquals("two", rm.getText());
} finally {
if (qc != null) {
qc.close();
}
Thread.sleep(2000);
removeAllMessages(queue1.getQueueName(), true);
checkEmpty(queue1);
}
}
/**
* Create a new wrapper
*
* @param consumer the queue receiver
* @param session the session
*/
public ActiveMQRAQueueReceiver(final QueueReceiver consumer, final ActiveMQRASession session) {
super(consumer, session);
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("constructor(" + consumer + ", " + session + ")");
}
}
/**
* Get queue
*
* @return The queue
* @throws JMSException Thrown if an error occurs
*/
@Override
public Queue getQueue() throws JMSException {
if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
ActiveMQRALogger.LOGGER.trace("getQueue()");
}
checkState();
return ((QueueReceiver) consumer).getQueue();
}
/**
* @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
*/
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
checkClosed();
checkDestination(queue);
JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue);
JmsQueueReceiver result = new JmsQueueReceiver(getNextConsumerId(), this, dest, null);
result.init();
return result;
}
/**
* @see javax.jms.QueueSession#createReceiver(javax.jms.Queue, java.lang.String)
*/
@Override
public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
checkClosed();
checkDestination(queue);
messageSelector = checkSelector(messageSelector);
JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue);
JmsQueueReceiver result = new JmsQueueReceiver(getNextConsumerId(), this, dest, messageSelector);
result.init();
return result;
}
protected QueueReceiver getReceiver(Map threadContext, QueueSession session) throws ListenerException {
if (isSessionsArePooled()) {
try {
return getServiceReceiver(session);
} catch (IfsaException e) {
throw new ListenerException(getLogPrefix()+"exception creating QueueReceiver", e);
}
}
return (QueueReceiver) threadContext.get(THREAD_CONTEXT_RECEIVER_KEY);
}
protected void releaseReceiver(QueueReceiver receiver) throws ListenerException {
if (isSessionsArePooled() && receiver != null) {
try {
receiver.close();
// do not write to log, this occurs too often
} catch (Exception e) {
throw new ListenerException(getLogPrefix()+"exception closing QueueReceiver", e);
}
}
}
@Override
public void closeThread(Map threadContext) throws ListenerException {
if (!isSessionsArePooled()) {
QueueReceiver receiver = (QueueReceiver) threadContext.remove(THREAD_CONTEXT_RECEIVER_KEY);
releaseReceiver(receiver);
QueueSession session = (QueueSession) threadContext.remove(THREAD_CONTEXT_SESSION_KEY);
closeSession(session);
}
}
/**
* Gets the queueReceiver, by utilizing the <code>getInputQueue()</code> method.<br/>
* For serverside getQueueReceiver() the creating of the QueueReceiver is done
* without the <code>selector</code> information, as this is not allowed
* by IFSA.<br/>
* For a clientconnection, the receiver is done with the <code>getClientReplyQueue</code>
*/
public QueueReceiver getReplyReceiver(QueueSession session, Message sentMessage)
throws IfsaException {
if (isProvider()) {
throw new IfsaException("cannot get ReplyReceiver: Provider cannot act as Requestor");
}
return getMessagingSource().getReplyReceiver(session, sentMessage);
}
public void closeReplyReceiver(QueueReceiver receiver) throws IfsaException {
try {
if (receiver!=null) {
Queue replyQueue = receiver.getQueue();
receiver.close();
releaseClientReplyQueue(replyQueue);
}
} catch (JMSException e) {
throw new IfsaException(e);
}
}
@Override
@Nullable
public Queue getQueue() throws JMSException {
return (this.target instanceof QueueReceiver ? ((QueueReceiver) this.target).getQueue() : null);
}
@Override
@Nullable
public Queue getQueue() throws JMSException {
return (this.target instanceof QueueReceiver ? ((QueueReceiver) this.target).getQueue() : null);
}
public QueueReceiver getQueueReceiver() throws JMSException {
return (QueueReceiver) super.getMessageConsumer();
}
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
}
@Override
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
}
private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
receiver = new JmsPoolQueueReceiver(this, receiver);
consumers.add(receiver);
return receiver;
}