下面列出了如何使用 org.springframework.messaging.MessageDeliveryException 类的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies that {@code QueueMessageChannel.send} raises a
 * {@link MessageDeliveryException} when the future returned by the asynchronous
 * SQS send fails with an {@link ExecutionException}.
 */
@Test
@SuppressWarnings("unchecked")
void sendMessage_withExecutionExceptionWhileSendingAsyncMessage_throwMessageDeliveryException()
        throws Exception {
    // Arrange: an SQS client whose async send returns a future failing on get(1000 ms)
    AmazonSQSAsync sqsClient = mock(AmazonSQSAsync.class);
    Future<SendMessageResult> failingFuture = mock(Future.class);
    when(failingFuture.get(1000, TimeUnit.MILLISECONDS))
            .thenThrow(new ExecutionException(new Exception()));
    when(sqsClient.sendMessageAsync(any(SendMessageRequest.class)))
            .thenReturn(failingFuture);
    QueueMessageChannel channelUnderTest = new QueueMessageChannel(sqsClient, "http://testQueue");

    // Act + Assert: sending with the same 1000 ms timeout must fail with a delivery exception
    assertThatThrownBy(() -> {
        channelUnderTest.send(MessageBuilder.withPayload("Hello").build(), 1000);
    }).isInstanceOf(MessageDeliveryException.class);
}
/**
 * Sends {@code message} to {@code channel} after making sure the send/receive
 * timeout headers are not carried along.
 * <p>When the message exposes a mutable {@link MessageHeaderAccessor}, both timeout
 * headers are removed in place and the accessor is made immutable. Otherwise, if
 * either header is present, a rebuilt message with both headers set to {@code null}
 * is sent instead of the original.
 * @param channel the channel to send to (must not be {@code null})
 * @param message the message to send
 * @param timeout the timeout handed to {@code channel.send(Message, long)}; a negative
 * value selects {@code channel.send(Message)} instead
 * @throws MessageDeliveryException if the channel reports the message as not sent
 */
protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
    Assert.notNull(channel, "MessageChannel is required");

    MessageHeaderAccessor headerAccessor =
            MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
    Message<?> outbound;
    if (headerAccessor != null && headerAccessor.isMutable()) {
        // Headers can still be edited in place: strip the timeouts, then freeze them.
        headerAccessor.removeHeader(this.sendTimeoutHeader);
        headerAccessor.removeHeader(this.receiveTimeoutHeader);
        headerAccessor.setImmutable();
        outbound = message;
    }
    else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
            || message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
        // Headers cannot be edited in place: send a rebuilt message without the timeouts.
        outbound = MessageBuilder.fromMessage(message)
                .setHeader(this.sendTimeoutHeader, null)
                .setHeader(this.receiveTimeoutHeader, null)
                .build();
    }
    else {
        outbound = message;
    }

    boolean delivered;
    if (timeout >= 0) {
        delivered = channel.send(outbound, timeout);
    }
    else {
        delivered = channel.send(outbound);
    }
    if (delivered) {
        return;
    }
    // The exception carries the caller's original message, not the rebuilt one.
    throw new MessageDeliveryException(message,
            "Failed to send message to channel '" + channel + "' within timeout: " + timeout);
}
/**
 * Verifies that an exception thrown by the subscribed handler reaches the sender as
 * the cause of a {@link MessageDeliveryException}, that the handler was invoked with
 * the message, that the interceptor counter is 1 and that its after-handled callback
 * was invoked.
 */
@Test
public void interceptorWithException() {
    IllegalStateException expected = new IllegalStateException("Fake exception");
    willThrow(expected).given(this.handler).handleMessage(this.message);
    BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
    this.channel.addInterceptor(interceptor);
    this.channel.subscribe(this.handler);

    // Capture the cause instead of asserting inside the catch block: if send()
    // returns without throwing, the cause stays null and the assertion below fails
    // rather than being skipped silently.
    Throwable cause = null;
    try {
        this.channel.send(this.message);
    }
    catch (MessageDeliveryException actual) {
        cause = actual.getCause();
    }
    assertSame(expected, cause);

    verify(this.handler).handleMessage(this.message);
    assertEquals(1, interceptor.getCounter().get());
    assertTrue(interceptor.wasAfterHandledInvoked());
}
/**
 * Creates a handler that waits 500 ms and then sends {@code "response"} to the
 * reply channel found in the incoming message's headers.
 * <p>The reply is expected to be rejected with a {@link MessageDeliveryException}
 * whose message is the "exited due to a timeout" text. Any other outcome is stored
 * in {@code failure}:
 * <ul>
 * <li>the reply was accepted: an {@link IllegalStateException}("Expected exception")</li>
 * <li>the sleep was interrupted: the {@link InterruptedException} itself</li>
 * <li>the exception message differs: an {@link IllegalStateException} quoting it</li>
 * </ul>
 * @param latch counted down once the handler has finished, whatever the outcome
 * @param failure receives the unexpected outcome, if any
 * @return the late-replying handler
 */
private MessageHandler createLateReplier(final CountDownLatch latch, final AtomicReference<Throwable> failure) {
    MessageHandler handler = message -> {
        try {
            Thread.sleep(500);
            MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
            replyChannel.send(new GenericMessage<>("response"));
            // Reaching this line means the late reply was not rejected.
            failure.set(new IllegalStateException("Expected exception"));
        }
        catch (InterruptedException e) {
            // Thread.sleep cleared the interrupt flag when it threw; restore it so the
            // thread running this handler can still observe the interruption.
            Thread.currentThread().interrupt();
            failure.set(e);
        }
        catch (MessageDeliveryException ex) {
            String expected = "Reply message received but the receiving thread has exited due to a timeout";
            String actual = ex.getMessage();
            if (!expected.equals(actual)) {
                failure.set(new IllegalStateException(
                        "Unexpected error: '" + actual + "'"));
            }
        }
        finally {
            latch.countDown();
        }
    };
    return handler;
}
/**
 * Sends {@code message} to {@code channel} after making sure the send/receive
 * timeout headers are not carried along.
 * <p>When the message exposes a mutable {@link MessageHeaderAccessor}, both timeout
 * headers are removed in place and the accessor is made immutable. Otherwise, if
 * either header is present, a rebuilt message with both headers set to {@code null}
 * is sent instead of the original.
 * @param channel the channel to send to (must not be {@code null})
 * @param message the message to send
 * @param timeout the timeout handed to {@code channel.send(Message, long)}; a negative
 * value selects {@code channel.send(Message)} instead
 * @throws MessageDeliveryException if the channel reports the message as not sent
 */
protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
    Assert.notNull(channel, "MessageChannel is required");

    MessageHeaderAccessor headerAccessor =
            MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
    Message<?> outbound;
    if (headerAccessor != null && headerAccessor.isMutable()) {
        // Headers can still be edited in place: strip the timeouts, then freeze them.
        headerAccessor.removeHeader(this.sendTimeoutHeader);
        headerAccessor.removeHeader(this.receiveTimeoutHeader);
        headerAccessor.setImmutable();
        outbound = message;
    }
    else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
            || message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
        // Headers cannot be edited in place: send a rebuilt message without the timeouts.
        outbound = MessageBuilder.fromMessage(message)
                .setHeader(this.sendTimeoutHeader, null)
                .setHeader(this.receiveTimeoutHeader, null)
                .build();
    }
    else {
        outbound = message;
    }

    boolean delivered;
    if (timeout >= 0) {
        delivered = channel.send(outbound, timeout);
    }
    else {
        delivered = channel.send(outbound);
    }
    if (delivered) {
        return;
    }
    // The exception carries the caller's original message, not the rebuilt one.
    throw new MessageDeliveryException(message,
            "Failed to send message to channel '" + channel + "' within timeout: " + timeout);
}
/**
 * Verifies that {@code session.send} fails with a {@link MessageDeliveryException}
 * whose cause is the same exception instance the connection's send future was
 * completed with.
 */
@Test
public void sendWithExecutionException() {
    // A future that has already failed with a known exception
    IllegalStateException simulated = new IllegalStateException("simulated exception");
    SettableListenableFuture<Void> failedFuture = new SettableListenableFuture<>();
    failedFuture.setException(simulated);

    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    when(this.connection.send(any())).thenReturn(failedFuture);
    this.expected.expect(MessageDeliveryException.class);
    this.expected.expectCause(Matchers.sameInstance(simulated));

    byte[] payload = "sample payload".getBytes(StandardCharsets.UTF_8);
    this.session.send("/topic/foo", payload);
    // NOTE(review): if send() throws as the expectations above require, this
    // verification is never reached - confirm whether it is still wanted.
    verifyNoMoreInteractions(this.connection);
}
/**
 * Verifies that an exception thrown by the subscribed handler reaches the sender as
 * the cause of a {@link MessageDeliveryException}, that the handler was invoked with
 * the message, that the interceptor counter is 1 and that its after-handled callback
 * was invoked.
 */
@Test
public void interceptorWithException() {
    IllegalStateException expected = new IllegalStateException("Fake exception");
    willThrow(expected).given(this.handler).handleMessage(this.message);
    BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
    this.channel.addInterceptor(interceptor);
    this.channel.subscribe(this.handler);

    // Capture the cause instead of asserting inside the catch block: if send()
    // returns without throwing, the cause stays null and the assertion below fails
    // rather than being skipped silently.
    Throwable cause = null;
    try {
        this.channel.send(this.message);
    }
    catch (MessageDeliveryException actual) {
        cause = actual.getCause();
    }
    assertSame(expected, cause);

    verify(this.handler).handleMessage(this.message);
    assertEquals(1, interceptor.getCounter().get());
    assertTrue(interceptor.wasAfterHandledInvoked());
}
/**
 * Creates a handler that waits 500 ms and then sends {@code "response"} to the
 * reply channel found in the incoming message's headers.
 * <p>The reply is expected to be rejected with a {@link MessageDeliveryException}
 * whose message is the "exited due to a timeout" text. Any other outcome is stored
 * in {@code failure}:
 * <ul>
 * <li>the reply was accepted: an {@link IllegalStateException}("Expected exception")</li>
 * <li>the sleep was interrupted: the {@link InterruptedException} itself</li>
 * <li>the exception message differs: an {@link IllegalStateException} quoting it</li>
 * </ul>
 * @param latch counted down once the handler has finished, whatever the outcome
 * @param failure receives the unexpected outcome, if any
 * @return the late-replying handler
 */
private MessageHandler createLateReplier(final CountDownLatch latch, final AtomicReference<Throwable> failure) {
    MessageHandler handler = message -> {
        try {
            Thread.sleep(500);
            MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
            replyChannel.send(new GenericMessage<>("response"));
            // Reaching this line means the late reply was not rejected.
            failure.set(new IllegalStateException("Expected exception"));
        }
        catch (InterruptedException e) {
            // Thread.sleep cleared the interrupt flag when it threw; restore it so the
            // thread running this handler can still observe the interruption.
            Thread.currentThread().interrupt();
            failure.set(e);
        }
        catch (MessageDeliveryException ex) {
            String expected = "Reply message received but the receiving thread has exited due to a timeout";
            String actual = ex.getMessage();
            if (!expected.equals(actual)) {
                failure.set(new IllegalStateException(
                        "Unexpected error: '" + actual + "'"));
            }
        }
        finally {
            latch.countDown();
        }
    };
    return handler;
}
/**
 * Sends {@code message} to {@code channel} using this template's configured
 * {@code sendTimeout}.
 * <p>If the message exposes a mutable {@link MessageHeaderAccessor}, it is made
 * immutable before sending.
 * @param channel the channel to send to (must not be {@code null})
 * @param message the message to send
 * @throws MessageDeliveryException if the channel reports the message as not sent
 */
@Override
protected final void doSend(MessageChannel channel, Message<?> message) {
    Assert.notNull(channel, "'channel' is required");

    MessageHeaderAccessor headerAccessor =
            MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
    boolean stillMutable = (headerAccessor != null && headerAccessor.isMutable());
    if (stillMutable) {
        headerAccessor.setImmutable();
    }

    // Read the field once so the check and the error message use the same value.
    long timeout = this.sendTimeout;
    boolean delivered;
    if (timeout >= 0) {
        delivered = channel.send(message, timeout);
    }
    else {
        // A negative timeout selects the channel's send variant without a timeout.
        delivered = channel.send(message);
    }
    if (delivered) {
        return;
    }
    throw new MessageDeliveryException(message,
            "failed to send message to channel '" + channel + "' within timeout: " + timeout);
}
/**
 * Verifies that {@code session.send} fails with a {@link MessageDeliveryException}
 * whose cause is the same exception instance the connection's send future was
 * completed with.
 */
@Test
public void sendWithExecutionException() throws Exception {
    // A future that has already failed with a known exception
    IllegalStateException simulated = new IllegalStateException("simulated exception");
    SettableListenableFuture<Void> failedFuture = new SettableListenableFuture<>();
    failedFuture.setException(simulated);

    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    when(this.connection.send(any())).thenReturn(failedFuture);
    this.expected.expect(MessageDeliveryException.class);
    this.expected.expectCause(Matchers.sameInstance(simulated));

    byte[] payload = "sample payload".getBytes(UTF_8);
    this.session.send("/topic/foo", payload);
    // NOTE(review): if send() throws as the expectations above require, this
    // verification is never reached - confirm whether it is still wanted.
    verifyNoMoreInteractions(this.connection);
}
/**
 * Verifies that an exception thrown by the subscribed handler reaches the sender as
 * the cause of a {@link MessageDeliveryException}, that the handler was invoked with
 * the message, that the interceptor counter is 1 and that its after-handled callback
 * was invoked.
 */
@Test
public void interceptorWithException() {
    IllegalStateException expected = new IllegalStateException("Fake exception");
    willThrow(expected).given(this.handler).handleMessage(this.message);
    BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
    this.channel.addInterceptor(interceptor);
    this.channel.subscribe(this.handler);

    // Capture the cause instead of asserting inside the catch block: if send()
    // returns without throwing, the cause stays null and the assertion below fails
    // rather than being skipped silently.
    Throwable cause = null;
    try {
        this.channel.send(this.message);
    }
    catch (MessageDeliveryException actual) {
        cause = actual.getCause();
    }
    assertSame(expected, cause);

    verify(this.handler).handleMessage(this.message);
    assertEquals(1, interceptor.getCounter().get());
    assertTrue(interceptor.wasAfterHandledInvoked());
}
/**
 * Forwards the message through the superclass and, unless the message carries the
 * {@code SimpMessageHeaderAccessor.IGNORE_ERROR} header, blocks on the returned
 * future so that a forwarding failure surfaces in the calling thread.
 * @param message the message to forward
 * @param accessor the STOMP header accessor passed on to {@code super.forward}
 * @return the future returned by {@code super.forward}
 * @throws MessageDeliveryException wrapping any {@link Throwable} raised while
 * forwarding or while waiting on the future
 */
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
    try {
        ListenableFuture<Void> future = super.forward(message, accessor);
        if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
            future.get();
        }
        return future;
    }
    catch (Throwable ex) {
        if (ex instanceof InterruptedException) {
            // future.get() clears the interrupt flag when it throws; restore it so the
            // interruption is not lost once it is wrapped below.
            Thread.currentThread().interrupt();
        }
        throw new MessageDeliveryException(message, ex);
    }
}
/**
 * Sends {@code message} to this template's message channel, honouring the
 * configured {@code sendTimeout}.
 * @param message the message to send; its headers must contain a destination
 * @throws MessageDeliveryException if the channel reports the message as not sent
 */
private void sendInternal(Message<?> message) {
    String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
    Assert.notNull(destination, "Destination header required");

    // Read the field once so the check and the error message use the same value.
    long timeout = this.sendTimeout;
    boolean delivered;
    if (timeout >= 0) {
        delivered = this.messageChannel.send(message, timeout);
    }
    else {
        // A negative timeout selects the channel's send variant without a timeout.
        delivered = this.messageChannel.send(message);
    }
    if (delivered) {
        return;
    }
    throw new MessageDeliveryException(message,
            "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
/**
 * Sends {@code message} through a fresh interceptor chain around
 * {@code sendInternal}.
 * <p>Sequence: {@code applyPreSend} (a {@code null} result aborts with
 * {@code false}), {@code sendInternal}, {@code applyPostSend}, then
 * {@code triggerAfterSendCompletion}. On failure the completion callback is
 * triggered with the exception before it is propagated.
 * @param message the message to send (must not be {@code null})
 * @param timeout the timeout handed to {@code sendInternal}
 * @return the result of {@code sendInternal}, or {@code false} if pre-send
 * interception returned {@code null}
 * @throws MessagingException rethrown as-is when raised during the send
 * @throws MessageDeliveryException wrapping any other {@link Throwable}
 */
@Override
public final boolean send(Message<?> message, long timeout) {
    Assert.notNull(message, "Message must not be null");

    ChannelInterceptorChain interceptors = new ChannelInterceptorChain();
    // Both locals are updated as the send progresses so that the catch blocks
    // report the most recent state to the completion callback.
    Message<?> current = message;
    boolean delivered = false;
    try {
        current = interceptors.applyPreSend(current, this);
        if (current == null) {
            return false;
        }
        delivered = sendInternal(current, timeout);
        interceptors.applyPostSend(current, this, delivered);
        interceptors.triggerAfterSendCompletion(current, this, delivered, null);
        return delivered;
    }
    catch (Exception ex) {
        interceptors.triggerAfterSendCompletion(current, this, delivered, ex);
        if (!(ex instanceof MessagingException)) {
            throw new MessageDeliveryException(current,"Failed to send message to " + this, ex);
        }
        throw (MessagingException) ex;
    }
    catch (Throwable err) {
        // Non-Exception throwables are wrapped first, so the completion callback
        // receives the wrapping exception.
        MessageDeliveryException wrapped =
                new MessageDeliveryException(current, "Failed to send message to " + this, err);
        interceptors.triggerAfterSendCompletion(current, this, delivered, wrapped);
        throw wrapped;
    }
}
/**
 * Raises a {@link MessageDeliveryException} for an unmatched message, but only
 * when the destination contains text.
 * @param destination the destination that had no handler (may be {@code null})
 * @param message the unmatched message (not used here)
 */
@Override
protected void handleNoMatch(@Nullable String destination, Message<?> message) {
    // MessagingRSocket will raise an error anyway if a reply Mono is expected.
    // Here we raise a more helpful message if a destination is present.
    // It is OK if some messages (ConnectionSetupPayload, metadataPush) are not handled.
    // We need a better way to avoid raising errors for those.
    if (!StringUtils.hasText(destination)) {
        return;
    }
    throw new MessageDeliveryException("No handler for destination '" + destination + "'");
}
/**
 * Accepts a reply message: stores it, counts down the reply latch, and reports a
 * reply that arrives when it can no longer be consumed.
 * <p>A reply counts as unusable, checked in this order, when the receiver has
 * timed out, a reply had already been received, or sending the request failed.
 * Such replies are logged at WARN level and, if {@code throwExceptionOnLateReply}
 * is set, rejected with an exception.
 * @param message the reply message
 * @param timeout not used by this implementation
 * @return always {@code true} when no exception is thrown
 * @throws MessageDeliveryException for an unusable reply when
 * {@code throwExceptionOnLateReply} is set
 */
@Override
public boolean send(Message<?> message, long timeout) {
    this.replyMessage = message;
    // Sampled before the latch is released.
    // NOTE(review): presumably so a woken receiver cannot flip the flag first - confirm.
    boolean duplicateReply = this.hasReceived;
    this.replyLatch.countDown();

    String problem = (this.hasTimedOut
            ? "Reply message received but the receiving thread has exited due to a timeout"
            : (duplicateReply
                    ? "Reply message received but the receiving thread has already received a reply"
                    : (this.hasSendFailed
                            ? "Reply message received but the receiving thread has exited due to " +
                                    "an exception while sending the request message"
                            : null)));
    if (problem == null) {
        return true;
    }

    if (logger.isWarnEnabled()) {
        logger.warn(problem + ": " + message);
    }
    if (this.throwExceptionOnLateReply) {
        throw new MessageDeliveryException(message, problem);
    }
    return true;
}
/**
 * Expects a {@link MessageDeliveryException} when the relay is handed a STOMP SEND
 * message after the ActiveMQ broker has been stopped.
 */
@Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
    logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");

    // Stop the broker, then register the expectation of an availability event
    // with the value false.
    stopActiveMqBrokerAndAwait();
    this.eventPublisher.expectBrokerAvailabilityEvent(false);

    StompHeaderAccessor sendHeaders = StompHeaderAccessor.create(StompCommand.SEND);
    byte[] payload = "test".getBytes();
    this.relay.handleMessage(
            MessageBuilder.createMessage(payload, sendHeaders.getMessageHeaders()));
}
/**
 * Verifies that {@code session.send} fails with a {@link MessageDeliveryException}
 * caused by the exception the connection's send future was completed with.
 */
@Test
public void sendWithExecutionException() {
    // A future that has already failed with a known exception
    IllegalStateException simulated = new IllegalStateException("simulated exception");
    SettableListenableFuture<Void> failedFuture = new SettableListenableFuture<>();
    failedFuture.setException(simulated);

    this.session.afterConnected(this.connection);
    assertTrue(this.session.isConnected());

    given(this.connection.send(any())).willReturn(failedFuture);

    assertThatExceptionOfType(MessageDeliveryException.class)
            .isThrownBy(() -> {
                this.session.send("/topic/foo", "sample payload".getBytes(StandardCharsets.UTF_8));
            })
            .withCause(simulated);
}
/**
 * Verifies that a failure in the first subscribed handler is propagated to the
 * sender as the cause of a {@link MessageDeliveryException}, and that the second
 * subscribed handler is never touched.
 */
@Test
public void failurePropagates() {
    RuntimeException ex = new RuntimeException();
    willThrow(ex).given(this.handler).handleMessage(this.message);
    MessageHandler secondHandler = mock(MessageHandler.class);
    this.channel.subscribe(this.handler);
    this.channel.subscribe(secondHandler);

    // Capture the cause instead of asserting inside the catch block: if send()
    // returns without throwing, the cause stays null and the assertion below fails
    // rather than being skipped silently.
    Throwable cause = null;
    try {
        this.channel.send(message);
    }
    catch (MessageDeliveryException actualException) {
        cause = actualException.getCause();
    }
    assertThat(cause, equalTo(ex));

    verifyZeroInteractions(secondHandler);
}
/**
 * Forwards the message through the superclass and, unless the message carries the
 * {@code SimpMessageHeaderAccessor.IGNORE_ERROR} header, blocks on the returned
 * future so that a forwarding failure surfaces in the calling thread.
 * @param message the message to forward
 * @param accessor the STOMP header accessor passed on to {@code super.forward}
 * @return the future returned by {@code super.forward}
 * @throws MessageDeliveryException wrapping any {@link Throwable} raised while
 * forwarding or while waiting on the future
 */
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
    try {
        ListenableFuture<Void> future = super.forward(message, accessor);
        if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
            future.get();
        }
        return future;
    }
    catch (Throwable ex) {
        if (ex instanceof InterruptedException) {
            // future.get() clears the interrupt flag when it throws; restore it so the
            // interruption is not lost once it is wrapped below.
            Thread.currentThread().interrupt();
        }
        throw new MessageDeliveryException(message, ex);
    }
}
/**
 * Sends {@code message} to this template's message channel, honouring the
 * configured {@code sendTimeout}.
 * @param message the message to send; its headers must contain a destination
 * @throws MessageDeliveryException if the channel reports the message as not sent
 */
private void sendInternal(Message<?> message) {
    String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
    Assert.notNull(destination, "Destination header required");

    // Read the field once so the check and the error message use the same value.
    long timeout = this.sendTimeout;
    boolean delivered;
    if (timeout >= 0) {
        delivered = this.messageChannel.send(message, timeout);
    }
    else {
        // A negative timeout selects the channel's send variant without a timeout.
        delivered = this.messageChannel.send(message);
    }
    if (delivered) {
        return;
    }
    throw new MessageDeliveryException(message,
            "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
/**
 * Sends {@code message} through a fresh interceptor chain around
 * {@code sendInternal}.
 * <p>Sequence: {@code applyPreSend} (a {@code null} result aborts with
 * {@code false}), {@code sendInternal}, {@code applyPostSend}, then
 * {@code triggerAfterSendCompletion}. On failure the completion callback is
 * triggered with the exception before it is propagated.
 * @param message the message to send (must not be {@code null})
 * @param timeout the timeout handed to {@code sendInternal}
 * @return the result of {@code sendInternal}, or {@code false} if pre-send
 * interception returned {@code null}
 * @throws MessagingException rethrown as-is when raised during the send
 * @throws MessageDeliveryException wrapping any other {@link Throwable}
 */
@Override
public final boolean send(Message<?> message, long timeout) {
    Assert.notNull(message, "Message must not be null");

    ChannelInterceptorChain interceptors = new ChannelInterceptorChain();
    // Both locals are updated as the send progresses so that the catch blocks
    // report the most recent state to the completion callback.
    Message<?> current = message;
    boolean delivered = false;
    try {
        current = interceptors.applyPreSend(current, this);
        if (current == null) {
            return false;
        }
        delivered = sendInternal(current, timeout);
        interceptors.applyPostSend(current, this, delivered);
        interceptors.triggerAfterSendCompletion(current, this, delivered, null);
        return delivered;
    }
    catch (Exception ex) {
        interceptors.triggerAfterSendCompletion(current, this, delivered, ex);
        if (!(ex instanceof MessagingException)) {
            throw new MessageDeliveryException(current,"Failed to send message to " + this, ex);
        }
        throw (MessagingException) ex;
    }
    catch (Throwable err) {
        // Non-Exception throwables are wrapped first, so the completion callback
        // receives the wrapping exception.
        MessageDeliveryException wrapped =
                new MessageDeliveryException(current, "Failed to send message to " + this, err);
        interceptors.triggerAfterSendCompletion(current, this, delivered, wrapped);
        throw wrapped;
    }
}
/**
 * Accepts a reply message: stores it, counts down the reply latch, and reports a
 * reply that arrives when it can no longer be consumed.
 * <p>A reply counts as unusable, checked in this order, when the receiver has
 * timed out, a reply had already been received, or sending the request failed.
 * Such replies are logged at WARN level and, if {@code throwExceptionOnLateReply}
 * is set, rejected with an exception.
 * @param message the reply message
 * @param timeout not used by this implementation
 * @return always {@code true} when no exception is thrown
 * @throws MessageDeliveryException for an unusable reply when
 * {@code throwExceptionOnLateReply} is set
 */
@Override
public boolean send(Message<?> message, long timeout) {
    this.replyMessage = message;
    // Sampled before the latch is released.
    // NOTE(review): presumably so a woken receiver cannot flip the flag first - confirm.
    boolean duplicateReply = this.hasReceived;
    this.replyLatch.countDown();

    String problem = (this.hasTimedOut
            ? "Reply message received but the receiving thread has exited due to a timeout"
            : (duplicateReply
                    ? "Reply message received but the receiving thread has already received a reply"
                    : (this.hasSendFailed
                            ? "Reply message received but the receiving thread has exited due to " +
                                    "an exception while sending the request message"
                            : null)));
    if (problem == null) {
        return true;
    }

    if (logger.isWarnEnabled()) {
        logger.warn(problem + ": " + message);
    }
    if (this.throwExceptionOnLateReply) {
        throw new MessageDeliveryException(message, problem);
    }
    return true;
}
/**
 * Expects a {@link MessageDeliveryException} when the relay is handed a STOMP SEND
 * message after the ActiveMQ broker has been stopped.
 */
@Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
    logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");

    // Stop the broker, then register the expectation of an availability event
    // with the value false.
    stopActiveMqBrokerAndAwait();
    this.eventPublisher.expectBrokerAvailabilityEvent(false);

    StompHeaderAccessor sendHeaders = StompHeaderAccessor.create(StompCommand.SEND);
    byte[] payload = "test".getBytes();
    this.relay.handleMessage(
            MessageBuilder.createMessage(payload, sendHeaders.getMessageHeaders()));
}
/**
 * Verifies that a failure in the first subscribed handler is propagated to the
 * sender as the cause of a {@link MessageDeliveryException}, and that the second
 * subscribed handler is never touched.
 */
@Test
public void failurePropagates() {
    RuntimeException ex = new RuntimeException();
    willThrow(ex).given(this.handler).handleMessage(this.message);
    MessageHandler secondHandler = mock(MessageHandler.class);
    this.channel.subscribe(this.handler);
    this.channel.subscribe(secondHandler);

    // Capture the cause instead of asserting inside the catch block: if send()
    // returns without throwing, the cause stays null and the assertion below fails
    // rather than being skipped silently.
    Throwable cause = null;
    try {
        this.channel.send(message);
    }
    catch (MessageDeliveryException actualException) {
        cause = actualException.getCause();
    }
    assertThat(cause, equalTo(ex));

    verifyZeroInteractions(secondHandler);
}
/**
 * Forwards the message through the superclass and, unless the message carries the
 * {@code SimpMessageHeaderAccessor.IGNORE_ERROR} header, blocks on the returned
 * future so that a forwarding failure surfaces in the calling thread.
 * @param message the message to forward
 * @param accessor the STOMP header accessor passed on to {@code super.forward}
 * @return the future returned by {@code super.forward}
 * @throws MessageDeliveryException wrapping any {@link Throwable} raised while
 * forwarding or while waiting on the future
 */
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
    try {
        ListenableFuture<Void> future = super.forward(message, accessor);
        if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
            future.get();
        }
        return future;
    }
    catch (Throwable ex) {
        if (ex instanceof InterruptedException) {
            // future.get() clears the interrupt flag when it throws; restore it so the
            // interruption is not lost once it is wrapped below.
            Thread.currentThread().interrupt();
        }
        throw new MessageDeliveryException(message, ex);
    }
}
/**
 * Sends {@code message} to this template's message channel, honouring the
 * configured {@code sendTimeout}.
 * @param message the message to send; its headers must contain a destination
 * @throws IllegalArgumentException if the message has no destination header
 * @throws MessageDeliveryException if the channel reports the message as not sent
 */
private void sendInternal(Message<?> message) {
    String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
    // Use the message-carrying overload so a missing destination yields a
    // descriptive failure instead of a generic assertion error.
    Assert.notNull(destination, "Destination header required");
    long timeout = this.sendTimeout;
    // A negative timeout selects the channel's send variant without a timeout.
    boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));
    if (!sent) {
        throw new MessageDeliveryException(message,
                "Failed to send message to destination '" + destination + "' within timeout: " + timeout);
    }
}
/**
 * Accepts a reply message: stores it, counts down the reply latch, and reports a
 * reply that arrives when it can no longer be consumed.
 * <p>A reply counts as unusable, checked in this order, when the receiver has
 * timed out, a reply had already been received, or sending the request failed.
 * Such replies are logged at WARN level and, if the enclosing template's
 * {@code throwExceptionOnLateReply} is set, rejected with an exception.
 * @param message the reply message
 * @param timeout not used by this implementation
 * @return always {@code true} when no exception is thrown
 * @throws MessageDeliveryException for an unusable reply when
 * {@code throwExceptionOnLateReply} is set
 */
@Override
public boolean send(Message<?> message, long timeout) {
    this.replyMessage = message;
    // Sampled before the latch is released.
    // NOTE(review): presumably so a woken receiver cannot flip the flag first - confirm.
    boolean duplicateReply = this.hasReceived;
    this.replyLatch.countDown();

    String problem = (this.hasTimedOut
            ? "Reply message received but the receiving thread has exited due to a timeout"
            : (duplicateReply
                    ? "Reply message received but the receiving thread has already received a reply"
                    : (this.hasSendFailed
                            ? "Reply message received but the receiving thread has exited due to " +
                                    "an exception while sending the request message"
                            : null)));
    if (problem == null) {
        return true;
    }

    if (logger.isWarnEnabled()) {
        logger.warn(problem + ":" + message);
    }
    if (GenericMessagingTemplate.this.throwExceptionOnLateReply) {
        throw new MessageDeliveryException(message, problem);
    }
    return true;
}
/**
 * Expects a {@link MessageDeliveryException} when the relay is handed a STOMP SEND
 * message after the ActiveMQ broker has been stopped.
 */
@Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
    logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");

    // Stop the broker, then register the expectation of an availability event
    // with the value false.
    stopActiveMqBrokerAndAwait();
    this.eventPublisher.expectBrokerAvailabilityEvent(false);

    StompHeaderAccessor sendHeaders = StompHeaderAccessor.create(StompCommand.SEND);
    byte[] payload = "test".getBytes();
    this.relay.handleMessage(
            MessageBuilder.createMessage(payload, sendHeaders.getMessageHeaders()));
}
/**
 * Verifies that a failure in the first subscribed handler is propagated to the
 * sender as the cause of a {@link MessageDeliveryException}, and that the second
 * subscribed handler is never touched.
 */
@Test
public void failurePropagates() throws Exception {
    RuntimeException ex = new RuntimeException();
    willThrow(ex).given(this.handler).handleMessage(this.message);
    MessageHandler secondHandler = mock(MessageHandler.class);
    this.channel.subscribe(this.handler);
    this.channel.subscribe(secondHandler);

    // Capture the cause instead of asserting inside the catch block: if send()
    // returns without throwing, the cause stays null and the assertion below fails
    // rather than being skipped silently.
    Throwable cause = null;
    try {
        this.channel.send(message);
    }
    catch (MessageDeliveryException actualException) {
        cause = actualException.getCause();
    }
    assertThat(cause, equalTo(ex));

    verifyZeroInteractions(secondHandler);
}