类org.springframework.messaging.MessageDeliveryException源码实例Demo

下面列出了怎么用org.springframework.messaging.MessageDeliveryException的API类实例代码及写法,或者点击链接到github查看源代码。

@Test
@SuppressWarnings("unchecked")
void sendMessage_withExecutionExceptionWhileSendingAsyncMessage_throwMessageDeliveryException()
		throws Exception {
	// Arrange
	Future<SendMessageResult> future = mock(Future.class);
	when(future.get(1000, TimeUnit.MILLISECONDS))
			.thenThrow(new ExecutionException(new Exception()));
	AmazonSQSAsync amazonSqs = mock(AmazonSQSAsync.class);
	when(amazonSqs.sendMessageAsync(any(SendMessageRequest.class)))
			.thenReturn(future);
	QueueMessageChannel queueMessageChannel = new QueueMessageChannel(amazonSqs,
			"http://testQueue");

	// Assert
	assertThatThrownBy(() -> queueMessageChannel
			.send(MessageBuilder.withPayload("Hello").build(), 1000))
					.isInstanceOf(MessageDeliveryException.class);
}
 
protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
	Assert.notNull(channel, "MessageChannel is required");

	Message<?> messageToSend = message;
	MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
	if (accessor != null && accessor.isMutable()) {
		accessor.removeHeader(this.sendTimeoutHeader);
		accessor.removeHeader(this.receiveTimeoutHeader);
		accessor.setImmutable();
	}
	else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
			|| message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
		messageToSend = MessageBuilder.fromMessage(message)
				.setHeader(this.sendTimeoutHeader, null)
				.setHeader(this.receiveTimeoutHeader, null)
				.build();
	}

	boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));

	if (!sent) {
		throw new MessageDeliveryException(message,
				"Failed to send message to channel '" + channel + "' within timeout: " + timeout);
	}
}
 
@Test
public void interceptorWithException() {
	IllegalStateException expected = new IllegalStateException("Fake exception");
	willThrow(expected).given(this.handler).handleMessage(this.message);
	BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
	this.channel.addInterceptor(interceptor);
	this.channel.subscribe(this.handler);
	try {
		this.channel.send(this.message);
	}
	catch (MessageDeliveryException actual) {
		assertSame(expected, actual.getCause());
	}
	verify(this.handler).handleMessage(this.message);
	assertEquals(1, interceptor.getCounter().get());
	assertTrue(interceptor.wasAfterHandledInvoked());
}
 
private MessageHandler createLateReplier(final CountDownLatch latch, final AtomicReference<Throwable> failure) {
	MessageHandler handler = message -> {
		try {
			Thread.sleep(500);
			MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
			replyChannel.send(new GenericMessage<>("response"));
			failure.set(new IllegalStateException("Expected exception"));
		}
		catch (InterruptedException e) {
			failure.set(e);
		}
		catch (MessageDeliveryException ex) {
			String expected = "Reply message received but the receiving thread has exited due to a timeout";
			String actual = ex.getMessage();
			if (!expected.equals(actual)) {
				failure.set(new IllegalStateException(
						"Unexpected error: '" + actual + "'"));
			}
		}
		finally {
			latch.countDown();
		}
	};
	return handler;
}
 
protected final void doSend(MessageChannel channel, Message<?> message, long timeout) {
	Assert.notNull(channel, "MessageChannel is required");

	Message<?> messageToSend = message;
	MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
	if (accessor != null && accessor.isMutable()) {
		accessor.removeHeader(this.sendTimeoutHeader);
		accessor.removeHeader(this.receiveTimeoutHeader);
		accessor.setImmutable();
	}
	else if (message.getHeaders().containsKey(this.sendTimeoutHeader)
			|| message.getHeaders().containsKey(this.receiveTimeoutHeader)) {
		messageToSend = MessageBuilder.fromMessage(message)
				.setHeader(this.sendTimeoutHeader, null)
				.setHeader(this.receiveTimeoutHeader, null)
				.build();
	}

	boolean sent = (timeout >= 0 ? channel.send(messageToSend, timeout) : channel.send(messageToSend));

	if (!sent) {
		throw new MessageDeliveryException(message,
				"Failed to send message to channel '" + channel + "' within timeout: " + timeout);
	}
}
 
@Test
public void sendWithExecutionException() {
	this.session.afterConnected(this.connection);
	assertTrue(this.session.isConnected());

	IllegalStateException exception = new IllegalStateException("simulated exception");
	SettableListenableFuture<Void> future = new SettableListenableFuture<>();
	future.setException(exception);

	when(this.connection.send(any())).thenReturn(future);
	this.expected.expect(MessageDeliveryException.class);
	this.expected.expectCause(Matchers.sameInstance(exception));

	this.session.send("/topic/foo", "sample payload".getBytes(StandardCharsets.UTF_8));

	verifyNoMoreInteractions(this.connection);
}
 
@Test
public void interceptorWithException() {
	IllegalStateException expected = new IllegalStateException("Fake exception");
	willThrow(expected).given(this.handler).handleMessage(this.message);
	BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
	this.channel.addInterceptor(interceptor);
	this.channel.subscribe(this.handler);
	try {
		this.channel.send(this.message);
	}
	catch (MessageDeliveryException actual) {
		assertSame(expected, actual.getCause());
	}
	verify(this.handler).handleMessage(this.message);
	assertEquals(1, interceptor.getCounter().get());
	assertTrue(interceptor.wasAfterHandledInvoked());
}
 
private MessageHandler createLateReplier(final CountDownLatch latch, final AtomicReference<Throwable> failure) {
	MessageHandler handler = message -> {
		try {
			Thread.sleep(500);
			MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
			replyChannel.send(new GenericMessage<>("response"));
			failure.set(new IllegalStateException("Expected exception"));
		}
		catch (InterruptedException e) {
			failure.set(e);
		}
		catch (MessageDeliveryException ex) {
			String expected = "Reply message received but the receiving thread has exited due to a timeout";
			String actual = ex.getMessage();
			if (!expected.equals(actual)) {
				failure.set(new IllegalStateException(
						"Unexpected error: '" + actual + "'"));
			}
		}
		finally {
			latch.countDown();
		}
	};
	return handler;
}
 
@Override
protected final void doSend(MessageChannel channel, Message<?> message) {
	Assert.notNull(channel, "'channel' is required");

	MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
	if (accessor != null && accessor.isMutable()) {
		accessor.setImmutable();
	}

	long timeout = this.sendTimeout;
	boolean sent = (timeout >= 0 ? channel.send(message, timeout) : channel.send(message));

	if (!sent) {
		throw new MessageDeliveryException(message,
				"failed to send message to channel '" + channel + "' within timeout: " + timeout);
	}
}
 
@Test
public void sendWithExecutionException() throws Exception {

	this.session.afterConnected(this.connection);
	assertTrue(this.session.isConnected());

	IllegalStateException exception = new IllegalStateException("simulated exception");
	SettableListenableFuture<Void> future = new SettableListenableFuture<>();
	future.setException(exception);

	when(this.connection.send(any())).thenReturn(future);
	this.expected.expect(MessageDeliveryException.class);
	this.expected.expectCause(Matchers.sameInstance(exception));

	this.session.send("/topic/foo", "sample payload".getBytes(UTF_8));

	verifyNoMoreInteractions(this.connection);
}
 
@Test
public void interceptorWithException() {
	IllegalStateException expected = new IllegalStateException("Fake exception");
	willThrow(expected).given(this.handler).handleMessage(this.message);
	BeforeHandleInterceptor interceptor = new BeforeHandleInterceptor();
	this.channel.addInterceptor(interceptor);
	this.channel.subscribe(this.handler);
	try {
		this.channel.send(this.message);
	}
	catch (MessageDeliveryException actual) {
		assertSame(expected, actual.getCause());
	}
	verify(this.handler).handleMessage(this.message);
	assertEquals(1, interceptor.getCounter().get());
	assertTrue(interceptor.wasAfterHandledInvoked());
}
 
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
	try {
		ListenableFuture<Void> future = super.forward(message, accessor);
		if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
			future.get();
		}
		return future;
	}
	catch (Throwable ex) {
		throw new MessageDeliveryException(message, ex);
	}
}
 
private void sendInternal(Message<?> message) {
	String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
	Assert.notNull(destination, "Destination header required");

	long timeout = this.sendTimeout;
	boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));

	if (!sent) {
		throw new MessageDeliveryException(message,
				"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
	}
}
 
@Override
public final boolean send(Message<?> message, long timeout) {
	Assert.notNull(message, "Message must not be null");
	Message<?> messageToUse = message;
	ChannelInterceptorChain chain = new ChannelInterceptorChain();
	boolean sent = false;
	try {
		messageToUse = chain.applyPreSend(messageToUse, this);
		if (messageToUse == null) {
			return false;
		}
		sent = sendInternal(messageToUse, timeout);
		chain.applyPostSend(messageToUse, this, sent);
		chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
		return sent;
	}
	catch (Exception ex) {
		chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
		if (ex instanceof MessagingException) {
			throw (MessagingException) ex;
		}
		throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
	}
	catch (Throwable err) {
		MessageDeliveryException ex2 =
				new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
		chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
		throw ex2;
	}
}
 
@Override
protected void handleNoMatch(@Nullable String destination, Message<?> message) {

	// MessagingRSocket will raise an error anyway if reply Mono is expected
	// Here we raise a more helpful message a destination is present

	// It is OK if some messages (ConnectionSetupPayload, metadataPush) are not handled
	// We need a better way to avoid raising errors for those

	if (StringUtils.hasText(destination)) {
		throw new MessageDeliveryException("No handler for destination '" + destination + "'");
	}
}
 
@Override
public boolean send(Message<?> message, long timeout) {
	this.replyMessage = message;
	boolean alreadyReceivedReply = this.hasReceived;
	this.replyLatch.countDown();

	String errorDescription = null;
	if (this.hasTimedOut) {
		errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
	}
	else if (alreadyReceivedReply) {
		errorDescription = "Reply message received but the receiving thread has already received a reply";
	}
	else if (this.hasSendFailed) {
		errorDescription = "Reply message received but the receiving thread has exited due to " +
				"an exception while sending the request message";
	}

	if (errorDescription != null) {
		if (logger.isWarnEnabled()) {
			logger.warn(errorDescription + ": " + message);
		}
		if (this.throwExceptionOnLateReply) {
			throw new MessageDeliveryException(message, errorDescription);
		}
	}

	return true;
}
 
@Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
	logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");

	stopActiveMqBrokerAndAwait();
	this.eventPublisher.expectBrokerAvailabilityEvent(false);

	StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
	this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders()));
}
 
@Test
public void sendWithExecutionException() {
	this.session.afterConnected(this.connection);
	assertTrue(this.session.isConnected());

	IllegalStateException exception = new IllegalStateException("simulated exception");
	SettableListenableFuture<Void> future = new SettableListenableFuture<>();
	future.setException(exception);

	given(this.connection.send(any())).willReturn(future);
	assertThatExceptionOfType(MessageDeliveryException.class).isThrownBy(() ->
			this.session.send("/topic/foo", "sample payload".getBytes(StandardCharsets.UTF_8)))
		.withCause(exception);
}
 
@Test
public void failurePropagates()  {
	RuntimeException ex = new RuntimeException();
	willThrow(ex).given(this.handler).handleMessage(this.message);
	MessageHandler secondHandler = mock(MessageHandler.class);
	this.channel.subscribe(this.handler);
	this.channel.subscribe(secondHandler);
	try {
		this.channel.send(message);
	}
	catch (MessageDeliveryException actualException) {
		assertThat(actualException.getCause(), equalTo(ex));
	}
	verifyZeroInteractions(secondHandler);
}
 
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
	try {
		ListenableFuture<Void> future = super.forward(message, accessor);
		if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
			future.get();
		}
		return future;
	}
	catch (Throwable ex) {
		throw new MessageDeliveryException(message, ex);
	}
}
 
private void sendInternal(Message<?> message) {
	String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
	Assert.notNull(destination, "Destination header required");

	long timeout = this.sendTimeout;
	boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));

	if (!sent) {
		throw new MessageDeliveryException(message,
				"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
	}
}
 
@Override
public final boolean send(Message<?> message, long timeout) {
	Assert.notNull(message, "Message must not be null");
	Message<?> messageToUse = message;
	ChannelInterceptorChain chain = new ChannelInterceptorChain();
	boolean sent = false;
	try {
		messageToUse = chain.applyPreSend(messageToUse, this);
		if (messageToUse == null) {
			return false;
		}
		sent = sendInternal(messageToUse, timeout);
		chain.applyPostSend(messageToUse, this, sent);
		chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
		return sent;
	}
	catch (Exception ex) {
		chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
		if (ex instanceof MessagingException) {
			throw (MessagingException) ex;
		}
		throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
	}
	catch (Throwable err) {
		MessageDeliveryException ex2 =
				new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
		chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
		throw ex2;
	}
}
 
@Override
public boolean send(Message<?> message, long timeout) {
	this.replyMessage = message;
	boolean alreadyReceivedReply = this.hasReceived;
	this.replyLatch.countDown();

	String errorDescription = null;
	if (this.hasTimedOut) {
		errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
	}
	else if (alreadyReceivedReply) {
		errorDescription = "Reply message received but the receiving thread has already received a reply";
	}
	else if (this.hasSendFailed) {
		errorDescription = "Reply message received but the receiving thread has exited due to " +
				"an exception while sending the request message";
	}

	if (errorDescription != null) {
		if (logger.isWarnEnabled()) {
			logger.warn(errorDescription + ": " + message);
		}
		if (this.throwExceptionOnLateReply) {
			throw new MessageDeliveryException(message, errorDescription);
		}
	}

	return true;
}
 
@Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
	logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");

	stopActiveMqBrokerAndAwait();
	this.eventPublisher.expectBrokerAvailabilityEvent(false);

	StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
	this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders()));
}
 
@Test
public void failurePropagates()  {
	RuntimeException ex = new RuntimeException();
	willThrow(ex).given(this.handler).handleMessage(this.message);
	MessageHandler secondHandler = mock(MessageHandler.class);
	this.channel.subscribe(this.handler);
	this.channel.subscribe(secondHandler);
	try {
		this.channel.send(message);
	}
	catch (MessageDeliveryException actualException) {
		assertThat(actualException.getCause(), equalTo(ex));
	}
	verifyZeroInteractions(secondHandler);
}
 
@Override
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
	try {
		ListenableFuture<Void> future = super.forward(message, accessor);
		if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
			future.get();
		}
		return future;
	}
	catch (Throwable ex) {
		throw new MessageDeliveryException(message, ex);
	}
}
 
private void sendInternal(Message<?> message) {
	String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
	Assert.notNull(destination);

	long timeout = this.sendTimeout;
	boolean sent = (timeout >= 0 ? this.messageChannel.send(message, timeout) : this.messageChannel.send(message));

	if (!sent) {
		throw new MessageDeliveryException(message,
				"Failed to send message to destination '" + destination + "' within timeout: " + timeout);
	}
}
 
@Override
public boolean send(Message<?> message, long timeout) {
	this.replyMessage = message;
	boolean alreadyReceivedReply = this.hasReceived;
	this.replyLatch.countDown();

	String errorDescription = null;
	if (this.hasTimedOut) {
		errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
	}
	else if (alreadyReceivedReply) {
		errorDescription = "Reply message received but the receiving thread has already received a reply";
	}
	else if (this.hasSendFailed) {
		errorDescription = "Reply message received but the receiving thread has exited due to " +
				"an exception while sending the request message";
	}

	if (errorDescription != null) {
		if (logger.isWarnEnabled()) {
			logger.warn(errorDescription + ":" + message);
		}
		if (GenericMessagingTemplate.this.throwExceptionOnLateReply) {
			throw new MessageDeliveryException(message, errorDescription);
		}
	}

	return true;
}
 
@Test(expected=MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {

	logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");

	stopActiveMqBrokerAndAwait();
	this.eventPublisher.expectBrokerAvailabilityEvent(false);

	StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
	this.relay.handleMessage(MessageBuilder.createMessage("test".getBytes(), headers.getMessageHeaders()));
}
 
@Test
public void failurePropagates() throws Exception {
	RuntimeException ex = new RuntimeException();
	willThrow(ex).given(this.handler).handleMessage(this.message);
	MessageHandler secondHandler = mock(MessageHandler.class);
	this.channel.subscribe(this.handler);
	this.channel.subscribe(secondHandler);
	try {
		this.channel.send(message);
	}
	catch (MessageDeliveryException actualException) {
		assertThat(actualException.getCause(), equalTo(ex));
	}
	verifyZeroInteractions(secondHandler);
}
 
 类方法
 同包方法