下面列出了org.springframework.util.IdGenerator#org.springframework.messaging.Message 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void attemptToReadDisallowedUserPropertyIsNotFatal() throws JMSException {
javax.jms.Message jmsMessage = new StubTextMessage() {
@Override
public Object getObjectProperty(String name) throws JMSException {
if (name.equals("fail")) {
throw new JMSException("illegal property");
}
else {
return super.getObjectProperty(name);
}
}
};
jmsMessage.setBooleanProperty("fail", true);
assertAttemptReadDisallowedPropertyIsNotFatal(jmsMessage, "fail");
}
@Test
public void leaveMutableDefaultBehavior() {
MessageHeaderAccessor accessor = new MessageHeaderAccessor();
accessor.setHeader("foo", "bar");
MessageHeaders headers = accessor.getMessageHeaders();
Message<?> message = MessageBuilder.createMessage("payload", headers);
this.thrown.expect(IllegalStateException.class);
this.thrown.expectMessage("Already immutable");
accessor.setLeaveMutable(true);
this.thrown.expect(IllegalStateException.class);
this.thrown.expectMessage("Already immutable");
accessor.setHeader("foo", "baz");
assertEquals("bar", headers.get("foo"));
assertSame(accessor, MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class));
}
@Test
public void send() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String destination = "/topic/foo";
String payload = "sample payload";
this.session.send(destination, payload);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.SEND, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 2, stompHeaders.size());
assertEquals(destination, stompHeaders.getDestination());
assertEquals(new MimeType("text", "plain", StandardCharsets.UTF_8), stompHeaders.getContentType());
assertEquals(-1, stompHeaders.getContentLength()); // StompEncoder isn't involved
assertEquals(payload, new String(message.getPayload(), StandardCharsets.UTF_8));
}
@Test // SPR-12444
public void handleMessageToOtherUser() {
TestSimpUser otherSimpUser = new TestSimpUser("anna");
otherSimpUser.addSessions(new TestSimpSession("456"));
given(this.registry.getUser("anna")).willReturn(otherSimpUser);
TestPrincipal user = new TestPrincipal("joe");
TestPrincipal otherUser = new TestPrincipal("anna");
String sourceDestination = "/user/anna/queue/foo";
Message<?> message = createMessage(SimpMessageType.MESSAGE, user, "456", sourceDestination);
UserDestinationResult actual = this.resolver.resolveDestination(message);
assertEquals(sourceDestination, actual.getSourceDestination());
assertEquals(1, actual.getTargetDestinations().size());
assertEquals("/queue/foo-user456", actual.getTargetDestinations().iterator().next());
assertEquals("/user/queue/foo", actual.getSubscribeDestination());
assertEquals(otherUser.getName(), actual.getUser());
}
@Test
public void toMessage() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
MyBean payload = new MyBean();
payload.setString("Foo");
payload.setNumber(42);
payload.setFraction(42F);
payload.setArray(new String[]{"Foo", "Bar"});
payload.setBool(true);
payload.setBytes(new byte[]{0x1, 0x2});
Message<?> message = converter.toMessage(payload, null);
String actual = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
assertTrue(actual.contains("\"string\":\"Foo\""));
assertTrue(actual.contains("\"number\":42"));
assertTrue(actual.contains("fraction\":42.0"));
assertTrue(actual.contains("\"array\":[\"Foo\",\"Bar\"]"));
assertTrue(actual.contains("\"bool\":true"));
assertTrue(actual.contains("\"bytes\":\"AQI=\""));
assertEquals("Invalid content-type", new MimeType("application", "json", StandardCharsets.UTF_8),
message.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class));
}
protected void processHandlerMethodException(HandlerMethod handlerMethod, Exception exception, Message<?> message) {
InvocableHandlerMethod invocable = getExceptionHandlerMethod(handlerMethod, exception);
if (invocable == null) {
logger.error("Unhandled exception from message handler method", exception);
return;
}
invocable.setMessageMethodArgumentResolvers(this.argumentResolvers);
if (logger.isDebugEnabled()) {
logger.debug("Invoking " + invocable.getShortLogMessage());
}
try {
Throwable cause = exception.getCause();
Object returnValue = (cause != null ?
invocable.invoke(message, exception, cause, handlerMethod) :
invocable.invoke(message, exception, handlerMethod));
MethodParameter returnType = invocable.getReturnType();
if (void.class == returnType.getParameterType()) {
return;
}
this.returnValueHandlers.handleReturnValue(returnValue, returnType, message);
}
catch (Throwable ex2) {
logger.error("Error while processing handler method exception", ex2);
}
}
@Test // SPR-14690
public void handleMessageFromClientWithTokenAuthentication() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
channel.addInterceptor(new AuthenticationInterceptor("[email protected]"));
channel.addInterceptor(new ImmutableMessageChannelInterceptor());
TestMessageHandler messageHandler = new TestMessageHandler();
channel.subscribe(messageHandler);
StompSubProtocolHandler handler = new StompSubProtocolHandler();
handler.afterSessionStarted(this.session, channel);
TextMessage wsMessage = StompTextMessageBuilder.create(StompCommand.CONNECT).build();
handler.handleMessageFromClient(this.session, wsMessage, channel);
assertEquals(1, messageHandler.getMessages().size());
Message<?> message = messageHandler.getMessages().get(0);
Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
assertNotNull(user);
assertEquals("[email protected]", user.getName());
}
@Test
public void attemptToWriteDisallowedTypePropertyIsNotFatal() throws JMSException {
Message<String> message = initBuilder()
.setHeader(JmsHeaders.TYPE, "someType")
.setHeader("foo", "bar")
.build();
javax.jms.Message jmsMessage = new StubTextMessage() {
@Override
public void setJMSType(String type) throws JMSException {
throw new JMSException("illegal property");
}
};
mapper.fromHeaders(message.getHeaders(), jmsMessage);
assertNull(jmsMessage.getJMSType());
assertNotNull(jmsMessage.getStringProperty("foo"));
assertEquals("bar", jmsMessage.getStringProperty("foo"));
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
Class<?> targetMessageType = parameter.getParameterType();
Class<?> targetPayloadType = getPayloadType(parameter);
if (!targetMessageType.isAssignableFrom(message.getClass())) {
throw new MethodArgumentTypeMismatchException(message, parameter, "Actual message type '" +
ClassUtils.getDescriptiveType(message) + "' does not match expected type '" +
ClassUtils.getQualifiedName(targetMessageType) + "'");
}
Object payload = message.getPayload();
if (targetPayloadType.isInstance(payload)) {
return message;
}
if (isEmptyPayload(payload)) {
throw new MessageConversionException(message, "Cannot convert from actual payload type '" +
ClassUtils.getDescriptiveType(payload) + "' to expected payload type '" +
ClassUtils.getQualifiedName(targetPayloadType) + "' when payload is empty");
}
payload = convertPayload(message, parameter, targetPayloadType);
return MessageBuilder.createMessage(payload, message.getHeaders());
}
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
ApplicationContext context = loadConfig(SimpleBrokerConfig.class);
TestChannel channel = context.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler =
context.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setSubscriptionId("subs1");
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
messageHandler.handleMessage(message);
message = channel.messages.get(0);
headers = StompHeaderAccessor.wrap(message);
assertEquals(SimpMessageType.MESSAGE, headers.getMessageType());
assertEquals("/foo", headers.getDestination());
assertEquals("bar", new String((byte[]) message.getPayload()));
}
@Test
public void ack() {
this.session.afterConnected(this.connection);
assertTrue(this.session.isConnected());
String messageId = "123";
this.session.acknowledge(messageId, true);
Message<byte[]> message = this.messageCaptor.getValue();
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.ACK, accessor.getCommand());
StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(accessor.getNativeHeaders());
assertEquals(stompHeaders.toString(), 1, stompHeaders.size());
assertEquals(messageId, stompHeaders.getId());
}
@Override
public Subscription subscribe(StompHeaders headers, StompFrameHandler handler) {
Assert.hasText(headers.getDestination(), "Destination header is required");
Assert.notNull(handler, "StompFrameHandler must not be null");
String subscriptionId = headers.getId();
if (!StringUtils.hasText(subscriptionId)) {
subscriptionId = String.valueOf(DefaultStompSession.this.subscriptionIndex.getAndIncrement());
headers.setId(subscriptionId);
}
checkOrAddReceipt(headers);
Subscription subscription = new DefaultSubscription(headers, handler);
StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
accessor.addNativeHeaders(headers);
Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
execute(message);
return subscription;
}
@Test
public void clientInboundChannelSendMessage() throws Exception {
ApplicationContext config = createConfig(TestChannelConfig.class, TestConfigurer.class);
TestChannel channel = config.getBean("clientInboundChannel", TestChannel.class);
SubProtocolWebSocketHandler webSocketHandler = config.getBean(SubProtocolWebSocketHandler.class);
List<ChannelInterceptor> interceptors = channel.getInterceptors();
assertEquals(ImmutableMessageChannelInterceptor.class, interceptors.get(interceptors.size()-1).getClass());
TestWebSocketSession session = new TestWebSocketSession("s1");
session.setOpen(true);
webSocketHandler.afterConnectionEstablished(session);
webSocketHandler.handleMessage(session,
StompTextMessageBuilder.create(StompCommand.SEND).headers("destination:/foo").build());
Message<?> message = channel.messages.get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertNotNull(accessor);
assertFalse(accessor.isMutable());
assertEquals(SimpMessageType.MESSAGE, accessor.getMessageType());
assertEquals("/foo", accessor.getDestination());
}
@Test
public void replyWithCustomTimeToLive() throws JMSException {
Session session = mock(Session.class);
Queue replyDestination = mock(Queue.class);
given(session.createQueue("queueOut")).willReturn(replyDestination);
MessageProducer messageProducer = mock(MessageProducer.class);
TextMessage responseMessage = mock(TextMessage.class);
given(session.createTextMessage("Response")).willReturn(responseMessage);
given(session.createProducer(replyDestination)).willReturn(messageProducer);
MessagingMessageListenerAdapter listener = getPayloadInstance("Response", "replyPayloadToQueue", Message.class);
QosSettings settings = new QosSettings();
settings.setTimeToLive(6000);
listener.setResponseQosSettings(settings);
listener.onMessage(mock(javax.jms.Message.class), session);
verify(session).createQueue("queueOut");
verify(session).createTextMessage("Response");
verify(messageProducer).send(responseMessage, javax.jms.Message.DEFAULT_DELIVERY_MODE,
javax.jms.Message.DEFAULT_PRIORITY, 6000);
verify(messageProducer).close();
}
private Object convertPayload(Message<?> message, MethodParameter parameter, Class<?> targetPayloadType) {
Object result = null;
if (this.converter instanceof SmartMessageConverter) {
SmartMessageConverter smartConverter = (SmartMessageConverter) this.converter;
result = smartConverter.fromMessage(message, targetPayloadType, parameter);
}
else if (this.converter != null) {
result = this.converter.fromMessage(message, targetPayloadType);
}
if (result == null) {
throw new MessageConversionException(message, "No converter found from actual payload type '" +
ClassUtils.getDescriptiveType(message.getPayload()) + "' to expected payload type '" +
ClassUtils.getQualifiedName(targetPayloadType) + "'");
}
return result;
}
@Test
public void resolveNameFromSystemProperty() throws Exception {
System.setProperty("systemProperty", "sysbar");
try {
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).setHeader("sysbar", "foo").build();
Object result = resolver.resolveArgument(paramSystemPropertyName, message);
assertEquals("foo", result);
}
finally {
System.clearProperty("systemProperty");
}
}
@Override
protected Mono<Void> handleEncodedContent(
Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message) {
this.encodedContent = encodedContent.cache();
return this.encodedContent.then();
}
@Test
public void doSendWithStompHeaders() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setDestination("/user/queue/foo");
Message<?> message = MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders());
this.messagingTemplate.doSend("/queue/foo-user123", message);
List<Message<byte[]>> messages = this.messageChannel.getMessages();
Message<byte[]> sentMessage = messages.get(0);
MessageHeaderAccessor sentAccessor = MessageHeaderAccessor.getAccessor(sentMessage, MessageHeaderAccessor.class);
assertEquals(StompHeaderAccessor.class, sentAccessor.getClass());
assertEquals("/queue/foo-user123", ((StompHeaderAccessor) sentAccessor).getDestination());
}
private void addMatchesToCollection(Collection<T> mappingsToCheck, Message<?> message, List<Match> matches) {
for (T mapping : mappingsToCheck) {
T match = getMatchingMapping(mapping, message);
if (match != null) {
matches.add(new Match(match, this.handlerMethods.get(mapping)));
}
}
}
@Override
public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
Message<?> messageToUse = message;
ChannelInterceptorChain chain = new ChannelInterceptorChain();
boolean sent = false;
try {
messageToUse = chain.applyPreSend(messageToUse, this);
if (messageToUse == null) {
return false;
}
sent = sendInternal(messageToUse, timeout);
chain.applyPostSend(messageToUse, this, sent);
chain.triggerAfterSendCompletion(messageToUse, this, sent, null);
return sent;
}
catch (Exception ex) {
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex);
if (ex instanceof MessagingException) {
throw (MessagingException) ex;
}
throw new MessageDeliveryException(messageToUse,"Failed to send message to " + this, ex);
}
catch (Throwable err) {
MessageDeliveryException ex2 =
new MessageDeliveryException(messageToUse, "Failed to send message to " + this, err);
chain.triggerAfterSendCompletion(messageToUse, this, sent, ex2);
throw ex2;
}
}
@Test
public void send() {
Message<?> message = new GenericMessage<Object>("payload");
this.template.setDefaultDestination("home");
this.template.send(message);
assertEquals("home", this.template.destination);
assertSame(message, this.template.message);
}
@Test
public void compareNumberOfMatchingPatterns() throws Exception {
Message<?> message = messageTo("/foo");
DestinationPatternsMessageCondition c1 = condition("/foo", "bar");
DestinationPatternsMessageCondition c2 = condition("/foo", "f*");
DestinationPatternsMessageCondition match1 = c1.getMatchingCondition(message);
DestinationPatternsMessageCondition match2 = c2.getMatchingCondition(message);
assertEquals(1, match1.compareTo(match2, message));
}
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Message<?> messageToUse = message;
if (this.broadcastHandler != null) {
messageToUse = this.broadcastHandler.preHandle(message);
if (messageToUse == null) {
return;
}
}
UserDestinationResult result = this.destinationResolver.resolveDestination(messageToUse);
if (result == null) {
return;
}
if (result.getTargetDestinations().isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("No active sessions for user destination: " + result.getSourceDestination());
}
if (this.broadcastHandler != null) {
this.broadcastHandler.handleUnresolved(messageToUse);
}
return;
}
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(messageToUse);
initHeaders(accessor);
accessor.setNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION, result.getSubscribeDestination());
accessor.setLeaveMutable(true);
messageToUse = MessageBuilder.createMessage(messageToUse.getPayload(), accessor.getMessageHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations());
}
for (String target : result.getTargetDestinations()) {
this.messagingTemplate.send(target, messageToUse);
}
}
@Test
public void attemptToReadDisallowedExpirationPropertyIsNotFatal() throws JMSException {
javax.jms.Message jmsMessage = new StubTextMessage() {
@Override
public long getJMSExpiration() throws JMSException {
throw new JMSException("illegal property");
}
};
assertAttemptReadDisallowedPropertyIsNotFatal(jmsMessage, JmsHeaders.EXPIRATION);
}
@Override
@Nullable
protected Message<?> doSendAndReceive(Destination destination, Message<?> requestMessage) {
try {
javax.jms.Message jmsMessage = obtainJmsTemplate().sendAndReceive(
destination, createMessageCreator(requestMessage));
return convertJmsMessage(jmsMessage);
}
catch (JmsException ex) {
throw convertJmsException(ex);
}
}
@Test
public void receiveAndConvertNoConverter() {
Message<?> expected = new GenericMessage<>("payload");
this.template.setDefaultDestination("home");
this.template.setReceiveMessage(expected);
this.template.setMessageConverter(new GenericMessageConverter());
try {
this.template.receiveAndConvert(Writer.class);
}
catch (MessageConversionException ex) {
assertTrue("Invalid exception message '" + ex.getMessage() + "'", ex.getMessage().contains("payload"));
assertSame(expected, ex.getFailedMessage());
}
}
@Test
public void incompleteCommand() {
BufferingStompDecoder stompDecoder = new BufferingStompDecoder(STOMP_DECODER, 128);
String chunk = "MESSAG";
List<Message<byte[]>> messages = stompDecoder.decode(toByteBuffer(chunk));
assertEquals(0, messages.size());
}
@Aggregator(inputChannel = Sink.INPUT, outputChannel = AggregatorApplication.TEMP_INPUT)
public String receive(List<Message> messageList) {
Random random = new Random();
int batch = random.nextInt(100000);
for (Message msg : messageList) {
System.out.println(batch + " === " + msg);
}
return "aggregator result[" + batch + "]";
}
@Test
public void getMatchingCondition() {
Message<?> message = message(SimpMessageType.MESSAGE);
SimpMessageTypeMessageCondition condition = condition(SimpMessageType.MESSAGE);
SimpMessageTypeMessageCondition actual = condition.getMatchingCondition(message);
assertNotNull(actual);
assertEquals(SimpMessageType.MESSAGE, actual.getMessageType());
}
@Test
public void copyHeadersIfAbsent() {
Message<String> message1 = MessageBuilder.withPayload("test1")
.setHeader("foo", "bar").build();
Message<String> message2 = MessageBuilder.withPayload("test2")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("test2", message2.getPayload());
assertEquals(123, message2.getHeaders().get("foo"));
}