下面列出了javax.jms.TemporaryQueue#delete ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Actually execute the given request, sending the invoker request message
* to the specified target queue and waiting for a corresponding response.
* <p>The default implementation is based on standard JMS send/receive,
* using a {@link javax.jms.TemporaryQueue} for receiving the response.
* @param session the JMS Session to use
* @param queue the resolved target Queue to send to
* @param requestMessage the JMS Message to send
* @return the RemoteInvocationResult object
* @throws JMSException in case of JMS failure
*/
@Nullable
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(queue);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
producer.send(requestMessage);
long timeout = getReceiveTimeout();
return (timeout > 0 ? consumer.receive(timeout) : consumer.receive());
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
/**
* Actually execute the given request, sending the invoker request message
* to the specified target queue and waiting for a corresponding response.
* <p>The default implementation is based on standard JMS send/receive,
* using a {@link javax.jms.TemporaryQueue} for receiving the response.
* @param session the JMS Session to use
* @param queue the resolved target Queue to send to
* @param requestMessage the JMS Message to send
* @return the RemoteInvocationResult object
* @throws JMSException in case of JMS failure
*/
@Nullable
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(queue);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
producer.send(requestMessage);
long timeout = getReceiveTimeout();
return (timeout > 0 ? consumer.receive(timeout) : consumer.receive());
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
/**
* Actually execute the given request, sending the invoker request message
* to the specified target queue and waiting for a corresponding response.
* <p>The default implementation is based on standard JMS send/receive,
* using a {@link javax.jms.TemporaryQueue} for receiving the response.
* @param session the JMS Session to use
* @param queue the resolved target Queue to send to
* @param requestMessage the JMS Message to send
* @return the RemoteInvocationResult object
* @throws JMSException in case of JMS failure
*/
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(queue);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
producer.send(requestMessage);
long timeout = getReceiveTimeout();
return (timeout > 0 ? consumer.receive(timeout) : consumer.receive());
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
@Test
public void testForTempQueueCleanerUpperLeak() throws Exception {
try {
conn = createConnection();
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue temporaryQueue = s.createTemporaryQueue();
temporaryQueue.delete();
for (ServerSession serverSession : server.getSessions()) {
assertEquals(0, ((ServerSessionImpl)serverSession).getTempQueueCleanUppers().size());
}
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testTempQueues() throws Exception {
TemporaryQueue temp = localSession.createTemporaryQueue();
MessageProducer producer = localSession.createProducer(temp);
producer.send(localSession.createTextMessage("test"));
Thread.sleep(100);
assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
temp.delete();
assertTrue("Destination not deleted", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == remoteBroker.getAdminView().getTemporaryQueues().length;
}
}));
}
/**
* Send a request message to the given {@link Destination} and block until
* a reply has been received on a temporary queue created on-the-fly.
* <p>Return the response message or {@code null} if no message has
* @throws JMSException if thrown by JMS API methods
*/
@Nullable
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + requestMessage);
}
doSend(producer, requestMessage);
return receiveFromConsumer(consumer, getReceiveTimeout());
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
/**
* Send a request message to the given {@link Destination} and block until
* a reply has been received on a temporary queue created on-the-fly.
* <p>Return the response message or {@code null} if no message has
* @throws JMSException if thrown by JMS API methods
*/
@Nullable
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + requestMessage);
}
doSend(producer, requestMessage);
return receiveFromConsumer(consumer, getReceiveTimeout());
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
public void testCreateLotsOfTemporaryQueues() throws Exception {
LOG.info("Creating " + numberToCreate + " temporary queue(s)");
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < numberToCreate; i++) {
if (i % 1000 == 0) {
LOG.info("attempt " + i);
}
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
Thread.sleep(sleep);
}
LOG.info("Created " + numberToCreate + " temporary queue(s)");
}
@Test
public void testTempQueueLeak() throws Exception {
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
Object[] queueResources = server.getManagementService().getResources(QueueControl.class);
for (Object queueResource : queueResources) {
if (((QueueControl) queueResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
QueueControl queueControl = (QueueControl) queueResource;
Wait.waitFor(() -> queueControl.getMessageCount() == 0);
assertNotNull("addressControl for temp advisory", queueControl);
Wait.assertEquals(0, queueControl::getMessageCount);
Wait.assertEquals(2, queueControl::getMessagesAdded);
}
}
} finally {
if (connection != null) {
connection.close();
}
}
}
@Test
public void testTempQueueLeakManyConnections() throws Exception {
final Connection[] connections = new Connection[20];
try {
for (int i = 0; i < connections.length; i++) {
connections[i] = factory.createConnection();
connections[i].start();
}
Session session = connections[0].createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < connections.length; i++) {
TemporaryQueue temporaryQueue = session.createTemporaryQueue();
temporaryQueue.delete();
}
Object[] addressResources = server.getManagementService().getResources(AddressControl.class);
for (Object addressResource : addressResources) {
if (((AddressControl) addressResource).getAddress().equals("ActiveMQ.Advisory.TempQueue")) {
AddressControl addressControl = (AddressControl) addressResource;
Wait.waitFor(() -> addressControl.getMessageCount() == 0);
assertNotNull("addressControl for temp advisory", addressControl);
Wait.assertEquals(0, addressControl::getMessageCount);
}
}
//sleep a bit to allow message count to go down.
} finally {
for (Connection conn : connections) {
if (conn != null) {
conn.close();
}
}
}
}
@Test(timeout = 30000)
public void testDeleteTemporaryQueue() throws Exception {
Connection connection = createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final javax.jms.Queue queue = session.createTemporaryQueue();
assertNotNull(queue);
assertTrue(queue instanceof TemporaryQueue);
Queue queueView = getProxyToQueue(queue.getQueueName());
assertNotNull(queueView);
TemporaryQueue tempQueue = (TemporaryQueue) queue;
tempQueue.delete();
assertTrue("Temp Queue should be deleted.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return getProxyToQueue(queue.getQueueName()) == null;
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
} finally {
connection.close();
}
}
@Test
public void testTempQueues() throws Exception {
TemporaryQueue temp = localSession.createTemporaryQueue();
MessageProducer producer = localSession.createProducer(temp);
producer.send(localSession.createTextMessage("test"));
Thread.sleep(100);
assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
temp.delete();
Thread.sleep(100);
assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length);
}
@Test
public void testTemporaryQueueDeleteWithConsumer() throws Exception {
Connection conn = null;
try {
conn = createConnection();
Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = producerSession.createTemporaryQueue();
MessageConsumer consumer = consumerSession.createConsumer(tempQueue);
try {
tempQueue.delete();
ProxyAssertSupport.fail("Should throw JMSException");
} catch (JMSException e) {
// Should fail - you can't delete a temp queue if it has active consumers
}
consumer.close();
} finally {
if (conn != null) {
conn.close();
}
}
}
public void testLoadTempAdvisoryQueues() throws Exception {
for (int i = 0; i < MESSAGE_COUNT; i++) {
TemporaryQueue tempQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(tempQueue);
MessageProducer producer = session.createProducer(tempQueue);
consumer.close();
producer.close();
tempQueue.delete();
}
AdvisoryBroker ab = (AdvisoryBroker) broker.getBroker().getAdaptor(AdvisoryBroker.class);
assertTrue(ab.getAdvisoryDestinations().size() == 0);
assertTrue(ab.getAdvisoryConsumers().size() == 0);
assertTrue(ab.getAdvisoryProducers().size() == 0);
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(RegionBroker.class);
for (Destination dest : rb.getDestinationMap().values()) {
LOG.debug("Destination: {}", dest);
}
// there should be at least 2 destinations - advisories -
// 1 for the connection + 1 generic ones
assertTrue("Should be at least 2 destinations", rb.getDestinationMap().size() > 2);
}
public void testSendToRemovedTemp() throws Exception {
ActiveMQQueue requestReplyDest = new ActiveMQQueue("RequestReply");
NetworkConnector nc = bridgeBrokers("BrokerA", "BrokerB");
if (useDuplex) {
nc.setDuplex(true);
} else {
bridgeBrokers("BrokerB", "BrokerA");
}
// destination advisory can loose the race with message dispatch, so we need to allow replies on network broker
// to work in the absence of an advisory, the destination will be cleaned up in the normal
// way
if (!useDuplex) {
brokers.get("BrokerB").broker.setAllowTempAutoCreationOnSend(true);
}
TransportConnector forClient = brokers.get("BrokerA").broker.addConnector("tcp://localhost:0");
startAllBrokers();
waitForBridgeFormation();
waitForMinTopicRegionConsumerCount("BrokerB", 1);
waitForMinTopicRegionConsumerCount("BrokerA", 1);
ConnectionFactory factory = new ActiveMQConnectionFactory(forClient.getConnectUri());
ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection();
conn.setWatchTopicAdvisories(false);
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ConnectionFactory replyFactory = getConnectionFactory("BrokerB");
for (int i = 0; i < 500; i++) {
TemporaryQueue tempDest = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(requestReplyDest);
javax.jms.Message message = session.createTextMessage("req-" + i);
message.setJMSReplyTo(tempDest);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(tempDest);
producer.send(message);
ActiveMQConnection replyConnection = (ActiveMQConnection) replyFactory.createConnection();
replyConnection.setWatchTopicAdvisories(false);
replyConnection.start();
Session replySession = replyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer replyConsumer = (ActiveMQMessageConsumer) replySession.createConsumer(requestReplyDest);
javax.jms.Message msg = replyConsumer.receive(10000);
assertNotNull("request message not null: " + i, msg);
MessageProducer replyProducer = replySession.createProducer(msg.getJMSReplyTo());
replyProducer.send(session.createTextMessage("reply-" + i));
replyConnection.close();
javax.jms.Message reply = consumer.receive(10000);
assertNotNull("reply message : " + i + ", to: " + tempDest + ", by consumer:" + consumer.getConsumerId(), reply);
consumer.close();
tempDest.delete();
}
}
public void testRequestReply() throws Exception {
ActiveMQQueue requestReplyDest = new ActiveMQQueue("RequestReply");
startAllBrokers();
waitForBridgeFormation();
waitForMinTopicRegionConsumerCount("sender", 1);
waitForMinTopicRegionConsumerCount("receiver", 1);
ConnectionFactory factory = getConnectionFactory("sender");
ActiveMQConnection conn = (ActiveMQConnection) factory.createConnection("system", "manager");
conn.setWatchTopicAdvisories(false);
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ConnectionFactory replyFactory = getConnectionFactory("receiver");
for (int i = 0; i < 2000; i++) {
TemporaryQueue tempDest = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(requestReplyDest);
javax.jms.Message message = session.createTextMessage("req-" + i);
message.setJMSReplyTo(tempDest);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(tempDest);
producer.send(message);
ActiveMQConnection replyConnection = (ActiveMQConnection) replyFactory.createConnection("system", "manager");
replyConnection.setWatchTopicAdvisories(false);
replyConnection.start();
Session replySession = replyConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer replyConsumer = (ActiveMQMessageConsumer) replySession.createConsumer(requestReplyDest);
javax.jms.Message msg = replyConsumer.receive(10000);
assertNotNull("request message not null: " + i, msg);
MessageProducer replyProducer = replySession.createProducer(msg.getJMSReplyTo());
replyProducer.send(session.createTextMessage("reply-" + i));
replyConnection.close();
javax.jms.Message reply = consumer.receive(10000);
assertNotNull("reply message : " + i + ", to: " + tempDest + ", by consumer:" + consumer.getConsumerId(), reply);
consumer.close();
tempDest.delete();
LOG.info("message #" + i + " processed");
}
}
@Test
public void testDuplicateCacheCleanupForTempQueues() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
servers[0].getConfiguration().getClusterConfigurations().get(0).setDuplicateDetection(true);
servers[0].getAddressSettingsRepository().addMatch("#", new AddressSettings().setRedistributionDelay(0));
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
servers[1].getConfiguration().getClusterConfigurations().get(0).setDuplicateDetection(true);
servers[1].getAddressSettingsRepository().addMatch("#", new AddressSettings().setRedistributionDelay(0));
startServers(0, 1);
final Map<String, TextMessage> requestMap = new HashMap<>();
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
for (int j = 0; j < 10; j++) {
try (Connection connection = cf.createConnection()) {
SimpleMessageListener server = new SimpleMessageListener().start();
Queue requestQueue = ActiveMQJMSClient.createQueue("exampleQueue");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(requestQueue);
TemporaryQueue replyQueue = session.createTemporaryQueue();
MessageConsumer replyConsumer = session.createConsumer(replyQueue);
int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage requestMsg = session.createTextMessage("A request message");
requestMsg.setJMSReplyTo(replyQueue);
producer.send(requestMsg);
requestMap.put(requestMsg.getJMSMessageID(), requestMsg);
}
for (int i = 0; i < numMessages; i++) {
TextMessage replyMessageReceived = (TextMessage) replyConsumer.receive();
assertNotNull(requestMap.get(replyMessageReceived.getJMSCorrelationID()));
}
replyConsumer.close();
replyQueue.delete();
server.shutdown();
}
}
assertTrue(((PostOfficeImpl) servers[0].getPostOffice()).getDuplicateIDCaches().size() <= 1);
assertTrue(((PostOfficeImpl) servers[1].getPostOffice()).getDuplicateIDCaches().size() <= 1);
}
/**
* Make sure you cannot publish to a temp destination that does not exist
* anymore.
*
* @throws JMSException
* @throws InterruptedException
*/
@Test
public void testPublishFailsForDestroyedTempDestination() throws Exception {
Connection tempConnection = factory.createConnection();
connections.add(tempConnection);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
//In artemis, if you send a message to a topic where the consumer isn't there yet,
//message will get lost. So the create temp queue request has to happen
//after the connection is started (advisory consumer registered).
Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final TemporaryQueue queue = tempSession.createTemporaryQueue();
final ActiveMQConnection activeMQConnection = (ActiveMQConnection) connection;
Assert.assertTrue("creation advisory received in time with async dispatch", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return activeMQConnection.activeTempDestinations.containsKey(queue);
}
}));
// This message delivery should work since the temp connection is still
// open.
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = session.createTextMessage("First");
producer.send(message);
// deleting the Queue will cause sends to fail
queue.delete();
Thread.sleep(5000); // Wait a little bit to let the delete take effect.
// This message delivery NOT should work since the temp connection is
// now closed.
try {
message = session.createTextMessage("Hello");
producer.send(message);
Assert.fail("Send should fail since temp destination should not exist anymore.");
} catch (JMSException e) {
Assert.assertTrue("failed to throw an exception", true);
}
}
@Test
public void testTemporaryQueueDeleted() throws Exception {
Connection conn = null;
try {
conn = createConnection();
Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Make sure temporary queue cannot be used after it has been deleted
TemporaryQueue tempQueue = producerSession.createTemporaryQueue();
MessageProducer producer = producerSession.createProducer(tempQueue);
MessageConsumer consumer = consumerSession.createConsumer(tempQueue);
conn.start();
final String messageText = "This is a message";
Message m = producerSession.createTextMessage(messageText);
producer.send(m);
TextMessage m2 = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertNotNull(m2);
ProxyAssertSupport.assertEquals(messageText, m2.getText());
consumer.close();
tempQueue.delete();
conn.close();
conn = createConnection("guest", "guest");
try {
producer.send(m);
ProxyAssertSupport.fail();
} catch (JMSException e) {
}
} finally {
if (conn != null) {
conn.close();
}
}
}
@Test
public void testTemporaryQueueLeak() throws Exception {
ActiveMQConnection conn = null;
try {
conn = (ActiveMQConnection) createConnection();
Session producerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = producerSession.createTemporaryQueue();
MessageProducer producer = producerSession.createProducer(tempQueue);
MessageConsumer consumer = consumerSession.createConsumer(tempQueue);
conn.start();
final String messageText = "This is a message";
javax.jms.Message m = producerSession.createTextMessage(messageText);
producer.send(m);
TextMessage m2 = (TextMessage) consumer.receive(2000);
assertNotNull(m2);
assertEquals(messageText, m2.getText());
consumer.close();
assertTrue(((ActiveMQDestination) tempQueue).isCreated());
tempQueue.delete();
assertFalse(((ActiveMQDestination) tempQueue).isCreated());
assertFalse(conn.containsTemporaryQueue(SimpleString.toSimpleString(tempQueue.getQueueName())));
} finally {
if (conn != null) {
conn.close();
}
}
}