下面列出了 javax.jms.Connection#setClientID() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Walks a {@link QueueBrowser} over every message currently on the named queue.
 *
 * <p>A dedicated connection is opened with the client ID {@code "clientId2" + queueName},
 * the browser's enumeration is iterated to the end (the elements are discarded), and the
 * connection is closed again.
 *
 * @param queueName name of the queue to browse; also appended to the client ID
 * @throws Exception if the connection, session or browser cannot be created or used
 */
protected void browseTestQueueMessages(String queueName) throws Exception {
    Connection connection = cf.createConnection();
    // Everything after createConnection() is guarded: a failure while setting the client ID,
    // starting the connection or creating the session must not leak the connection.
    try {
        connection.setClientID("clientId2" + queueName);
        connection.start();
        Session session = connection.createSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(queueName);
        QueueBrowser queueBrowser = session.createBrowser(queue);
        @SuppressWarnings("unchecked")
        Enumeration<Message> messages = queueBrowser.getEnumeration();
        // Drain the enumeration; only the act of browsing matters here.
        while (messages.hasMoreElements()) {
            messages.nextElement();
        }
    } finally {
        connection.close();
    }
}
/**
 * Publishes 200 {@code PERSISTENT} messages for the durable subscription {@code "sub1"},
 * consumes 50 of them, and verifies the pending statistics before and after.
 */
@Test
public void testMessageSizeOneDurablePartialConsumption() throws Exception {
    AtomicLong publishedMessageSize = new AtomicLong();
    Connection connection = cf.createConnection();
    // try/finally: a failed verification must not leave the "clientId" connection open.
    try {
        connection.setClientID("clientId");
        connection.start();
        publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, publishedMessageSize,
            DeliveryMode.PERSISTENT, false);
        verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
        verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
        // consume partial messages
        consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
        // 150 should be left
        verifyPendingStats(defaultTopicName, 150, publishedMessageSize.get());
        // We don't really know the size here but it should be smaller than before,
        // so 75% of the published size (150 of 200 messages) is used as the expected value.
        verifyPendingDurableStats(defaultTopicName, 150, (long) (.75 * publishedMessageSize.get()));
    } finally {
        connection.close();
    }
}
/**
 * Connects with a username and password and checks that the scripted test peer completes
 * the SASL PLAIN, open and begin expectations without recording an error.
 */
@Test(timeout = 20000)
public void testSaslPlainConnection() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String username = "user";
        final String password = "qwerty123456";

        // Script the peer: PLAIN authentication first, then the connection open.
        peer.expectSaslPlain(username, password);
        peer.expectOpen();
        // Every connection begins a session for managing temporary destinations etc.
        peer.expectBegin();

        String remoteUri = "amqp://localhost:" + peer.getServerPort();
        ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
        Connection jmsConnection = connectionFactory.createConnection(username, password);
        // Setting a client ID provokes the actual AMQP connection process.
        jmsConnection.setClientID("clientName");

        peer.waitForAllHandlersToComplete(1000);
        assertNull(peer.getThrowable());

        peer.expectClose();
        jmsConnection.close();
    }
}
/**
 * Publishes 200 messages to the default topic with one non-durable consumer attached,
 * then consumes them all, verifying the pending statistics before and after.
 */
@Test
public void testTopicMessageSize() throws Exception {
    AtomicLong publishedMessageSize = new AtomicLong();
    Connection connection = cf.createConnection();
    // try/finally: a failed verification must not leave the "clientId" connection open.
    try {
        connection.setClientID("clientId");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(session.createTopic(defaultTopicName));
        publishTestTopicMessages(200, publishedMessageSize);
        verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
        // NOTE(review): the durable check targets defaultQueueName, not defaultTopicName —
        // confirm this is intended.
        verifyPendingDurableStats(defaultQueueName, 0, 0);
        // consume all messages
        consumeTestMessages(consumer, 200);
        // All messages should now be gone
        verifyPendingStats(defaultTopicName, 0, 0);
        verifyPendingDurableStats(defaultQueueName, 0, 0);
    } finally {
        connection.close();
    }
}
/**
 * Verifies that calling {@code setClientID()} after {@code start()} is rejected with an
 * {@code IllegalStateException}.
 */
@Test(timeout = 60000)
public void testSetClientIDAfterConnectionStart() throws Exception {
    Connection connection = cf.createConnection();
    try {
        // start() stays outside the expectation below: an IllegalStateException thrown by
        // start() itself must fail the test rather than be mistaken for the expected one.
        connection.start();
        try {
            connection.setClientID("newID3");
            fail("Calling setClientID() after start() must raise a JMSException.");
        } catch (IllegalStateException ise) {
            LOG.debug("Correctly received " + ise);
        }
    } finally {
        // Stop the factory even when closing the connection fails.
        try {
            connection.close();
        } finally {
            cf.stop();
        }
    }
    LOG.debug("Test finished.");
}
/**
 * Setting the same client ID twice on one pooled connection should be tolerated, i.e. it
 * must not raise an {@code IllegalStateException}.
 */
@Test(timeout = 60000)
public void testSetClientIDTwiceWithSameID() throws Exception {
    LOG.debug("running testRepeatedSetClientIDCalls()");

    ConnectionFactory pooledFactory = createPooledConnectionFactory();
    Connection connection = pooledFactory.createConnection();
    connection.setClientID("newID");

    try {
        // Second call with the identical ID: expected to be accepted silently.
        connection.setClientID("newID");
        connection.start();
        connection.close();
    } catch (IllegalStateException ise) {
        // Build the message once; it is both logged and used as the failure reason.
        final String failure = "Repeated calls to newID2.setClientID(\"newID\") caused " + ise.getMessage();
        LOG.error(failure);
        fail(failure);
    } finally {
        JmsPoolConnectionFactory stoppable = (JmsPoolConnectionFactory) pooledFactory;
        stoppable.stop();
    }

    LOG.debug("Test finished.");
}
/**
 * Setting a second, different client ID on a pooled connection that already has one must
 * raise an {@code IllegalStateException}.
 */
@Test(timeout = 60000)
public void testSetClientIDTwiceWithDifferentID() throws Exception {
    LOG.debug("running testRepeatedSetClientIDCalls()");

    ConnectionFactory pooledFactory = createPooledConnectionFactory();
    Connection connection = pooledFactory.createConnection();
    // The first ID is accepted; the attempt to replace it below is what gets rejected.
    connection.setClientID("newID1");

    try {
        connection.setClientID("newID2");
        fail("calling Connection.setClientID() twice with different clientID must raise an IllegalStateException");
    } catch (IllegalStateException expected) {
        LOG.debug("Correctly received " + expected);
    } finally {
        connection.close();
        JmsPoolConnectionFactory stoppable = (JmsPoolConnectionFactory) pooledFactory;
        stoppable.stop();
    }

    LOG.debug("Test finished.");
}
/**
 * Connects through a multicast discovery URL whose group name is not expected to match
 * anything and checks that the attempt fails with a {@code JMSException} caused by an
 * {@code IOException}, no sooner than the configured reconnect delay.
 */
public void testInitialConnectDelayWithNoBroker() throws Exception {
    // initialReconnectDelay only applies once the discovery agent has returned a set of
    // connect URLs. Until then reconnectDelay is used (default value 10), which is why
    // the URL below overrides reconnectDelay instead.
    final long initialReconnectDelay = 4000;
    final long startedAt = System.currentTimeMillis();
    final String unmatchedGroup = "WillNotMatch" + startedAt;

    try {
        String discoveryUrl = "discovery:(multicast://default?group=" + unmatchedGroup
            + ")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(discoveryUrl);
        LOG.info("Connecting.");
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("test");
        fail("Did not fail to connect as expected.");
    } catch (JMSException expected) {
        Throwable cause = expected.getCause();
        assertTrue("reason is java.io.IOException, was: " + cause, cause instanceof java.io.IOException);

        long elapsed = System.currentTimeMillis() - startedAt;
        assertTrue("took at least initialReconnectDelay time: " + elapsed + " e:" + expected, elapsed >= initialReconnectDelay);
    }
}
/**
 * Opens an anonymous connection and checks that the scripted test peer's open expectation,
 * which requires a value greater than zero, is satisfied without the peer recording an error.
 */
@Test(timeout = 20000)
public void testIdleTimeoutIsAdvertisedByDefault() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        peer.expectSaslAnonymous();
        // The second argument is a matcher demanding a value above zero
        // (presumably the advertised idle timeout, going by the test name).
        UnsignedInteger zero = UnsignedInteger.valueOf(0);
        peer.expectOpen(null, greaterThan(zero), null, false);
        // Every connection begins a session for managing temporary destinations etc.
        peer.expectBegin();

        String remoteUri = "amqp://localhost:" + peer.getServerPort();
        ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
        Connection jmsConnection = connectionFactory.createConnection();
        // Setting a client ID provokes the actual AMQP connection process.
        jmsConnection.setClientID("clientName");

        peer.waitForAllHandlersToComplete(1000);
        assertNull(peer.getThrowable());

        peer.expectClose();
        jmsConnection.close();
    }
}
/**
 * Creates a durable consumer on a topic, sends and receives one message, closes the
 * connection, and then checks that the server still exposes the address resource and the
 * queue named {@code "myClientID.myDurableSub"}.
 */
@Test
public void testAutoCreateOnDurableSubscribeToTopic() throws Exception {
    Connection connection = cf.createConnection();
    // try/finally: the connection is closed even if sending or receiving fails, and is
    // always closed before the server-side checks below, as in the original ordering.
    try {
        connection.setClientID("myClientID");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        javax.jms.Topic topic = ActiveMQJMSClient.createTopic(QUEUE_NAME);
        MessageConsumer consumer = session.createDurableConsumer(topic, "myDurableSub");
        MessageProducer producer = session.createProducer(topic);
        producer.send(session.createTextMessage("msg"));
        connection.start();
        assertNotNull(consumer.receive(500));
    } finally {
        connection.close();
    }
    // NOTE(review): the address is looked up with the literal "test" rather than QUEUE_NAME —
    // presumably the same value; confirm.
    assertNotNull(server.getManagementService().getResource(ResourceNames.ADDRESS + "test"));
    assertNotNull(server.locateQueue(SimpleString.toSimpleString("myClientID.myDurableSub")));
}
/**
 * Attaches a shared durable consumer to {@code "sub1"}, publishes 200 {@code PERSISTENT}
 * messages through the shared-durable publish helper, and verifies the pending statistics
 * before and after the subscription is drained.
 */
@Test
public void testMessageSizeSharedDurable() throws Exception {
    AtomicLong publishedMessageSize = new AtomicLong();
    Connection connection = cf.createConnection();
    // try/finally: a failed verification must not leave the "clientId" connection open.
    try {
        connection.setClientID("clientId");
        connection.start();
        // The publish method will create a second shared consumer
        Session s = connection.createSession();
        MessageConsumer c = s.createSharedDurableConsumer(s.createTopic(defaultTopicName), "sub1");
        publishTestMessagesDurable(connection, new String[] {"sub1",}, 200, publishedMessageSize,
            DeliveryMode.PERSISTENT, true);
        // Verify the count and size. The expected values are the single-copy totals
        // (200 messages, not 400) — presumably because both shared consumers read from
        // the same "sub1" subscription; confirm against the publish helper.
        verifyPendingStats(defaultTopicName, 200, publishedMessageSize.get());
        verifyPendingDurableStats(defaultTopicName, 200, publishedMessageSize.get());
        c.close();
        // consume messages for sub1
        consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
        verifyPendingStats(defaultTopicName, 0, publishedMessageSize.get());
        verifyPendingDurableStats(defaultTopicName, 0, publishedMessageSize.get());
    } finally {
        connection.close();
    }
}
/**
 * Verifies that setting a client ID after the connection has already been used (here: after
 * creating a session) is rejected with a {@code javax.jms.IllegalStateException}.
 */
@Test
public void testSetClientIDFail() throws Exception {
    final String clientID = "my-test-client-id";
    // Setting a client id must be the first thing done to the connection
    // otherwise a javax.jms.IllegalStateException must be thrown
    Connection connection = createConnection();
    // try/finally: the connection is closed even when fail() fires below.
    try {
        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            connection.setClientID(clientID);
            ProxyAssertSupport.fail();
        } catch (javax.jms.IllegalStateException e) {
            ConnectionTest.log.trace("Caught exception ok");
        }
    } finally {
        connection.close();
    }
    // TODO: This will probably go away, remove it entirely after we
    // make sure this rule can go away
    // connection = createConnection();
    // connection.getClientID();
    // try
    // {
    // connection.setClientID(clientID);
    // ProxyAssertSupport.fail();
    // }
    // catch (javax.jms.IllegalStateException e)
    // {
    // }
    // connection.close();
}
/**
 * Creates the durable subscription {@code "uzzi"}, sends one persistent message to it,
 * unsubscribes, re-creates the subscription under the same name and checks that no message
 * is waiting for the new subscriber.
 */
@Test
public void testUnsubscribeDurableSubscription() throws Exception {
    Connection connection = createConnection();
    try {
        connection.setClientID("ak47");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // First incarnation of the subscription receives one persistent message.
        MessageConsumer originalSubscriber = session.createDurableSubscriber(ActiveMQServerTestCase.topic1, "uzzi");
        MessageProducer producer = session.createProducer(ActiveMQServerTestCase.topic1);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(session.createTextMessage("one"));

        // The subscriber is closed before unsubscribe() is called.
        originalSubscriber.close();
        session.unsubscribe("uzzi");

        // Second incarnation under the same name: nothing should be delivered to it.
        MessageConsumer recreatedSubscriber = session.createDurableSubscriber(ActiveMQServerTestCase.topic1, "uzzi");
        connection.start();
        ProxyAssertSupport.assertNull(recreatedSubscriber.receiveNoWait());

        // Clean up the re-created subscription as well.
        recreatedSubscriber.close();
        session.unsubscribe("uzzi");
    } finally {
        connection.close();
    }
}
/**
 * Checks that a client ID already held by an open connection ({@code "C2"}) cannot be set
 * on another connection, and that a shared durable consumer on the {@code "C1"} connection
 * receives a message sent to its topic.
 *
 * <p>{@code conn} and {@code topic} are not declared in this method, so they are members of
 * the enclosing test class; {@code conn} is not closed here (presumably teardown does that —
 * confirm).
 */
@Test
public void testSharedDurableConsumerWithClientID() throws Exception {
    conn = cf.createConnection();
    conn.setClientID("C1");
    conn.start();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Connection conn2 = cf.createConnection();
    // conn2 only exists to hold the "C2" client ID; close it on every path.
    try {
        conn2.setClientID("C2");
        conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Connection conn3 = cf.createConnection();
        try {
            boolean rejected = false;
            try {
                conn3.setClientID("C2");
            } catch (Exception expected) {
                // Any exception counts as the rejection of the duplicate client ID.
                rejected = true;
            }
            Assert.assertTrue(rejected);
        } finally {
            // Closed even when the assertion above fails.
            conn3.close();
        }

        topic = ActiveMQJMSClient.createTopic(T_NAME);
        MessageConsumer cons = session.createSharedDurableConsumer(topic, "test1");
        MessageProducer producer = session.createProducer(topic);
        producer.send(session.createTextMessage("test"));
        TextMessage txt = (TextMessage) cons.receive(5000);
        Assert.assertNotNull(txt);
    } finally {
        conn2.close();
    }
}
/**
 * Subscribes with a fully qualified queue name that pairs address {@code a1} with queue
 * {@code q2} (which was created under address {@code a2}) and expects an
 * {@code InvalidDestinationException} carrying the matching error message.
 */
@Test
public void testSubscribeTopicToFQQNWrongQueueAttachedToAnotherAddress() throws Exception {
    final String firstAddress = "a1";
    final String secondAddress = "a2";
    final String firstQueue = "q1";
    final String secondQueue = "q2";

    // Two queues, each bound to its own address: a1::q1 and a2::q2.
    server.createQueue(new QueueConfiguration(firstQueue).setAddress(firstAddress));
    server.createQueue(new QueueConfiguration(secondQueue).setAddress(secondAddress));

    // Deliberately mismatched FQQN: the queue belongs to the other address.
    final String mismatchedFqqn = firstAddress + "::" + secondQueue;

    Exception failure = null;
    Connection connection = createConnection(false);
    try {
        connection.setClientID("FQQNconn");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createConsumer(session.createTopic(mismatchedFqqn));
    } catch (InvalidDestinationException ide) {
        failure = ide;
    } finally {
        connection.close();
    }

    assertNotNull(failure);
    String expectedFragment = "Queue: '" + secondQueue + "' does not exist for address '" + firstAddress + "'";
    assertTrue(failure.getMessage().contains(expectedFragment));
}
/**
 * Sets a client ID on a fresh connection and checks that {@code getClientID()} returns the
 * same value.
 */
@Test
public void testSetClientID() throws Exception {
    Connection conn = createConnection();
    // The connection is closed on every path so the client ID "myID" is released.
    try {
        conn.setClientID("myID");
        String clientID = conn.getClientID();
        ProxyAssertSupport.assertEquals("Invalid ClientID", "myID", clientID);
    } finally {
        conn.close();
    }
}
/**
 * Creates a connection from {@code cf}, assigns it the client ID {@code conClientId} and
 * starts it.
 *
 * @return the started connection; the caller is responsible for closing it
 * @throws JMSException if the connection cannot be created, identified or started; in the
 *         latter two cases the half-initialised connection is closed before rethrowing
 */
private Connection openConnection() throws JMSException {
    Connection con = cf.createConnection();
    try {
        con.setClientID(conClientId);
        con.start();
    } catch (JMSException e) {
        // Do not leak the connection when it could not be fully set up.
        try {
            con.close();
        } catch (JMSException ignored) {
            // Best effort: the original failure is the one worth reporting.
        }
        throw e;
    }
    return con;
}
/**
 * Sends one message from a transacted producer session and receives it on a transacted
 * consumer session, rolling the consumer session back twice before committing, while
 * checking the JMSRedelivered flag at each step. An ExceptionListener on the connection
 * records any JMSException reported to it, and the test ends by asserting that none was.
 */
public void testNoExceptionOnRedeliveryAckWithSimpleTopicConsumer() throws Exception {
Destination destination = createDestination(getClass().getName());
Connection connection = createConnection();
// Flag flipped by the listener below whenever the connection reports an exception.
final AtomicBoolean gotException = new AtomicBoolean();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.error("unexpected ex:" + exception);
gotException.set(true);
}
});
connection.setClientID(idGen.generateId());
connection.start();
// Both sessions are created transacted (first argument true).
Session consumerSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = null;
// NOTE(review): both branches create the consumer with the identical call, so the
// topic flag makes no difference here — confirm whether a topic-specific call was intended.
if (topic) {
consumer = consumerSession.createConsumer(destination);
} else {
consumer = consumerSession.createConsumer(destination);
}
Session producerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
producer.setDeliveryMode(deliveryMode);
TextMessage sentMsg = producerSession.createTextMessage();
sentMsg.setText("msg1");
producer.send(sentMsg);
// Commit the producer transaction so the single message is actually sent.
producerSession.commit();
// First delivery: must not be flagged as redelivered.
Message recMsg = consumer.receive(RECEIVE_TIMEOUT);
assertFalse(recMsg.getJMSRedelivered());
// Second receive: its result is not inspected before the rollback
// (presumably null, since only one message was sent — TODO confirm).
recMsg = consumer.receive(RECEIVE_TIMEOUT);
consumerSession.rollback();
// After the first rollback the message is expected back, flagged as redelivered.
recMsg = consumer.receive(RECEIVE_TIMEOUT);
assertTrue(recMsg.getJMSRedelivered());
consumerSession.rollback();
// After the second rollback the same expectation holds; this time the receipt is committed.
recMsg = consumer.receive(RECEIVE_TIMEOUT);
assertTrue(recMsg.getJMSRedelivered());
consumerSession.commit();
// The committed message must equal the one that was sent and still carry the flag.
assertTrue(recMsg.equals(sentMsg));
assertTrue(recMsg.getJMSRedelivered());
connection.close();
// Checked after close: the listener must not have been invoked at any point.
assertFalse("no exception", gotException.get());
}
/**
 * Creates a connection from {@code cf}, assigns it the client ID {@code conClientId} and
 * starts it.
 *
 * @return the started connection; the caller is responsible for closing it
 * @throws JMSException if the connection cannot be created, identified or started; in the
 *         latter two cases the half-initialised connection is closed before rethrowing
 */
private Connection openConnection() throws JMSException {
    Connection con = cf.createConnection();
    try {
        con.setClientID(conClientId);
        con.start();
    } catch (JMSException e) {
        // Do not leak the connection when it could not be fully set up.
        try {
            con.close();
        } catch (JMSException ignored) {
            // Best effort: the original failure is the one worth reporting.
        }
        throw e;
    }
    return con;
}
/**
 * Sends two persistent messages through the first connection factory to a topic that has a
 * durable subscriber attached through the second one, checks that the subscriber receives
 * a message, and finally destroys the topic on both JMS servers.
 */
@Test
public void testDeleteTopicAfterClusteredSend() throws Exception {
    Connection producerConnection = cf1.createConnection();
    producerConnection.setClientID("someClient1");
    Connection subscriberConnection = cf2.createConnection();
    subscriberConnection.setClientID("someClient2");
    producerConnection.start();
    subscriberConnection.start();

    try {
        Topic createdTopic = createTopic(TOPIC);
        Topic lookedUpTopic = (Topic) context1.lookup("topic/" + TOPIC);
        Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Session subscriberSession = subscriberConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Both topic references should denote the same topic; a separately obtained
        // instance is used on purpose to make sure that is implemented correctly.
        MessageConsumer durableSubscriber = subscriberSession.createDurableSubscriber(lookedUpTopic, "sub2");
        // Brief pause before producing (presumably to let the subscription become
        // visible across the cluster — TODO confirm).
        Thread.sleep(500);

        MessageProducer producer = producerSession.createProducer(createdTopic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int sent = 0; sent < 2; sent++) {
            producer.send(producerSession.createTextMessage("someMessage"));
        }

        // Only the first of the two messages is received and checked.
        TextMessage firstReceived = (TextMessage) durableSubscriber.receive(5000);
        assertNotNull(firstReceived);
        assertEquals("someMessage", firstReceived.getText());
        durableSubscriber.close();
    } finally {
        producerConnection.close();
        subscriberConnection.close();
    }

    jmsServer1.destroyTopic(TOPIC);
    jmsServer2.destroyTopic(TOPIC);
}