org.springframework.util.IdGenerator#org.springframework.messaging.MessageHeaders源码实例Demo

下面列出了org.springframework.util.IdGenerator#org.springframework.messaging.MessageHeaders 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

/**
 * Builds the message headers for an incoming HTTP request.
 * <p>Entries are applied in order, so a later entry replaces an earlier one with
 * the same key: request headers, query parameters, {@code path} and
 * {@code httpMethod} (each only when available), and finally {@code request},
 * which always carries the request body.
 * @param event the HTTP request to read from
 * @return the headers assembled from the request
 */
private MessageHeaders getHeaders(HttpRequestMessage<I> event) {
	Map<String, Object> collected = new HashMap<>();

	// Bulk copies first; the single-key entries below take precedence.
	if (event.getHeaders() != null) {
		collected.putAll(event.getHeaders());
	}
	if (event.getQueryParameters() != null) {
		collected.putAll(event.getQueryParameters());
	}

	if (event.getUri() != null) {
		collected.put("path", event.getUri().getPath());
	}
	if (event.getHttpMethod() != null) {
		collected.put("httpMethod", event.getHttpMethod().toString());
	}
	collected.put("request", event.getBody());

	return new MessageHeaders(collected);
}
 
/**
 * Resolves the content type from the {@link MessageHeaders#CONTENT_TYPE} header.
 * @param headers the headers to inspect, possibly {@code null}
 * @return the header value as a {@code MimeType}, or the configured default
 * when the headers or the header value are absent
 * @throws IllegalArgumentException if the header value is neither a
 * {@code MimeType} nor a {@code String}
 */
@Override
public MimeType resolve(MessageHeaders headers) {
	Object contentType = (headers != null ? headers.get(MessageHeaders.CONTENT_TYPE) : null);
	if (contentType == null) {
		return this.defaultMimeType;
	}
	if (contentType instanceof MimeType) {
		return (MimeType) contentType;
	}
	if (contentType instanceof String) {
		return MimeType.valueOf((String) contentType);
	}
	throw new IllegalArgumentException(
			"Unknown type for contentType header value: " + contentType.getClass());
}
 
/**
 * Sending to a destination name must resolve it through the destination
 * resolver and hand the message, with payload and headers intact, to the channel.
 */
@Test
void send_WithDestinationNameAndMessage_shouldResolveTheDestinationAndSendTheMessage()
		throws Exception {
	// Arrange
	MessageSendingTemplateTest template = new MessageSendingTemplateTest(
			this.destinationResolver);
	when(this.destinationResolver.resolveDestination("destination"))
			.thenReturn("resolvedDestination");

	String body = "payload";
	Map<String, Object> headerMap = Collections.singletonMap("headerKey",
			"headerValue");

	// Act
	template.send("destination",
			MessageBuilder.createMessage(body, new MessageHeaders(headerMap)));

	// Assert
	verify(this.destinationResolver).resolveDestination("destination");
	assertThat(template.getMessageChannel().getSentMessage().getPayload())
			.isEqualTo(body);
	assertThat(template.getMessageChannel().getSentMessage().getHeaders()
			.get("headerKey")).isEqualTo(headerMap.get("headerKey"));
}
 
/**
 * A listener method declaring a {@code Message} parameter must receive the
 * incoming message with its payload converted back to the original object.
 */
@Test
void receiveMessage_methodWithMessageAsParameter_parameterIsConverted() {
	new ApplicationContextRunner()
			.withConfiguration(UserConfigurations
					.of(QueueMessageHandlerWithJacksonMappingConfiguration.class))
			.withBean(IncomingMessageHandlerWithMessageParameter.class)
			.run((context) -> {
				// Serialize a payload with the context's own JSON converter,
				// addressed to the logical queue "testQueue".
				DummyKeyValueHolder expectedPayload = new DummyKeyValueHolder("myKey",
						"A value");
				MappingJackson2MessageConverter converter = context
						.getBean(MappingJackson2MessageConverter.class);
				MessageHeaders queueHeaders = new MessageHeaders(
						Collections.singletonMap(
								QueueMessageHandler.LOGICAL_RESOURCE_ID,
								"testQueue"));
				Message<?> incoming = converter.toMessage(expectedPayload,
						queueHeaders);

				MessageHandler handler = context.getBean(MessageHandler.class);
				handler.handleMessage(incoming);

				IncomingMessageHandlerWithMessageParameter listener = context
						.getBean(IncomingMessageHandlerWithMessageParameter.class);
				assertThat(listener.getLastReceivedMessage()).isNotNull();
				assertThat(listener.getLastReceivedMessage().getPayload())
						.isEqualTo(expectedPayload);
			});
}
 
/**
 * Sends one SUBSCRIBE frame per configured system subscription destination,
 * numbering the subscription ids from 0 in iteration order.
 * <p>The TCP connection is looked up anew for every destination; when none is
 * available that destination is skipped. A failed send is reported through
 * {@code handleTcpConnectionFailure}.
 */
private void sendSystemSubscriptions() {
	int subscriptionIndex = 0;
	for (String destination : getSystemSubscriptions().keySet()) {
		StompHeaderAccessor subscribeHeaders = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
		subscribeHeaders.setSubscriptionId(String.valueOf(subscriptionIndex));
		subscriptionIndex++;
		subscribeHeaders.setDestination(destination);
		if (logger.isDebugEnabled()) {
			logger.debug("Subscribing to " + destination + " on \"system\" connection.");
		}
		TcpConnection<byte[]> connection = getTcpConnection();
		if (connection == null) {
			continue;
		}
		ListenableFutureCallback<Void> failureReporter = new ListenableFutureCallback<Void>() {
			@Override
			public void onSuccess(Void result) {
				// Nothing to do once the frame has been sent.
			}
			@Override
			public void onFailure(Throwable ex) {
				String error = "Failed to subscribe in \"system\" session.";
				handleTcpConnectionFailure(error, ex);
			}
		};
		MessageHeaders headers = subscribeHeaders.getMessageHeaders();
		connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(failureReporter);
	}
}
 
/**
 * By default an accessor becomes immutable once its headers have been used to
 * create a message: both {@code setLeaveMutable(true)} and {@code setHeader}
 * must then fail with an {@link IllegalStateException} whose message contains
 * "Already immutable", the existing header value must be unchanged, and the
 * message must still expose the same accessor instance.
 */
@Test
public void leaveMutableDefaultBehavior() {
	MessageHeaderAccessor accessor = new MessageHeaderAccessor();
	accessor.setHeader("foo", "bar");
	MessageHeaders headers = accessor.getMessageHeaders();
	Message<?> message = MessageBuilder.createMessage("payload", headers);

	// Each failing call is checked with its own try/catch. With the
	// ExpectedException rule the test method ends at the first throw, so the
	// second call and the assertions at the bottom would never be executed.
	try {
		accessor.setLeaveMutable(true);
		throw new AssertionError("Expected IllegalStateException from setLeaveMutable(true)");
	}
	catch (IllegalStateException ex) {
		if (ex.getMessage() == null || !ex.getMessage().contains("Already immutable")) {
			throw new AssertionError("Unexpected exception message: " + ex.getMessage());
		}
	}

	try {
		accessor.setHeader("foo", "baz");
		throw new AssertionError("Expected IllegalStateException from setHeader");
	}
	catch (IllegalStateException ex) {
		if (ex.getMessage() == null || !ex.getMessage().contains("Already immutable")) {
			throw new AssertionError("Unexpected exception message: " + ex.getMessage());
		}
	}

	assertEquals("bar", headers.get("foo"));
	assertSame(accessor, MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class));
}
 
/**
 * After a CONNECTED frame is delivered through the TCP client, the relay must
 * have sent CONNECT followed by a SUBSCRIBE for the configured system
 * destination, and a MESSAGE for that destination must reach the registered
 * handler as the very same instance.
 */
@Test
public void systemSubscription() throws Exception {

	String destination = "/topic/foo";
	MessageHandler systemHandler = mock(MessageHandler.class);
	this.brokerRelay.setSystemSubscriptions(Collections.singletonMap(destination, systemHandler));
	this.brokerRelay.start();

	// Deliver a CONNECTED frame through the TCP client.
	StompHeaderAccessor connected = StompHeaderAccessor.create(StompCommand.CONNECTED);
	connected.setLeaveMutable(true);
	MessageHeaders connectedHeaders = connected.getMessageHeaders();
	this.tcpClient.handleMessage(MessageBuilder.createMessage(new byte[0], connectedHeaders));

	assertEquals(2, this.tcpClient.getSentMessages().size());
	assertEquals(StompCommand.CONNECT, this.tcpClient.getSentHeaders(0).getCommand());
	assertEquals(StompCommand.SUBSCRIBE, this.tcpClient.getSentHeaders(1).getCommand());
	assertEquals(destination, this.tcpClient.getSentHeaders(1).getDestination());

	Message<byte[]> brokerMessage = message(StompCommand.MESSAGE, null, null, destination);
	this.tcpClient.handleMessage(brokerMessage);

	ArgumentCaptor<Message> handled = ArgumentCaptor.forClass(Message.class);
	verify(systemHandler).handleMessage(handled.capture());
	assertSame(brokerMessage, handled.getValue());
}
 
/**
 * The value a controller's future is completed with must be passed to the
 * message converter as the payload.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void completableFutureSuccess() {
	Message convertedReply = MessageBuilder.withPayload(new byte[0]).build();
	given(this.channel.send(any(Message.class))).willReturn(true);
	given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(convertedReply);

	CompletableFutureController futureController = new CompletableFutureController();
	this.messageHandler.registerHandler(futureController);
	this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));

	Message<?> request = createMessage("/app1/completable-future");
	this.messageHandler.handleMessage(request);

	// The handler invocation exposes the future; completing it yields the reply.
	assertNotNull(futureController.future);
	futureController.future.complete("foo");
	verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
	assertEquals("foo", this.payloadCaptor.getValue());
}
 
/**
 * Routing a message to a function named "echoFlux" must be rejected: the
 * binder's last error must wrap an {@link IllegalStateException} carrying the
 * "not supported" message asserted below.
 */
@Test
public void testDefaultRoutingFunctionBindingFlux() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					RoutingFunctionConfiguration.class))
							.web(WebApplicationType.NONE)
							.run("--spring.jmx.enabled=false",
									"--spring.cloud.stream.function.routing.enabled=true")) {

		InputDestination input = context.getBean(InputDestination.class);

		// The function definition header selects the routing target.
		Message<byte[]> request = MessageBuilder
				.withPayload("Hello".getBytes())
				.setHeader("spring.cloud.function.definition", "echoFlux")
				.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
				.build();
		input.send(request);

		TestChannelBinder channelBinder = context.getBean(TestChannelBinder.class);
		Exception lastError = (Exception) channelBinder.getLastError().getPayload();
		Throwable cause = lastError.getCause();
		assertThat(cause).isInstanceOf(IllegalStateException.class);
		assertThat(cause.getMessage()).isEqualTo("Routing to functions that return Publisher is not supported in the context of Spring Cloud Stream.");
	}
}
 
/**
 * The value emitted by a controller's mono must be passed to the message
 * converter as the payload.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void monoSuccess() {
	Message convertedReply = MessageBuilder.withPayload(new byte[0]).build();
	given(this.channel.send(any(Message.class))).willReturn(true);
	given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(convertedReply);

	ReactiveController reactiveController = new ReactiveController();
	this.messageHandler.registerHandler(reactiveController);
	this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));

	Message<?> request = createMessage("/app1/mono");
	this.messageHandler.handleMessage(request);

	// The handler invocation exposes the mono; emitting a value yields the reply.
	assertNotNull(reactiveController.mono);
	reactiveController.mono.onNext("foo");
	verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
	assertEquals("foo", this.payloadCaptor.getValue());
}
 
/**
 * Parses a subscription-related message into a {@code ParseResult}.
 * <p>The actual destination is the source destination from the last character
 * of the configured prefix onwards, with one more leading character dropped
 * when {@code isRemoveLeadingSlash()} is set.
 * @param message the message whose headers supply the session id and user
 * @param sourceDestination the destination the message was sent to
 * @return the parse result, or {@code null} (after logging an error) when the
 * message carries no session id
 */
@Nullable
private ParseResult parseSubscriptionMessage(Message<?> message, String sourceDestination) {
	MessageHeaders headers = message.getHeaders();
	String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
	if (sessionId == null) {
		logger.error("No session id. Ignoring " + message);
		return null;
	}

	// Start at the prefix's final character so that it stays in the result.
	String afterPrefix = sourceDestination.substring(this.prefix.length() - 1);
	String actualDestination = (isRemoveLeadingSlash() ? afterPrefix.substring(1) : afterPrefix);

	Principal principal = SimpMessageHeaderAccessor.getUser(headers);
	String user = null;
	if (principal != null) {
		user = principal.getName();
	}
	return new ParseResult(sourceDestination, actualDestination, sourceDestination,
			Collections.singleton(sessionId), user);
}
 
/**
 * Headers obtained from an accessor left mutable must be used as-is for the
 * sent message (same instance), and the accessor must no longer be mutable
 * once the message has been sent.
 */
@Test
public void convertAndSendWithMutableSimpMessageHeaders() {
	SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create();
	headerAccessor.setHeader("key", "value");
	headerAccessor.setNativeHeader("fooNative", "barNative");
	headerAccessor.setLeaveMutable(true);
	MessageHeaders mutableHeaders = headerAccessor.getMessageHeaders();

	this.messagingTemplate.convertAndSend("/foo", "data", mutableHeaders);

	Message<byte[]> sent = this.messageChannel.getMessages().get(0);

	assertSame(mutableHeaders, sent.getHeaders());
	assertFalse(headerAccessor.isMutable());
}
 
/**
 * Looks up the same "uppercase" function under three signatures: plain
 * {@code String}, {@code Flux<String>}, and {@code Flux<Message<byte[]>>} with
 * an "application/json" content type, and checks that each one upper-cases
 * its input.
 * <p>Input payloads are encoded with an explicit UTF-8 charset, the same one
 * used for the expected values, so the test does not depend on the platform
 * default charset.
 */
@Test
public void testImperativeFunction() {
	FunctionCatalog catalog = this.configureCatalog();

	Function<String, String> asIs = catalog.lookup("uppercase");
	assertThat(asIs.apply("uppercase")).isEqualTo("UPPERCASE");

	Function<Flux<String>, Flux<String>> asFlux = catalog.lookup("uppercase");
	List<String> result = asFlux.apply(Flux.just("uppercaseFlux", "uppercaseFlux2")).collectList().block();
	assertThat(result.get(0)).isEqualTo("UPPERCASEFLUX");
	assertThat(result.get(1)).isEqualTo("UPPERCASEFLUX2");

	Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> messageFlux = catalog.lookup("uppercase", "application/json");
	Message<byte[]> message1 = MessageBuilder.withPayload("\"uppercaseFlux\"".getBytes(StandardCharsets.UTF_8))
			.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
			.build();
	Message<byte[]> message2 = MessageBuilder.withPayload("\"uppercaseFlux2\"".getBytes(StandardCharsets.UTF_8))
			.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
			.build();
	List<Message<byte[]>> messageResult = messageFlux.apply(Flux.just(message1, message2)).collectList().block();
	assertThat(messageResult.get(0).getPayload()).isEqualTo("\"UPPERCASEFLUX\"".getBytes(StandardCharsets.UTF_8));
	assertThat(messageResult.get(1).getPayload()).isEqualTo("\"UPPERCASEFLUX2\"".getBytes(StandardCharsets.UTF_8));
}
 
/**
 * An error signalled through a controller's mono must set the controller's
 * {@code exceptionCaught} flag.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void monoFailure() {
	Message convertedReply = MessageBuilder.withPayload(new byte[0]).build();
	given(this.channel.send(any(Message.class))).willReturn(true);
	given(this.converter.toMessage(any(), any(MessageHeaders.class))).willReturn(convertedReply);

	ReactiveController reactiveController = new ReactiveController();
	this.messageHandler.registerHandler(reactiveController);
	this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));

	Message<?> request = createMessage("/app1/mono");
	this.messageHandler.handleMessage(request);

	reactiveController.mono.onError(new IllegalStateException());
	assertTrue(reactiveController.exceptionCaught);
}
 
/**
 * Headers passed to the messaging template must come with a mutable accessor,
 * carry the session id but not the subscription id, and include the return
 * type as the conversion hint.
 */
@Test
public void testHeadersToSend() throws Exception {
	Message<?> inputMessage = createMessage("sess1", "sub1", "/app", "/dest", null);

	SimpMessageSendingOperations template = Mockito.mock(SimpMessageSendingOperations.class);
	SendToMethodReturnValueHandler returnValueHandler = new SendToMethodReturnValueHandler(template, false);

	returnValueHandler.handleReturnValue(PAYLOAD, this.noAnnotationsReturnType, inputMessage);

	ArgumentCaptor<MessageHeaders> headersCaptor = ArgumentCaptor.forClass(MessageHeaders.class);
	verify(template).convertAndSend(eq("/topic/dest"), eq(PAYLOAD), headersCaptor.capture());

	SimpMessageHeaderAccessor sentAccessor =
			MessageHeaderAccessor.getAccessor(headersCaptor.getValue(), SimpMessageHeaderAccessor.class);
	assertNotNull(sentAccessor);
	assertTrue(sentAccessor.isMutable());
	assertEquals("sess1", sentAccessor.getSessionId());
	assertNull("Subscription id should not be copied", sentAccessor.getSubscriptionId());
	assertEquals(this.noAnnotationsReturnType,
			sentAccessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
 
/**
 * Applies the "multiOutputAsTupleMessageIn" function to two JSON-encoded
 * messages and subscribes to each of its three output streams, printing what
 * they emit. The test only checks that this runs; it asserts nothing about the
 * emitted values.
 */
@Test
public void testMultiOutputAsTuplePojoInInputByteArrayInputTypePojoMessage() {
	FunctionCatalog catalog = this.configureCatalog();
	Function<Flux<Message<byte[]>>, Tuple3<Flux<Person>, Flux<String>, Flux<Integer>>> function =
								catalog.lookup("multiOutputAsTupleMessageIn");

	byte[] firstJson = "{\"name\":\"Uncle Sam\",\"id\":1}".getBytes(StandardCharsets.UTF_8);
	Message<byte[]> first = MessageBuilder.withPayload(firstJson)
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
			.build();
	byte[] secondJson = "{\"name\":\"Oncle Pierre\",\"id\":2}".getBytes(StandardCharsets.UTF_8);
	Message<byte[]> second = MessageBuilder.withPayload(secondJson)
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
			.build();

	Tuple3<Flux<Person>, Flux<String>, Flux<Integer>> outputs = function.apply(Flux.just(first, second));
	outputs.getT1().subscribe(person -> System.out.println("=> 1: " + person));
	outputs.getT2().subscribe(text -> System.out.println("=> 2: " + text));
	outputs.getT3().subscribe(number -> System.out.println("=> 3: " + number));
}
 
/**
 * Headers passed to the messaging template must come with a mutable accessor
 * that carries the session id and subscription id of the input message, plus
 * the return type as the conversion hint.
 */
@Test
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testHeadersPassedToMessagingTemplate() throws Exception {
	String expectedSessionId = "sess1";
	String expectedSubscriptionId = "subs1";
	String inputDestination = "/dest";
	Message<?> subscribeMessage =
			createInputMessage(expectedSessionId, expectedSubscriptionId, inputDestination, null);

	MessageSendingOperations template = Mockito.mock(MessageSendingOperations.class);
	SubscriptionMethodReturnValueHandler returnValueHandler = new SubscriptionMethodReturnValueHandler(template);

	returnValueHandler.handleReturnValue(PAYLOAD, this.subscribeEventReturnType, subscribeMessage);

	ArgumentCaptor<MessageHeaders> headersCaptor = ArgumentCaptor.forClass(MessageHeaders.class);
	verify(template).convertAndSend(eq("/dest"), eq(PAYLOAD), headersCaptor.capture());

	SimpMessageHeaderAccessor sentAccessor =
			MessageHeaderAccessor.getAccessor(headersCaptor.getValue(), SimpMessageHeaderAccessor.class);

	assertNotNull(sentAccessor);
	assertTrue(sentAccessor.isMutable());
	assertEquals(expectedSessionId, sentAccessor.getSessionId());
	assertEquals(expectedSubscriptionId, sentAccessor.getSubscriptionId());
	assertEquals(this.subscribeEventReturnType, sentAccessor.getHeader(SimpMessagingTemplate.CONVERSION_HINT_HEADER));
}
 
/**
 * Map an outbound header name to its property name.
 * <p>{@link MessageHeaders#CONTENT_TYPE} is converted to {@code content_type}
 * for JMS compliance; every other name is delegated to the superclass, which
 * adds the outbound prefix if necessary.
 * @param headerName the header name to map
 * @return the mapped property name
 * @see #CONTENT_TYPE_PROPERTY
 */
@Override
protected String fromHeaderName(String headerName) {
	return (MessageHeaders.CONTENT_TYPE.equals(headerName) ?
			CONTENT_TYPE_PROPERTY : super.fromHeaderName(headerName));
}
 
/**
 * A message whose content type is "application/json" must convert to
 * {@code null} with the converter under test.
 */
@Test
public void supportsMimeTypeNotSupported() {
	Message<String> jsonMessage = MessageBuilder.withPayload("ABC")
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
			.build();

	Object converted = this.converter.fromMessage(jsonMessage, String.class);
	assertNull(converted);
}
 
/**
 * Decides whether this converter can convert the payload to a message: it can
 * exactly when the given headers pass {@code supportsMimeType}. The payload
 * itself is not inspected.
 */
@Override
protected boolean canConvertTo(Object payload, @Nullable MessageHeaders headers) {
	return supportsMimeType(headers);
}
 
/**
 * A converter configured with no supported MIME types and a default content
 * type resolver must still convert a message with a JSON content type.
 */
@Test
public void supportsMimeTypeNoneConfigured() {

	Message<String> jsonMessage = MessageBuilder.withPayload("ABC")
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
			.build();

	this.converter = new TestMessageConverter(Collections.<MimeType>emptyList());
	this.converter.setContentTypeResolver(new DefaultContentTypeResolver());

	Object converted = this.converter.fromMessage(jsonMessage, String.class);
	assertEquals("success-from", converted);
}
 
/**
 * Serializes the payload to JSON with the configured object mapper.
 * <p>The result is a {@code byte[]} when the serialized payload class is
 * {@code byte[]} (written with the JSON encoding derived from the headers'
 * MIME type), and a {@code String} otherwise. A serialization view derived
 * from the conversion hint is applied when there is one.
 * @throws MessageConversionException if writing the JSON fails with an
 * {@link IOException}
 */
@Override
@Nullable
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers,
		@Nullable Object conversionHint) {

	try {
		Class<?> view = getSerializationView(conversionHint);
		if (byte[].class != getSerializedPayloadClass()) {
			// Textual target: serialize into a String.
			Writer text = new StringWriter();
			if (view == null) {
				this.objectMapper.writeValue(text, payload);
			}
			else {
				this.objectMapper.writerWithView(view).writeValue(text, payload);
			}
			return text.toString();
		}

		// Binary target: serialize into a byte array.
		ByteArrayOutputStream bytes = new ByteArrayOutputStream(1024);
		JsonEncoding encoding = getJsonEncoding(getMimeType(headers));
		JsonGenerator generator = this.objectMapper.getFactory().createGenerator(bytes, encoding);
		if (view == null) {
			this.objectMapper.writeValue(generator, payload);
		}
		else {
			this.objectMapper.writerWithView(view).writeValue(generator, payload);
		}
		return bytes.toByteArray();
	}
	catch (IOException ex) {
		throw new MessageConversionException("Could not write JSON: " + ex.getMessage(), ex);
	}
}
 
/**
 * Wraps the payload in a message.
 * <p>When the given headers are backed by a mutable accessor, the message is
 * created directly on that accessor's headers; otherwise a new message is
 * built with the given headers copied into it.
 * @param payload the payload to wrap
 * @param headers optional headers for the message
 * @return the new message
 */
@Override
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
	MessageHeaderAccessor accessor = (headers != null ?
			MessageHeaderAccessor.getAccessor(headers, MessageHeaderAccessor.class) : null);
	if (accessor == null || !accessor.isMutable()) {
		return MessageBuilder.withPayload(payload).copyHeaders(headers).build();
	}
	return MessageBuilder.createMessage(payload, accessor.getMessageHeaders());
}
 
/**
 * Pass-through conversion: returns the given payload unchanged. The headers
 * and the conversion hint are not consulted.
 */
@Override
@Nullable
protected Object convertToInternal(
		Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {

	return payload;
}
 
/**
 * Converting a payload together with headers must produce a message that
 * exposes both the payload and the given header.
 */
@Test
public void toMessageWithPayloadAndHeaders() {
	MessageHeaders fooHeaders =
			new MessageHeaders(Collections.<String, Object>singletonMap("foo", "bar"));

	Message<?> converted = this.converter.toMessage("payload", fooHeaders);

	assertEquals("payload", converted.getPayload());
	assertEquals("bar", converted.getHeaders().get("foo"));
}
 
/**
 * Asks each configured converter in turn to create a message and returns the
 * first non-null result. The conversion hint is passed on only to converters
 * that are {@link SmartMessageConverter}s.
 * @return the first message produced, or {@code null} if no converter
 * produced one
 */
@Override
@Nullable
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
	for (MessageConverter candidate : getConverters()) {
		Message<?> produced;
		if (candidate instanceof SmartMessageConverter) {
			produced = ((SmartMessageConverter) candidate).toMessage(payload, headers, conversionHint);
		}
		else {
			produced = candidate.toMessage(payload, headers);
		}
		if (produced != null) {
			return produced;
		}
	}
	return null;
}
 
/**
 * With strict content type matching enabled, a message without a content type
 * must be rejected, while one with the supported "text/plain" type is accepted.
 */
@Test
public void canConvertFromStrictContentTypeMatch() {
	this.converter = new TestMessageConverter(Arrays.asList(MimeTypeUtils.TEXT_PLAIN));
	this.converter.setStrictContentTypeMatch(true);

	Message<String> withoutContentType = MessageBuilder.withPayload("ABC").build();
	assertFalse(this.converter.canConvertFrom(withoutContentType, String.class));

	Message<String> plainText = MessageBuilder.withPayload("ABC")
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
			.build();
	assertTrue(this.converter.canConvertFrom(plainText, String.class));

}
 
/**
 * With an accept header of {@code *}{@code /*}, the wrapped CSV converter must
 * serialize the payload as "text/csv".
 */
@Test
public void testSerializationWithCompatibleWildcardAcceptHeader() {
	MimeType wildcard = MimeType.valueOf("*/*");
	MessageHeaders acceptHeaders = new MessageHeaders(newHashMap(ACCEPT, wildcard));

	Message<?> serialized = NegotiatingMessageConverterWrapper
		.wrap(new NaiveCsvTupleMessageConverter())
		.toMessage(somePayload, acceptHeaders);

	assertMessageContent(serialized, "text/csv", expectedSerializedPayload);
}
 
/**
 * A converter configured with no supported MIME types must still convert a
 * message with a JSON content type.
 */
@Test
public void supportsMimeTypeNoneConfigured() {
	Message<String> jsonMessage = MessageBuilder.withPayload("ABC")
			.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
			.build();
	this.converter = new TestMessageConverter(Collections.<MimeType>emptyList());

	Object converted = this.converter.fromMessage(jsonMessage, String.class);
	assertEquals("success-from", converted);
}
 
源代码30 项目: rocketmq-spring   文件: RocketMQUtil.java
/**
 * Copies user properties onto the message builder as headers.
 * <p>A property is skipped when its key is in {@code MessageConst.STRING_HASH_SET},
 * equals {@code MessageHeaders.ID} or {@code MessageHeaders.TIMESTAMP}, or
 * starts with {@code RocketMQHeaders.PREFIX} and is in that set once the
 * prefix is stripped.
 * @param properties the properties to copy; nothing happens when null or empty
 * @param messageBuilder the builder that receives the headers
 */
private static void addUserProperties(Map<String, String> properties, MessageBuilder messageBuilder) {
    if (CollectionUtils.isEmpty(properties)) {
        return;
    }
    for (Map.Entry<String, String> property : properties.entrySet()) {
        String key = property.getKey();
        boolean reserved = MessageConst.STRING_HASH_SET.contains(key)
            || MessageHeaders.ID.equals(key)
            || MessageHeaders.TIMESTAMP.equals(key);
        if (reserved) {
            continue;
        }
        // A prefixed key is also reserved when its unprefixed form is.
        boolean prefixedReserved = key.startsWith(RocketMQHeaders.PREFIX)
            && MessageConst.STRING_HASH_SET.contains(key.replaceFirst("^" + RocketMQHeaders.PREFIX, ""));
        if (!prefixedReserved) {
            messageBuilder.setHeader(key, property.getValue());
        }
    }
}