下面列出了org.apache.log4j.spi.ErrorCode#javax.jms.Session 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Publishes a bytes message to a JMS topic and checks that an MQTT client
 * subscribed to {@code foo/+} receives exactly the same payload.
 * <p>
 * The MQTT client and the JMS connection are released in a {@code finally}
 * block so a failed send or assertion does not leave them open.
 *
 * @param destinationName name of the JMS topic the message is published to
 * @throws Exception if connecting, sending or receiving fails
 */
public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
    final MQTTClientProvider provider = getMQTTClientProvider();
    initializeConnection(provider);
    Connection connection = null;
    try {
        provider.subscribe("foo/+", AT_MOST_ONCE);
        connection = cf.createConnection();
        connection.start();
        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        javax.jms.Topic topic = s.createTopic(destinationName);
        MessageProducer producer = s.createProducer(topic);
        // publish a bytes message from the JMS side
        final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        BytesMessage bytesMessage = s.createBytesMessage();
        bytesMessage.writeBytes(bytes);
        producer.send(bytesMessage);
        // wait up to 10 seconds for the MQTT side to see it
        byte[] message = provider.receive(10000);
        assertNotNull("Should get retained message", message);
        assertArrayEquals(bytes, message);
    } finally {
        // Tear down the MQTT client first, then the JMS connection; the nested
        // finally guarantees the connection is closed even if disconnect throws.
        try {
            provider.disconnect();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
/**
 * Sends a message to a queue named "unknown" and expects the provider to reject
 * it with an {@link InvalidDestinationException}. The connection is always closed.
 */
@Test
public void sendToUnknownQueue() throws Exception
{
    final QueueConnection queueConnection = (QueueConnection) getConnectionBuilder().build();
    try
    {
        final QueueSession queueSession =
                queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        final Queue unknownQueue = queueSession.createQueue("unknown");
        try
        {
            // Either creating the sender or the send itself may raise the exception.
            queueSession.createSender(unknownQueue).send(queueSession.createMessage());
            fail("Exception not thrown");
        }
        catch (InvalidDestinationException expected)
        {
            // expected outcome: the destination was rejected
        }
    }
    finally
    {
        queueConnection.close();
    }
}
/**
 * Creates a producer against a scripted test peer, closes it, and then waits for
 * the peer to confirm that every scripted expectation was handled.
 */
@Test(timeout = 20000)
public void testCloseSender() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
// Register the peer expectations before the client calls that follow
// (presumably session creation -> begin, producer creation -> sender attach).
testPeer.expectBegin();
testPeer.expectSenderAttach();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
// Expectations for the two close calls below must be registered first.
testPeer.expectDetach(true, true, true);
testPeer.expectClose();
producer.close();
connection.close();
// Give the peer up to 1000 (presumably milliseconds) to finish its handlers.
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Creates a {@link MessageConsumer} for the given destination, applying the
 * supplied message selector.
 * <p>
 * Which factory call is used depends on two settings: whether topic functions
 * are enabled and whether JMS 1.0.2 style APIs are in use. In the JMS 1.0.2
 * case the session and destination are cast to their topic/queue specific types.
 *
 * @param session the Session used to create the consumer
 * @param destination the Destination to consume from
 * @param selector the message selector to apply
 * @return the created MessageConsumer
 * @throws NamingException declared for the delegated factory methods
 * @throws JMSException if the consumer cannot be created
 */
public MessageConsumer getMessageConsumer(Session session, Destination destination, String selector) throws NamingException, JMSException {
    if (useTopicFunctions) {
        final Topic topicDestination = (Topic) destination;
        if (useJms102()) {
            return getTopicSubscriber((TopicSession) session, topicDestination, selector);
        }
        return getTopicSubscriber(session, topicDestination, selector);
    }

    if (useJms102()) {
        return getQueueReceiver((QueueSession) session, (Queue) destination, selector);
    }
    return session.createConsumer(destination, selector);
}
/**
 * Applies the requested acknowledge mode to the wrapped ActivationSpec.
 * <p>Only the standard JCA 1.5 modes "Auto-acknowledge" and
 * "Dups-ok-acknowledge" are supported. {@code SESSION_TRANSACTED} and
 * {@code CLIENT_ACKNOWLEDGE} are rejected, and so is
 * {@code DUPS_OK_ACKNOWLEDGE} when the ActivationSpec exposes no writable
 * "acknowledgeMode" property.
 * @param bw the BeanWrapper wrapping the ActivationSpec object
 * @param ackMode the configured acknowledge mode
 * (according to the constants in {@link javax.jms.Session})
 * @throws IllegalArgumentException if the requested mode cannot be applied
 * @see javax.jms.Session#AUTO_ACKNOWLEDGE
 * @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
 * @see javax.jms.Session#SESSION_TRANSACTED
 */
protected void applyAcknowledgeMode(BeanWrapper bw, int ackMode) {
    if (ackMode == Session.SESSION_TRANSACTED) {
        throw new IllegalArgumentException("No support for SESSION_TRANSACTED: Only \"Auto-acknowledge\" " +
                "and \"Dups-ok-acknowledge\" supported in standard JCA 1.5");
    }
    if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
        throw new IllegalArgumentException("No support for CLIENT_ACKNOWLEDGE: Only \"Auto-acknowledge\" " +
                "and \"Dups-ok-acknowledge\" supported in standard JCA 1.5");
    }

    final boolean dupsOkRequested = (ackMode == Session.DUPS_OK_ACKNOWLEDGE);
    if (bw.isWritableProperty("acknowledgeMode")) {
        final String modeName = dupsOkRequested ? "Dups-ok-acknowledge" : "Auto-acknowledge";
        bw.setPropertyValue("acknowledgeMode", modeName);
        return;
    }

    if (dupsOkRequested) {
        // No standard JCA 1.5 "acknowledgeMode" property available (e.g. WebSphere MQ 6.0.2.1)
        throw new IllegalArgumentException("Dups-ok-acknowledge not supported by underlying provider");
    }
}
/**
 * Looks up the JMS connection factory and destination of the testing message
 * bean through JNDI and sends a single message built from {@code messageData}.
 * The session and connection are closed afterwards; a failure to close the
 * session is only logged.
 *
 * @param messageData payload handed to {@code createJMSMessageForjmsTestingMessageBean}
 * @throws JMSException if creating the connection/session or sending fails
 * @throws NamingException if a JNDI lookup fails
 */
private void sendJMSMessageToTestingMessageBean(Object messageData) throws JMSException, NamingException {
    Context c = new InitialContext();
    ConnectionFactory cf = (ConnectionFactory) c.lookup("java:comp/env/jms/TestingMessageBeanFactory");
    Connection conn = null;
    Session s = null;
    try {
        conn = cf.createConnection();
        // Reference the constant through the Session type: the local 's' is still
        // null at this point, and static members should not be read via an instance.
        s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = (Destination) c.lookup("java:comp/env/jms/TestingMessageBean");
        MessageProducer mp = s.createProducer(destination);
        mp.send(createJMSMessageForjmsTestingMessageBean(s, messageData));
    } finally {
        if (s != null) {
            try {
                s.close();
            } catch (JMSException e) {
                // Best effort: still go on to close the connection below.
                Logger.getLogger(this.getClass().getName()).log(Level.WARNING, "Cannot close session", e);
            }
        }
        if (conn != null) {
            conn.close();
        }
    }
}
/**
 * Checks that a consumer created with a message selector reports that same
 * selector from {@code getMessageSelector()}.
 */
@Test
public void testGetSelector() throws Exception {
    final Connection consumerConnection = createConnection();
    try {
        final String expectedSelector = "JMSType = 'something'";
        final Session consumerSession =
                consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer topicConsumer =
                consumerSession.createConsumer(ActiveMQServerTestCase.topic1, expectedSelector);
        ProxyAssertSupport.assertEquals(expectedSelector, topicConsumer.getMessageSelector());
    } finally {
        if (consumerConnection != null) {
            consumerConnection.close();
        }
    }
}
/**
 * Creates a shared durable subscription, publishes 100 messages, consumes one,
 * unsubscribes, and then checks that a new subscription with the same name
 * has nothing waiting.
 */
@Test
public void testUnsubscribeDurable() throws Exception {
    conn = cf.createConnection();
    conn.setClientID("C1");
    conn.start();

    final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer subscriber = jmsSession.createSharedDurableConsumer(topic, "c1");
    final MessageProducer publisher = jmsSession.createProducer(topic);

    for (int index = 0; index < 100; index++) {
        publisher.send(jmsSession.createTextMessage("msg" + index));
    }

    Assert.assertNotNull(subscriber.receive(5000));

    // The consumer is closed before the subscription is removed.
    subscriber.close();
    jmsSession.unsubscribe("c1");

    subscriber = jmsSession.createSharedDurableConsumer(topic, "c1");
    // nothing should be pending: unsubscribe is expected to have dropped the old subscription queue
    Assert.assertNull(subscriber.receiveNoWait());
}
/**
 * Publishes a message that carries only an address through the
 * {@code virtualhost/publishMessage} operation and checks that a JMS consumer
 * on that queue receives something.
 */
@Test
public void publishEmptyMessage() throws Exception
{
    final Map<String, Object> body = new HashMap<>();
    body.put("address", QUEUE_NAME);
    final Map<String, Object> requestArguments = Collections.singletonMap("message", body);
    getHelper().submitRequest("virtualhost/publishMessage", "POST", requestArguments, SC_OK);

    final Connection jmsConnection = getConnection();
    try
    {
        jmsConnection.start();
        final Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final MessageConsumer queueConsumer =
                jmsSession.createConsumer(jmsSession.createQueue(QUEUE_NAME));
        final Message received = queueConsumer.receive(getReceiveTimeout());
        assertThat(received, is(notNullValue()));
    }
    finally
    {
        jmsConnection.close();
    }
}
/**
 * Sends an AMQP management update for the named entity and returns the
 * management response.
 * <p>
 * For AMQP 1.0 a temporary queue is used both as reply-to and as the
 * destination the reply consumer listens on. For other protocols two
 * predefined queues are used instead.
 *
 * @param name name of the entity to update
 * @param type type of the entity to update
 * @param attributes attributes to apply
 * @param session session used for all JMS operations
 * @return the response obtained from {@code receiveManagementResponse} (expected status 200)
 * @throws JMSException if a JMS operation fails
 */
public Map<String,Object> updateEntityUsingAmqpManagementAndReceiveResponse(final String name,
                                                                            final String type,
                                                                            Map<String, Object> attributes,
                                                                            final Session session)
        throws JMSException
{
    final boolean amqp10 = (_protocol == Protocol.AMQP_1_0);

    final Destination replyToDestination = amqp10
            ? session.createTemporaryQueue()
            : session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
    final Destination replyConsumerDestination = amqp10
            ? replyToDestination
            : session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

    // The reply consumer is created before the request is sent.
    final MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
    updateEntityUsingAmqpManagement(name, type, attributes, replyToDestination, session);
    return receiveManagementResponse(consumer, replyToDestination, 200);
}
/**
 * Sends one message to the default queue and checks the pending, pending-durable
 * and delivering statistics before consumption, after receipt, and after the
 * message has been acknowledged.
 * <p>
 * The connection is closed in a {@code finally} block so a failed verification
 * does not leak it into later tests.
 */
@Test
public void testDeliveringStats() throws Exception {
    AtomicLong publishedMessageSize = new AtomicLong();
    Connection connection = cf.createConnection();
    try {
        connection.start();
        // CLIENT_ACKNOWLEDGE: the received message is acknowledged explicitly below.
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue(defaultQueueName));
        producer.send(session.createTextMessage("test"));
        verifyPendingStats(defaultQueueName, 1, publishedMessageSize.get());
        verifyPendingDurableStats(defaultQueueName, 1, publishedMessageSize.get());
        verifyDeliveringStats(defaultQueueName, 0, 0);
        MessageConsumer consumer = session.createConsumer(session.createQueue(defaultQueueName));
        Message msg = consumer.receive();
        // received but not yet acknowledged
        verifyDeliveringStats(defaultQueueName, 1, publishedMessageSize.get());
        msg.acknowledge();
        verifyPendingStats(defaultQueueName, 0, 0);
        verifyPendingDurableStats(defaultQueueName, 0, 0);
        verifyDeliveringStats(defaultQueueName, 0, 0);
    } finally {
        connection.close();
    }
}
/**
 * Sends one non-persistent text message to the queue from a background thread.
 *
 * @param queue destination queue
 * @param message text of the message to send
 * @return a latch that is counted down once the send has returned successfully;
 *         it is not counted down if a {@link JMSException} is raised
 * @throws JMSException declared by the signature; the background send reports its
 *         own failures via {@code printStackTrace}
 */
protected CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
    final CountDownLatch sent = new CountDownLatch(1);
    final Runnable sendTask = new Runnable() {
        @Override
        public void run() {
            Session sendSession = null;
            try {
                sendSession = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                final MessageProducer sender = sendSession.createProducer(queue);
                sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sender.send(sendSession.createTextMessage(message));
                sent.countDown();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                safeClose(sendSession);
            }
        }
    };
    new Thread(sendTask, "Send thread.").start();
    return sent;
}
/**
 * Creates a queue browser on the underlying session.
 * <p>
 * Browsers cannot be created when this session belongs to a topic (or XA topic)
 * connection.
 *
 * @param queue           The queue to browse
 * @param messageSelector The message selector
 * @return The browser
 * @throws JMSException Thrown if an error occurs
 */
@Override
public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException {
    final boolean topicConnection = cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION
            || cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION;
    if (topicConnection) {
        throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession");
    }

    final Session delegate = getSessionInternal();

    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("createBrowser " + delegate + " queue=" + queue + " selector=" + messageSelector);
    }

    final QueueBrowser browser = delegate.createBrowser(queue, messageSelector);

    if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
        ActiveMQRALogger.LOGGER.trace("createdBrowser " + delegate + " browser=" + browser);
    }

    return browser;
}
/**
 * Measures how long it takes to publish {@code MSG_COUNT} messages to a topic.
 * One warm-up batch is sent first, then {@code NUM_RUNS} timed batches; each
 * batch time and the average over all runs are logged.
 */
@Test
public void testProduceRateToTopic() throws Exception {
    connection = createActiveMQConnection();
    final Session topicSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Topic target = topicSession.createTopic(getDestinationName());

    // Untimed warm-up batch.
    produceMessages(target, MSG_COUNT);

    final List<Long> sendTimes = new ArrayList<Long>();
    long totalMillis = 0;
    int run = 0;
    while (run < NUM_RUNS) {
        final long elapsed = produceMessages(target, MSG_COUNT);
        sendTimes.add(elapsed);
        totalMillis += elapsed;
        LOG.info("Time to send {} topic messages: {} ms", MSG_COUNT, elapsed);
        ++run;
    }

    final long average = totalMillis / NUM_RUNS;
    LOG.info("Smoothed send time for {} messages: {}", MSG_COUNT, average);
}
/**
 * Consumes {@code count} messages from the given queue and checks that each one
 * carries the next expected {@code JMSXGroupSeq} value.
 *
 * @param queueName name of the queue to read from
 * @param connection connection used to create the consuming session
 * @param count number of messages to read
 * @param sequence running sequence counter; incremented once per message and
 *        compared with the message's {@code JMSXGroupSeq}
 * @param additionalCheck optional extra check invoked with the message index and
 *        the message; may be {@code null}
 * @throws Exception if a JMS operation or an assertion fails
 */
protected void readMessagesOnBroker(String queueName, Connection connection, int count, AtomicInteger sequence, BiConsumer<Integer, Message> additionalCheck) throws Exception {
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        Queue queue = session.createQueue(queueName);
        MessageConsumer consumer = session.createConsumer(queue);
        // Honour the caller-supplied count rather than a fixed class-level constant.
        for (int i = 0; i < count; ++i) {
            Message message = consumer.receive(RECEIVE_TIMEOUT);
            assertNotNull(message);
            LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
            String gid = message.getStringProperty("JMSXGroupID");
            int seq = message.getIntProperty("JMSXGroupSeq");
            LOG.debug("Message assigned JMSXGroupID := {}", gid);
            LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
            assertEquals("Sequence order should match", sequence.incrementAndGet(), seq);
            if (additionalCheck != null) {
                additionalCheck.accept(i, message);
            }
        }
    } finally {
        // Close the session even when an assertion above fails.
        session.close();
    }
}
/**
 * Registers the session under the given subscription name.
 *
 * @param subscriptionName key under which the session is stored
 * @param session the subscribing session
 * @throws DistributedTestException if a session is already registered under that name
 */
private void addSubscription(String subscriptionName, Session session)
{
    final boolean alreadyRegistered = _testSubscriptions.putIfAbsent(subscriptionName, session) != null;
    if (alreadyRegistered)
    {
        throw new DistributedTestException("Subscribing session '" + subscriptionName + "' is already registered");
    }
}
/**
 * Creates two queues on one address, sends five messages to the fully qualified
 * name of the first queue, and checks that only the first queue's consumer
 * receives a message.
 *
 * @param routingType routing type used to create the queues and to pick the
 *        destination prefix for the producer
 * @throws Exception if setup, sending or receiving fails
 */
private void internalTestSendDirectToQueue(RoutingType routingType) throws Exception {
    final String address = "test";
    final String firstQueueName = "queue1";
    final String secondQueueName = "queue2";
    createQueue(routingType, address, firstQueueName);
    createQueue(routingType, address, secondQueueName);

    try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
        final Session session = createSession(connection);
        final String firstFqqn = CompositeAddress.toFullyQualified(address, firstQueueName);
        final String secondFqqn = CompositeAddress.toFullyQualified(address, secondQueueName);
        final Destination firstQueue = session.createQueue(firstFqqn);
        final Destination secondQueue = session.createQueue(secondFqqn);
        final MessageConsumer firstConsumer = session.createConsumer(firstQueue);
        final MessageConsumer secondConsumer = session.createConsumer(secondQueue);

        // send messages to the first queue only
        final String prefix = routingType == RoutingType.ANYCAST
                ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX
                : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX;
        new Producer()
                .setDestination(prefix + firstFqqn)
                .setMessageCount(5)
                .setUser("admin")
                .setPassword("admin")
                .execute(new TestActionContext());

        assertNull(secondConsumer.receive(1000));
        assertNotNull(firstConsumer.receive(1000));
    }
}
/**
 * Sends a text message to {@code queue.testing} through a producer built from
 * PutJMS properties, then runs GetJMSQueue once and checks that exactly one
 * flow file with the expected content and destination attribute was produced.
 */
@Test
public void testSendTextToQueue() throws Exception {
    // --- producing side ---
    final TestRunner senderRunner = TestRunners.newTestRunner(new PutJMS());
    senderRunner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
    senderRunner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
    senderRunner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE);
    senderRunner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
    senderRunner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);

    final WrappedMessageProducer wrapped =
            JmsFactory.createMessageProducer(senderRunner.getProcessContext(), true);
    final Session jmsSession = wrapped.getSession();
    final MessageProducer producer = wrapped.getProducer();
    final Message helloMessage = jmsSession.createTextMessage("Hello World");
    producer.send(helloMessage);
    jmsSession.commit();

    // --- consuming side ---
    final TestRunner receiverRunner = TestRunners.newTestRunner(new GetJMSQueue());
    receiverRunner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER);
    receiverRunner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false");
    receiverRunner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing");
    receiverRunner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO);
    receiverRunner.run();

    final Relationship success = new Relationship.Builder().name("success").build();
    final List<MockFlowFile> delivered = receiverRunner.getFlowFilesForRelationship(success);
    assertTrue(delivered.size() == 1);

    final MockFlowFile onlyFlowFile = delivered.get(0);
    onlyFlowFile.assertContentEquals("Hello World");
    onlyFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing");

    producer.close();
    jmsSession.close();
}
/**
 * Receives messages from the named queue until a receive call times out
 * (returns {@code null}) and reports how many were consumed.
 *
 * @param connection connection used to create a temporary consuming session
 * @param queueName name of the queue to empty
 * @return number of messages consumed
 * @throws JMSException if a JMS operation fails
 */
private int drain(Connection connection, String queueName) throws JMSException
{
    final Session drainSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try
    {
        final MessageConsumer queueConsumer =
                drainSession.createConsumer(drainSession.createQueue(queueName));
        try
        {
            int drained = 0;
            while (queueConsumer.receive(_drainPollTimeout) != null)
            {
                drained++;
            }
            return drained;
        }
        finally
        {
            queueConsumer.close();
        }
    }
    finally
    {
        if (drainSession != null)
        {
            drainSession.close();
        }
    }
}
/**
 * Smoke-tests a JMS connection by publishing one empty message to the topic
 * "test" and then closing the producer, the session and the connection itself.
 *
 * @param connection the connection to exercise; closed by this method on success
 * @throws JMSException if any JMS operation fails
 */
private void testJmsConnection(final javax.jms.Connection connection) throws JMSException {
    final Session jmsSession = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    final Topic testTopic = jmsSession.createTopic("test");
    final MessageProducer publisher = jmsSession.createProducer(testTopic);

    publisher.send(jmsSession.createMessage());

    // Release resources from the innermost outwards.
    publisher.close();
    jmsSession.close();
    connection.close();
}
/**
 * Checks that the user "jdoe" can create a temporary queue and both send to and
 * receive from it.
 * <p>
 * The connection is closed in a {@code finally} block so it is not left open
 * after the test.
 */
@Test
public void testTempDestinations() throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
    Connection conn = factory.createQueueConnection("jdoe", "sunflower");
    try {
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        conn.start();
        Queue queue = sess.createTemporaryQueue();
        MessageProducer producer = sess.createProducer(queue);
        MessageConsumer consumer = sess.createConsumer(queue);
        producer.send(sess.createTextMessage("test"));
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
    } finally {
        conn.close();
    }
}
/**
 * Runs a JMS bridge from the source topic to the target queue using the durable
 * subscription "subTest" (client id "clientid123"), sends ten messages and
 * checks that all of them arrive in order and are counted by the bridge.
 * Afterwards the bridge is stopped and the durable subscription is removed.
 *
 * @param largeMessage whether large messages are sent and expected
 * @param batchSize batch size passed to the bridge
 * @throws Exception if the bridge, sending, or verification fails
 */
public void internalTestDurableSubscriber(final boolean largeMessage, final int batchSize) throws Exception {
    JMSBridgeImpl jmsBridge = null;
    try {
        final int messageCount = 10;

        jmsBridge = new JMSBridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory, null, null, null, null, null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE, batchSize, -1, "subTest", "clientid123", false).setBridgeName("test-bridge");
        jmsBridge.start();

        sendMessages(cf0, sourceTopic, 0, messageCount, true, largeMessage);
        checkAllMessageReceivedInOrder(cf1, targetQueue, 0, messageCount, largeMessage);

        Assert.assertEquals(0L, jmsBridge.getAbortedMessageCount());
        Assert.assertEquals("We didn't get the correct number processed messages", messageCount, jmsBridge.getMessageCount());
    } finally {
        if (jmsBridge != null) {
            jmsBridge.stop();
        }

        // Remove the durable subscription, using the same client id the bridge was given.
        final Connection cleanupConnection = cf0.createConnection();
        cleanupConnection.setClientID("clientid123");
        final Session cleanupSession = cleanupConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        cleanupSession.unsubscribe("subTest");
        cleanupConnection.close();
    }
}
/**
 * Scripts a test peer around two transaction ids, creates a transacted session,
 * and expects {@code commit()} to fail with a
 * {@link TransactionRolledBackException}.
 */
@Test(timeout=20000)
public void testRollbackErrorCoordinatorClosedOnCommit() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
// All peer expectations below are registered before the session is created.
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
Binary txnId1 = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
Binary txnId2 = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
testPeer.expectDeclare(txnId1);
// NOTE(review): presumably closes the coordinator link when txnId1 is discharged
// and hands out txnId2 afterwards - confirm against TestAmqpPeer.
testPeer.remotelyCloseLastCoordinatorLinkOnDischarge(txnId1, false, true, txnId2);
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId2);
testPeer.expectDischarge(txnId2, true);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
try {
session.commit();
// Reaching this line means commit() did not throw.
fail("Transaction should have rolled back");
} catch (TransactionRolledBackException ex) {
LOG.info("Caught expected TransactionRolledBackException");
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Sends {@code numMessages} copies of a fixed non-persistent text message to the
 * queue "TEST.FOO" on the {@code vm://localhost} broker.
 * <p>
 * The connection is closed in a {@code finally} block so it is released even
 * when creating the session or sending fails.
 *
 * @param numMessages number of messages to send
 * @throws Exception if any JMS operation fails
 */
private void produceMsg(int numMessages) throws Exception
{
    // Create a ConnectionFactory
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
    // Create a Connection
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        // Create a Session
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");
        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        // Create the message; the same instance is sent repeatedly
        String text = "Hello world! From tester producer";
        TextMessage message = session.createTextMessage(text);
        for (int i = 0; i < numMessages; i++) {
            producer.send(message);
        }
        session.close();
    } finally {
        // Clean up
        connection.close();
    }
}
/**
 * Creates a message consumer on the underlying session, wraps it in an
 * {@code ActiveMQRAMessageConsumer}, registers the wrapper with this session and
 * returns it. The whole operation runs between {@code lock()} and
 * {@code unlock()}.
 *
 * @param destination     The destination
 * @param messageSelector The message selector
 * @param noLocal         If true inhibits the delivery of messages published by its own connection
 * @return The wrapped message consumer
 * @throws JMSException Thrown if an error occurs
 */
@Override
public MessageConsumer createConsumer(final Destination destination,
                                      final String messageSelector,
                                      final boolean noLocal) throws JMSException {
    lock();
    try {
        final Session delegate = getSessionInternal();

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createConsumer " + delegate + " dest=" + destination
                    + " messageSelector=" + messageSelector + " noLocal=" + noLocal);
        }

        final MessageConsumer rawConsumer = delegate.createConsumer(destination, messageSelector, noLocal);
        final MessageConsumer wrappedConsumer = new ActiveMQRAMessageConsumer(rawConsumer, this);

        if (ActiveMQRALogger.LOGGER.isTraceEnabled()) {
            ActiveMQRALogger.LOGGER.trace("createdConsumer " + delegate + " consumer=" + wrappedConsumer);
        }

        addConsumer(wrappedConsumer);
        return wrappedConsumer;
    } finally {
        unlock();
    }
}
/**
 * Scripts a test peer that remotely closes the coordinator link, creates a
 * transacted session, and expects {@code rollback()} to throw a
 * {@link JMSException}.
 */
@Test(timeout=20000)
public void testJMSExceptionOnRollbackWhenCoordinatorRemotelyClosed() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
// First set of peer expectations, registered before the session is created.
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
testPeer.expectDeclare(txnId);
testPeer.remotelyCloseLastCoordinatorLink();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
// Wait for the peer to work through the expectations above before scripting more.
testPeer.waitForAllHandlersToComplete(2000);
// Second set of expectations, registered before rollback() is called.
testPeer.expectCoordinatorAttach();
testPeer.expectDeclare(txnId);
testPeer.expectDischarge(txnId, true);
try {
session.rollback();
// Reaching this line means rollback() did not throw.
fail("Rollback should have thrown a JMSException");
} catch (JMSException ex) {
LOG.info("Caught expected JMSException");
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
/**
 * Opens and starts a connection, creates an auto-acknowledge session and a
 * producer for the given queue, and sets the producer's delivery mode.
 * The connection, session and producer are stored in fields of this object.
 * This must be called before calling pushMessage() to send messages.
 *
 * @param queueName name of the queue to produce to
 * @param persistMessage {@code true} for persistent delivery, {@code false} for non-persistent
 * @throws JMSException if connecting to the queue fails
 */
public void connect(String queueName, boolean persistMessage) throws JMSException {
    connection = connectionFactory.createConnection();
    connection.start();

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    producer = session.createProducer(session.createQueue(queueName));

    final int deliveryMode = persistMessage ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
    producer.setDeliveryMode(deliveryMode);
}
/**
 * Creates a new session on this connection.
 * <p>
 * The effective acknowledge mode is derived from both arguments by
 * {@code getSessionAcknowledgeMode}. If this connection has already been
 * started, the new session is started before it is returned.
 *
 * @param transacted whether the session should be transacted
 * @param acknowledgeMode the requested acknowledge mode
 * @return the newly created session
 * @throws JMSException if the connection check, connection setup or session creation fails
 */
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosedOrFailed();
    createJmsConnection();

    final int effectiveAckMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
    final JmsSession newSession = new JmsSession(this, getNextSessionId(), effectiveAckMode);

    if (started.get()) {
        newSession.start();
    }
    return newSession;
}
/**
 * Fills the test queue with 30000 messages, each carrying a numeric
 * {@code id} property counting up from 0, then moves the last message
 * (id = 29999) to a second queue. The test passes when exactly one message is
 * moved and the move takes less than 20000 milliseconds.
 *
 * @throws Exception if populating the queue or moving the message fails
 */
public void testMoveMessages() throws Exception {
    final int queueSize = 30000;
    final long timeoutMillis = 20000;
    final String moveTargetName = getDestinationString() + ".dest";

    // Fill the queue inside a single transaction.
    final Connection conn = createConnection();
    try {
        conn.start();
        final Session txSession = conn.createSession(true, Session.SESSION_TRANSACTED);
        final MessageProducer producer = txSession.createProducer(destination);
        for (int id = 0; id < queueSize; id++) {
            final Message numbered = txSession.createMessage();
            numbered.setIntProperty("id", id);
            producer.send(numbered);
        }
        txSession.commit();
    } finally {
        conn.close();
    }

    // Work directly against the broker-side queue and time the move of the
    // last message.
    final Queue brokerQueue = (Queue) broker.getRegionBroker().getDestinationMap().get(destination);
    final ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
    context.setBroker(broker.getBroker());
    context.getMessageEvaluationContext().setDestination(destination);

    final long startedAt = System.currentTimeMillis();
    final int moved = brokerQueue.moveMatchingMessagesTo(context, "id=" + (queueSize - 1), createDestination(moveTargetName));
    Assert.assertEquals(1, moved);
    final long durationMillis = System.currentTimeMillis() - startedAt;

    LOG.info("It took " + durationMillis + "ms to move the last message from a queue a " + queueSize + " messages.");
    Assert.assertTrue("Moving the message took too long: " + durationMillis + "ms", durationMillis < timeoutMillis);
}
/**
 * Checks that an incoming JMS message is passed through the configured
 * {@code MessageConverter} exactly once and that the converted payload reaches
 * the sample listener.
 */
@Test
public void incomingMessageUsesMessageConverter() throws JMSException {
    final javax.jms.Message incoming = mock(javax.jms.Message.class);
    final Session mockSession = mock(Session.class);
    final MessageConverter converter = mock(MessageConverter.class);
    given(converter.fromMessage(incoming)).willReturn("FooBar");

    final MessagingMessageListenerAdapter adapter = getSimpleInstance("simple", Message.class);
    adapter.setMessageConverter(converter);
    adapter.onMessage(incoming, mockSession);

    verify(converter, times(1)).fromMessage(incoming);
    assertEquals(1, sample.simples.size());
    assertEquals("FooBar", sample.simples.get(0).getPayload());
}