下面列出了怎么用javax.jms.ServerSession的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ServerSession getServerSession() throws JMSException {
synchronized (this) {
if (serverSessionInUse) {
LOG.info("asked for session while in use, not serialised delivery");
success.set(false);
completed.set(true);
}
serverSessionInUse = true;
return serverSession;
}
}
private boolean deliverNextPending() {
if (messageQueue.isRunning() && !messageQueue.isEmpty()) {
dispatchLock.lock();
try {
ServerSession serverSession = getServerSessionPool().getServerSession();
if (serverSession == null) {
// There might not be an available session so queue a task to try again
// and hope that by then one is available in the pool.
dispatcher.schedule(new BoundedMessageDeliverTask(messageQueue.size()), DEFAULT_DISPATCH_RETRY_DELAY, TimeUnit.MILLISECONDS);
return false;
}
Session session = serverSession.getSession();
JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
if (session instanceof JmsSession) {
((JmsSession) session).enqueueInSession(new DeliveryTask(envelope));
} else {
LOG.warn("ServerSession provided an unknown JMS Session type to this ConnectionConsumer: {}", session);
}
serverSession.start();
} catch (JMSException e) {
connection.onAsyncException(e);
stop(true);
} finally {
dispatchLock.unlock();
}
}
return !messageQueue.isEmpty();
}
@Override
public ServerSession getServerSession() throws JMSException {
if (firstAttempt) {
firstAttempt = false;
return null;
} else {
return serverSession;
}
}
@Test
public void testFailoverWithConnectionConsumer() throws Exception {
LOG.info(this + " running test testFailoverWithConnectionConsumer");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
try {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(QUEUE_NAME);
final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
@Override
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
@Override
public Session getSession() throws JMSException {
return poolSession;
}
@Override
public void start() throws JMSException {
connectionConsumerGotOne.countDown();
poolSession.run();
}
};
}
}, 1);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer;
TextMessage message;
final int count = 10;
for (int i = 0; i < count; i++) {
producer = session.createProducer(destination);
message = session.createTextMessage("Test message: " + count);
producer.send(message);
producer.close();
}
// restart to force failover and connection state recovery before the commit
broker.stop();
startBroker();
session.commit();
for (int i = 0; i < count - 1; i++) {
Message received = consumer.receive(20000);
Assert.assertNotNull("Failed to get message: " + count, received);
}
session.commit();
} finally {
connection.close();
}
Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
@Override
public ServerSession getServerSession() throws JMSException {
throw new JMSException("Something is wrong with me");
}
@Override
public ServerSession getServerSession() throws JMSException {
return serverSession;
}
static ServerSession create(ServerSession delegate, JmsTracing jmsTracing) {
if (delegate == null) throw new NullPointerException("serverSession == null");
if (delegate instanceof TracingServerSession) return delegate;
return new TracingServerSession(delegate, jmsTracing);
}
TracingServerSession(ServerSession delegate, JmsTracing jmsTracing) {
this.delegate = delegate;
this.jmsTracing = jmsTracing;
}
@Override public ServerSession getServerSession() throws JMSException {
return TracingServerSession.create(delegate.getServerSession(), jmsTracing);
}