下面列出了javax.jms.InvalidSelectorException#javax.jms.TopicSession 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates a JMS destination with the given name using a session obtained
 * from {@code createSession(false, Session.AUTO_ACKNOWLEDGE)}.
 * <p>
 * A topic is created when that session is a {@link TopicSession}; otherwise
 * the session is cast to {@link QueueSession} and a queue is created. The
 * session is passed to {@code releaseSession} whether or not creation succeeds.
 *
 * @param destinationName name handed to {@code createTopic} / {@code createQueue}
 * @return the destination created by the session
 * @throws JmsException wrapping any exception raised while obtaining the
 *         session or creating the destination
 */
public Destination lookupDestination(String destinationName) throws JmsException {
    Session session = null;
    try {
        session = createSession(false, Session.AUTO_ACKNOWLEDGE);
        log.debug("Session class ["+session.getClass().getName()+"]");
        if (session instanceof TopicSession) {
            return ((TopicSession) session).createTopic(destinationName);
        }
        return ((QueueSession) session).createQueue(destinationName);
    } catch (Exception cause) {
        throw new JmsException("cannot create destination", cause);
    } finally {
        // session is still null here when createSession itself failed
        releaseSession(session);
    }
}
/**
 * Opens a session on the given connection using the configured
 * {@code transactedSession} and {@code sessionAckMode} values.
 * <p>
 * When {@code jmsSpec} equals the 1.1 or 2.0 constant the unified
 * {@code createSession} call is used. Otherwise the connection is cast to
 * {@link QueueConnection} or {@link TopicConnection} according to
 * {@code destinationType}.
 *
 * @param connection the connection to open the session on
 * @return the new session, or {@code null} when a {@link JMSException} was
 *         logged or the destination type matched neither QUEUE nor TOPIC
 */
protected Session createSession(Connection connection) {
    try {
        boolean unifiedApi = JMSConstants.JMS_SPEC_VERSION_1_1.equals(jmsSpec)
                || JMSConstants.JMS_SPEC_VERSION_2_0.equals(jmsSpec);
        if (unifiedApi) {
            return connection.createSession(transactedSession, sessionAckMode);
        }
        if (this.destinationType.equals(JMSConstants.JMSDestinationType.QUEUE)) {
            QueueConnection queueConnection = (QueueConnection) connection;
            return queueConnection.createQueueSession(transactedSession, sessionAckMode);
        }
        if (this.destinationType.equals(JMSConstants.JMSDestinationType.TOPIC)) {
            TopicConnection topicConnection = (TopicConnection) connection;
            return topicConnection.createTopicSession(transactedSession, sessionAckMode);
        }
    } catch (JMSException e) {
        // the failure is only logged; the caller receives null below
        logger.error("JMS Exception while obtaining session for factory '" + this.connectionFactoryString + "' "
                + e.getMessage(), e);
    }
    return null;
}
/**
 * Subscribes to the "activemq.notifications" topic, waits until the listener
 * has collected at least one message, and checks that every collected message
 * carries a non-null "_AMQ_NotifType" string property.
 */
@Test
public void testNotificationProperties() throws Exception {
    try (TopicConnection connection = factory.createTopicConnection()) {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic notifications = session.createTopic("activemq.notifications");
        TopicSubscriber notificationSubscriber = session.createSubscriber(notifications);

        // the listener appends to this list while the test thread reads it
        List<Message> collected = new CopyOnWriteArrayList<>();
        notificationSubscriber.setMessageListener(collected::add);
        connection.start();

        Wait.waitFor(() -> !collected.isEmpty());
        Assert.assertFalse(collected.isEmpty());

        for (Message notification : collected) {
            assertNotNull(notification);
            assertNotNull(notification.getStringProperty("_AMQ_NotifType"));
        }
    }
}
/**
 * Creates and closes 100 subscribers (each with a listener) on one shared
 * session, closes the session and connection afterwards, and then checks
 * that the broker-side destination for {@code topic} has no consumers.
 */
public void testWithSessionCloseOutsideTheLoop() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();
    TopicSession sharedSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

    for (int round = 0; round < 100; round++) {
        TopicSubscriber sub = sharedSession.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        sub.close();
    }

    sharedSession.close();
    topicConnection.close();

    // pause before inspecting the broker; presumably lets consumer removal propagate
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Checks that a subscriber reports the temporary topic it was created on,
 * and that {@code getTopic()} throws {@code IllegalStateException} once the
 * subscriber has been closed.
 */
@Test
public void testGetTopic() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    TopicSubscriber topicSubscriber = topicSession.createSubscriber(temporaryTopic);

    assertNotNull(topicSubscriber.getTopic());
    assertSame(temporaryTopic, topicSubscriber.getTopic());

    topicSubscriber.close();

    try {
        topicSubscriber.getTopic();
        fail("Cannot read topic on closed subscriber");
    } catch (IllegalStateException expected) {
        // expected: the subscriber was closed above
    }
}
/**
 * Opens 100 sessions with one subscriber each and closes none of them
 * explicitly; only the connection is closed. Afterwards the broker-side
 * destination for {@code topic} must have no consumers.
 */
public void testWithoutSessionAndSubsciberClose() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int round = 0; round < 100; round++) {
        // neither the session nor the subscriber is closed here on purpose (see test name)
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        assertNotNull(session.createSubscriber(topic));
    }

    topicConnection.close();

    // pause before inspecting the broker; presumably lets consumer removal propagate
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Checks that a pooled durable subscriber exposes a {@code MockJMSTopicSubscriber}
 * through {@code getTopicSubscriber()}, and that the accessor throws
 * {@code IllegalStateException} once the subscriber has been closed.
 */
@Test
public void testGetTopicSubscriber() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    JmsPoolTopicSubscriber pooledSubscriber =
            (JmsPoolTopicSubscriber) topicSession.createDurableSubscriber(temporaryTopic, "name", "color = red", true);

    assertNotNull(pooledSubscriber.getTopicSubscriber());
    assertTrue(pooledSubscriber.getTopicSubscriber() instanceof MockJMSTopicSubscriber);

    pooledSubscriber.close();

    try {
        pooledSubscriber.getTopicSubscriber();
        fail("Cannot read state on closed subscriber");
    } catch (IllegalStateException expected) {
        // expected: the subscriber was closed above
    }
}
/**
 * Creates a topic publisher on a topic named after the running test method
 * and checks, through the topic's management proxy, that the enqueue count
 * is still zero because nothing was published.
 */
@Test
public void testCreateTopicPublisher() throws Exception {
    TopicConnection topicConnection =
            new JmsConnectionFactory(getBrokerAmqpConnectionURI()).createTopicConnection();
    assertNotNull(topicConnection);

    TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    assertNotNull(topicSession);

    Topic destination = topicSession.createTopic(name.getMethodName());
    TopicPublisher topicPublisher = topicSession.createPublisher(destination);
    assertNotNull(topicPublisher);

    TopicViewMBean topicView = getProxyToTopic(name.getMethodName());
    assertEquals(0, topicView.getEnqueueCount());

    topicConnection.close();
}
/**
 * Checks that a pooled publisher exposes a {@code MockJMSTopicPublisher}
 * through {@code getTopicPublisher()}, and that the accessor throws
 * {@code IllegalStateException} once the publisher has been closed.
 */
@Test
public void testGetTopicPublisher() throws JMSException {
    JmsPoolConnection pooledConnection = (JmsPoolConnection) cf.createTopicConnection();
    TopicSession topicSession = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic temporaryTopic = topicSession.createTemporaryTopic();
    JmsPoolTopicPublisher pooledPublisher = (JmsPoolTopicPublisher) topicSession.createPublisher(temporaryTopic);

    assertNotNull(pooledPublisher.getTopicPublisher());
    assertTrue(pooledPublisher.getTopicPublisher() instanceof MockJMSTopicPublisher);

    pooledPublisher.close();

    try {
        pooledPublisher.getTopicPublisher();
        fail("Cannot read state on closed publisher");
    } catch (IllegalStateException expected) {
        // expected: the publisher was closed above
    }
}
/**
 * With the pool limited to one connection, publishes a single message to a
 * topic named after the test and checks that the factory reports exactly
 * one connection. The connection is closed and the factory stopped in all cases.
 */
@Test(timeout = 60000)
public void testTopicMessageSend() throws Exception {
    cf.setMaxConnections(1);

    TopicConnection pooledConnection = cf.createTopicConnection();
    try {
        TopicSession session = pooledConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher publisher = session.createPublisher(session.createTopic(getTestName()));
        publisher.send(session.createMessage());

        assertEquals(1, cf.getNumConnections());
    } finally {
        pooledConnection.close();
        cf.stop();
    }
}
/**
 * Publishes {@code publishSize} messages to the topic named by
 * {@code defaultTopicName} over a fresh connection that uses the client id
 * "clientId2".
 * <p>
 * The connection is closed in all cases, including when setting the client
 * id, starting the connection, or creating the session or topic fails.
 *
 * @param publishSize number of messages to send
 * @param deliveryMode delivery mode applied to the producer before sending
 * @param publishedMessageSize accumulator handed to {@code createMessage} for every message
 * @throws Exception if any JMS operation or message creation fails
 */
protected void publishTestTopicMessages(int publishSize, int deliveryMode, AtomicLong publishedMessageSize)
        throws Exception {
    Connection connection = cf.createConnection();
    try {
        connection.setClientID("clientId2");
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(defaultTopicName);

        // publish the requested number of messages using the caller's delivery mode
        MessageProducer prod = session.createProducer(topic);
        prod.setDeliveryMode(deliveryMode);
        for (int i = 0; i < publishSize; i++) {
            prod.send(createMessage(i, session, JournalPendingMessageTest.maxMessageSize, publishedMessageSize));
        }
    } finally {
        connection.close();
    }
}
/**
 * Checks that a queue sender and a topic publisher obtained from an XA
 * pooled connection factory both report a non-null destination name for
 * the destination "AA" they were created on.
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory xaFactory = createXAPooledConnectionFactory();

    // queue side
    QueueConnection queueConnection = xaFactory.createQueueConnection();
    QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueSender queueSender = queueSession.createSender(queueSession.createQueue("AA"));
    assertNotNull(queueSender.getQueue().getQueueName());
    queueConnection.close();

    // topic side
    TopicConnection topicConn = xaFactory.createTopicConnection();
    TopicSession topicSess = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicPublisher publisher = topicSess.createPublisher(topicSess.createTopic("AA"));
    assertNotNull(publisher.getTopic().getTopicName());
    topicConn.close();

    xaFactory.stop();
}
/**
 * Creates a single subscriber with a listener, closes subscriber, session
 * and connection in that order, and then checks that the broker-side
 * destination for {@code topic} has no consumers.
 */
public void testWithOneSubscriber() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    TopicSubscriber sub = session.createSubscriber(topic);
    sub.setMessageListener(new DummyMessageListener());

    sub.close();
    session.close();
    topicConnection.close();

    // pause before inspecting the broker; presumably lets consumer removal propagate
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * A topic must not retain messages once its non-durable subscribers are gone.
 * <p>
 * A persistent message is published and committed on a transacted session,
 * received by a non-durable subscriber on the same session, and the receive
 * is rolled back. After the connection is closed the topic is checked with
 * {@code checkEmpty}.
 */
@Test
public void testPersistentMessagesForTopicDropped2() throws Exception {
    TopicConnection connection = createTopicConnection();
    connection.start();

    // transacted session; the acknowledge-mode argument is passed as 0
    TopicSession session = connection.createTopicSession(true, 0);
    TopicPublisher publisher = session.createPublisher(ActiveMQServerTestCase.topic1);
    TopicSubscriber subscriber = session.createSubscriber(ActiveMQServerTestCase.topic1);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

    Message outgoing = session.createTextMessage("testing123");
    publisher.publish(outgoing);
    session.commit();

    // consume the message, then undo the consumption
    TextMessage received = (TextMessage) subscriber.receive(3000);
    ProxyAssertSupport.assertNotNull(received);
    ProxyAssertSupport.assertEquals("testing123", received.getText());
    session.rollback();

    connection.close();

    checkEmpty(ActiveMQServerTestCase.topic1);
}
/**
 * Creates a message producer for the given destination.
 * <p>
 * When {@code useJms102()} is true the session and destination are cast to
 * their topic or queue specific types (depending on {@code useTopicFunctions})
 * and a publisher or sender is obtained; otherwise the generic
 * {@code createProducer} call is used. A positive message time-to-live is
 * applied to the producer before it is returned.
 *
 * @param session the session used to create the producer
 * @param destination the destination the producer sends to
 * @return the configured producer
 */
public MessageProducer getMessageProducer(Session session,
        Destination destination) throws NamingException, JMSException {
    final MessageProducer producer;
    if (!useJms102()) {
        producer = session.createProducer(destination);
    } else if (useTopicFunctions) {
        producer = getTopicPublisher((TopicSession) session, (Topic) destination);
    } else {
        producer = getQueueSender((QueueSession) session, (Queue) destination);
    }

    // only a strictly positive time-to-live overrides the producer's own setting
    if (getMessageTimeToLive() > 0) {
        producer.setTimeToLive(getMessageTimeToLive());
    }
    return producer;
}
/**
 * Expects {@code javax.jms.IllegalStateException} when a queue browser is
 * requested from a {@code TopicSession}.
 */
@Test
public void testTopicSessionCannotCreateCreateBrowser() throws Exception {
    Queue browseTarget = createQueue(getTestName());
    TopicConnection connection = getTopicConnection();
    try {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createBrowser(browseTarget);
        fail("Expected exception was not thrown");
    } catch (javax.jms.IllegalStateException expected) {
        // PASS
    } finally {
        connection.close();
    }
}
/**
 * Expects {@code javax.jms.IllegalStateException} when a queue is requested
 * from a {@code TopicSession}.
 */
@Test
public void testTopicSessionCannotCreateQueues() throws Exception {
    TopicConnection connection = getTopicConnection();
    try {
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        session.createQueue("abc");
        fail("Expected exception was not thrown");
    } catch (javax.jms.IllegalStateException expected) {
        // PASS
    } finally {
        connection.close();
    }
}
/**
 * Runs 100 rounds that each open a session, create a subscriber with a
 * listener, and close both again. After the connection is closed the
 * broker-side destination for {@code topic} must have no consumers.
 */
public void testWithSessionAndSubsciberClose() throws Exception {
    TopicConnection topicConnection = connectionFactory.createTopicConnection();
    topicConnection.start();

    for (int round = 0; round < 100; round++) {
        TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSubscriber sub = session.createSubscriber(topic);
        sub.setMessageListener(new DummyMessageListener());
        sub.close();
        session.close();
    }

    topicConnection.close();

    // pause before inspecting the broker; presumably lets consumer removal propagate
    Thread.sleep(1000);

    Destination brokerDestination = backEnd.getRegionBroker().getDestinationMap().get(topic);
    assertNotNull(brokerDestination);
    assertTrue(brokerDestination.getConsumers().isEmpty());
}
/**
 * Expects {@code javax.jms.IllegalStateException} from
 * {@code getDeliveryMode()} on a publisher whose connection has already
 * been closed.
 */
@Test
public void publisherGetDeliveryModeAfterConnectionClose() throws Exception {
    Topic destination = createTopic(getTestName());
    TopicConnection topicConnection = getTopicConnection();
    try {
        TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicPublisher orphanedPublisher = topicSession.createPublisher(destination);

        topicConnection.close();

        try {
            orphanedPublisher.getDeliveryMode();
            fail("Expected exception not thrown");
        } catch (javax.jms.IllegalStateException expected) {
            // PASS
        }
    } finally {
        // close is invoked again here; the first call happens inside the try block
        topicConnection.close();
    }
}
/**
 * Creates a MessageConsumer that filters with the supplied selector.
 * <p>
 * This overload takes the selector explicitly so that listeners (or other
 * extensions of this class) do not influence how it is applied. Use
 * <code>getMessageConsumerForCorrelationId</code> when a correlation id
 * has to be part of the filter, and the <code>getMessageConsumer</code>
 * variant without a <code>selector</code> argument otherwise.
 * <p>
 * Depending on <code>useTopicFunctions</code> and <code>useJms102()</code>
 * the consumer is obtained from a topic subscriber helper, a queue receiver
 * helper, or directly from <code>session.createConsumer</code>.
 *
 * @param session the Session
 * @param destination the Destination
 * @param selector the MessageSelector
 * @return MessageConsumer
 */
public MessageConsumer getMessageConsumer(Session session, Destination destination, String selector) throws NamingException, JMSException {
    if (useTopicFunctions) {
        if (useJms102()) {
            return getTopicSubscriber((TopicSession) session, (Topic) destination, selector);
        }
        return getTopicSubscriber(session, (Topic) destination, selector);
    }

    if (useJms102()) {
        return getQueueReceiver((QueueSession) session, (Queue) destination, selector);
    }
    return session.createConsumer(destination, selector);
}
/**
 * Resolves a topic by name and wraps it in a {@code DestinationWrapper}.
 * <ul>
 * <li>With JNDI enabled, or when no session is supplied, the topic is looked up from JNDI.</li>
 * <li>Otherwise, when using MQ, the superclass creates the topic and it is then
 *     passed to {@code configureWBIMBTopic}.</li>
 * <li>Otherwise the topic is created from the session, configured and wrapped here.</li>
 * </ul>
 *
 * @param topic name of the topic
 * @param session session used to create the topic; may be {@code null}, which forces the JNDI lookup
 * @return wrapper around the resolved topic
 */
public DestinationWrapper<Topic> lookupTopic(String topic, TopicSession session)
        throws JMSException, NamingException {
    if (usingJNDI || session == null) {
        return lookupTopicFromJNDI(topic);
    }

    if (!usingMQ) {
        // not MQ: create and configure the topic ourselves, as it must be for MC or IP
        return new DestinationWrapper<Topic>(topic,
                configureWBIMBTopic((MQTopic) session.createTopic(topic)));
    }

    // using MQ: let the superclass MQ methods create the topic, then apply the MB specific settings
    DestinationWrapper<Topic> wrapper = super.lookupTopic(topic, session);
    configureWBIMBTopic((MQTopic) wrapper.destination);
    return wrapper;
}
/**
 * Connects to the in-VM broker named by {@code brokerName} with the
 * configured client id, attaches to the durable subscription
 * {@code durableSubName} on {@code topicName}, and waits for one message.
 * <p>
 * The connection is closed in all cases, including when setup or the
 * receive call fails.
 *
 * @return the message returned by {@code receive}; per the JMS API this is
 *         {@code null} when the timeout expires first
 * @throws Exception if any JMS operation fails
 */
private Message collectMessagesFromDurableSubscriptionForOneMinute() throws Exception {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + brokerName);
    TopicConnection connection = connectionFactory.createTopicConnection();
    final Message message;
    try {
        connection.setClientID(clientID);
        TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = topicSession.createTopic(topicName);
        connection.start();
        TopicSubscriber subscriber = topicSession.createDurableSubscriber(topic, durableSubName);
        LOG.info("About to receive messages");
        // NOTE(review): 120000 ms is two minutes although the method name says one minute - confirm intent
        message = subscriber.receive(120000);
        subscriber.close();
    } finally {
        connection.close();
    }
    LOG.info("collectMessagesFromDurableSubscriptionForOneMinute done");
    return message;
}
/**
 * Creates a subscriber on the delegate (cast to {@link TopicSession}, after
 * {@code checkTopicSession()} has passed) and wraps it with
 * {@code TracingMessageConsumer.create}.
 */
@Override
public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal)
        throws JMSException {
    checkTopicSession();
    TopicSubscriber rawSubscriber =
            ((TopicSession) delegate).createSubscriber(topic, messageSelector, noLocal);
    return TracingMessageConsumer.create(rawSubscriber, jmsTracing);
}
/**
 * Build a proxy around the given Session that forwards calls through a
 * {@code CachedSessionInvocationHandler}, so that close calls can be adapted
 * and application code can treat the framework Session like an ordinary one.
 * <p>The proxy always implements {@code SessionProxy}; {@code QueueSession}
 * and {@code TopicSession} are added when the target implements them.
 * @param target the original Session to wrap
 * @param sessionList the List of cached Sessions that the given Session belongs to
 * @return the wrapped Session
 */
protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
    List<Class<?>> proxyInterfaces = new ArrayList<>(3);
    proxyInterfaces.add(SessionProxy.class);
    if (target instanceof QueueSession) {
        proxyInterfaces.add(QueueSession.class);
    }
    if (target instanceof TopicSession) {
        proxyInterfaces.add(TopicSession.class);
    }

    ClassLoader proxyLoader = SessionProxy.class.getClassLoader();
    Object proxy = Proxy.newProxyInstance(
            proxyLoader,
            ClassUtils.toClassArray(proxyInterfaces),
            new CachedSessionInvocationHandler(target, sessionList));
    return (Session) proxy;
}
/**
 * Resolves {@code DESTINATION_NAME} against a mocked {@code TopicSession}
 * whose {@code createTopic} is stubbed to return a {@code StubTopic}, with
 * the third argument of {@code testResolveDestination} set to {@code true}.
 */
@Test
public void resolveWithPubSubTopicSession() throws Exception {
    TopicSession topicSession = mock(TopicSession.class);
    Topic stubbedTopic = new StubTopic();
    given(topicSession.createTopic(DESTINATION_NAME)).willReturn(stubbedTopic);

    testResolveDestination(topicSession, stubbedTopic, true);
}
/**
 * Connects to the broker at {@code defaultBrokerUrl}, subscribes to
 * {@code topicName}, and registers a listener that prints the body of each
 * received {@code BytesMessage} as UTF-8 text.
 * <p>
 * The connection is closed before this method returns, including when
 * creating the session, topic or subscriber fails.
 *
 * @throws JMSException if connecting or subscribing fails
 */
private static void subscribe() throws JMSException {
    // get topic connection
    TopicConnectionFactory connectionFactory = new WeEventConnectionFactory(defaultBrokerUrl);
    TopicConnection connection = connectionFactory.createTopicConnection();
    try {
        // start connection
        connection.start();
        // create session
        TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        // create topic
        Topic topic = session.createTopic(topicName);
        // create subscriber
        TopicSubscriber subscriber = session.createSubscriber(topic);
        // create listener
        subscriber.setMessageListener(message -> {
            // a listener must not throw; skip anything that is not a BytesMessage instead of failing the cast
            if (!(message instanceof BytesMessage)) {
                System.err.println("ignored non-bytes message: " + message);
                return;
            }
            BytesMessage msg = (BytesMessage) message;
            try {
                byte[] data = new byte[(int) msg.getBodyLength()];
                msg.readBytes(data);
                System.out.println("received: " + new String(data, StandardCharsets.UTF_8));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    } finally {
        // NOTE(review): the connection is closed right after the listener is registered,
        // as in the original code - confirm whether the caller expects it to stay open
        connection.close();
    }
}
/**
 * Returns the delegate's topic session wrapped by {@code TracingSession.create}.
 * Throws {@code IllegalStateException} when the {@code TYPE_XA_TOPIC} bits
 * are not all set in {@code types}.
 */
@Override public TopicSession getTopicSession() throws JMSException {
    boolean xaTopicCapable = (types & TYPE_XA_TOPIC) == TYPE_XA_TOPIC;
    if (!xaTopicCapable) {
        throw new IllegalStateException(delegate + " is not an XATopicSession");
    }
    TopicSession underlying = ((XATopicSession) delegate).getTopicSession();
    return TracingSession.create(underlying, jmsTracing);
}
/**
 * Exercises a {@code CachingConnectionFactory} that wraps a mocked
 * {@code TopicConnectionFactory}. Sessions are requested through both
 * {@code createSession} and {@code createTopicSession} on two connections
 * handed out by the caching factory, everything is closed, and the
 * interactions with the underlying mocks are verified after {@code destroy()}.
 */
@Test
public void testCachingConnectionFactoryWithTopicConnectionFactoryAndJms102Usage() throws JMSException {
// mocks for the underlying provider objects
TopicConnectionFactory cf = mock(TopicConnectionFactory.class);
TopicConnection con = mock(TopicConnection.class);
TopicSession txSession = mock(TopicSession.class);
TopicSession nonTxSession = mock(TopicSession.class);
// stubbing: transacted/AUTO_ACKNOWLEDGE yields txSession, non-transacted/CLIENT_ACKNOWLEDGE yields nonTxSession
given(cf.createTopicConnection()).willReturn(con);
given(con.createTopicSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(txSession);
given(txSession.getTransacted()).willReturn(true);
given(con.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE)).willReturn(nonTxSession);
CachingConnectionFactory scf = new CachingConnectionFactory(cf);
scf.setReconnectOnException(false);
// first connection: sessions requested via the generic createSession call
Connection con1 = scf.createTopicConnection();
Session session1 = con1.createSession(true, Session.AUTO_ACKNOWLEDGE);
session1.getTransacted();
session1.close(); // should lead to rollback
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close();
con1.start();
// second connection: same two session kinds, requested in the opposite order
TopicConnection con2 = scf.createTopicConnection();
Session session2 = con2.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.getTransacted();
session2.close();
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
// verify(...) without a mode expects exactly one invocation on each mock
verify(txSession).close();
verify(nonTxSession).close();
verify(con).start();
verify(con).stop();
verify(con).close();
}
/**
 * Creates a {@code MockJMSTopicSession} for this connection after checking
 * that the connection is neither closed nor failed and is connected. The
 * session is registered through {@code addSession} and started immediately
 * when the connection's {@code started} flag is set.
 *
 * @param transacted passed to {@code getSessionAcknowledgeMode} to derive the effective mode
 * @param acknowledgeMode requested acknowledge mode
 * @return the newly created topic session
 */
@Override
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosedOrFailed();
    ensureConnected();

    final int effectiveAckMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
    final MockJMSTopicSession topicSession =
            new MockJMSTopicSession(getNextSessionId(), effectiveAckMode, this);
    addSession(topicSession);

    if (started.get()) {
        topicSession.start();
    }
    return topicSession;
}
/**
 * Prepares the fixture: opens a topic connection to the mock provider,
 * creates a subscriber on a topic named after the running test method,
 * and closes that subscriber right away.
 */
protected void createTestResources() throws Exception {
    connection = createTopicConnectionToMockProvider();

    TopicConnection topicConnection = (TopicConnection) connection;
    TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic testTopic = topicSession.createTopic(_testName.getMethodName());

    subscriber = topicSession.createSubscriber(testTopic);
    subscriber.close();
}