类org.springframework.messaging.support.ErrorMessage源码实例Demo

下面列出了 org.springframework.messaging.support.ErrorMessage 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。

/**
 * Sends the same error message twice through the binder's error handler while the
 * mocked channel throws on the passive declaration of the "DLX" exchange, then checks
 * that the declaration was attempted exactly once and that nothing was published.
 */
@Test
public void testBadRepublishSetting() throws IOException {
	// Channel whose passive "DLX" lookup always fails.
	Channel rabbitChannel = mock(Channel.class);
	given(rabbitChannel.isOpen()).willReturn(true);
	given(rabbitChannel.exchangeDeclarePassive("DLX")).willThrow(new IOException());

	// Connection plumbing that hands out the failing channel.
	Connection connection = mock(Connection.class);
	given(connection.createChannel(false)).willReturn(rabbitChannel);
	ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
	given(connectionFactory.createConnection()).willReturn(connection);

	RabbitMessageChannelBinder binder =
			new RabbitMessageChannelBinder(connectionFactory, new RabbitProperties(), null);
	ExtendedConsumerProperties<RabbitConsumerProperties> consumerProperties =
			new ExtendedConsumerProperties<>(new RabbitConsumerProperties());
	MessageHandler errorHandler = binder.getErrorMessageHandler(
			mock(ConsumerDestination.class), "foo", consumerProperties);

	ErrorMessage failure = new ErrorMessage(new RuntimeException("test"),
			Collections.singletonMap(IntegrationMessageHeaderAccessor.SOURCE_DATA,
					new Message("foo".getBytes(), new MessageProperties())));

	// Deliver twice: the second delivery must not trigger another declaration attempt.
	errorHandler.handleMessage(failure);
	errorHandler.handleMessage(failure);

	verify(rabbitChannel, times(1)).exchangeDeclarePassive("DLX");
	verify(rabbitChannel, never()).basicPublish(any(), any(), eq(false), any(), any());
}
 
/**
 * Builds the message that is passed on after interception.
 * <p>
 * Headers start from {@code originalMessage}. For an {@link ErrorMessage} only the
 * propagation headers selected from {@code additionalHeaders} are copied in, and the
 * result is a new {@link ErrorMessage} carrying the original payload and original
 * message. For any other message all of {@code additionalHeaders} are copied in and
 * the result is a {@link GenericMessage} with the payload of {@code retrievedMessage}.
 * @param originalMessage the message whose headers (and, for errors, payload) are kept
 * @param retrievedMessage the message supplying the payload in the non-error case
 * @param additionalHeaders headers to merge on top of the original ones
 * @return the rebuilt message
 */
private Message<?> outputMessage(Message<?> originalMessage,
		Message<?> retrievedMessage, MessageHeaderAccessor additionalHeaders) {
	MessageHeaderAccessor merged = MessageHeaderAccessor
			.getMutableAccessor(originalMessage);
	boolean isError = originalMessage instanceof ErrorMessage;

	if (isError) {
		merged.copyHeaders(MessageHeaderPropagation.propagationHeaders(
				additionalHeaders.getMessageHeaders(),
				this.tracing.propagation().keys()));
	}
	else {
		merged.copyHeaders(additionalHeaders.getMessageHeaders());
	}

	// Web socket messages keep the accessor's own headers; everything else gets a copy.
	MessageHeaders outgoing;
	if (isWebSockets(merged)) {
		outgoing = merged.getMessageHeaders();
	}
	else {
		outgoing = new MessageHeaders(merged.getMessageHeaders());
	}

	if (isError) {
		ErrorMessage source = (ErrorMessage) originalMessage;
		return new ErrorMessage(source.getPayload(), outgoing,
				source.getOriginalMessage());
	}
	return new GenericMessage<>(retrievedMessage.getPayload(), outgoing);
}
 
/**
 * Sends an {@link ErrorMessage} that carries an original message through the
 * intercepted channel and checks that the received error message still references the
 * very same original message and still has the "header" header.
 */
@Test
public void errorMessageOriginalMessageRetained() {
	this.channel.addInterceptor(this.interceptor);

	Message<?> source = MessageBuilder.withPayload("Hello")
			.setHeader("header", "value")
			.build();
	// The failed message is the source without its custom header.
	Message<?> stripped = MessageBuilder.fromMessage(source)
			.removeHeader("header")
			.build();
	ErrorMessage outgoing = new ErrorMessage(new MessagingException(stripped),
			source.getHeaders(), source);
	this.channel.send(outgoing);

	this.message = this.channel.receive();

	assertThat(this.message).isNotNull();
	assertThat(this.message).isInstanceOfSatisfying(ErrorMessage.class, received -> {
		assertThat(received.getOriginalMessage()).isSameAs(source);
		assertThat(received.getHeaders().get("header")).isEqualTo("value");
	});
}
 
/**
 * Sends an {@link ErrorMessage} whose headers carry a "b3" trace header and whose
 * {@link MessagingException} has no failed message, then checks that the received "b3"
 * header keeps the trace id, has a different span id, and that two spans were recorded.
 */
@Test
public void errorMessageHeadersWithNullPayloadRetained() {
	this.channel.addInterceptor(this.interceptor);

	Map<String, Object> headersWithTrace = new HashMap<>();
	headersWithTrace.put("b3", "000000000000000a-000000000000000a");
	MessagingException failure = new MessagingException("exception");
	this.channel.send(new ErrorMessage(failure, headersWithTrace));

	this.message = this.channel.receive();

	String receivedB3 = this.message.getHeaders().get("b3", String.class);
	TraceContext received = parseB3SingleFormat(receivedB3).context();
	assertThat(received.traceIdString()).isEqualTo("000000000000000a");
	assertThat(received.spanIdString()).isNotEqualTo("000000000000000a");
	assertThat(this.spans).hasSize(2);
}
 
/**
 * Looks up the {@code handle} fixture method by its six parameter types and creates
 * the resolver under test around a mocked {@link MessageConverter}.
 */
@Before
public void setup() throws Exception {
	// Parameter types of the fixture method, in declaration order.
	Class<?>[] handleSignature = {
			Message.class, Message.class, Message.class,
			Message.class, ErrorMessage.class, Message.class
	};
	this.method = MessageMethodArgumentResolverTests.class
			.getDeclaredMethod("handle", handleSignature);

	this.converter = mock(MessageConverter.class);
	this.resolver = new MessageMethodArgumentResolver(this.converter);
}
 
/**
 * Resolves an {@link ErrorMessage} against the fixture parameter at index 4 and
 * expects the very same message instance back.
 */
@Test
public void resolveMessageSubclassMatch() throws Exception {
	MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
	ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());

	assertTrue(this.resolver.supportsParameter(errorMessageParam));
	Object resolved = this.resolver.resolveArgument(errorMessageParam, input);
	assertSame(input, resolved);
}
 
/**
 * Resolves an {@link ErrorMessage} against the fixture parameter at index 0 and
 * expects the very same message instance back.
 */
@Test
public void resolveWithMessageSubclassAndPayloadWildcard() throws Exception {
	MethodParameter wildcardParam = new MethodParameter(this.method, 0);
	ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());

	assertTrue(this.resolver.supportsParameter(wildcardParam));
	Object resolved = this.resolver.resolveArgument(wildcardParam, input);
	assertSame(input, resolved);
}
 
/**
 * Passes a {@link GenericMessage} to the fixture parameter at index 4 and expects a
 * {@link MethodArgumentTypeMismatchException} whose message names both
 * {@link ErrorMessage} and {@link GenericMessage}.
 */
@Test
public void resolveWithWrongMessageType() throws Exception {
	MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
	Message<? extends Throwable> genericMessage =
			new GenericMessage<Throwable>(new UnsupportedOperationException());

	assertTrue(this.resolver.supportsParameter(errorMessageParam));
	assertThatExceptionOfType(MethodArgumentTypeMismatchException.class)
		.isThrownBy(() -> this.resolver.resolveArgument(errorMessageParam, genericMessage))
		.withMessageContaining(ErrorMessage.class.getName())
		.withMessageContaining(GenericMessage.class.getName());
}
 
/**
 * Fixture method with an intentionally empty body. Nothing in the visible code calls
 * it directly; the accompanying {@code setup()} obtains it via
 * {@code getDeclaredMethod("handle", ...)} and the tests pick individual parameters
 * by index, so the order and types of the parameters must not change.
 */
@SuppressWarnings("unused")
private void handle(
		Message<?> wildcardPayload,
		Message<Integer> integerPayload,
		Message<Number> numberPayload,
		Message<? extends Number> anyNumberPayload,
		ErrorMessage subClass,
		Message<Foo> fooPayload) {
}
 
/**
 * Looks up the {@code handle} fixture method by its six parameter types and creates
 * the resolver under test around a mocked {@link MessageConverter}.
 */
@Before
public void setup() throws Exception {
	// Parameter types of the fixture method, in declaration order.
	Class<?>[] handleSignature = {
			Message.class, Message.class, Message.class,
			Message.class, ErrorMessage.class, Message.class
	};
	this.method = MessageMethodArgumentResolverTests.class
			.getDeclaredMethod("handle", handleSignature);

	this.converter = mock(MessageConverter.class);
	this.resolver = new MessageMethodArgumentResolver(this.converter);
}
 
/**
 * Resolves an {@link ErrorMessage} against the fixture parameter at index 4 and
 * expects the very same message instance back.
 */
@Test
public void resolveMessageSubclassMatch() throws Exception {
	MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
	ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());

	assertTrue(this.resolver.supportsParameter(errorMessageParam));
	Object resolved = this.resolver.resolveArgument(errorMessageParam, input);
	assertSame(input, resolved);
}
 
/**
 * Resolves an {@link ErrorMessage} against the fixture parameter at index 0 and
 * expects the very same message instance back.
 */
@Test
public void resolveWithMessageSubclassAndPayloadWildcard() throws Exception {
	MethodParameter wildcardParam = new MethodParameter(this.method, 0);
	ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());

	assertTrue(this.resolver.supportsParameter(wildcardParam));
	Object resolved = this.resolver.resolveArgument(wildcardParam, input);
	assertSame(input, resolved);
}
 
/**
 * Passes a {@link GenericMessage} to the fixture parameter at index 4 after
 * registering, on the {@code thrown} rule, an expected
 * {@link MethodArgumentTypeMismatchException} whose message names both
 * {@link ErrorMessage} and {@link GenericMessage}.
 */
@Test
public void resolveWithWrongMessageType() throws Exception {
	MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
	Message<? extends Throwable> genericMessage =
			new GenericMessage<Throwable>(new UnsupportedOperationException());

	assertTrue(this.resolver.supportsParameter(errorMessageParam));

	thrown.expect(MethodArgumentTypeMismatchException.class);
	thrown.expectMessage(ErrorMessage.class.getName());
	thrown.expectMessage(GenericMessage.class.getName());
	Object resolved = this.resolver.resolveArgument(errorMessageParam, genericMessage);
	assertSame(genericMessage, resolved);
}
 
/**
 * Fixture method with an intentionally empty body. Nothing in the visible code calls
 * it directly; the accompanying {@code setup()} obtains it via
 * {@code getDeclaredMethod("handle", ...)} and the tests pick individual parameters
 * by index, so the order and types of the parameters must not change.
 */
@SuppressWarnings("unused")
private void handle(
		Message<?> wildcardPayload,
		Message<Integer> integerPayload,
		Message<Number> numberPayload,
		Message<? extends Number> anyNumberPayload,
		ErrorMessage subClass,
		Message<Foo> fooPayload) {
}
 
/**
 * Resolves an {@link ErrorMessage} against the fixture parameter at index 4 and
 * expects the very same message instance back.
 */
@Test
public void resolveMessageSubTypeExactMatch() throws Exception {
	MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
	ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());

	String unsupportedHint = "Parameter '" + errorMessageParam + "' should be supported";
	assertTrue(unsupportedHint, this.resolver.supportsParameter(errorMessageParam));
	Object resolved = this.resolver.resolveArgument(errorMessageParam, input);
	assertSame(input, resolved);
}
 
/**
 * Resolves an {@link ErrorMessage} against the fixture parameter at index 0 and
 * expects the very same message instance back.
 */
@Test
public void resolveMessageSubTypeSubClass() throws Exception {
	MethodParameter wildcardParam = new MethodParameter(this.method, 0);
	ErrorMessage input = new ErrorMessage(new UnsupportedOperationException());

	String unsupportedHint = "Parameter '" + wildcardParam + "' should be supported";
	assertTrue(unsupportedHint, this.resolver.supportsParameter(wildcardParam));
	Object resolved = this.resolver.resolveArgument(wildcardParam, input);
	assertSame(input, resolved);
}
 
/**
 * Passes a {@link GenericMessage} to the fixture parameter at index 4 after
 * registering, on the {@code thrown} rule, an expected
 * {@link MethodArgumentTypeMismatchException} whose message names both
 * {@link ErrorMessage} and {@link GenericMessage}.
 */
@Test
public void resolveWrongMessageType() throws Exception {
	MethodParameter errorMessageParam = new MethodParameter(this.method, 4);
	Message<? extends Throwable> genericMessage =
			new GenericMessage<Throwable>(new UnsupportedOperationException());

	String unsupportedHint = "Parameter '" + errorMessageParam + "' should be supported";
	assertTrue(unsupportedHint, this.resolver.supportsParameter(errorMessageParam));

	thrown.expect(MethodArgumentTypeMismatchException.class);
	thrown.expectMessage(ErrorMessage.class.getName());
	thrown.expectMessage(GenericMessage.class.getName());
	Object resolved = this.resolver.resolveArgument(errorMessageParam, genericMessage);
	assertSame(genericMessage, resolved);
}
 
/**
 * Fixture method with an intentionally empty body. Nothing in the visible code calls
 * it directly; the accompanying {@code setup()} obtains it via
 * {@code getDeclaredMethod("handleMessage", ...)} and the tests pick individual
 * parameters by index, so the order and types of the parameters must not change.
 */
@SuppressWarnings("unused")
private void handleMessage(
		Message<?> wildcardPayload,
		Message<Integer> integerPayload,
		Message<Number> numberPayload,
		Message<? extends Number> anyNumberPayload,
		ErrorMessage subClass) {
}
 
/**
 * Creates the error handler for a polled consumer.
 * <p>
 * With DLQ enabled this simply returns {@code getErrorMessageHandler(...)}. Otherwise
 * the returned handler logs anything that is not an {@link ErrorMessage}, forwards
 * error messages without a {@code KafkaHeaders.RAW_DATA} header to the superclass
 * handler (if there is one), and for the rest acknowledges the failed message's
 * callback with REJECT or REQUEUE depending on {@code isAutoCommitOnError(properties)}.
 */
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
		ConsumerDestination destination, String group,
		ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
	if (properties.getExtension().isEnableDlq()) {
		return getErrorMessageHandler(destination, group, properties);
	}
	final MessageHandler fallback = super.getErrorMessageHandler(destination,
			group, properties);
	return (message) -> {
		ConsumerRecord<?, ?> rawRecord = (ConsumerRecord<?, ?>) message.getHeaders()
				.get(KafkaHeaders.RAW_DATA);
		if (!(message instanceof ErrorMessage)) {
			logger.error("Expected an ErrorMessage, not a "
					+ message.getClass() + " for: " + message);
			return;
		}
		if (rawRecord == null) {
			// No raw record attached: let the superclass handler deal with it, if any.
			if (fallback != null) {
				fallback.handleMessage(message);
			}
			return;
		}
		if (!(message.getPayload() instanceof MessagingException)) {
			return;
		}
		MessagingException failure = (MessagingException) message.getPayload();
		AcknowledgmentCallback ack = StaticMessageHeaderAccessor
				.getAcknowledgmentCallback(failure.getFailedMessage());
		if (ack == null) {
			return;
		}
		ack.acknowledge(isAutoCommitOnError(properties)
				? AcknowledgmentCallback.Status.REJECT
				: AcknowledgmentCallback.Status.REQUEUE);
	};
}
 
/**
 * Creates the error handler for a polled consumer.
 * <p>
 * If {@code getErrorMessageHandler(...)} yields a handler, that one is returned.
 * Otherwise the returned handler logs anything that is not an {@link ErrorMessage},
 * forwards error messages without an
 * {@code AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE} header to the
 * superclass handler (if there is one), and for the rest acknowledges the failed
 * message's callback with REQUEUE or REJECT depending on the
 * {@code requeueRejected} consumer property.
 */
@Override
protected MessageHandler getPolledConsumerErrorMessageHandler(
		ConsumerDestination destination, String group,
		ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
	MessageHandler configured = getErrorMessageHandler(destination, group, properties);
	if (configured != null) {
		return configured;
	}
	final MessageHandler fallback = super.getErrorMessageHandler(destination,
			group, properties);
	return message -> {
		Message rawMessage = (Message) message.getHeaders()
				.get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE);
		if (!(message instanceof ErrorMessage)) {
			logger.error("Expected an ErrorMessage, not a "
					+ message.getClass() + " for: " + message);
			return;
		}
		if (rawMessage == null) {
			// No raw AMQP message attached: let the superclass handler deal with it, if any.
			if (fallback != null) {
				fallback.handleMessage(message);
			}
			return;
		}
		if (!(message.getPayload() instanceof MessagingException)) {
			return;
		}
		MessagingException failure = (MessagingException) message.getPayload();
		AcknowledgmentCallback ack = StaticMessageHeaderAccessor
				.getAcknowledgmentCallback(failure.getFailedMessage());
		if (ack == null) {
			return;
		}
		ack.acknowledge(properties.getExtension().isRequeueRejected()
				? Status.REQUEUE
				: Status.REJECT);
	};
}
 
/**
 * Starts a consumer span for a received message, as a child of whatever trace context
 * is extracted from the message headers, and returns a copy of the message whose
 * trace headers have been replaced with the new span's context.
 * <p>
 * Empty messages are returned untouched. An {@link ErrorMessage} is rebuilt as an
 * {@link ErrorMessage} that keeps its original message; anything else becomes a
 * {@link GenericMessage}.
 */
@Override
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
	if (emptyMessage(message)) {
		return message;
	}
	MessageHeaderAccessor accessor = mutableHeaderAccessor(message);
	TraceContextOrSamplingFlags incoming = this.extractor.extract(accessor);
	Span receiveSpan = this.threadLocalSpan.next(incoming);

	// Swap the incoming trace headers for the ones of the new span.
	MessageHeaderPropagation.removeAnyTraceHeaders(accessor,
			this.tracing.propagation().keys());
	this.injector.inject(receiveSpan.context(), accessor);

	boolean recording = !receiveSpan.isNoop();
	if (recording) {
		receiveSpan.kind(Span.Kind.CONSUMER).name("receive").start();
		receiveSpan.remoteServiceName(toRemoteServiceName(accessor));
		addTags(message, receiveSpan, channel);
	}
	if (log.isDebugEnabled()) {
		log.debug("Created a new span in post receive " + receiveSpan);
	}

	accessor.setImmutable();
	if (!(message instanceof ErrorMessage)) {
		return new GenericMessage<>(message.getPayload(),
				accessor.getMessageHeaders());
	}
	ErrorMessage error = (ErrorMessage) message;
	return new ErrorMessage(error.getPayload(), accessor.getMessageHeaders(),
			error.getOriginalMessage());
}
 
/**
 * This starts a consumer span as a child of the incoming message or the current trace
 * context. It then creates a span for the handler, placing it in scope.
 * <p>
 * Empty messages are returned untouched. Otherwise the returned message is a copy with
 * all trace headers removed and its headers made immutable. An {@link ErrorMessage} is
 * rebuilt as an {@link ErrorMessage} that keeps its original message (as
 * {@code postReceive} does); anything else becomes a {@link GenericMessage}.
 * @param message the message about to be handled
 * @param channel the channel the message arrived on
 * @param handler the handler about to process the message
 * @return the message to pass to the handler
 */
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel,
		MessageHandler handler) {
	if (emptyMessage(message)) {
		return message;
	}
	MessageHeaderAccessor headers = mutableHeaderAccessor(message);
	TraceContextOrSamplingFlags extracted = this.extractor.extract(headers);
	// Start and finish a consumer span as we will immediately process it.
	Span consumerSpan = this.tracer.nextSpan(extracted);
	if (!consumerSpan.isNoop()) {
		consumerSpan.kind(Span.Kind.CONSUMER).start();
		consumerSpan.remoteServiceName(REMOTE_SERVICE_NAME);
		addTags(message, consumerSpan, channel);
		consumerSpan.finish();
	}
	// create and scope a span for the message processor
	this.threadLocalSpan
			.next(TraceContextOrSamplingFlags.create(consumerSpan.context()))
			.name("handle").start();
	// remove any trace headers, but don't re-inject as we are synchronously
	// processing the message and can rely on scoping to access this span later.
	MessageHeaderPropagation.removeAnyTraceHeaders(headers,
			this.tracing.propagation().keys());
	if (log.isDebugEnabled()) {
		log.debug("Created a new span in before handle " + consumerSpan);
	}
	// Freeze the headers before building either kind of message, so error messages
	// do not leave with mutable headers.
	headers.setImmutable();
	if (message instanceof ErrorMessage) {
		ErrorMessage errorMessage = (ErrorMessage) message;
		// Carry the original message over; rebuilding without it would lose it.
		return new ErrorMessage(errorMessage.getPayload(),
				headers.getMessageHeaders(), errorMessage.getOriginalMessage());
	}
	return new GenericMessage<>(message.getPayload(), headers.getMessageHeaders());
}
 
/**
 * Sends an {@link ErrorMessage} whose own headers name one reply/error channel while
 * the failed message inside the exception names another and carries a "b3" header.
 * Checks that the received message has a parseable "b3" header with the same trace id
 * and a different span id, that two spans were recorded, and that the reply and error
 * channels are the ones from the error message's own headers.
 */
@Test
public void errorMessageHeadersRetained() {
	this.channel.addInterceptor(this.interceptor);

	QueueChannel failedMessageChannel = new QueueChannel();
	QueueChannel expectedChannel = new QueueChannel();

	Map<String, Object> errorHeaders = new HashMap<>();
	errorHeaders.put(MessageHeaders.REPLY_CHANNEL, expectedChannel);
	errorHeaders.put(MessageHeaders.ERROR_CHANNEL, expectedChannel);

	MessagingException failure = new MessagingException(
			MessageBuilder.withPayload("hi")
					.setHeader("b3", "000000000000000a-000000000000000a")
					.setReplyChannel(failedMessageChannel)
					.setErrorChannel(failedMessageChannel)
					.build());
	this.channel.send(new ErrorMessage(failure, errorHeaders));

	this.message = this.channel.receive();

	assertThat(this.message).isNotNull();

	// Parse fails if trace or span ID are missing
	String receivedB3 = this.message.getHeaders().get("b3", String.class);
	TraceContext received = parseB3SingleFormat(receivedB3).context();

	assertThat(received.traceIdString()).isEqualTo("000000000000000a");
	assertThat(received.spanIdString()).isNotEqualTo("000000000000000a");
	assertThat(this.spans).hasSize(2);
	assertThat(this.message.getHeaders().getReplyChannel()).isSameAs(expectedChannel);
	assertThat(this.message.getHeaders().getErrorChannel()).isSameAs(expectedChannel);
}
 
/**
 * Resolves a {@link Message}-typed method parameter from the given message.
 * <p>
 * The message is returned as-is when it is an {@link ErrorMessage} or when
 * {@code conversionNotRequired} says its payload already fits. Otherwise the payload
 * is converted and a new message with the converted payload and the same headers is
 * returned.
 * @param parameter the method parameter to resolve
 * @param message the message being handled
 * @return the original message or a new message with a converted payload
 * @throws MethodArgumentTypeMismatchException if the message's class is not assignable
 * to the parameter type
 * @throws MessageConversionException if conversion is needed but the payload is empty
 */
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message)
		throws Exception {
	Class<?> expectedMessageType = parameter.getParameterType();
	Class<?> expectedPayloadType = getPayloadType(parameter);

	boolean messageTypeMatches = expectedMessageType.isAssignableFrom(message.getClass());
	if (!messageTypeMatches) {
		String reason = "Actual message type '" + ClassUtils.getDescriptiveType(message)
				+ "' does not match expected type '"
				+ ClassUtils.getQualifiedName(expectedMessageType) + "'";
		throw new MethodArgumentTypeMismatchException(message, parameter, reason);
	}

	Object currentPayload = message.getPayload();
	Class<?> actualPayloadType = currentPayload.getClass();

	// Error messages are never converted; neither are payloads that already fit.
	if (message instanceof ErrorMessage) {
		return message;
	}
	if (conversionNotRequired(actualPayloadType, expectedPayloadType)) {
		return message;
	}

	if (isEmptyPayload(currentPayload)) {
		String reason = "Cannot convert from actual payload type '"
				+ ClassUtils.getDescriptiveType(currentPayload)
				+ "' to expected payload type '"
				+ ClassUtils.getQualifiedName(expectedPayloadType)
				+ "' when payload is empty";
		throw new MessageConversionException(message, reason);
	}

	Object converted = convertPayload(message, parameter, expectedPayloadType);
	return MessageBuilder.createMessage(converted, message.getHeaders());
}
 
/**
 * Binds a producer with the error channel enabled against a mocked Kinesis client
 * whose {@code putRecordAsync} immediately reports a failure to the async handler.
 * After sending one message, waits for an {@link ErrorMessage} on the "ec.0.errors"
 * channel and checks that its payload is an {@link AwsRequestFailureException} caused
 * by the simulated failure and carrying the data that was sent.
 */
@Test
@SuppressWarnings("unchecked")
public void testProducerErrorChannel() throws Exception {
	KinesisTestBinder testBinder = getBinder();

	final RuntimeException simulatedFailure = new RuntimeException(
			"putRecordRequestEx");
	final AtomicReference<Object> capturedData = new AtomicReference<>();
	AmazonKinesisAsync kinesisMock = mock(AmazonKinesisAsync.class);
	BDDMockito
			.given(kinesisMock.putRecordAsync(any(PutRecordRequest.class),
					any(AsyncHandler.class)))
			.willAnswer((Answer<Future<PutRecordResult>>) (invocation) -> {
				// Remember what was sent, then fail the request through its callback.
				PutRecordRequest putRequest = invocation.getArgument(0);
				capturedData.set(putRequest.getData());
				AsyncHandler<?, ?> callback = invocation.getArgument(1);
				callback.onError(simulatedFailure);
				return mock(Future.class);
			});

	new DirectFieldAccessor(testBinder.getBinder()).setPropertyValue("amazonKinesis",
			kinesisMock);

	ExtendedProducerProperties<KinesisProducerProperties> producerProperties = createProducerProperties();
	producerProperties.setErrorChannelEnabled(true);
	DirectChannel outputChannel = createBindableChannel("output",
			createProducerBindingProperties(producerProperties));
	Binding<MessageChannel> binding = testBinder.bindProducer("ec.0",
			outputChannel, producerProperties);

	ApplicationContext context = TestUtils.getPropertyValue(
			testBinder.getBinder(), "applicationContext", ApplicationContext.class);
	SubscribableChannel errorChannel = context.getBean("ec.0.errors",
			SubscribableChannel.class);
	final AtomicReference<Message<?>> receivedError = new AtomicReference<>();
	final CountDownLatch errorArrived = new CountDownLatch(1);
	errorChannel.subscribe((message) -> {
		receivedError.set(message);
		errorArrived.countDown();
	});

	String body = "oops";
	outputChannel.send(new GenericMessage<>(body.getBytes()));

	assertThat(errorArrived.await(10, TimeUnit.SECONDS)).isTrue();
	assertThat(receivedError.get()).isInstanceOf(ErrorMessage.class);
	assertThat(receivedError.get().getPayload())
			.isInstanceOf(AwsRequestFailureException.class);
	AwsRequestFailureException requestFailure = (AwsRequestFailureException) receivedError
			.get().getPayload();
	assertThat(requestFailure.getCause()).isSameAs(simulatedFailure);
	assertThat(((PutRecordRequest) requestFailure.getRequest()).getData())
			.isSameAs(capturedData.get());
	binding.unbind();
}
 
/**
 * Looks up the {@code handleMessage} fixture method by its five parameter types.
 */
@Before
public void setup() throws Exception {
	// Parameter types of the fixture method, in declaration order.
	Class<?>[] handleMessageSignature = {
			Message.class, Message.class, Message.class, Message.class, ErrorMessage.class
	};
	this.method = MessageMethodArgumentResolverTests.class
			.getDeclaredMethod("handleMessage", handleMessageSignature);
}
 
/**
 * Passes {@link ErrorMessage}s through untouched and hands every other message to
 * {@code doPreSend}.
 */
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
	if (message instanceof ErrorMessage) {
		return message;
	}
	return this.doPreSend(message, channel);
}
 
 类方法
 同包方法