下面列出了怎么用org.springframework.messaging.simp.SimpMessagingTemplate的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testHeadersToSend() throws Exception {
Message<?> message = createMessage("sess1", "sub1", "/app", "/dest", null);
SimpMessageSendingOperations messagingTemplate = Mockito.mock(SimpMessageSendingOperations.class);
SendToMethodReturnValueHandler handler = new SendToMethodReturnValueHandler(messagingTemplate, false);
handler.handleReturnValue(PAYLOAD, this.noAnnotationsReturnType, message);
ArgumentCaptor<MessageHeaders> captor = ArgumentCaptor.forClass(MessageHeaders.class);
verify(messagingTemplate).convertAndSend(eq("/topic/dest"), eq(PAYLOAD), captor.capture());
MessageHeaders headers = captor.getValue();
SimpMessageHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
assertNotNull(accessor);
assertTrue(accessor.isMutable());
assertEquals("sess1", accessor.getSessionId());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.noAnnotationsReturnType,
accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
public void sendToUserDefaultDestinationSingleSession() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
TestUser user = new TestUser();
Message<?> message = createMessage(sessionId, "sub1", "/app", "/dest", user);
this.handler.handleReturnValue(PAYLOAD, this.sendToUserInSessionDefaultDestReturnType, message);
verify(this.messageChannel, times(1)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals(sessionId, accessor.getSessionId());
assertEquals("/user/" + user.getName() + "/queue/dest", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToUserInSessionDefaultDestReturnType,
accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
SimpMessagingTemplate messagingTemplate = new SimpMessagingTemplate(this.messageChannel);
messagingTemplate.setMessageConverter(new StringMessageConverter());
this.handler = new SubscriptionMethodReturnValueHandler(messagingTemplate);
SimpMessagingTemplate jsonMessagingTemplate = new SimpMessagingTemplate(this.messageChannel);
jsonMessagingTemplate.setMessageConverter(new MappingJackson2MessageConverter());
this.jsonHandler = new SubscriptionMethodReturnValueHandler(jsonMessagingTemplate);
Method method = this.getClass().getDeclaredMethod("getData");
this.subscribeEventReturnType = new MethodParameter(method, -1);
method = this.getClass().getDeclaredMethod("getDataAndSendTo");
this.subscribeEventSendToReturnType = new MethodParameter(method, -1);
method = this.getClass().getDeclaredMethod("handle");
this.messageMappingReturnType = new MethodParameter(method, -1);
method = this.getClass().getDeclaredMethod("getJsonView");
this.subscribeEventJsonViewReturnType = new MethodParameter(method, -1);
}
@Test
public void testMessageSentToChannel() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
String subscriptionId = "subs1";
String destination = "/dest";
Message<?> inputMessage = createInputMessage(sessionId, subscriptionId, destination, null);
this.handler.handleReturnValue(PAYLOAD, this.subscribeEventReturnType, inputMessage);
verify(this.messageChannel).send(this.messageCaptor.capture());
assertNotNull(this.messageCaptor.getValue());
Message<?> message = this.messageCaptor.getValue();
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
assertNull("SimpMessageHeaderAccessor should have disabled id", headerAccessor.getId());
assertNull("SimpMessageHeaderAccessor should have disabled timestamp", headerAccessor.getTimestamp());
assertEquals(sessionId, headerAccessor.getSessionId());
assertEquals(subscriptionId, headerAccessor.getSubscriptionId());
assertEquals(destination, headerAccessor.getDestination());
assertEquals(MIME_TYPE, headerAccessor.getContentType());
assertEquals(this.subscribeEventReturnType, headerAccessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testHeadersPassedToMessagingTemplate() throws Exception {
String sessionId = "sess1";
String subscriptionId = "subs1";
String destination = "/dest";
Message<?> inputMessage = createInputMessage(sessionId, subscriptionId, destination, null);
MessageSendingOperations messagingTemplate = Mockito.mock(MessageSendingOperations.class);
SubscriptionMethodReturnValueHandler handler = new SubscriptionMethodReturnValueHandler(messagingTemplate);
handler.handleReturnValue(PAYLOAD, this.subscribeEventReturnType, inputMessage);
ArgumentCaptor<MessageHeaders> captor = ArgumentCaptor.forClass(MessageHeaders.class);
verify(messagingTemplate).convertAndSend(eq("/dest"), eq(PAYLOAD), captor.capture());
SimpMessageHeaderAccessor headerAccessor =
MessageHeaderAccessor.getAccessor(captor.getValue(), SimpMessageHeaderAccessor.class);
assertNotNull(headerAccessor);
assertTrue(headerAccessor.isMutable());
assertEquals(sessionId, headerAccessor.getSessionId());
assertEquals(subscriptionId, headerAccessor.getSubscriptionId());
assertEquals(this.subscribeEventReturnType, headerAccessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
public void sendToUserSingleSession() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
TestUser user = new TestUser();
Message<?> inputMessage = createInputMessage(sessionId, "sub1", null, null, user);
this.handler.handleReturnValue(PAYLOAD, this.sendToUserSingleSessionReturnType, inputMessage);
verify(this.messageChannel, times(2)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals(sessionId, accessor.getSessionId());
assertEquals(MIME_TYPE, accessor.getContentType());
assertEquals("/user/" + user.getName() + "/dest1", accessor.getDestination());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToUserSingleSessionReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
accessor = getCapturedAccessor(1);
assertEquals(sessionId, accessor.getSessionId());
assertEquals("/user/" + user.getName() + "/dest2", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToUserSingleSessionReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testHeadersPassedToMessagingTemplate() throws Exception {
String sessionId = "sess1";
String subscriptionId = "subs1";
String destination = "/dest";
Message<?> inputMessage = createInputMessage(sessionId, subscriptionId, destination, null);
MessageSendingOperations messagingTemplate = Mockito.mock(MessageSendingOperations.class);
SubscriptionMethodReturnValueHandler handler = new SubscriptionMethodReturnValueHandler(messagingTemplate);
handler.handleReturnValue(PAYLOAD, this.subscribeEventReturnType, inputMessage);
ArgumentCaptor<MessageHeaders> captor = ArgumentCaptor.forClass(MessageHeaders.class);
verify(messagingTemplate).convertAndSend(eq("/dest"), eq(PAYLOAD), captor.capture());
SimpMessageHeaderAccessor headerAccessor =
MessageHeaderAccessor.getAccessor(captor.getValue(), SimpMessageHeaderAccessor.class);
assertNotNull(headerAccessor);
assertTrue(headerAccessor.isMutable());
assertEquals(sessionId, headerAccessor.getSessionId());
assertEquals(subscriptionId, headerAccessor.getSubscriptionId());
assertEquals(this.subscribeEventReturnType, headerAccessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
public void sendToUserDefaultDestinationSingleSession() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
TestUser user = new TestUser();
Message<?> inputMessage = createInputMessage(sessionId, "sub1", "/app", "/dest", user);
this.handler.handleReturnValue(PAYLOAD, this.sendToUserSingleSessionDefaultDestReturnType, inputMessage);
verify(this.messageChannel, times(1)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals(sessionId, accessor.getSessionId());
assertEquals("/user/" + user.getName() + "/queue/dest", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToUserSingleSessionDefaultDestReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Inject
public PageResource(
PageService pageService,
PageRepository pageRepository,
SimpMessagingTemplate messagingTemplate,
ContractToPageMapper contractToPageMapper,
AssetService<Page> pageAssetService,
AssetVisitor assetVisitor,
ComponentVisitor componentVisitor, AuthRulesCollector authRulesCollector) {
super(pageAssetService, pageRepository, assetVisitor, Optional.of(messagingTemplate));
this.pageRepository = pageRepository;
this.messagingTemplate = messagingTemplate;
this.contractToPageMapper = contractToPageMapper;
this.pageService = pageService;
this.componentVisitor = componentVisitor;
this.authRulesCollector = authRulesCollector;
}
@Test
public void testHeadersToSend() throws Exception {
Message<?> message = createMessage("sess1", "sub1", "/app", "/dest", null);
SimpMessageSendingOperations messagingTemplate = Mockito.mock(SimpMessageSendingOperations.class);
SendToMethodReturnValueHandler handler = new SendToMethodReturnValueHandler(messagingTemplate, false);
handler.handleReturnValue(PAYLOAD, this.noAnnotationsReturnType, message);
ArgumentCaptor<MessageHeaders> captor = ArgumentCaptor.forClass(MessageHeaders.class);
verify(messagingTemplate).convertAndSend(eq("/topic/dest"), eq(PAYLOAD), captor.capture());
MessageHeaders headers = captor.getValue();
SimpMessageHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
assertNotNull(accessor);
assertTrue(accessor.isMutable());
assertEquals("sess1", accessor.getSessionId());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.noAnnotationsReturnType,
accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
public void testHeadersToSend() throws Exception {
Message<?> inputMessage = createInputMessage("sess1", "sub1", "/app", "/dest", null);
SimpMessageSendingOperations messagingTemplate = Mockito.mock(SimpMessageSendingOperations.class);
SendToMethodReturnValueHandler handler = new SendToMethodReturnValueHandler(messagingTemplate, false);
handler.handleReturnValue(PAYLOAD, this.noAnnotationsReturnType, inputMessage);
ArgumentCaptor<MessageHeaders> captor = ArgumentCaptor.forClass(MessageHeaders.class);
verify(messagingTemplate).convertAndSend(eq("/topic/dest"), eq(PAYLOAD), captor.capture());
MessageHeaders messageHeaders = captor.getValue();
SimpMessageHeaderAccessor accessor = getAccessor(messageHeaders, SimpMessageHeaderAccessor.class);
assertNotNull(accessor);
assertTrue(accessor.isMutable());
assertEquals("sess1", accessor.getSessionId());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.noAnnotationsReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(this.brokerChannel.send(any())).thenReturn(true);
this.converter = new MappingJackson2MessageConverter();
SimpMessagingTemplate brokerTemplate = new SimpMessagingTemplate(this.brokerChannel);
brokerTemplate.setMessageConverter(this.converter);
this.localRegistry = mock(SimpUserRegistry.class);
this.multiServerRegistry = new MultiServerUserRegistry(this.localRegistry);
this.handler = new UserRegistryMessageHandler(this.multiServerRegistry, brokerTemplate,
"/topic/simp-user-registry", this.taskScheduler);
}
@Test
public void sendTo() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
Message<?> inputMessage = createInputMessage(sessionId, "sub1", null, null, null);
this.handler.handleReturnValue(PAYLOAD, this.sendToReturnType, inputMessage);
verify(this.messageChannel, times(2)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals(sessionId, accessor.getSessionId());
assertEquals("/dest1", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
accessor = getCapturedAccessor(1);
assertEquals(sessionId, accessor.getSessionId());
assertEquals("/dest2", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
public void testMessageSentToChannel() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
String subscriptionId = "subs1";
String destination = "/dest";
Message<?> inputMessage = createInputMessage(sessionId, subscriptionId, destination, null);
this.handler.handleReturnValue(PAYLOAD, this.subscribeEventReturnType, inputMessage);
verify(this.messageChannel).send(this.messageCaptor.capture());
assertNotNull(this.messageCaptor.getValue());
Message<?> message = this.messageCaptor.getValue();
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.wrap(message);
assertNull("SimpMessageHeaderAccessor should have disabled id", headerAccessor.getId());
assertNull("SimpMessageHeaderAccessor should have disabled timestamp", headerAccessor.getTimestamp());
assertEquals(sessionId, headerAccessor.getSessionId());
assertEquals(subscriptionId, headerAccessor.getSubscriptionId());
assertEquals(destination, headerAccessor.getDestination());
assertEquals(MIME_TYPE, headerAccessor.getContentType());
assertEquals(this.subscribeEventReturnType, headerAccessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Test
public void sendToNoAnnotations() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
Message<?> inputMessage = createInputMessage("sess1", "sub1", "/app", "/dest", null);
this.handler.handleReturnValue(PAYLOAD, this.noAnnotationsReturnType, inputMessage);
verify(this.messageChannel, times(1)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals("sess1", accessor.getSessionId());
assertEquals("/topic/dest", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.noAnnotationsReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(this.brokerChannel.send(any())).thenReturn(true);
this.converter = new MappingJackson2MessageConverter();
SimpMessagingTemplate brokerTemplate = new SimpMessagingTemplate(this.brokerChannel);
brokerTemplate.setMessageConverter(this.converter);
this.localRegistry = mock(SimpUserRegistry.class);
this.multiServerRegistry = new MultiServerUserRegistry(this.localRegistry);
this.handler = new UserRegistryMessageHandler(this.multiServerRegistry, brokerTemplate,
"/topic/simp-user-registry", this.taskScheduler);
}
public ResultEntity operate(String msg,
Session session,
Map<String, Object> map,
SimpMessagingTemplate smt) throws Exception {
// {"cmdType":"start/stop", "projectName":"", "topoName":"testdb", "jarPath":""}
// {"cmdType":"stop", "topoName":"testdb", "id":10, "uid": topoName+time}
// {"cmdType":"start", "topoName":"testdb", "id":10, "uid": topoName+time}
Map<String, Object> param = null;
if (session != null) {
ObjectMapper mapper = new ObjectMapper();
param = mapper.readValue(msg, new TypeReference<HashMap<String, Object>>() {
});
} else {
param = map;
}
ResultEntity result = null;
if (StringUtils.equals("start", (String) param.get("cmdType")))
result = start(param, session, smt);
else if (StringUtils.equals("stop", (String) param.get("cmdType")))
result = stop(param, session, smt);
return result;
}
/**
* Constructor.
* @param webKafkaConsumerFactory For creating new Consumers.
* @param messagingTemplate For publishing consumed messages back through the web socket.
* @param maxConcurrentConsumers Configuration, how many consumers to run.
*/
public WebSocketConsumersManager(
final WebKafkaConsumerFactory webKafkaConsumerFactory,
final SimpMessagingTemplate messagingTemplate,
final int maxConcurrentConsumers
) {
this(
webKafkaConsumerFactory,
messagingTemplate,
// Setup managed thread pool with number of concurrent threads.
new ThreadPoolExecutor(
maxConcurrentConsumers,
maxConcurrentConsumers,
5,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(100)
)
);
}
@Autowired
public HrMaxQuizService(KieContainer kieContainer, SimpMessagingTemplate template) {
log.info("Initialising a new quiz session.");
this.kieSession = kieContainer.newKieSession("HrmaxSession");
this.agendaEventPublisher = new PublishingAgendaEventListener(template);
this.agendaEventListener = new LoggingAgendaEventListener();
this.ruleRuntimeEventListener = new LoggingRuleRuntimeEventListener();
this.kieSession.addEventListener(agendaEventPublisher);
this.kieSession.addEventListener(agendaEventListener);
this.kieSession.addEventListener(ruleRuntimeEventListener);
this.kieSession.fireAllRules();
}
/**
* Create an instance of SimpAnnotationMethodMessageHandler with the given
* message channels and broker messaging template.
* @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutboundChannel the channel for messages to clients (e.g. WebSocket clients)
* @param brokerTemplate a messaging template to send application messages to the broker
*/
public SimpAnnotationMethodMessageHandler(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, SimpMessageSendingOperations brokerTemplate) {
Assert.notNull(clientInboundChannel, "clientInboundChannel must not be null");
Assert.notNull(clientOutboundChannel, "clientOutboundChannel must not be null");
Assert.notNull(brokerTemplate, "brokerTemplate must not be null");
this.clientInboundChannel = clientInboundChannel;
this.clientMessagingTemplate = new SimpMessagingTemplate(clientOutboundChannel);
this.brokerTemplate = brokerTemplate;
Collection<MessageConverter> converters = new ArrayList<MessageConverter>();
converters.add(new StringMessageConverter());
converters.add(new ByteArrayMessageConverter());
this.messageConverter = new CompositeMessageConverter(converters);
}
@Test
public void sendToDefaultDestination() throws Exception {
given(this.messageChannel.send(any(Message.class))).willReturn(true);
String sessionId = "sess1";
Message<?> inputMessage = createInputMessage(sessionId, "sub1", "/app", "/dest", null);
this.handler.handleReturnValue(PAYLOAD, this.sendToDefaultDestReturnType, inputMessage);
verify(this.messageChannel, times(1)).send(this.messageCaptor.capture());
SimpMessageHeaderAccessor accessor = getCapturedAccessor(0);
assertEquals(sessionId, accessor.getSessionId());
assertEquals("/topic/dest", accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(this.sendToDefaultDestReturnType, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
/**
* Create an instance of SimpAnnotationMethodMessageHandler with the given
* message channels and broker messaging template.
* @param clientInboundChannel the channel for receiving messages from clients (e.g. WebSocket clients)
* @param clientOutboundChannel the channel for messages to clients (e.g. WebSocket clients)
* @param brokerTemplate a messaging template to send application messages to the broker
*/
public SimpAnnotationMethodMessageHandler(SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel, SimpMessageSendingOperations brokerTemplate) {
Assert.notNull(clientInboundChannel, "clientInboundChannel must not be null");
Assert.notNull(clientOutboundChannel, "clientOutboundChannel must not be null");
Assert.notNull(brokerTemplate, "brokerTemplate must not be null");
this.clientInboundChannel = clientInboundChannel;
this.clientMessagingTemplate = new SimpMessagingTemplate(clientOutboundChannel);
this.brokerTemplate = brokerTemplate;
Collection<MessageConverter> converters = new ArrayList<>();
converters.add(new StringMessageConverter());
converters.add(new ByteArrayMessageConverter());
this.messageConverter = new CompositeMessageConverter(converters);
}
private MessageHeaders createHeaders(@Nullable String sessionId, String subscriptionId, MethodParameter returnType) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(accessor);
}
if (sessionId != null) {
accessor.setSessionId(sessionId);
}
accessor.setSubscriptionId(subscriptionId);
accessor.setHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER, returnType);
accessor.setLeaveMutable(true);
return accessor.getMessageHeaders();
}
private MessageHeaders createHeaders(@Nullable String sessionId, MethodParameter returnType) {
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
if (getHeaderInitializer() != null) {
getHeaderInitializer().initHeaders(headerAccessor);
}
if (sessionId != null) {
headerAccessor.setSessionId(sessionId);
}
headerAccessor.setHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER, returnType);
headerAccessor.setLeaveMutable(true);
return headerAccessor.getMessageHeaders();
}
/**
* Create an instance with the given client and broker channels subscribing
* to handle messages from each and then sending any resolved messages to the
* broker channel.
* @param clientInboundChannel messages received from clients.
* @param brokerChannel messages sent to the broker.
* @param resolver the resolver for "user" destinations.
*/
public UserDestinationMessageHandler(SubscribableChannel clientInboundChannel,
SubscribableChannel brokerChannel, UserDestinationResolver resolver) {
Assert.notNull(clientInboundChannel, "'clientInChannel' must not be null");
Assert.notNull(brokerChannel, "'brokerChannel' must not be null");
Assert.notNull(resolver, "resolver must not be null");
this.clientInboundChannel = clientInboundChannel;
this.brokerChannel = brokerChannel;
this.messagingTemplate = new SimpMessagingTemplate(brokerChannel);
this.destinationResolver = resolver;
}
/**
* Constructor.
* @param userRegistry the registry with local and remote user registry information
* @param brokerTemplate template for broadcasting local registry information
* @param broadcastDestination the destination to broadcast to
* @param scheduler the task scheduler to use
*/
public UserRegistryMessageHandler(MultiServerUserRegistry userRegistry,
SimpMessagingTemplate brokerTemplate, String broadcastDestination, TaskScheduler scheduler) {
Assert.notNull(userRegistry, "'userRegistry' is required");
Assert.notNull(brokerTemplate, "'brokerTemplate' is required");
Assert.hasText(broadcastDestination, "'broadcastDestination' is required");
Assert.notNull(scheduler, "'scheduler' is required");
this.userRegistry = userRegistry;
this.brokerTemplate = brokerTemplate;
this.broadcastDestination = broadcastDestination;
this.scheduler = scheduler;
}
@Bean
public SimpMessagingTemplate brokerMessagingTemplate() {
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
String prefix = getBrokerRegistry().getUserDestinationPrefix();
if (prefix != null) {
template.setUserDestinationPrefix(prefix);
}
template.setMessageConverter(brokerMessageConverter());
return template;
}
@Inject
public WidgetResource(JacksonObjectMapper objectMapper,
WidgetRepository widgetRepository,
WidgetService widgetService,
AssetService<Widget> widgetAssetService,
@Named("widgetPath") Path widgetPath,
List<WidgetContainerRepository> widgetContainerRepositories,
AssetVisitor assetVisitor) {
super(widgetAssetService, widgetRepository, assetVisitor, Optional.<SimpMessagingTemplate>absent());
this.widgetRepository = widgetRepository;
this.objectMapper = objectMapper;
this.widgetService = widgetService;
this.widgetPath = widgetPath;
this.widgetContainerRepositories = widgetContainerRepositories;
}
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
SimpMessagingTemplate messagingTemplate = new SimpMessagingTemplate(this.messageChannel);
messagingTemplate.setMessageConverter(new StringMessageConverter());
this.handler = new SendToMethodReturnValueHandler(messagingTemplate, true);
this.handlerAnnotationNotRequired = new SendToMethodReturnValueHandler(messagingTemplate, false);
SimpMessagingTemplate jsonMessagingTemplate = new SimpMessagingTemplate(this.messageChannel);
jsonMessagingTemplate.setMessageConverter(new MappingJackson2MessageConverter());
this.jsonHandler = new SendToMethodReturnValueHandler(jsonMessagingTemplate, true);
}
private void assertResponse(MethodParameter methodParameter, String sessionId,
int index, String destination) {
SimpMessageHeaderAccessor accessor = getCapturedAccessor(index);
assertEquals(sessionId, accessor.getSessionId());
assertEquals(destination, accessor.getDestination());
assertEquals(MIME_TYPE, accessor.getContentType());
assertNull("Subscription id should not be copied", accessor.getSubscriptionId());
assertEquals(methodParameter, accessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}