下面列出了 javax.jms.JMSContext API 类的示例代码及用法,也可以点击链接到 GitHub 查看源代码。
/**
 * Expects {@code createBrowser(queue, selector)} on the context to throw an
 * {@link IllegalStateRuntimeException} when the mocked session is stubbed to
 * throw {@code IllegalStateException} from its own {@code createBrowser}.
 */
@Test
public void testRuntimeExceptionOnCreateQueueBrowserWithSelector() throws JMSException {
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);

    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);
    Mockito.when(mockSession.createTemporaryQueue()).thenReturn(new JmsTemporaryQueue());

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.AUTO_ACKNOWLEDGE);
    Mockito.doThrow(IllegalStateException.class)
        .when(mockSession)
        .createBrowser(any(Queue.class), anyString());

    try {
        jmsContext.createBrowser(jmsContext.createTemporaryQueue(), "a == b");
        fail("Should throw ISRE");
    } catch (IllegalStateRuntimeException expected) {
        // Expected outcome of this test.
    } finally {
        jmsContext.close();
    }

    // The session must have been asked exactly once for the selector-based browser.
    Mockito.verify(mockSession).createBrowser(any(Queue.class), anyString());
}
/**
 * With auto-start switched off, creating a consumer (selector + noLocal) through
 * the context must reach the session once and must not start the connection.
 */
@Test
public void testAutoStartOffDoesNotStartTheConnectionMessageConsumerSelectorNoLocal() throws JMSException {
    final JmsMessageConsumer mockConsumer = Mockito.mock(JmsMessageConsumer.class);
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);

    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);
    Mockito.when(mockSession.createTemporaryTopic()).thenReturn(new JmsTemporaryTopic());
    Mockito.when(mockSession.createConsumer(any(Destination.class), anyString(), anyBoolean()))
        .thenReturn(mockConsumer);

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.AUTO_ACKNOWLEDGE);
    jmsContext.setAutoStart(false);

    try {
        jmsContext.createConsumer(jmsContext.createTemporaryTopic(), "a = b", true);
    } finally {
        jmsContext.close();
    }

    Mockito.verify(mockSession).createConsumer(any(Topic.class), anyString(), anyBoolean());
    Mockito.verify(mockConnection, Mockito.never()).start();
}
/**
 * Sends a text message and then tries to read its body as a {@code Boolean};
 * the consumer is expected to throw {@link MessageFormatRuntimeException}.
 *
 * <p>The context is opened in a try-with-resources block so that it is always
 * closed, whether the test passes or fails.
 *
 * @throws Exception if the queue or the context cannot be created
 */
@Test
public void testJMSContextConsumerThrowsMessageFormatExceptionOnMalformedBody() throws Exception {
    Queue queue = createQueue(true, "ContextMalformedBodyTestQueue");
    // JMSContext is AutoCloseable; the original test never closed it.
    try (JMSContext context = qraConnectionFactory.createContext()) {
        JMSProducer producer = context.createProducer();
        TextMessage message = context.createTextMessage("TestMessage");
        producer.send(queue, message);
        JMSConsumer consumer = context.createConsumer(queue);
        try {
            consumer.receiveBody(Boolean.class);
            fail("Should thrown MessageFormatException");
        } catch (MessageFormatRuntimeException mfre) {
            // Do nothing test passed
        } catch (Exception e) {
            fail("Threw wrong exception, should be MessageFormatRuntimeException, instead got: " + e.getClass().getCanonicalName());
        }
    }
}
/**
 * Creates a new {@link ActiveMQJMSContext} for the given session mode.
 *
 * @param sessionMode one of the standard {@code Session} acknowledge modes,
 *                    {@code ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE} or
 *                    {@code ActiveMQJMSConstants.PRE_ACKNOWLEDGE}
 * @return the newly created context
 * @throws JMSRuntimeException if {@code sessionMode} is none of the accepted values
 */
@Override
public JMSContext createContext(int sessionMode) {
    final boolean acceptedMode =
        sessionMode == Session.AUTO_ACKNOWLEDGE
            || sessionMode == Session.CLIENT_ACKNOWLEDGE
            || sessionMode == Session.DUPS_OK_ACKNOWLEDGE
            || sessionMode == Session.SESSION_TRANSACTED
            || sessionMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE
            || sessionMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE;
    if (!acceptedMode) {
        throw new JMSRuntimeException("Invalid ackmode: " + sessionMode);
    }

    // Only count the reference once the mode has been validated.
    refCounter.increment();
    return new ActiveMQJMSContext(this, sessionMode, threadAwareContext);
}
/**
 * Expects {@code createTemporaryTopic()} on the context to throw an
 * {@link IllegalStateRuntimeException} when the mocked session is stubbed to
 * throw {@code IllegalStateException}.
 */
@Test
public void testRuntimeExceptionOnCreateTemporaryTopicFailure() throws JMSException {
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.CLIENT_ACKNOWLEDGE);
    Mockito.doThrow(IllegalStateException.class).when(mockSession).createTemporaryTopic();

    try {
        jmsContext.createTemporaryTopic();
        fail("Should throw ISRE");
    } catch (IllegalStateRuntimeException expected) {
        // Expected outcome of this test.
    } finally {
        jmsContext.close();
    }
}
/**
 * Example entry point: sends one text message to {@code exampleQueue} through a
 * {@link JMSContext} and prints the body received back from the same queue.
 *
 * @param args unused
 * @throws Exception if any step of the example fails
 */
public static void main(final String[] args) throws Exception {
    // Obtain a reference to the queue used by the example.
    Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");

    // Both the connection factory and the context are released by try-with-resources.
    try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
         JMSContext jmsContext = cf.createContext()) {

        // Send a message with a producer created from the context.
        JMSProducer jmsProducer = jmsContext.createProducer();
        jmsProducer.send(queue, "A Message from JMS2!");

        // Read the message body back and print it.
        final String received = jmsContext.createConsumer(queue).receiveBody(String.class);
        System.out.println("Received:" + received);
    }
}
/**
 * With auto-start switched off, creating a selector-based queue browser through
 * the context must reach the session once and must not start the connection.
 *
 * <p>The stub targets the two-argument {@code createBrowser(Queue, String)}
 * overload, which is the one this test invokes and verifies; the original
 * stubbed the one-argument overload, which is never called here.
 */
@Test
public void testAutoStartOffDoesNotStartTheConnectionQueueBrowserWithSelector() throws JMSException {
    JmsConnection connection = Mockito.mock(JmsConnection.class);
    JmsSession session = Mockito.mock(JmsSession.class);
    JmsQueueBrowser browser = Mockito.mock(JmsQueueBrowser.class);

    Mockito.when(connection.createSession(Mockito.anyInt())).thenReturn(session);
    // Stub the overload that takes a selector so the context receives the mock browser.
    Mockito.when(session.createBrowser(any(Queue.class), anyString())).thenReturn(browser);
    Mockito.when(session.createTemporaryQueue()).thenReturn(new JmsTemporaryQueue());

    JmsContext context = new JmsContext(connection, JMSContext.AUTO_ACKNOWLEDGE);
    context.setAutoStart(false);

    try {
        context.createBrowser(context.createTemporaryQueue(), "a == b");
    } finally {
        context.close();
    }

    Mockito.verify(session, Mockito.times(1)).createBrowser(any(Queue.class), anyString());
    Mockito.verify(connection, Mockito.times(0)).start();
}
/**
 * With auto-start switched on, creating a shared durable consumer with a
 * selector through the context must reach the session once and start the
 * connection once.
 */
@Test
public void testAutoStartOnDoesStartTheConnectionMessageSharedDurableConsumerSelector() throws JMSException {
    final JmsMessageConsumer mockConsumer = Mockito.mock(JmsMessageConsumer.class);
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);

    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);
    Mockito.when(mockSession.createSharedDurableConsumer(any(Topic.class), anyString(), anyString()))
        .thenReturn(mockConsumer);
    Mockito.when(mockSession.createTemporaryTopic()).thenReturn(new JmsTemporaryTopic());

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.AUTO_ACKNOWLEDGE);
    jmsContext.setAutoStart(true);

    try {
        jmsContext.createSharedDurableConsumer(jmsContext.createTemporaryTopic(), "name", "a = b");
    } finally {
        jmsContext.close();
    }

    Mockito.verify(mockSession).createSharedDurableConsumer(any(Topic.class), anyString(), anyString());
    Mockito.verify(mockConnection).start();
}
/**
 * Checks that {@code createQueue} on the context results in exactly one
 * {@code createQueue} call on the underlying (mocked) session.
 */
@Test
public void testCreateQueuePassthrough() throws JMSException {
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.AUTO_ACKNOWLEDGE);
    try {
        jmsContext.createQueue("test");
    } finally {
        jmsContext.close();
    }

    Mockito.verify(mockSession).createQueue(anyString());
}
/**
 * Test teardown: attaches a listener to the {@code MockEvent} queue for one
 * second so that any messages delivered in that window are consumed and logged,
 * then releases the JMS resources.
 *
 * <p>The context is closed via try-with-resources and the factory is closed in
 * a {@code finally} block, so neither is leaked if the teardown itself fails.
 *
 * @throws Exception if the context cannot be created or the wait is interrupted
 */
@After
public void stop() throws Exception {
    try (JMSContext context = factory.createContext()) {
        Destination channel = context.createQueue(MockEvent.class.getName());
        JMSConsumer consumer = context.createConsumer(channel);
        // Drain leftover test messages.
        logger.info("清理JMS测试消息开始");
        AtomicInteger count = new AtomicInteger();
        consumer.setMessageListener((data) -> {
            String message = StringUtility.format("清理JMS测试消息[{}]", count.incrementAndGet());
            logger.info(message);
        });
        // Give the listener a fixed window in which to receive pending messages.
        Thread.sleep(1000L);
        logger.info("清理JMS测试消息结束");
    } finally {
        factory.close();
    }
}
/**
 * Checks that {@code recover()} on the context results in exactly one
 * {@code recover()} call on the underlying (mocked) session.
 */
@Test
public void testRecoverPassthrough() throws JMSException {
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.CLIENT_ACKNOWLEDGE);
    try {
        jmsContext.recover();
    } finally {
        jmsContext.close();
    }

    Mockito.verify(mockSession).recover();
}
/**
 * Rebuilds the JMS 2.0 resources used by this worker: tears down any existing
 * resources, makes sure the connection resources exist, and then assigns
 * {@code context} a new JMSContext created from {@code masterContext}.
 *
 * <p>The session mode is {@code SESSION_TRANSACTED} when {@code transacted} is
 * set; otherwise it is read from the {@code "am"} configuration parameter.
 *
 * @throws Exception if any of the underlying build steps fails
 */
protected void buildJMSResources() throws Exception {
    destroyJMSResources(true);
    if (!connectionInitialised) {
        buildConnectionResources();
    }

    // Decide the session mode first, then create the per-thread context once.
    final int sessionMode;
    if (transacted) {
        Log.logger.log(Level.FINE, "Using Transacted Mode");
        sessionMode = JMSContext.SESSION_TRANSACTED;
    } else {
        sessionMode = Config.parms.getInt("am");
        Log.logger.log(Level.FINE, "Using Acknowledge Mode: {0}", sessionMode);
    }
    context = masterContext.createContext(sessionMode);
}
/**
 * The client addresses the target queue directly, and that queue does not have
 * holdsOnPublish enabled, so the Broker is expected to reject a message sent
 * with a delivery delay. The rejection must surface as a
 * {@link JMSRuntimeException} whose message contains
 * {@code amqp:precondition-failed}.
 */
@Test
public void testDeliveryDelayNotSupportedByQueue_MessageRejected() throws Exception {
    try (JMSContext context = getConnectionBuilder().buildConnectionFactory().createContext()) {
        final Destination queue = createQueue(context, BrokerAdmin.TEST_QUEUE_NAME, false);
        final JMSProducer producer = context.createProducer().setDeliveryDelay(DELIVERY_DELAY);

        try {
            producer.send(queue, "message");
            fail("Exception not thrown");
        } catch (JMSRuntimeException e) {
            final String detail = e.getMessage();
            assertTrue("Unexpected exception message: " + detail,
                       detail.contains("amqp:precondition-failed"));
        }
    }
}
/**
 * In {@code DUPS_OK_ACKNOWLEDGE} mode, calling {@code acknowledge()} on the
 * context must not trigger {@code acknowledge(ACK_TYPE.ACCEPTED)} on the session.
 */
@Test
public void testAcknowledgeNoopDupsOkAcknowledge() throws JMSException {
    final JmsSession mockSession = Mockito.mock(JmsSession.class);
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
    Mockito.when(mockConnection.createSession(Mockito.anyInt())).thenReturn(mockSession);

    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.DUPS_OK_ACKNOWLEDGE);
    try {
        jmsContext.acknowledge();
    } finally {
        jmsContext.close();
    }

    Mockito.verify(mockSession, Mockito.never()).acknowledge(ACK_TYPE.ACCEPTED);
}
/**
 * Expects {@code start()} on the context to throw an
 * {@link IllegalStateRuntimeException} when the mocked connection is stubbed to
 * throw {@code IllegalStateException} from its own {@code start()}.
 */
@Test
public void testRuntimeExceptionOnStartFailure() throws JMSException {
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.AUTO_ACKNOWLEDGE);
    Mockito.doThrow(IllegalStateException.class).when(mockConnection).start();

    try {
        jmsContext.start();
        fail("Should throw ISRE");
    } catch (IllegalStateRuntimeException expected) {
        // Expected outcome of this test.
    } finally {
        jmsContext.close();
    }
}
/**
 * Wraps a {@link JMSContext} in its tracing counterpart.
 *
 * @param delegate   the context to wrap; must not be {@code null}
 * @param jmsTracing the tracing component handed to the wrapper; must not be {@code null}
 * @return a {@code TracingXAJMSContext} when {@code delegate} is an
 *         {@link XAJMSContext}, otherwise a {@code TracingJMSContext}
 * @throws NullPointerException if either argument is {@code null}
 */
static JMSContext create(JMSContext delegate, JmsTracing jmsTracing) {
    if (delegate == null) {
        throw new NullPointerException("delegate == null");
    }
    if (jmsTracing == null) {
        throw new NullPointerException("jmsTracing == null");
    }
    return delegate instanceof XAJMSContext
        ? new TracingXAJMSContext((XAJMSContext) delegate, jmsTracing)
        : new TracingJMSContext(delegate, jmsTracing);
}
/**
 * With {@code useProviderJMSContext} enabled, a context created with
 * credentials must be a {@code MockJMSContext} rather than a
 * {@code JmsPoolJMSContext}, and the factory must report zero connections.
 */
@Test(timeout = 30000)
public void testCreateContextOptOutConfigurationWithCredentials() {
    cf.setUseProviderJMSContext(true);

    final JMSContext providerContext = cf.createContext("user", "password");

    assertNotNull(providerContext);
    assertFalse(providerContext instanceof JmsPoolJMSContext);
    assertTrue(providerContext instanceof MockJMSContext);
    assertEquals(0, cf.getNumConnections());
}
/**
 * Creates a JMSContext against the test peer through the fixture, then closes
 * it and waits for the peer to finish processing its expected handlers.
 */
@Test(timeout = 20000)
public void testCreateContextWithClientId() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final JMSContext jmsContext = testFixture.createJMSContext(peer, false, null, null, null, true);

        // The peer must be told to expect the close before it is triggered.
        peer.expectClose();
        jmsContext.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a session whose kind (XA or plain) and acknowledge mode are derived
 * from the transaction support level of the underlying connection.
 *
 * @param transacted      consulted only when the connection is not a
 *                        {@code TomEEManagedConnection}: {@code false} selects
 *                        {@code NoTransaction}, {@code true} selects {@code XATransaction}
 * @param acknowledgeMode mode passed to the physical connection when no
 *                        transaction handling applies
 * @return an XA session, or a session created on the physical connection
 * @throws JMSException declared by the interface; raised by the underlying session creation
 * @throws IllegalStateException if the transaction support level is not one of the handled values
 */
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
int mode;
boolean xa;
TransactionSupportLevel transactionSupportLevel;
// A managed connection dictates the support level; otherwise it is derived from the 'transacted' flag.
if (connection instanceof TomEEManagedConnection) {
transactionSupportLevel = ((TomEEManagedConnection) connection).getTransactionSupportLevel();
} else if (!transacted) {
transactionSupportLevel = TransactionSupportLevel.NoTransaction;
} else {
transactionSupportLevel = TransactionSupportLevel.XATransaction;
}
switch (transactionSupportLevel) {
case XATransaction:
if (JMS2.inTx()) {
mode = -1;
xa = true;
break;
}
// No break here: when JMS2.inTx() is false, control falls through to the NoTransaction case.
case NoTransaction:
mode = acknowledgeMode;
xa = false;
break;
case LocalTransaction:
mode = JMSContext.SESSION_TRANSACTED;
xa = false;
break;
default:
throw new IllegalStateException("transactionSupportLevel mode not supported:" + transactionSupportLevel);
}
if (xa) {
// NOTE(review): 'mode' (-1) is not used on this path; createXASession() takes no arguments.
return createXASession();
} else {
return connection.getPhysicalConnection().createSession(mode);
}
}
/**
 * After a dead-letter delivery has been triggered, a message must be receivable
 * from the dead-letter queue when it is addressed through its fully qualified
 * queue name (address plus queue).
 *
 * <p>The connection factory and the context are opened in a try-with-resources
 * block so that both are closed when the test finishes; the original left both open.
 *
 * @throws Exception if triggering the delivery or any JMS operation fails
 */
@Test
public void testWithJMSFQQN() throws Exception {
    SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
    String fqqn = CompositeAddress.toFullyQualified(dla, dlqName).toString();
    triggerDlaDelivery();
    try (ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
         JMSContext context = connectionFactory.createContext()) {
        context.start();
        assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
    }
}
/**
 * <b>JMS 2.0</b>
 * Resolves a queue either from JNDI or by creating it from the supplied context.
 * JNDI is used when {@code usingJNDI} is set or when no context is supplied.
 *
 * @param uri String containing queue name
 * @param context Active JMSContext; must be supplied if the queue is not retrieved from JNDI
 * @return A DestinationWrapper holding the queue retrieved from JNDI or created from the provided JMSContext
 * @throws JMSException declared by this method; presumably raised by the JNDI lookup path
 * @throws NamingException
 */
public DestinationWrapper<Queue> lookupQueue(String uri, JMSContext context) throws JMSException, NamingException {
    final boolean createFromContext = !usingJNDI && context != null;
    if (createFromContext) {
        return new DestinationWrapper<Queue>(uri, context.createQueue(uri));
    }
    return lookupQueueFromJNDI(uri);
}
/**
 * Creating a sub-context from a parent that has already been closed must fail
 * with a {@link JMSRuntimeException}.
 *
 * <p>The failure message now describes the closed-parent scenario; the original
 * text referred to an "invalid mode", which this test does not exercise.
 */
@Test(timeout = 30000)
public void testCreateSubContextAfterParentClosed() {
    JMSContext context = cf.createContext();
    context.close();
    try {
        context.createContext(Session.AUTO_ACKNOWLEDGE);
        fail("Should not be able to create a sub-context when the parent context is closed");
    } catch (JMSRuntimeException expected) {
        // Expected: the closed parent refuses to create a child context.
    }
}
/**
 * Creates a {@link JMSContextImpl} without credentials.
 *
 * @param sessionMode the requested session mode; replaced by {@code -1} when
 *                    {@code JMS2.inTx()} reports an active transaction
 * @return the new context
 */
@Override
public JMSContext createContext(final int sessionMode) {
    final boolean inTx = JMS2.inTx();
    final int effectiveMode = inTx ? -1 : sessionMode;
    return new JMSContextImpl(this, effectiveMode, null, null, inTx);
}
/**
 * Expects {@code getExceptionListener()} on the context to throw an
 * {@link IllegalStateRuntimeException} when the mocked connection is stubbed to
 * throw {@code IllegalStateException} from its own {@code getExceptionListener()}.
 */
@Test
public void testRuntimeExceptionOnGetExceptionListenerFailure() throws JMSException {
    final JmsConnection mockConnection = Mockito.mock(JmsConnection.class);
    final JmsContext jmsContext = new JmsContext(mockConnection, JMSContext.CLIENT_ACKNOWLEDGE);
    Mockito.doThrow(IllegalStateException.class).when(mockConnection).getExceptionListener();

    try {
        jmsContext.getExceptionListener();
        fail("Should throw ISRE");
    } catch (IllegalStateRuntimeException expected) {
        // Expected outcome of this test.
    } finally {
        jmsContext.close();
    }
}
/**
 * Publishes an {@code updateQuote} text message describing a quote price change
 * to {@code streamerTopic}.
 *
 * @param quoteData    the quote after the change; supplies symbol, company, prices and volume
 * @param oldPrice     the price before the change
 * @param changeFactor the change factor, sent as a string property
 * @param sharesTraded the number of shares traded, sent as a double property
 * @throws Exception any failure while building or sending the message is passed back to the caller
 */
private void publishQuotePriceChange(QuoteDataBean quoteData, BigDecimal oldPrice, BigDecimal changeFactor, double sharesTraded) throws Exception {
    if (Log.doTrace()) {
        Log.trace("TradeDirect:publishQuotePrice PUBLISHING to MDB quoteData = " + quoteData);
    }

    // Exceptions propagate unchanged; the context is closed by try-with-resources.
    try (JMSContext context = tConnFactory.createContext()) {
        final TextMessage update = context.createTextMessage();

        update.setStringProperty("command", "updateQuote");
        update.setStringProperty("symbol", quoteData.getSymbol());
        update.setStringProperty("company", quoteData.getCompanyName());
        update.setStringProperty("price", quoteData.getPrice().toString());
        update.setStringProperty("oldPrice", oldPrice.toString());
        update.setStringProperty("open", quoteData.getOpen().toString());
        update.setStringProperty("low", quoteData.getLow().toString());
        update.setStringProperty("high", quoteData.getHigh().toString());
        update.setDoubleProperty("volume", quoteData.getVolume());
        update.setStringProperty("changeFactor", changeFactor.toString());
        update.setDoubleProperty("sharesTraded", sharesTraded);
        update.setLongProperty("publishTime", System.currentTimeMillis());

        update.setText("Update Stock price for " + quoteData.getSymbol() + " old price = " + oldPrice + " new price = " + quoteData.getPrice());

        context.createProducer().send(streamerTopic, update);
    }
}
/**
 * Creates a JMSContext against the test peer through the fixture, assigns it a
 * random client ID, then closes it and waits for the peer to finish processing
 * its expected handlers.
 */
@Test(timeout = 20000)
public void testCreateContextAndSetClientID() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final JMSContext jmsContext = testFixture.createJMSContext(peer, false, null, null, null, false);

        final String clientId = UUID.randomUUID().toString();
        jmsContext.setClientID(clientId);

        // The peer must be told to expect the close before it is triggered.
        peer.expectClose();
        jmsContext.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Creates a {@link JMSContextImpl} with the given credentials.
 *
 * @param userName    user name handed to the context
 * @param password    password handed to the context
 * @param sessionMode the requested session mode; replaced by {@code -1} when
 *                    {@code JMS2.inTx()} reports an active transaction
 * @return the new context
 */
@Override
public JMSContext createContext(final String userName, final String password, final int sessionMode) {
    final boolean inTx = JMS2.inTx();
    final int effectiveMode = inTx ? -1 : sessionMode;
    return new JMSContextImpl(this, effectiveMode, userName, password, inTx);
}
/**
 * An open context must produce a non-null map message; once the context is
 * closed, the same call must throw {@link IllegalStateRuntimeException}.
 */
@Test(timeout = 30000)
public void testCreateMapMessage() {
    final JMSContext jmsContext = cf.createContext();
    assertNotNull(jmsContext.createMapMessage());

    jmsContext.close();

    try {
        jmsContext.createMapMessage();
        fail("Should not be able to create resource when context is closed");
    } catch (IllegalStateRuntimeException expected) {
        // Expected outcome of this test.
    }
}
/**
 * Requesting a sub-context with session mode {@code 9} must fail with a
 * {@link JMSRuntimeException}.
 */
@Test(timeout = 30000)
public void testCreateSubContextWithInvalidSessionMode() {
    final JMSContext parentContext = cf.createContext();

    try {
        parentContext.createContext(9);
        fail("Should not be able to call with invalid mode.");
    } catch (JMSRuntimeException expected) {
        // Expected outcome of this test.
    }
}
/**
 * Builds the incoming side of the connector for one channel: creates a
 * JMSContext and a {@code JmsSource} from the channel configuration, records
 * both in {@code contexts} and {@code sources}, and returns the source's
 * publisher builder.
 *
 * @param config the channel configuration
 * @return the publisher builder exposed by the newly created source
 */
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
    final JmsConnectorIncomingConfiguration incoming = new JmsConnectorIncomingConfiguration(config);

    final JMSContext jmsContext = createJmsContext(incoming);
    contexts.add(jmsContext);

    final JmsSource jmsSource = new JmsSource(jmsContext, incoming, json, executor);
    sources.add(jmsSource);

    return jmsSource.getSource();
}