下面列出了javax.jms.Topic#javax.jms.Queue 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void onMessage(Message message) {
try {
TextMessage receiveMessage = (TextMessage) message;
String keys = receiveMessage.getText();
LOGGER.info("keys = " + keys);
MapMessage returnMess = session.createMapMessage();
returnMess.setStringProperty("/a2/m1", "zhaohui");
returnMess.setStringProperty("/a3/m1/v2", "nanjing");
returnMess.setStringProperty("/a3/m1/v2/t2", "zhaohui");
QueueSender sender = session.createSender((Queue) message.getJMSReplyTo());
sender.send(returnMess);
} catch (Exception e) {
LOGGER.error("onMessage error", e);
}
}
@Test
public void testTransactionalSimple() throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
producer.send(session.createTextMessage("test"));
session.commit();
Assert.assertNull(consumer.receive(100));
connection.start();
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertEquals("test", message.getText());
Assert.assertNotNull(message);
message.acknowledge();
}
}
private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num) throws Exception {
Connection jmsConn = null;
try {
jmsConn = coreCf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < num; i++) {
TextMessage msg = session.createTextMessage(text + i);
producer.send(msg);
}
} finally {
if (jmsConn != null) {
jmsConn.close();
}
}
}
@Test
public void replyPayloadNoDestination() throws JMSException {
Queue replyDestination = mock(Queue.class);
Message<String> request = MessageBuilder.withPayload("Response").build();
Session session = mock(Session.class);
MessageProducer messageProducer = mock(MessageProducer.class);
TextMessage responseMessage = mock(TextMessage.class);
given(session.createTextMessage("Response")).willReturn(responseMessage);
given(session.createProducer(replyDestination)).willReturn(messageProducer);
MessagingMessageListenerAdapter listener =
getPayloadInstance(request, "replyPayloadNoDestination", Message.class);
listener.setDefaultResponseDestination(replyDestination);
listener.onMessage(mock(javax.jms.Message.class), session);
verify(session, times(0)).createQueue(anyString());
verify(session).createTextMessage("Response");
verify(messageProducer).send(responseMessage);
verify(messageProducer).close();
}
@Test
public void createReceiver() throws Exception
{
Queue queue = createQueue(getTestName());
QueueConnection queueConnection = getQueueConnection();
try
{
queueConnection.start();
Utils.sendMessages(queueConnection, queue, 3);
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = session.createReceiver(queue, String.format("%s=2", INDEX));
assertEquals("Queue names should match from QueueReceiver", queue.getQueueName(), receiver.getQueue().getQueueName());
Message received = receiver.receive(getReceiveTimeout());
assertNotNull("Message is not received", received);
assertEquals("Unexpected message is received", 2, received.getIntProperty(INDEX));
}
finally
{
queueConnection.close();
}
}
@Test
public void testGetEnumeration() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser.getEnumeration());
browser.close();
try {
browser.getEnumeration();
fail("Should not be able to use a closed browser");
} catch (IllegalStateException ise) {
}
}
public TextMessage testReplyWithJackson(String methodName, String replyContent) throws JMSException {
Queue replyDestination = mock(Queue.class);
Session session = mock(Session.class);
MessageProducer messageProducer = mock(MessageProducer.class);
TextMessage responseMessage = mock(TextMessage.class);
given(session.createTextMessage(replyContent)).willReturn(responseMessage);
given(session.createProducer(replyDestination)).willReturn(messageProducer);
MessagingMessageListenerAdapter listener = getPayloadInstance("Response", methodName, Message.class);
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setTargetType(MessageType.TEXT);
listener.setMessageConverter(messageConverter);
listener.setDefaultResponseDestination(replyDestination);
listener.onMessage(mock(javax.jms.Message.class), session);
verify(session, times(0)).createQueue(anyString());
verify(session).createTextMessage(replyContent);
verify(messageProducer).send(responseMessage);
verify(messageProducer).close();
return responseMessage;
}
@Test
public void sendAndReceiveEmpty() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage(null);
producer.send(message);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", receivedMessage instanceof TextMessage);
assertNull("Unexpected body", ((TextMessage) receivedMessage).getText());
}
finally
{
connection.close();
}
}
private static void createJMSObjects(final int server) throws Exception {
// Step 1. Instantiate a JMS Connection Factory object from JNDI on server 1
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:" + (61616 + server));
// Step 2. We create a JMS Connection connection
connection = connectionFactory.createConnection();
// Step 3. We start the connection to ensure delivery occurs
connection.start();
// Step 4. We create a JMS Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 5. Look-up the JMS Queue object from JNDI
Queue queue = session.createQueue("exampleQueue");
// Step 6. We create a JMS MessageConsumer object
consumer = session.createConsumer(queue);
// Step 7. We create a JMS MessageProducer object
producer = session.createProducer(queue);
}
@Test
public void emptyQueue() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
assertFalse(enumeration.hasMoreElements());
}
finally
{
connection.close();
}
}
private String retrieveDestinationName(Destination destination, String headerName) {
String destinationName = null;
if (destination != null) {
try {
destinationName = (destination instanceof Queue) ? ((Queue) destination).getQueueName()
: ((Topic) destination).getTopicName();
} catch (JMSException e) {
this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e);
}
}
return destinationName;
}
private void doGetObjectInstanceCreatesJmsQueueTestImpl(String nameAddressProp) throws Exception, JMSException {
Reference reference = createTestReference(JmsQueue.class.getName(), nameAddressProp, TEST_QUEUE_ADDRESS);
Object queue = referenceFactory.getObjectInstance(reference, mockName, mockContext, testEnvironment);
assertNotNull("Expected object to be created", queue);
assertEquals("Unexpected object type created", JmsQueue.class, queue.getClass());
assertEquals("Unexpected address", TEST_QUEUE_ADDRESS, ((JmsQueue) queue).getAddress());
assertEquals("Unexpected queue name", TEST_QUEUE_ADDRESS, ((Queue) queue).getQueueName());
}
/**
* Test SendBackMapMessages
*
* @throws Exception
*/
@Test
public void testSendBackMapMessages() throws Exception {
String replyQueueName = "testQueueReplyMap";
Properties jmsProperties = JMSTestsUtils.getJMSPropertiesForDestination(replyQueueName, PROVIDER_URL, true);
JMSBrokerController brokerController = new JMSBrokerController(PROVIDER_URL, jmsProperties);
try {
brokerController.startProcess();
Queue replyQueue = brokerController.connect(replyQueueName, true);
CachedJMSConnectionFactory cachedJMSConnectionFactory = new CachedJMSConnectionFactory(jmsProperties);
MessageContext messageContext = JMSTestsUtils.createMessageContext();
String correlationID = UUID.randomUUID().toString();
this.setSOAPEnvelopWithMapMessageBody(messageContext);
this.setTransportHeaders(((Axis2MessageContext) messageContext).getAxis2MessageContext(),
JMSConstants.JMS_MAP_MESSAGE, correlationID);
messageContext.setProperty(JMSConstants.JMS_COORELATION_ID, correlationID);
JMSReplySender replySender = new JMSReplySender(replyQueue, cachedJMSConnectionFactory, null, null);
replySender.sendBack(messageContext);
Message replyMsg = brokerController.receiveMessage(replyQueue);
Assert.assertNotNull("The reply message cannot be null", replyMsg);
Assert.assertEquals("The Message type of received message does not match", JMSConstants.JMS_MAP_MESSAGE,
replyMsg.getJMSType());
Assert.assertEquals("The Content of received message does not match", "10",
((ActiveMQMapMessage) replyMsg).getContentMap().get("Price"));
} finally {
brokerController.disconnect();
brokerController.stopProcess();
}
}
private void sendJms(String queue, String text, ConnectionFactory factory) throws JMSException {
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(queue);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
message.setText(text);
producer.send(message);
producer.close();
session.close();
connection.close();
}
public void testAsyncCallbackIsFaster() throws JMSException, InterruptedException {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getName());
// setup a consumer to drain messages..
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
// warmup...
for (int i = 0; i < 10; i++) {
benchmarkNonCallbackRate();
benchmarkCallbackRate();
}
double callbackRate = benchmarkCallbackRate();
double nonCallbackRate = benchmarkNonCallbackRate();
LOG.info(String.format("AsyncCallback Send rate: %,.2f m/s", callbackRate));
LOG.info(String.format("NonAsyncCallback Send rate: %,.2f m/s", nonCallbackRate));
// The async style HAS to be faster than the non-async style..
assertTrue("async rate[" + callbackRate + "] should beat non-async rate[" + nonCallbackRate + "]", callbackRate / nonCallbackRate > 1.5);
}
@Test
public void redelivered() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnectionBuilder().setPrefetch(1).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("A"));
producer.send(session.createTextMessage("B"));
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertFalse("Unexpected JMSRedelivered after first receive", message.getJMSRedelivered());
assertEquals("Unexpected message content", "A", ((TextMessage) message).getText());
session.rollback();
message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertTrue("Unexpected JMSRedelivered after second receive", message.getJMSRedelivered());
assertEquals("Unexpected message content", "A", ((TextMessage) message).getText());
message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertFalse("Unexpected JMSRedelivered for second message", message.getJMSRedelivered());
assertEquals("Unexpected message content", "B", ((TextMessage) message).getText());
session.commit();
}
finally
{
connection.close();
}
}
@Test
public void testQueueSendReceiveNoWaitOnTracedThread() throws Exception {
receiveNoWaitFlow.set(true);
if (!brokerFacade.shouldTestReceiveNoWait()) {
return;
}
final Queue queue = createTestQueue();
doTestSendReceiveOnTracedThread(() -> brokerFacade.receiveNoWait(queue), queue, true, false);
}
public void basicSendReceive(String uri) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
String body = "hello world!";
Queue destination = session.createQueue("TEST");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage(body));
MessageConsumer consumer = session.createConsumer(destination);
Message received = consumer.receive(2000);
TestCase.assertEquals(body, ((TextMessage) received).getText());
}
private void scheduleRepeating(Connection connection) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("test msg");
long time = 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, -1);
producer.send(message);
producer.close();
}
@Test(timeout = 60000)
public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
connection = createAmqpConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Hello"));
final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
assertEquals(1, proxy.getQueueSize());
// Consume the message...but don't ack it.
MessageConsumer consumer = session.createConsumer(queue);
Message msg = consumer.receive(3000);
assertNotNull(msg);
session.close();
assertEquals(1, proxy.getQueueSize());
// Consume the message...and this time we ack it.
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
consumer = session.createConsumer(queue);
msg = consumer.receive(3000);
assertNotNull(msg);
msg.acknowledge();
assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return proxy.getQueueSize() == 0;
}
}));
}
@Override
public long flush() throws MailQueueException {
boolean first = true;
long count = 0;
try (Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
Queue queue = session.createQueue(queueName.asString());
try (MessageConsumer consumer = session.createConsumer(queue)) {
try (MessageProducer producer = session.createProducer(queue)) {
Message message = null;
while (first || message != null) {
if (first) {
// give the consumer 2000 ms to receive messages
message = consumer.receive(2000);
} else {
message = consumer.receiveNoWait();
}
first = false;
if (message != null) {
Message m = copy(session, message);
m.setBooleanProperty(FORCE_DELIVERY, true);
producer.send(m, message.getJMSDeliveryMode(), message.getJMSPriority(), message.getJMSExpiration());
count++;
}
}
session.commit();
return count;
}
}
} catch (Exception e) {
LOGGER.error("Unable to flush mail", e);
throw new MailQueueException("Unable to get size of queue " + queueName.asString(), e);
}
}
private void scheduleOneShot(Connection connection) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test.queue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("test msg");
long time = TimeUnit.SECONDS.toMillis(30);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 0);
producer.send(message);
producer.close();
}
@Test
public void testWithString() throws JMSException {
WeldContainer container = prepare();
RawMessageConsumerBean bean = container.select(RawMessageConsumerBean.class).get();
assertThat(bean.messages()).isEmpty();
Queue q = jms.createQueue("queue-one");
JMSProducer producer = jms.createProducer();
TextMessage message = jms.createTextMessage("hello");
message.setStringProperty("string", "value");
message.setBooleanProperty("bool", true);
message.setLongProperty("long", 100L);
message.setByteProperty("byte", (byte) 5);
message.setFloatProperty("float", 5.5f);
message.setDoubleProperty("double", 10.3);
message.setIntProperty("int", 23);
message.setObjectProperty("object", "yop");
message.setShortProperty("short", (short) 3);
producer.send(q, message);
await().until(() -> bean.messages().size() == 1);
IncomingJmsMessage<?> incomingJmsMessage = bean.messages().get(0);
IncomingJmsMessageMetadata metadata = incomingJmsMessage.getMetadata(IncomingJmsMessageMetadata.class)
.orElseThrow(() -> new AssertionError("Metadata expected"));
assertThat(incomingJmsMessage.getPayload()).isEqualTo("hello");
assertThat(metadata.getBody(String.class)).isEqualTo("hello");
assertThat(metadata.propertyExists("string")).isTrue();
assertThat(metadata.propertyExists("missing")).isFalse();
assertThat(metadata.getStringProperty("string")).isEqualTo("value");
assertThat(metadata.getBooleanProperty("bool")).isTrue();
assertThat(metadata.getLongProperty("long")).isEqualTo(100L);
assertThat(metadata.getByteProperty("byte")).isEqualTo((byte) 5);
assertThat(metadata.getFloatProperty("float")).isEqualTo(5.5f);
assertThat(metadata.getDoubleProperty("double")).isEqualTo(10.3);
assertThat(metadata.getIntProperty("int")).isEqualTo(23);
assertThat(metadata.getObjectProperty("object")).isInstanceOf(String.class);
assertThat(((String) message.getObjectProperty("object"))).isEqualTo("yop");
assertThat(message.getShortProperty("short")).isEqualTo((short) 3);
}
@Test
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final String destinationName = "fooQueue";
PublishJMS pubProc = new PublishJMS();
TestRunner runner = TestRunners.newTestRunner(pubProc);
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(cf);
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(PublishJMS.DESTINATION, destinationName);
Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
JmsTemplate jmst = new JmsTemplate(cf);
BytesMessage message = (BytesMessage) jmst.receive(destinationName);
byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message);
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
}
protected void buildJMSResources() throws Exception {
if(connectedThreads==0){
initialConnectStartTime = new Date();
}
super.buildJMSResources();
synchronized(ReconnectTimer.class){
connectedThreads++;
}
if(connectedThreads == numThreads) {
Log.logger.log(Level.SEVERE, "All threads initially connected. Start/End times: {0} / {1}", new Object[] {
formatter.format(initialConnectStartTime),formatter.format(new Date())
});
}
// Open queues
if (destProducer == null) {
destProducer = jmsProvider.lookupQueue(destFactory.generateDestination(getThreadNum()), session).destination;
}
outMessage = msgFactory.createMessage(session, getName(), 0);
String selector = null;
// Use CorrelID Based Selector
if (Config.parms.getBoolean("co")) {
correlID = msgFactory.setJMSCorrelationID(this, outMessage);
}
if (correlID != null) {
StringBuffer sb = new StringBuffer("JMSCorrelationID='");
sb.append(correlID);
sb.append("'");
selector = sb.toString();
}
String destName = getDestinationName( destProducer );
Log.logger.log(Level.FINE, "Creating receiver on {0} selector:{1}", new Object[] {destName, selector});
System.out.println("Creating receiver on " + destName + " with selector: " + selector);
messageConsumer = session.createConsumer((Queue)destProducer, selector);
Log.logger.log(Level.FINE, "Creating sender on {0}", destName );
messageProducer = session.createProducer((Queue)destProducer );
}
private void sendConsumeDurableMessage() throws Exception {
try {
Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue jmsQueue = s.createQueue(address.toString());
MessageProducer p = s.createProducer(jmsQueue);
p.setDeliveryMode(DeliveryMode.PERSISTENT);
conn.start();
p.send(s.createTextMessage("payload"));
} catch (JMSException expected) {
} finally {
if (conn != null) {
conn.close();
}
}
}
/**
* @throws Exception
*/
protected Queue createQueue(final String name) throws Exception {
jmsServer2.createQueue(false, name, null, true, "/queue/" + name);
jmsServer1.createQueue(false, name, null, true, "/queue/" + name);
assertTrue(waitForBindings(server1, name, false, 1, 0, 10000));
assertTrue(waitForBindings(server2, name, false, 1, 0, 10000));
return (Queue) context1.lookup("/queue/" + name);
}
private boolean putBackMessageOnIndexerQueue(Message message) {
if (message instanceof ObjectMessage) {
Session session = null;
Connection conn = null;
try {
Context jndiContext = getContext();
ConnectionFactory qFactory = (ConnectionFactory) jndiContext
.lookup("jms/bss/indexerQueueFactory");
conn = qFactory.createConnection();
Queue queue = (Queue) jndiContext
.lookup("jms/bss/indexerQueue");
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
ObjectMessage msg = session.createObjectMessage();
msg.setObject(((ObjectMessage) message).getObject());
producer.send(msg);
return true;
} catch (Throwable e) {
// This should not happen because the indexer queue is in the
// local server. If it happens, than there's something terribly
// wrong.
throw new SaaSSystemException(e);
} finally {
closeSession(session);
closeConnection(conn);
}
} else {
return false;
}
}
@Test
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
LOG.info(this + " running test testAutoRollbackWithMissingRedeliveries");
broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
try {
connection.start();
final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(destination);
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
Assert.assertNotNull(msg);
broker.stop();
broker = createBroker();
// use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
broker.start();
try {
consumerSession.commit();
Assert.fail("expected transaction rolledback ex");
} catch (TransactionRolledBackException expected) {
}
broker.stop();
broker = createBroker();
broker.start();
Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
} finally {
connection.close();
}
}
private String getDestName(Message message) throws JMSException {
Destination replyTo = message.getJMSReplyTo();
if (replyTo instanceof Queue) {
return ((Queue)replyTo).getQueueName();
} else if (replyTo instanceof Topic) {
return ((Topic)replyTo).getTopicName();
}
return null;
}