下面列出了 javax.jms.Topic#org.apache.activemq.ActiveMQConnectionFactory 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a fully mocked JMS stack (factory, connection, session, producer, bytes message)
 * and opens a new {@code AMQSink} configured against it before each test.
 */
@BeforeEach
public void before() throws Exception {
    // Mocked JMS collaborators.
    message = mock(BytesMessage.class);
    destination = mock(Destination.class);
    connection = mock(Connection.class);
    session = mock(Session.class);
    producer = mock(MessageProducer.class);
    connectionFactory = mock(ActiveMQConnectionFactory.class);

    // Stub the chain factory -> connection -> session -> producer / bytes message.
    when(connectionFactory.createConnection()).thenReturn(connection);
    when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
    when(session.createProducer(null)).thenReturn(producer);
    when(session.createBytesMessage()).thenReturn(message);

    // Sink under test, wired to the mocked factory.
    serializationSchema = new SimpleStringSchema();
    AMQSinkConfig<String> sinkConfig = new AMQSinkConfig.AMQSinkConfigBuilder<String>()
        .setConnectionFactory(connectionFactory)
        .setDestinationName(DESTINATION_NAME)
        .setSerializationSchema(serializationSchema)
        .build();
    amqSink = new AMQSink<>(sinkConfig);
    amqSink.open(new Configuration());
}
/**
 * Creates an ActiveMQ connection factory configured from the given properties.
 *
 * <p>Only the keys {@code brokerUrl}, {@code username} and {@code password} are read,
 * and each setting is applied only when its key is present in the map.
 *
 * @param properties configuration entries to apply
 * @return the configured connection factory
 */
@Override
public ConnectionFactory create(Map<String, String> properties) {
    final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();

    final boolean hasBrokerUrl = properties.containsKey("brokerUrl");
    if (hasBrokerUrl) {
        factory.setBrokerURL(properties.get("brokerUrl"));
    }

    final boolean hasUsername = properties.containsKey("username");
    if (hasUsername) {
        factory.setUserName(properties.get("username"));
    }

    final boolean hasPassword = properties.containsKey("password");
    if (hasPassword) {
        factory.setPassword(properties.get("password"));
    }

    return factory;
}
/**
 * Prepares the connection factories, the two test topics, and a
 * {@code SimpleJmsTopicConnector} with one outbound and one inbound topic bridge.
 */
@Before
public void setUp() throws Exception {
    final String outboundTopicName = "RECONNECT.TEST.OUT.TOPIC";
    final String inboundTopicName = "RECONNECT.TEST.IN.TOPIC";

    localConnectionFactory = createLocalConnectionFactory();
    foreignConnectionFactory = createForeignConnectionFactory();

    outbound = new ActiveMQTopic(outboundTopicName);
    inbound = new ActiveMQTopic(inboundTopicName);

    jmsTopicConnector = new SimpleJmsTopicConnector();

    // One bridge per direction, addressed by topic name.
    OutboundTopicBridge[] outboundBridges = {new OutboundTopicBridge(outboundTopicName)};
    jmsTopicConnector.setOutboundTopicBridges(outboundBridges);
    InboundTopicBridge[] inboundBridges = {new InboundTopicBridge(inboundTopicName)};
    jmsTopicConnector.setInboundTopicBridges(inboundBridges);

    // Broker endpoints the connector uses for each side.
    ActiveMQConnectionFactory outboundSide = new ActiveMQConnectionFactory("tcp://localhost:61617");
    jmsTopicConnector.setOutboundTopicConnectionFactory(outboundSide);
    ActiveMQConnectionFactory localSide = new ActiveMQConnectionFactory("tcp://localhost:61616");
    jmsTopicConnector.setLocalTopicConnectionFactory(localSide);
}
/**
 * Stops the broker, force-restarts it, and verifies that a message can be sent and
 * received through a new client connection to {@code tcp://localhost:61636}.
 *
 * @throws Exception if the broker cannot be restarted or a JMS operation fails
 */
public void testForceBrokerRestart() throws Exception {
    brokerService.stop();
    brokerService.waitUntilStopped();
    brokerService.start(true); // force restart
    brokerService.waitUntilStarted();
    LOG.info("try and connect to restarted broker");
    // send and receive a message from a restarted broker
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61636");
    Connection conn = factory.createConnection();
    try {
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        conn.start();
        Destination dest = new ActiveMQQueue("test");
        MessageConsumer consumer = sess.createConsumer(dest);
        MessageProducer producer = sess.createProducer(dest);
        producer.send(sess.createTextMessage("test"));
        TextMessage msg = (TextMessage) consumer.receive(1000);
        assertEquals("test", msg.getText());
    } finally {
        // Release the client connection even when a JMS call or the assertion fails.
        conn.close();
    }
}
/**
 * Configures the JSSE trust/key store system properties, starts a non-persistent broker
 * without JMX on an {@code nio+ssl} connector bound to port 0, fills the test payload,
 * and opens a started client connection with an auto-acknowledge session.
 */
@Override
protected void setUp() throws Exception {
    // Trust store settings.
    System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
    System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
    System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
    // Key store settings.
    System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
    System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
    System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);

    broker = new BrokerService();
    broker.setPersistent(false);
    broker.setUseJmx(false);
    TransportConnector sslConnector = broker.addConnector("nio+ssl://localhost:0?transport.needClientAuth=true");
    broker.start();
    broker.waitUntilStarted();

    // Payload bytes cycle through 0x00..0xff.
    messageData = new byte[MESSAGE_SIZE];
    int index = 0;
    while (index < MESSAGE_SIZE) {
        messageData[index] = (byte) (index & 0xff);
        index++;
    }

    // Connect to whichever port the connector reports in its connect URI.
    int boundPort = sslConnector.getConnectUri().getPort();
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio+ssl://localhost:" + boundPort);
    connection = factory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    connection.start();
}
/**
 * Connects to the broker, waits for a single message on the queue {@code test},
 * and then releases the session and connection.
 *
 * <p>Any failure is logged rather than propagated. Cleanup runs in {@code finally}
 * and is null-safe, so a failure before the session or connection exists no longer
 * raises a {@code NullPointerException} out of this method.
 */
@Override
public void run() {
    Session session = null;
    Connection connection = null;
    try {
        ConnectionFactory factory = new ActiveMQConnectionFactory(USER_NAME, PASSWORD, brokenUrl);
        connection = factory.createConnection();
        connection.start();
        // NOTE(review): the session is created as transacted and is never committed here;
        // confirm that this is intended.
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("test");
        MessageConsumer messageConsumer = session.createConsumer(destination);
        messageConsumer.receive();
    } catch (Exception ex) {
        logger.error(ex);
    } finally {
        // Close each resource independently so one failing close does not skip the other.
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                logger.error(e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                logger.error(e);
            }
        }
    }
}
/**
 * Creates the mail queue factory under test on top of an ActiveMQ connection factory
 * whose blob transfer policy writes through the test file system.
 *
 * @param brokerService injected broker; not referenced directly in this method
 */
@BeforeEach
public void setUp(BrokerService brokerService) {
    fileSystem = new ActiveMQMailQueueBlobTest.MyFileSystem();

    // Blob transfer policy backed by the test file system.
    FileSystemBlobTransferPolicy blobPolicy = new FileSystemBlobTransferPolicy();
    blobPolicy.setFileSystem(fileSystem);
    blobPolicy.setDefaultUploadUrl(BASE_DIR);

    ActiveMQConnectionFactory amqFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
    amqFactory.setBlobTransferPolicy(blobPolicy);

    mailQueueFactory = new ActiveMQMailQueueFactory(
        amqFactory,
        new RawMailQueueItemDecoratorFactory(),
        new RecordingMetricFactory(),
        new NoopGaugeRegistry());
    mailQueueFactory.setUseJMX(false);
    mailQueueFactory.setUseBlobMessages(true);
}
/**
 * Sends one message to the queue {@code "a queue"} over {@code vm://localhost} and
 * registers a {@code TestMessageListener} on the same queue.
 *
 * <p>The connection is closed in {@code finally}, so it is released even when a JMS
 * call or the sleep fails.
 *
 * @throws Exception if any JMS operation fails or the sleep is interrupted
 */
@Override
public void executeApp() throws Exception {
    ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    Connection connection = connectionFactory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("a queue");
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new TestMessageListener());
        MessageProducer producer = session.createProducer(queue);
        Message message = session.createMessage();
        producer.send(message);
        // Presumably gives the asynchronous listener time to receive the message.
        SECONDS.sleep(1);
    } finally {
        connection.close();
    }
}
/**
 * Connects to the configured broker URL (with {@code jms.prefetchPolicy.all=1} appended)
 * and registers this object as the message listener on the configured listen queue,
 * using a client-acknowledge session.
 *
 * <p>On success the connection is deliberately left open; nothing in this method closes
 * it. If setup fails after the connection was created, the connection is now closed
 * before the failure is rethrown, so a half-initialized connection does not leak.
 *
 * @throws RuntimeException wrapping the {@code JMSException} when setup fails
 */
public void runListenerServer() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActiveMQUrl() + "?jms.prefetchPolicy.all=1");
    Connection connection = null;
    try {
        connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination queue = session.createQueue(config.getActiveMQListenQueueName());
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(this);
        LOGGER.warn("Server is now listening for build ids");
    } catch (JMSException e) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                // Keep the original failure as the primary cause.
                e.addSuppressed(closeFailure);
            }
        }
        throw new RuntimeException(e);
    }
}
/**
 * Starts a broker with a TCP connector on port 0, installs a default destination policy
 * carrying {@code maxPageSize}, and creates a connection factory for the connector's
 * connect URI.
 */
@Before
public void startBroker() throws Exception {
    broker = createBroker();
    TransportConnector tcpConnector = broker.addConnector("tcp://0.0.0.0:0");
    broker.deleteAllMessages();
    broker.start();
    broker.waitUntilStarted();

    // NOTE(review): the policy map is installed after the broker has started; the
    // statement order is kept exactly as in the original.
    PolicyEntry pagingPolicy = new PolicyEntry();
    pagingPolicy.setMaxPageSize(maxPageSize);
    broker.setDestinationPolicy(new PolicyMap());
    broker.getDestinationPolicy().setDefaultEntry(pagingPolicy);

    connectUri = tcpConnector.getConnectUri();
    factory = new ActiveMQConnectionFactory(connectUri);
}
/**
 * Creates a connection to the broker at {@code tcp://localhost:(60000 + brokerId)}
 * using an explicitly tuned connection factory.
 *
 * @param brokerId offset added to 60000 to obtain the broker port
 * @return a new, not yet started connection
 * @throws JMSException if the connection cannot be created
 */
protected Connection createConnection(int brokerId) throws JMSException {
    final int brokerPort = 60000 + brokerId;
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + brokerPort);

    // Dispatch and send behavior.
    factory.setOptimizedMessageDispatch(true);
    factory.setCopyMessageOnSend(false);
    factory.setUseCompression(false);
    factory.setDispatchAsync(true);
    factory.setUseAsyncSend(false);
    factory.setOptimizeAcknowledge(false);
    factory.setWatchTopicAdvisories(false);

    // Prefetch limits: 100 for queues, 1000 for topics.
    ActiveMQPrefetchPolicy prefetchLimits = new ActiveMQPrefetchPolicy();
    prefetchLimits.setQueuePrefetch(100);
    prefetchLimits.setTopicPrefetch(1000);
    factory.setPrefetchPolicy(prefetchLimits);

    factory.setAlwaysSyncSend(true);
    return factory.createConnection();
}
/**
 * Opens a connection to the default broker URL with the given client id and prepares
 * a producer for the named topic.
 *
 * @param clientId  JMS client id assigned to the connection
 * @param topicName name of the topic the producer sends to
 * @throws JMSException if any JMS object cannot be created
 */
public void create(String clientId, String topicName)
        throws JMSException {
    this.clientId = clientId;

    // Connection to the default broker, tagged with the client id.
    ConnectionFactory brokerFactory =
            new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    connection = brokerFactory.createConnection();
    connection.setClientID(clientId);

    // Non-transacted, auto-acknowledge session.
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Producer bound to the target topic.
    messageProducer = session.createProducer(session.createTopic(topicName));
}
/**
 * Connects through a failover transport and adds three further URIs to it without
 * requesting a rebalance; the connection is closed afterwards.
 */
@Test
public void testUpdateUris() throws Exception {
    String failoverUrl = "failover:(" + tcpUri + ")?useExponentialBackOff=false";
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUrl);
    ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
    try {
        connection.start();
        FailoverTransport failover = connection.getTransport().narrow(FailoverTransport.class);
        int brokerPort = tcpUri.getPort();
        URI[] extraUris = {
            new URI("tcp://unknownHost:" + brokerPort),
            new URI("tcp://unknownHost2:" + brokerPort),
            new URI("tcp://localhost:2222")
        };
        failover.add(false, extraUris);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}
/**
 * Extends the parent Camel context with a {@code jms} component backed by the
 * {@code vm://localhost} ActiveMQ connection factory and points the properties component
 * at {@code classpath:rider-test.properties}.
 *
 * @return the configured Camel context
 */
@Override
protected CamelContext createCamelContext() throws Exception {
    CamelContext context = super.createCamelContext();

    // JMS component in auto-acknowledge mode.
    ConnectionFactory brokerFactory = new ActiveMQConnectionFactory("vm://localhost");
    context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(brokerFactory));

    // Properties component reads the test properties file.
    PropertiesComponent properties = context.getComponent("properties", PropertiesComponent.class);
    properties.setLocation("classpath:rider-test.properties");

    return context;
}
/**
 * Receives one message from {@code queue} over a new connection and returns it as an
 * {@code ActiveMQBytesMessage}.
 *
 * <p>The connection is closed in {@code finally}, so it is released even when creating
 * the session or consumer, or receiving the message, fails.
 *
 * @param factory connection factory used to reach the broker
 * @return the received message
 * @throws JMSException if a JMS operation fails
 */
private ActiveMQBytesMessage receiveTestBytesMessage(ActiveMQConnectionFactory factory) throws JMSException, UnsupportedEncodingException {
    ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
    try {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);
        // receive() is called without a timeout, as in the original.
        return (ActiveMQBytesMessage) consumer.receive();
    } finally {
        connection.close();
    }
}
/**
 * Builds a connection factory for the first transport connector of the given broker.
 * Topic advisories are disabled and the connection id prefix is the marker followed by
 * the broker name.
 *
 * @param brokerService broker whose first transport connector is used
 * @return the configured connection factory
 */
private ActiveMQConnectionFactory createConnectionFactory(BrokerService brokerService) throws Exception {
    String brokerUrl = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
    ActiveMQConnectionFactory result = new ActiveMQConnectionFactory(brokerUrl);
    result.setWatchTopicAdvisories(false);
    String idPrefix = connectionIdMarker + brokerService.getBrokerName();
    result.setConnectionIDPrefix(idPrefix);
    return result;
}
/**
 * Creates, stores, and returns a {@code vm://} connection factory named after the test,
 * with every prefetch limit set to 10 and topic advisories disabled.
 *
 * @return the factory also stored in {@code connectionFactory}
 */
@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
    connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));

    // One prefetch limit for all destination kinds.
    ActiveMQPrefetchPolicy uniformPrefetch = new ActiveMQPrefetchPolicy();
    uniformPrefetch.setAll(10);
    connectionFactory.setPrefetchPolicy(uniformPrefetch);

    connectionFactory.setWatchTopicAdvisories(false);
    return connectionFactory;
}
/**
 * Creates the broker, then opens a connection over {@code vm://localhost} with topic
 * advisories enabled and prepares a producer for the test topic.
 */
@Override
public void setUp() throws Exception {
    brokerService = createBroker();

    ActiveMQConnectionFactory advisoryFactory = new ActiveMQConnectionFactory("vm://localhost");
    advisoryFactory.setWatchTopicAdvisories(true);

    connection = advisoryFactory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    destination = new ActiveMQTopic(TOPIC_NAME);
    producer = session.createProducer(destination);

    // The connection is started last, after the producer exists.
    connection.start();
}
/**
 * Starts the broker and opens a started connection with message compression enabled,
 * an auto-acknowledge session, and the queue {@code CompressionTestQueue}.
 */
@Before
public void setUp() throws Exception {
    // Broker first; the client connects only once it is up.
    broker = createBroker();
    broker.start();
    broker.waitUntilStarted();

    // Client side with compression switched on.
    factory = new ActiveMQConnectionFactory(tcpUri);
    factory.setUseCompression(true);

    connection = (ActiveMQConnection) factory.createConnection();
    connection.start();

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    queue = session.createQueue("CompressionTestQueue");
}
/**
 * Sends ten text messages to {@code exampleQueue} through an Artemis connection factory
 * and receives them through an {@code ActiveMQConnectionFactory}, checking each body.
 *
 * <p>Both connections are closed in {@code finally}, so neither leaks when a JMS call or
 * an assertion fails.
 */
@Test
public void testMixedOpenWireExample2() throws Exception {
    Connection conn1 = null;
    Connection conn2 = null;
    try {
        SimpleString durableQueue = new SimpleString("exampleQueue");
        this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
        Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");

        // Producer side: Artemis client.
        org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
        conn1 = artemisCF.createConnection();
        conn1.start();
        Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session1.createProducer(queue);
        for (int i = 0; i < 10; i++) {
            TextMessage message = session1.createTextMessage("This is a text message");
            producer.send(message);
        }

        // Consumer side: the other ActiveMQConnectionFactory type.
        ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
        conn2 = openCF.createConnection();
        Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        conn2.start();
        MessageConsumer messageConsumer = sess2.createConsumer(sess2.createQueue("exampleQueue"));
        for (int i = 0; i < 10; i++) {
            TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
            assertEquals("This is a text message", messageReceived.getText());
        }
    } finally {
        // Nested finally so a failing close of conn1 cannot skip conn2.
        try {
            if (conn1 != null) {
                conn1.close();
            }
        } finally {
            if (conn2 != null) {
                conn2.close();
            }
        }
    }
}
ActiveMqManager(String host, int port, String username, String password) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://" + host + ":" + port);
connectionFactory.setUserName(username);
connectionFactory.setPassword(password);
connection = connectionFactory.createConnection();
connection.start();
}
/**
 * Sends {@code text} to {@code queue} on the ActiveMQ broker described by
 * {@code properties}, delegating the actual send to {@code sendJms}.
 *
 * <p>The broker URL is built by plain concatenation rather than
 * {@code String.format("%d")}, because {@code %d} formats with the default locale and
 * can emit non-ASCII digits, which would corrupt the URL.
 *
 * @param queue destination queue name
 * @param text  message text
 * @throws JMSException if sending fails
 */
private void sendActiveMq(String queue, String text) throws JMSException {
    ConnectionFactory factory = new ActiveMQConnectionFactory(
            properties.getUsername(),
            properties.getPassword(),
            "tcp://" + properties.getHost() + ":" + properties.getPort()
    );
    sendJms(queue, text, factory);
}
/**
 * Verifies that creating a session with the user name {@code "*"} fails.
 *
 * <p>The test passes when {@code createSession} throws and fails otherwise. The
 * connection is now closed on both paths; the close is best-effort so that it cannot
 * change the outcome of the test.
 */
@Test
public void testWildcard() throws Exception {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = factory.createQueueConnection("*", "sunflower");
    try {
        conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    } catch (Exception e) {
        // Expected path: the failure is printed and the test ends successfully.
        e.printStackTrace();
        return;
    } finally {
        try {
            conn.close();
        } catch (Exception ignored) {
            // Deliberately ignored: a close failure must not mask the result above.
        }
    }
    fail("Should have failed connecting");
}
/**
 * Consumes text messages from the queue {@code test-queue} with an asynchronous listener
 * until a byte is read from standard input, then releases the JMS resources.
 *
 * <p>The connection is closed in {@code finally}, so it is released even when a step
 * fails.
 */
@Test
public void testQueueConsumer() throws Exception {
    // 1. Create a connection factory for the broker's host and port.
    //    The port here is the ActiveMQ service port (61616 by default), not 8161.
    String brokerURL = "tcp://192.168.30.155:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    // 2. Create a Connection from the factory.
    Connection connection = connectionFactory.createConnection();
    try {
        // 3. Start the connection with Connection.start().
        connection.start();
        // 4. Create a Session.
        //    Argument 1: whether the session is transacted; if true, argument 2 is ignored.
        //    Argument 2: acknowledge mode (automatic or manual); automatic is sufficient here.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5. Create the Destination (queue or topic) from the Session.
        Queue queue = session.createQueue("test-queue");
        // 6. Create a consumer from the Session.
        MessageConsumer consumer = session.createConsumer(queue);
        // 7. Receive messages.
        consumer.setMessageListener(message -> {
            try {
                TextMessage msg = (TextMessage) message;
                System.out.println("接收到消息:" + msg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        // Block here so the program does not exit.
        System.in.read();
        // 8. Release resources.
        consumer.close();
        session.close();
    } finally {
        connection.close();
    }
}
/**
 * Opens the shared connection over {@code vm://localhost} with broker persistence
 * disabled, starts it, and creates the shared auto-acknowledge session.
 *
 * @throws JMSException if the connection or session cannot be created
 */
@BeforeClass
public static void startActiveMQ() throws JMSException {
    final ActiveMQConnectionFactory inVmFactory =
            new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

    connection = inVmFactory.createConnection();
    connection.start();

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
/**
 * Activates a {@code JmsAppender} in {@code map} message mode, hands it one event, and
 * verifies the {@code MapMessage} that arrives on the queue {@code decanter}.
 *
 * <p>The connection is closed in {@code finally}, and a missing message is reported as
 * an assertion failure instead of a {@code NullPointerException}.
 */
@Test
public void test() throws JMSException {
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    JmsAppender appender = new JmsAppender();
    appender.connectionFactory = cf;
    Dictionary<String, Object> config = new Hashtable<>();
    config.put("message.type", "map");
    appender.activate(config);

    MapMessage message;
    Connection con = cf.createConnection();
    try {
        con.start();
        Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = sess.createConsumer(sess.createQueue("decanter"));

        Map<String, Object> props = new HashMap<>();
        props.put("timestamp", 1L);
        props.put("string", "test");
        props.put("boolean", true);
        props.put("integer", 1);
        props.put("testnull", null);
        props.put("map", new HashMap<String, String>());
        appender.handleEvent(new Event("decanter/collect", props));

        message = (MapMessage) consumer.receive(1000);
        consumer.close();
        sess.close();
    } finally {
        con.close();
    }

    // receive(1000) returns null on timeout; fail with a clear message in that case.
    Assert.assertNotNull("no message received on queue 'decanter' within 1000 ms", message);
    Assert.assertEquals(1L, message.getObject("timestamp"));
    Assert.assertEquals("test", message.getObject("string"));
    Assert.assertEquals(true, message.getObject("boolean"));
    Assert.assertEquals(1, message.getObject("integer"));
    Object map = message.getObject("map");
    Assert.assertTrue(map instanceof Map);
}
/**
 * This is used to test the submit method.
 *
 * <p>Connects to the configured broker, waits up to 1000 ms for one message on the
 * configured submit queue, and returns its text. The connection is closed in
 * {@code finally}, so it is released on every path.
 *
 * @return String text received from the submit queue
 * @throws RuntimeException if no message arrives within the timeout, or wrapping the
 *         {@code JMSException} when a JMS operation fails
 */
public String receiveBuildFromQueue() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(config.getActiveMQUrl());
        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();
            // Create a Session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create the destination (Topic or Queue)
            Destination destination = session.createQueue(config.getActiveMQSubmitQueueName());
            // Create a MessageConsumer from the Session to the Topic or Queue
            MessageConsumer consumer = session.createConsumer(destination);
            // Wait for a message; receive(timeout) returns null when nothing arrives in time
            Message message = consumer.receive(1000);
            if (message == null) {
                throw new RuntimeException("No message received from the submit queue within 1000 ms");
            }
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            consumer.close();
            session.close();
            return text;
        } finally {
            connection.close();
        }
    } catch (JMSException e) {
        throw new RuntimeException(e);
    }
}
/**
 * Creates a connection factory for the named JMS provider.
 *
 * <p>Only {@code ACTIVEMQ_PROVIDER} is recognized. A {@code null} provider raises a
 * {@code NullPointerException}, exactly as the equivalent {@code switch} on the string
 * would.
 *
 * @param url           broker URL passed to the factory
 * @param timeoutMillis send timeout applied to the factory
 * @param jmsProvider   provider name selecting the factory implementation
 * @return the configured connection factory
 * @throws IllegalArgumentException if the provider name is not recognized
 */
public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException {
    if (!jmsProvider.equals(ACTIVEMQ_PROVIDER)) {
        throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
    }
    final ActiveMQConnectionFactory activeMqFactory = new ActiveMQConnectionFactory(url);
    activeMqFactory.setSendTimeout(timeoutMillis);
    return activeMqFactory;
}
/**
 * Runs {@code PublishJMS} once with a single flow file and verifies that the flow file
 * is routed to success and that the message read back from {@code fooQueue} carries the
 * expected body, reply-to queue, and {@code foo} property.
 */
@Test
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
    final String serviceId = "cfProvider";
    final String destinationName = "fooQueue";
    ActiveMQConnectionFactory brokerFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

    // Processor under test and a mocked controller service that hands out the factory.
    TestRunner runner = TestRunners.newTestRunner(new PublishJMS());
    JMSConnectionFactoryProviderDefinition providerService = mock(JMSConnectionFactoryProviderDefinition.class);
    when(providerService.getIdentifier()).thenReturn(serviceId);
    when(providerService.getConnectionFactory()).thenReturn(brokerFactory);
    runner.addControllerService(serviceId, providerService);
    runner.enableControllerService(providerService);
    runner.setProperty(PublishJMS.CF_SERVICE, serviceId);
    runner.setProperty(PublishJMS.DESTINATION, destinationName);

    // One flow file with a custom attribute and a reply-to header.
    Map<String, String> flowFileAttributes = new HashMap<>();
    flowFileAttributes.put("foo", "foo");
    flowFileAttributes.put(JmsHeaders.REPLY_TO, "cooQueue");
    runner.enqueue("Hey dude!".getBytes(), flowFileAttributes);
    runner.run(1, false);

    final MockFlowFile routedToSuccess = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
    assertNotNull(routedToSuccess);

    // Read the published message back and check its content.
    JmsTemplate template = new JmsTemplate(brokerFactory);
    BytesMessage published = (BytesMessage) template.receive(destinationName);
    byte[] body = MessageBodyToBytesConverter.toBytes(published);
    assertEquals("Hey dude!", new String(body));
    assertEquals("cooQueue", ((Queue) published.getJMSReplyTo()).getQueueName());
    assertEquals("foo", published.getStringProperty("foo"));
}
/**
 * Connects over {@code vm://localhost}, waits up to 1000 ms for one message on the queue
 * {@code TEST.FOO}, and prints what was received.
 *
 * <p>Failures are printed rather than propagated. The connection is closed in
 * {@code finally}, so it is released even when an earlier step throws.
 */
public void run() {
    Connection connection = null;
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        // Create a Connection
        connection = connectionFactory.createConnection();
        connection.start();
        connection.setExceptionListener(this);
        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");
        // Create a MessageConsumer from the Session to the Topic or Queue
        MessageConsumer consumer = session.createConsumer(destination);
        // Wait for a message
        Message message = consumer.receive(1000);
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Received: " + text);
        } else {
            System.out.println("Received: " + message);
        }
        consumer.close();
        session.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                // Report close failures the same way as any other failure in this method.
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }
}