下面列出了javax.jms.BytesMessage#setJMSReplyTo ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void createByteMsgs(int numMessages) throws Exception
{
BytesMessage message = testMeta.session.createBytesMessage();
for (int i = 0; i < numMessages; i++) {
message.writeBytes(("Message: " + i).getBytes());
message.setIntProperty("counter", i);
message.setJMSCorrelationID("MyCorrelationID");
message.setJMSReplyTo(new ActiveMQQueue("MyReplyTo"));
message.setJMSType("MyType");
message.setJMSPriority(5);
testMeta.producer.send(message);
}
}
private void postBook(Session session, Destination destination, Destination replyTo)
throws Exception {
MessageProducer producer = session.createProducer(destination);
byte[] payload = writeBook(new Book("JMS", 3L));
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
message.setJMSReplyTo(replyTo);
// or, if oneway,
// message.setStringProperty("OnewayRequest", "true");
// we could've set this header in JMSDestination if no replyTo were set
// but in CXF one could also provide the replyTo in the configuration
// so it is just simpler to set this header if needed to avoid some
// complex logic on the server side
// all these properties are optional
// CXF JAXRS and JMS Transport will default to
// Content-Type : text/xml
// Accept : */*
// POST
// Message.REQUEST_URI : "/"
message.setStringProperty("Content-Type", "application/xml");
message.setStringProperty("Accept", "text/xml");
message.setStringProperty(org.apache.cxf.message.Message.REQUEST_URI, "/bookstore/books");
message.setStringProperty(org.apache.cxf.message.Message.HTTP_REQUEST_METHOD, "POST");
message.setStringProperty("custom.protocol.header", "custom.value");
producer.send(message);
producer.close();
}
@Override
public void onMessage(final BytesMessage message, final Session session) throws JMSException {
final ProtocolRequest request = (ProtocolRequest) mqMessageConverter.fromMessage(message);
// 如果消费线程里面放子线程:
// 好处是可以加快消费速度,减少MQ消息堆积,
// 坏处是如果子线程消费速度跟不上,会造成消息在内存中的堆积,一旦服务器挂掉,消息全部丢失
// 最终还是去掉子线程
String interfaze = request.getInterface();
/*ThreadPoolFactory.createThreadPoolServerExecutor(interfaze).submit(new Callable<Object>() {
@Override
public Object call() throws Exception {*/
String requestSelector = MQSelectorUtil.getRequestSelector(message);
if (transportLogPrint) {
LOG.info("Request from client={}, service={}", requestSelector, interfaze);
}
final ProtocolResponse response = new ProtocolResponse();
try {
ServerExecutorAdapter serverExecutorAdapter = executorContainer.getServerExecutorAdapter();
serverExecutorAdapter.handle(request, response);
} catch (Exception e) {
LOG.error("Consume request failed", e);
}
boolean feedback = request.isFeedback();
if (feedback) {
try {
Destination requestDestination = message.getJMSReplyTo();
ApplicationEntity applicationEntity = cacheContainer.getApplicationEntity();
mqProducer.produceResponse(requestDestination, applicationEntity, response, requestSelector);
} finally {
message.setJMSReplyTo(null);
}
}
/*return null;
}
});*/
}