下面列出了javax.jms.QueueSender#send ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void onMessage(Message message) {
try {
TextMessage receiveMessage = (TextMessage) message;
String keys = receiveMessage.getText();
LOGGER.info("keys = " + keys);
MapMessage returnMess = session.createMapMessage();
returnMess.setStringProperty("/a2/m1", "zhaohui");
returnMess.setStringProperty("/a3/m1/v2", "nanjing");
returnMess.setStringProperty("/a3/m1/v2/t2", "zhaohui");
QueueSender sender = session.createSender((Queue) message.getJMSReplyTo());
sender.send(returnMess);
} catch (Exception e) {
LOGGER.error("onMessage error", e);
}
}
@Test
public void anonymousSenderSendToUnknownQueue() throws Exception
{
QueueConnection connection = ((QueueConnection) getConnectionBuilder().setSyncPublish(true).build());
try
{
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue invalidDestination = session.createQueue("unknown");
try
{
QueueSender sender = session.createSender(null);
sender.send(invalidDestination, session.createMessage());
fail("Exception not thrown");
}
catch (InvalidDestinationException e)
{
//PASS
}
}
finally
{
connection.close();
}
}
/**
* Send a message to testInboundQueue queue
*
* @throws Exception
*/
private void sendMessage() throws Exception {
InitialContext initialContext = JmsClientHelper.getActiveMqInitialContext();
QueueConnectionFactory connectionFactory
= (QueueConnectionFactory) initialContext.lookup(JmsClientHelper.QUEUE_CONNECTION_FACTORY);
QueueConnection queueConnection = connectionFactory.createQueueConnection();
QueueSession queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
QueueSender sender = queueSession.createSender(queueSession.createQueue(QUEUE_NAME));
String message = "<?xml version='1.0' encoding='UTF-8'?>" +
" <ser:getQuote xmlns:ser=\"http://services.samples\" xmlns:xsd=\"http://services.samples/xsd\"> " +
" <ser:request>" +
" <xsd:symbol>IBM</xsd:symbol>" +
" </ser:request>" +
" </ser:getQuote>";
try {
TextMessage jmsMessage = queueSession.createTextMessage(message);
jmsMessage.setJMSType("incorrecttype");
sender.send(jmsMessage);
} finally {
queueConnection.close();
}
}
@Override
public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException {
checkQueueSender();
QueueSender qs = (QueueSender) delegate;
Span span = createAndStartProducerSpan(message, destination(message));
SpanInScope ws = tracer.withSpanInScope(span);
Throwable error = null;
try {
qs.send(queue, message, deliveryMode, priority, timeToLive);
} catch (Throwable t) {
propagateIfFatal(t);
error = t;
throw t;
} finally {
if (error != null) span.error(error);
span.finish();
ws.close();
}
}
private void sendBytesMessage(String destName, byte[] buffer) throws Exception {
InitialContext ic = getInitialContext();
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ic.lookup("ConnectionFactory");
QueueConnection connection = queueConnectionFactory.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
BytesMessage bm = session.createBytesMessage();
bm.writeBytes(buffer);
QueueSender sender = session.createSender((Queue) ic.lookup(destName));
sender.send(bm);
sender.close();
session.close();
connection.close();
}
@Test
public void testSendToQueueFailsIfNotAnonymousPublisher() throws JMSException {
JmsPoolConnection connection = (JmsPoolConnection) cf.createTopicConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createTemporaryQueue();
QueueSender sender = session.createSender(queue);
try {
sender.send(session.createTemporaryQueue(), session.createTextMessage());
fail("Should not be able to send to alternate destination");
} catch (UnsupportedOperationException ex) {}
}
@Test
public void sendToUnknownQueue() throws Exception
{
QueueConnection connection = ((QueueConnection) getConnectionBuilder().build());
try
{
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue invalidDestination = session.createQueue("unknown");
try
{
QueueSender sender = session.createSender(invalidDestination);
sender.send(session.createMessage());
fail("Exception not thrown");
}
catch (InvalidDestinationException e)
{
//PASS
}
}
finally
{
connection.close();
}
}
void send() throws IOException {
try {
ActiveMQConn conn = lazyInit.get();
QueueSender sender = conn.sender;
BytesMessage bytesMessage = conn.session.createBytesMessage();
bytesMessage.writeBytes(message);
sender.send(bytesMessage);
} catch (JMSException e) {
throw ioException("Unable to send message: ", e);
}
}
@Test
public void testTempQueueDelete() throws Exception {
connection.start();
QueueSession queueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = queueSession.createTemporaryQueue();
ActiveMQConnection newConn = (ActiveMQConnection) factory.createConnection();
try {
QueueSession newQueueSession = newConn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender queueSender = newQueueSession.createSender(tempQueue);
Message msg = queueSession.createMessage();
queueSender.send(msg);
try {
QueueReceiver consumer = newQueueSession.createReceiver(tempQueue);
fail("should have gotten exception but got consumer: " + consumer);
} catch (JMSException ex) {
//correct
}
connection.close();
try {
Message newMsg = newQueueSession.createMessage();
queueSender.send(newMsg);
} catch (JMSException e) {
//ok
}
} finally {
newConn.close();
}
}
/**
* com.sun.ts.tests.jms.ee.all.queueconn.QueueConnTest line 171
*/
@Test
public void testCreateReceiverWithMessageSelector() throws Exception {
QueueConnection qc = null;
try {
qc = createQueueConnection();
QueueSession qs = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver qreceiver = qs.createReceiver(queue1, "targetMessage = TRUE");
qc.start();
TextMessage m = qs.createTextMessage();
m.setText("one");
m.setBooleanProperty("targetMessage", false);
QueueSender qsender = qs.createSender(queue1);
qsender.send(m);
m.setText("two");
m.setBooleanProperty("targetMessage", true);
qsender.send(m);
TextMessage rm = (TextMessage) qreceiver.receive(1000);
ProxyAssertSupport.assertEquals("two", rm.getText());
} finally {
if (qc != null) {
qc.close();
}
Thread.sleep(2000);
removeAllMessages(queue1.getQueueName(), true);
checkEmpty(queue1);
}
}
@Test(timeout = 10000)
public void testSendToQueueWithNullOnExplicitQueueSender() throws Exception {
Queue queue = session.createQueue(getTestName());
QueueSender sender = session.createSender(null);
Message message = session.createMessage();
sender.send(queue, message);
JmsOutboundMessageDispatch envelope = remotePeer.getLastReceivedMessage();
assertNotNull(envelope);
message = envelope.getMessage();
Destination destination = message.getJMSDestination();
assertEquals(queue, destination);
}
@Test(timeout = 10000)
public void testSendToQueueWithDeliveryOptsWithNullOnExplicitQueueSender() throws Exception {
Queue queue = session.createQueue(getTestName());
QueueSender sender = session.createSender(null);
Message message = session.createMessage();
sender.send(queue, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
JmsOutboundMessageDispatch envelope = remotePeer.getLastReceivedMessage();
assertNotNull(envelope);
message = envelope.getMessage();
Destination destination = message.getJMSDestination();
assertEquals(queue, destination);
}
@Test(timeout = 10000)
public void testSendToQueueWithNullOnExplicitQueueSenderThrowsInvalidDestinationException() throws Exception {
Queue queue = session.createQueue(getTestName());
QueueSender sender = session.createSender(queue);
Message message = session.createMessage();
try {
sender.send((Queue) null, message);
fail("Expected exception to be thrown");
} catch (InvalidDestinationException ide) {
// expected
}
}
@Test(timeout = 10000)
public void testSendToQueueWithDeliveryOptsWithNullOnExplicitQueueSenderThrowsInvalidDestinationException() throws Exception {
Queue queue = session.createQueue(getTestName());
QueueSender sender = session.createSender(queue);
Message message = session.createMessage();
try {
sender.send((Queue) null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception to be thrown");
} catch (InvalidDestinationException ide) {
// expected
}
}
@Test
public void testSendEmptyMessages() throws Exception {
Queue dest = new ActiveMQQueue(queueName);
QueueSession defaultQueueSession = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender defaultSender = defaultQueueSession.createSender(dest);
defaultSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
connection.start();
Message msg = defaultQueueSession.createMessage();
msg.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(msg);
QueueReceiver queueReceiver = defaultQueueSession.createReceiver(dest);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//bytes
BytesMessage bytesMessage = defaultQueueSession.createBytesMessage();
bytesMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(bytesMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//map
MapMessage mapMessage = defaultQueueSession.createMapMessage();
mapMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(mapMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//object
ObjectMessage objMessage = defaultQueueSession.createObjectMessage();
objMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(objMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//stream
StreamMessage streamMessage = defaultQueueSession.createStreamMessage();
streamMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(streamMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
//text
TextMessage textMessage = defaultQueueSession.createTextMessage();
textMessage.setStringProperty("testName", "testSendEmptyMessages");
defaultSender.send(textMessage);
assertNotNull("Didn't receive message", queueReceiver.receive(1000));
}
/**
* Sends a message,and if transacted, the queueSession is committed.
* <p>This method is intended for <b>clients</b>, as <b>server</b>s
* will use the <code>sendReply</code>.
* @return the correlationID of the sent message
*/
public TextMessage sendMessage(QueueSession session, QueueSender sender, String message, Map udzMap, String bifName, byte btcData[])
throws IfsaException {
try {
if (!isRequestor()) {
throw new IfsaException(getLogPrefix()+ "Provider cannot use sendMessage, should use sendReply");
}
IFSATextMessage msg = (IFSATextMessage)session.createTextMessage();
msg.setText(message);
if (udzMap != null && msg instanceof IFSAMessage) {
// Handle UDZs
log.debug(getLogPrefix()+"add UDZ map to IFSAMessage");
// process the udzMap
Map udzObject = (Map)((IFSAMessage) msg).getOutgoingUDZObject();
udzObject.putAll(udzMap);
}
String replyToQueueName="-";
//Client side
if (messageProtocol.equals(IfsaMessageProtocolEnum.REQUEST_REPLY)) {
// set reply-to address
Queue replyTo=getMessagingSource().getClientReplyQueue(session);
msg.setJMSReplyTo(replyTo);
replyToQueueName=replyTo.getQueueName();
}
if (messageProtocol.equals(IfsaMessageProtocolEnum.FIRE_AND_FORGET)) {
// not applicable
}
if (StringUtils.isNotEmpty(bifName)) {
msg.setBifName(bifName);
}
if (btcData!=null && btcData.length>0) {
msg.setBtcData(btcData);
}
if (log.isDebugEnabled()) {
log.debug(getLogPrefix()
+ " messageProtocol ["
+ messageProtocol
+ "] replyToQueueName ["
+ replyToQueueName
+ "] sending message ["
+ message
+ "]");
} else {
if (log.isInfoEnabled()) {
log.info(getLogPrefix()
+ " messageProtocol ["
+ messageProtocol
+ "] replyToQueueName ["
+ replyToQueueName
+ "] sending message");
}
}
// send the message
sender.send(msg);
// perform commit
if (isJmsTransacted() && !(messagingSource.isXaEnabledForSure() && JtaUtil.inTransaction())) {
session.commit();
log.debug(getLogPrefix()+ "committing (send) transaction");
}
return msg;
} catch (Exception e) {
throw new IfsaException(e);
}
}
protected String sendByQueue(QueueSession session, Queue destination, javax.jms.Message message) throws NamingException, JMSException {
QueueSender tqs = session.createSender(destination);
tqs.send(message);
tqs.close();
return message.getJMSMessageID();
}