下面列出了怎么用javax.jms.CompletionListener的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Asynchronously sends {@code message} to a caller-supplied destination.
 *
 * <p>Validation runs in a fixed order: closed check, destination check, anonymous-producer
 * check, then the listener null check. Only when all pass is the call handed to
 * {@code sendMessage}.
 *
 * @throws UnsupportedOperationException if this producer is not anonymous
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 * @throws JMSException propagated from the checks or from the send itself
 */
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    checkClosed();
    checkDestinationNotInvalid(destination);

    final boolean boundToDestination = !anonymousProducer;
    if (boundToDestination) {
        throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
    }

    if (null == listener) {
        throw new IllegalArgumentException("CompletionListener cannot be null");
    }

    sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
}
/**
 * Waits up to one second for the mocked producer to receive a send with a completion
 * listener, then checks the string properties carried by the captured message.
 */
@Override
protected void verifyPublishedMessage() throws Exception {
    final ArgumentCaptor<JmsMessage> sentMessage = ArgumentCaptor.forClass(JmsMessage.class);
    verify(messageProducer, timeout(1000)).send(sentMessage.capture(), any(CompletionListener.class));

    final Message published = sentMessage.getValue();
    assertThat(published).isNotNull();

    // Plain thing id, then the suffixed and prefixed variants of the same id.
    assertThat(published.getStringProperty("thing_id"))
            .isEqualTo(TestConstants.Things.THING_ID.toString());
    assertThat(published.getStringProperty("suffixed_thing_id"))
            .isEqualTo(TestConstants.Things.THING_ID + ".some.suffix");
    assertThat(published.getStringProperty("prefixed_thing_id"))
            .isEqualTo("some.prefix." + TestConstants.Things.THING_ID);

    assertThat(published.getStringProperty("eclipse")).isEqualTo("ditto");
    assertThat(published.getStringProperty("device_id"))
            .isEqualTo(TestConstants.Things.THING_ID.toString());
}
/**
 * Tells the client actor to publish a thing-modified event to {@code target} and verifies that
 * the producer obtained from {@code messageProducerSupplier} sends exactly one message whose
 * body contains both the generated marker attribute and the expected topic path.
 *
 * @param amqpClientActor actor that receives the outbound signal
 * @param target the single target of the outbound signal; its address is also put in the
 * {@code reply-to} header
 * @param messageProducerSupplier queried after the signal was sent to obtain the producer to verify
 * @throws JMSException declared by the JMS calls used for verification
 */
private static void sendThingEventAndExpectPublish(final ActorRef amqpClientActor,
        final Target target,
        final Supplier<MessageProducer> messageProducerSupplier)
        throws JMSException {
    // A random marker stored as an attribute so the published body can be matched to this event.
    final String marker = UUID.randomUUID().toString();
    final DittoHeaders replyToHeaders =
            DittoHeaders.newBuilder().putHeader("reply-to", target.getAddress()).build();
    final ThingModifiedEvent event =
            TestConstants.thingModified(Collections.emptyList(), Attributes.newBuilder().set("uuid", marker).build())
                    .setDittoHeaders(replyToHeaders);
    final OutboundSignal signal = OutboundSignalFactory.newOutboundSignal(event, singletonList(target));

    amqpClientActor.tell(signal, ActorRef.noSender());

    final ArgumentCaptor<JmsMessage> sentMessage = ArgumentCaptor.forClass(JmsMessage.class);
    final MessageProducer producerUnderTest = messageProducerSupplier.get();
    verify(producerUnderTest, timeout(2000).times(1))
            .send(sentMessage.capture(), any(CompletionListener.class));

    final Message published = sentMessage.getValue();
    assertThat(published).isNotNull();
    assertThat(published.getBody(String.class)).contains(marker);

    final String expectedTopicPath =
            TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "/" +
                    TopicPath.Group.THINGS.getName() + "/" + TopicPath.Channel.TWIN.getName() + "/" +
                    TopicPath.Criterion.EVENTS.getName() + "/" + TopicPath.Action.MODIFIED.getName();
    assertThat(published.getBody(String.class)).contains(expectedTopicPath);
}
/**
 * Traced variant of the asynchronous send to an explicit destination with explicit delivery
 * options. A producer span is started, the caller's listener is wrapped by
 * {@code TracingCompletionListener}, and the call is delegated with the span in scope.
 */
@JMS2_0 public void send(Destination destination, Message message, int deliveryMode, int priority,
  long timeToLive, CompletionListener completionListener) throws JMSException {
  Span producerSpan = createAndStartProducerSpan(message, destination);
  CompletionListener tracedListener =
    TracingCompletionListener.create(completionListener, destination, producerSpan, current);
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    delegate.send(destination, message, deliveryMode, priority, timeToLive, tracedListener);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    // The span is finished here only when the send itself failed; on success it stays open
    // (presumably completed later through the wrapped listener).
    if (failure != null) {
      producerSpan.error(failure).finish();
    }
    scope.close();
  }
}
/**
 * Asynchronously sends {@code message} to a caller-supplied destination.
 *
 * <p>Checks are applied in order: closed state, destination validity, anonymous-producer
 * requirement, non-null listener. The send is then delegated to {@code sendMessage}.
 *
 * @throws UnsupportedOperationException if this producer was created with an explicit destination
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 * @throws JMSException propagated from the checks or from the send itself
 */
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    checkClosed();
    checkDestinationNotInvalid(destination);
    if (!anonymousProducer) {
        throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
    }
    if (listener == null) {
        // The message names the actual parameter type; the previous text referred to a
        // misspelled, non-existent "JmsCompletetionListener".
        throw new IllegalArgumentException("CompletionListener cannot be null");
    }
    sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
}
/**
 * Traced variant of {@code send(Message, CompletionListener)}. The destination is resolved
 * from the message, a producer span is started and placed in scope, and the delegate is
 * called with the listener wrapped by {@code TracingCompletionListener}.
 */
@JMS2_0
public void send(Message message, CompletionListener completionListener) throws JMSException {
  Destination resolved = destination(message);
  Span producerSpan = createAndStartProducerSpan(message, resolved);
  // Closed manually in finally: animal-sniffer mistakes SpanInScope for AutoCloseable.
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    CompletionListener tracedListener =
      TracingCompletionListener.create(completionListener, resolved, producerSpan, current);
    delegate.send(message, tracedListener);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    // Finish the span here only when the call failed before returning.
    if (failure != null) {
      producerSpan.error(failure).finish();
    }
    scope.close();
  }
}
/**
 * Traced variant of the asynchronous send with explicit delivery options and no explicit
 * destination. The destination is resolved from the message, the listener is wrapped by
 * {@code TracingCompletionListener}, and the delegate is invoked with the span in scope.
 */
@JMS2_0 public void send(Message message, int deliveryMode, int priority, long timeToLive,
  CompletionListener completionListener) throws JMSException {
  Destination resolved = destination(message);
  Span producerSpan = createAndStartProducerSpan(message, resolved);
  CompletionListener tracedListener =
    TracingCompletionListener.create(completionListener, resolved, producerSpan, current);
  // Closed manually in finally: animal-sniffer mistakes SpanInScope for AutoCloseable.
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    delegate.send(message, deliveryMode, priority, timeToLive, tracedListener);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    // Finish the span here only when the call failed before returning.
    if (failure != null) {
      producerSpan.error(failure).finish();
    }
    scope.close();
  }
}
/**
 * Traced variant of {@code send(Destination, Message, CompletionListener)}. A producer span
 * is started for the given destination, the listener is wrapped by
 * {@code TracingCompletionListener}, and the delegate is invoked with the span in scope.
 */
@JMS2_0 public void send(Destination destination, Message message,
  CompletionListener completionListener) throws JMSException {
  Span producerSpan = createAndStartProducerSpan(message, destination);
  CompletionListener tracedListener =
    TracingCompletionListener.create(completionListener, destination, producerSpan, current);
  SpanInScope scope = tracer.withSpanInScope(producerSpan);
  Throwable failure = null;
  try {
    delegate.send(destination, message, tracedListener);
  } catch (Throwable thrown) {
    propagateIfFatal(thrown);
    failure = thrown;
    throw thrown;
  } finally {
    // Finish the span here only when the call failed before returning.
    if (failure != null) {
      producerSpan.error(failure).finish();
    }
    scope.close();
  }
}
/**
 * onException must be forwarded to the wrapped listener, and the reported span must carry
 * the same exception as its error.
 */
@Test public void on_exception_should_forward_then_set_error() {
  Span startedSpan = tracing.tracer().nextSpan().start();
  CompletionListener wrapped = mock(CompletionListener.class);
  CompletionListener listenerUnderTest =
    TracingCompletionListener.create(wrapped, destination, startedSpan, currentTraceContext);
  RuntimeException failure = new RuntimeException("Test exception");

  listenerUnderTest.onException(message, failure);

  verify(wrapped).onException(message, failure);
  assertThat(testSpanHandler.takeLocalSpan().error()).isEqualTo(failure);
}
/**
 * When onException is invoked, the reported span must carry the supplied exception as its
 * error. Uses a locally mocked message and a throwaway mocked delegate.
 */
@Test public void on_exception_should_set_error_if_exception() {
  Message mockedMessage = mock(Message.class);
  Span startedSpan = tracing.tracer().nextSpan().start();
  RuntimeException failure = new RuntimeException("Test exception");
  CompletionListener wrapped = mock(CompletionListener.class);
  CompletionListener listenerUnderTest =
    TracingCompletionListener.create(wrapped, destination, startedSpan, currentTraceContext);

  listenerUnderTest.onException(mockedMessage, failure);

  assertThat(testSpanHandler.takeLocalSpan().error()).isEqualTo(failure);
}
/**
 * Asynchronously sends {@code message} to this producer's own destination.
 *
 * <p>Checks are applied in order: closed state, explicit-destination requirement, non-null
 * listener. The send is then delegated to {@code sendMessage} using the {@code destination}
 * field.
 *
 * @throws UnsupportedOperationException if this producer is anonymous
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 * @throws JMSException propagated from the checks or from the send itself
 */
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    checkClosed();

    final boolean lacksDestination = anonymousProducer;
    if (lacksDestination) {
        throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
    }

    if (null == listener) {
        throw new IllegalArgumentException("CompletionListener cannot be null");
    }

    sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
}
/**
 * Performs the actual send on the producer returned by {@code getMessageProducer()}.
 *
 * <p>While holding the producer's monitor, the configured {@code deliveryDelay} is applied
 * (only when non-zero and the session reports JMS 2.0 support), the message is sent, and the
 * previous delay is restored in a {@code finally} block.
 *
 * @param destination used only when the delegate has no destination of its own
 * @param listener optional; when {@code null} the send overloads without a listener are used
 * @throws JMSException propagated from the producer or session
 */
private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    MessageProducer pooledProducer = getMessageProducer();

    // The producer is reconfigured per call below, so only one thread may use it at a time.
    synchronized (pooledProducer) {
        long previousDelay = 0;
        if (deliveryDelay != 0 && session.isJMSVersionSupported(2, 0)) {
            previousDelay = pooledProducer.getDeliveryDelay();
            pooledProducer.setDeliveryDelay(deliveryDelay);
        }

        try {
            // A delegate bound to a destination must use the send overloads without a
            // destination argument to stay within the JMS rules for send calls. In every other
            // case the producer is anonymous and the destination is passed explicitly.
            final boolean explicitDestination = getDelegate().getDestination() != null;

            if (explicitDestination && listener == null) {
                pooledProducer.send(message, deliveryMode, priority, timeToLive);
            } else if (explicitDestination) {
                pooledProducer.send(message, deliveryMode, priority, timeToLive, listener);
            } else if (listener == null) {
                pooledProducer.send(destination, message, deliveryMode, priority, timeToLive);
            } else {
                pooledProducer.send(destination, message, deliveryMode, priority, timeToLive, listener);
            }
        } finally {
            if (deliveryDelay != 0 && session.isJMSVersionSupported(2, 0)) {
                pooledProducer.setDeliveryDelay(previousDelay);
            }
        }
    }
}
/**
 * Asynchronously sends {@code message} to this producer's own destination using the
 * producer's configured delivery mode, priority, time-to-live and delivery delay.
 *
 * @throws UnsupportedOperationException if this producer is anonymous
 * @throws IllegalArgumentException if {@code completionListener} is {@code null}
 * @throws JMSException propagated from the closed check or from the session send
 */
@Override
public void send(Message message, CompletionListener completionListener) throws JMSException {
    checkClosed();
    if (anonymousProducer) {
        throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
    }
    if (completionListener == null) {
        // Spelling corrected: the message previously read "CompletetionListener".
        throw new IllegalArgumentException("CompletionListener cannot be null");
    }
    session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
}
/**
 * Asynchronously sends {@code message} to a caller-supplied destination with explicit
 * delivery mode, priority and time-to-live.
 *
 * <p>Checks are applied in order: closed state, destination validity, anonymous-producer
 * requirement, non-null listener.
 *
 * @throws UnsupportedOperationException if this producer was created with an explicit destination
 * @throws IllegalArgumentException if {@code completionListener} is {@code null}
 * @throws JMSException propagated from the checks or from the session send
 */
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException {
    checkClosed();
    checkDestinationNotInvalid(destination);
    if (!anonymousProducer) {
        throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination.");
    }
    if (completionListener == null) {
        throw new IllegalArgumentException("CompletionListener cannot be null");
    }
    // Pass the producer's delivery delay and the caller's listener. Supplying deliveryMode in
    // the delay position and null for the listener would apply a bogus delay and silently drop
    // the completion callback that was just validated as non-null.
    session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
}
/**
 * After onCompletion is invoked on the tracing listener, a local span must be available from
 * the test span handler.
 */
@Test public void on_completion_should_finish_span() {
  Span startedSpan = tracing.tracer().nextSpan().start();
  CompletionListener wrapped = mock(CompletionListener.class);
  CompletionListener listenerUnderTest =
    TracingCompletionListener.create(wrapped, destination, startedSpan, currentTraceContext);

  listenerUnderTest.onCompletion(message);

  testSpanHandler.takeLocalSpan();
}
/**
 * Waits up to one second for the mocked producer to receive a send with a completion
 * listener, then checks the correlation id and the {@code mappedHeader2} property of the
 * captured message.
 */
@Override
protected void verifyPublishedMessageToReplyTarget() throws Exception {
    final ArgumentCaptor<JmsMessage> sentMessage = ArgumentCaptor.forClass(JmsMessage.class);
    verify(messageProducer, timeout(1000)).send(sentMessage.capture(), any(CompletionListener.class));

    final Message published = sentMessage.getValue();
    assertThat(published.getJMSCorrelationID()).isEqualTo(TestConstants.CORRELATION_ID);
    assertThat(published.getStringProperty("mappedHeader2")).isEqualTo("thing:id");
}
/**
 * Opens the connection, feeds one mocked message into the registered message listener,
 * expects a {@code ModifyThing} command to arrive at the test probe, answers it with the
 * response built by {@code responseSupplier}, and verifies that the response is published by
 * the producer for {@code expectedAddressPrefix} + entity id.
 *
 * @param connection connection the client actor is created for
 * @param responseSupplier builds the response from the received command's entity id and headers
 * @param expectedAddressPrefix prefix of the address whose producer must publish the response
 * @param messageTextPredicate must accept the String body of the published message
 * @throws JMSException declared by the JMS calls used for verification
 */
private void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse(final Connection connection,
        final BiFunction<ThingId, DittoHeaders, CommandResponse> responseSupplier,
        final String expectedAddressPrefix,
        final Predicate<String> messageTextPredicate) throws JMSException {
    new TestKit(actorSystem) {{
        final Props clientProps =
                AmqpClientActor.propsForTests(connection, getRef(), getRef(),
                        (ac, el) -> mockConnection);
        final ActorRef clientActor = actorSystem.actorOf(clientProps);

        clientActor.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
        expectMsg(CONNECTED_SUCCESS);

        // Grab the listener the actor registered on the consumer and push a message through it.
        final ArgumentCaptor<MessageListener> listenerCaptor = ArgumentCaptor.forClass(MessageListener.class);
        verify(mockConsumer, timeout(1000).atLeastOnce()).setMessageListener(listenerCaptor.capture());
        final MessageListener registeredListener = listenerCaptor.getValue();
        registeredListener.onMessage(mockMessage());

        final ThingCommand command = expectMsgClass(ThingCommand.class);
        assertThat((CharSequence) command.getEntityId()).isEqualTo(TestConstants.Things.THING_ID);
        assertThat(command.getDittoHeaders().getCorrelationId()).contains(TestConstants.CORRELATION_ID);
        assertThat(command).isInstanceOf(ModifyThing.class);

        getLastSender().tell(responseSupplier.apply(command.getEntityId(), command.getDittoHeaders()), getRef());

        // The response must be published via the producer for the expected destination.
        final ArgumentCaptor<JmsMessage> sentMessage = ArgumentCaptor.forClass(JmsMessage.class);
        final MessageProducer replyProducer =
                getProducerForAddress(expectedAddressPrefix + command.getEntityId());
        verify(replyProducer, timeout(2000)).send(sentMessage.capture(), any(CompletionListener.class));

        final Message published = sentMessage.getValue();
        assertThat(published).isNotNull();
        assertThat(messageTextPredicate).accepts(published.getBody(String.class));
    }};
}
/**
 * Publishes a thing-modified event to a target whose address contains placeholders and
 * verifies that a message is sent by the producer registered for the resolved address.
 */
@Test
public void testTargetAddressPlaceholderReplacement() throws JMSException {
    final Connection connection =
            TestConstants.createConnection(CONNECTION_ID,
                    TestConstants.Targets.TARGET_WITH_PLACEHOLDER);

    // target Placeholder: target:{{ thing:namespace }}/{{thing:name}}@{{ topic:channel }}
    final String resolvedAddress =
            "target:" + TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "@" +
                    TopicPath.Channel.TWIN.getName();

    new TestKit(actorSystem) {{
        final Props clientProps =
                AmqpClientActor.propsForTests(connection, getRef(), getRef(),
                        (ac, el) -> mockConnection);
        final ActorRef clientActor = actorSystem.actorOf(clientProps);

        clientActor.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
        expectMsg(CONNECTED_SUCCESS);

        final ThingModifiedEvent event = TestConstants.thingModified(Collections.emptyList());
        final OutboundSignal signal = OutboundSignalFactory.newOutboundSignal(event,
                singletonList(ConnectivityModelFactory.newTargetBuilder()
                        .address(TestConstants.Targets.TARGET_WITH_PLACEHOLDER.getAddress())
                        .authorizationContext(Authorization.AUTHORIZATION_CONTEXT)
                        .topics(Topic.TWIN_EVENTS)
                        .build()));

        clientActor.tell(signal, getRef());

        final ArgumentCaptor<JmsMessage> sentMessage = ArgumentCaptor.forClass(JmsMessage.class);
        final MessageProducer resolvedProducer = getProducerForAddress(resolvedAddress);
        verify(resolvedProducer, timeout(2000)).send(sentMessage.capture(), any(CompletionListener.class));

        final Message published = sentMessage.getValue();
        assertThat(published).isNotNull();
    }};
}
/**
 * Logs the invocation at trace level when tracing is enabled, then forwards the call
 * unchanged to the wrapped {@code producer}.
 */
@Override
public void send(Message message, CompletionListener completionListener) throws JMSException {
    final boolean traceOn = ActiveMQRALogger.LOGGER.isTraceEnabled();
    if (traceOn) {
        final String invocation = "send(" + message + ", " + completionListener + ")";
        ActiveMQRALogger.LOGGER.trace(invocation);
    }

    producer.send(message, completionListener);
}
/**
 * Logs the invocation with all delivery options at trace level when tracing is enabled, then
 * forwards the call unchanged to the wrapped {@code producer}.
 */
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive,
                 CompletionListener completionListener) throws JMSException {
    final boolean traceOn = ActiveMQRALogger.LOGGER.isTraceEnabled();
    if (traceOn) {
        final String invocation =
            "send(" + message + ", " + deliveryMode + ", " + priority + ", " + timeToLive +
                ", " + completionListener + ")";
        ActiveMQRALogger.LOGGER.trace(invocation);
    }

    producer.send(message, deliveryMode, priority, timeToLive, completionListener);
}
/**
 * Logs the invocation at trace level when tracing is enabled, then forwards the call with
 * the explicit destination unchanged to the wrapped {@code producer}.
 */
@Override
public void send(Destination destination, Message message,
                 CompletionListener completionListener) throws JMSException {
    final boolean traceOn = ActiveMQRALogger.LOGGER.isTraceEnabled();
    if (traceOn) {
        final String invocation =
            "send(" + destination + ", " + message + ", " + completionListener + ")";
        ActiveMQRALogger.LOGGER.trace(invocation);
    }

    producer.send(destination, message, completionListener);
}
/**
 * Logs the invocation with destination and all delivery options at trace level when tracing
 * is enabled, then forwards the call unchanged to the wrapped {@code producer}.
 */
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority,
                 long timeToLive, CompletionListener completionListener) throws JMSException {
    final boolean traceOn = ActiveMQRALogger.LOGGER.isTraceEnabled();
    if (traceOn) {
        final String invocation =
            "send(" + destination + ", " + message + ", " + deliveryMode + ", " + priority +
                ", " + timeToLive + ", " + completionListener + ")";
        ActiveMQRALogger.LOGGER.trace(invocation);
    }

    producer.send(destination, message, deliveryMode, priority, timeToLive, completionListener);
}
/**
 * Sends {@code message} to {@code destination} after copying this producer's configured
 * headers and properties onto it.
 *
 * <p>Headers that are unset ({@code null}, or an empty byte array for the binary correlation
 * id) are not written. If a completion listener is configured, the send uses it wrapped in a
 * {@code CompletionListenerWrapper}; otherwise the send overload without a listener is used.
 *
 * @return this producer, for call chaining
 * @throws MessageFormatRuntimeException if {@code message} is {@code null}
 */
@Override
public JMSProducer send(Destination destination, Message message) {
    if (null == message) {
        throw new MessageFormatRuntimeException("null message");
    }

    try {
        if (jmsHeaderCorrelationID != null) {
            message.setJMSCorrelationID(jmsHeaderCorrelationID);
        }

        final boolean hasBinaryCorrelationId =
            jmsHeaderCorrelationIDAsBytes != null && jmsHeaderCorrelationIDAsBytes.length > 0;
        if (hasBinaryCorrelationId) {
            message.setJMSCorrelationIDAsBytes(jmsHeaderCorrelationIDAsBytes);
        }

        if (jmsHeaderReplyTo != null) {
            message.setJMSReplyTo(jmsHeaderReplyTo);
        }

        if (jmsHeaderType != null) {
            message.setJMSType(jmsHeaderType);
        }

        // XXX HORNETQ-1209 "JMS 2.0" can this be a foreign msg?
        // if so, then "SimpleString" properties will trigger an error.
        setProperties(message);

        if (completionListener == null) {
            producer.send(destination, message);
        } else {
            producer.send(destination, message, new CompletionListenerWrapper(completionListener));
        }
    } catch (JMSException cause) {
        // The enclosing API is unchecked, so the checked exception is converted.
        throw JmsExceptionUtils.convertToRuntimeException(cause);
    }

    return this;
}
/**
 * Asynchronously sends {@code message} to this producer's default destination.
 *
 * <p>The completion listener is checked first, then the default destination, before the
 * call is handed to {@code doSendx}.
 *
 * @throws JMSException propagated from the checks or from {@code doSendx}
 */
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive,
                 CompletionListener completionListener) throws JMSException {
    checkCompletionListener(completionListener);
    checkDefaultDestination();

    doSendx(defaultDestination, message, deliveryMode, priority, timeToLive, completionListener);
}
/**
 * Asynchronously sends {@code message} to an explicitly supplied destination.
 *
 * <p>Checks run in order: closed state, completion listener, destination. The destination is
 * then cast to {@code ActiveMQDestination} and passed to {@code doSendx}.
 *
 * @throws JMSException propagated from the checks or from {@code doSendx}
 */
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority,
                 long timeToLive, CompletionListener completionListener) throws JMSException {
    checkClosed();
    checkCompletionListener(completionListener);
    checkDestination(destination);

    final ActiveMQDestination target = (ActiveMQDestination) destination;
    doSendx(target, message, deliveryMode, priority, timeToLive, completionListener);
}
/**
 * Creates a wrapper that keeps references to the listener, the message and the producer.
 *
 * @param listener stored in the {@code completionListener} field
 * @param jmsMessage stored in the {@code jmsMessage} field
 * @param producer stored in the {@code producer} field
 */
private CompletionListenerWrapper(CompletionListener listener, Message jmsMessage,
                                  ActiveMQMessageProducer producer) {
    this.producer = producer;
    this.jmsMessage = jmsMessage;
    this.completionListener = listener;
}
/**
 * Asynchronously sends {@code message} to the destination this producer was created for
 * (taken from {@code producerInfo}).
 *
 * <p>Checks are applied in order: closed state, explicit-destination requirement, non-null
 * listener. The send is then delegated to {@code sendMessage}.
 *
 * @throws UnsupportedOperationException if this producer is anonymous
 * @throws IllegalArgumentException if {@code listener} is {@code null}
 * @throws JMSException propagated from the checks or from the send itself
 */
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
    checkClosed();
    if (anonymousProducer) {
        throw new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination");
    }
    if (listener == null) {
        // The message names the actual parameter type; the previous text referred to a
        // misspelled, non-existent "JmsCompletetionListener".
        throw new IllegalArgumentException("CompletionListener cannot be null");
    }
    sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, listener);
}
/**
 * Sends {@code count} messages through {@code producer}, each created from the session and
 * tagged with an int property {@code sequence} running from 0 to {@code count - 1}.
 *
 * @param count number of messages to send; nothing is sent when it is not positive
 * @param producer producer used for every send
 * @param listener completion listener passed to every send
 */
private void sendMessages(int count, JmsMessageProducer producer, CompletionListener listener) throws Exception {
    int sequence = 0;
    while (sequence < count) {
        Message outgoing = session.createMessage();
        outgoing.setIntProperty("sequence", sequence);
        producer.send(outgoing, listener);
        sequence++;
    }
}
/**
 * onCompletion must be forwarded to the wrapped listener, and afterwards a local span must
 * be available from the test span handler.
 */
@Test public void on_completion_should_forward_then_finish_span() {
  Span startedSpan = tracing.tracer().nextSpan().start();
  CompletionListener wrapped = mock(CompletionListener.class);
  CompletionListener listenerUnderTest =
    TracingCompletionListener.create(wrapped, destination, startedSpan, currentTraceContext);

  listenerUnderTest.onCompletion(message);

  verify(wrapped).onCompletion(message);
  testSpanHandler.takeLocalSpan();
}
/**
 * Stores the collaborators of this listener without further processing.
 *
 * @param delegate kept in the {@code delegate} field
 * @param destination kept in the {@code destination} field
 * @param span kept in the {@code span} field
 * @param current kept in the {@code current} field
 */
TracingCompletionListener(CompletionListener delegate, Destination destination, Span span,
  CurrentTraceContext current) {
  this.current = current;
  this.span = span;
  this.destination = destination;
  this.delegate = delegate;
}