下面列出了怎么用javax.jms.QueueConnection的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Opens a queue connection on the supplied factory.
 *
 * <p>When the configuration value stored under {@code "us"} is non-empty, the
 * connection is created with that user name and the value stored under
 * {@code "pw"}; otherwise the factory's no-argument variant is used.
 *
 * @param qcf factory the connection is created from
 * @return the newly created queue connection
 * @throws JMSException if the factory fails to create the connection
 */
public QueueConnection getQueueConnection(QueueConnectionFactory qcf)
        throws JMSException {
    final String user = Config.parms.getString("us");
    if (user == null || user.length() == 0) {
        // No user configured: create the connection without explicit credentials.
        return qcf.createQueueConnection();
    }
    Log.logger.log(Level.INFO, "getQueueConnection(): authenticating as \"" + user + "\"");
    final String secret = Config.parms.getString("pw");
    return qcf.createQueueConnection(user, secret);
}
/**
 * Verifies that queue connections can be created both without and with
 * explicit credentials, and that each one is connected to a remote address
 * containing port 61616.
 *
 * <p>Both connections are closed before the broker is disconnected and
 * stopped, so the test does not leave client connections open.
 *
 * @throws Exception if the broker cannot be started or a connection fails
 */
@Test
public void testCreateQueueConnections() throws Exception {
    String queueName = "testCreateConQ";
    Properties jmsProperties = JMSTestsUtils.getJMSPropertiesForDestination(queueName, PROVIDER_URL, true);
    JMSConnectionFactory connectionFactory = new JMSConnectionFactory(jmsProperties);
    JMSBrokerController brokerController = new JMSBrokerController(PROVIDER_URL, jmsProperties);
    QueueConnection queueConnection = null;
    QueueConnection queueConnectionWithCred = null;
    try {
        brokerController.startProcess();
        queueConnection = connectionFactory.createQueueConnection();
        Assert.assertTrue("The queue connection is created", ((ActiveMQConnection) queueConnection).
                getTransport().getRemoteAddress().contains("61616"));
        queueConnectionWithCred = connectionFactory.createQueueConnection("admin", "admin");
        Assert.assertTrue("The queue connection is created", ((ActiveMQConnection) queueConnectionWithCred).
                getTransport().getRemoteAddress().contains("61616"));
    } finally {
        try {
            // Release client connections first; the broker is still running at this point.
            if (queueConnectionWithCred != null) {
                queueConnectionWithCred.close();
            }
            if (queueConnection != null) {
                queueConnection.close();
            }
        } finally {
            // Always shut the broker down, even if closing a connection failed.
            brokerController.disconnect();
            brokerController.stopProcess();
        }
    }
}
/**
 * Verifies that a second session cannot be created on a connection obtained
 * from the resource adapter's connection factory: the second
 * {@code createSession} call must fail with a {@link JMSException}.
 *
 * @throws Exception if the resource adapter or the first session cannot be set up
 */
@Test
public void testMultipleSessionsThrowsException() throws Exception {
    resourceAdapter = newResourceAdapter();
    MyBootstrapContext ctx = new MyBootstrapContext();
    resourceAdapter.start(ctx);
    ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
    mcf.setResourceAdapter(resourceAdapter);
    ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
    QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
    // The first session is allowed; only its creation matters, not the handle.
    queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try {
        queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        fail("should throw javax.jms.IllegalStateException: Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
    } catch (JMSException expected) {
        // Expected: the second session on the same connection is rejected.
    }
}
/**
 * Prepares a fully mocked JNDI and JMS environment and the transport under test.
 *
 * <p>The mocked context resolves {@code jms/queueConnectionFactory} and
 * {@code jms/mailQueue}; the mocked factory, connection and session are wired
 * so that a sender for the mocked queue and a bytes message can be obtained.
 *
 * @throws Exception if stubbing the JNDI lookups or creating the transport fails
 */
@Before
public void setUp() throws Exception {
    System.setProperty(Context.INITIAL_CONTEXT_FACTORY, TestContextFactory.class.getName());

    // JNDI side: the test context factory hands out this mocked context.
    Context jndi = Mockito.mock(Context.class);
    QueueConnectionFactory factory = Mockito.mock(QueueConnectionFactory.class);
    Queue mailQueue = Mockito.mock(Queue.class);
    TestContextFactory.context = jndi;
    when(jndi.lookup(eq("jms/queueConnectionFactory"))).thenReturn(factory);
    when(jndi.lookup(eq("jms/mailQueue"))).thenReturn(mailQueue);

    // JMS side: factory -> connection -> session -> sender / bytes message.
    QueueConnection jmsConnection = Mockito.mock(QueueConnection.class);
    QueueSession jmsSession = Mockito.mock(QueueSession.class);
    queueSender = Mockito.mock(QueueSender.class);
    bytesMessage = Mockito.mock(BytesMessage.class);
    when(factory.createQueueConnection()).thenReturn(jmsConnection);
    when(factory.createQueueConnection(anyString(), anyString())).thenReturn(jmsConnection);
    when(jmsConnection.createQueueSession(anyBoolean(), anyInt())).thenReturn(jmsSession);
    when(jmsSession.createBytesMessage()).thenReturn(bytesMessage);
    when(jmsSession.createSender(eq(mailQueue))).thenReturn(queueSender);

    // Transport under test, observed through a mocked listener.
    transport = new SmtpJmsTransport(Session.getDefaultInstance(new Properties()), new URLName("jms"));
    transportListener = Mockito.mock(TransportListener.class);
    transport.addTransportListener(transportListener);
}
/**
 * Sends a {@code getQuote} XML text message, with its JMS type set to
 * {@code "incorrecttype"}, to the queue named by {@code QUEUE_NAME}.
 *
 * <p>The connection is closed in a {@code finally} block that also covers
 * session and sender creation, so it is released even if those steps fail.
 *
 * @throws Exception if the JNDI lookup or any JMS operation fails
 */
private void sendMessage() throws Exception {
    InitialContext initialContext = JmsClientHelper.getActiveMqInitialContext();
    QueueConnectionFactory connectionFactory
            = (QueueConnectionFactory) initialContext.lookup(JmsClientHelper.QUEUE_CONNECTION_FACTORY);
    String message = "<?xml version='1.0' encoding='UTF-8'?>" +
            " <ser:getQuote xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\"> " +
            " <ser:request>" +
            " <xsd:symbol>IBM</xsd:symbol>" +
            " </ser:request>" +
            " </ser:getQuote>";
    QueueConnection queueConnection = connectionFactory.createQueueConnection();
    try {
        QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        QueueSender sender = queueSession.createSender(queueSession.createQueue(QUEUE_NAME));
        TextMessage jmsMessage = queueSession.createTextMessage(message);
        jmsMessage.setJMSType("incorrecttype");
        sender.send(jmsMessage);
    } finally {
        queueConnection.close();
    }
}
/**
 * Verifies that a queue connection, session and sender can be created and
 * that the queue named after the test method is empty afterwards.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing
 * assertion does not leave it open.
 *
 * @throws Exception if any JMS or management operation fails
 */
@Test
public void testCreateQueueSender() throws Exception {
    JmsConnectionFactory factory = new JmsConnectionFactory(getBrokerAmqpConnectionURI());
    QueueConnection connection = factory.createQueueConnection();
    try {
        assertNotNull(connection);
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        assertNotNull(session);
        Queue queue = session.createQueue(name.getMethodName());
        QueueSender sender = session.createSender(queue);
        assertNotNull(sender);
        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
        assertEquals(0, proxy.getQueueSize());
    } finally {
        // connection is null only if the factory misbehaved; the assertion above reports that case.
        if (connection != null) {
            connection.close();
        }
    }
}
/**
 * Verifies that a sender and a publisher obtained through the pooled XA
 * connection factory expose the name of their queue and topic respectively.
 *
 * <p>Each connection and the pooled factory itself are released in
 * {@code finally} blocks so that a failing assertion does not leak them.
 *
 * @throws Exception if any JMS operation fails
 */
@Test(timeout = 60000)
public void testSenderAndPublisherDest() throws Exception {
    JmsPoolXAConnectionFactory pcf = createXAPooledConnectionFactory();
    try {
        QueueConnection connection = pcf.createQueueConnection();
        try {
            QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueSender sender = session.createSender(session.createQueue("AA"));
            assertNotNull(sender.getQueue().getQueueName());
        } finally {
            connection.close();
        }

        TopicConnection topicConnection = pcf.createTopicConnection();
        try {
            TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA"));
            assertNotNull(topicPublisher.getTopic().getTopicName());
        } finally {
            topicConnection.close();
        }
    } finally {
        pcf.stop();
    }
}
/**
 * Creates and connects a queue connection for the given credentials.
 *
 * <p>If any step fails, a connection that was already constructed is closed
 * on a best-effort basis and the failure is rethrown as a {@link JMSException}
 * built by {@code JmsExceptionSupport.create}.
 *
 * @param username user name passed to the connection info configuration
 * @param password password passed to the connection info configuration
 * @return the connected queue connection
 * @throws JMSException if the connection cannot be created or connected
 */
@Override
public QueueConnection createQueueConnection(String username, String password) throws JMSException {
    JmsQueueConnection queueConnection = null;
    try {
        JmsConnectionInfo info = configureConnectionInfo(username, password);
        Provider remoteProvider = createProvider(remoteURI);
        queueConnection = new JmsQueueConnection(info, remoteProvider);
        queueConnection.setExceptionListener(exceptionListener);
        queueConnection.connect();
        return queueConnection;
    } catch (Exception cause) {
        if (queueConnection != null) {
            try {
                queueConnection.close();
            } catch (Throwable ignored) {
                // Best-effort cleanup; the original failure is what gets reported.
            }
        }
        throw JmsExceptionSupport.create(cause);
    }
}
/**
 * Creates and starts a queue connection, then opens {@code numberOfChannels}
 * sessions on it.
 *
 * <p>The session at zero-based position {@code i} gets {@code i} receivers on
 * the queue named {@code "queue"}. The first session therefore has no
 * receiver, the second has one, the third has two, and so on.
 *
 * @param numberOfChannels number of sessions to open on the connection
 * @param userName user name handed to the initial context builder
 * @param password password handed to the initial context builder
 * @param hostName host name handed to the initial context builder
 * @param port port handed to the initial context builder
 * @return the started JMS connection
 * @throws NamingException if building the initial context or looking up the connection factory fails
 * @throws JMSException if creating or starting the connection, a session or a receiver fails
 */
private Connection createConnection(int numberOfChannels, String userName, String password, String hostName,
                                    String port) throws NamingException, JMSException {
    InitialContext jndi = ClientHelper.getInitialContextBuilder(userName, password, hostName, port).build();
    QueueConnectionFactory factory = (QueueConnectionFactory) jndi.lookup(ClientHelper.CONNECTION_FACTORY);
    QueueConnection queueConnection = factory.createQueueConnection();
    queueConnection.start();

    for (int channel = 0; channel < numberOfChannels; channel++) {
        QueueSession channelSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        // Receiver count equals the zero-based channel index, giving every channel a distinct count.
        for (int consumer = 0; consumer < channel; consumer++) {
            Queue sharedQueue = channelSession.createQueue("queue");
            channelSession.createReceiver(sharedQueue);
        }
    }
    return queueConnection;
}
/**
 * Verifies that creating a session on a connection opened with the
 * credentials {@code testuser}/{@code testwrongpassword} fails with a
 * {@link JMSException}, after a first connection was opened, closed and its
 * managed connection destroyed.
 *
 * @throws Exception if the resource adapter or the first connection cannot be set up
 */
@Test
public void testConnectionCredentialsFail() throws Exception {
    resourceAdapter = newResourceAdapter();
    MyBootstrapContext bootstrapContext = new MyBootstrapContext();
    resourceAdapter.start(bootstrapContext);

    ActiveMQRAManagedConnectionFactory managedFactory = new ActiveMQRAManagedConnectionFactory();
    managedFactory.setResourceAdapter(resourceAdapter);
    ActiveMQRAConnectionFactory qraConnectionFactory =
            new ActiveMQRAConnectionFactoryImpl(managedFactory, qraConnectionManager);

    // Open a first connection, then tear it down together with its managed connection.
    QueueConnection firstConnection = qraConnectionFactory.createQueueConnection();
    QueueSession firstSession = firstConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    ManagedConnection managedConnection = ((ActiveMQRASession) firstSession).getManagedConnection();
    firstConnection.close();
    managedConnection.destroy();

    boolean rejected = false;
    try {
        QueueConnection badConnection =
                qraConnectionFactory.createQueueConnection("testuser", "testwrongpassword");
        badConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE).close();
    } catch (JMSException expected) {
        // pass
        rejected = true;
    }
    if (!rejected) {
        fail("should throw esxception");
    }
}
/**
 * Verifies that a receiver created with the selector {@code INDEX=2} reports
 * the queue it was created on and receives a message whose {@code INDEX}
 * property is 2, after {@code Utils.sendMessages} was asked for 3 messages.
 *
 * @throws Exception if any JMS operation fails
 */
@Test
public void createReceiver() throws Exception
{
    Queue testQueue = createQueue(getTestName());
    QueueConnection connection = getQueueConnection();
    try
    {
        connection.start();
        Utils.sendMessages(connection, testQueue, 3);

        String selector = String.format("%s=2", INDEX);
        QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        QueueReceiver queueReceiver = queueSession.createReceiver(testQueue, selector);

        String expectedName = testQueue.getQueueName();
        String actualName = queueReceiver.getQueue().getQueueName();
        assertEquals("Queue names should match from QueueReceiver", expectedName, actualName);

        Message message = queueReceiver.receive(getReceiveTimeout());
        assertNotNull("Message is not received", message);
        assertEquals("Unexpected message is received", 2, message.getIntProperty(INDEX));
    }
    finally
    {
        connection.close();
    }
}
/**
 * Verifies that {@code createTemporaryTopic} on a queue session is rejected
 * with {@link javax.jms.IllegalStateException}.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void testQueueSessionCannotCreateTemporaryTopics() throws Exception
{
    QueueConnection connection = getQueueConnection();
    try
    {
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        boolean rejected = false;
        try
        {
            session.createTemporaryTopic();
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS
            rejected = true;
        }
        if (!rejected)
        {
            fail("expected exception did not occur");
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Verifies that {@code createTopic} on a queue session is rejected with
 * {@link javax.jms.IllegalStateException}.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void testQueueSessionCannotCreateTopics() throws Exception
{
    QueueConnection connection = getQueueConnection();
    try
    {
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        boolean rejected = false;
        try
        {
            session.createTopic("abc");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS
            rejected = true;
        }
        if (!rejected)
        {
            fail("expected exception did not occur");
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Verifies that {@code createDurableSubscriber} on a queue session is
 * rejected with {@link javax.jms.IllegalStateException}.
 *
 * @throws Exception if the topic, connection or session cannot be created
 */
@Test
public void testQueueSessionCannotCreateDurableSubscriber() throws Exception
{
    Topic testTopic = createTopic(getTestName());
    QueueConnection connection = getQueueConnection();
    try
    {
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        boolean rejected = false;
        try
        {
            session.createDurableSubscriber(testTopic, "abc");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS
            rejected = true;
        }
        if (!rejected)
        {
            fail("expected exception did not occur");
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Verifies that {@code unsubscribe} on a queue session is rejected with
 * {@link javax.jms.IllegalStateException}.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void testQueueSessionCannotUnsubscribe() throws Exception
{
    QueueConnection connection = getQueueConnection();
    try
    {
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        boolean rejected = false;
        try
        {
            session.unsubscribe("abc");
        }
        catch (javax.jms.IllegalStateException expected)
        {
            // PASS
            rejected = true;
        }
        if (!rejected)
        {
            fail("expected exception did not occur");
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Verifies that creating a sender for the queue named {@code "unknown"} and
 * sending a message through it fails with {@link InvalidDestinationException}.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void sendToUnknownQueue() throws Exception
{
    QueueConnection queueConnection = (QueueConnection) getConnectionBuilder().build();
    try
    {
        QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue unknownQueue = queueSession.createQueue("unknown");
        boolean rejected = false;
        try
        {
            QueueSender queueSender = queueSession.createSender(unknownQueue);
            queueSender.send(queueSession.createMessage());
        }
        catch (InvalidDestinationException expected)
        {
            //PASS
            rejected = true;
        }
        if (!rejected)
        {
            fail("Exception not thrown");
        }
    }
    finally
    {
        queueConnection.close();
    }
}
/**
 * Verifies how {@code SingleConnectionFactory} forwards calls to a mocked
 * target {@link QueueConnection}. Two handles are obtained through
 * {@code createQueueConnection()}, both are started and closed, and the
 * factory is destroyed. The target must then have seen exactly one
 * {@code start()}, one {@code stop()} and one {@code close()}, and nothing else.
 *
 * @throws JMSException declared by the JMS calls made on the mocks
 */
@Test
public void testWithQueueConnectionFactoryAndJms102Usage() throws JMSException {
    QueueConnectionFactory targetFactory = mock(QueueConnectionFactory.class);
    QueueConnection targetConnection = mock(QueueConnection.class);
    given(targetFactory.createQueueConnection()).willReturn(targetConnection);

    SingleConnectionFactory singleFactory = new SingleConnectionFactory(targetFactory);
    Connection firstHandle = singleFactory.createQueueConnection();
    Connection secondHandle = singleFactory.createQueueConnection();
    firstHandle.start();
    secondHandle.start();
    firstHandle.close();
    secondHandle.close();
    singleFactory.destroy(); // should trigger actual close

    verify(targetConnection).start();
    verify(targetConnection).stop();
    verify(targetConnection).close();
    verifyNoMoreInteractions(targetConnection);
}
/**
 * Creates the connection and receiver used by the tests: opens a queue
 * connection to the mock provider, creates a receiver on a queue named after
 * the current test method, and closes that receiver right away.
 *
 * @throws Exception if the connection, session or receiver cannot be created
 */
protected void createTestResources() throws Exception {
    connection = createQueueConnectionToMockProvider();
    QueueConnection queueConnection = (QueueConnection) connection;
    QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue testQueue = queueSession.createQueue(_testName.getMethodName());
    receiver = queueSession.createReceiver(testQueue);
    receiver.close();
}
/**
 * Creates a queue connection through the configured connection factory using
 * the given credentials.
 *
 * <p>A {@link JMSException} raised by the factory is logged and not
 * propagated; in that case {@code null} is returned, so callers must check
 * the result.
 *
 * @param userName user name passed to the factory
 * @param password password passed to the factory
 * @return the created connection, or {@code null} if the factory threw a {@link JMSException}
 * @throws JMSException declared for interface compatibility; the factory's own exception is logged instead
 */
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
    QueueConnectionFactory queueFactory = (QueueConnectionFactory) (this.connectionFactory);
    QueueConnection created = null;
    try {
        created = queueFactory.createQueueConnection(userName, password);
    } catch (JMSException e) {
        logger.error(
                "JMS Exception while creating queue connection through factory '" + this.connectionFactoryString
                        + "' " + e.getMessage(), e);
    }
    return created;
}
/**
 * Creates the JMS mocks shared by the tests and wires them together: the
 * mocked factory's {@code createConnection()} returns the mocked connection,
 * and a non-transacted, auto-acknowledge {@code createSession} call on that
 * connection returns the mocked session.
 *
 * @throws Exception declared by the stubbed JMS methods
 */
@Before
public void setUpMocks() throws Exception {
    // Plain mocks first; none of them depends on another at creation time.
    mockQueue = mock(Queue.class);
    mockSession = mock(QueueSession.class);
    mockConnection = mock(QueueConnection.class);
    mockConnectionFactory = mock(QueueConnectionFactory.class);

    // Wiring: factory -> connection -> session.
    given(mockConnectionFactory.createConnection()).willReturn(mockConnection);
    given(mockConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(mockSession);
}
/**
 * Creates a queue connection on the target factory with the given
 * credentials and wraps it in the transaction-aware connection proxy.
 *
 * @param username user name passed to the target factory
 * @param password password passed to the target factory
 * @return the proxied queue connection
 * @throws JMSException if the target factory fails, or (as
 *         {@link javax.jms.IllegalStateException}) if the target is not a
 *         {@link QueueConnectionFactory}
 */
@Override
public QueueConnection createQueueConnection(String username, String password) throws JMSException {
    ConnectionFactory target = getTargetConnectionFactory();
    if (target instanceof QueueConnectionFactory) {
        QueueConnectionFactory queueFactory = (QueueConnectionFactory) target;
        QueueConnection rawConnection = queueFactory.createQueueConnection(username, password);
        return (QueueConnection) getTransactionAwareConnectionProxy(rawConnection);
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory");
}
/**
 * Builds a dynamic proxy around the given Connection, backed by a
 * {@code TransactionAwareConnectionInvocationHandler} for that target.
 *
 * <p>The proxy always implements {@link Connection}. It also implements
 * {@link QueueConnection} and/or {@link TopicConnection} when the target does.
 *
 * @param target the original Connection to wrap
 * @return the wrapped Connection
 */
protected Connection getTransactionAwareConnectionProxy(Connection target) {
    List<Class<?>> proxyInterfaces = new ArrayList<>(3);
    proxyInterfaces.add(Connection.class);
    if (target instanceof QueueConnection) {
        proxyInterfaces.add(QueueConnection.class);
    }
    if (target instanceof TopicConnection) {
        proxyInterfaces.add(TopicConnection.class);
    }

    ClassLoader loader = Connection.class.getClassLoader();
    Class<?>[] interfaceArray = ClassUtils.toClassArray(proxyInterfaces);
    TransactionAwareConnectionInvocationHandler handler =
            new TransactionAwareConnectionInvocationHandler(target);
    return (Connection) Proxy.newProxyInstance(loader, interfaceArray, handler);
}
/**
 * Creates a queue connection through the target factory.
 *
 * <p>A target that is a {@link QueueConnectionFactory} is used directly.
 * Otherwise a generic connection is created, and it is returned only if it
 * is a {@link QueueConnection}.
 *
 * @return the queue connection obtained from the target
 * @throws JMSException if the target fails, or (as
 *         {@link javax.jms.IllegalStateException}) if the generic connection
 *         is not a {@link QueueConnection}
 */
@Override
public QueueConnection createQueueConnection() throws JMSException {
    ConnectionFactory target = obtainTargetConnectionFactory();
    if (target instanceof QueueConnectionFactory) {
        return ((QueueConnectionFactory) target).createQueueConnection();
    }
    // Fallback: the generic factory may still hand out queue-capable connections.
    Connection genericConnection = target.createConnection();
    if (genericConnection instanceof QueueConnection) {
        return (QueueConnection) genericConnection;
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a QueueConnectionFactory");
}
/**
 * Creates a queue connection through the target factory with the given
 * credentials.
 *
 * <p>A target that is a {@link QueueConnectionFactory} is used directly.
 * Otherwise a generic connection is created with the same credentials, and
 * it is returned only if it is a {@link QueueConnection}.
 *
 * @param username user name passed to the target factory
 * @param password password passed to the target factory
 * @return the queue connection obtained from the target
 * @throws JMSException if the target fails, or (as
 *         {@link javax.jms.IllegalStateException}) if the generic connection
 *         is not a {@link QueueConnection}
 */
@Override
public QueueConnection createQueueConnection(String username, String password) throws JMSException {
    ConnectionFactory target = obtainTargetConnectionFactory();
    if (target instanceof QueueConnectionFactory) {
        return ((QueueConnectionFactory) target).createQueueConnection(username, password);
    }
    // Fallback: the generic factory may still hand out queue-capable connections.
    Connection genericConnection = target.createConnection(username, password);
    if (genericConnection instanceof QueueConnection) {
        return (QueueConnection) genericConnection;
    }
    throw new javax.jms.IllegalStateException("'targetConnectionFactory' is not a QueueConnectionFactory");
}
/**
 * Dynamically creates a topic. This goes against the normal idea
 * that JMS queues and topics should be managed administratively, using
 * management tools. But for some applications this would be too
 * burdensome. The user would have to additionally know about the
 * administration tools as well. Given that an application might be
 * creating quite a few AVL feeds, each one being a separate topic, this
 * could be a real nuisance.
 *
 * <p>The request is sent over a dedicated queue connection to the
 * {@code hornetq.management} queue. That connection is always closed before
 * returning, which also releases the session and requestor created on it.
 *
 * @param topicName name of the topic; also used as its JNDI name
 * @return true if topic created successfully
 * @throws JMSException if the management request cannot be sent or answered
 */
private boolean createTopic(String topicName) throws JMSException {
    QueueConnectionFactory queueConnectionFactory =
            (QueueConnectionFactory) connectionFactory;
    QueueConnection connection = queueConnectionFactory.createQueueConnection();
    try {
        Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");
        QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Message message = session.createMessage();
        JMSManagementHelper.putOperationInvocation(message,
                "jms.server",
                "createTopic", // management command
                topicName,     // Name in hornetq
                topicName);    // JNDI name. This peculiar seemingly undocumented
                               // parameter is needed so that can use JNDI to access
                               // the dynamically created topic. Found info on doing
                               // this at https://community.jboss.org/thread/165355 .
        QueueRequestor requestor = new QueueRequestor(session, managementQueue);

        // Determine if was successful
        Message reply = requestor.request(message);
        boolean topicCreated = JMSManagementHelper.hasOperationSucceeded(reply);
        if (topicCreated) {
            logger.info("Dynamically created topic \"" + topicName + "\"");
        } else {
            logger.error("Failed to dynamically create topic \"" + topicName + "\"");
        }

        // Return whether successful
        return topicCreated;
    } finally {
        // Previously the connection was never closed, leaking one per call.
        connection.close();
    }
}
/**
 * Creates a queue connection using the credentials bound to the current
 * thread when there are any, and the statically configured username and
 * password (the bean properties) otherwise.
 * @see #doCreateQueueConnection
 */
@Override
public final QueueConnection createQueueConnection() throws JMSException {
    JmsUserCredentials bound = this.threadBoundCredentials.get();
    if (bound == null) {
        // Nothing bound to this thread: fall back to the configured defaults.
        return doCreateQueueConnection(this.username, this.password);
    }
    return doCreateQueueConnection(bound.username, bound.password);
}
/**
 * Verifies that {@code createTopic} on a queue session is rejected with
 * {@link javax.jms.IllegalStateException}.
 *
 * <p>The connection is closed in a {@code finally} block so that a failing
 * test does not leave it open.
 *
 * @throws Exception if the connection or session cannot be created
 */
@Test
public void testCreateTopicOnAQueueSession() throws Exception {
    QueueConnection c = (QueueConnection) getConnectionFactory().createConnection();
    try {
        QueueSession s = c.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            s.createTopic("TestTopic");
            ProxyAssertSupport.fail("should throw IllegalStateException");
        } catch (javax.jms.IllegalStateException expected) {
            // OK
        }
    } finally {
        c.close();
    }
}
/**
 * Creates the JMS mocks shared by the tests and wires them together: the
 * mocked factory's {@code createConnection()} returns the mocked connection,
 * and a non-transacted, auto-acknowledge {@code createSession} call on that
 * connection returns the mocked session.
 *
 * @throws Exception declared by the stubbed JMS methods
 */
@Before
public void setUpMocks() throws Exception {
    // Plain mocks first; none of them depends on another at creation time.
    mockQueue = mock(Queue.class);
    mockSession = mock(QueueSession.class);
    mockConnection = mock(QueueConnection.class);
    mockConnectionFactory = mock(QueueConnectionFactory.class);

    // Wiring: factory -> connection -> session.
    given(mockConnectionFactory.createConnection()).willReturn(mockConnection);
    given(mockConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)).willReturn(mockSession);
}
/**
 * Builds a dynamic proxy for the given Connection, backed by a
 * {@code SharedConnectionInvocationHandler}. The proxy is meant to delegate
 * calls while suppressing close calls, so that application code can treat a
 * shared framework Connection like an ordinary one from a JMS
 * ConnectionFactory. That behaviour lives in the handler, which is defined
 * elsewhere.
 *
 * <p>The proxy always implements {@link Connection}. It also implements
 * {@link QueueConnection} and/or {@link TopicConnection} when the target does.
 *
 * @param target the original Connection to wrap
 * @return the wrapped Connection
 */
protected Connection getSharedConnectionProxy(Connection target) {
    List<Class<?>> proxyInterfaces = new ArrayList<Class<?>>(3);
    proxyInterfaces.add(Connection.class);
    if (target instanceof QueueConnection) {
        proxyInterfaces.add(QueueConnection.class);
    }
    if (target instanceof TopicConnection) {
        proxyInterfaces.add(TopicConnection.class);
    }

    ClassLoader loader = Connection.class.getClassLoader();
    Class<?>[] interfaceArray = proxyInterfaces.toArray(new Class<?>[proxyInterfaces.size()]);
    SharedConnectionInvocationHandler handler = new SharedConnectionInvocationHandler();
    return (Connection) Proxy.newProxyInstance(loader, interfaceArray, handler);
}
/**
 * Exercises a CachingConnectionFactory that wraps a mocked
 * QueueConnectionFactory, using both the generic createSession and the
 * queue-specific createQueueSession entry points on two connection handles,
 * and then verifies which calls reached the mocked target connection and
 * sessions.
 *
 * The statement order is significant: the verifications at the end depend on
 * the exact sequence of session creation, rollback and close calls.
 *
 * @throws JMSException declared by the JMS calls made on the mocks
 */
@Test
public void testCachingConnectionFactoryWithQueueConnectionFactoryAndJms102Usage() throws JMSException {
// Mocked target: one connection, one transacted and one non-transacted session.
QueueConnectionFactory cf = mock(QueueConnectionFactory.class);
QueueConnection con = mock(QueueConnection.class);
QueueSession txSession = mock(QueueSession.class);
QueueSession nonTxSession = mock(QueueSession.class);
given(cf.createQueueConnection()).willReturn(con);
given(con.createQueueSession(true, Session.AUTO_ACKNOWLEDGE)).willReturn(txSession);
given(txSession.getTransacted()).willReturn(true);
given(con.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE)).willReturn(nonTxSession);
CachingConnectionFactory scf = new CachingConnectionFactory(cf);
scf.setReconnectOnException(false);
// First handle: a transacted session (explicitly rolled back), then a non-transacted one.
Connection con1 = scf.createQueueConnection();
Session session1 = con1.createSession(true, Session.AUTO_ACKNOWLEDGE);
session1.rollback();
session1.close();
session1 = con1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session1.close();
con1.start();
// Second handle: same two session kinds, requested in the opposite order.
QueueConnection con2 = scf.createQueueConnection();
Session session2 = con2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
session2.close();
session2 = con2.createSession(true, Session.AUTO_ACKNOWLEDGE);
session2.getTransacted();
session2.close(); // should lead to rollback
con2.start();
con1.close();
con2.close();
scf.destroy(); // should trigger actual close
// Each verify below expects exactly one call on the mocked target object.
verify(txSession).rollback();
verify(txSession).close();
verify(nonTxSession).close();
// Both handles were started and closed, yet the target must see start/stop/close only once.
verify(con).start();
verify(con).stop();
verify(con).close();
}