类org.springframework.messaging.MessageHandler源码实例Demo

下面列出了如何使用 org.springframework.messaging.MessageHandler 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

/**
 * Registers a listener bean and a {@code QueueMessageHandler} in a fresh context,
 * dispatches a "testContent" message whose logical resource id header is "receive",
 * and asserts that the listener recorded that payload.
 */
@Test
void receiveMessage_methodAnnotatedWithSqsListenerAnnotation_methodInvokedForIncomingMessage() {
	StaticApplicationContext context = new StaticApplicationContext();
	context.registerSingleton("incomingMessageHandler", IncomingMessageHandler.class);
	context.registerSingleton("queueMessageHandler", QueueMessageHandler.class);
	context.refresh();

	MessageHandler handler = context.getBean(MessageHandler.class);
	handler.handleMessage(
			MessageBuilder.withPayload("testContent")
					.setHeader(QueueMessageHandler.LOGICAL_RESOURCE_ID, "receive")
					.build());

	// The listener bean exposes the last payload it was handed.
	assertThat(context.getBean(IncomingMessageHandler.class).getLastReceivedMessage())
			.isEqualTo("testContent");
}
 
/**
 * Sends a message produced by the context's {@code MappingJackson2MessageConverter}
 * to the "testQueue" logical resource and asserts that the listener's last received
 * message carries a payload equal to the original {@code DummyKeyValueHolder}.
 */
@Test
void receiveMessage_methodWithMessageAsParameter_parameterIsConverted() {
	new ApplicationContextRunner()
			.withConfiguration(
					UserConfigurations.of(QueueMessageHandlerWithJacksonMappingConfiguration.class))
			.withBean(IncomingMessageHandlerWithMessageParameter.class)
			.run((context) -> {
				DummyKeyValueHolder expectedPayload = new DummyKeyValueHolder("myKey", "A value");
				MappingJackson2MessageConverter converter = context
						.getBean(MappingJackson2MessageConverter.class);
				MessageHeaders headers = new MessageHeaders(Collections.singletonMap(
						QueueMessageHandler.LOGICAL_RESOURCE_ID, "testQueue"));
				Message<?> jsonMessage = converter.toMessage(expectedPayload, headers);

				context.getBean(MessageHandler.class).handleMessage(jsonMessage);

				IncomingMessageHandlerWithMessageParameter listener = context
						.getBean(IncomingMessageHandlerWithMessageParameter.class);
				assertThat(listener.getLastReceivedMessage()).isNotNull();
				assertThat(listener.getLastReceivedMessage().getPayload())
						.isEqualTo(expectedPayload);
			});
}
 
/**
 * Tears down the error channel registered for a producer destination, together with
 * the bridge handler subscribed to it (when one exists).
 * <p>Nothing is destroyed when the error channel bean is absent, even if a bridge
 * handler bean is present.
 *
 * @param destination the producer destination whose error beans are removed
 */
private void destroyErrorInfrastructure(ProducerDestination destination) {
	String channelBeanName = errorsBaseName(destination);
	String bridgeBeanName = getErrorBridgeName(destination);

	MessageHandler bridge = getApplicationContext().containsBean(bridgeBeanName)
			? getApplicationContext().getBean(bridgeBeanName, MessageHandler.class)
			: null;

	if (!getApplicationContext().containsBean(channelBeanName)) {
		return;
	}

	SubscribableChannel errorChannel = getApplicationContext()
			.getBean(channelBeanName, SubscribableChannel.class);
	if (bridge != null) {
		// Detach the bridge from the channel before dropping its singleton.
		errorChannel.unsubscribe(bridge);
		((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
				.destroySingleton(bridgeBeanName);
	}
	((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
			.destroySingleton(channelBeanName);
}
 
源代码4 项目: rqueue   文件: RqueueMessageHandlerTest.java
/**
 * Supplies the "slow.queue.name" and "smart.queue.name" properties through a
 * {@code MapPropertySource}, then checks that messages sent to either queue are
 * delivered to the {@code MessageHandlersWithProperty} listener.
 */
@Test
public void testMethodHavingNameFromPropertyFile() {
  StaticApplicationContext context = new StaticApplicationContext();
  context.registerSingleton("messageHandler", MessageHandlersWithProperty.class);
  context.registerSingleton("rqueueMessageHandler", RqueueMessageHandler.class);

  Map<String, Object> queueNames = new HashMap<>();
  queueNames.put("slow.queue.name", slowQueue);
  queueNames.put("smart.queue.name", smartQueue);
  MapPropertySource propertySource = new MapPropertySource("test", queueNames);
  context.getEnvironment().getPropertySources().addLast(propertySource);

  context.registerSingleton("ppc", PropertySourcesPlaceholderConfigurer.class);
  context.refresh();

  MessageHandler handler = context.getBean(MessageHandler.class);
  MessageHandlersWithProperty listener = context.getBean(MessageHandlersWithProperty.class);

  // First delivery: slow queue.
  handler.handleMessage(buildMessage(slowQueue, message));
  assertEquals(message, listener.getLastReceivedMessage());

  // Reset, then deliver a different payload to the smart queue.
  listener.setLastReceivedMessage(null);
  handler.handleMessage(buildMessage(smartQueue, message + message));
  assertEquals(message + message, listener.getLastReceivedMessage());
}
 
/**
 * Loads "websocket-config-broker-customchannels.xml" and checks, for each of the
 * three channels, the expected subscriber types and executor settings via the
 * {@code testChannel} / {@code testExecutor} helpers.
 */
@Test
public void customChannels() {
	loadBeanDefinitions("websocket-config-broker-customchannels.xml");

	List<Class<? extends MessageHandler>> inboundSubscribers =
			Arrays.<Class<? extends MessageHandler>>asList(
					SimpAnnotationMethodMessageHandler.class,
					UserDestinationMessageHandler.class,
					SimpleBrokerMessageHandler.class);
	testChannel("clientInboundChannel", inboundSubscribers, 3);
	testExecutor("clientInboundChannel", 100, 200, 600);

	List<Class<? extends MessageHandler>> outboundSubscribers =
			Collections.singletonList(SubProtocolWebSocketHandler.class);
	testChannel("clientOutboundChannel", outboundSubscribers, 3);
	testExecutor("clientOutboundChannel", 101, 201, 601);

	List<Class<? extends MessageHandler>> brokerSubscribers =
			Arrays.<Class<? extends MessageHandler>>asList(
					SimpleBrokerMessageHandler.class,
					UserDestinationMessageHandler.class);
	testChannel("brokerChannel", brokerSubscribers, 1);
	testExecutor("brokerChannel", 102, 202, 602);
}
 
/**
 * Builds the Redis sink handler matching the configured mode: a store-writing
 * handler when a key is configured, a queue outbound adapter when a queue is
 * configured, and a publishing (topic) handler otherwise.
 *
 * @return the handler attached to {@code Sink.INPUT}
 */
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler redisSinkMessageHandler() {
	if (this.redisSinkProperties.isKey()) {
		RedisStoreWritingMessageHandler storeHandler =
				new RedisStoreWritingMessageHandler(this.redisConnectionFactory);
		storeHandler.setKeyExpression(this.redisSinkProperties.keyExpression());
		return storeHandler;
	}

	if (this.redisSinkProperties.isQueue()) {
		return new RedisQueueOutboundChannelAdapter(
				this.redisSinkProperties.queueExpression(), this.redisConnectionFactory);
	}

	// Neither key nor queue: must be topic.
	RedisPublishingMessageHandler topicHandler =
			new RedisPublishingMessageHandler(this.redisConnectionFactory);
	topicHandler.setTopicExpression(this.redisSinkProperties.topicExpression());
	return topicHandler;
}
 
/**
 * Creates the S3 sink handler. The bucket is taken from the literal bucket property
 * when set, otherwise from the bucket expression; the object ACL likewise prefers the
 * literal ACL over the ACL expression.
 *
 * @param amazonS3 the S3 client handed to the handler
 * @param resourceIdResolver resolver set on the handler
 * @param s3SinkProperties sink configuration (bucket, key expression, ACL)
 * @return the configured handler attached to {@code Sink.INPUT}
 */
@Bean
@ServiceActivator(inputChannel = Sink.INPUT)
public MessageHandler amazonS3MessageHandler(AmazonS3 amazonS3, ResourceIdResolver resourceIdResolver,
		AmazonS3SinkProperties s3SinkProperties) {
	S3MessageHandler handler = (s3SinkProperties.getBucket() != null)
			? new S3MessageHandler(amazonS3, s3SinkProperties.getBucket())
			: new S3MessageHandler(amazonS3, s3SinkProperties.getBucketExpression());

	handler.setResourceIdResolver(resourceIdResolver);
	handler.setKeyExpression(s3SinkProperties.getKeyExpression());

	if (s3SinkProperties.getAcl() == null) {
		handler.setObjectAclExpression(s3SinkProperties.getAclExpression());
	}
	else {
		// Wrap the fixed ACL value so it can be supplied as an expression.
		handler.setObjectAclExpression(new ValueExpression<>(s3SinkProperties.getAcl()));
	}

	handler.setUploadMetadataProvider(this.uploadMetadataProvider);
	handler.setProgressListener(this.s3ProgressListener);
	return handler;
}
 
/**
 * Subscribes a handler to {@code input} that routes each message according to its
 * "output" header: "output1" sends the upper-cased payload to {@code output1},
 * "output2" sends the lower-cased payload to {@code output2}. Messages whose header
 * is missing or has any other value are ignored.
 *
 * @param input the channel to subscribe to
 * @param output1 target for messages with header value "output1"
 * @param output2 target for messages with header value "output2"
 */
@StreamListener
public void receive(@Input(Processor.INPUT) SubscribableChannel input,
		@Output(Processor.OUTPUT) final MessageChannel output1,
		@Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT) final MessageChannel output2) {
	input.subscribe(new MessageHandler() {
		@Override
		public void handleMessage(Message<?> message) throws MessagingException {
			// Constant-first equals: a message without the "output" header is skipped
			// instead of failing with a NullPointerException.
			Object target = message.getHeaders().get("output");
			if ("output1".equals(target)) {
				output1.send(org.springframework.messaging.support.MessageBuilder
						.withPayload(
								message.getPayload().toString().toUpperCase())
						.build());
			}
			else if ("output2".equals(target)) {
				output2.send(org.springframework.messaging.support.MessageBuilder
						.withPayload(
								message.getPayload().toString().toLowerCase())
						.build());
			}
		}
	});
}
 
/**
 * Binds a pollable consumer with auto-startup set to {@code true} and asserts that
 * the resulting binding reports itself as running.
 */
@SuppressWarnings("unchecked")
@Test
public void testAutoStartupOn() {
	TestChannelBinder testBinder = createBinder();
	testBinder.setMessageSourceDelegate(new LifecycleMessageSource(
			() -> new GenericMessage<>("{\"foo\":\"bar\"}".getBytes())));

	MessageConverterConfigurer converterConfigurer =
			this.context.getBean(MessageConverterConfigurer.class);
	DefaultPollableMessageSource source =
			new DefaultPollableMessageSource(this.messageConverter);
	converterConfigurer.configurePolledMessageSource(source, "foo");

	ExtendedConsumerProperties<Object> consumerProperties =
			new ExtendedConsumerProperties<>(null);
	consumerProperties.setAutoStartup(true);

	Binding<PollableSource<MessageHandler>> binding =
			testBinder.bindPollableConsumer("foo", "bar", source, consumerProperties);

	assertThat(binding.isRunning()).isTrue();
}
 
/**
 * Exposes the STOMP broker relay handler, if the broker registry provides one.
 * The relay is given system subscriptions for the user-destination and user-registry
 * broadcast destinations (each only when configured) before being returned.
 *
 * @return the configured relay, or {@code null} when the registry has no relay
 */
@Bean
@Nullable
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
	StompBrokerRelayMessageHandler relay = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
	if (relay == null) {
		return null;
	}

	Map<String, MessageHandler> systemSubscriptions = new HashMap<>(4);

	String userDestinationBroadcast = getBrokerRegistry().getUserDestinationBroadcast();
	if (userDestinationBroadcast != null) {
		systemSubscriptions.put(userDestinationBroadcast, userDestinationMessageHandler());
	}

	String userRegistryBroadcast = getBrokerRegistry().getUserRegistryBroadcast();
	if (userRegistryBroadcast != null) {
		systemSubscriptions.put(userRegistryBroadcast, userRegistryMessageHandler());
	}

	relay.setSystemSubscriptions(systemSubscriptions);
	updateUserDestinationResolver(relay);
	return relay;
}
 
源代码11 项目: spring-cloud-stream   文件: BinderErrorChannel.java
/**
 * Subscribes a handler while keeping the registered
 * {@code LastSubscriberMessageHandler} (if any) subscribed after it: the final
 * handler is unsubscribed, the new handler is subscribed, and the final handler is
 * subscribed again.
 * <p>The subscriber counter is incremented before any validation takes place.
 *
 * @param handler the handler to subscribe
 * @return the result of subscribing {@code handler} on the superclass
 * @throws IllegalStateException if {@code handler} is a
 * {@code LastSubscriberMessageHandler} and one is already registered
 */
@Override
public boolean subscribe(MessageHandler handler) {
	this.subscribers.incrementAndGet();

	boolean lastSubscriber = handler instanceof LastSubscriberMessageHandler;
	if (lastSubscriber && this.finalHandler != null) {
		throw new IllegalStateException(
				"Only one LastSubscriberMessageHandler is allowed");
	}

	// Temporarily remove the final handler so the new one is registered ahead of it.
	if (this.finalHandler != null) {
		super.unsubscribe(this.finalHandler);
	}
	boolean subscribed = super.subscribe(handler);

	if (this.finalHandler != null) {
		super.subscribe(this.finalHandler);
	}
	else if (lastSubscriber) {
		this.finalHandler = (LastSubscriberMessageHandler) handler;
	}
	return subscribed;
}
 
/**
 * Checks that {@code afterMessageHandled} finishes the span and closes the scope
 * carried in the message headers.
 */
@Test
public void testAfterMessageHandled() {
  Span mockSpan = mock(Span.class);
  Scope mockScope = mock(Scope.class);
  MessageHandler handler = mock(WebSocketAnnotationMethodMessageHandler.class);

  MessageBuilder<String> builder = MessageBuilder.withPayload("Hi");
  builder.setHeader(TracingChannelInterceptor.SIMP_MESSAGE_TYPE, SimpMessageType.MESSAGE);
  builder.setHeader(TracingChannelInterceptor.SIMP_DESTINATION, TEST_DESTINATION);
  builder.setHeader(TracingChannelInterceptor.OPENTRACING_SCOPE, mockScope);
  builder.setHeader(TracingChannelInterceptor.OPENTRACING_SPAN, mockSpan);

  TracingChannelInterceptor interceptor =
      new TracingChannelInterceptor(mockTracer, Tags.SPAN_KIND_CLIENT);
  interceptor.afterMessageHandled(builder.build(), null, handler, null);

  // Verify span is finished and scope is closed
  verify(mockSpan).finish();
  verify(mockScope).close();
}
 
源代码13 项目: spring-cloud-gcp   文件: SenderApplication.java
/**
 * Creates the outbound handler that publishes messages arriving on
 * "pubSubOutputChannel" to the "exampleTopic" topic and logs the outcome of each
 * publish through a callback.
 *
 * @param pubsubTemplate the template used by the handler to publish
 * @return the configured handler
 */
@Bean
@ServiceActivator(inputChannel = "pubSubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
	PubSubMessageHandler adapter =
			new PubSubMessageHandler(pubsubTemplate, "exampleTopic");
	adapter.setPublishCallback(new ListenableFutureCallback<String>() {
		@Override
		public void onFailure(Throwable ex) {
			// Log at error level and keep the cause; otherwise the reason for the
			// failed publish is lost.
			LOGGER.error("There was an error sending the message.", ex);
		}

		@Override
		public void onSuccess(String result) {
			LOGGER.info("Message was sent successfully.");
		}
	});

	return adapter;
}
 
/**
 * Builds the Cassandra sink handler for the "toSink" channel. The query type, write
 * options, ingest query and statement expression are applied only when the
 * corresponding sink properties are set; an ingest query takes precedence over a
 * statement expression.
 *
 * @return the configured handler, which produces no reply
 */
@Bean
@ServiceActivator(inputChannel = "toSink")
public MessageHandler cassandraSinkMessageHandler() {
	CassandraMessageHandler<?> handler;
	if (this.cassandraSinkProperties.getQueryType() != null) {
		handler = new CassandraMessageHandler<>(this.template, this.cassandraSinkProperties.getQueryType());
	}
	else {
		handler = new CassandraMessageHandler<>(this.template);
	}
	handler.setProducesReply(false);

	boolean customWriteOptions = this.cassandraSinkProperties.getConsistencyLevel() != null
			|| this.cassandraSinkProperties.getRetryPolicy() != null
			|| this.cassandraSinkProperties.getTtl() > 0;
	if (customWriteOptions) {
		WriteOptions writeOptions = new WriteOptions(
				this.cassandraSinkProperties.getConsistencyLevel(),
				this.cassandraSinkProperties.getRetryPolicy(),
				this.cassandraSinkProperties.getTtl());
		handler.setWriteOptions(writeOptions);
	}

	if (StringUtils.hasText(this.cassandraSinkProperties.getIngestQuery())) {
		handler.setIngestQuery(this.cassandraSinkProperties.getIngestQuery());
	}
	else if (this.cassandraSinkProperties.getStatementExpression() != null) {
		handler.setStatementExpression(this.cassandraSinkProperties.getStatementExpression());
	}
	return handler;
}
 
/**
 * Creates a handler that waits 500 ms and then sends "response" to the message's
 * reply channel, expecting that send to fail with a {@link MessageDeliveryException}
 * carrying the "receiving thread has exited due to a timeout" message.
 * <p>Any other outcome is recorded in {@code failure}: a successful send, an
 * interruption, or a delivery exception with a different message. The latch is
 * always counted down when the handler finishes.
 *
 * @param latch counted down once the handler has run
 * @param failure receives the unexpected outcome, if any
 * @return the late-replying handler
 */
private MessageHandler createLateReplier(final CountDownLatch latch, final AtomicReference<Throwable> failure) {
	return message -> {
		try {
			Thread.sleep(500);
			MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
			replyChannel.send(new GenericMessage<>("response"));
			// Reaching this line means the late reply was accepted, which is the failure case.
			failure.set(new IllegalStateException("Expected exception"));
		}
		catch (InterruptedException e) {
			failure.set(e);
			// Restore the interrupt flag so the owning thread can still observe it.
			Thread.currentThread().interrupt();
		}
		catch (MessageDeliveryException ex) {
			String expected = "Reply message received but the receiving thread has exited due to a timeout";
			String actual = ex.getMessage();
			if (!expected.equals(actual)) {
				failure.set(new IllegalStateException(
						"Unexpected error: '" + actual + "'"));
			}
		}
		finally {
			latch.countDown();
		}
	};
}
 
/**
 * Binds a pollable consumer to "pollable"/"group", publishes "testPollable" to
 * "pollable.group", and polls (at most 101 times) until a message is received,
 * asserting that its payload matches.
 */
@Test
public void testPolledConsumer() throws Exception {
	RabbitTestBinder binder = getBinder();
	PollableSource<MessageHandler> source = new DefaultPollableMessageSource(
			this.messageConverter);
	Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
			"pollable", "group", source, createConsumerProperties());

	RabbitTemplate rabbitTemplate = new RabbitTemplate(
			this.rabbitAvailableRule.getResource());
	rabbitTemplate.convertAndSend("pollable.group", "testPollable");

	// One initial poll plus up to 100 retries, stopping as soon as a poll succeeds.
	boolean polled = false;
	for (int attempt = 0; attempt <= 100 && !polled; attempt++) {
		polled = source.poll(m -> {
			assertThat(m.getPayload()).isEqualTo("testPollable");
		});
	}

	assertThat(polled).isTrue();
	binding.unbind();
}
 
/**
 * Registers a system subscription for "/topic/foo" on the broker relay and checks
 * that, after a CONNECTED frame, the relay has sent CONNECT followed by SUBSCRIBE
 * for that destination, and that a subsequent MESSAGE frame for the destination is
 * passed to the subscribed handler as the same instance.
 */
@Test
public void systemSubscription() throws Exception {

	MessageHandler systemHandler = mock(MessageHandler.class);
	this.brokerRelay.setSystemSubscriptions(Collections.singletonMap("/topic/foo", systemHandler));
	this.brokerRelay.start();

	StompHeaderAccessor connectedAccessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
	connectedAccessor.setLeaveMutable(true);
	MessageHeaders connectedHeaders = connectedAccessor.getMessageHeaders();
	this.tcpClient.handleMessage(MessageBuilder.createMessage(new byte[0], connectedHeaders));

	assertEquals(2, this.tcpClient.getSentMessages().size());
	assertEquals(StompCommand.CONNECT, this.tcpClient.getSentHeaders(0).getCommand());
	assertEquals(StompCommand.SUBSCRIBE, this.tcpClient.getSentHeaders(1).getCommand());
	assertEquals("/topic/foo", this.tcpClient.getSentHeaders(1).getDestination());

	Message<byte[]> inbound = message(StompCommand.MESSAGE, null, null, "/topic/foo");
	this.tcpClient.handleMessage(inbound);

	ArgumentCaptor<Message> messageCaptor = ArgumentCaptor.forClass(Message.class);
	verify(systemHandler).handleMessage(messageCaptor.capture());
	assertSame(inbound, messageCaptor.getValue());
}
 
源代码18 项目: rqueue   文件: RqueueMessageHandlerTest.java
/**
 * Dispatches {@code payloadConvertedMessage} to {@code messagePayloadQueue} and
 * asserts that the listener's last received message equals {@code messagePayload}.
 */
@Test
public void testMethodWithMessagePayloadParameterIsInvoked() {
  StaticApplicationContext context = new StaticApplicationContext();
  context.registerSingleton("incomingMessageHandler", IncomingMessageHandler.class);
  context.registerSingleton("rqueueMessageHandler", RqueueMessageHandler.class);
  context.refresh();

  MessageHandler handler = context.getBean(MessageHandler.class);
  handler.handleMessage(buildMessage(messagePayloadQueue, payloadConvertedMessage));

  IncomingMessageHandler listener = context.getBean(IncomingMessageHandler.class);
  assertEquals(messagePayload, listener.getLastReceivedMessage());
}
 
/**
 * Creates a {@code PubSubMessageHandler} that targets the destination's name and
 * shares this binder's bean factory. The producer properties and error channel are
 * not used here.
 */
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
		ExtendedProducerProperties<PubSubProducerProperties> producerProperties,
		MessageChannel errorChannel) {

	PubSubMessageHandler pubSubHandler =
			new PubSubMessageHandler(this.pubSubTemplate, destination.getName());
	pubSubHandler.setBeanFactory(getBeanFactory());
	return pubSubHandler;
}
 
/**
 * Passes the message to the handler, rethrowing anything the handler throws as a
 * {@code MessageHandlingException} that carries the message and the original cause.
 *
 * @param handler the handler to invoke
 * @param message the message to hand over
 */
private void doHandleMessage(MessageHandler handler, Message<?> message) {
	try {
		handler.handleMessage(message);
	}
	catch (Throwable cause) { // NOSONAR
		throw new MessageHandlingException(message, cause);
	}
}
 
/**
 * Exposes the handler for user-registry broadcasts, provided the broker registry
 * defines a user-registry broadcast destination.
 *
 * @return the handler, or {@code null} when no broadcast destination is configured
 * @throws IllegalArgumentException if the user registry is not a
 * {@code MultiServerUserRegistry} (raised by {@code Assert.isInstanceOf})
 */
@Bean
@Nullable
public MessageHandler userRegistryMessageHandler() {
	if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
		return null;
	}

	SimpUserRegistry registry = userRegistry();
	Assert.isInstanceOf(MultiServerUserRegistry.class, registry, "MultiServerUserRegistry required");
	MultiServerUserRegistry multiServerRegistry = (MultiServerUserRegistry) registry;

	return new UserRegistryMessageHandler(
			multiServerRegistry,
			brokerMessagingTemplate(),
			getBrokerRegistry().getUserRegistryBroadcast(),
			messageBrokerTaskScheduler());
}
 
源代码22 项目: spring-jms   文件: ProducingChannelConfig.java
/**
 * Creates the JMS sending handler for "producingChannel", targeting the destination
 * named by {@code integrationDestination}.
 *
 * @param jmsTemplate the template the handler sends with
 * @return the configured handler
 */
@Bean
@ServiceActivator(inputChannel = "producingChannel")
public MessageHandler jmsMessageHandler(JmsTemplate jmsTemplate) {
  JmsSendingMessageHandler jmsHandler = new JmsSendingMessageHandler(jmsTemplate);
  jmsHandler.setDestinationName(integrationDestination);
  return jmsHandler;
}
 
/**
 * Removes the handler from this channel's handlers, logging the removal at debug
 * level when it actually took place.
 *
 * @param handler the handler to remove
 * @return {@code true} if the handler was removed
 */
@Override
public boolean unsubscribe(MessageHandler handler) {
	boolean removed = this.handlers.remove(handler);
	if (removed && logger.isDebugEnabled()) {
		logger.debug(getBeanName() + " removed " + handler);
	}
	return removed;
}
 
/**
 * Delivers the message to every subscriber by wrapping each delivery in a
 * {@code SendTask}. The task is handed to the executor when one is set and run on
 * the calling thread otherwise. The {@code timeout} argument is not used.
 *
 * @return always {@code true}
 */
@Override
public boolean sendInternal(Message<?> message, long timeout) {
	for (MessageHandler subscriber : getSubscribers()) {
		SendTask task = new SendTask(message, subscriber);
		if (this.executor != null) {
			this.executor.execute(task);
		}
		else {
			task.run();
		}
	}
	return true;
}
 
/**
 * Tears down the error beans for a consumer destination: the recoverer first, then
 * (only when the error channel bean exists) the bridge handler and error message
 * handler subscribed to it, and finally the channel itself.
 * <p>An {@link IllegalStateException} raised during the teardown is ignored on
 * purpose; the existing comment attributes it to the context shutting down.
 *
 * @param destination the consumer destination
 * @param group the consumer group
 * @param properties the consumer properties used to derive the bean names
 */
private void destroyErrorInfrastructure(ConsumerDestination destination, String group,
		C properties) {
	try {
		destroyBean(getErrorRecovererName(destination, group, properties));

		String channelBeanName = errorsBaseName(destination, group, properties);
		String handlerBeanName = getErrorMessageHandlerName(destination, group,
				properties);
		String bridgeBeanName = getErrorBridgeName(destination, group, properties);

		MessageHandler bridge = getApplicationContext().containsBean(bridgeBeanName)
				? getApplicationContext().getBean(bridgeBeanName, MessageHandler.class)
				: null;
		MessageHandler errorHandler = getApplicationContext().containsBean(handlerBeanName)
				? getApplicationContext().getBean(handlerBeanName, MessageHandler.class)
				: null;

		if (!getApplicationContext().containsBean(channelBeanName)) {
			return;
		}

		SubscribableChannel errorChannel = getApplicationContext()
				.getBean(channelBeanName, SubscribableChannel.class);
		if (bridge != null) {
			errorChannel.unsubscribe(bridge);
			destroyBean(bridgeBeanName);
		}
		if (errorHandler != null) {
			errorChannel.unsubscribe(errorHandler);
			destroyBean(handlerBeanName);
		}
		destroyBean(channelBeanName);
	}
	catch (IllegalStateException ignored) {
		// context is shutting down.
	}
}
 
源代码26 项目: spring-cloud-gcp   文件: GcsMessageHandlerTests.java
/**
 * Creates the GCS outbound handler for "siGcsTestChannel", with the fixed remote
 * directory "testGcsBucket".
 *
 * @param gcs the storage client wrapped in a {@code GcsSessionFactory}
 * @return the configured handler
 */
@Bean
@ServiceActivator(inputChannel = "siGcsTestChannel")
public MessageHandler outboundAdapter(Storage gcs) {
	GcsSessionFactory sessionFactory = new GcsSessionFactory(gcs);
	GcsMessageHandler gcsHandler = new GcsMessageHandler(sessionFactory);
	gcsHandler.setRemoteDirectoryExpression(new ValueExpression<>("testGcsBucket"));
	return gcsHandler;
}
 
/**
 * For MESSAGE-type messages about to be handled by a
 * {@code WebSocketAnnotationMethodMessageHandler} or a
 * {@code SubProtocolWebSocketHandler}, activates the span found in the headers and
 * returns a copy of the message with the resulting scope stored under
 * {@code OPENTRACING_SCOPE}. Every other message is returned unchanged.
 */
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel,
    MessageHandler handler) {
  boolean tracedHandler = handler instanceof WebSocketAnnotationMethodMessageHandler
      || handler instanceof SubProtocolWebSocketHandler;
  if (!tracedHandler
      || !SimpMessageType.MESSAGE.equals(message.getHeaders().get(SIMP_MESSAGE_TYPE))) {
    return message;
  }

  Span headerSpan = message.getHeaders().get(OPENTRACING_SPAN, Span.class);
  Scope activeScope = tracer.scopeManager().activate(headerSpan);
  return MessageBuilder.fromMessage(message)
      .setHeader(OPENTRACING_SCOPE, activeScope)
      .build();
}
 
/**
 * With {@code BrokerRelayConfig} loaded, asserts that "brokerChannel" has exactly
 * two subscribers: the user destination handler and the STOMP broker relay handler.
 */
@Test
public void brokerChannelWithBrokerRelay() {
	ApplicationContext context = loadConfig(BrokerRelayConfig.class);

	TestChannel brokerChannel = context.getBean("brokerChannel", TestChannel.class);
	Set<MessageHandler> subscribers = brokerChannel.getSubscribers();
	assertEquals(2, subscribers.size());

	UserDestinationMessageHandler userDestinationHandler =
			context.getBean(UserDestinationMessageHandler.class);
	assertTrue(subscribers.contains(userDestinationHandler));

	StompBrokerRelayMessageHandler relayHandler =
			context.getBean(StompBrokerRelayMessageHandler.class);
	assertTrue(subscribers.contains(relayHandler));
}
 
/**
 * Checks that an exception thrown by the first subscribed handler surfaces from
 * {@code send} as a {@code MessageDeliveryException} whose cause is the original
 * exception, and that the second handler is never invoked.
 */
@Test
public void failurePropagates()  {
	RuntimeException ex = new RuntimeException();
	willThrow(ex).given(this.handler).handleMessage(this.message);
	MessageHandler secondHandler = mock(MessageHandler.class);
	this.channel.subscribe(this.handler);
	this.channel.subscribe(secondHandler);
	try {
		this.channel.send(message);
		// Without this the cause assertion below is skipped whenever send() does not
		// throw, letting the propagation check pass vacuously.
		throw new AssertionError("Expected MessageDeliveryException to be thrown");
	}
	catch (MessageDeliveryException actualException) {
		assertThat(actualException.getCause(), equalTo(ex));
	}
	verifyZeroInteractions(secondHandler);
}
 
/**
 * Delegates to the superclass, then throws the configured exception if one is set;
 * otherwise returns the configured replacement message, falling back to the
 * incoming message when no replacement is set.
 */
@Override
public Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
	super.beforeHandle(message, channel, handler);
	if (this.exceptionToRaise != null) {
		throw this.exceptionToRaise;
	}
	if (this.messageToReturn != null) {
		return this.messageToReturn;
	}
	return message;
}
 
 类方法
 同包方法