下面列出了 javax.jms.ConnectionFactory#createContext() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates a {@link JMSContext} from the given factory, using the session mode named by {@code mode}.
 *
 * @param factory  the connection factory the context is created from
 * @param username user name for the context; when {@code null} the context is created without
 *                 credentials and {@code password} is ignored
 * @param password password passed to the factory together with {@code username}
 * @param mode     name of a {@code JMSContext} session-mode constant, matched case-insensitively
 *                 ({@code AUTO_ACKNOWLEDGE}, {@code SESSION_TRANSACTED}, {@code CLIENT_ACKNOWLEDGE},
 *                 {@code DUPS_OK_ACKNOWLEDGE})
 * @return the newly created context
 */
private JMSContext createContext(ConnectionFactory factory, String username, String password, String mode) {
    int sessionMode;
    // Upper-case with Locale.ROOT: the default-locale variant is locale-sensitive (e.g. under a
    // Turkish locale 'i' becomes a dotted capital 'İ'), which would make valid names such as
    // "client_acknowledge" fall through to the default branch.
    switch (mode.toUpperCase(java.util.Locale.ROOT)) {
        case "AUTO_ACKNOWLEDGE":
            sessionMode = JMSContext.AUTO_ACKNOWLEDGE;
            break;
        case "SESSION_TRANSACTED":
            sessionMode = JMSContext.SESSION_TRANSACTED;
            break;
        case "CLIENT_ACKNOWLEDGE":
            sessionMode = JMSContext.CLIENT_ACKNOWLEDGE;
            break;
        case "DUPS_OK_ACKNOWLEDGE":
            sessionMode = JMSContext.DUPS_OK_ACKNOWLEDGE;
            break;
        default:
            // Unrecognised name: the original (non-normalised) value is reported.
            throw ex.illegalStateUnknowSessionMode(mode);
    }
    if (username != null) {
        return factory.createContext(username, password, sessionMode);
    } else {
        return factory.createContext(sessionMode);
    }
}
/**
 * Sends a message to a topic on a randomly named address registered with default
 * {@code AddressSettings}, then checks the values reported by {@code server.addressQuery}.
 */
@Test
public void testAddressQueryDefaultsOnAutoCreatedAddress() throws Exception {
    SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
    server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings());
    ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
    // The context used to be left open; try-with-resources closes it once the checks are done.
    // The assertions stay inside the block so the query still runs while the context is open.
    try (JMSContext c = cf.createContext()) {
        c.createProducer().send(c.createTopic(addressName.toString()), c.createMessage());

        AddressQueryResult addressQueryResult = server.addressQuery(addressName);
        assertTrue(addressQueryResult.isExists());
        assertFalse(addressQueryResult.getRoutingTypes().contains(RoutingType.ANYCAST));
        assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
        assertEquals(addressName, addressQueryResult.getName());
        assertTrue(addressQueryResult.isAutoCreateAddresses());
        assertEquals(-1, addressQueryResult.getDefaultMaxConsumers());
        assertTrue(addressQueryResult.isAutoCreated());
        assertFalse(addressQueryResult.isDefaultPurgeOnNoConsumers());
    }
}
/**
 * Sends a message to a topic on a randomly named address whose settings override the default
 * max-consumers (1) and purge-on-no-consumers (true) values, then checks that
 * {@code server.addressQuery} reports those overrides.
 */
@Test
public void testAddressQueryNonDefaultsOnAutoCreatedAddress() throws Exception {
    SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
    server.getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setAutoCreateAddresses(true).setDefaultMaxConsumers(1).setDefaultPurgeOnNoConsumers(true));
    ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
    // The context used to be left open; try-with-resources closes it once the checks are done.
    // The assertions stay inside the block so the query still runs while the context is open.
    try (JMSContext c = cf.createContext()) {
        c.createProducer().send(c.createTopic(addressName.toString()), c.createMessage());

        AddressQueryResult addressQueryResult = server.addressQuery(addressName);
        assertTrue(addressQueryResult.isExists());
        assertTrue(addressQueryResult.getRoutingTypes().contains(RoutingType.MULTICAST));
        assertEquals(addressName, addressQueryResult.getName());
        assertTrue(addressQueryResult.isAutoCreateAddresses());
        assertEquals(1, addressQueryResult.getDefaultMaxConsumers());
        assertTrue(addressQueryResult.isAutoCreated());
        assertTrue(addressQueryResult.isDefaultPurgeOnNoConsumers());
    }
}
/**
 * Scripts the test peer for a SASL PLAIN connection as "guest"/"guest" and creates a
 * {@link JMSContext} against it.
 *
 * @param testPeer           peer the expectations are registered on and the URI is built for
 * @param ssl                forwarded to {@code buildURI}
 * @param optionsString      forwarded to {@code buildURI}
 * @param serverCapabilities capabilities passed to the peer's open expectation
 * @param serverProperties   properties passed to the peer's open expectation
 * @param setClientId        when {@code true}, sets the client ID "clientName" on the new context
 * @param sessionMode        session mode handed to {@code createContext}
 * @return the created context
 */
JMSContext createJMSContext(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId, int sessionMode) throws JMSException {
    // Register the peer-side expectations before any client object is created.
    testPeer.expectSaslPlain("guest", "guest");
    testPeer.expectOpen(serverProperties, serverCapabilities);
    // Each connection creates a session for managing temporary destinations etc
    testPeer.expectBegin();

    final ConnectionFactory connectionFactory = new JmsConnectionFactory(buildURI(testPeer, ssl, optionsString));
    final JMSContext jmsContext = connectionFactory.createContext("guest", "guest", sessionMode);

    if (setClientId) {
        // Set a clientId to provoke the actual AMQP connection process to occur.
        jmsContext.setClientID("clientName");
    }

    // The peer must not have recorded any failure up to this point.
    assertNull(testPeer.getThrowable());
    return jmsContext;
}
/**
 * Creates an event channel that obtains its JMS context, and from it a producer, from the
 * given connection factory.
 *
 * @param mode    event mode passed to the superclass
 * @param name    channel name passed to the superclass
 * @param factory connection factory used to create the JMS context
 * @param codec   content codec stored for this channel
 */
public JmsEventChannel(EventMode mode, String name, ConnectionFactory factory, ContentCodec codec) {
    super(mode, name);

    // Plain state first.
    this.factory = factory;
    this.codec = codec;
    this.consumers = new ConcurrentHashMap<>();

    // JMS resources: the producer is derived from the context, so the context comes first.
    this.context = factory.createContext();
    this.producer = this.context.createProducer();
}
/**
 * Creates a consumer on "queue", closes it, and checks that the matching server-side consumer
 * has a {@code ProtonServerSenderContext} protocol context whose session sender count drops to 0.
 */
@Test
public void testContextOnConsumerAMQP() throws Throwable {
    if (!isNetty()) {
        // no need to run the test, there's no AMQP support
        return;
    }
    assertNull(server.getAddressInfo(SimpleString.toSimpleString("queue")));
    ConnectionFactory factory = createFactory(2);
    JMSContext context = factory.createContext("admin", "admin", Session.AUTO_ACKNOWLEDGE);
    try {
        javax.jms.Queue queue = context.createQueue("queue");
        JMSConsumer consumer = context.createConsumer(queue);

        // Keep the last server consumer found across all sessions.
        ServerConsumer serverConsumer = null;
        for (ServerSession session : server.getSessions()) {
            for (ServerConsumer sessionConsumer : session.getServerConsumers()) {
                serverConsumer = sessionConsumer;
            }
        }
        consumer.close();

        // Fail with a clear message instead of a NullPointerException when no consumer was found.
        Assert.assertNotNull("No server consumer found for the JMS consumer", serverConsumer);
        Assert.assertTrue(serverConsumer.getProtocolContext() instanceof ProtonServerSenderContext);
        final AMQPSessionContext sessionContext = ((ProtonServerSenderContext)
            serverConsumer.getProtocolContext()).getSessionContext();
        Wait.assertEquals(0, () -> sessionContext.getSenderCount(), 1000, 10);
    } finally {
        context.stop();
        context.close();
    }
}
/**
 * Sends a message to a topic named with a fully qualified queue name ({@code address::queue})
 * and checks that querying either the plain address or the FQQN reports the plain address name.
 */
@Test
public void testAddressQueryOnAutoCreatedAddressWithFQQN() throws Exception {
    SimpleString addressName = SimpleString.toSimpleString(UUID.randomUUID().toString());
    SimpleString fqqn = addressName.concat("::").concat(SimpleString.toSimpleString(UUID.randomUUID().toString()));
    ConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
    // The context used to be left open; try-with-resources closes it once the checks are done.
    // The assertions stay inside the block so the queries still run while the context is open.
    try (JMSContext c = cf.createContext()) {
        c.createProducer().send(c.createTopic(fqqn.toString()), c.createMessage());
        assertEquals(addressName, server.addressQuery(addressName).getName());
        assertEquals(addressName, server.addressQuery(fqqn).getName());
    }
}
/**
 * Makes the test peer fail SASL mechanism negotiation and checks that the client reports the
 * failure with the expected message.
 *
 * @param createContext when {@code true} the failure is provoked through
 *                      {@code factory.createContext} and a {@code JMSSecurityRuntimeException} is
 *                      expected; otherwise through {@code factory.createConnection} and a
 *                      {@code JMSSecurityException} is expected
 */
private void doMechanismNegotiationFailsToFindMatchTestImpl(boolean createContext) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        String failureMessageBreadcrumb = "Could not find a suitable SASL mechanism."
            + " No supported mechanism, or none usable with the available credentials. Server offered: [SCRAM-SHA-1, UNKNOWN, PLAIN]";
        Symbol[] serverMechs = new Symbol[] { SCRAM_SHA_1, Symbol.valueOf("UNKNOWN"), PLAIN};
        testPeer.expectSaslMechanismNegotiationFailure(serverMechs);
        String uriOptions = "?jms.clientID=myclientid";
        ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + uriOptions);
        if (createContext) {
            try {
                factory.createContext(null, null);
                fail("Excepted exception to be thrown");
            } catch (JMSSecurityRuntimeException jmssre) {
                // Expected, we deliberately failed the mechanism negotiation process.
                assertNotNull("Expected an exception message", jmssre.getMessage());
                // Argument order is (message, expected, actual) so failure output is labelled correctly.
                assertEquals("Unexpected message details", failureMessageBreadcrumb, jmssre.getMessage());
            }
        } else {
            try {
                factory.createConnection(null, null);
                fail("Excepted exception to be thrown");
            } catch (JMSSecurityException jmsse) {
                // Expected, we deliberately failed the mechanism negotiation process.
                assertNotNull("Expected an exception message", jmsse.getMessage());
                // Argument order is (message, expected, actual) so failure output is labelled correctly.
                assertEquals("Unexpected message details", failureMessageBreadcrumb, jmsse.getMessage());
            }
        }
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Builds the broker URL for the delegate and returns a JMS context created with the
 * delegate's ActiveMQ user name and password.
 *
 * @param delegate supplies the URL inputs (via {@code buildUrl}) and the credentials
 * @return a new context; closing it is left to the caller
 */
@SuppressWarnings ( "all" ) // suppressing autocloseable error. inconsistent w/ other connectionfactory impls.
@Override
public JMSContext getContext( JmsDelegate delegate ) {
  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( buildUrl( delegate, false ) );
  return connectionFactory.createContext( delegate.amqUsername, delegate.amqPassword );
}
/**
 * Starts a server with "myQueue" on "myAddress" configured as ANYCAST and sends a message.
 * It then restarts the server with the same address and queue configured as MULTICAST, and
 * checks that both report the new routing type and that the earlier message is still consumable.
 */
@Test
public void testChangeQueueRoutingTypeOnRestart() throws Exception {
    Configuration configuration = createDefaultInVMConfig();
    configuration.addAddressesSetting("#", new AddressSettings());

    // Typed list instead of the raw List/ArrayList used before.
    List<CoreAddressConfiguration> addressConfigurations = new ArrayList<>();
    CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
        .setName("myAddress")
        .addRoutingType(RoutingType.ANYCAST)
        .addQueueConfiguration(new QueueConfiguration("myQueue")
            .setAddress("myAddress")
            .setRoutingType(RoutingType.ANYCAST));
    addressConfigurations.add(addressConfiguration);
    configuration.setAddressConfigurations(addressConfigurations);
    server = createServer(true, configuration);
    server.start();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
    try (JMSContext context = connectionFactory.createContext()) {
        context.createProducer().send(context.createQueue("myAddress"), "hello");
    }

    server.stop();

    // Same address and queue names, now MULTICAST.
    addressConfiguration = new CoreAddressConfiguration()
        .setName("myAddress")
        .addRoutingType(RoutingType.MULTICAST)
        .addQueueConfiguration(new QueueConfiguration("myQueue")
            .setAddress("myAddress")
            .setRoutingType(RoutingType.MULTICAST));
    addressConfigurations.clear();
    addressConfigurations.add(addressConfiguration);
    configuration.setAddressConfigurations(addressConfigurations);

    server.start();
    assertEquals(RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType());
    assertEquals(RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType());

    // Ensures the queue isn't destroyed by checking that the message sent before the change is consumable after it (i.e. no message loss)
    try (JMSContext context = connectionFactory.createContext()) {
        Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
        assertEquals("hello", ((TextMessage) message).getText());
    }

    server.stop();
}
/**
 * Starts a server with "myQueue" on "myAddress" using filter {@code x = 'x'} and sends a
 * matching message. It then restarts the server with the filter changed to {@code x = 'y'} and
 * checks three things: the queue reports the new filter, the earlier message is still
 * consumable, and the queue's binding ID is unchanged.
 */
@Test
public void testChangeQueueFilterOnRestart() throws Exception {
    final String filter1 = "x = 'x'";
    final String filter2 = "x = 'y'";

    Configuration configuration = createDefaultInVMConfig( );
    configuration.addAddressesSetting("#", new AddressSettings());

    // Typed list instead of the raw List/ArrayList used before.
    List<CoreAddressConfiguration> addressConfigurations = new ArrayList<>();
    CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
        .setName("myAddress")
        .addRoutingType(RoutingType.ANYCAST)
        .addQueueConfiguration(new QueueConfiguration("myQueue")
            .setAddress("myAddress")
            .setFilterString(filter1)
            .setRoutingType(RoutingType.ANYCAST));
    addressConfigurations.add(addressConfiguration);
    configuration.setAddressConfigurations(addressConfigurations);
    server = createServer(true, configuration);
    server.start();

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
    try (JMSContext context = connectionFactory.createContext()) {
        context.createProducer().setProperty("x", "x").send(context.createQueue("myAddress"), "hello");
    }

    // Remember the binding so it can be compared after the restart.
    long originalBindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();

    server.stop();

    // Same address and queue, only the filter differs.
    addressConfiguration = new CoreAddressConfiguration()
        .setName("myAddress")
        .addRoutingType(RoutingType.ANYCAST)
        .addQueueConfiguration(new QueueConfiguration("myQueue")
            .setAddress("myAddress")
            .setFilterString(filter2)
            .setRoutingType(RoutingType.ANYCAST));
    addressConfigurations.clear();
    addressConfigurations.add(addressConfiguration);
    configuration.setAddressConfigurations(addressConfigurations);

    server.start();
    assertEquals(filter2, server.locateQueue(SimpleString.toSimpleString("myQueue")).getFilter().getFilterString().toString());

    //Ensures the queue is not destroyed by checking message sent before change is consumable after (e.g. no message loss)
    try (JMSContext context = connectionFactory.createContext()) {
        Message message = context.createConsumer(context.createQueue("myAddress::myQueue")).receive();
        assertEquals("hello", ((TextMessage) message).getText());
    }

    long bindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
    assertEquals("Ensure the original queue is not destroyed by checking the binding id is the same", originalBindingId, bindingId);

    server.stop();
}
/**
 * Starts an embedded broker from {@code reload-address-queues.xml} and checks the configured
 * addresses and queues. It then overwrites the broker file with
 * {@code reload-address-queues-updated.xml}, waits for the reload manager to tick, and checks
 * which addresses and queues were removed, kept, or changed.
 */
@Test
public void testRedeployAddressQueue() throws Exception {
    Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
    URL url1 = RedeployTest.class.getClassLoader().getResource("reload-address-queues.xml");
    URL url2 = RedeployTest.class.getClassLoader().getResource("reload-address-queues-updated.xml");
    // Close the resource stream once copied; Files.copy does not close it for us.
    try (java.io.InputStream initialConfig = url1.openStream()) {
        Files.copy(initialConfig, brokerXML);
    }

    EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
    embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
    embeddedActiveMQ.start();

    // Everything after start() runs inside try/finally so the broker is always stopped.
    try {
        final ReusableLatch latch = new ReusableLatch(1);
        Runnable tick = latch::countDown;

        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        try (JMSContext jmsContext = connectionFactory.createContext()) {
            jmsContext.createSharedDurableConsumer(jmsContext.createTopic("config_test_consumer_created_queues"),"mySub").receive(100);
        }

        latch.await(10, TimeUnit.SECONDS);
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));

        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));

        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_address_removal"));
        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_queue_removal"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));

        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
        Assert.assertEquals(10, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
        Assert.assertFalse(getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());

        // Swap in the updated configuration and push the modification time forward.
        try (java.io.InputStream updatedConfig = url2.openStream()) {
            Files.copy(updatedConfig, brokerXML, StandardCopyOption.REPLACE_EXISTING);
        }
        brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
        latch.setCount(1);
        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
        // Fail here if no tick arrived, rather than asserting against a broker that never reloaded.
        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));

        //Ensure queues created by clients (NOT by broker.xml are not removed when we reload).
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));

        Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
        Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
        Assert.assertFalse(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));

        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_address_removal"));
        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_queue_removal"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));

        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change"));
        Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
        Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
        Assert.assertTrue(getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());

        // Queue names must not show up as addresses.
        Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change_queue"));
        Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal_queue_1"));
    } finally {
        embeddedActiveMQ.stop();
    }
}
/**
 * Starts an embedded broker from {@code reload-queue-routingtype.xml}, sends a message to
 * "myAddress", and checks that "myQueue" is ANYCAST. It then overwrites the broker file with
 * {@code reload-queue-routingtype-updated.xml}, waits for the reload manager to tick, and
 * checks that "myQueue" is now MULTICAST and the earlier message is still consumable.
 */
@Test
public void testRedeployChangeQueueRoutingType() throws Exception {
    Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
    URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml");
    URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml");
    // Close the resource stream once copied; Files.copy does not close it for us.
    try (java.io.InputStream initialConfig = url1.openStream()) {
        Files.copy(initialConfig, brokerXML);
    }

    EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
    embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
    embeddedActiveMQ.start();

    final ReusableLatch latch = new ReusableLatch(1);

    embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);

    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
        try (JMSContext context = connectionFactory.createContext()) {
            context.createProducer().send(context.createQueue("myAddress"), "hello");
        }

        latch.await(10, TimeUnit.SECONDS);
        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
        Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());

        // Swap in the updated configuration and push the modification time forward.
        try (java.io.InputStream updatedConfig = url2.openStream()) {
            Files.copy(updatedConfig, brokerXML, StandardCopyOption.REPLACE_EXISTING);
        }
        brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
        latch.setCount(1);
        embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
        Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));

        Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
        Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());

        // Ensures the queue isn't destroyed by checking that the message sent before the change is consumable after it (i.e. no message loss)
        try (JMSContext context = connectionFactory.createContext()) {
            Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive();
            assertEquals("hello", ((TextMessage) message).getText());
        }
    } finally {
        embeddedActiveMQ.stop();
    }
}