下面列出了 org.springframework.messaging.MessageHandler API 类的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Registers a listener bean and a {@code QueueMessageHandler} in a static context, sends a
 * message whose logical resource id header is {@code "receive"}, and checks that the
 * listener recorded the payload.
 */
@Test
void receiveMessage_methodAnnotatedWithSqsListenerAnnotation_methodInvokedForIncomingMessage() {
    StaticApplicationContext context = new StaticApplicationContext();
    context.registerSingleton("incomingMessageHandler", IncomingMessageHandler.class);
    context.registerSingleton("queueMessageHandler", QueueMessageHandler.class);
    context.refresh();

    // The handler is looked up through the MessageHandler interface, not its concrete type.
    MessageHandler dispatcher = context.getBean(MessageHandler.class);
    dispatcher.handleMessage(
            MessageBuilder.withPayload("testContent")
                    .setHeader(QueueMessageHandler.LOGICAL_RESOURCE_ID, "receive")
                    .build());

    IncomingMessageHandler listener = context.getBean(IncomingMessageHandler.class);
    assertThat(listener.getLastReceivedMessage()).isEqualTo("testContent");
}
/**
 * Converts a {@code DummyKeyValueHolder} to a message with the context's
 * {@code MappingJackson2MessageConverter}, addresses it to {@code "testQueue"}, and verifies
 * that the listener received a message whose payload equals the original object.
 */
@Test
void receiveMessage_methodWithMessageAsParameter_parameterIsConverted() {
    ApplicationContextRunner runner = new ApplicationContextRunner()
            .withConfiguration(
                    UserConfigurations.of(QueueMessageHandlerWithJacksonMappingConfiguration.class))
            .withBean(IncomingMessageHandlerWithMessageParameter.class);

    runner.run(context -> {
        DummyKeyValueHolder expectedPayload = new DummyKeyValueHolder("myKey", "A value");
        MessageHeaders headers = new MessageHeaders(
                Collections.singletonMap(QueueMessageHandler.LOGICAL_RESOURCE_ID, "testQueue"));

        MappingJackson2MessageConverter converter =
                context.getBean(MappingJackson2MessageConverter.class);
        Message<?> outgoing = converter.toMessage(expectedPayload, headers);

        context.getBean(MessageHandler.class).handleMessage(outgoing);

        IncomingMessageHandlerWithMessageParameter listener =
                context.getBean(IncomingMessageHandlerWithMessageParameter.class);
        assertThat(listener.getLastReceivedMessage()).isNotNull();
        assertThat(listener.getLastReceivedMessage().getPayload()).isEqualTo(expectedPayload);
    });
}
/**
 * Tears down the error channel bean and, if present, the error bridge bean that belong to
 * the given producer destination. Nothing is destroyed when the error channel bean is absent.
 *
 * @param destination the producer destination whose error beans are removed
 */
private void destroyErrorInfrastructure(ProducerDestination destination) {
    String channelBeanName = errorsBaseName(destination);
    String bridgeBeanName = getErrorBridgeName(destination);

    MessageHandler bridge = getApplicationContext().containsBean(bridgeBeanName)
            ? getApplicationContext().getBean(bridgeBeanName, MessageHandler.class)
            : null;

    if (!getApplicationContext().containsBean(channelBeanName)) {
        return;
    }

    SubscribableChannel errorChannel =
            getApplicationContext().getBean(channelBeanName, SubscribableChannel.class);
    if (bridge != null) {
        // Detach the bridge from the channel before its singleton is destroyed.
        errorChannel.unsubscribe(bridge);
        ((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
                .destroySingleton(bridgeBeanName);
    }
    ((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
            .destroySingleton(channelBeanName);
}
/**
 * Supplies the queue names through a {@code MapPropertySource} and verifies that messages
 * built for both the slow and the smart queue reach the listener bean.
 */
@Test
public void testMethodHavingNameFromPropertyFile() {
    StaticApplicationContext context = new StaticApplicationContext();
    context.registerSingleton("messageHandler", MessageHandlersWithProperty.class);
    context.registerSingleton("rqueueMessageHandler", RqueueMessageHandler.class);

    Map<String, Object> queueNames = new HashMap<>();
    queueNames.put("slow.queue.name", slowQueue);
    queueNames.put("smart.queue.name", smartQueue);
    context.getEnvironment().getPropertySources().addLast(new MapPropertySource("test", queueNames));
    context.registerSingleton("ppc", PropertySourcesPlaceholderConfigurer.class);
    context.refresh();

    MessageHandler dispatcher = context.getBean(MessageHandler.class);
    MessageHandlersWithProperty listener = context.getBean(MessageHandlersWithProperty.class);

    // First delivery: slow queue.
    dispatcher.handleMessage(buildMessage(slowQueue, message));
    assertEquals(message, listener.getLastReceivedMessage());

    // Reset the listener so the second assertion cannot pass on stale state.
    listener.setLastReceivedMessage(null);

    // Second delivery: smart queue, with a distinguishable payload.
    String doubledMessage = message + message;
    dispatcher.handleMessage(buildMessage(smartQueue, doubledMessage));
    assertEquals(doubledMessage, listener.getLastReceivedMessage());
}
/**
 * Loads the custom-channels XML configuration and checks, for each of the three channels,
 * the expected subscriber types and executor settings.
 */
@Test
public void customChannels() {
    loadBeanDefinitions("websocket-config-broker-customchannels.xml");

    List<Class<? extends MessageHandler>> inboundSubscribers =
            Arrays.<Class<? extends MessageHandler>>asList(
                    SimpAnnotationMethodMessageHandler.class,
                    UserDestinationMessageHandler.class,
                    SimpleBrokerMessageHandler.class);
    testChannel("clientInboundChannel", inboundSubscribers, 3);
    testExecutor("clientInboundChannel", 100, 200, 600);

    List<Class<? extends MessageHandler>> outboundSubscribers =
            Collections.singletonList(SubProtocolWebSocketHandler.class);
    testChannel("clientOutboundChannel", outboundSubscribers, 3);
    testExecutor("clientOutboundChannel", 101, 201, 601);

    List<Class<? extends MessageHandler>> brokerSubscribers =
            Arrays.<Class<? extends MessageHandler>>asList(
                    SimpleBrokerMessageHandler.class,
                    UserDestinationMessageHandler.class);
    testChannel("brokerChannel", brokerSubscribers, 1);
    testExecutor("brokerChannel", 102, 202, 602);
}
/**
 * Creates the Redis sink handler selected by the sink properties: a store-writing handler
 * for key mode, a queue outbound adapter for queue mode, and a publishing handler otherwise.
 *
 * @return the message handler attached to {@code Sink.INPUT}
 */
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler redisSinkMessageHandler() {
    if (this.redisSinkProperties.isKey()) {
        RedisStoreWritingMessageHandler storeHandler =
                new RedisStoreWritingMessageHandler(this.redisConnectionFactory);
        storeHandler.setKeyExpression(this.redisSinkProperties.keyExpression());
        return storeHandler;
    }

    if (this.redisSinkProperties.isQueue()) {
        return new RedisQueueOutboundChannelAdapter(
                this.redisSinkProperties.queueExpression(), this.redisConnectionFactory);
    }

    // Neither key nor queue, so this must be topic mode.
    RedisPublishingMessageHandler topicHandler =
            new RedisPublishingMessageHandler(this.redisConnectionFactory);
    topicHandler.setTopicExpression(this.redisSinkProperties.topicExpression());
    return topicHandler;
}
/**
 * Builds the S3 sink handler from the sink properties. The bucket is taken from the literal
 * bucket property when set, otherwise from the bucket expression; the ACL follows the same
 * literal-or-expression rule.
 *
 * @param amazonS3 the S3 client handed to the handler
 * @param resourceIdResolver resolver set on the handler
 * @param s3SinkProperties sink configuration
 * @return the message handler attached to {@code Sink.INPUT}
 */
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler amazonS3MessageHandler(AmazonS3 amazonS3, ResourceIdResolver resourceIdResolver,
        AmazonS3SinkProperties s3SinkProperties) {
    S3MessageHandler handler = (s3SinkProperties.getBucket() != null)
            ? new S3MessageHandler(amazonS3, s3SinkProperties.getBucket())
            : new S3MessageHandler(amazonS3, s3SinkProperties.getBucketExpression());

    handler.setResourceIdResolver(resourceIdResolver);
    handler.setKeyExpression(s3SinkProperties.getKeyExpression());

    if (s3SinkProperties.getAcl() == null) {
        handler.setObjectAclExpression(s3SinkProperties.getAclExpression());
    }
    else {
        // A literal ACL is wrapped so it can be supplied through the expression-based setter.
        handler.setObjectAclExpression(new ValueExpression<>(s3SinkProperties.getAcl()));
    }

    handler.setUploadMetadataProvider(this.uploadMetadataProvider);
    handler.setProgressListener(this.s3ProgressListener);
    return handler;
}
/**
 * Subscribes a routing handler to the input channel. Each incoming message is forwarded
 * according to its {@code "output"} header: {@code "output1"} sends the upper-cased payload
 * to {@code output1}, {@code "output2"} sends the lower-cased payload to {@code output2}.
 * Messages with any other header value, or with no such header, are ignored.
 *
 * @param input the channel the handler is subscribed to
 * @param output1 target for messages routed to {@code "output1"}
 * @param output2 target for messages routed to {@code "output2"}
 */
@StreamListener
public void receive(@Input(Processor.INPUT) SubscribableChannel input,
        @Output(Processor.OUTPUT) final MessageChannel output1,
        @Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT) final MessageChannel output2) {
    input.subscribe(new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            // Constant-first equals: a missing header yields null here and must not raise
            // a NullPointerException; it is treated like any other unrecognised value.
            Object target = message.getHeaders().get("output");
            if ("output1".equals(target)) {
                output1.send(org.springframework.messaging.support.MessageBuilder
                        .withPayload(message.getPayload().toString().toUpperCase())
                        .build());
            }
            else if ("output2".equals(target)) {
                output2.send(org.springframework.messaging.support.MessageBuilder
                        .withPayload(message.getPayload().toString().toLowerCase())
                        .build());
            }
        }
    });
}
/**
 * Binds a pollable consumer with auto-startup enabled and asserts that the resulting
 * binding reports itself as running.
 */
@SuppressWarnings("unchecked")
@Test
public void testAutoStartupOn() {
    ExtendedConsumerProperties<Object> consumerProperties = new ExtendedConsumerProperties<>(null);
    consumerProperties.setAutoStartup(true);

    TestChannelBinder testBinder = createBinder();
    testBinder.setMessageSourceDelegate(
            new LifecycleMessageSource(() -> new GenericMessage<>("{\"foo\":\"bar\"}".getBytes())));

    DefaultPollableMessageSource source = new DefaultPollableMessageSource(this.messageConverter);
    this.context.getBean(MessageConverterConfigurer.class)
            .configurePolledMessageSource(source, "foo");

    Binding<PollableSource<MessageHandler>> binding =
            testBinder.bindPollableConsumer("foo", "bar", source, consumerProperties);
    assertThat(binding.isRunning()).isTrue();
}
/**
 * Exposes the STOMP broker relay obtained from the broker registry, or {@code null} when the
 * registry provides none. The relay is given system subscriptions for the user-destination
 * and user-registry broadcast destinations that are configured.
 *
 * @return the configured relay handler, or {@code null}
 */
@Bean
@Nullable
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    StompBrokerRelayMessageHandler relay = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
    if (relay == null) {
        return null;
    }

    Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4);

    String userDestinationBroadcast = getBrokerRegistry().getUserDestinationBroadcast();
    if (userDestinationBroadcast != null) {
        systemSubscriptions.put(userDestinationBroadcast, userDestinationMessageHandler());
    }

    String userRegistryBroadcast = getBrokerRegistry().getUserRegistryBroadcast();
    if (userRegistryBroadcast != null) {
        systemSubscriptions.put(userRegistryBroadcast, userRegistryMessageHandler());
    }

    relay.setSystemSubscriptions(systemSubscriptions);
    updateUserDestinationResolver(relay);
    return relay;
}
/**
 * Subscribes a handler. At most one {@code LastSubscriberMessageHandler} may be registered;
 * once one is, it is removed before each new subscription and added back afterwards
 * (presumably so that it remains last in dispatch order; confirm against the superclass).
 *
 * @param handler the handler to subscribe
 * @return the result of the superclass subscription for {@code handler}
 * @throws IllegalStateException if a second {@code LastSubscriberMessageHandler} is offered
 */
@Override
public boolean subscribe(MessageHandler handler) {
    boolean lastSubscriber = handler instanceof LastSubscriberMessageHandler;
    // Validate before touching any state so a rejected handler does not inflate the
    // subscriber count.
    if (lastSubscriber && this.finalHandler != null) {
        throw new IllegalStateException(
                "Only one LastSubscriberMessageHandler is allowed");
    }
    this.subscribers.incrementAndGet();
    if (this.finalHandler != null) {
        super.unsubscribe(this.finalHandler);
    }
    boolean result = super.subscribe(handler);
    if (this.finalHandler != null) {
        super.subscribe(this.finalHandler);
    }
    if (lastSubscriber && this.finalHandler == null) {
        this.finalHandler = (LastSubscriberMessageHandler) handler;
    }
    return result;
}
/**
 * Verifies that {@code afterMessageHandled} finishes the span and closes the scope that are
 * carried in the message headers.
 */
@Test
public void testAfterMessageHandled() {
    Span headerSpan = mock(Span.class);
    Scope headerScope = mock(Scope.class);
    MessageHandler handler = mock(WebSocketAnnotationMethodMessageHandler.class);
    TracingChannelInterceptor interceptor =
            new TracingChannelInterceptor(mockTracer, Tags.SPAN_KIND_CLIENT);

    MessageBuilder<String> builder = MessageBuilder.withPayload("Hi")
            .setHeader(TracingChannelInterceptor.SIMP_MESSAGE_TYPE, SimpMessageType.MESSAGE)
            .setHeader(TracingChannelInterceptor.SIMP_DESTINATION, TEST_DESTINATION)
            .setHeader(TracingChannelInterceptor.OPENTRACING_SCOPE, headerScope)
            .setHeader(TracingChannelInterceptor.OPENTRACING_SPAN, headerSpan);

    interceptor.afterMessageHandled(builder.build(), null, handler, null);

    // The span must be finished and the scope closed.
    verify(headerSpan).finish();
    verify(headerScope).close();
}
/**
 * Creates the outbound handler that publishes messages from {@code pubSubOutputChannel} to
 * the {@code exampleTopic} topic and logs the outcome of each publish.
 *
 * @param pubsubTemplate the template used to publish
 * @return the message handler bound to {@code pubSubOutputChannel}
 */
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    PubSubMessageHandler adapter =
            new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
    adapter.setPublishCallback(new ListenableFutureCallback<String>() {
        @Override
        public void onFailure(Throwable ex) {
            // Pass the cause to the logger; without it the failure cannot be diagnosed.
            LOGGER.info("There was an error sending the message.", ex);
        }
        @Override
        public void onSuccess(String result) {
            LOGGER.info("Message was sent successfully.");
        }
    });
    return adapter;
}
/**
 * Builds the Cassandra sink handler from the sink properties: optional query type, optional
 * write options (set when a consistency level, a retry policy or a positive TTL is given),
 * and either an ingest query or a statement expression.
 *
 * @return the message handler bound to the {@code toSink} channel
 */
@Bean
@ServiceActivator(inputChannel = "toSink")
public MessageHandler cassandraSinkMessageHandler() {
    CassandraMessageHandler<?> sinkHandler;
    if (this.cassandraSinkProperties.getQueryType() != null) {
        sinkHandler = new CassandraMessageHandler<>(this.template,
                this.cassandraSinkProperties.getQueryType());
    }
    else {
        sinkHandler = new CassandraMessageHandler<>(this.template);
    }
    sinkHandler.setProducesReply(false);

    boolean writeOptionsConfigured = this.cassandraSinkProperties.getConsistencyLevel() != null
            || this.cassandraSinkProperties.getRetryPolicy() != null
            || this.cassandraSinkProperties.getTtl() > 0;
    if (writeOptionsConfigured) {
        WriteOptions writeOptions = new WriteOptions(
                this.cassandraSinkProperties.getConsistencyLevel(),
                this.cassandraSinkProperties.getRetryPolicy(),
                this.cassandraSinkProperties.getTtl());
        sinkHandler.setWriteOptions(writeOptions);
    }

    // An ingest query takes precedence over a statement expression.
    if (StringUtils.hasText(this.cassandraSinkProperties.getIngestQuery())) {
        sinkHandler.setIngestQuery(this.cassandraSinkProperties.getIngestQuery());
    }
    else if (this.cassandraSinkProperties.getStatementExpression() != null) {
        sinkHandler.setStatementExpression(this.cassandraSinkProperties.getStatementExpression());
    }
    return sinkHandler;
}
/**
 * Creates a handler that waits 500 ms before replying on the message's reply channel, so
 * that the reply arrives late. A {@code MessageDeliveryException} with the expected timeout
 * text is the success path; every other outcome is recorded in {@code failure}.
 *
 * @param latch counted down once the handler has finished, whatever the outcome
 * @param failure receives the error describing an unexpected outcome
 * @return the late-replying handler
 */
private MessageHandler createLateReplier(final CountDownLatch latch, final AtomicReference<Throwable> failure) {
    return message -> {
        try {
            Thread.sleep(500);
            MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
            replyChannel.send(new GenericMessage<>("response"));
            // Reaching this line means the send did not throw, which is itself the failure.
            failure.set(new IllegalStateException("Expected exception"));
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can still observe it.
            Thread.currentThread().interrupt();
            failure.set(e);
        }
        catch (MessageDeliveryException ex) {
            String expected = "Reply message received but the receiving thread has exited due to a timeout";
            String actual = ex.getMessage();
            if (!expected.equals(actual)) {
                failure.set(new IllegalStateException(
                        "Unexpected error: '" + actual + "'"));
            }
        }
        finally {
            latch.countDown();
        }
    };
}
/**
 * Binds a pollable consumer, sends one message to {@code pollable.group}, and polls until
 * the message is received (one initial attempt plus up to 100 retries).
 */
@Test
public void testPolledConsumer() throws Exception {
    RabbitTestBinder rabbitBinder = getBinder();
    PollableSource<MessageHandler> source = new DefaultPollableMessageSource(this.messageConverter);
    Binding<PollableSource<MessageHandler>> consumerBinding = rabbitBinder.bindPollableConsumer(
            "pollable", "group", source, createConsumerProperties());

    RabbitTemplate rabbitTemplate = new RabbitTemplate(this.rabbitAvailableRule.getResource());
    rabbitTemplate.convertAndSend("pollable.group", "testPollable");

    // The same payload check is applied on every poll attempt.
    MessageHandler payloadCheck = m -> assertThat(m.getPayload()).isEqualTo("testPollable");

    boolean received = source.poll(payloadCheck);
    for (int attempt = 0; attempt < 100 && !received; attempt++) {
        received = source.poll(payloadCheck);
    }

    assertThat(received).isTrue();
    consumerBinding.unbind();
}
/**
 * Verifies that, once CONNECTED is received, the relay sends CONNECT followed by a
 * SUBSCRIBE for the system subscription destination, and that a MESSAGE for that
 * destination is handed to the registered handler unchanged.
 */
@Test
public void systemSubscription() throws Exception {
    String destination = "/topic/foo";
    MessageHandler systemHandler = mock(MessageHandler.class);
    this.brokerRelay.setSystemSubscriptions(Collections.singletonMap(destination, systemHandler));
    this.brokerRelay.start();

    StompHeaderAccessor connected = StompHeaderAccessor.create(StompCommand.CONNECTED);
    connected.setLeaveMutable(true);
    MessageHeaders connectedHeaders = connected.getMessageHeaders();
    this.tcpClient.handleMessage(MessageBuilder.createMessage(new byte[0], connectedHeaders));

    assertEquals(2, this.tcpClient.getSentMessages().size());
    assertEquals(StompCommand.CONNECT, this.tcpClient.getSentHeaders(0).getCommand());
    assertEquals(StompCommand.SUBSCRIBE, this.tcpClient.getSentHeaders(1).getCommand());
    assertEquals(destination, this.tcpClient.getSentHeaders(1).getDestination());

    Message<byte[]> incoming = message(StompCommand.MESSAGE, null, null, destination);
    this.tcpClient.handleMessage(incoming);

    ArgumentCaptor<Message> delivered = ArgumentCaptor.forClass(Message.class);
    verify(systemHandler).handleMessage(delivered.capture());
    assertSame(incoming, delivered.getValue());
}
/**
 * Sends a message built for the payload queue and checks that the listener bean recorded
 * the expected payload.
 */
@Test
public void testMethodWithMessagePayloadParameterIsInvoked() {
    StaticApplicationContext context = new StaticApplicationContext();
    context.registerSingleton("incomingMessageHandler", IncomingMessageHandler.class);
    context.registerSingleton("rqueueMessageHandler", RqueueMessageHandler.class);
    context.refresh();

    MessageHandler dispatcher = context.getBean(MessageHandler.class);
    dispatcher.handleMessage(buildMessage(messagePayloadQueue, payloadConvertedMessage));

    IncomingMessageHandler listener = context.getBean(IncomingMessageHandler.class);
    assertEquals(messagePayload, listener.getLastReceivedMessage());
}
/**
 * Creates a {@code PubSubMessageHandler} that targets the destination's name and is given
 * this binder's bean factory. The producer properties and error channel are not used here.
 */
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
        ExtendedProducerProperties<PubSubProducerProperties> producerProperties,
        MessageChannel errorChannel) {
    String topicName = destination.getName();
    PubSubMessageHandler producerHandler = new PubSubMessageHandler(this.pubSubTemplate, topicName);
    producerHandler.setBeanFactory(getBeanFactory());
    return producerHandler;
}
/**
 * Invokes the handler and rethrows anything it throws as a {@code MessageHandlingException}
 * that carries the message and the original cause.
 *
 * @param handler the handler to invoke
 * @param message the message passed to the handler
 */
private void doHandleMessage(MessageHandler handler, Message<?> message) {
    try {
        handler.handleMessage(message);
    }
    catch (Throwable cause) { // NOSONAR
        MessageHandlingException wrapped = new MessageHandlingException(message, cause);
        throw wrapped;
    }
}
/**
 * Creates the handler for user-registry broadcasts, or returns {@code null} when no
 * user-registry broadcast destination is configured on the broker registry.
 *
 * @return the user registry message handler, or {@code null}
 */
@Bean
@Nullable
public MessageHandler userRegistryMessageHandler() {
    String broadcastDestination = getBrokerRegistry().getUserRegistryBroadcast();
    if (broadcastDestination == null) {
        return null;
    }

    SimpUserRegistry registry = userRegistry();
    // The handler's constructor takes a MultiServerUserRegistry; fail with a clear message
    // rather than a bare ClassCastException.
    Assert.isInstanceOf(MultiServerUserRegistry.class, registry, "MultiServerUserRegistry required");
    MultiServerUserRegistry multiServerRegistry = (MultiServerUserRegistry) registry;

    return new UserRegistryMessageHandler(multiServerRegistry, brokerMessagingTemplate(),
            broadcastDestination, messageBrokerTaskScheduler());
}
/**
 * Creates the JMS sending handler for {@code producingChannel}, targeting the destination
 * named by {@code integrationDestination}.
 *
 * @param jmsTemplate the template the handler sends with
 * @return the message handler bound to {@code producingChannel}
 */
@Bean
@ServiceActivator(inputChannel = "producingChannel")
public MessageHandler jmsMessageHandler(JmsTemplate jmsTemplate) {
    JmsSendingMessageHandler sender = new JmsSendingMessageHandler(jmsTemplate);
    sender.setDestinationName(integrationDestination);
    return sender;
}
/**
 * Removes the handler from this channel's handler collection and logs the removal at debug
 * level.
 *
 * @param handler the handler to remove
 * @return {@code true} if the handler was present and has been removed
 */
@Override
public boolean unsubscribe(MessageHandler handler) {
    boolean removed = this.handlers.remove(handler);
    if (removed && logger.isDebugEnabled()) {
        logger.debug(getBeanName() + " removed " + handler);
    }
    return removed;
}
/**
 * Hands the message to every subscriber, each wrapped in its own {@code SendTask}. Tasks run
 * on the configured executor when there is one, otherwise on the calling thread.
 *
 * @param message the message to deliver
 * @param timeout not used by this implementation
 * @return always {@code true}
 */
@Override
public boolean sendInternal(Message<?> message, long timeout) {
    for (MessageHandler subscriber : getSubscribers()) {
        SendTask task = new SendTask(message, subscriber);
        if (this.executor != null) {
            this.executor.execute(task);
        }
        else {
            task.run();
        }
    }
    return true;
}
/**
 * Tears down the error beans of a consumer binding: first the recoverer, then, if the error
 * channel bean exists, the bridge and error-handler subscriptions and beans, and finally the
 * channel itself. An {@code IllegalStateException} raised along the way is ignored.
 *
 * @param destination the consumer destination
 * @param group the consumer group
 * @param properties the consumer properties used to derive the bean names
 */
private void destroyErrorInfrastructure(ConsumerDestination destination, String group,
        C properties) {
    try {
        destroyBean(getErrorRecovererName(destination, group, properties));

        String channelBeanName = errorsBaseName(destination, group, properties);
        String handlerBeanName = getErrorMessageHandlerName(destination, group, properties);
        String bridgeBeanName = getErrorBridgeName(destination, group, properties);

        MessageHandler bridge = getApplicationContext().containsBean(bridgeBeanName)
                ? getApplicationContext().getBean(bridgeBeanName, MessageHandler.class)
                : null;
        MessageHandler errorHandler = getApplicationContext().containsBean(handlerBeanName)
                ? getApplicationContext().getBean(handlerBeanName, MessageHandler.class)
                : null;

        if (!getApplicationContext().containsBean(channelBeanName)) {
            return;
        }

        SubscribableChannel errorChannel =
                getApplicationContext().getBean(channelBeanName, SubscribableChannel.class);
        if (bridge != null) {
            errorChannel.unsubscribe(bridge);
            destroyBean(bridgeBeanName);
        }
        if (errorHandler != null) {
            errorChannel.unsubscribe(errorHandler);
            destroyBean(handlerBeanName);
        }
        destroyBean(channelBeanName);
    }
    catch (IllegalStateException e) {
        // Deliberately ignored: context is shutting down.
    }
}
/**
 * Creates the GCS outbound handler for {@code siGcsTestChannel}, with the remote directory
 * fixed to {@code testGcsBucket}.
 *
 * @param gcs the storage client wrapped in a {@code GcsSessionFactory}
 * @return the message handler bound to {@code siGcsTestChannel}
 */
@Bean
@ServiceActivator(inputChannel = "siGcsTestChannel")
public MessageHandler outboundAdapter(Storage gcs) {
    GcsSessionFactory sessionFactory = new GcsSessionFactory(gcs);
    GcsMessageHandler gcsHandler = new GcsMessageHandler(sessionFactory);
    gcsHandler.setRemoteDirectoryExpression(new ValueExpression<>("testGcsBucket"));
    return gcsHandler;
}
/**
 * For MESSAGE-type messages about to be handled by a
 * {@code WebSocketAnnotationMethodMessageHandler} or a {@code SubProtocolWebSocketHandler},
 * activates the span stored in the headers and returns a copy of the message that also
 * carries the resulting scope. All other messages are returned untouched.
 */
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel,
        MessageHandler handler) {
    boolean tracedHandler = handler instanceof WebSocketAnnotationMethodMessageHandler
            || handler instanceof SubProtocolWebSocketHandler;
    if (!tracedHandler
            || !SimpMessageType.MESSAGE.equals(message.getHeaders().get(SIMP_MESSAGE_TYPE))) {
        return message;
    }

    Span headerSpan = message.getHeaders().get(OPENTRACING_SPAN, Span.class);
    Scope activeScope = tracer.scopeManager().activate(headerSpan);
    return MessageBuilder.fromMessage(message)
            .setHeader(OPENTRACING_SCOPE, activeScope)
            .build();
}
/**
 * With the broker-relay configuration loaded, the broker channel must have exactly two
 * subscribers: the user destination handler and the STOMP broker relay handler.
 */
@Test
public void brokerChannelWithBrokerRelay() {
    ApplicationContext ctx = loadConfig(BrokerRelayConfig.class);
    TestChannel brokerChannel = ctx.getBean("brokerChannel", TestChannel.class);

    Set<MessageHandler> subscribers = brokerChannel.getSubscribers();

    assertEquals(2, subscribers.size());
    assertTrue(subscribers.contains(ctx.getBean(UserDestinationMessageHandler.class)));
    assertTrue(subscribers.contains(ctx.getBean(StompBrokerRelayMessageHandler.class)));
}
/**
 * When the first subscriber throws, a resulting {@code MessageDeliveryException} must carry
 * that exception as its cause, and the second subscriber must never be invoked.
 */
@Test
public void failurePropagates() {
    RuntimeException failure = new RuntimeException();
    willThrow(failure).given(this.handler).handleMessage(this.message);

    MessageHandler untouchedHandler = mock(MessageHandler.class);
    this.channel.subscribe(this.handler);
    this.channel.subscribe(untouchedHandler);

    try {
        this.channel.send(message);
    }
    catch (MessageDeliveryException delivered) {
        assertThat(delivered.getCause(), equalTo(failure));
    }

    verifyZeroInteractions(untouchedHandler);
}
/**
 * Delegates to the superclass, then throws the configured exception if one is set;
 * otherwise returns the configured replacement message, or the original message when no
 * replacement is set.
 */
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
    super.beforeHandle(message, channel, handler);
    if (this.exceptionToRaise != null) {
        throw this.exceptionToRaise;
    }
    if (this.messageToReturn != null) {
        return this.messageToReturn;
    }
    return message;
}