下面列出了javax.jms.TopicSession#createTopic ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private static void publish() throws JMSException {
// get topic connection
TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
TopicConnection connection = connectionFactory.createTopicConnection();
// start connection
connection.start();
// create session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// create topic
Topic topic = session.createTopic(topicName);
// create publisher
TopicPublisher publisher = session.createPublisher(topic);
// send message
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(("hello WeEvent").getBytes(StandardCharsets.UTF_8));
publisher.publish(msg);
System.out.print("send done.");
connection.close();
}
private void doTestCreateTopicPublisher(boolean useAnonymousProducers) throws JMSException {
cf.setUseAnonymousProducers(useAnonymousProducers);
JmsPoolConnection connection = (JmsPoolConnection) cf.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic1 = session.createTopic("Topic-1");
Topic topic2 = session.createTopic("Topic-2");
JmsPoolTopicPublisher publisher1 = (JmsPoolTopicPublisher) session.createPublisher(topic1);
JmsPoolTopicPublisher publisher2 = (JmsPoolTopicPublisher) session.createPublisher(topic2);
if (useAnonymousProducers) {
assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
} else {
assertNotSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
}
connection.close();
}
@Test(timeout = 60000)
public void testTopicMessageSend() throws Exception {
cf.setMaxConnections(1);
TopicConnection connection = cf.createTopicConnection();
try {
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(getTestName());
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.send(topicSession.createMessage());
assertEquals(1, cf.getNumConnections());
} finally {
connection.close();
cf.stop();
}
}
@Test
public void testNotificationProperties() throws Exception {
try (TopicConnection topicConnection = factory.createTopicConnection()) {
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic notificationsTopic = topicSession.createTopic("activemq.notifications");
TopicSubscriber subscriber = topicSession.createSubscriber(notificationsTopic);
List<Message> receivedMessages = new CopyOnWriteArrayList<>();
subscriber.setMessageListener(receivedMessages::add);
topicConnection.start();
Wait.waitFor(() -> receivedMessages.size() > 0);
Assert.assertTrue(receivedMessages.size() > 0);
for (Message message : receivedMessages) {
assertNotNull(message);
assertNotNull(message.getStringProperty("_AMQ_NotifType"));
}
}
}
private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.setClientID(clientID);
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(topicName);
connection.start();
TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
LOG.info("About to receive messages");
Message message = subscriber.receive(120000);
subscriber.close();
connection.close();
LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
return message;
}
@Test
public void testCreateTopicPublisher() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
TopicConnection connection = factory.createTopicConnection();
assertNotNull(connection);
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Topic topic = session.createTopic(name.getMethodName());
TopicPublisher publisher = session.createPublisher(topic);
assertNotNull(publisher);
TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
assertEquals(0, proxy.getEnqueueCount());
connection.close();
}
private static void subscribe() throws JMSException {
// get topic connection
TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
TopicConnection connection = connectionFactory.createTopicConnection();
// start connection
connection.start();
// create session
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// create topic
Topic topic = session.createTopic(topicName);
// create subscriber
TopicSubscriber subscriber = session.createSubscriber(topic);
// create listener
subscriber.setMessageListener(message -> {
BytesMessage msg = (BytesMessage) message;
try {
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
System.out.println("received: " + new String(data, StandardCharsets.UTF_8));
} catch (JMSException e) {
e.printStackTrace();
}
});
connection.close();
}
@Test(timeout = 60000)
public void testCreateTopicPublisher() throws Exception {
JmsPoolConnection connection = (JmsPoolConnection) pooledFactory.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic1 = session.createTopic("Topic-1");
Topic topic2 = session.createTopic("Topic-2");
JmsPoolTopicPublisher publisher1 = (JmsPoolTopicPublisher) session.createPublisher(topic1);
JmsPoolTopicPublisher publisher2 = (JmsPoolTopicPublisher) session.createPublisher(topic2);
assertNotSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
}
@Test(timeout = 60000)
public void testCreateTopicPublisher() throws Exception {
JmsPoolConnection connection = (JmsPoolConnection) pooledFactory.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic1 = session.createTopic("Topic-1");
Topic topic2 = session.createTopic("Topic-2");
JmsPoolTopicPublisher publisher1 = (JmsPoolTopicPublisher) session.createPublisher(topic1);
JmsPoolTopicPublisher publisher2 = (JmsPoolTopicPublisher) session.createPublisher(topic2);
assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer());
connection.close();
}
/**
* <b>JMS 1.0.2</b>
* @return A valid Topic object created either from JNDI lookup or directly from the given session.
*/
public DestinationWrapper<Topic> lookupTopic(String uri, TopicSession session) throws JMSException, NamingException {
if ( usingJNDI ) {
return lookupTopicFromJNDI( uri );
} else {
return new DestinationWrapper<Topic>( uri, session.createTopic( uri ) );
}
}
@Test(timeout = 60000)
public void testTemporarySubscriptionDeleted() throws Exception {
Connection connection = createConnection();
try {
TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(getTopicName());
TopicSubscriber myNonDurSub = session.createSubscriber(topic);
assertNotNull(myNonDurSub);
Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(getTopicName()));
Assert.assertEquals(2, bindingsForAddress.getBindings().size());
session.close();
final CountDownLatch latch = new CountDownLatch(1);
server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
@Override
public void connectionClosed() {
latch.countDown();
}
});
connection.close();
latch.await(5, TimeUnit.SECONDS);
bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString(getTopicName()));
Assert.assertEquals(1, bindingsForAddress.getBindings().size());
} finally {
connection.close();
}
}
private void produceExpiredAndOneNonExpiredMessages() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
producer.setTimeToLive(TimeUnit.SECONDS.toMillis(1));
for (int i = 0; i < 40000; i++) {
sendRandomMessage(session, producer);
}
producer.setTimeToLive(TimeUnit.DAYS.toMillis(1));
sendRandomMessage(session, producer);
connection.close();
LOG.info("produceExpiredAndOneNonExpiredMessages done");
}
private void registerDurableSubscription() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
TopicConnection connection = connectionFactory.createTopicConnection();
connection.setClientID(clientID);
TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = topicSession.createTopic(topicName);
TopicSubscriber durableSubscriber = topicSession.createDurableSubscriber(topic, durableSubName);
connection.start();
durableSubscriber.close();
connection.close();
LOG.info("Durable Sub Registered");
}
protected void createTestResources() throws Exception {
connection = createTopicConnectionToMockProvider();
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(_testName.getMethodName());
publisher = session.createPublisher(destination);
publisher.close();
}
protected void createTestResources() throws Exception {
connection = createTopicConnectionToMockProvider();
TopicSession session = ((TopicConnection) connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic(_testName.getMethodName());
subscriber = session.createSubscriber(destination);
subscriber.close();
}
/**
* Test that the selector applied to the DurableSubscription was successfully
* transferred to the new store, and functions as expected with continued use
* by monitoring message count while sending new messages to the topic and then
* consuming them.
*/
@Test
public void testSelectorDurability() throws Exception
{
TopicConnection connection = getTopicConnection();
try
{
connection.start();
TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(SELECTOR_TOPIC_NAME);
TopicPublisher publisher = session.createPublisher(topic);
int index = ThreadLocalRandom.current().nextInt();
Message messageA = session.createTextMessage("A");
messageA.setIntProperty("ID", index);
messageA.setStringProperty("testprop", "false");
publisher.publish(messageA);
Message messageB = session.createTextMessage("B");
messageB.setIntProperty("ID", index);
messageB.setStringProperty("testprop", "true");
publisher.publish(messageB);
session.commit();
TopicSubscriber subscriber =
session.createDurableSubscriber(topic, SELECTOR_SUB_NAME, "testprop='true'", false);
Message migrated = subscriber.receive(getReceiveTimeout());
assertThat("Failed to receive migrated message", migrated, is(notNullValue()));
Message received = subscriber.receive(getReceiveTimeout());
session.commit();
assertThat("Failed to receive published message", received, is(notNullValue()));
assertThat("Message is not Text message", received, is(instanceOf(TextMessage.class)));
assertThat("Unexpected text", ((TextMessage) received).getText(), is(equalTo("B")));
assertThat("Unexpected index", received.getIntProperty("ID"), is(equalTo(index)));
session.close();
}
finally
{
connection.close();
}
}
/**
* Test that the DurableSubscription without selector was successfully
* transfered to the new store, and functions as expected with continued use.
*/
@Test
public void testDurableSubscriptionWithoutSelector() throws Exception
{
TopicConnection connection = getTopicConnection();
try
{
connection.start();
TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic(TOPIC_NAME);
TopicPublisher publisher = session.createPublisher(topic);
int index = ThreadLocalRandom.current().nextInt();
Message messageA = session.createTextMessage("A");
messageA.setIntProperty("ID", index);
messageA.setStringProperty("testprop", "false");
publisher.publish(messageA);
Message messageB = session.createTextMessage("B");
messageB.setIntProperty("ID", index);
messageB.setStringProperty("testprop", "true");
publisher.publish(messageB);
session.commit();
TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUB_NAME);
Message migrated = subscriber.receive(getReceiveTimeout());
assertThat("Failed to receive migrated message", migrated, is(notNullValue()));
Message receivedA = subscriber.receive(getReceiveTimeout());
session.commit();
assertThat("Failed to receive published message A", receivedA, is(notNullValue()));
assertThat("Message A is not Text message", receivedA, is(instanceOf(TextMessage.class)));
assertThat("Unexpected text for A", ((TextMessage) receivedA).getText(), is(equalTo("A")));
assertThat("Unexpected index", receivedA.getIntProperty("ID"), is(equalTo(index)));
Message receivedB = subscriber.receive(getReceiveTimeout());
session.commit();
assertThat("Failed to receive published message B", receivedB, is(notNullValue()));
assertThat("Message B is not Text message", receivedB, is(instanceOf(TextMessage.class)));
assertThat("Unexpected text for B", ((TextMessage) receivedB).getText(), is(equalTo("B")));
assertThat("Unexpected index for B", receivedB.getIntProperty("ID"), is(equalTo(index)));
session.commit();
session.close();
}
finally
{
connection.close();
}
}
@PostConstruct
@SuppressWarnings("unused")
private void init() throws Exception {
// Init Cache
Configuration c = new Configuration();
c.setName("sessionManager");
manager = CacheManager.create(c);
CacheConfiguration config = new CacheConfiguration();
config.eternal(false).name(CACHE_NAME).maxEntriesLocalHeap(maxEntries).memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU).timeToIdleSeconds(timeToIdle).timeToLiveSeconds(timeToLive);
if (!manager.cacheExists(CACHE_NAME)) {
manager.addCache(new Cache(config));
}
sessions = manager.getCache(CACHE_NAME);
// Init JMS replication
ConnectionFactory factory = new ActiveMQConnectionFactory(this.url);
Connection conn = factory.createConnection();
conn.start();
jmsSession = (TopicSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic topic = jmsSession.createTopic(TOPIC_NAME);
tp = jmsSession.createPublisher(topic);
listener = new Thread() { // Thread created once upon container startup
@Override
public void run() {
try {
MessageConsumer consumer = jmsSession.createConsumer(topic);
while (live) {
ObjectMessage msg = (ObjectMessage) consumer.receive();
LOG.debug("Received replication message: {}", msg);
if (PUT.equals(msg.getStringProperty(ACTION_KEY))) {
sessions.put(new Element(msg.getStringProperty(TOKEN_KEY), msg.getObject()));
} else if (REMOVE.equals(msg.getStringProperty(ACTION_KEY))) {
sessions.remove(msg.getStringProperty(TOKEN_KEY));
}
}
} catch (JMSException e) {
LOG.error("Error reading replication message", e);
}
}
};
listener.start();
}