javax.jms.BytesMessage#setJMSReplyTo() 源码实例 Demo

下面列出了 javax.jms.BytesMessage#setJMSReplyTo() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

/**
 * Sends {@code numMessages} JMS bytes messages through {@code testMeta.producer}.
 *
 * <p>Message {@code i} (zero-based) has the body {@code "Message: " + i} encoded as UTF-8,
 * an int property {@code "counter"} equal to {@code i}, and fixed correlation id,
 * reply-to queue, type and priority headers.
 *
 * @param numMessages number of messages to send; nothing is sent when it is zero or negative
 * @throws Exception if creating, populating or sending a message fails
 */
private void createByteMsgs(int numMessages) throws Exception
{
  // The reply-to destination is the same for every message, so build it once.
  ActiveMQQueue replyTo = new ActiveMQQueue("MyReplyTo");
  for (int i = 0; i < numMessages; i++) {
    // A fresh message per iteration: writeBytes() appends to the body, so reusing a single
    // message would make message i carry the payloads of all messages sent before it.
    BytesMessage message = testMeta.session.createBytesMessage();
    // Explicit charset so the payload does not depend on the platform default encoding.
    message.writeBytes(("Message: " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
    message.setIntProperty("counter", i);
    message.setJMSCorrelationID("MyCorrelationID");
    message.setJMSReplyTo(replyTo);
    message.setJMSType("MyType");
    message.setJMSPriority(5);
    testMeta.producer.send(message);
  }
}
 
源代码2 项目: cxf   文件: JAXRSJmsTest.java
/**
 * Sends a "POST /bookstore/books" request as a JMS bytes message.
 *
 * <p>The body is the result of {@code writeBook(new Book("JMS", 3L))}; the request metadata
 * (content type, accept, request URI, HTTP method and a custom header) travels as string
 * properties on the message. The producer created for {@code destination} is always closed
 * before this method returns, including when building or sending the message fails.
 *
 * @param session     session used to create the producer and the message
 * @param destination destination the request is sent to
 * @param replyTo     destination set as the message's JMSReplyTo header
 * @throws Exception if the book cannot be serialized or any JMS operation fails
 */
private void postBook(Session session, Destination destination, Destination replyTo)
    throws Exception {
    MessageProducer producer = session.createProducer(destination);
    try {
        byte[] payload = writeBook(new Book("JMS", 3L));
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(payload);
        message.setJMSReplyTo(replyTo);
        // or, if oneway,
        // message.setStringProperty("OnewayRequest", "true");
        // we could've set this header in JMSDestination if no replyTo were set
        // but in CXF one could also provide the replyTo in the configuration
        // so it is just simpler to set this header if needed to avoid some
        // complex logic on the server side

        // all these properties are optional
        // CXF JAXRS and JMS Transport will default to
        // Content-Type : text/xml
        // Accept : */*
        // POST
        // Message.REQUEST_URI : "/"

        message.setStringProperty("Content-Type", "application/xml");
        message.setStringProperty("Accept", "text/xml");
        message.setStringProperty(org.apache.cxf.message.Message.REQUEST_URI, "/bookstore/books");
        message.setStringProperty(org.apache.cxf.message.Message.HTTP_REQUEST_METHOD, "POST");
        message.setStringProperty("custom.protocol.header", "custom.value");

        producer.send(message);
    } finally {
        // Close on every path so a failure while building or sending does not leak the producer.
        producer.close();
    }
}
 
源代码3 项目: Thunder   文件: MQServerHandler.java
/**
 * Processes one incoming request message on the consumer thread.
 *
 * <p>The message is converted to a {@code ProtocolRequest} and handed to the server executor
 * adapter; a failure there is logged and not rethrown. When the request asks for feedback,
 * the response is produced to the message's JMSReplyTo destination, and the reply-to header
 * on the message is reset to {@code null} afterwards.
 *
 * @param message the incoming bytes message
 * @param session the JMS session the message was received on (not used by this method)
 * @throws JMSException if reading or resetting the reply-to header fails
 */
@Override
public void onMessage(final BytesMessage message, final Session session) throws JMSException {
    final ProtocolRequest request = (ProtocolRequest) mqMessageConverter.fromMessage(message);
    final String serviceInterface = request.getInterface();
    final String selector = MQSelectorUtil.getRequestSelector(message);

    // Design note: the request is handled directly on the consumer thread.
    // Dispatching to a child thread would speed up consumption and reduce the MQ backlog,
    // but if the child threads cannot keep up, messages pile up in memory and are all lost
    // if the server goes down. The child thread was therefore removed.
    if (transportLogPrint) {
        LOG.info("Request from client={}, service={}", selector, serviceInterface);
    }

    final ProtocolResponse response = new ProtocolResponse();
    try {
        executorContainer.getServerExecutorAdapter().handle(request, response);
    } catch (Exception e) {
        LOG.error("Consume request failed", e);
    }

    // Nothing more to do when the request does not ask for a response.
    if (!request.isFeedback()) {
        return;
    }

    try {
        final Destination replyDestination = message.getJMSReplyTo();
        final ApplicationEntity application = cacheContainer.getApplicationEntity();
        mqProducer.produceResponse(replyDestination, application, response, selector);
    } finally {
        message.setJMSReplyTo(null);
    }
}