下面列出了如何使用 org.springframework.messaging.SubscribableChannel 的 API 类示例代码及写法,或者点击链接到 GitHub 查看源代码。
/**
 * Subscribes a routing handler to the input channel.
 *
 * <p>Each received message is dispatched on the value of its {@code "output"} header:
 * {@code "output1"} sends the upper-cased payload to {@code output1}, {@code "output2"}
 * sends the lower-cased payload to {@code output2}. A message whose header is missing
 * or holds any other value is ignored.
 *
 * @param input the channel to subscribe the routing handler to
 * @param output1 destination for messages marked {@code "output1"}
 * @param output2 destination for messages marked {@code "output2"}
 */
@StreamListener
public void receive(@Input(Processor.INPUT) SubscribableChannel input,
        @Output(Processor.OUTPUT) final MessageChannel output1,
        @Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT) final MessageChannel output2) {
    input.subscribe(new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            // Read the header once and compare constant-first so that a message
            // without an "output" header does not raise a NullPointerException.
            Object target = message.getHeaders().get("output");
            if ("output1".equals(target)) {
                output1.send(org.springframework.messaging.support.MessageBuilder
                        .withPayload(message.getPayload().toString().toUpperCase())
                        .build());
            }
            else if ("output2".equals(target)) {
                output2.send(org.springframework.messaging.support.MessageBuilder
                        .withPayload(message.getPayload().toString().toLowerCase())
                        .build());
            }
        }
    });
}
/**
 * Binds a producer channel to the binder under {@code ConsulBinderTests.BINDING_NAME},
 * using the message payload as the partition key expression, and sends one message
 * carrying {@code ConsulBinderTests.MESSAGE_PAYLOAD} through it.
 *
 * @param args the application arguments (not read by the active code)
 */
@Override
public void run(ApplicationArguments args) throws Exception {
    /*
     * Disabled: partition selector wiring driven by the "partitioned" option.
     *
     * if (args.containsOption("partitioned") &&
     * Boolean.valueOf(args.getOptionValues("partitioned").get(0))) {
     * binder.setPartitionSelector(stubPartitionSelectorStrategy()); }
     */
    SubscribableChannel channel = producerChannel();

    ProducerProperties producerProperties = new ProducerProperties();
    producerProperties.setPartitionKeyExpression(
            new SpelExpressionParser().parseExpression("payload"));
    this.binder.bindProducer(ConsulBinderTests.BINDING_NAME, channel, producerProperties);

    Message<String> outbound = new GenericMessage<>(ConsulBinderTests.MESSAGE_PAYLOAD);
    logger.info("Writing message to binder {}", this.binder);
    channel.send(outbound);
}
/**
 * Builds an integration flow that reads from the given channel, upper-cases each
 * {@code String} payload and logs it together with the supplied prefix.
 *
 * @param incoming the channel the flow starts from
 * @param prefix label included in every log line
 * @return the assembled flow
 */
private IntegrationFlow incomingMessageFlow(SubscribableChannel incoming,
        String prefix) {
    final Log flowLog = LogFactory.getLog(getClass());
    return IntegrationFlows.from(incoming)
            .transform(String.class, String::toUpperCase)
            .handle(String.class, (payload, messageHeaders) -> {
                flowLog.info("greeting received in IntegrationFlow (" + prefix + "): " + payload);
                // The handler yields no value of its own.
                return null;
            })
            .get();
}
/**
 * Construct a {@code SimpAnnotationMethodMessageHandler} from the client channels
 * and the broker messaging template.
 *
 * <p>The outbound channel is wrapped in a {@code SimpMessagingTemplate}, and the
 * message converter is initialised to a composite of a String converter followed
 * by a byte-array converter.
 * @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
 * @param clientOutboundChannel the channel for messages to clients (e.g. WebSocket clients)
 * @param brokerTemplate a messaging template to send application messages to the broker
 */
public SimpAnnotationMethodMessageHandler(SubscribableChannel clientInboundChannel,
        MessageChannel clientOutboundChannel, SimpMessageSendingOperations brokerTemplate) {

    Assert.notNull(clientInboundChannel, "clientInboundChannel must not be null");
    Assert.notNull(clientOutboundChannel, "clientOutboundChannel must not be null");
    Assert.notNull(brokerTemplate, "brokerTemplate must not be null");

    this.clientInboundChannel = clientInboundChannel;
    this.clientMessagingTemplate = new SimpMessagingTemplate(clientOutboundChannel);
    this.brokerTemplate = brokerTemplate;

    // Converters are added in this order: String first, then byte[].
    Collection<MessageConverter> defaultConverters = new ArrayList<>();
    defaultConverters.add(new StringMessageConverter());
    defaultConverters.add(new ByteArrayMessageConverter());
    this.messageConverter = new CompositeMessageConverter(defaultConverters);
}
/**
 * Creates the simple broker handler for the given broker channel, wired with this
 * registration's client channels and destination prefixes.
 *
 * <p>The task scheduler and heartbeat value are applied only when they have been set.
 *
 * @param brokerChannel the channel the broker handler is created for
 * @return the configured handler
 */
@Override
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
    SimpleBrokerMessageHandler brokerHandler = new SimpleBrokerMessageHandler(
            getClientInboundChannel(), getClientOutboundChannel(),
            brokerChannel, getDestinationPrefixes());

    if (this.taskScheduler != null) {
        brokerHandler.setTaskScheduler(this.taskScheduler);
    }
    if (this.heartbeat != null) {
        brokerHandler.setHeartbeatValue(this.heartbeat);
    }
    return brokerHandler;
}
/**
 * Creates an input channel with the given name, tags it with the
 * {@code Sink.INPUT} type attribute and passes it through the channel configurer.
 *
 * <p>When a context is available and holds no bean under that name yet, the
 * channel is also registered there as a bean.
 *
 * @param name the component (and bean) name of the channel
 * @return the newly created channel
 */
@Override
public SubscribableChannel createInput(String name) {
    DirectWithAttributesChannel inputChannel = new DirectWithAttributesChannel();
    inputChannel.setComponentName(name);
    inputChannel.setAttribute("type", Sink.INPUT);
    this.messageChannelConfigurer.configureInputChannel(inputChannel, name);

    boolean skipRegistration = (context == null || context.containsBean(name));
    if (!skipRegistration) {
        context.registerBean(name, DirectWithAttributesChannel.class, () -> inputChannel);
    }
    return inputChannel;
}
/**
 * Sends "request" through an executor-backed channel whose only subscriber answers
 * on the message's reply channel, and checks that "response" comes back.
 */
@Test
public void sendAndReceive() {
    SubscribableChannel requestChannel = new ExecutorSubscribableChannel(this.executor);
    requestChannel.subscribe(new MessageHandler() {
        @Override
        public void handleMessage(Message<?> request) throws MessagingException {
            // Answer on whatever reply channel the request carries in its headers.
            Object replyTo = request.getHeaders().getReplyChannel();
            ((MessageChannel) replyTo).send(new GenericMessage<>("response"));
        }
    });

    String reply = this.template.convertSendAndReceive(requestChannel, "request", String.class);
    assertEquals("response", reply);
}
/**
 * Tears down the error-handling beans associated with a consumer destination.
 *
 * <p>The recoverer bean is destroyed first. If the error channel bean exists, the
 * bridge handler and the error message handler (whichever are present) are
 * unsubscribed from it and destroyed, and finally the channel itself is destroyed.
 * An {@code IllegalStateException} raised along the way is ignored.
 *
 * @param destination the consumer destination
 * @param group the consumer group
 * @param properties the consumer properties
 */
private void destroyErrorInfrastructure(ConsumerDestination destination, String group,
        C properties) {
    try {
        destroyBean(getErrorRecovererName(destination, group, properties));

        String channelName = errorsBaseName(destination, group, properties);
        String handlerName = getErrorMessageHandlerName(destination, group, properties);
        String bridgeName = getErrorBridgeName(destination, group, properties);

        MessageHandler bridge = getApplicationContext().containsBean(bridgeName)
                ? getApplicationContext().getBean(bridgeName, MessageHandler.class)
                : null;
        MessageHandler errorHandler = getApplicationContext().containsBean(handlerName)
                ? getApplicationContext().getBean(handlerName, MessageHandler.class)
                : null;

        // Without an error channel there is nothing to unsubscribe from.
        if (!getApplicationContext().containsBean(channelName)) {
            return;
        }

        SubscribableChannel errorChannel = getApplicationContext()
                .getBean(channelName, SubscribableChannel.class);
        if (bridge != null) {
            errorChannel.unsubscribe(bridge);
            destroyBean(bridgeName);
        }
        if (errorHandler != null) {
            errorChannel.unsubscribe(errorHandler);
            destroyBean(handlerName);
        }
        destroyBean(channelName);
    }
    catch (IllegalStateException ignored) {
        // context is shutting down.
    }
}
/**
 * Checks that a {@code SockJsWebSocketHandler} wrapping a sub-protocol handler
 * reports the same protocols as the STOMP handler registered on it.
 */
@Test
public void getSubProtocols() throws Exception {
    SubscribableChannel mockChannel = mock(SubscribableChannel.class);
    SubProtocolWebSocketHandler subProtocolHandler =
            new SubProtocolWebSocketHandler(mockChannel, mockChannel);
    StompSubProtocolHandler stomp = new StompSubProtocolHandler();
    subProtocolHandler.addProtocolHandler(stomp);

    TaskScheduler mockScheduler = mock(TaskScheduler.class);
    DefaultSockJsService sockJsService = new DefaultSockJsService(mockScheduler);
    WebSocketServerSockJsSession sockJsSession =
            new WebSocketServerSockJsSession("1", sockJsService, subProtocolHandler, null);
    SockJsWebSocketHandler wrapper =
            new SockJsWebSocketHandler(sockJsService, subProtocolHandler, sockJsSession);

    assertEquals(stomp.getSupportedProtocols(), wrapper.getSubProtocols());
}
/**
 * Prepares a refreshed static application context holding the test controller and
 * controller advice, and a message handler backed by a single mocked channel.
 */
@Before
public void setUp() throws Exception {
    this.applicationContext = new StaticApplicationContext();
    this.applicationContext.registerSingleton("controller", TestController.class);
    this.applicationContext.registerSingleton("controllerAdvice", TestControllerAdvice.class);
    this.applicationContext.refresh();

    // One mock stands in for every channel the handler needs.
    SubscribableChannel mockChannel = Mockito.mock(SubscribableChannel.class);
    SimpMessageSendingOperations template = new SimpMessagingTemplate(mockChannel);

    this.messageHandler =
            new TestWebSocketAnnotationMethodMessageHandler(template, mockChannel, mockChannel);
    this.messageHandler.setApplicationContext(this.applicationContext);
    this.messageHandler.afterPropertiesSet();
}
/**
 * Verifies that a {@code RequeueCurrentMessageException} thrown by an error channel
 * subscriber causes the acknowledgment callback to be acknowledged with
 * {@code Status.REQUEUE}.
 */
@Test
public void testRequeueFromErrorFlow() {
    TestChannelBinder binder = createBinder();
    MessageConverterConfigurer converterConfigurer = this.context
            .getBean(MessageConverterConfigurer.class);
    DefaultPollableMessageSource source = new DefaultPollableMessageSource(
            this.messageConverter);
    converterConfigurer.configurePolledMessageSource(source, "foo");

    // Attach a mock acknowledgment callback to every message passing through the source.
    AcknowledgmentCallback ackCallback = mock(AcknowledgmentCallback.class);
    source.addInterceptor(new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            return MessageBuilder.fromMessage(message)
                    .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, ackCallback)
                    .build();
        }
    });

    ExtendedConsumerProperties<Object> consumerProperties = new ExtendedConsumerProperties<>(null);
    consumerProperties.setMaxAttempts(1);
    binder.bindPollableConsumer("foo", "bar", source, consumerProperties);

    // The error flow rethrows the failure wrapped in a requeue request.
    SubscribableChannel errors = new DirectChannel();
    errors.subscribe(errorMessage -> {
        throw new RequeueCurrentMessageException((Throwable) errorMessage.getPayload());
    });
    source.setErrorChannel(errors);

    try {
        source.poll(received -> {
            throw new RuntimeException("test requeue from error flow");
        });
    }
    catch (Exception ignored) {
        // no op
    }
    verify(ackCallback).acknowledge(Status.REQUEUE);
}
/**
 * Subscribes a handler to the input channel that forwards every message's payload,
 * converted to a string and upper-cased, to {@code output1}.
 *
 * @param input the channel to subscribe to
 * @param output1 the channel receiving the upper-cased payloads
 */
@StreamListener
@Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT)
public void receive(@Input(Processor.INPUT) SubscribableChannel input,
        @Output(Processor.OUTPUT) final MessageChannel output1) {
    input.subscribe(new MessageHandler() {
        @Override
        public void handleMessage(Message<?> inbound) throws MessagingException {
            String upperCased = inbound.getPayload().toString().toUpperCase();
            output1.send(org.springframework.messaging.support.MessageBuilder
                    .withPayload(upperCased)
                    .build());
        }
    });
}
/**
 * Create an instance with the given client and broker channels subscribing
 * to handle messages from each and then sending any resolved messages to the
 * broker channel.
 * @param clientInboundChannel messages received from clients.
 * @param brokerChannel messages sent to the broker.
 * @param resolver the resolver for "user" destinations.
 */
public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel,
        SubscribableChannel brokerChannel, UserDestinationResolver resolver) {

    // The message names the actual parameter so a failure points at the right argument.
    Assert.notNull(clientInboundChannel, "'clientInboundChannel' must not be null");
    Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
    Assert.notNull(resolver, "resolver must not be null");

    this.clientInboundChannel = clientInboundChannel;
    this.brokerChannel = brokerChannel;
    // Resolved messages are sent through a template that targets the broker channel.
    this.messagingTemplate = new SimpMessagingTemplate(brokerChannel);
    this.destinationResolver = resolver;
}
/**
 * Construct a {@code SubProtocolWebSocketHandler} that uses the supplied channels.
 * Both channels are required; a {@code null} argument is rejected.
 * @param clientInboundChannel the inbound {@code MessageChannel}
 * @param clientOutboundChannel the outbound {@code MessageChannel}
 */
public SubProtocolWebSocketHandler(MessageChannel clientInboundChannel, SubscribableChannel clientOutboundChannel) {
    Assert.notNull(clientInboundChannel, "Inbound MessageChannel must not be null");
    Assert.notNull(clientOutboundChannel, "Outbound MessageChannel must not be null");

    this.clientOutboundChannel = clientOutboundChannel;
    this.clientInboundChannel = clientInboundChannel;
}
/**
 * Creates the simple broker handler for the given broker channel, wired with this
 * registration's client channels and destination prefixes.
 *
 * <p>The task scheduler and heartbeat value are applied only when set; the selector
 * header name is always passed on.
 *
 * @param brokerChannel the channel the broker handler is created for
 * @return the configured handler
 */
@Override
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
    SimpleBrokerMessageHandler brokerHandler = new SimpleBrokerMessageHandler(
            getClientInboundChannel(), getClientOutboundChannel(),
            brokerChannel, getDestinationPrefixes());

    if (this.taskScheduler != null) {
        brokerHandler.setTaskScheduler(this.taskScheduler);
    }
    if (this.heartbeat != null) {
        brokerHandler.setHeartbeatValue(this.heartbeat);
    }
    brokerHandler.setSelectorHeaderName(this.selectorHeaderName);
    return brokerHandler;
}
/**
 * Resolves the dynamic destination "foo" through a {@code BinderAwareChannelResolver}
 * and verifies that the binder obtained for a {@code null} transport binds a producer
 * for it, and that the resolved channel is the "foo" bean in the context.
 */
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void propertyPassthrough() {
    // A single "foo" binding configured with a plain-text content type.
    BindingProperties fooProperties = new BindingProperties();
    fooProperties.setContentType("text/plain");
    Map<String, BindingProperties> bindingsByName = new HashMap<>();
    bindingsByName.put("foo", fooProperties);
    this.bindingServiceProperties.setBindings(bindingsByName);

    Binder defaultBinder = mock(Binder.class);
    Binder transportBinder = mock(Binder.class);
    BinderFactory binderFactory = Mockito.mock(BinderFactory.class);
    Binding<MessageChannel> fooBinding = Mockito.mock(Binding.class);
    Binding<MessageChannel> barBinding = Mockito.mock(Binding.class);

    when(defaultBinder.bindProducer(matches("foo"), any(DirectChannel.class),
            any(ProducerProperties.class))).thenReturn(fooBinding);
    when(transportBinder.bindProducer(matches("bar"), any(DirectChannel.class),
            any(ProducerProperties.class))).thenReturn(barBinding);
    when(binderFactory.getBinder(null, DirectWithAttributesChannel.class))
            .thenReturn(defaultBinder);
    when(binderFactory.getBinder("someTransport", DirectWithAttributesChannel.class))
            .thenReturn(transportBinder);

    BindingService service = new BindingService(this.bindingServiceProperties, binderFactory);
    BinderAwareChannelResolver channelResolver = new BinderAwareChannelResolver(
            service, this.bindingTargetFactory, new DynamicDestinationsBindable());
    channelResolver.setBeanFactory(this.context.getBeanFactory());

    SubscribableChannel fooChannel = (SubscribableChannel) channelResolver
            .resolveDestination("foo");

    verify(defaultBinder).bindProducer(eq("foo"), any(MessageChannel.class),
            any(ProducerProperties.class));
    assertThat(fooChannel).isSameAs(this.context.getBean("foo"));
    this.context.close();
}
/**
 * Creates the STOMP broker relay handler for the given broker channel and copies
 * this registration's settings onto it.
 *
 * <p>Relay host/port, client and system credentials and the auto-startup flag are
 * always applied; heartbeat intervals, virtual host and TCP client only when set.
 *
 * @param brokerChannel the channel the relay handler is created for
 * @return the configured relay handler
 */
protected StompBrokerRelayMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
    StompBrokerRelayMessageHandler relay = new StompBrokerRelayMessageHandler(
            getClientInboundChannel(), getClientOutboundChannel(),
            brokerChannel, getDestinationPrefixes());

    // Settings that are always passed on.
    relay.setRelayHost(this.relayHost);
    relay.setRelayPort(this.relayPort);
    relay.setClientLogin(this.clientLogin);
    relay.setClientPasscode(this.clientPasscode);
    relay.setSystemLogin(this.systemLogin);
    relay.setSystemPasscode(this.systemPasscode);

    // Settings passed on only when explicitly configured.
    if (this.systemHeartbeatSendInterval != null) {
        relay.setSystemHeartbeatSendInterval(this.systemHeartbeatSendInterval);
    }
    if (this.systemHeartbeatReceiveInterval != null) {
        relay.setSystemHeartbeatReceiveInterval(this.systemHeartbeatReceiveInterval);
    }
    if (this.virtualHost != null) {
        relay.setVirtualHost(this.virtualHost);
    }
    if (this.tcpClient != null) {
        relay.setTcpClient(this.tcpClient);
    }

    relay.setAutoStartup(this.autoStartup);
    return relay;
}
/**
 * Creates an output channel with the given name, tags it with the
 * {@code Source.OUTPUT} type attribute and passes it through the channel configurer.
 *
 * <p>When a context is available and holds no bean under that name yet, the
 * channel is also registered there as a bean.
 *
 * @param name the component (and bean) name of the channel
 * @return the newly created channel
 */
@Override
public SubscribableChannel createOutput(String name) {
    DirectWithAttributesChannel outputChannel = new DirectWithAttributesChannel();
    outputChannel.setComponentName(name);
    outputChannel.setAttribute("type", Source.OUTPUT);
    this.messageChannelConfigurer.configureOutputChannel(outputChannel, name);

    boolean skipRegistration = (context == null || context.containsBean(name));
    if (!skipRegistration) {
        context.registerBean(name, DirectWithAttributesChannel.class, () -> outputChannel);
    }
    return outputChannel;
}
/**
 * Returns the simple broker handler for the given broker channel, or {@code null}
 * when no simple broker registration is present.
 *
 * <p>If neither a simple broker nor a broker relay has been registered,
 * {@code enableSimpleBroker()} is invoked first.
 *
 * @param brokerChannel the channel the broker handler is created for
 * @return the handler with this registry's path matcher applied, or {@code null}
 */
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
    boolean nothingRegistered =
            (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null);
    if (nothingRegistered) {
        enableSimpleBroker();
    }
    if (this.simpleBrokerRegistration == null) {
        return null;
    }
    SimpleBrokerMessageHandler brokerHandler =
            this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
    brokerHandler.setPathMatcher(this.pathMatcher);
    return brokerHandler;
}
/**
 * Provisions the consumer destination as a Spring Integration {@link DirectChannel}.
 * When a source has been set, the provisioned channel is also assigned to it.
 */
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
        ConsumerProperties properties) throws ProvisioningException {
    SubscribableChannel provisioned = this.provisionDestination(name, false);
    if (this.source != null) {
        this.source.setChannel(provisioned);
    }
    return new SpringIntegrationConsumerDestination(name, provisioned);
}
/**
 * Builds a sub-protocol WebSocket handler over mocked channels and an endpoint
 * registry that uses it together with a mocked task scheduler.
 */
@Before
public void setup() {
    SubscribableChannel inbound = mock(SubscribableChannel.class);
    SubscribableChannel outbound = mock(SubscribableChannel.class);
    this.webSocketHandler = new SubProtocolWebSocketHandler(inbound, outbound);

    WebSocketTransportRegistration transportRegistration = new WebSocketTransportRegistration();
    TaskScheduler mockScheduler = mock(TaskScheduler.class);
    this.endpointRegistry = new WebMvcStompEndpointRegistry(
            this.webSocketHandler, transportRegistration, mockScheduler);
}
/**
 * Declares the subscribable channel annotated as an input named by {@code INPUT0}.
 *
 * @return the input channel
 */
@Input(INPUT0)
SubscribableChannel input0();
/**
 * Declares the subscribable channel annotated as an input named by
 * {@code APPLICATIONS_IN}.
 *
 * @return the input channel
 */
@Input(APPLICATIONS_IN)
SubscribableChannel sourceOfLoanApplications();
/**
 * Declares the subscribable channel annotated as an input named
 * {@code "admin-login-log-topic"}.
 *
 * @return the input channel
 */
@Input("admin-login-log-topic")
SubscribableChannel adminLoginLog();
/**
 * Declares the subscribable channel annotated as an input named by
 * {@code BROKER_TO_CLIENT}.
 *
 * @return the input channel
 */
@Input(BROKER_TO_CLIENT)
SubscribableChannel brokerToClient();
/**
 * Declares the subscribable channel annotated as an input named by
 * {@code UserSink.INPUT}.
 *
 * @return the input channel
 */
@Input(UserSink.INPUT)
SubscribableChannel user();
/**
 * Declares a subscribable channel annotated as an input without an explicit name.
 * NOTE(review): the binding name presumably defaults to the method name; confirm
 * against the binding framework's documentation.
 *
 * @return the input channel
 */
@Input
SubscribableChannel subscriptionOnMoneyTransferredChannel();
/**
 * Return the client inbound channel held by this instance.
 *
 * @return the value of the {@code clientInboundChannel} field
 */
public SubscribableChannel getClientInboundChannel() {
return this.clientInboundChannel;
}
/**
 * Return the broker channel held by this instance.
 *
 * @return the value of the {@code brokerChannel} field
 */
public SubscribableChannel getBrokerChannel() {
return this.brokerChannel;
}
/**
 * Declares the subscribable channel annotated as an input named by
 * {@code FeedBinder.USER_INITIALIZED}.
 *
 * @return the input channel
 */
@Input(FeedBinder.USER_INITIALIZED)
SubscribableChannel userInitialized();