下面列出了 javax.jms.Message#setJMSReplyTo() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Performs one request/reply exchange: sends the invoker request message to
 * the given target queue and then waits for the corresponding reply.
 * <p>A {@link javax.jms.TemporaryQueue} is created for each call, set as the
 * request's {@code JMSReplyTo} header and consumed from for the reply. The
 * consumer, the producer and the temporary queue are released in a
 * {@code finally} block, whether or not the exchange succeeded.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the reply Message as handed back by the consumer's {@code receive}
 * call (the method is declared {@code @Nullable})
 * @throws JMSException in case of JMS failure
 */
@Nullable
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
    TemporaryQueue replyQueue = null;
    MessageProducer requestProducer = null;
    MessageConsumer replyConsumer = null;
    try {
        replyQueue = session.createTemporaryQueue();
        requestProducer = session.createProducer(queue);
        replyConsumer = session.createConsumer(replyQueue);

        requestMessage.setJMSReplyTo(replyQueue);
        requestProducer.send(requestMessage);

        long receiveTimeout = getReceiveTimeout();
        if (receiveTimeout > 0) {
            return replyConsumer.receive(receiveTimeout);
        }
        // Non-positive timeout: use the receive() variant without a timeout argument.
        return replyConsumer.receive();
    }
    finally {
        JmsUtils.closeMessageConsumer(replyConsumer);
        JmsUtils.closeMessageProducer(requestProducer);
        if (replyQueue != null) {
            replyQueue.delete();
        }
    }
}
/**
 * Test that if the JMS ReplyTo header field has been set to a <code>TemporaryQueue</code>,
 * it is also read back as a <code>TemporaryQueue</code>
 * (and not only as a <code>Queue</code>).
 */
@Test
public void testJMSReplyTo_2() {
    try {
        TemporaryQueue tempQueue = senderSession.createTemporaryQueue();
        Message message = senderSession.createMessage();
        message.setJMSReplyTo(tempQueue);
        sender.send(message);

        Message msg = receiver.receive(TestConfig.TIMEOUT);
        // Report a missing message as an assertion failure rather than a NullPointerException below.
        Assert.assertNotNull("No message was received within the timeout", msg);

        Destination dest = msg.getJMSReplyTo();
        Assert.assertTrue("JMS ReplyTo header field should be a TemporaryQueue", dest instanceof TemporaryQueue);
        Queue replyTo = (Queue) dest;
        // Expected value (the queue that was set) first, actual value (the one read back) second.
        Assert.assertEquals("JMS ReplyTo header field should be equals to the temporary queue", tempQueue.getQueueName(), replyTo.getQueueName());
    } catch (JMSException e) {
        fail(e);
    }
}
/**
 * Copies this producer's configured properties and headers onto the message
 * and passes it to the session for sending.
 * <p>Headers (correlation id, correlation id bytes, type, reply-to) are only
 * written when the corresponding field is non-null.
 *
 * @param destination the destination handed to the session's send call
 * @param message the message to populate and send
 * @throws MessageFormatException if {@code message} is {@code null}
 * @throws JMSException if populating or sending the message fails
 */
private void doSend(Destination destination, Message message) throws JMSException {
    if (message == null) {
        throw new MessageFormatException("Message must not be null");
    }

    // Configured message properties first ...
    for (Map.Entry<String, Object> property : messageProperties.entrySet()) {
        String propertyName = property.getKey();
        Object propertyValue = property.getValue();
        message.setObjectProperty(propertyName, propertyValue);
    }

    // ... then the optional headers, in the same order as they are configured on this producer.
    if (correlationId != null) {
        message.setJMSCorrelationID(correlationId);
    }
    if (correlationIdBytes != null) {
        message.setJMSCorrelationIDAsBytes(correlationIdBytes);
    }
    if (type != null) {
        message.setJMSType(type);
    }
    if (replyTo != null) {
        message.setJMSReplyTo(replyTo);
    }

    session.send(producer, destination, message,
            deliveryMode, priority, timeToLive,
            disableMessageId, disableTimestamp, deliveryDelay,
            completionListener);
}
/**
 * Builds a JMS message for {@code op} and sends it through the producer that
 * matches the target's address level.
 * <p>When a request id is present, the remote broker's reply queue and the
 * request id are set as the ReplyTo and correlation id headers. Source, target,
 * encoded payload and operation type travel as message properties.
 *
 * @throws JMSException if creating, populating or sending the message fails
 * @throws RuntimeException if the target's address level is neither AGENT nor WORKER
 */
private void run() throws JMSException {
    Message outgoing = remoteBroker.session.createMessage();

    // Reply routing is only set up when a request id is present.
    if (requestId != null) {
        outgoing.setJMSReplyTo(remoteBroker.replyQueue);
        outgoing.setJMSCorrelationID(requestId);
    }

    String sourceAddress = coordinatorAddress().toString();
    String targetAddress = target.toString();
    String payload = OperationCodec.toJson(op);
    int operationType = getOperationType(op).toInt();

    outgoing.setStringProperty("source", sourceAddress);
    outgoing.setStringProperty("target", targetAddress);
    outgoing.setStringProperty("payload", payload);
    outgoing.setIntProperty("operationType", operationType);

    switch (target.getAddressLevel()) {
        case AGENT:
            remoteBroker.agentProducer.send(outgoing);
            break;
        case WORKER:
            remoteBroker.workerProducer.send(outgoing);
            break;
        default:
            throw new RuntimeException("unhandled target:" + target);
    }
}
/**
 * Executes the given request by sending the invoker request message to the
 * target queue and waiting for the corresponding response.
 * <p>The exchange uses plain JMS send/receive: a {@link javax.jms.TemporaryQueue}
 * is created for this call, set as the request's {@code JMSReplyTo} header and
 * read from for the response. Consumer, producer and temporary queue are
 * cleaned up in a {@code finally} block.
 * @param session the JMS Session to use
 * @param queue the resolved target Queue to send to
 * @param requestMessage the JMS Message to send
 * @return the response Message as returned by the consumer's {@code receive} call
 * @throws JMSException in case of JMS failure
 */
protected Message doExecuteRequest(Session session, Queue queue, Message requestMessage) throws JMSException {
    TemporaryQueue tempReplyQueue = null;
    MessageProducer sender = null;
    MessageConsumer receiver = null;
    try {
        tempReplyQueue = session.createTemporaryQueue();
        sender = session.createProducer(queue);
        receiver = session.createConsumer(tempReplyQueue);
        requestMessage.setJMSReplyTo(tempReplyQueue);
        sender.send(requestMessage);

        final long timeout = getReceiveTimeout();
        final Message response;
        if (timeout > 0) {
            response = receiver.receive(timeout);
        }
        else {
            // Non-positive timeout: call receive() without a timeout argument.
            response = receiver.receive();
        }
        return response;
    }
    finally {
        JmsUtils.closeMessageConsumer(receiver);
        JmsUtils.closeMessageProducer(sender);
        if (tempReplyQueue != null) {
            tempReplyQueue.delete();
        }
    }
}
/**
 * Sends an empty query message to the broker statistics destination and
 * checks the reply: it must arrive within ten seconds as a {@code MapMessage},
 * contain at least one map entry, carry a positive timestamp and use the
 * default JMS priority.
 *
 * @throws Exception if any JMS call fails
 */
public void testBrokerStats() throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Reply destination and its consumer are set up before the query is sent.
    Queue replyQueue = session.createTemporaryQueue();
    MessageConsumer replyConsumer = session.createConsumer(replyQueue);

    Queue statsQueue = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
    MessageProducer statsProducer = session.createProducer(statsQueue);

    Message query = session.createMessage();
    query.setJMSReplyTo(replyQueue);
    statsProducer.send(query);

    Message received = replyConsumer.receive(10 * 1000);
    MapMessage reply = (MapMessage) received;
    assertNotNull(reply);

    boolean hasEntries = reply.getMapNames().hasMoreElements();
    assertTrue(hasEntries);
    assertTrue(reply.getJMSTimestamp() > 0);
    assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
}
/**
 * Schedules four messages with different delays, then sends a browse request
 * to the scheduler management destination while consuming the replies through
 * a selector that only matches a scheduled delay of 45000. Exactly one reply
 * is expected to pass the selector.
 *
 * @throws Exception if any JMS call fails
 */
@Test
public void testBrowseWithSelector() throws Exception {
    Connection connection = createConnection();
    try {
        // Setup the scheduled Messages
        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(9));
        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(10));
        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(5));
        scheduleMessage(connection, TimeUnit.SECONDS.toMillis(45));

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the Browse Destination and the Reply To location
        Destination requestBrowse = session.createTopic(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION);
        Destination browseDest = session.createTemporaryTopic();

        // Create the "Browser"
        MessageConsumer browser = session.createConsumer(browseDest, ScheduledMessage.AMQ_SCHEDULED_DELAY + " = 45000");

        connection.start();

        // Send the browse request
        MessageProducer producer = session.createProducer(requestBrowse);
        Message request = session.createMessage();
        request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE);
        request.setJMSReplyTo(browseDest);
        producer.send(request);

        // Now try and receive the one we selected
        Message message = browser.receive(5000);
        assertNotNull(message);
        assertEquals(45000, message.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY));

        // Now check if there are any more, there shouldn't be
        message = browser.receive(5000);
        assertNull(message);
    } finally {
        // Always release the connection (and everything created from it), even when an
        // assertion above fails. Closing an already closed JMS connection is a no-op.
        connection.close();
    }
}
/**
 * Applies the headers and properties configured on this producer to the
 * message and sends it to the given destination.
 * <p>When a completion listener is configured it is wrapped in a
 * {@code CompletionListenerWrapper} and passed to the underlying producer's
 * send call; otherwise the plain send is used. Any {@code JMSException} is
 * converted to its runtime counterpart.
 *
 * @param destination the destination to send to
 * @param message the message to send; must not be {@code null}
 * @return this producer
 * @throws MessageFormatRuntimeException if {@code message} is {@code null}
 */
@Override
public JMSProducer send(Destination destination, Message message) {
    if (message == null) {
        throw new MessageFormatRuntimeException("null message");
    }

    try {
        if (jmsHeaderCorrelationID != null) {
            message.setJMSCorrelationID(jmsHeaderCorrelationID);
        }

        // An empty byte array is treated like "not configured".
        boolean hasCorrelationBytes =
                jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0;
        if (hasCorrelationBytes) {
            message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
        }

        if (jmsHeaderReplyTo != null) {
            message.setJMSReplyTo(jmsHeaderReplyTo);
        }
        if (jmsHeaderType != null) {
            message.setJMSType(jmsHeaderType);
        }

        // XXX HORNETQ-1209 "JMS 2.0" can this be a foreign msg?
        // if so, then "SimpleString" properties will trigger an error.
        setProperties(message);

        if (completionListener == null) {
            producer.send(destination, message);
        } else {
            CompletionListener wrappedListener = new CompletionListenerWrapper(completionListener);
            producer.send(destination, message, wrappedListener);
        }
    } catch (JMSException e) {
        throw JmsExceptionUtils.convertToRuntimeException(e);
    }

    return this;
}
/**
 * Fills the given message with one property of each primitive/String kind
 * and with fixed correlation id, reply-to and type headers.
 *
 * @param m the message to populate
 * @throws JMSException if any setter on the message fails
 */
protected void prepareMessage(final Message m) throws JMSException {
    // One property per supported property type.
    m.setBooleanProperty("booleanProperty", true);
    m.setByteProperty("byteProperty", (byte) 3);
    m.setDoubleProperty("doubleProperty", 4.0);
    m.setFloatProperty("floatProperty", 5.0f);
    m.setIntProperty("intProperty", 6);
    m.setLongProperty("longProperty", 7L);
    m.setShortProperty("shortProperty", (short) 8);
    m.setStringProperty("stringProperty", "this is a String property");

    // Standard JMS headers.
    m.setJMSCorrelationID("this is the correlation ID");
    m.setJMSReplyTo(ActiveMQServerTestCase.topic1);
    m.setJMSType("someArbitraryType");
}
/**
 * Verifies that a ReplyTo header explicitly set to {@code null} is still
 * {@code null} on the sent message object and on the message that is
 * delivered to the consumer.
 *
 * @throws Exception if any JMS call fails
 */
@Test
public void testJMSDestinationNull() throws Exception {
    Message m = queueProducerSession.createMessage();
    m.setJMSReplyTo(null);
    queueProducer.send(m);

    Message received = queueConsumer.receive();

    ProxyAssertSupport.assertNull(m.getJMSReplyTo());
    // The delivered message must carry a null ReplyTo as well, not just the local object.
    ProxyAssertSupport.assertNull(received.getJMSReplyTo());
}
/**
 * Creates an empty message whose ReplyTo header points at the queue named
 * {@code replyTo}, sends it to {@code destination} and returns it.
 * <p>The producer used for sending is created for this call only and is
 * closed before the method returns, including when sending fails.
 *
 * @param session the session used to create the producer and the message
 * @param destination the destination to send the message to
 * @param replyTo the name of the queue to set as ReplyTo
 * @return the message that was sent
 * @throws JMSException if creating, populating or sending the message fails
 */
public static Message sendMessageWithReplyTo(final Session session,
                                             final Destination destination,
                                             final String replyTo) throws JMSException {
    final MessageProducer producer = session.createProducer(destination);
    try {
        final Message message = session.createMessage();
        message.setJMSReplyTo(ActiveMQJMSClient.createQueue(replyTo));
        producer.send(message);
        return message;
    } finally {
        // Do not leak one producer per call on the caller's session.
        producer.close();
    }
}
/**
 * Creates the JMS message for a FlowFile according to the configured message type.
 * <p>Empty and text types produce a {@code TextMessage}, the stream type a
 * {@code StreamMessage} containing the content bytes, the map type a
 * {@code MapMessage} without entries, and the byte type (as well as any
 * unrecognised type) a {@code BytesMessage} containing the content bytes.
 * The JMS timestamp is set to the current time; ReplyTo and priority are set
 * only when provided, and FlowFile attributes are copied to JMS properties
 * when that option is enabled.
 *
 * @param jmsSession the session used to create the message
 * @param context the process context supplying the configured properties
 * @param messageContent the payload bytes
 * @param flowFile the FlowFile whose attributes may be copied onto the message
 * @param replyToQueue the ReplyTo destination, or {@code null} for none
 * @param priority the JMS priority, or {@code null} to leave it unset
 * @return the populated message
 * @throws JMSException if creating or populating the message fails
 */
private Message createMessage(final Session jmsSession, final ProcessContext context, final byte[] messageContent,
        final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
    final String configuredType = context.getProperty(MESSAGE_TYPE).getValue();

    final Message message;
    switch (configuredType) {
        case MSG_TYPE_EMPTY:
            message = jmsSession.createTextMessage("");
            break;
        case MSG_TYPE_STREAM: {
            final StreamMessage stream = jmsSession.createStreamMessage();
            stream.writeBytes(messageContent);
            message = stream;
            break;
        }
        case MSG_TYPE_TEXT:
            message = jmsSession.createTextMessage(new String(messageContent, UTF8));
            break;
        case MSG_TYPE_MAP:
            message = jmsSession.createMapMessage();
            break;
        case MSG_TYPE_BYTE:
        default: {
            // The byte type doubles as the fallback for any other configured value.
            final BytesMessage bytes = jmsSession.createBytesMessage();
            bytes.writeBytes(messageContent);
            message = bytes;
            break;
        }
    }

    message.setJMSTimestamp(System.currentTimeMillis());

    if (replyToQueue != null) {
        message.setJMSReplyTo(replyToQueue);
    }
    if (priority != null) {
        message.setJMSPriority(priority);
    }

    final boolean copyAttributes = context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean();
    if (copyAttributes) {
        copyAttributesToJmsProps(flowFile, message);
    }
    return message;
}
/**
 * Builds the outgoing JMS message for the given FlowFile content.
 * <p>The concrete message class depends on the configured message type:
 * a {@code TextMessage} with an empty string (empty type) or with the content
 * decoded as text (text type), a {@code StreamMessage} or {@code BytesMessage}
 * carrying the raw content bytes (stream / byte type, the latter also being the
 * fallback), or a {@code MapMessage} without entries (map type). Afterwards the
 * timestamp is set to "now", the optional ReplyTo and priority are applied, and
 * FlowFile attributes are copied to JMS properties if so configured.
 *
 * @param jmsSession the session used to create the message
 * @param context the process context supplying the configured properties
 * @param messageContent the payload bytes
 * @param flowFile the FlowFile whose attributes may be copied onto the message
 * @param replyToQueue the ReplyTo destination, or {@code null} for none
 * @param priority the JMS priority, or {@code null} to leave it unset
 * @return the populated message
 * @throws JMSException if creating or populating the message fails
 */
private Message createMessage(final Session jmsSession, final ProcessContext context, final byte[] messageContent,
        final FlowFile flowFile, final Destination replyToQueue, final Integer priority) throws JMSException {
    final Message result;

    switch (context.getProperty(MESSAGE_TYPE).getValue()) {
        case MSG_TYPE_EMPTY:
            result = jmsSession.createTextMessage("");
            break;

        case MSG_TYPE_STREAM: {
            final StreamMessage streamMsg = jmsSession.createStreamMessage();
            streamMsg.writeBytes(messageContent);
            result = streamMsg;
            break;
        }

        case MSG_TYPE_TEXT: {
            final String text = new String(messageContent, UTF8);
            result = jmsSession.createTextMessage(text);
            break;
        }

        case MSG_TYPE_MAP:
            result = jmsSession.createMapMessage();
            break;

        case MSG_TYPE_BYTE:
        default: {
            final BytesMessage bytesMsg = jmsSession.createBytesMessage();
            bytesMsg.writeBytes(messageContent);
            result = bytesMsg;
            break;
        }
    }

    final long now = System.currentTimeMillis();
    result.setJMSTimestamp(now);

    // Optional headers are only touched when a value was supplied.
    if (replyToQueue != null) {
        result.setJMSReplyTo(replyToQueue);
    }
    if (priority != null) {
        result.setJMSPriority(priority);
    }

    if (context.getProperty(ATTRIBUTES_TO_JMS_PROPS).asBoolean()) {
        copyAttributesToJmsProps(flowFile, result);
    }

    return result;
}
/**
 * Shared implementation for tests that send a message on a connection configured
 * with destination prefixes. The same destination is used both as the send target
 * and as the message's ReplyTo, and the test peer is told to expect a transfer whose
 * 'to' and 'reply-to' properties equal {@code destAddress} and whose destination-type
 * and reply-to-type message annotations equal {@code destTypeAnnotationValue}.
 *
 * @param destType the kind of destination to create: Topic, Queue, TemporaryTopic or TemporaryQueue
 * @param destPrefix the prefix put into the connection URI options (and set on the connection for Topic/Queue)
 * @param destName the name passed to createTopic/createQueue (not used for temporary destinations)
 * @param destAddress the address the peer is expected to see on the sender attach and in the message properties
 * @param destTypeAnnotationValue the expected value of both type annotations on the transferred message
 * @throws Exception if an error occurs during the test
 */
private void doSendMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination> destType,
String destPrefix,
String destName,
String destAddress,
Byte destTypeAnnotationValue) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Establish the connection with the prefix option(s) matching the destination type.
Connection connection = null;
if (destType == Topic.class) {
connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix);
} else if (destType == Queue.class) {
connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix=" + destPrefix);
} else {
// Set both the non-temporary prefixes, we wont use non-temp dests but want to ensure they don't affect anything
connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix=" + destPrefix + "&jms.queuePrefix=" + destPrefix);
}
connection.start();
// Set the prefix if Topic or Queue dest type.
// NOTE(review): the same prefix was already supplied via the URI above — confirm whether both are needed.
if (destType == Topic.class) {
((JmsConnection) connection).setTopicPrefix(destPrefix);
} else if (destType == Queue.class) {
((JmsConnection) connection).setQueuePrefix(destPrefix);
}
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination
// For temporary destinations the peer expectation is registered before the create call.
Destination dest = null;
if (destType == Topic.class) {
dest = session.createTopic(destName);
} else if (destType == Queue.class) {
dest = session.createQueue(destName);
} else if (destType == TemporaryTopic.class) {
testPeer.expectTempTopicCreationAttach(destAddress);
dest = session.createTemporaryTopic();
} else if (destType == TemporaryQueue.class) {
testPeer.expectTempQueueCreationAttach(destAddress);
dest = session.createTemporaryQueue();
}
// The sender attach is expected to target destAddress.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(destAddress));
testPeer.expectSenderAttach(targetMatcher, false, false);
MessageProducer producer = session.createProducer(dest);
// Build the matchers describing the transfer the peer should receive.
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL.toString()), equalTo(destTypeAnnotationValue));
msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL.toString()), equalTo(destTypeAnnotationValue));
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
propsMatcher.withTo(equalTo(destAddress));
propsMatcher.withReplyTo(equalTo(destAddress));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
//TODO: currently we aren't sending any body section, decide if this is allowed
//messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
testPeer.expectTransfer(messageMatcher);
// Send a message whose ReplyTo is the very destination it is sent to.
Message message = session.createMessage();
message.setJMSReplyTo(dest);
producer.send(message);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
/**
 * Sends the request with ReplyTo set to this requestor's temporary queue and
 * waits for the reply using the receiver's {@code receive()} call without a timeout.
 *
 * @param message the request message
 * @return the message returned by the receiver
 * @throws JMSException if setting the header, sending or receiving fails
 */
public Message request(Message message) throws JMSException {
    // Route the reply to our temporary queue, then send.
    message.setJMSReplyTo(getTemporaryQueue());
    getSender().send(message);

    Message reply = getReceiver().receive();
    return reply;
}
/**
 * Sends the request with ReplyTo set to this requestor's temporary queue and
 * waits for the reply, passing the given timeout to the receiver.
 *
 * @param message the request message
 * @param timeout the timeout value handed to the receiver's {@code receive(long)} call
 * @return the message returned by the receiver
 * @throws JMSException if setting the header, sending or receiving fails
 */
public Message request(Message message, long timeout) throws JMSException {
    // Route the reply to our temporary queue, then send.
    message.setJMSReplyTo(getTemporaryQueue());
    getSender().send(message);

    Message reply = getReceiver().receive(timeout);
    return reply;
}
/**
 * Verifies that a message sent to a queue with a Topic as its ReplyTo carries the
 * {@link AmqpDestinationHelper#JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL} message
 * annotation with the topic type value, and that its 'reply-to' property equals
 * the reply topic's name.
 *
 * @throws Exception if an error occurs during the test.
 */
@Test(timeout = 20000)
public void testSentMessageContainsReplyToTypeAnnotationByte() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
        Connection connection = testFixture.establishConnecton(testPeer);

        testPeer.expectBegin();
        testPeer.expectSenderAttach();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        String replyTopicName = "myReplyTopic";

        // Describe the transfer the peer must see: durable header, reply-to type annotation, reply-to address.
        MessageHeaderSectionMatcher headerExpectation = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));

        MessageAnnotationsSectionMatcher annotationExpectation = new MessageAnnotationsSectionMatcher(true);
        annotationExpectation.withEntry(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL,
                equalTo(AmqpDestinationHelper.TOPIC_TYPE));

        MessagePropertiesSectionMatcher propertiesExpectation =
                new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName));

        TransferPayloadCompositeMatcher transferExpectation = new TransferPayloadCompositeMatcher();
        transferExpectation.setHeadersMatcher(headerExpectation);
        transferExpectation.setMessageAnnotationsMatcher(annotationExpectation);
        transferExpectation.setPropertiesMatcher(propertiesExpectation);
        testPeer.expectTransfer(transferExpectation);

        // Send an empty message to the queue with the topic as ReplyTo.
        Topic replyTopic = session.createTopic(replyTopicName);
        Message outgoing = session.createMessage();
        outgoing.setJMSReplyTo(replyTopic);

        Queue targetQueue = session.createQueue(queueName);
        MessageProducer producer = session.createProducer(targetQueue);
        producer.send(outgoing);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Sends a search request and waits for the reply that carries the same correlation id.
 * <p>A temporary reply queue is acquired for the duration of the call and always
 * released again. The request is sent non-persistently with the configured
 * time-to-live. Replies whose correlation id does not match (left over from earlier
 * requests) are skipped; waiting stops once the configured receive timeout has
 * elapsed. The consumer and the producer are closed even when a JMS call fails, so
 * the reply queue is never released with a consumer still attached.
 *
 * @param session the JMS session used for the consumer and the producer
 * @param message the request message; its ReplyTo and correlation id headers are overwritten
 * @return the matching reply, or {@code null} if none arrived within the receive timeout
 * @throws JMSException if any JMS call fails
 */
private Message doSearchRequest(final Session session, final Message message) throws JMSException {
    final Destination replyQueue = acquireTempQueue(session);
    if (log.isDebugEnabled()) {
        log.debug("doSearchRequest replyQueue=" + replyQueue);
    }
    try {
        Message returnedMessage = null;
        final MessageConsumer responseConsumer = session.createConsumer(replyQueue);
        try {
            message.setJMSReplyTo(replyQueue);
            final String correlationId = createRandomString();
            message.setJMSCorrelationID(correlationId);

            final MessageProducer producer = session.createProducer(searchQueue_);
            try {
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setTimeToLive(timeToLive_);
                if (log.isDebugEnabled()) {
                    log.debug("Sending search request message with correlationId=" + correlationId);
                }
                producer.send(message);
            } finally {
                // The producer is only needed for this one send.
                producer.close();
            }

            final long start = System.currentTimeMillis();
            while (true) {
                // Remaining time budget; shrinks across iterations so skipped replies do not extend the wait.
                final long diff = (start + receiveTimeout_) - System.currentTimeMillis();
                if (diff <= 0) {
                    // timeout
                    log.info("Timeout in search. Remaining time zero or negative.");
                    break;
                }
                if (log.isDebugEnabled()) {
                    log.debug("doSearchRequest: call receive with timeout=" + diff);
                }
                returnedMessage = responseConsumer.receive(diff);
                if (returnedMessage == null) {
                    // timeout case, we're stopping now without a reply...
                    log.info("Timeout in search. Reply was null.");
                    break;
                } else if (!correlationId.equals(returnedMessage.getJMSCorrelationID())) {
                    // we got an old reply from a previous search request
                    log.info("Got a response with a wrong correlationId. Ignoring and waiting for the next");
                    continue;
                } else {
                    // we got a valid reply
                    break;
                }
            }
        } finally {
            // Detach from the reply queue before it is released below.
            responseConsumer.close();
        }
        if (log.isDebugEnabled()) {
            log.debug("doSearchRequest: returnedMessage=" + returnedMessage);
        }
        return returnedMessage;
    } finally {
        releaseTempQueue(replyQueue);
    }
}
/**
 * Exercises a request/response exchange between two users under an ACL configuration.
 * <p>USER1 acts as the responder: it consumes from the named queue and publishes the
 * response to the request's Reply-To destination. USER2 acts as the requester: it
 * creates a temporary queue, sends "Request" to the named queue with that temporary
 * queue as Reply-To, and expects to receive "Response" on it. Both sessions are
 * transacted, so every send/receive step is followed by an explicit commit.
 * The ACL rule list differs between legacy and non-legacy clients (see the
 * {@code isLegacyClient()} branches).
 *
 * @throws Exception if setup or any JMS call fails
 */
@Test
public void testRequestResponseSuccess() throws Exception
{
String queueName = getTestName();
Queue queue = createQueue(queueName);
String groupName = "messaging-users";
createGroupProvider(groupName, USER1, USER2);
// Rules: group-wide virtual host access, USER1 may consume the request queue,
// USER2 may create/consume temporary queues, plus client-type specific publish/bind rules.
// For non-legacy clients the last three entries are empty strings.
configureACL(String.format("ACL ALLOW-LOG %s ACCESS VIRTUALHOST", groupName),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE name=\"%s\"", USER1, queueName),
String.format("ACL ALLOW-LOG %s CONSUME QUEUE temporary=true", USER2),
String.format("ACL ALLOW-LOG %s CREATE QUEUE temporary=true", USER2),
isLegacyClient() ?
String.format("ACL ALLOW-LOG %s BIND EXCHANGE name=\"amq.direct\" temporary=true", USER2) :
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"TempQueue*\"", USER1),
isLegacyClient() ?
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"%s\"", USER2, queueName) :
String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"\" routingKey=\"%s\"", USER2, queueName),
isLegacyClient() ? String.format("ACL ALLOW-LOG %s CREATE QUEUE name=\"%s\"", USER1, queueName) : "",
isLegacyClient() ? String.format("ACL ALLOW-LOG %s BIND EXCHANGE", USER1) : "",
isLegacyClient() ? String.format("ACL ALLOW-LOG %s PUBLISH EXCHANGE name=\"amq.direct\" routingKey=\"TempQueue*\"", USER1) : ""
);
// Responder side (USER1): consumer on the request queue.
Connection responderConnection = getConnectionBuilder().setUsername(USER1).setPassword(USER1_PASSWORD).build();
try
{
Session responderSession = responderConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer requestConsumer = responderSession.createConsumer(queue);
responderConnection.start();
// Requester side (USER2): temporary response queue plus its consumer.
Connection requesterConnection = getConnectionBuilder().setUsername(USER2).setPassword(USER2_PASSWORD).build();
try
{
Session requesterSession = requesterConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue responseQueue = requesterSession.createTemporaryQueue();
MessageConsumer responseConsumer = requesterSession.createConsumer(responseQueue);
requesterConnection.start();
// Send the request with Reply-To pointing at the temporary queue and commit it.
Message request = requesterSession.createTextMessage("Request");
request.setJMSReplyTo(responseQueue);
requesterSession.createProducer(queue).send(request);
requesterSession.commit();
// Responder receives the request and answers on its Reply-To destination.
Message receivedRequest = requestConsumer.receive(getReceiveTimeout());
assertNotNull("Request is not received", receivedRequest);
assertNotNull("Request should have Reply-To", receivedRequest.getJMSReplyTo());
MessageProducer responder = responderSession.createProducer(receivedRequest.getJMSReplyTo());
responder.send(responderSession.createTextMessage("Response"));
responderSession.commit();
// Requester picks up the response; the commit is issued before the assertions run.
Message receivedResponse = responseConsumer.receive(getReceiveTimeout());
requesterSession.commit();
assertNotNull("Response is not received", receivedResponse);
assertEquals("Unexpected response is received", "Response", ((TextMessage) receivedResponse).getText());
}
finally
{
requesterConnection.close();
}
}
finally
{
responderConnection.close();
}
}
/**
 * So we network three brokers together, and send a message with request-reply semantics.
 * The message goes to an echo service listening on broker C. We send a message on a queue
 * to broker A which gets demand forwarded to broker C. The echo service will respond to the
 * temp destination listed in the JMSReplyTo header. That will get demand forwarded back to
 * broker A. When the consumer of the temp dest on broker A closes, that subscription should
 * be removed on broker A. Advisories firing from broker A to broker B should remove that
 * subscription on broker B. Advisories firing from broker B to broker C should remove that
 * subscription on broker C.
 * <p>Two worker threads run the request/reply loop concurrently. Their outcome is
 * collected through the futures returned by the executor, so an assertion failure or
 * exception inside a worker fails this test instead of being silently dropped.
 *
 * @throws Exception if a worker failed or any broker/JMS call fails
 */
public void testSubscriptionsCleanedUpRace() throws Exception {
    final BrokerItem brokerA = brokers.get(BROKER_A);

    Runnable tester = new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < NUM_ITER; i++) {
                Connection conn = null;
                try {
                    conn = brokerA.createConnection();
                    conn.start();

                    final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    Destination destination = sess.createQueue(ECHO_QUEUE_NAME);

                    MessageProducer producer = sess.createProducer(destination);

                    LOG.info("Starting iter: " + i);
                    Destination replyTo = sess.createTemporaryQueue();
                    MessageConsumer responseConsumer = sess.createConsumer(replyTo);

                    Message message = sess.createTextMessage("Iteration: " + i);
                    message.setJMSReplyTo(replyTo);

                    producer.send(message);

                    TextMessage response = (TextMessage) responseConsumer.receive(CONSUME_TIMEOUT);
                    assertNotNull("We should have gotten a response, but didn't for iter: " + i, response);
                    assertEquals("We got the wrong response from the echo service", "Iteration: " + i, response.getText());

                    // so we close the consumer so that an actual RemoveInfo command gets propagated through the
                    // network
                    responseConsumer.close();
                    conn.close();
                } catch (Exception e) {
                    // Keep the cause; the error reaches the test thread through the worker's Future.
                    AssertionError failure = new AssertionError("Request/reply failed for iter: " + i);
                    failure.initCause(e);
                    throw failure;
                } finally {
                    // Best-effort cleanup for the failure path; closing an already closed connection is a no-op.
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (Exception ignored) {
                            // Nothing useful can be done here; the primary failure (if any) is already propagating.
                        }
                    }
                }
            }
        }
    };

    ExecutorService threadService = Executors.newFixedThreadPool(2);
    java.util.concurrent.Future<?> firstWorker = threadService.submit(tester);
    java.util.concurrent.Future<?> secondWorker = threadService.submit(tester);

    threadService.shutdown();
    assertTrue("executor done on time", threadService.awaitTermination(30L, TimeUnit.SECONDS));

    // Surface anything thrown inside the workers; get() rethrows it wrapped in an ExecutionException.
    firstWorker.get();
    secondWorker.get();

    // for the real test... we should not have any subscriptions left on broker C for the temp dests
    BrokerItem brokerC = brokers.get(BROKER_C);
    RegionBroker regionBroker = (RegionBroker) brokerC.broker.getRegionBroker();
    final AbstractRegion region = (AbstractRegion) regionBroker.getTempQueueRegion();

    assertTrue("There were no lingering temp-queue destinations", Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            LOG.info("Lingering temps: " + region.getSubscriptions().size());
            return 0 == region.getSubscriptions().size();
        }
    }));
}