下面列出了 javax.jms.TopicSession#createTextMessage() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Topics shouldn't hold on to messages if there are no subscribers.
 *
 * <p>Publishes one persistent text message through a transacted session on
 * {@code ActiveMQServerTestCase.topic1} without creating any subscriber, commits,
 * closes the connection and then runs {@code checkEmpty} on the topic.
 *
 * @throws Exception if a JMS call or the emptiness check fails
 */
@Test
public void testPersistentMessagesForTopicDropped() throws Exception {
    TopicConnection topicConn = createTopicConnection();
    try {
        TopicSession sess = topicConn.createTopicSession(true, 0);
        TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1);
        pub.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message m = sess.createTextMessage("testing123");
        pub.publish(m);
        sess.commit();
    } finally {
        // Close even when publishing or committing throws, so the connection is not leaked.
        topicConn.close();
    }
    // The check runs only after the connection has been closed, as in the original flow.
    checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
 * Topics shouldn't hold on to messages when the non-durable subscribers close.
 *
 * <p>Publishes and commits one persistent text message while a non-durable subscriber
 * exists on the same transacted session, receives the message, rolls the receipt back,
 * closes the connection and then runs {@code checkEmpty} on the topic.
 *
 * @throws Exception if a JMS call, an assertion or the emptiness check fails
 */
@Test
public void testPersistentMessagesForTopicDropped2() throws Exception {
    TopicConnection topicConn = createTopicConnection();
    try {
        topicConn.start();
        TopicSession sess = topicConn.createTopicSession(true, 0);
        TopicPublisher pub = sess.createPublisher(ActiveMQServerTestCase.topic1);
        TopicSubscriber sub = sess.createSubscriber(ActiveMQServerTestCase.topic1);
        pub.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message m = sess.createTextMessage("testing123");
        pub.publish(m);
        sess.commit();

        // receive but rollback
        TextMessage m2 = (TextMessage) sub.receive(3000);
        ProxyAssertSupport.assertNotNull(m2);
        ProxyAssertSupport.assertEquals("testing123", m2.getText());
        sess.rollback();
    } finally {
        // Close even when an assertion or JMS call fails, so the connection
        // (and its subscriber) is not left open.
        topicConn.close();
    }
    // The check runs only after the subscriber's connection has been closed.
    checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
 * Uses a temporary topic created on {@code connection} from a second connection:
 * publishing from the second connection is attempted, creating a subscriber there must
 * fail with a {@link JMSException}, and after the owning connection is closed a further
 * publish is attempted whose outcome (success or {@link JMSException}) is not asserted.
 *
 * @throws Exception if a JMS call outside the guarded sections fails
 */
@Test
public void testTempTopicDelete() throws Exception {
    connection.start();
    TopicSession ownerSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TemporaryTopic temporaryTopic = ownerSession.createTemporaryTopic();

    ActiveMQConnection otherConnection = (ActiveMQConnection) factory.createConnection();
    try {
        TopicSession otherSession = otherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher publisher = otherSession.createPublisher(temporaryTopic);

        // need to wait here because the ActiveMQ client's temp destination map is updated
        // asynchronously, not waiting can introduce a race
        boolean tempDestinationSeen =
                Wait.waitFor(() -> otherConnection.activeTempDestinations.size() == 1, 2000, 100);
        assertTrue(tempDestinationSeen);

        TextMessage firstMessage = otherSession.createTextMessage("Test Message");
        publisher.publish(firstMessage);

        try {
            TopicSubscriber consumer = otherSession.createSubscriber(temporaryTopic);
            fail("should have gotten exception but got consumer: " + consumer);
        } catch (JMSException expected) {
            // correct: subscribing from the second connection is expected to be rejected
        }

        connection.close();

        try {
            publisher.publish(otherSession.createMessage());
        } catch (JMSException tolerated) {
            // ok: a failure here is accepted and deliberately not asserted
        }
    } finally {
        otherConnection.close();
    }
}
/**
 * Verifies that several subscribers created on the same session each receive every
 * message published to the topic.
 *
 * <p>Three subscribers are registered with message listeners on one
 * {@code CLIENT_ACKNOWLEDGE} session. Each listener counts the message, acknowledges it
 * and hands it to a blocking queue. The test publishes 1000 text messages, drains the
 * expected 3000 deliveries from the queue while checking their text, and finally checks
 * the per-subscriber counts.
 *
 * @param port           broker port used to build the initial context
 * @param adminUsername  user name used to build the initial context
 * @param adminPassword  password used to build the initial context
 * @param brokerHostname broker host used to build the initial context
 * @throws NamingException      if a JNDI lookup fails
 * @throws JMSException         if a JMS operation fails
 * @throws InterruptedException if interrupted while waiting for a delivery
 */
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testMultipleTopicSubscribersOnSameSession(String port,
                                                      String adminUsername,
                                                      String adminPassword,
                                                      String brokerHostname)
        throws NamingException, JMSException, InterruptedException {
    String topicName = "testMultipleTopicSubscribersOnSameSession";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(topicName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();

        TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.CLIENT_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(topicName);

        int numberOfConsumers = 3;
        int messagesPerConsumer = 1000;
        int maxNumberOfMessages = numberOfConsumers * messagesPerConsumer;
        LinkedBlockingQueue<MessageResult> receiveQueue = new LinkedBlockingQueue<>(maxNumberOfMessages);

        TopicSubscriber[] consumers = new TopicSubscriber[numberOfConsumers];
        int[] messageCount = new int[numberOfConsumers];

        for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
            consumers[consumerIndex] = subscriberSession.createSubscriber(topic);
            int finalConsumerIndex = consumerIndex;
            consumers[consumerIndex].setMessageListener(message -> {
                // Count first, then publish the result through the queue; the test thread
                // reads the counters only after taking results from that queue.
                messageCount[finalConsumerIndex]++;
                try {
                    message.acknowledge();
                } catch (JMSException e) {
                    LOGGER.error("Message acknowledging failed.", e);
                }
                receiveQueue.offer(new MessageResult(message, finalConsumerIndex));
            });
        }

        // publish messages (no message properties are set in this test)
        TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        TopicPublisher producer = producerSession.createPublisher(topic);

        String consumerMessage = "testMessage";
        for (int i = 0; i < messagesPerConsumer; i++) {
            TextMessage textMessage = producerSession.createTextMessage(consumerMessage);
            producer.send(textMessage);
        }

        for (int i = 0; i < maxNumberOfMessages; i++) {
            MessageResult result = receiveQueue.poll(5, TimeUnit.SECONDS);
            if (result == null) {
                // Nothing arrived within the timeout: report how far each consumer got.
                StringBuilder countSummary = new StringBuilder();
                for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
                    countSummary.append("Consumer ")
                                .append(consumerIndex)
                                .append(" received ")
                                .append(messageCount[consumerIndex])
                                .append(" messages, ");
                }
                Assert.fail("Messages stopped receiving after " + i + " iterations. " + countSummary.toString());
            } else {
                TextMessage textMessage1 = (TextMessage) result.getMessage();
                Assert.assertEquals(textMessage1.getText(),
                                    consumerMessage,
                                    "Incorrect message received for consumer " + result.getConsumerId());
            }
        }

        for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
            Assert.assertEquals(messageCount[consumerIndex],
                                messagesPerConsumer,
                                "Message " + messageCount[consumerIndex]
                                + " received for consumer " + consumerIndex + ".");
        }

        producer.close();
        for (int consumerIndex = 0; consumerIndex < numberOfConsumers; consumerIndex++) {
            consumers[consumerIndex].close();
        }
    } finally {
        // Always release the connection, including when an assertion above fails.
        connection.close();
    }
}
/**
 * Verifies that a subscriber with a JMS selector receives messages whose property
 * matches the selector.
 *
 * <p>Subscribes with the selector {@code MyProperty = 'propertyValue'}, publishes 100 text
 * messages carrying that exact property value and expects 100 non-null receives.
 *
 * @param port           broker port used to build the initial context
 * @param adminUsername  user name used to build the initial context
 * @param adminPassword  password used to build the initial context
 * @param brokerHostname broker host used to build the initial context
 * @throws NamingException if a JNDI lookup fails
 * @throws JMSException    if a JMS operation fails
 */
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testPositiveJMSSelectorConsumerProducer(String port,
                                                    String adminUsername,
                                                    String adminPassword,
                                                    String brokerHostname) throws NamingException, JMSException {
    String queueName = "testPositiveJMSSelectorConsumerProducer";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(queueName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();

        TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(queueName);

        // Subscribe with a selector
        String propertyName = "MyProperty";
        String propertyValue = "propertyValue";
        String jmsPropertySelector = propertyName + " = '" + propertyValue + "'";
        TopicSubscriber consumer = subscriberSession.createSubscriber(topic, jmsPropertySelector, false);

        // publish messages with property
        TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        TopicPublisher producer = producerSession.createPublisher(topic);

        int numberOfMessages = 100;
        for (int i = 0; i < numberOfMessages; i++) {
            TextMessage textMessage = producerSession.createTextMessage("Test message " + i);
            textMessage.setStringProperty(propertyName, propertyValue);
            producer.send(textMessage);
        }

        // consume messages
        for (int i = 0; i < numberOfMessages; i++) {
            Message message = consumer.receive(1000);
            Assert.assertNotNull(message, "Message #" + i + " was not received");
        }

        producerSession.close();
    } finally {
        // Always release the connection, including when an assertion above fails.
        connection.close();
    }
}
/**
 * Verifies that a subscriber with a JMS selector does not receive messages whose
 * property does not match the selector.
 *
 * <p>Subscribes with the selector {@code MyProperty = 'propertyValue'}, publishes 100 text
 * messages whose property value is {@code propertyValue-1} and expects a single
 * {@code receive(100)} to return {@code null}.
 *
 * @param port           broker port used to build the initial context
 * @param adminUsername  user name used to build the initial context
 * @param adminPassword  password used to build the initial context
 * @param brokerHostname broker host used to build the initial context
 * @throws NamingException if a JNDI lookup fails
 * @throws JMSException    if a JMS operation fails
 */
@Parameters({"broker-port", "admin-username", "admin-password", "broker-hostname"})
@Test
public void testNegativeJMSSelectorConsumerProducer(String port,
                                                    String adminUsername,
                                                    String adminPassword,
                                                    String brokerHostname) throws NamingException, JMSException {
    String queueName = "testNegativeJMSSelectorConsumerProducer";
    InitialContext initialContext = ClientHelper
            .getInitialContextBuilder(adminUsername, adminPassword, brokerHostname, port)
            .withTopic(queueName)
            .build();

    TopicConnectionFactory connectionFactory
            = (TopicConnectionFactory) initialContext.lookup(ClientHelper.CONNECTION_FACTORY);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        connection.start();

        TopicSession subscriberSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic) initialContext.lookup(queueName);

        // Subscribe with a selector
        String propertyName = "MyProperty";
        String propertyValue = "propertyValue";
        String jmsPropertySelector = propertyName + " = '" + propertyValue + "'";
        TopicSubscriber consumer = subscriberSession.createSubscriber(topic, jmsPropertySelector, false);

        // publish messages with property
        TopicSession producerSession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        TopicPublisher producer = producerSession.createPublisher(topic);

        // Send messages with a different property value
        int numberOfMessages = 100;
        for (int i = 0; i < numberOfMessages; i++) {
            TextMessage textMessage = producerSession.createTextMessage("Test message " + i);
            textMessage.setStringProperty(propertyName, propertyValue + "-1");
            producer.send(textMessage);
        }

        // consume messages
        Message message = consumer.receive(100);
        Assert.assertNull(message, "Message received. Shouldn't receive any messages.");

        producerSession.close();
    } finally {
        // Always release the connection, including when the assertion above fails.
        connection.close();
    }
}
/**
 * Checks that the durable subscription's selector survived the transfer to the new store
 * and keeps filtering during continued use.
 *
 * <p>Two text messages are published in one transaction, "A" with {@code testprop='false'}
 * and "B" with {@code testprop='true'}. The durable subscriber, re-attached with the
 * selector {@code testprop='true'}, must first yield one message (asserted only to be
 * non-null and reported as the "migrated" message) and then message "B" carrying the
 * random ID chosen by this run.
 *
 * @throws Exception if a JMS operation fails
 */
@Test
public void testSelectorDurability() throws Exception
{
    TopicConnection connection = getTopicConnection();
    try
    {
        connection.start();

        TopicSession txSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
        Topic selectorTopic = txSession.createTopic(SELECTOR_TOPIC_NAME);
        TopicPublisher topicPublisher = txSession.createPublisher(selectorTopic);

        // Random ID so the message published by this run can be recognised on receipt.
        int messageId = ThreadLocalRandom.current().nextInt();

        // "A" does not satisfy the subscription's selector.
        Message nonMatching = txSession.createTextMessage("A");
        nonMatching.setIntProperty("ID", messageId);
        nonMatching.setStringProperty("testprop", "false");
        topicPublisher.publish(nonMatching);

        // "B" satisfies the subscription's selector.
        Message matching = txSession.createTextMessage("B");
        matching.setIntProperty("ID", messageId);
        matching.setStringProperty("testprop", "true");
        topicPublisher.publish(matching);

        txSession.commit();

        TopicSubscriber durableSubscriber =
                txSession.createDurableSubscriber(selectorTopic, SELECTOR_SUB_NAME, "testprop='true'", false);

        Message migratedMessage = durableSubscriber.receive(getReceiveTimeout());
        assertThat("Failed to receive migrated message", migratedMessage, is(notNullValue()));

        Message publishedMessage = durableSubscriber.receive(getReceiveTimeout());
        txSession.commit();

        assertThat("Failed to receive published message", publishedMessage, is(notNullValue()));
        assertThat("Message is not Text message", publishedMessage, is(instanceOf(TextMessage.class)));
        assertThat("Unexpected text", ((TextMessage) publishedMessage).getText(), is(equalTo("B")));
        assertThat("Unexpected index", publishedMessage.getIntProperty("ID"), is(equalTo(messageId)));

        txSession.close();
    }
    finally
    {
        connection.close();
    }
}
/**
 * Test that the DurableSubscription without selector was successfully
 * transferred to the new store, and functions as expected with continued use.
 *
 * <p>Two text messages ("A" and "B") are published in one transaction. The durable
 * subscriber must first yield one message (asserted only to be non-null and reported as
 * the "migrated" message) and then "A" followed by "B", each carrying the random ID
 * chosen by this run.
 *
 * @throws Exception if a JMS operation fails
 */
@Test
public void testDurableSubscriptionWithoutSelector() throws Exception
{
    TopicConnection connection = getTopicConnection();
    try
    {
        connection.start();
        TopicSession session = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicPublisher publisher = session.createPublisher(topic);

        // Random ID so the messages published by this run can be recognised on receipt.
        int index = ThreadLocalRandom.current().nextInt();

        Message messageA = session.createTextMessage("A");
        messageA.setIntProperty("ID", index);
        messageA.setStringProperty("testprop", "false");
        publisher.publish(messageA);

        Message messageB = session.createTextMessage("B");
        messageB.setIntProperty("ID", index);
        messageB.setStringProperty("testprop", "true");
        publisher.publish(messageB);

        session.commit();

        TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUB_NAME);

        Message migrated = subscriber.receive(getReceiveTimeout());
        assertThat("Failed to receive migrated message", migrated, is(notNullValue()));

        Message receivedA = subscriber.receive(getReceiveTimeout());
        session.commit();
        assertThat("Failed to receive published message A", receivedA, is(notNullValue()));
        assertThat("Message A is not Text message", receivedA, is(instanceOf(TextMessage.class)));
        assertThat("Unexpected text for A", ((TextMessage) receivedA).getText(), is(equalTo("A")));
        assertThat("Unexpected index for A", receivedA.getIntProperty("ID"), is(equalTo(index)));

        Message receivedB = subscriber.receive(getReceiveTimeout());
        session.commit();
        assertThat("Failed to receive published message B", receivedB, is(notNullValue()));
        assertThat("Message B is not Text message", receivedB, is(instanceOf(TextMessage.class)));
        assertThat("Unexpected text for B", ((TextMessage) receivedB).getText(), is(equalTo("B")));
        assertThat("Unexpected index for B", receivedB.getIntProperty("ID"), is(equalTo(index)));

        // Every receive above has already been committed, so no further commit is needed
        // before closing the session.
        session.close();
    }
    finally
    {
        connection.close();
    }
}
/**
 * Sends one text message whose body is 500 random characters drawn from the lowercase
 * letters a-z.
 *
 * @param session  session used to create the text message
 * @param producer producer used to send the message
 * @throws JMSException if creating, populating or sending the message fails
 */
private void sendRandomMessage(TopicSession session, MessageProducer producer) throws JMSException {
    TextMessage randomMessage = session.createTextMessage();
    String body = RandomStringUtils.random(500, "abcdefghijklmnopqrstuvwxyz");
    randomMessage.setText(body);
    producer.send(randomMessage);
}