下面列出了javax.jms.ServerSessionPool#org.apache.qpid.jms.JmsQueue 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Stubs the given session mock so that it returns the supplied consumer, creates a new
 * mock producer per request, and builds {@link JmsTextMessage} instances for text messages.
 *
 * @param mockSession the session mock to configure
 * @param mockConsumer the consumer returned for any {@code createConsumer(JmsQueue)} call
 * @throws JMSException declared by the stubbed session methods
 */
private void prepareSession(final Session mockSession, final JmsMessageConsumer mockConsumer) throws JMSException {
    // Any JmsQueue consumer request yields the same pre-built consumer.
    doReturn(mockConsumer).when(mockSession).createConsumer(any(JmsQueue.class));

    // Each producer request yields a fresh mock that reports the requested destination
    // and is recorded in mockProducers.
    final Answer<MessageProducer> producerAnswer = invocation -> {
        final MessageProducer producer = mock(MessageProducer.class);
        doReturn(invocation.getArgument(0)).when(producer).getDestination();
        mockProducers.add(producer);
        return producer;
    };
    doAnswer(producerAnswer).when(mockSession).createProducer(any(Destination.class));

    // Text messages are JmsTextMessage instances backed by a facade initialized with a mocked connection.
    final Answer<JmsMessage> textMessageAnswer = invocation -> {
        final String text = invocation.getArgument(0);
        final AmqpJmsTextMessageFacade textFacade = new AmqpJmsTextMessageFacade();
        textFacade.initialize(Mockito.mock(AmqpConnection.class));
        final JmsTextMessage message = new JmsTextMessage(textFacade);
        message.setText(text);
        return message;
    };
    doAnswer(textMessageAnswer).when(mockSession).createTextMessage(anyString());
}
/**
 * Creates a connection consumer on a queue against the test peer, then closes the
 * consumer and the connection, checking that the peer saw every expected frame.
 */
@Test(timeout = 20000)
public void testCreateConnectionConsumer() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final JmsServerSessionPool pool = new JmsServerSessionPool();
        final Connection connection = testFixture.establishConnecton(testPeer);

        // No additional Begin calls as there's no Session created for a Connection Consumer
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlow();

        final Queue myQueue = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(myQueue, null, pool, 100);

        // Closing the consumer must be preceded by registering the detach expectation.
        testPeer.expectDetach(true, true, true);
        connectionConsumer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Binds a queue whose configured name is a {@code ${variable}} reference and checks that
 * the lookup returns a {@link JmsQueue} named after the system property's value.
 */
@Test
public void testVariableExpansionQueue() throws Exception {
    final String lookupName = "myQueueLookup";
    final String variableName = "myQueueVariable";
    final String variableValue = "myQueueName";

    // The environment entry refers to the variable rather than a literal queue name.
    final Hashtable<Object, Object> environment = new Hashtable<>();
    final String placeholder = "${" + variableName + "}";
    environment.put("queue." + lookupName, placeholder);

    setTestSystemProperty(variableName, variableValue);

    final Context context = createInitialContext(environment);
    final Object found = context.lookup(lookupName);

    assertNotNull("No object returned", found);
    assertEquals("Unexpected class type for returned object", JmsQueue.class, found.getClass());
    assertEquals("Unexpected name for returned queue", variableValue, ((JmsQueue) found).getQueueName());
}
/**
 * Checks that turning on presettle for topic consumers changes the policy's answer for
 * a topic only, leaving queue and null destinations as not presettled.
 */
@Test
public void testIsConsumerPresettledPresettleTopicConsumer() {
    final JmsSession nonTransacted = Mockito.mock(JmsSession.class);
    Mockito.when(nonTransacted.isTransacted()).thenReturn(false);

    final JmsDestination queueDest = new JmsQueue("test");
    final JmsDestination topicDest = new JmsTopic("test");
    final JmsDefaultPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();

    // Freshly constructed policy: nothing is reported as presettled.
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, queueDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, null));

    presettlePolicy.setPresettleTopicConsumers(true);

    // Only the topic destination flips to presettled.
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, queueDest));
    assertTrue(presettlePolicy.isConsumerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, null));
}
/**
 * Checks that turning on presettle for queue consumers changes the policy's answer for
 * a queue only, leaving topic and null destinations as not presettled.
 */
@Test
public void testIsConsumerPresettledPresettleQueueConsumer() {
    final JmsSession nonTransacted = Mockito.mock(JmsSession.class);
    Mockito.when(nonTransacted.isTransacted()).thenReturn(false);

    final JmsDestination queueDest = new JmsQueue("test");
    final JmsDestination topicDest = new JmsTopic("test");
    final JmsDefaultPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();

    // Freshly constructed policy: nothing is reported as presettled.
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, queueDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, null));

    presettlePolicy.setPresettleQueueConsumers(true);

    // Only the queue destination flips to presettled.
    assertTrue(presettlePolicy.isConsumerPresttled(nonTransacted, queueDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isConsumerPresttled(nonTransacted, null));
}
/**
 * Checks that turning on presettle for topic producers changes the policy's answer for
 * a topic only, leaving queue and null destinations as not presettled.
 */
@Test
public void testIsProducerPresettledPresettleTopicProducers() {
    final JmsSession nonTransacted = Mockito.mock(JmsSession.class);
    Mockito.when(nonTransacted.isTransacted()).thenReturn(false);

    final JmsDestination queueDest = new JmsQueue("test");
    final JmsDestination topicDest = new JmsTopic("test");
    final JmsDefaultPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();

    // Freshly constructed policy: nothing is reported as presettled.
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, queueDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, null));

    presettlePolicy.setPresettleTopicProducers(true);

    // Only the topic destination flips to presettled.
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, queueDest));
    assertTrue(presettlePolicy.isProducerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, null));
}
/**
 * Checks that turning on presettle for queue producers changes the policy's answer for
 * a queue only, leaving topic and null destinations as not presettled.
 */
@Test
public void testIsProducerPresettledPresettleQueueProducers() {
    final JmsSession nonTransacted = Mockito.mock(JmsSession.class);
    Mockito.when(nonTransacted.isTransacted()).thenReturn(false);

    final JmsDestination queueDest = new JmsQueue("test");
    final JmsDestination topicDest = new JmsTopic("test");
    final JmsDefaultPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();

    // Freshly constructed policy: nothing is reported as presettled.
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, queueDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, null));

    presettlePolicy.setPresettleQueueProducers(true);

    // Only the queue destination flips to presettled.
    assertTrue(presettlePolicy.isProducerPresttled(nonTransacted, queueDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, topicDest));
    assertFalse(presettlePolicy.isProducerPresttled(nonTransacted, null));
}
/**
 * Checks that, once the session reports itself transacted and presettle for transacted
 * producers is enabled, both queue and topic destinations are presettled while a null
 * destination is not.
 */
@Test
public void testIsProducerPresettledPresettleTransactedProducers() {
    final JmsSession mockedSession = Mockito.mock(JmsSession.class);
    Mockito.when(mockedSession.isTransacted()).thenReturn(false);

    final JmsDestination queueDest = new JmsQueue("test");
    final JmsDestination topicDest = new JmsTopic("test");
    final JmsDefaultPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();

    // Non-transacted session with a freshly constructed policy: nothing is presettled.
    assertFalse(presettlePolicy.isProducerPresttled(mockedSession, queueDest));
    assertFalse(presettlePolicy.isProducerPresttled(mockedSession, topicDest));
    assertFalse(presettlePolicy.isProducerPresttled(mockedSession, null));

    // Re-stub the session as transacted, then enable the transacted-producer option.
    Mockito.when(mockedSession.isTransacted()).thenReturn(true);
    presettlePolicy.setPresettleTransactedProducers(true);

    assertTrue(presettlePolicy.isProducerPresttled(mockedSession, queueDest));
    assertTrue(presettlePolicy.isProducerPresttled(mockedSession, topicDest));
    assertFalse(presettlePolicy.isProducerPresttled(mockedSession, null));
}
/**
 * With the destination-type annotation stubbed to {@code (byte) 5} and a queue as the
 * consumer destination, the resolved destination must be a non-temporary queue carrying
 * the message's "to" address.
 */
@Test
public void testGetJmsDestinationWithUnknownTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getToAddress()).thenReturn(expectedAddress);
    Mockito.when(facade.getMessageAnnotation(JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL)).thenReturn((byte) 5);

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsDestination(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * With the legacy "to" type annotation stubbed to an empty string and a queue as the
 * consumer destination, the resolved destination must be a non-temporary queue carrying
 * the message's "to" address.
 */
@Test
public void testGetJmsDestinationWithEmptyLegacyTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getToAddress()).thenReturn(expectedAddress);
    Mockito.when(facade.getMessageAnnotation(LEGACY_TO_TYPE_MSG_ANNOTATION_SYMBOL)).thenReturn("");

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsDestination(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * With the legacy "to" type annotation stubbed to {@code "jms.queue"} and a queue as the
 * consumer destination, the resolved destination must be a non-temporary queue carrying
 * the message's "to" address.
 */
@Test
public void testGetJmsDestinationWithUnknownLegacyTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getToAddress()).thenReturn(expectedAddress);
    Mockito.when(facade.getMessageAnnotation(LEGACY_TO_TYPE_MSG_ANNOTATION_SYMBOL)).thenReturn("jms.queue");

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsDestination(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * With no type annotation stubbed and a queue as the consumer destination, the resolved
 * reply-to must be a non-temporary queue carrying the message's reply-to address.
 */
@Test
public void testGetJmsReplyToWithoutTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getReplyToAddress()).thenReturn(expectedAddress);

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsReplyTo(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * With the reply-to type annotation stubbed to {@code (byte) 5} and a queue as the
 * consumer destination, the resolved reply-to must be a non-temporary queue carrying
 * the message's reply-to address.
 */
@Test
public void testGetJmsReplyToWithUnknownTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getReplyToAddress()).thenReturn(expectedAddress);
    Mockito.when(facade.getMessageAnnotation(JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL)).thenReturn((byte) 5);

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsReplyTo(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * With the legacy reply-to type annotation stubbed to an empty string and a queue as the
 * consumer destination, the resolved reply-to must be a non-temporary queue carrying
 * the message's reply-to address.
 */
@Test
public void testGetJmsReplyToWithEmptyLegacyTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getReplyToAddress()).thenReturn(expectedAddress);
    Mockito.when(facade.getMessageAnnotation(LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL)).thenReturn("");

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsReplyTo(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * With the legacy reply-to type annotation stubbed to {@code "jms.queue"} and a queue as
 * the consumer destination, the resolved reply-to must be a non-temporary queue carrying
 * the message's reply-to address.
 */
@Test
public void testGetJmsReplyToWithUnknownLegacyTypeAnnotationWithQueueConsumerDest() throws Exception {
    final String expectedAddress = "testAddress";

    final AmqpJmsMessageFacade facade = Mockito.mock(AmqpJmsMessageFacade.class);
    final AmqpConnection connection = Mockito.mock(AmqpConnection.class);
    Mockito.when(facade.getConnection()).thenReturn(connection);
    Mockito.when(facade.getReplyToAddress()).thenReturn(expectedAddress);
    Mockito.when(facade.getMessageAnnotation(LEGACY_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL)).thenReturn("jms.queue");

    final JmsQueue consumerQueue = new JmsQueue("ConsumerDestination");
    final JmsDestination resolved = AmqpDestinationHelper.getJmsReplyTo(facade, consumerQueue);

    assertNotNull(resolved);
    assertTrue(resolved.isQueue());
    assertFalse(resolved.isTemporary());
    assertEquals(expectedAddress, resolved.getAddress());
}
/**
 * Creates a JMS consumer on a queue named after the given source address.
 *
 * @param session the session used to create the consumer
 * @param failedSources receives the exception, keyed by {@code addressWithIndex}, when creation fails
 * @param source the source passed through to the resulting {@code ConsumerData}
 * @param sourceAddress the address used as the queue name
 * @param addressWithIndex the indexed address used for logging and as the failure key
 * @return the consumer data, or {@code null} if the session threw a {@link JMSException}
 */
@Nullable
private ConsumerData createJmsConsumer(final Session session, final Map<String, Exception> failedSources,
        final Source source, final String sourceAddress, final String addressWithIndex) {
    log.debug("Creating AMQP Consumer for <{}>", addressWithIndex);
    final Destination queue = new JmsQueue(sourceAddress);
    try {
        final MessageConsumer consumer = session.createConsumer(queue);
        return ConsumerData.of(source, sourceAddress, addressWithIndex, consumer);
    } catch (final JMSException creationFailure) {
        // Failure is recorded for the caller instead of being propagated.
        failedSources.put(addressWithIndex, creationFailure);
        return null;
    }
}
/**
 * Builds a text message for tests: Ditto protocol content type, correlation id
 * {@code "cid"}, reply-to queue {@code "reply"} and the {@code TestConstants.modifyThing()} payload.
 *
 * @return the populated text message
 * @throws JMSException if one of the message setters fails
 */
private static Message mockMessage() throws JMSException {
    final AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade();
    final Symbol contentType = Symbol.getSymbol(DittoConstants.DITTO_PROTOCOL_CONTENT_TYPE);
    facade.setContentType(contentType);
    facade.initialize(Mockito.mock(AmqpConnection.class));

    final TextMessage message = new JmsTextMessage(facade);
    message.setJMSCorrelationID("cid");
    message.setJMSReplyTo(new JmsQueue("reply"));
    message.setText(TestConstants.modifyThing());
    return message;
}
/**
 * Creates the queue through the superclass and then binds a {@link JmsQueue} of the
 * same name into the naming context.
 *
 * @param name the queue name, also used as the binding name
 * @throws RuntimeException wrapping the {@link NamingException} if the bind fails
 */
@Override
public void createQueue(String name) {
    super.createQueue(name);
    final JmsQueue boundQueue = new JmsQueue(name);
    try {
        context.bind(name, boundQueue);
    } catch (NamingException bindFailure) {
        throw new RuntimeException(bindFailure);
    }
}
/**
 * Resolves a foreign {@link Destination} into a {@link JmsQueue} or {@link JmsTopic}.
 *
 * <p>The queue and topic names are read through the standard {@link Queue} and
 * {@link Topic} interfaces. Which of the two applies is decided by reflectively
 * calling public no-arg {@code isQueue()} and {@code isTopic()} methods on the
 * destination's class; {@code isQueue()} takes precedence when both return true.
 *
 * @param destination the foreign destination to resolve
 * @return a {@link JmsQueue} or {@link JmsTopic} built from the corresponding name
 * @throws JMSException if both names are null, if the reflective lookup or call fails
 *         (the failure is attached as cause and linked exception), or if neither
 *         method returns true
 */
@Override
public JmsDestination transform(Destination destination) throws JMSException {
    String queueName = null;
    String topicName = null;

    if (destination instanceof Queue) {
        queueName = ((Queue) destination).getQueueName();
    }
    if (destination instanceof Topic) {
        topicName = ((Topic) destination).getTopicName();
    }

    if (queueName == null && topicName == null) {
        throw new JMSException("Unresolvable destination: Both queue and topic names are null: " + destination);
    }

    final boolean isQueue;
    final boolean isTopic;
    try {
        Method isQueueMethod = destination.getClass().getMethod("isQueue");
        Method isTopicMethod = destination.getClass().getMethod("isTopic");
        // A null or non-Boolean result fails here and is reported via the catch below.
        isQueue = (Boolean) isQueueMethod.invoke(destination);
        isTopic = (Boolean) isTopicMethod.invoke(destination);
    } catch (Exception e) {
        // Only reflection failures are wrapped here; keep the original failure attached
        // so the root cause is not reduced to its message text.
        JMSException failure =
            new JMSException("Unresolvable destination: " + e.getMessage() + ": " + destination);
        failure.setLinkedException(e);
        failure.initCause(e);
        throw failure;
    }

    // Decided outside the try so this JMSException is not caught and re-wrapped.
    if (isQueue) {
        return new JmsQueue(queueName);
    } else if (isTopic) {
        return new JmsTopic(topicName);
    }
    throw new JMSException("Unresolvable destination: Neither Queue nor Topic: " + destination);
}
/**
 * Resolves a destination name by treating it as a queue.
 *
 * @param destination the destination name
 * @return a {@link JmsQueue} with the given name
 * @throws JMSException if {@code destination} is null
 */
@Override
public JmsDestination transform(String destination) throws JMSException {
    if (destination != null) {
        return new JmsQueue(destination);
    }
    throw new JMSException("Destination objects cannot have a null name value");
}
/**
 * Builds a destination for the given address, choosing its kind from the type byte
 * or, failing that, from the consumer's own destination.
 *
 * @param address the destination address; when null no new destination is created
 * @param typeByte the destination type, or {@code UNKNOWN_TYPE} if not known
 * @param consumerDestination the consumer's destination, used as a type hint; may be null
 * @param useConsumerDestForTypeOnly when the address is null, return null instead of
 *        {@code consumerDestination}
 * @return the destination; null only when {@code address} is null and either
 *         {@code useConsumerDestForTypeOnly} is true or {@code consumerDestination} is null
 */
private static JmsDestination createDestination(String address, byte typeByte, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) {
    if (address == null) {
        return useConsumerDestForTypeOnly ? null : consumerDestination;
    }

    if (typeByte != UNKNOWN_TYPE) {
        switch (typeByte) {
        case QUEUE_TYPE:
            return new JmsQueue(address);
        case TOPIC_TYPE:
            return new JmsTopic(address);
        case TEMP_QUEUE_TYPE:
            return new JmsTemporaryQueue(address);
        case TEMP_TOPIC_TYPE:
            return new JmsTemporaryTopic(address);
        default:
            // Unrecognized value: fall through to the consumer-destination hint below.
            break;
        }
    }

    // Guard against a missing consumer destination so the queue fallback below is
    // reached instead of a NullPointerException.
    if (consumerDestination != null) {
        if (consumerDestination.isQueue()) {
            if (consumerDestination.isTemporary()) {
                return new JmsTemporaryQueue(address);
            } else {
                return new JmsQueue(address);
            }
        } else if (consumerDestination.isTopic()) {
            if (consumerDestination.isTemporary()) {
                return new JmsTemporaryTopic(address);
            } else {
                return new JmsTopic(address);
            }
        }
    }

    // fall back to a Queue Destination since we need a real JMS destination
    return new JmsQueue(address);
}
/**
 * Sends a batch of messages over the mock provider with delayed completion calls,
 * completes all pending sends on the mock remote peer, and checks that the completion
 * listener saw them in order.
 */
@Test(timeout = 10000)
public void testInOrderSendAcksCompletionsReturnInOrder() throws Exception {
    final int messageCount = 3;
    final MockRemotePeer remotePeer = MockRemotePeer.INSTANCE;

    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(
        "mock://localhost?mock.delayCompletionCalls=true");
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    final Destination destination = new JmsQueue("explicitDestination");
    JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
    final MyCompletionListener completionListener = new MyCompletionListener();

    sendMessages(messageCount, producer, completionListener);

    // Wait until the peer holds one pending completion per message sent.
    final Wait.Condition allSendsPending = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return remotePeer.getPendingCompletions(destination).size() == messageCount;
        }
    };
    assertTrue("Not all sends made it to the remote", Wait.waitFor(allSendsPending));

    remotePeer.completeAllPendingSends(destination);

    final Wait.Condition allSendsCompleted = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return completionListener.getCompletedSends().size() == messageCount;
        }
    };
    assertTrue("Not all completions triggered", Wait.waitFor(allSendsCompleted));

    assertMessageCompletedInOrder(messageCount, completionListener);

    connection.close();
}
/**
 * Sends a batch of messages over the mock provider with delayed completion calls,
 * fails all pending sends on the mock remote peer, and checks that the completion
 * listener saw the failures in order.
 */
@Test(timeout = 10000)
public void testInOrderSendFailuresCompletionsReturnInOrder() throws Exception {
    final int messageCount = 3;
    final MockRemotePeer remotePeer = MockRemotePeer.INSTANCE;

    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(
        "mock://localhost?mock.delayCompletionCalls=true");
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    final Destination destination = new JmsQueue("explicitDestination");
    JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
    final MyCompletionListener completionListener = new MyCompletionListener();

    sendMessages(messageCount, producer, completionListener);

    // Wait until the peer holds one pending completion per message sent.
    final Wait.Condition allSendsPending = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return remotePeer.getPendingCompletions(destination).size() == messageCount;
        }
    };
    assertTrue("Not all messages sent", Wait.waitFor(allSendsPending));

    remotePeer.failAllPendingSends(destination, new ProviderException("Could not send message"));

    final Wait.Condition allSendsFailed = new Wait.Condition() {
        @Override
        public boolean isSatisfied() throws Exception {
            return completionListener.getFailedSends().size() == messageCount;
        }
    };
    assertTrue("Not all completions triggered", Wait.waitFor(allSendsFailed));

    assertMessageFailedInOrder(messageCount, completionListener);

    connection.close();
}
/**
 * Creates a connection consumer backed by a {@code JmsFailingServerSessionPool}, has the
 * peer respond to link flow with a transfer, and checks that the connection's exception
 * listener is invoked within five seconds.
 */
@Test(timeout = 20000)
public void testOnExceptionFiredOnSessionPoolFailure() throws Exception {
    final CountDownLatch listenerInvoked = new CountDownLatch(1);

    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final Connection connection = testFixture.establishConnecton(testPeer);
        connection.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException error) {
                listenerInvoked.countDown();
            }
        });
        connection.start();

        final JmsFailingServerSessionPool failingPool = new JmsFailingServerSessionPool();

        // Now the Connection consumer arrives and we give it a message
        // to be dispatched to the server session.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue myQueue = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(myQueue, null, failingPool, 100);

        assertTrue("Exception should have been fired", listenerInvoked.await(5, TimeUnit.SECONDS));

        testPeer.expectDetach(true, true, true);
        testPeer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a connection consumer whose session pool wraps a {@code JmsFailingServerSession},
 * has the peer respond to link flow with a transfer, and checks that the connection's
 * exception listener is invoked within five seconds.
 */
@Test(timeout = 20000)
public void testOnExceptionFiredOnServerSessionFailure() throws Exception {
    final CountDownLatch listenerInvoked = new CountDownLatch(1);

    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        final Connection connection = testFixture.establishConnecton(testPeer);
        connection.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException error) {
                listenerInvoked.countDown();
            }
        });
        connection.start();

        final JmsServerSessionPool poolWithFailingSession =
            new JmsServerSessionPool(new JmsFailingServerSession());

        // Now the Connection consumer arrives and we give it a message
        // to be dispatched to the server session.
        final DescribedType nullBody = new AmqpValueDescribedType(null);
        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, nullBody);

        final Queue myQueue = new JmsQueue("myQueue");
        final ConnectionConsumer connectionConsumer =
            connection.createConnectionConsumer(myQueue, null, poolWithFailingSession, 100);

        assertTrue("Exception should have been fired", listenerInvoked.await(5, TimeUnit.SECONDS));

        testPeer.expectDetach(true, true, true);
        testPeer.expectDispositionThatIsReleasedAndSettled();
        connectionConsumer.close();

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * With an {@code AlwaysQueueUnresolvedDestinationHandler} installed, transforming a
 * {@code ForeignDestination} must yield a {@link JmsQueue}.
 */
@Test
public void testPlainDestinationWithCustomInterceper() throws JMSException {
    final ForeignDestination foreign = new ForeignDestination(DESTINATION_NAME);

    JmsMessageTransformation.setUnresolvedDestinationHandler(new AlwaysQueueUnresolvedDestinationHandler());
    final JmsDestination transformed =
        JmsMessageTransformation.transformDestination(createMockJmsConnection(), foreign);

    assertNotNull(transformed);
    assertTrue(transformed instanceof JmsQueue);
}
/**
 * With an {@code AlwaysQueueUnresolvedDestinationHandler} installed, transforming a
 * {@code ForeignTopicAndQueue} must yield a {@link JmsQueue}.
 */
@Test
public void testCompositeTopicAndQueueDestinationCanBeInterceper() throws JMSException {
    final ForeignDestination foreign = new ForeignTopicAndQueue(DESTINATION_NAME);

    JmsMessageTransformation.setUnresolvedDestinationHandler(new AlwaysQueueUnresolvedDestinationHandler());
    final JmsDestination transformed =
        JmsMessageTransformation.transformDestination(createMockJmsConnection(), foreign);

    assertNotNull(transformed);
    assertTrue(transformed instanceof JmsQueue);
}
/**
 * A {@code ForeignTopicAndQueue} with its topic flag set to false must be transformed
 * into a {@link JmsQueue}.
 */
@Test
public void testQueueTypeIsDetectedFromComposite() throws JMSException {
    final ForeignTopicAndQueue composite = new ForeignTopicAndQueue("destination");
    composite.setTopic(false);

    final JmsDestination transformed = transformer.transform(composite);

    assertNotNull(transformed);
    assertTrue(transformed instanceof JmsQueue);
}
/**
 * A {@code ForeignTopicAndQueue} with its topic flag and its return-queue-name flag both
 * set to false must still be transformed into a {@link JmsQueue}.
 */
@Test
public void testQueueTypeIsDetectedFromCompositeMisMatchedNameAndType() throws JMSException {
    final ForeignTopicAndQueue composite = new ForeignTopicAndQueue("destination");
    composite.setTopic(false);
    composite.setReturnQueueName(false);

    final JmsDestination transformed = transformer.transform(composite);

    assertNotNull(transformed);
    assertTrue(transformed instanceof JmsQueue);
}
/**
 * When the message facade reports a destination, reading the {@code JMS_DESTINATION}
 * property through the intercepter must return that destination's address.
 */
@Test
public void testGetJMSDestinationWhenSet() throws JMSException {
    final JmsDestination expected = new JmsQueue("TestDestination");

    final JmsMessageFacade mockFacade = Mockito.mock(JmsMessageFacade.class);
    final JmsMessage mockMessage = Mockito.mock(JmsMapMessage.class);
    Mockito.when(mockMessage.getFacade()).thenReturn(mockFacade);
    Mockito.when(mockFacade.getDestination()).thenReturn(expected);

    final Object actual = JmsMessagePropertyIntercepter.getProperty(mockMessage, JMS_DESTINATION);
    assertEquals(expected.getAddress(), actual);
}