下面列出了 org.springframework.messaging.support.ErrorMessage 类的 API 用法示例代码；也可以点击链接到 GitHub 查看完整源代码。
/**
 * Sets up a channel whose passive declaration of the "DLX" exchange fails, delivers
 * the same error message twice, and checks that the exchange is probed exactly once
 * and that nothing is ever published.
 */
@Test
public void testBadRepublishSetting() throws IOException {
    // Channel that reports itself open but fails the passive "DLX" declaration.
    Channel rabbitChannel = mock(Channel.class);
    given(rabbitChannel.isOpen()).willReturn(true);
    given(rabbitChannel.exchangeDeclarePassive("DLX")).willThrow(new IOException());

    Connection connection = mock(Connection.class);
    given(connection.createChannel(false)).willReturn(rabbitChannel);
    ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
    given(connectionFactory.createConnection()).willReturn(connection);

    RabbitMessageChannelBinder binder =
            new RabbitMessageChannelBinder(connectionFactory, new RabbitProperties(), null);
    ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new RabbitConsumerProperties());
    MessageHandler errorHandler = binder.getErrorMessageHandler(
            mock(ConsumerDestination.class), "foo", consumerProperties);

    ErrorMessage failure = new ErrorMessage(new RuntimeException("test"),
            Collections.singletonMap(IntegrationMessageHeaderAccessor.SOURCE_DATA,
                    new Message("foo".getBytes(), new MessageProperties())));

    // Two deliveries of the same failure: the second must not trigger another probe.
    for (int delivery = 0; delivery < 2; delivery++) {
        errorHandler.handleMessage(failure);
    }

    verify(rabbitChannel, times(1)).exchangeDeclarePassive("DLX");
    verify(rabbitChannel, never()).basicPublish(any(), any(), eq(false), any(), any());
}
/**
 * Builds the message to hand back after interception.
 *
 * <p>For an {@link ErrorMessage} only the trace propagation headers are copied from
 * {@code additionalHeaders}, and the result is a new {@code ErrorMessage} carrying the
 * original payload and original message. For any other message all additional headers
 * are copied and the payload is taken from {@code retrievedMessage}.
 *
 * @param originalMessage message whose headers seed the result
 * @param retrievedMessage message that supplies the payload for non-error results
 * @param additionalHeaders headers to merge into the result
 * @return the rebuilt message
 */
private Message<?> outputMessage(Message<?> originalMessage,
        Message<?> retrievedMessage, MessageHeaderAccessor additionalHeaders) {
    MessageHeaderAccessor accessor = MessageHeaderAccessor
            .getMutableAccessor(originalMessage);
    boolean errorFlow = originalMessage instanceof ErrorMessage;

    if (errorFlow) {
        accessor.copyHeaders(MessageHeaderPropagation.propagationHeaders(
                additionalHeaders.getMessageHeaders(),
                this.tracing.propagation().keys()));
    }
    else {
        accessor.copyHeaders(additionalHeaders.getMessageHeaders());
    }

    // Web socket messages keep the accessor-backed headers; everything else gets a copy.
    MessageHeaders outgoing;
    if (isWebSockets(accessor)) {
        outgoing = accessor.getMessageHeaders();
    }
    else {
        outgoing = new MessageHeaders(accessor.getMessageHeaders());
    }

    if (errorFlow) {
        ErrorMessage failure = (ErrorMessage) originalMessage;
        return new ErrorMessage(failure.getPayload(), outgoing,
                failure.getOriginalMessage());
    }
    return new GenericMessage<>(retrievedMessage.getPayload(), outgoing);
}
/**
 * Sends an {@code ErrorMessage} that references an original message through the
 * intercepted channel and checks that the received error message still exposes the
 * same original message instance and the "header" header.
 */
@Test
public void errorMessageOriginalMessageRetained() {
    this.channel.addInterceptor(this.interceptor);

    Message<?> source = MessageBuilder.withPayload("Hello")
            .setHeader("header", "value").build();
    // The failed message deliberately lacks the header carried by the original.
    Message<?> failed = MessageBuilder.fromMessage(source)
            .removeHeader("header").build();
    ErrorMessage outbound = new ErrorMessage(new MessagingException(failed),
            source.getHeaders(), source);

    this.channel.send(outbound);
    this.message = this.channel.receive();

    assertThat(this.message).isNotNull();
    assertThat(this.message).isInstanceOfSatisfying(ErrorMessage.class, received -> {
        assertThat(received.getOriginalMessage()).isSameAs(source);
        assertThat(received.getHeaders().get("header")).isEqualTo("value");
    });
}
/**
 * Sends an {@code ErrorMessage} whose exception has no failed message but whose
 * headers carry a "b3" value, then checks that the received message keeps the trace
 * id, gets a different span id, and that two spans were reported.
 */
@Test
public void errorMessageHeadersWithNullPayloadRetained() {
    this.channel.addInterceptor(this.interceptor);

    Map<String, Object> incomingHeaders = new HashMap<>();
    incomingHeaders.put("b3", "000000000000000a-000000000000000a");
    ErrorMessage outbound = new ErrorMessage(new MessagingException("exception"),
            incomingHeaders);

    this.channel.send(outbound);
    this.message = this.channel.receive();

    String receivedB3 = this.message.getHeaders().get("b3", String.class);
    TraceContext receivedContext = parseB3SingleFormat(receivedB3).context();
    assertThat(receivedContext.traceIdString()).isEqualTo("000000000000000a");
    assertThat(receivedContext.spanIdString()).isNotEqualTo("000000000000000a");
    assertThat(this.spans).hasSize(2);
}
/**
 * Looks up the six-parameter {@code handle} fixture method reflectively and creates
 * the resolver under test with a mocked converter.
 */
@Before
public void setup() throws Exception {
    Class<?>[] handlerSignature = {
            Message.class, Message.class, Message.class, Message.class,
            ErrorMessage.class, Message.class
    };
    this.method = MessageMethodArgumentResolverTests.class
            .getDeclaredMethod("handle", handlerSignature);

    this.converter = mock(MessageConverter.class);
    this.resolver = new MessageMethodArgumentResolver(this.converter);
}
/**
 * An {@code ErrorMessage} resolved against the {@code ErrorMessage} parameter is
 * returned as the very same instance.
 */
@Test
public void resolveMessageSubclassMatch() throws Exception {
    // Index 4 of the fixture method is the ErrorMessage parameter.
    MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
    assertTrue(this.resolver.supportsParameter(errorMessageParam));

    ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());
    Object resolved = this.resolver.resolveArgument(errorMessageParam, input);
    assertSame(input, resolved);
}
/**
 * An {@code ErrorMessage} resolved against the wildcard-payload {@code Message<?>}
 * parameter is returned as the very same instance.
 */
@Test
public void resolveWithMessageSubclassAndPayloadWildcard() throws Exception {
    // Index 0 of the fixture method is the Message<?> parameter.
    MethodParameter wildcardParam = new MethodParameter(this.method, 0);
    assertTrue(this.resolver.supportsParameter(wildcardParam));

    ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());
    Object resolved = this.resolver.resolveArgument(wildcardParam, input);
    assertSame(input, resolved);
}
/**
 * Resolving a {@code GenericMessage} against the {@code ErrorMessage} parameter must
 * fail with a {@code MethodArgumentTypeMismatchException} whose message names both
 * the expected and the actual message class.
 */
@Test
public void resolveWithWrongMessageType() throws Exception {
    Message<? extends Throwable> genericMessage =
            new GenericMessage<Throwable>(new UnsupportedOperationException());
    // Index 4 of the fixture method is the ErrorMessage parameter.
    MethodParameter errorMessageParam = new MethodParameter(this.method, 4);

    assertTrue(this.resolver.supportsParameter(errorMessageParam));
    assertThatExceptionOfType(MethodArgumentTypeMismatchException.class)
            .isThrownBy(() -> this.resolver.resolveArgument(errorMessageParam, genericMessage))
            .withMessageContaining(ErrorMessage.class.getName())
            .withMessageContaining(GenericMessage.class.getName());
}
/**
 * Fixture method with an intentionally empty body; only its signature is used.
 * The tests look it up reflectively by name and parameter types and then build
 * {@code MethodParameter} instances by position, so the order of the parameters
 * must not change (index 0 is the wildcard payload, index 4 the
 * {@code ErrorMessage}).
 */
@SuppressWarnings("unused")
private void handle(
Message<?> wildcardPayload,
Message<Integer> integerPayload,
Message<Number> numberPayload,
Message<? extends Number> anyNumberPayload,
ErrorMessage subClass,
Message<Foo> fooPayload) {
}
/**
 * Resolves the six-parameter {@code handle} fixture method via reflection, then wires
 * the resolver under test to a mocked converter.
 */
@Before
public void setup() throws Exception {
    this.converter = mock(MessageConverter.class);
    this.resolver = new MessageMethodArgumentResolver(this.converter);

    Class<?>[] fixtureParameterTypes = {
            Message.class, Message.class, Message.class, Message.class,
            ErrorMessage.class, Message.class
    };
    this.method = MessageMethodArgumentResolverTests.class
            .getDeclaredMethod("handle", fixtureParameterTypes);
}
/**
 * Resolving an {@code ErrorMessage} for the {@code ErrorMessage}-typed parameter
 * hands back the identical message object.
 */
@Test
public void resolveMessageSubclassMatch() throws Exception {
    ErrorMessage failure = new ErrorMessage(new UnsupportedOperationException());
    // The ErrorMessage parameter sits at index 4 of the fixture method.
    MethodParameter target = new MethodParameter(this.method, 4);

    boolean supported = this.resolver.supportsParameter(target);
    assertTrue(supported);

    Object outcome = this.resolver.resolveArgument(target, failure);
    assertSame(failure, outcome);
}
/**
 * Resolving an {@code ErrorMessage} for the {@code Message<?>} parameter hands back
 * the identical message object.
 */
@Test
public void resolveWithMessageSubclassAndPayloadWildcard() throws Exception {
    ErrorMessage failure = new ErrorMessage(new UnsupportedOperationException());
    // The wildcard Message<?> parameter sits at index 0 of the fixture method.
    MethodParameter target = new MethodParameter(this.method, 0);

    boolean supported = this.resolver.supportsParameter(target);
    assertTrue(supported);

    Object outcome = this.resolver.resolveArgument(target, failure);
    assertSame(failure, outcome);
}
/**
 * Resolving a {@code GenericMessage} for the {@code ErrorMessage}-typed parameter is
 * expected to raise a {@code MethodArgumentTypeMismatchException} mentioning both
 * message class names.
 */
@Test
public void resolveWithWrongMessageType() throws Exception {
    Message<? extends Throwable> genericMessage =
            new GenericMessage<Throwable>(new UnsupportedOperationException());
    // The ErrorMessage parameter sits at index 4 of the fixture method.
    MethodParameter target = new MethodParameter(this.method, 4);
    assertTrue(this.resolver.supportsParameter(target));

    // Expectations must be registered before the call that is supposed to throw.
    thrown.expect(MethodArgumentTypeMismatchException.class);
    thrown.expectMessage(ErrorMessage.class.getName());
    thrown.expectMessage(GenericMessage.class.getName());

    Object outcome = this.resolver.resolveArgument(target, genericMessage);
    assertSame(genericMessage, outcome);
}
/**
 * Signature-only fixture: the body is empty on purpose. The tests obtain this
 * method reflectively (by name and parameter types) and address its parameters by
 * index, so the parameter order is significant (index 0 is the wildcard payload,
 * index 4 the {@code ErrorMessage}).
 */
@SuppressWarnings("unused")
private void handle(
Message<?> wildcardPayload,
Message<Integer> integerPayload,
Message<Number> numberPayload,
Message<? extends Number> anyNumberPayload,
ErrorMessage subClass,
Message<Foo> fooPayload) {
}
/**
 * An {@code ErrorMessage} resolved for the parameter declared as {@code ErrorMessage}
 * comes back as the same instance.
 */
@Test
public void resolveMessageSubTypeExactMatch() throws Exception {
    // Index 4 of the fixture method is the ErrorMessage parameter.
    MethodParameter target = new MethodParameter(this.method, 4);
    String notSupported = "Parameter '" + target + "' should be supported";
    assertTrue(notSupported, this.resolver.supportsParameter(target));

    ErrorMessage failure = new ErrorMessage(new UnsupportedOperationException());
    Object outcome = this.resolver.resolveArgument(target, failure);
    assertSame(failure, outcome);
}
/**
 * An {@code ErrorMessage} resolved for the parameter declared as {@code Message<?>}
 * comes back as the same instance.
 */
@Test
public void resolveMessageSubTypeSubClass() throws Exception {
    // Index 0 of the fixture method is the wildcard Message<?> parameter.
    MethodParameter target = new MethodParameter(this.method, 0);
    String notSupported = "Parameter '" + target + "' should be supported";
    assertTrue(notSupported, this.resolver.supportsParameter(target));

    ErrorMessage failure = new ErrorMessage(new UnsupportedOperationException());
    Object outcome = this.resolver.resolveArgument(target, failure);
    assertSame(failure, outcome);
}
/**
 * A {@code GenericMessage} resolved for the parameter declared as {@code ErrorMessage}
 * is expected to raise a {@code MethodArgumentTypeMismatchException} that mentions
 * both message class names.
 */
@Test
public void resolveWrongMessageType() throws Exception {
    // Index 4 of the fixture method is the ErrorMessage parameter.
    MethodParameter target = new MethodParameter(this.method, 4);
    String notSupported = "Parameter '" + target + "' should be supported";
    assertTrue(notSupported, this.resolver.supportsParameter(target));

    Message<? extends Throwable> genericMessage =
            new GenericMessage<Throwable>(new UnsupportedOperationException());

    // Expectations must be registered before the call that is supposed to throw.
    thrown.expect(MethodArgumentTypeMismatchException.class);
    thrown.expectMessage(ErrorMessage.class.getName());
    thrown.expectMessage(GenericMessage.class.getName());

    Object outcome = this.resolver.resolveArgument(target, genericMessage);
    assertSame(genericMessage, outcome);
}
/**
 * Signature-only fixture with an intentionally empty body. The test setup looks
 * this method up reflectively by name and parameter types, and the tests address
 * its parameters by index, so the parameter order is significant (index 0 is the
 * wildcard payload, index 4 the {@code ErrorMessage}).
 */
@SuppressWarnings("unused")
private void handleMessage(
Message<?> wildcardPayload,
Message<Integer> integerPayload,
Message<Number> numberPayload,
Message<? extends Number> anyNumberPayload,
ErrorMessage subClass) {
}
/**
 * Creates the error handler used for polled consumers.
 *
 * <p>When DLQ is enabled in the extension properties, the regular error message
 * handler is returned. Otherwise the returned handler:
 * <ul>
 * <li>logs an error for anything that is not an {@link ErrorMessage};</li>
 * <li>delegates to the superclass handler (if any) when the raw-data header is
 * absent;</li>
 * <li>otherwise, for a {@link MessagingException} payload whose failed message has
 * an acknowledgment callback, rejects or requeues depending on
 * {@code isAutoCommitOnError(properties)}.</li>
 * </ul>
 */
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
        ConsumerDestination destination, String group,
        ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
    if (properties.getExtension().isEnableDlq()) {
        return getErrorMessageHandler(destination, group, properties);
    }
    final MessageHandler superHandler = super.getErrorMessageHandler(destination,
            group, properties);
    return (message) -> {
        // The header is read (and cast) first, before the message type is examined.
        ConsumerRecord<?, ?> rawRecord = (ConsumerRecord<?, ?>) message.getHeaders()
                .get(KafkaHeaders.RAW_DATA);
        if (!(message instanceof ErrorMessage)) {
            logger.error("Expected an ErrorMessage, not a "
                    + message.getClass().toString() + " for: " + message);
            return;
        }
        if (rawRecord == null) {
            if (superHandler != null) {
                superHandler.handleMessage(message);
            }
            return;
        }
        if (!(message.getPayload() instanceof MessagingException)) {
            return;
        }
        MessagingException failure = (MessagingException) message.getPayload();
        AcknowledgmentCallback ack = StaticMessageHeaderAccessor
                .getAcknowledgmentCallback(failure.getFailedMessage());
        if (ack == null) {
            return;
        }
        AcknowledgmentCallback.Status outcome = isAutoCommitOnError(properties)
                ? AcknowledgmentCallback.Status.REJECT
                : AcknowledgmentCallback.Status.REQUEUE;
        ack.acknowledge(outcome);
    };
}
/**
 * Creates the error handler used for polled consumers.
 *
 * <p>If the regular error message handler is available it is returned directly.
 * Otherwise the returned handler:
 * <ul>
 * <li>logs an error for anything that is not an {@link ErrorMessage};</li>
 * <li>delegates to the superclass handler (if any) when the raw AMQP message header
 * is absent;</li>
 * <li>otherwise, for a {@link MessagingException} payload whose failed message has
 * an acknowledgment callback, requeues when {@code requeueRejected} is set and
 * rejects when it is not.</li>
 * </ul>
 */
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
        ConsumerDestination destination, String group,
        ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
    MessageHandler configured = getErrorMessageHandler(destination, group, properties);
    if (configured != null) {
        return configured;
    }
    final MessageHandler superHandler = super.getErrorMessageHandler(destination,
            group, properties);
    return message -> {
        // The header is read (and cast) first, before the message type is examined.
        Message rawAmqpMessage = (Message) message.getHeaders()
                .get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
        if (!(message instanceof ErrorMessage)) {
            logger.error("Expected an ErrorMessage, not a "
                    + message.getClass().toString() + " for: " + message);
            return;
        }
        if (rawAmqpMessage == null) {
            if (superHandler != null) {
                superHandler.handleMessage(message);
            }
            return;
        }
        if (!(message.getPayload() instanceof MessagingException)) {
            return;
        }
        MessagingException failure = (MessagingException) message.getPayload();
        AcknowledgmentCallback ack = StaticMessageHeaderAccessor
                .getAcknowledgmentCallback(failure.getFailedMessage());
        if (ack == null) {
            return;
        }
        Status outcome = properties.getExtension().isRequeueRejected()
                ? Status.REQUEUE
                : Status.REJECT;
        ack.acknowledge(outcome);
    };
}
/**
 * Starts a consumer span for a received message, derived from whatever trace context
 * is extracted from the message headers, and returns a copy of the message whose
 * trace headers have been replaced with the new span's context.
 *
 * <p>Empty messages are returned untouched. An {@link ErrorMessage} is rebuilt as an
 * {@code ErrorMessage} that keeps its original message; anything else is rebuilt as a
 * {@code GenericMessage}.
 */
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
    if (emptyMessage(message)) {
        return message;
    }

    MessageHeaderAccessor accessor = mutableHeaderAccessor(message);
    Span receiveSpan = this.threadLocalSpan.next(this.extractor.extract(accessor));

    // Swap the incoming trace headers for the ones describing the new span.
    MessageHeaderPropagation.removeAnyTraceHeaders(accessor,
            this.tracing.propagation().keys());
    this.injector.inject(receiveSpan.context(), accessor);

    if (!receiveSpan.isNoop()) {
        receiveSpan.kind(Span.Kind.CONSUMER).name("receive").start();
        receiveSpan.remoteServiceName(toRemoteServiceName(accessor));
        addTags(message, receiveSpan, channel);
    }
    if (log.isDebugEnabled()) {
        log.debug("Created a new span in post receive " + receiveSpan);
    }

    accessor.setImmutable();
    if (!(message instanceof ErrorMessage)) {
        return new GenericMessage<>(message.getPayload(), accessor.getMessageHeaders());
    }
    ErrorMessage failure = (ErrorMessage) message;
    return new ErrorMessage(failure.getPayload(), accessor.getMessageHeaders(),
            failure.getOriginalMessage());
}
/**
 * Starts (and immediately finishes) a consumer span derived from the trace context
 * extracted from the message headers, then starts a "handle" span as its child and
 * places it in scope via {@code threadLocalSpan}.
 *
 * <p>Empty messages are returned untouched. Otherwise the trace headers are removed
 * and the message is rebuilt: an {@link ErrorMessage} stays an {@code ErrorMessage}
 * and keeps its original message, anything else becomes a {@code GenericMessage}.
 *
 * @param message the message about to be handled
 * @param channel the channel the message arrived on
 * @param handler the handler about to process the message
 * @return the message to pass to the handler
 */
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel,
        MessageHandler handler) {
    if (emptyMessage(message)) {
        return message;
    }
    MessageHeaderAccessor headers = mutableHeaderAccessor(message);
    TraceContextOrSamplingFlags extracted = this.extractor.extract(headers);
    // Start and finish a consumer span as we will immediately process it.
    Span consumerSpan = this.tracer.nextSpan(extracted);
    if (!consumerSpan.isNoop()) {
        consumerSpan.kind(Span.Kind.CONSUMER).start();
        consumerSpan.remoteServiceName(REMOTE_SERVICE_NAME);
        addTags(message, consumerSpan, channel);
        consumerSpan.finish();
    }
    // Create and scope a span for the message processor.
    this.threadLocalSpan
            .next(TraceContextOrSamplingFlags.create(consumerSpan.context()))
            .name("handle").start();
    // Remove any trace headers, but don't re-inject: the message is processed
    // synchronously, so the scoped span can be reached without them.
    MessageHeaderPropagation.removeAnyTraceHeaders(headers,
            this.tracing.propagation().keys());
    if (log.isDebugEnabled()) {
        log.debug("Created a new span in before handle" + consumerSpan);
    }
    if (message instanceof ErrorMessage) {
        ErrorMessage errorMessage = (ErrorMessage) message;
        // Carry the original message over; the two-argument constructor would drop
        // it, so handlers could no longer reach the message that caused the error.
        return new ErrorMessage(errorMessage.getPayload(),
                headers.getMessageHeaders(), errorMessage.getOriginalMessage());
    }
    headers.setImmutable();
    return new GenericMessage<>(message.getPayload(), headers.getMessageHeaders());
}
/**
 * Sends an {@code ErrorMessage} whose own headers name one reply/error channel while
 * the failed message inside the exception names another. Checks that the received
 * message carries a parseable "b3" header (same trace id, new span id), that two
 * spans were reported, and that the reply and error channels are the ones from the
 * error message's own headers.
 */
@Test
public void errorMessageHeadersRetained() {
    this.channel.addInterceptor(this.interceptor);

    QueueChannel failedMessageChannel = new QueueChannel();
    QueueChannel expectedChannel = new QueueChannel();

    Map<String, Object> errorHeaders = new HashMap<>();
    errorHeaders.put(MessageHeaders.REPLY_CHANNEL, expectedChannel);
    errorHeaders.put(MessageHeaders.ERROR_CHANNEL, expectedChannel);

    MessagingException failure = new MessagingException(
            MessageBuilder.withPayload("hi")
                    .setHeader("b3", "000000000000000a-000000000000000a")
                    .setReplyChannel(failedMessageChannel)
                    .setErrorChannel(failedMessageChannel).build());
    this.channel.send(new ErrorMessage(failure, errorHeaders));

    this.message = this.channel.receive();
    assertThat(this.message).isNotNull();

    // Parse fails if trace or span ID are missing
    String receivedB3 = this.message.getHeaders().get("b3", String.class);
    TraceContext receivedContext = parseB3SingleFormat(receivedB3).context();
    assertThat(receivedContext.traceIdString()).isEqualTo("000000000000000a");
    assertThat(receivedContext.spanIdString()).isNotEqualTo("000000000000000a");
    assertThat(this.spans).hasSize(2);

    assertThat(this.message.getHeaders().getReplyChannel()).isSameAs(expectedChannel);
    assertThat(this.message.getHeaders().getErrorChannel()).isSameAs(expectedChannel);
}
/**
 * Resolves a {@code Message}-typed handler parameter from the incoming message.
 *
 * <p>The incoming message is returned as-is when it is an {@link ErrorMessage} or
 * when {@code conversionNotRequired} says its payload already fits the target payload
 * type. Otherwise the payload is converted and wrapped in a new message that reuses
 * the incoming headers.
 *
 * @param parameter the handler method parameter being resolved
 * @param message the incoming message
 * @return the message to inject
 * @throws MethodArgumentTypeMismatchException if the incoming message class is not
 * assignable to the declared parameter type
 * @throws MessageConversionException if conversion is needed but the payload is empty
 */
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message)
        throws Exception {
    Class<?> targetMessageType = parameter.getParameterType();
    Class<?> targetPayloadType = getPayloadType(parameter);

    boolean messageTypeMatches = targetMessageType.isAssignableFrom(message.getClass());
    if (!messageTypeMatches) {
        String reason = "Actual message type '" + ClassUtils.getDescriptiveType(message)
                + "' does not match expected type '"
                + ClassUtils.getQualifiedName(targetMessageType) + "'";
        throw new MethodArgumentTypeMismatchException(message, parameter, reason);
    }

    Object payload = message.getPayload();
    Class<?> payloadClass = payload.getClass();
    if (message instanceof ErrorMessage) {
        return message;
    }
    if (conversionNotRequired(payloadClass, targetPayloadType)) {
        return message;
    }

    if (isEmptyPayload(payload)) {
        String reason = "Cannot convert from actual payload type '"
                + ClassUtils.getDescriptiveType(payload)
                + "' to expected payload type '"
                + ClassUtils.getQualifiedName(targetPayloadType)
                + "' when payload is empty";
        throw new MessageConversionException(message, reason);
    }

    Object converted = convertPayload(message, parameter, targetPayloadType);
    return MessageBuilder.createMessage(converted, message.getHeaders());
}
/**
 * Binds a producer with the error channel enabled against a Kinesis client mock whose
 * {@code putRecordAsync} reports a failure through the async handler, then checks
 * that the "ec.0.errors" channel receives an {@code ErrorMessage} wrapping an
 * {@code AwsRequestFailureException} with the simulated cause and the data that was
 * sent.
 */
@Test
@SuppressWarnings("unchecked")
public void testProducerErrorChannel() throws Exception {
    KinesisTestBinder testBinder = getBinder();

    final RuntimeException simulatedFailure = new RuntimeException(
            "putRecordRequestEx");
    final AtomicReference<Object> sentData = new AtomicReference<>();

    // Every put records the request data and then fails through the callback.
    Answer<Future<PutRecordResult>> failingPut = (invocation) -> {
        PutRecordRequest request = invocation.getArgument(0);
        sentData.set(request.getData());
        AsyncHandler<?, ?> callback = invocation.getArgument(1);
        callback.onError(simulatedFailure);
        return mock(Future.class);
    };
    AmazonKinesisAsync kinesisMock = mock(AmazonKinesisAsync.class);
    BDDMockito
            .given(kinesisMock.putRecordAsync(any(PutRecordRequest.class),
                    any(AsyncHandler.class)))
            .willAnswer(failingPut);
    new DirectFieldAccessor(testBinder.getBinder()).setPropertyValue("amazonKinesis",
            kinesisMock);

    ExtendedProducerProperties<KinesisProducerProperties> producerProperties = createProducerProperties();
    producerProperties.setErrorChannelEnabled(true);
    DirectChannel outputChannel = createBindableChannel("output",
            createProducerBindingProperties(producerProperties));
    Binding<MessageChannel> binding = testBinder.bindProducer("ec.0",
            outputChannel, producerProperties);

    ApplicationContext context = TestUtils.getPropertyValue(
            testBinder.getBinder(), "applicationContext", ApplicationContext.class);
    SubscribableChannel errorChannel = context.getBean("ec.0.errors",
            SubscribableChannel.class);

    final AtomicReference<Message<?>> captured = new AtomicReference<>();
    final CountDownLatch errorArrived = new CountDownLatch(1);
    errorChannel.subscribe((message) -> {
        captured.set(message);
        errorArrived.countDown();
    });

    String messagePayload = "oops";
    outputChannel.send(new GenericMessage<>(messagePayload.getBytes()));

    assertThat(errorArrived.await(10, TimeUnit.SECONDS)).isTrue();
    Message<?> received = captured.get();
    assertThat(received).isInstanceOf(ErrorMessage.class);
    assertThat(received.getPayload())
            .isInstanceOf(AwsRequestFailureException.class);

    AwsRequestFailureException requestFailure =
            (AwsRequestFailureException) received.getPayload();
    assertThat(requestFailure.getCause()).isSameAs(simulatedFailure);
    assertThat(((PutRecordRequest) requestFailure.getRequest()).getData())
            .isSameAs(sentData.get());

    binding.unbind();
}
/**
 * Looks up the five-parameter {@code handleMessage} fixture method reflectively so
 * the tests can address its parameters by index.
 */
@Before
public void setup() throws Exception {
    Class<?>[] fixtureParameterTypes = {
            Message.class, Message.class, Message.class, Message.class,
            ErrorMessage.class
    };
    this.method = MessageMethodArgumentResolverTests.class
            .getDeclaredMethod("handleMessage", fixtureParameterTypes);
}
/**
 * Passes {@link ErrorMessage}s through unchanged; every other message is handed to
 * {@code doPreSend}.
 */
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
    if (message instanceof ErrorMessage) {
        return message;
    }
    return this.doPreSend(message, channel);
}