下面列出了怎么用javax.jms.TemporaryQueue的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Performs the request/reply exchange for a single invocation: the request
 * message is sent to the given target queue and the calling thread then waits
 * for the matching reply.
 * <p>This implementation uses plain JMS send/receive. A fresh
 * {@link javax.jms.TemporaryQueue} is created per call, set as the request's
 * {@code JMSReplyTo} destination, and deleted again before returning.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the reply message as handed back by the consumer (may be {@code null},
 * e.g. when a positive receive timeout elapses without a reply)
 * @throws JMSException in case of JMS failure
 */
@Nullable
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
    TemporaryQueue replyQueue = null;
    MessageProducer sender = null;
    MessageConsumer receiver = null;
    try {
        replyQueue = session.createTemporaryQueue();
        sender = session.createProducer(queue);
        receiver = session.createConsumer(replyQueue);
        requestMessage.setJMSReplyTo(replyQueue);
        sender.send(requestMessage);

        // A non-positive timeout selects the receive() variant without a timeout.
        final long timeout = getReceiveTimeout();
        if (timeout > 0) {
            return receiver.receive(timeout);
        }
        return receiver.receive();
    }
    finally {
        // Release in reverse order of use: consumer, producer, then the queue itself.
        JmsUtils.closeMessageConsumer(receiver);
        JmsUtils.closeMessageProducer(sender);
        if (replyQueue != null) {
            replyQueue.delete();
        }
    }
}
/**
 * Sends the invoker request message to the target queue and waits for the
 * corresponding response.
 * <p>Based on standard JMS send/receive: a {@link javax.jms.TemporaryQueue}
 * created for this call serves as the reply destination and is deleted once
 * the exchange is over, whether it succeeded or not.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the response message obtained from the consumer; may be {@code null}
 * @throws JMSException in case of JMS failure
 */
@Nullable
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
    TemporaryQueue tempReplyTo = null;
    MessageProducer requestProducer = null;
    MessageConsumer replyConsumer = null;
    try {
        tempReplyTo = session.createTemporaryQueue();
        requestProducer = session.createProducer(queue);
        replyConsumer = session.createConsumer(tempReplyTo);
        requestMessage.setJMSReplyTo(tempReplyTo);
        requestProducer.send(requestMessage);

        long receiveTimeout = getReceiveTimeout();
        Message reply;
        if (receiveTimeout > 0) {
            reply = replyConsumer.receive(receiveTimeout);
        }
        else {
            // Zero or negative timeout: use the receive() variant without a timeout.
            reply = replyConsumer.receive();
        }
        return reply;
    }
    finally {
        JmsUtils.closeMessageConsumer(replyConsumer);
        JmsUtils.closeMessageProducer(requestProducer);
        if (tempReplyTo != null) {
            tempReplyTo.delete();
        }
    }
}
/**
 * Initializes the key-value map.
 * <p>Sends a text request (built by {@code generateKeyString()}) to the queue
 * named by {@code QUEUE}, using a temporary queue as the reply destination,
 * blocks until the reply arrives, and copies every property of the reply
 * into {@code keyValueMap} as a string.
 * <p>The producer, the consumer and the temporary queue created here are
 * released before the method returns, including on failure.
 *
 * @throws JMSException if any JMS operation fails or no reply could be received
 */
private void initKeyValues() throws JMSException {
    Queue queue = queueSession.createQueue(QUEUE);
    TextMessage requestMessage = queueSession.createTextMessage();
    requestMessage.setText(generateKeyString());
    TemporaryQueue responseQueue = queueSession.createTemporaryQueue();
    MessageProducer producer = null;
    MessageConsumer consumer = null;
    try {
        producer = queueSession.createProducer(queue);
        consumer = queueSession.createConsumer(responseQueue);
        requestMessage.setJMSReplyTo(responseQueue);
        producer.send(requestMessage);

        // receive() without a timeout only returns null when the consumer is closed
        // concurrently; report that explicitly instead of failing with a bare NPE.
        MapMessage receiveMap = (MapMessage) consumer.receive();
        if (receiveMap == null) {
            throw new JMSException("No reply received while initializing key-value map");
        }

        // NOTE(review): this reads message *properties*, not the MapMessage body
        // entries (getMapNames()/getString()) - confirm this is what the responder sends.
        @SuppressWarnings("unchecked")
        Enumeration<String> mapNames = receiveMap.getPropertyNames();
        while (mapNames.hasMoreElements()) {
            String key = mapNames.nextElement();
            String value = receiveMap.getStringProperty(key);
            keyValueMap.put(key, value);
            LOGGER.info("init key = " + key + ",value = " + value);
        }
    } finally {
        // Nested finally blocks guarantee each resource gets its release attempt
        // even if an earlier release throws.
        try {
            if (consumer != null) {
                consumer.close();
            }
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
            } finally {
                responseQueue.delete();
            }
        }
    }
}
/**
 * Test that if the JMS ReplyTo header field has been set to a <code>TemporaryQueue</code>,
 * it is also retrieved as a <code>TemporaryQueue</code>
 * (and not only as a <code>Queue</code>).
 */
@Test
public void testJMSReplyTo_2() {
    try {
        TemporaryQueue tempQueue = senderSession.createTemporaryQueue();
        Message message = senderSession.createMessage();
        message.setJMSReplyTo(tempQueue);
        sender.send(message);

        Message msg = receiver.receive(TestConfig.TIMEOUT);
        // A timed-out receive returns null; fail with a clear message instead of an NPE below.
        Assert.assertNotNull("No message was received within the timeout", msg);

        Destination dest = msg.getJMSReplyTo();
        Assert.assertTrue("JMS ReplyTo header field should be a TemporaryQueue", dest instanceof TemporaryQueue);
        Queue replyTo = (Queue) dest;
        // Expected value (the queue that was set) goes first, the received value second.
        Assert.assertEquals("JMS ReplyTo header field should be equals to the temporary queue", tempQueue.getQueueName(), replyTo.getQueueName());
    } catch (JMSException e) {
        fail(e);
    }
}
/**
 * Sends a "Request" text message to the named service queue with a temporary
 * queue as its reply-to destination, then blocks until a reply shows up on
 * that temporary queue and asserts that it is not null.
 *
 * @param cf factory used to open the connection for this exchange
 * @param serviceQueue name of the queue the request is sent to
 * @throws JMSException if any JMS operation fails
 * @throws InterruptedException declared by the signature; nothing in this body throws it
 */
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, InterruptedException {
    final Connection conn = cf.createConnection();
    try {
        conn.start();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Build the request and point its reply-to at a fresh temporary queue.
        final TemporaryQueue replyQueue = jmsSession.createTemporaryQueue();
        final TextMessage request = jmsSession.createTextMessage("Request");
        request.setJMSReplyTo(replyQueue);

        final MessageProducer requestProducer = jmsSession.createProducer(jmsSession.createQueue(serviceQueue));
        requestProducer.send(request);

        // The consumer is created only after the send; receive() has no timeout.
        final MessageConsumer replyConsumer = jmsSession.createConsumer(replyQueue);
        final Message reply = replyConsumer.receive();
        assertNotNull(reply);
        LOG.debug("Reply message: {}", reply);

        replyConsumer.close();
        requestProducer.close();
        jmsSession.close();
    } finally {
        conn.close();
    }
}
/**
 * Verifies that looking up a freshly created temporary queue under
 * {@code /queue/<name>} in JNDI fails with a {@link NamingException}.
 */
@Test
public void testTemporaryQueueShouldNotBeInJNDI() throws Exception {
    final Connection conn = createConnection();
    final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue temporary = session.createTemporaryQueue();
    final String jndiName = "/queue/" + temporary.getQueueName();

    try {
        ic.lookup(jndiName);
        ProxyAssertSupport.fail("The temporary queue should not be bound to JNDI");
    } catch (NamingException expected) {
        // Expected: the lookup must not find a binding for a temporary queue.
    }
}
/**
 * Configures ACL rules that allow {@code USER1} to access the virtual host and
 * create temporary queues (plus an exchange-bind rule for legacy clients), then
 * checks that this user can create a temporary queue.
 */
@Test
public void testCreateTemporaryQueueSuccess() throws Exception
{
    final String accessRule = String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", USER1);
    final String createRule = String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=\"true\"", USER1);
    final String bindRule;
    if (isLegacyClient())
    {
        bindRule = String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"*\" temporary=true", USER1);
    }
    else
    {
        // Non-legacy clients get an empty rule in place of the bind rule.
        bindRule = "";
    }
    configureACL(accessRule, createRule, bindRule);

    final Connection userConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
    try
    {
        final Session userSession = userConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue created = userSession.createTemporaryQueue();
        assertNotNull(created);
    }
    finally
    {
        userConnection.close();
    }
}
/**
 * Without any explicit prefix configuration in this test, a temporary queue's
 * name is expected to start with "TempQueue".
 */
@Test
public void testNoPrefixSet() throws Exception
{
    final Connection conn = getConnection();
    try
    {
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();
        final String failureMessage = tempQueue.getQueueName() + " does not start with \"TempQueue\".";
        final boolean hasDefaultPrefix = tempQueue.getQueueName().startsWith("TempQueue");
        assertTrue(failureMessage, hasDefaultPrefix);
    }
    finally
    {
        conn.close();
    }
}
/**
 * With the global address domains set to an empty list ({@code "[]"}), a
 * temporary queue's name is expected to start with "TempQueue".
 */
@Test
public void testEmptyPrefix() throws Exception
{
    updateGlobalAddressDomains("[]");

    final Connection conn = getConnection();
    try
    {
        final TemporaryQueue tempQueue =
                conn.createSession(false, Session.AUTO_ACKNOWLEDGE).createTemporaryQueue();
        final String failureMessage = tempQueue.getQueueName() + " does not start with \"TempQueue\".";
        final boolean hasDefaultPrefix = tempQueue.getQueueName().startsWith("TempQueue");
        assertTrue(failureMessage, hasDefaultPrefix);
    }
    finally
    {
        conn.close();
    }
}
/**
 * With two global address domains configured, a temporary queue's name is
 * expected to start with the first (primary) domain and must not contain a
 * doubled slash directly after it.
 */
@Test
public void testTwoDomains() throws Exception
{
    final String primaryPrefix = "/testPrefix";
    updateGlobalAddressDomains("[\"" + primaryPrefix + "\", \"/foo\" ]");
    Connection connection = getConnection();
    try
    {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        TemporaryQueue queue = session.createTemporaryQueue();
        final String queueName = queue.getQueueName();

        // The previous check compared the name against the raw JSON configuration string
        // plus "/", which no queue name can start with, so the assertion could never fail.
        // NOTE(review): a doubled slash after the primary prefix is the assumed meaning of
        // "superfluous slash" - confirm against the broker's naming scheme.
        assertFalse(queueName + " has superfluous slash in prefix.",
                    queueName.startsWith(primaryPrefix + "//"));
        assertTrue(queueName + " does not start with expected prefix \"" + primaryPrefix + "\".",
                   queueName.startsWith(primaryPrefix));
    }
    finally
    {
        connection.close();
    }
}
/**
 * With a single global address domain configured, a temporary queue's name is
 * expected to start with that domain followed by a slash.
 */
@Test
public void testPrefix() throws Exception
{
    final String prefix = "/testPrefix";
    updateGlobalAddressDomains("[ \"" + prefix + "\" ]");

    final Connection conn = getConnection();
    try
    {
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();
        final String expectedStart = prefix + "/";
        final String failureMessage =
                tempQueue.getQueueName() + " does not start with expected prefix \"" + prefix + "/\".";
        assertTrue(failureMessage, tempQueue.getQueueName().startsWith(expectedStart));
    }
    finally
    {
        conn.close();
    }
}
/**
 * Executes one request/reply round trip: sends the invoker request message to
 * the target queue and waits for the reply.
 * <p>Built on standard JMS send/receive. The reply destination is a
 * {@link javax.jms.TemporaryQueue} that exists only for the duration of this
 * call.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the reply message as returned by the consumer's receive call
 * (may be {@code null}, e.g. if a positive receive timeout expires)
 * @throws JMSException in case of JMS failure
 */
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
    TemporaryQueue answerQueue = null;
    MessageProducer out = null;
    MessageConsumer in = null;
    try {
        answerQueue = session.createTemporaryQueue();
        out = session.createProducer(queue);
        in = session.createConsumer(answerQueue);
        requestMessage.setJMSReplyTo(answerQueue);
        out.send(requestMessage);

        final long waitMillis = getReceiveTimeout();
        if (waitMillis <= 0) {
            // No positive timeout configured: use receive() without a timeout.
            return in.receive();
        }
        return in.receive(waitMillis);
    }
    finally {
        JmsUtils.closeMessageConsumer(in);
        JmsUtils.closeMessageProducer(out);
        if (answerQueue != null) {
            answerQueue.delete();
        }
    }
}
/**
 * Runs one request/reply iteration: sends {@code outMessage} to
 * {@code destProducer} and waits up to {@code timeout} for a reply on
 * {@code messageConsumer}.
 * <p>When both {@code tempQueues} and {@code tempQueuePerMessage} are set, the
 * previous consumer and temporary queue are discarded first and a new
 * temporary queue is created and set as the message's reply-to.
 *
 * @return always {@code true}
 * @throws Exception if no reply arrives, or if a JMS operation fails
 */
public final boolean oneIteration() throws Exception {
    final boolean freshQueuePerMessage = (tempQueues) && (tempQueuePerMessage);
    if (freshQueuePerMessage) {
        // Tear down whatever the previous iteration left behind.
        if (messageConsumer != null) {
            messageConsumer.close();
        }
        if (destConsumer != null) {
            ((TemporaryQueue) destConsumer).delete();
        }

        // Set up a new temporary reply queue for this message.
        destConsumer = context.createTemporaryQueue();
        messageConsumer = context.createConsumer(destConsumer);
        outMessage.setJMSReplyTo(destConsumer);
    }

    messageProducer.send(destProducer, outMessage);
    if (transacted) {
        context.commit();
    }

    inMessage = messageConsumer.receive(timeout);
    if (inMessage == null) {
        throw new Exception("No response to message (\nID: " + outMessage.getJMSMessageID() + "\nCorrID: " + outMessage.getJMSCorrelationID() +" )");
    }

    if (transacted) {
        context.commit();
    }
    incIterations();
    return true;
}
/**
 * Runs one request/reply iteration on the session: starts the response-time
 * measurement, sends {@code outMessage} with the configured delivery mode,
 * priority and expiry, and waits up to {@code timeout} for a reply.
 * <p>When both {@code tempQueues} and {@code tempQueuePerMessage} are set, the
 * previous consumer and temporary queue are discarded first and a new
 * temporary queue is created and set as the message's reply-to.
 *
 * @return always {@code true}
 * @throws Exception if no reply arrives, or if a JMS operation fails
 */
public final boolean oneIteration() throws Exception {
    final boolean renewTempQueue = (tempQueues) && (tempQueuePerMessage);
    if (renewTempQueue) {
        // Dispose of the consumer and temporary queue of the previous iteration.
        if (messageConsumer != null) {
            messageConsumer.close();
        }
        if (destConsumer != null) {
            ((TemporaryQueue) destConsumer).delete();
        }

        // Create the replacement temporary queue and listen on it.
        destConsumer = session.createTemporaryQueue();
        messageConsumer = session.createConsumer(destConsumer);
        outMessage.setJMSReplyTo(destConsumer);
    }

    startResponseTimePeriod();
    messageProducer.send(outMessage, deliveryMode, priority, expiry);
    if (transacted) {
        session.commit();
    }

    inMessage = messageConsumer.receive(timeout);
    if (inMessage == null) {
        throw new Exception("No response to message (\nID: "+outMessage.getJMSMessageID()+ "\nCorrId: " + outMessage.getJMSCorrelationID() +")");
    }

    if (transacted) {
        session.commit();
    }
    incIterations();
    return true;
}
/**
 * Creates and immediately deletes a temporary queue, then checks that no
 * server session still holds any temp-queue clean-upper.
 */
@Test
public void testForTempQueueCleanerUpperLeak() throws Exception {
    try {
        conn = createConnection();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue shortLived = jmsSession.createTemporaryQueue();
        shortLived.delete();

        for (ServerSession candidate : server.getSessions()) {
            final ServerSessionImpl impl = (ServerSessionImpl) candidate;
            assertEquals(0, impl.getTempQueueCleanUppers().size());
        }
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Sends one message to a temporary queue, deletes the queue, and checks that
 * no server session's target-address map still contains the queue's name.
 */
@Test
public void testForTempQueueTargetInfosLeak() throws Exception {
    try {
        conn = createConnection();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final TemporaryQueue shortLived = jmsSession.createTemporaryQueue();

        final MessageProducer sender = jmsSession.createProducer(shortLived);
        sender.send(jmsSession.createMessage());
        shortLived.delete();

        for (ServerSession candidate : server.getSessions()) {
            final ServerSessionImpl impl = (ServerSessionImpl) candidate;
            final boolean stillTracked =
                    impl.cloneTargetAddresses().containsKey(SimpleString.toSimpleString(shortLived.getQueueName()));
            assertFalse(stillTracked);
        }
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Sends one message to each of 200 temporary queues on a single session and
 * checks that every server session's target-address map holds at most 100
 * entries afterwards.
 */
@Test
public void testForTempQueueTargetInfosSizeLimit() throws Exception {
    try {
        conn = createConnection();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        int created = 0;
        while (created < 200) {
            final TemporaryQueue target = jmsSession.createTemporaryQueue();
            final MessageProducer sender = jmsSession.createProducer(target);
            sender.send(jmsSession.createMessage());
            created++;
        }

        for (ServerSession candidate : server.getSessions()) {
            final int trackedAddresses = ((ServerSessionImpl) candidate).cloneTargetAddresses().size();
            assertTrue(trackedAddresses <= 100);
        }
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Enables security, registers user "IDo" with role "myrole" matched against
 * "#", then creates and deletes ten temporary queues as that user and checks
 * that the security repository's cache size is zero afterwards.
 */
@Test
public void testForSecurityCacheLeak() throws Exception {
    server.getSecurityStore().setSecurityEnabled(true);

    // Register the test user and its role with the JAAS security manager.
    final ActiveMQJAASSecurityManager jaasManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
    jaasManager.getConfiguration().addUser("IDo", "Exist");
    jaasManager.getConfiguration().addRole("IDo", "myrole");

    // Role constructed with every flag set to true, applied to all addresses ("#").
    final Set<Role> roles = new HashSet<>();
    roles.add(new Role("myrole", true, true, true, true, true, true, true, true, true, true));
    server.getSecurityRepository().addMatch("#", roles);

    try {
        conn = addConnection(cf.createConnection("IDo", "Exist"));
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        int round = 0;
        while (round < 10) {
            final TemporaryQueue shortLived = jmsSession.createTemporaryQueue();
            shortLived.delete();
            round++;
        }

        assertEquals(0, server.getSecurityRepository().getCacheSize());
    } finally {
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Maps a JMS destination to its type code.
 * <p>Queue types take precedence over topic types, and the temporary variant
 * of each takes precedence over the plain one. Anything that is neither a
 * queue nor a topic (including {@code null}) is reported as
 * {@code QUEUE_TYPE}.
 *
 * @param destination the destination to classify
 * @return one of {@code TEMP_QUEUE_TYPE}, {@code QUEUE_TYPE},
 * {@code TEMP_TOPIC_TYPE} or {@code TOPIC_TYPE}
 */
public static byte destinationType(Destination destination) {
    // TemporaryQueue extends Queue, so checking it first gives the same result
    // as nesting it inside the Queue check.
    if (destination instanceof TemporaryQueue) {
        return TEMP_QUEUE_TYPE;
    }
    if (destination instanceof Queue) {
        return QUEUE_TYPE;
    }
    if (destination instanceof TemporaryTopic) {
        return TEMP_TOPIC_TYPE;
    }
    if (destination instanceof Topic) {
        return TOPIC_TYPE;
    }
    // Fallback for unknown destinations.
    return QUEUE_TYPE;
}
/**
 * Sends a message to a temporary queue on the local session and checks that
 * the remote broker's admin view then reports exactly one temporary queue;
 * after deleting the queue, waits until the remote broker reports none.
 */
@Test
public void testTempQueues() throws Exception {
    final TemporaryQueue tempDestination = localSession.createTemporaryQueue();
    final MessageProducer sender = localSession.createProducer(tempDestination);
    sender.send(localSession.createTextMessage("test"));

    // Short fixed pause before inspecting the remote broker.
    Thread.sleep(100);
    final int remoteTempQueues = remoteBroker.getAdminView().getTemporaryQueues().length;
    assertEquals("Destination not created", 1, remoteTempQueues);

    tempDestination.delete();

    final Wait.Condition noTempQueuesLeft = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return 0 == remoteBroker.getAdminView().getTemporaryQueues().length;
        }
    };
    assertTrue("Destination not deleted", Wait.waitFor(noTempQueuesLeft));
}
/**
 * Checks that a session belonging to a different connection than the one that
 * created a temporary queue is refused when it tries to create a consumer on
 * that queue.
 * <p>See https://jira.jboss.org/jira/browse/JBMESSAGING-1566
 */
@Test
public void testCanNotCreateConsumerFromAnotherConnectionForTemporaryQueue() throws Exception {
    final Connection owningConnection = createConnection();
    final Session owningSession = owningConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue tempQueue = owningSession.createTemporaryQueue();

    final Connection foreignConnection = createConnection();
    final Session foreignSession = foreignConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    try {
        foreignSession.createConsumer(tempQueue);
        ProxyAssertSupport.fail("Only temporary destination's own connection is allowed to create MessageConsumers for them.");
    } catch (JMSException expected) {
        // Expected: the foreign connection must be rejected.
    }

    owningConnection.close();
    foreignConnection.close();
}
/**
 * Attaches a no-op message listener to a temporary queue, sends
 * {@code MESSAGE_COUNT} 1 KiB messages to it, and checks that no slow-consumer
 * advisory arrives within one second.
 */
public void testNoSlowConsumerAdvisory() throws Exception {
    final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue tempQueue = consumerSession.createTemporaryQueue();

    // The listener does nothing with the messages it is handed.
    final MessageConsumer listeningConsumer = consumerSession.createConsumer(tempQueue);
    listeningConsumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
        }
    });

    final Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) tempQueue);

    // Advisory consumer and producer live on a second session.
    final Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer advisoryConsumer = advisorySession.createConsumer(advisoryTopic);

    // start throwing messages at the consumer
    final MessageProducer sender = advisorySession.createProducer(tempQueue);
    int sent = 0;
    while (sent < MESSAGE_COUNT) {
        final BytesMessage payload = advisorySession.createBytesMessage();
        payload.writeBytes(new byte[1024]);
        sender.send(payload);
        sent++;
    }

    final Message advisory = advisoryConsumer.receive(1000);
    assertNull(advisory);
}
/**
 * Creates a consumer on a temporary queue that never receives, sends
 * {@code MESSAGE_COUNT} 1 KiB messages to the queue, and checks that a
 * slow-consumer advisory arrives within one second.
 */
public void testSlowConsumerAdvisory() throws Exception {
    final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue tempQueue = consumerSession.createTemporaryQueue();

    // This consumer is created but never asked to receive anything.
    final MessageConsumer idleConsumer = consumerSession.createConsumer(tempQueue);
    assertNotNull(idleConsumer);

    final Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) tempQueue);

    // Advisory consumer and producer live on a second session.
    final Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final MessageConsumer advisoryConsumer = advisorySession.createConsumer(advisoryTopic);

    // start throwing messages at the consumer
    final MessageProducer sender = advisorySession.createProducer(tempQueue);
    int sent = 0;
    while (sent < MESSAGE_COUNT) {
        final BytesMessage payload = advisorySession.createBytesMessage();
        payload.writeBytes(new byte[1024]);
        sender.send(payload);
        sent++;
    }

    final Message advisory = advisoryConsumer.receive(1000);
    assertNotNull(advisory);
}
/**
 * Sends a single 1 KiB message to a temporary queue that has a consumer and
 * checks that a message-delivered advisory arrives within one second.
 */
public void testMessageDeliveryAdvisory() throws Exception {
    final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();

    final MessageConsumer queueConsumer = jmsSession.createConsumer(tempQueue);
    assertNotNull(queueConsumer);

    final Topic deliveredTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic((ActiveMQDestination) tempQueue);
    final MessageConsumer advisoryConsumer = jmsSession.createConsumer(deliveredTopic);

    //start throwing messages at the consumer
    final MessageProducer sender = jmsSession.createProducer(tempQueue);
    final BytesMessage payload = jmsSession.createBytesMessage();
    payload.writeBytes(new byte[1024]);
    sender.send(payload);

    final Message advisory = advisoryConsumer.receive(1000);
    assertNotNull(advisory);
}
/**
 * Sends one 1 KiB message to a temporary queue, consumes it, and checks that
 * the message-consumed advisory carries a payload whose JMS message id equals
 * the id of the message that was sent.
 */
public void testTempMessageConsumedAdvisory() throws Exception {
    final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();
    final MessageConsumer queueConsumer = jmsSession.createConsumer(tempQueue);

    final Topic consumedTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) tempQueue);
    final MessageConsumer advisoryConsumer = jmsSession.createConsumer(consumedTopic);

    //start throwing messages at the consumer
    final MessageProducer sender = jmsSession.createProducer(tempQueue);
    final BytesMessage outgoing = jmsSession.createBytesMessage();
    outgoing.writeBytes(new byte[1024]);
    sender.send(outgoing);
    // The id is read from the message only after it has been sent.
    final String sentId = outgoing.getJMSMessageID();

    final Message consumed = queueConsumer.receive(1000);
    assertNotNull(consumed);

    final Message advisory = advisoryConsumer.receive(1000);
    assertNotNull(advisory);

    // The advisory's data structure is the message the advisory refers to.
    final ActiveMQMessage advisoryMessage = (ActiveMQMessage) advisory;
    final ActiveMQMessage payload = (ActiveMQMessage) advisoryMessage.getDataStructure();
    final String originalId = payload.getJMSMessageID();
    assertEquals(originalId, sentId);
}
/**
 * Checks that deleting a temporary queue fails with a {@link JMSException}
 * while a consumer created on it is still open.
 *
 * @throws JMSException if setting up the connection, session, queue or consumer fails
 */
@Test
public void testDeleteDestinationWithSubscribersFails() throws JMSException {
    final Connection conn = factory.createConnection();
    connections.add(conn);

    final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();
    conn.start();

    // Keep a consumer open on the queue so that it has an active subscriber.
    jmsSession.createConsumer(tempQueue);

    // Deleting the queue is expected to be rejected while that consumer exists.
    try {
        tempQueue.delete();
        Assert.fail("Should fail as Subscribers are active");
    } catch (JMSException expected) {
        Assert.assertTrue("failed to throw an exception", true);
    }
}
/**
 * Send a request message to the given {@link Destination} and block until
 * a reply has been received on a temporary queue created on-the-fly.
 * <p>Return the response message or {@code null} if no message has
 * been received (presumably because the receive timeout elapsed).
 * <p>The temporary queue is set as the request's {@code JMSReplyTo} and is
 * deleted again before this method returns.
 * @param session the JMS Session to operate on
 * @param destination the Destination the request is sent to
 * @param messageCreator callback that creates the request message (must not be {@code null})
 * @return the response message, or {@code null} if none was received
 * @throws JMSException if thrown by JMS API methods
 */
@Nullable
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
        throws JMSException {

    Assert.notNull(messageCreator, "MessageCreator must not be null");

    TemporaryQueue replyQueue = null;
    MessageProducer sender = null;
    MessageConsumer receiver = null;
    try {
        // The request is created before any JMS resource is allocated.
        final Message request = messageCreator.createMessage(session);

        replyQueue = session.createTemporaryQueue();
        sender = session.createProducer(destination);
        receiver = session.createConsumer(replyQueue);
        request.setJMSReplyTo(replyQueue);

        if (logger.isDebugEnabled()) {
            logger.debug("Sending created message: " + request);
        }
        doSend(sender, request);

        final Message reply = receiveFromConsumer(receiver, getReceiveTimeout());
        return reply;
    }
    finally {
        JmsUtils.closeMessageConsumer(receiver);
        JmsUtils.closeMessageProducer(sender);
        if (replyQueue != null) {
            replyQueue.delete();
        }
    }
}
/**
 * Send a request message to the given {@link Destination} and block until
 * a reply has been received on a temporary queue created on-the-fly.
 * <p>Return the response message or {@code null} if no message has
 * been received (presumably when the receive timeout expires first).
 * <p>Consumer, producer and temporary queue are all released in the
 * {@code finally} block, whether or not the exchange succeeded.
 * @param session the JMS Session to operate on
 * @param destination the Destination the request is sent to
 * @param messageCreator callback that creates the request message (must not be {@code null})
 * @return the response message, or {@code null} if none was received
 * @throws JMSException if thrown by JMS API methods
 */
@Nullable
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
        throws JMSException {

    Assert.notNull(messageCreator, "MessageCreator must not be null");

    TemporaryQueue tempReplyTo = null;
    MessageProducer requestProducer = null;
    MessageConsumer replyConsumer = null;
    try {
        final Message outgoing = messageCreator.createMessage(session);

        // Wire up the reply path before sending.
        tempReplyTo = session.createTemporaryQueue();
        requestProducer = session.createProducer(destination);
        replyConsumer = session.createConsumer(tempReplyTo);
        outgoing.setJMSReplyTo(tempReplyTo);

        final boolean debugEnabled = logger.isDebugEnabled();
        if (debugEnabled) {
            logger.debug("Sending created message: " + outgoing);
        }

        doSend(requestProducer, outgoing);
        return receiveFromConsumer(replyConsumer, getReceiveTimeout());
    }
    finally {
        JmsUtils.closeMessageConsumer(replyConsumer);
        JmsUtils.closeMessageProducer(requestProducer);
        if (tempReplyTo != null) {
            tempReplyTo.delete();
        }
    }
}
/**
 * Registers a listener on the queue, sends a random text message to it, waits
 * up to three seconds for the listener to receive it, and verifies the
 * received message and the send/listen verification for the queue.
 * <p>Temporary queues (by type, or by the TIBCO temporary-queue name prefix)
 * are verified under the shared {@code TEMP} name instead of their own name.
 *
 * @param queue the queue to send to and listen on
 * @param listenerRegistrationFunction registers a listener on a destination and
 * returns a future completed with the incoming message
 * @throws Exception if sending, waiting for or verifying the message fails
 */
private void testQueueSendListen(Queue queue, Function<Destination, CompletableFuture<Message>> listenerRegistrationFunction)
        throws Exception {
    final CompletableFuture<Message> received = listenerRegistrationFunction.apply(queue);

    final String text = UUID.randomUUID().toString();
    final TextMessage sent = brokerFacade.createTextMessage(text);
    brokerFacade.send(queue, sent, false);

    final Message incoming = received.get(3, TimeUnit.SECONDS);
    verifyMessage(text, incoming);

    // special handling for temp queues
    final String actualName = queue.getQueueName();
    final boolean temporary = queue instanceof TemporaryQueue || actualName.startsWith(TIBCO_TMP_QUEUE_PREFIX);
    final String verifiedName = temporary ? TEMP : actualName;

    verifySendListenOnNonTracedThread(verifiedName, sent, 1);
}
/**
 * Creates a temporary queue on the internal session and informs every
 * registered session event listener about it before returning it.
 *
 * @return the newly created temporary queue
 * @throws JMSException if the internal session fails to create the queue
 */
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
    final TemporaryQueue created = getInternalSession().createTemporaryQueue();

    // Notify all of the listeners of the created temporary Queue.
    for (JmsPoolSessionEventListener eventListener : this.sessionEventListeners) {
        eventListener.onTemporaryQueueCreate(created);
    }

    return created;
}