下面列出了javax.jms.Connection#createSession ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void sendBytesMessageUsingCoreJms(String queueName, byte[] data) throws Exception {
Connection jmsConn = null;
try {
jmsConn = coreCf.createConnection();
Session session = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(data);
bytesMessage.writeBoolean(true);
bytesMessage.writeLong(99999L);
bytesMessage.writeChar('h');
bytesMessage.writeInt(987);
bytesMessage.writeShort((short) 1099);
bytesMessage.writeUTF("hellobytes");
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.send(bytesMessage);
} finally {
if (jmsConn != null) {
jmsConn.close();
}
}
}
@Test
public void testAutoCreateOnDurableSubscribeToTopic() throws Exception {
Connection connection = cf.createConnection();
connection.setClientID("myClientID");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = ActiveMQJMSClient.createTopic(QUEUE_NAME);
MessageConsumer consumer = session.createDurableConsumer(topic, "myDurableSub");
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("msg"));
connection.start();
assertNotNull(consumer.receive(500));
connection.close();
assertNotNull(server.getManagementService().getResource(ResourceNames.ADDRESS + "test"));
assertNotNull(server.locateQueue(SimpleString.toSimpleString("myClientID.myDurableSub")));
}
@Test(timeout = 30000)
public void testSendReceive() throws Exception {
Connection connection = createConnection(fullUser, fullPass);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer p = session.createProducer(queue);
TextMessage message = null;
message = session.createTextMessage();
String messageText = "hello sent at " + new java.util.Date().toString();
message.setText(messageText);
p.send(message);
// Get the message we just sent
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message msg = consumer.receive(5000);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
TextMessage textMessage = (TextMessage) msg;
assertEquals(messageText, textMessage.getText());
} finally {
connection.close();
}
}
@Test
public void testCreateProducerOnInexistentDestination() throws Exception {
getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false));
getJmsServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false));
Connection pconn = createConnection();
try {
Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
ps.createProducer(ActiveMQJMSClient.createTopic("NoSuchTopic"));
ProxyAssertSupport.fail("should throw exception");
} catch (InvalidDestinationException e) {
// OK
}
} finally {
pconn.close();
}
}
@Test
public void testActiveTTL() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
long timeToLiveMillis = getReceiveTimeout() * 2;
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);
producer.setTimeToLive(timeToLiveMillis);
producer.send(session.createTextMessage("A"));
producer.setTimeToLive(0);
producer.send(session.createTextMessage("B"));
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertEquals("Unexpected message received", "A", ((TextMessage) message).getText());
Thread.sleep(timeToLiveMillis);
session.rollback();
message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received after waiting for TTL", message instanceof TextMessage);
assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) message).getText());
}
finally
{
connection.close();
}
}
@Test
public void testRedeliveryPropertyWithNoRollback() throws Exception {
final int numMessages = 1;
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = connectionFactory.createConnection();
connection.start();
populateDestination(numMessages, destinationName, connection);
connection.close();
{
AtomicInteger received = new AtomicInteger();
final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries();
while (received.get() < maxRetries) {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(2000);
if (msg != null) {
LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());
assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount"));
}
session.close();
connection.close();
}
connection = connectionFactory.createConnection();
connection.start();
consumeMessage(connection, maxRetries + 1);
}
}
/**
* Tests that lack of the absolute-expiry-time and ttl fields on a message results
* in it returning 0 for for JMSExpiration
*
* @throws Exception if an error occurs during the test.
*/
@Test(timeout = 20000)
public void testReceivedMessageFromQueueWithNoAbsoluteExpiryOrTtlReturnsJMSExpirationZero() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
PropertiesDescribedType props = new PropertiesDescribedType();
props.setMessageId("myMessageIDString");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
testPeer.expectReceiverAttach();
testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, amqpValueNullContent);
testPeer.expectDispositionThatIsAcceptedAndSettled();
MessageConsumer messageConsumer = session.createConsumer(queue);
Message receivedMessage = messageConsumer.receive(3000);
testPeer.waitForAllHandlersToComplete(3000);
assertNotNull(receivedMessage);
assertEquals(0L, receivedMessage.getJMSExpiration());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Test
public void testSendingToExchangePattern() throws Exception
{
updateAutoCreationPolicies();
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic topic = session.createTopic(getDestinationAddress("barExchange/foo", TYPE_TOPIC));
final MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(topic);
Message received = consumer.receive(getReceiveTimeout() / 4);
assertNull(received);
producer.send(session.createTextMessage("Hello world2!"));
received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
assertEquals("Hello world2!", ((TextMessage) received).getText());
}
finally
{
connection.close();
}
}
private void testAutoCreate(int protocol) throws Throwable {
final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
if (!isNetty()) {
// no need to run the test, there's no AMQP support
return;
}
for (int i = 0; i < 10; i++) {
ConnectionFactory factorySend = createFactory(protocol);
Connection connection = factorySend.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(thisQueue.toString());
MessageProducer producer = session.createProducer(queue);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
producer.send(session.createTextMessage("hello"));
Assert.assertNotNull(consumer.receive(5000));
consumer.close();
session.close();
} finally {
connection.close();
}
Wait.waitFor(() -> server.getAddressInfo(thisQueue) == null, 1000, 10);
assertNull(server.getAddressInfo(thisQueue));
assertEquals(0, server.getTotalMessageCount());
}
}
private void consumeIgnoringLastSeenOmission(final Connection connection,
final Queue testQueue,
int fromIndex,
int toIndex,
int consumerLastSeenMessageIndex)
throws JMSException
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(testQueue);
int expectedIndex = fromIndex;
while (expectedIndex < toIndex)
{
Message message = consumer.receive(getReceiveTimeout());
if (message == null && consumerLastSeenMessageIndex + 1 == toIndex)
{
// this is a corner case when one remaining message is expected
// but it was already received previously, Commit was sent
// and broker successfully committed and sent back CommitOk
// but CommitOk did not reach client due to abrupt disconnect
LOGGER.debug( "Broker transaction was completed for message {}"
+ " but there was no network to notify client about its completion.",
consumerLastSeenMessageIndex);
}
else
{
assertNotNull("Expected message with index " + expectedIndex + " but got null", message);
int messageIndex = message.getIntProperty(Utils.INDEX);
LOGGER.debug("Received message with index {}, expected index is {}", messageIndex, expectedIndex);
if (messageIndex != expectedIndex
&& expectedIndex == fromIndex
&& messageIndex == consumerLastSeenMessageIndex + 1)
{
LOGGER.debug("Broker transaction was completed for message {}"
+ " but there was no network to notify client about its completion.",
consumerLastSeenMessageIndex);
expectedIndex = messageIndex;
}
assertEquals("Unexpected message index", expectedIndex, messageIndex);
}
expectedIndex++;
}
session.close();
}
@Test(timeout = 10000)
public void testInterleavedCompletionsReturnedInOrder() throws Exception {
final int MESSAGE_COUNT = 3;
final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
JmsConnectionFactory factory = new JmsConnectionFactory(
"mock://localhost?mock.delayCompletionCalls=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
final MyCompletionListener listener = new MyCompletionListener();
sendMessages(MESSAGE_COUNT, producer, listener);
assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
}
}));
List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
assertEquals(MESSAGE_COUNT, pending.size());
Collections.reverse(pending);
for (JmsOutboundMessageDispatch envelope : pending) {
int sequence = envelope.getMessage().getIntProperty("sequence");
if (sequence % 2 == 0) {
LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
remotePoor.completePendingSend(envelope);
} else {
LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
remotePoor.failPendingSend(envelope, new ProviderException("Failed to send message"));
}
}
assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return listener.getCombinedSends().size() == MESSAGE_COUNT;
}
}));
assertTotalCompletionOrder(MESSAGE_COUNT, listener);
connection.close();
}
@Test
public void testAutoDeleteTopicDefaultDurableSubscriptionQueue() throws Exception {
ConnectionFactory fact = getCF();
Connection connection = fact.createConnection();
connection.start();
try {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
String testQueueName = getName();
String sub = testQueueName + "/mysub";
Topic topic = session.createTopic(testQueueName);
assertEquals(testQueueName, topic.getTopicName());
MessageConsumer consumer = session.createSharedDurableConsumer(topic, sub);
QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
assertFalse(queueBinding.getQueue().isAutoDelete());
Wait.assertEquals(0, queueBinding.getQueue()::getMessageCount);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("hello1"));
producer.send(session.createTextMessage("hello2"));
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("hello1", ((TextMessage)message).getText());
message.acknowledge();
consumer.close();
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
assertNotNull(queueBinding);
consumer = session.createSharedDurableConsumer(topic, sub);
message = consumer.receive(5000);
assertNotNull(message);
assertEquals("hello2", ((TextMessage)message).getText());
message.acknowledge();
consumer.close();
//Wait longer than scan period.
Thread.sleep(20);
queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(sub));
assertNotNull(queueBinding);
} finally {
connection.close();
}
}
/**
* Tests if acknowledging a messages works correctly with session joining. Steps are,
* 1. Publish two messages to two queues using two non-transacted sessions
* 2. Create two distributed transaction sessions and join one session to other.
* 3. Receive messages and ack using two sessions.
* 4. Commit the session
* 5. Subscribe to the published queue and see if any message is received.
*/
@Test(groups = { "wso2.mb", "dtx" })
public void xaStartJoinMessageAckTestCase()
throws NamingException, JMSException, XAException, XPathExpressionException {
String queueNameOne = "DtxStartPositiveTestCaseXaStartJoinMessageAckTestCaseOne";
String queueNameTwo = "DtxStartPositiveTestCaseXaStartJoinMessageAckTestCaseTwo";
InitialContext initialContext = JMSClientHelper.createInitialContextBuilder("admin", "admin", "localhost",
getAMQPPort()).withQueue(queueNameOne).build();
ConnectionFactory nonXaConnectionFactory = (ConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_CONNECTION_FACTORY);
Connection nonXaQueueConnection = nonXaConnectionFactory.createConnection();
nonXaQueueConnection.start();
Session nonXaQueueSessionOne = nonXaQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination xaTestQueueOne = nonXaQueueSessionOne.createQueue(queueNameOne);
Destination xaTestQueueTwo = nonXaQueueSessionOne.createQueue(queueNameTwo);
MessageProducer nonXaQueueSessionProducerOne = nonXaQueueSessionOne.createProducer(xaTestQueueOne);
MessageProducer nonXaQueueSessionProducerTwo = nonXaQueueSessionOne.createProducer(xaTestQueueTwo);
nonXaQueueSessionProducerOne.send(nonXaQueueSessionOne.createTextMessage("Message 1"));
nonXaQueueSessionProducerTwo.send(nonXaQueueSessionOne.createTextMessage("Message 2"));
nonXaQueueSessionProducerOne.close();
nonXaQueueSessionProducerTwo.close();
XAConnectionFactory xaConnectionFactory = (XAConnectionFactory) initialContext
.lookup(JMSClientHelper.QUEUE_XA_CONNECTION_FACTORY);
// Create XA resource one
XAConnection xaConnectionOne = xaConnectionFactory.createXAConnection();
xaConnectionOne.start();
XASession xaSessionOne = xaConnectionOne.createXASession();
XAResource xaResourceOne = xaSessionOne.getXAResource();
Session sessionOne = xaSessionOne.getSession();
MessageConsumer xaConsumerOne = sessionOne.createConsumer(xaTestQueueOne);
// Create XA resource two
XAConnection xaConnectionTwo = xaConnectionFactory.createXAConnection();
xaConnectionTwo.start();
XASession xaSessionTwo = xaConnectionTwo.createXASession();
XAResource xaResourceTwo = xaSessionTwo.getXAResource();
Session sessionTwo = xaSessionTwo.getSession();
MessageConsumer xaConsumerTwo = sessionTwo.createConsumer(xaTestQueueTwo);
Xid xid = JMSClientHelper.getNewXid();
boolean sameRM = xaResourceOne.isSameRM(xaResourceTwo);
Assert.assertEquals(sameRM, true, "Resource one and resource two are connected to different resource "
+ "managers");
xaResourceOne.start(xid, XAResource.TMNOFLAGS);
xaResourceTwo.start(xid, XAResource.TMJOIN);
Message receivedMessageForQueueOne = xaConsumerOne.receive(5000);
Assert.assertNotNull(receivedMessageForQueueOne, "A message was not received for queue " + queueNameOne);
Message receivedMessageForQueueTwo = xaConsumerTwo.receive(5000);
Assert.assertNotNull(receivedMessageForQueueTwo, "A message was not received for queue " + queueNameTwo);
xaResourceOne.end(xid, XAResource.TMSUCCESS);
xaResourceOne.prepare(xid);
xaResourceOne.commit(xid, false);
xaConnectionOne.close();
xaConnectionTwo.close();
// subscribe and see if the message is received
MessageConsumer nonXaConsumerOne = nonXaQueueSessionOne.createConsumer(xaTestQueueOne);
Session nonXaQueueSessionTwo = nonXaQueueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer nonXaConsumerTwo = nonXaQueueSessionTwo.createConsumer(xaTestQueueTwo);
// wait 3 seconds
receivedMessageForQueueOne = nonXaConsumerOne.receive(3000);
Assert.assertNull(receivedMessageForQueueOne, "Message received after committing for queue " + queueNameOne);
receivedMessageForQueueTwo = nonXaConsumerTwo.receive(3000);
Assert.assertNull(receivedMessageForQueueTwo, "Message received after committing for queue " + queueNameTwo);
nonXaQueueConnection.close();
}
@Test
public void testClusteredQueue() throws Exception {
Connection conn1 = openWireCf1.createConnection();
Connection conn2 = openWireCf2.createConnection();
conn1.start();
conn2.start();
try {
Session session1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue targetQueue1 = session1.createQueue(QUEUE_NAME);
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue targetQueue2 = session2.createQueue(QUEUE_NAME);
this.waitForBindings(servers[0], QUEUE_NAME, true, 1, 0, 2000);
this.waitForBindings(servers[1], QUEUE_NAME, true, 1, 0, 2000);
this.waitForBindings(servers[1], QUEUE_NAME, false, 1, 0, 2000);
this.waitForBindings(servers[0], QUEUE_NAME, false, 1, 0, 2000);
MessageProducer prod1 = session1.createProducer(targetQueue1);
MessageConsumer cons2 = session2.createConsumer(targetQueue2);
this.waitForBindings(servers[0], QUEUE_NAME, false, 1, 1, 2000);
this.waitForBindings(servers[1], QUEUE_NAME, true, 1, 1, 2000);
TextMessage msg = session1.createTextMessage("hello");
prod1.send(msg);
Wait.assertTrue(() -> getServer(1).locateQueue(SimpleString.toSimpleString(QUEUE_NAME)).getMessageCount() == 1, 5000, 100);
TextMessage msgReceived = (TextMessage) cons2.receive(5000);
assertNotNull(msgReceived);
assertEquals(msgReceived.getText(), msg.getText());
} finally {
conn1.close();
conn2.close();
}
}
public void run() {
try {
running = true;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
}
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic) destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}
if (maxiumMessages > 0) {
consumeMessagesAndClose(connection, session, consumer);
} else {
if (receiveTimeOut == 0) {
consumer.setMessageListener(this);
} else {
consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
}
}
} catch (Exception e) {
System.out.println("[" + this.getName() + "] Caught: " + e);
e.printStackTrace();
}
}
@Test(timeout = 20000)
public void testRecoveredClientAckSessionWithDurableSubscriber() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, false, "?jms.clientID=myClientId", null, null, false);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(Session.CLIENT_ACKNOWLEDGE);
String subscriptionName = "mySubName";
String topicName = "myTopic";
Topic topic = session.createTopic(topicName);
int msgCount = 3;
testPeer.expectDurableSubscriberAttach(topicName, subscriptionName);
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), msgCount, false, false,
Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(msgCount)), 1, false, true);
MessageConsumer subscriber = session.createDurableConsumer(topic, subscriptionName);
TextMessage receivedTextMessage = null;
assertNotNull("Expected a message", receivedTextMessage = (TextMessage) subscriber.receive(3000));
assertEquals("Unexpected delivery number", 1, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
assertNotNull("Expected a message", receivedTextMessage = (TextMessage) subscriber.receive(3000));
assertEquals("Unexpected delivery number", 2, receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1);
session.recover();
assertNotNull("Expected a message", receivedTextMessage = (TextMessage) subscriber.receive(3000));
int deliveryNumber = receivedTextMessage.getIntProperty(TestAmqpPeer.MESSAGE_NUMBER) + 1;
assertEquals("Unexpected delivery number", 1, deliveryNumber);
testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
receivedTextMessage.acknowledge();
testPeer.expectDisposition(true, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), 2, 2);
testPeer.expectDetach(false, true, false);
testPeer.expectDisposition(true, new ReleasedMatcher(), 3, 3);
subscriber.close();
testPeer.waitForAllHandlersToComplete(1000);
testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
testPeer.expectDetach(true, true, true);
session.unsubscribe(subscriptionName);
testPeer.expectClose();
connection.close();
}
}
public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
bridgeBrokers(SPOKE, HUB);
startAllBrokers();
// Setup connection
URI hubURI = brokers.get(HUB).broker.getVmConnectorURI();
URI spokeURI = brokers.get(SPOKE).broker.getVmConnectorURI();
ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
Connection conHub = facHub.createConnection();
Connection conSpoke = facSpoke.createConnection();
conHub.setClientID("clientHUB");
conSpoke.setClientID("clientSPOKE");
conHub.start();
conSpoke.start();
Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
String consumerName = "consumerName";
// Setup consumers
MessageConsumer remoteConsumer = sesSpoke.createDurableSubscriber(topic, consumerName);
remoteConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
try {
TextMessage textMsg = (TextMessage) msg;
receivedMsgs++;
LOG.info("Received messages (" + receivedMsgs + "): " + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// allow subscription information to flow back to Spoke
sleep(1000);
// Setup producer
MessageProducer localProducer = sesHub.createProducer(topic);
localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Send messages
for (int i = 0; i < MESSAGE_COUNT; i++) {
sleep(50);
if (i == 50 || i == 150) {
if (simulateStalledNetwork) {
socketProxy.pause();
} else {
socketProxy.close();
}
networkDownTimeStart = System.currentTimeMillis();
} else if (networkDownTimeStart > 0) {
// restart after NETWORK_DOWN_TIME seconds
sleep(NETWORK_DOWN_TIME);
networkDownTimeStart = 0;
if (simulateStalledNetwork) {
socketProxy.goOn();
} else {
socketProxy.reopen();
}
} else {
// slow message production to allow bridge to recover and limit message duplication
sleep(500);
}
Message test = sesHub.createTextMessage("test-" + i);
localProducer.send(test);
}
LOG.info("waiting for messages to flow");
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return receivedMsgs >= MESSAGE_COUNT;
}
});
assertTrue("At least message " + MESSAGE_COUNT +
" must be received, count=" + receivedMsgs, MESSAGE_COUNT <= receivedMsgs);
brokers.get(HUB).broker.deleteAllMessages();
brokers.get(SPOKE).broker.deleteAllMessages();
conHub.close();
conSpoke.close();
}
public void runSimple() throws Exception {
deleteDirectory(new File("./target/server"));
ActiveMQServer server = createServer("./target/server");
try {
server.start();
ConnectionFactory factory = new ActiveMQConnectionFactory();
Connection connection = factory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue("queue"));
for (int i = 0; i < 500; i++) {
producer.send(session.createTextMessage("text"));
}
} finally {
server.stop();
}
}
@Test
@HttpRequestConfig(useVirtualHostAsHost = false)
public void brokerStatistics() throws Exception
{
String logDownloadUrl = configureLogger(false);
Connection conn = getConnection();
try
{
final Map<String, Object> args1 = new HashMap<>();
args1.put("name", "qpid.broker.statisticsReportPattern");
args1.put("value", "messagesIn=${messagesIn}");
getHelper().submitRequest("broker/setContextVariable", "POST", args1, SC_OK);
final Map<String, Object> attrs = Collections.singletonMap(Broker.STATISTICS_REPORTING_PERIOD, STATISTICS_REPORTING_PERIOD_IN_SEC);
getHelper().submitRequest("broker", "POST", attrs, SC_OK);
getBrokerAdmin().createQueue(QUEUE1_NAME);
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
assertThat("Pre-enqueue statistics report not found",
countLogFileMatches(logDownloadUrl, "messagesIn=0", LOG_TIMEOUT_IN_MS),
is(greaterThan(0)));
// Enqueue a single message to queue 1
Utils.sendMessages(session, session.createQueue(QUEUE1_NAME), 1);
assertThat("Post-enqueue statistics report not found",
countLogFileMatches(logDownloadUrl, "messagesIn=1", LOG_TIMEOUT_IN_MS),
is(greaterThan(0)));
}
finally
{
getHelper().submitRequest("broker/removeContextVariable", "POST",
singletonMap("name", "qpid.broker.statisticsReportPattern"), SC_OK);
getHelper().submitRequest("broker/removeContextVariable", "POST",
singletonMap("name", Broker.STATISTICS_REPORTING_PERIOD), SC_OK);
getHelper().submitRequest("brokerlogger/statslogger", "DELETE", SC_OK);
conn.close();
}
}
/**
* Create a JMS Session for the given Connection.
* <p>This implementation uses JMS 1.1 API.
* @param con the JMS Connection to create a Session for
* @return the new JMS Session
* @throws javax.jms.JMSException if thrown by JMS API methods
*/
protected Session createSession(Connection con) throws JMSException {
return con.createSession(true, Session.AUTO_ACKNOWLEDGE);
}