类 org.springframework.messaging.support.NativeMessageHeaderAccessor 源码实例 Demo

下面列出了如何使用 org.springframework.messaging.support.NativeMessageHeaderAccessor 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

/**
 * Sends a payload with one regular header ("key") plus an empty native-header map and checks,
 * on the first message returned by {@code this.messageChannel.getMessages()}, that "key" is
 * present in the accessor's header map but is not reported as a native header.
 */
@Test
public void convertAndSendWithCustomHeaderNonNative() {
	Map<String, Object> customHeaders = new HashMap<>();
	customHeaders.put("key", "value");
	customHeaders.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, new LinkedMultiValueMap<String, String>());
	this.messagingTemplate.convertAndSend("/foo", "data", customHeaders);

	// Only the first recorded message is inspected.
	List<Message<byte[]>> sent = this.messageChannel.getMessages();
	Message<byte[]> first = sent.get(0);
	SimpMessageHeaderAccessor accessor =
			MessageHeaderAccessor.getAccessor(first, SimpMessageHeaderAccessor.class);

	assertNotNull(accessor);
	assertEquals("value", accessor.toMap().get("key"));
	assertNull(accessor.getNativeHeader("key"));
}
 
源代码2 项目: WeEvent   文件: WebSocketTransport.java
/**
 * Builds a {@link WeEvent} from a received STOMP message and, when a cache entry exists for
 * the message's "subscription-id" native header, passes the event to that entry's listener
 * and records the event id as the entry's new offset.
 *
 * <p>Native headers whose name starts with "weevent-" are copied into the event's extensions
 * using the first value of each header. A message without a native-header map, or a matching
 * header with a null/empty value list, is tolerated: the affected extension is simply not set.
 *
 * @param stompHeaderAccessor accessor used to read the subscription id, destination and eventId
 * @param stompMsg the message whose payload and native headers populate the event
 */
@SuppressWarnings("unchecked")
private void handleMessageFrame(StompHeaderAccessor stompHeaderAccessor, Message<byte[]> stompMsg) {
    String subscriptionId = stompHeaderAccessor.getFirstNativeHeader("subscription-id");

    // custom properties, eventId is in native header
    Map<String, String> extensions = new HashMap<>();
    Map<String, List<String>> nativeHeaders =
            (Map<String, List<String>>) stompMsg.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
    // The native-header entry may be missing altogether; iterating it unguarded would throw an NPE.
    if (nativeHeaders != null) {
        for (Map.Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
            List<String> values = entry.getValue();
            // Skip headers without any value instead of failing on get(0).
            if (entry.getKey().startsWith("weevent-") && values != null && !values.isEmpty()) {
                extensions.put(entry.getKey(), values.get(0));
            }
        }
    }

    WeEvent event = new WeEvent(stompHeaderAccessor.getDestination(), stompMsg.getPayload(), extensions);
    event.setEventId(stompHeaderAccessor.getFirstNativeHeader("eventId"));
    log.info("received: {}", event);

    if (this.subscription2EventCache.containsKey(subscriptionId)) {
        IWeEventClient.EventListener listener = this.subscription2EventCache.get(subscriptionId).getSecond();
        listener.onEvent(event);
        // update the cache eventId
        this.subscription2EventCache.get(subscriptionId).getFirst().setOffset(event.getEventId());
    }
}
 
/**
 * Sends a payload with one regular header ("key") plus an empty native-header map and checks,
 * on the first message returned by {@code this.messageChannel.getMessages()}, that "key" is
 * present in the accessor's header map but is not reported as a native header.
 */
@Test
public void convertAndSendWithCustomHeaderNonNative() {
	Map<String, Object> customHeaders = new HashMap<>();
	customHeaders.put("key", "value");
	customHeaders.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, new LinkedMultiValueMap<String, String>());
	this.messagingTemplate.convertAndSend("/foo", "data", customHeaders);

	// Only the first recorded message is inspected.
	List<Message<byte[]>> sent = this.messageChannel.getMessages();
	Message<byte[]> first = sent.get(0);
	SimpMessageHeaderAccessor accessor =
			MessageHeaderAccessor.getAccessor(first, SimpMessageHeaderAccessor.class);

	assertNotNull(accessor);
	assertEquals("value", accessor.toMap().get("key"));
	assertNull(accessor.getNativeHeader("key"));
}
 
/**
 * Starts a span for the given message and returns a copy of the message that carries the
 * span under the {@code OPENTRACING_SPAN} header.
 *
 * <p>The span is named after the {@code SIMP_DESTINATION} header, or
 * {@code UNKNOWN_DESTINATION} when that header is absent. A parent context is looked up first
 * in the native headers (when present) and then, if none was found, in the regular message
 * headers; when a context is found the new span is created as its child.
 *
 * @param message the message to start a span for
 * @return a rebuilt message with the started span attached as a header
 */
@SuppressWarnings("unchecked")
private Message<?> preSendServerSpan(final Message<?> message) {
  final String destination = (String)message.getHeaders().get(SIMP_DESTINATION);
  final String operationName = destination != null ? destination : UNKNOWN_DESTINATION;
  final SpanBuilder builder = tracer
    .buildSpan(operationName)
    .withTag(Tags.SPAN_KIND.getKey(), spanKind)
    .withTag(Tags.COMPONENT.getKey(), WEBSOCKET);

  final Map<String,List<String>> nativeHeaders = (Map<String,List<String>>)message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);

  // Native headers take precedence as the source of the parent context.
  SpanContext parent = nativeHeaders == null
    ? null
    : tracer.extract(Builtin.TEXT_MAP, new NativeHeadersExtractAdapter(nativeHeaders));

  // Fall back to the regular message headers when nothing was extracted above.
  if (parent == null) {
    parent = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(message.getHeaders()));
  }

  if (parent != null) {
    builder.asChildOf(parent);
  }

  final Span started = builder.start();
  return MessageBuilder.fromMessage(message).setHeader(OPENTRACING_SPAN, started).build();
}
 
/**
 * Sends a payload with one regular header ("key") plus an empty native-header map and checks,
 * on the first message returned by {@code this.messageChannel.getMessages()}, that "key" is
 * present in the accessor's header map but is not reported as a native header.
 */
@Test
public void convertAndSendWithCustomHeaderNonNative() {
	Map<String, Object> customHeaders = new HashMap<>();
	customHeaders.put("key", "value");
	customHeaders.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, new LinkedMultiValueMap<String, String>());
	this.messagingTemplate.convertAndSend("/foo", "data", customHeaders);

	// Only the first recorded message is inspected.
	List<Message<byte[]>> sent = this.messageChannel.getMessages();
	Message<byte[]> first = sent.get(0);
	SimpMessageHeaderAccessor accessor =
			MessageHeaderAccessor.getAccessor(first, SimpMessageHeaderAccessor.class);

	assertNotNull(accessor);
	assertEquals("value", accessor.toMap().get("key"));
	assertNull(accessor.getNativeHeader("key"));
}
 
/**
 * Sends a message whose native headers contain a "b3" value and asserts that the "b3" native
 * header on the received message parses to a context whose parent id is "000000000000000b"
 * (the second segment of the value that was sent).
 */
@Test
public void producerConsidersOldSpanIds_nativeHeaders() {
	this.channel.addInterceptor(producerSideOnly(this.interceptor));

	// Anonymous subclass with an empty body, exactly as in the original test.
	NativeMessageHeaderAccessor nativeAccessor = new NativeMessageHeaderAccessor() {
	};
	nativeAccessor.setNativeHeader("b3",
			"000000000000000a-000000000000000b-1-000000000000000a");

	this.channel.send(MessageBuilder.withPayload("foo")
			.copyHeaders(nativeAccessor.toMessageHeaders()).build());

	// Unwrap: message headers -> native header map -> "b3" value list -> first value.
	Object rawNativeHeaders = this.channel.receive().getHeaders().get(NATIVE_HEADERS);
	Map<?, ?> nativeHeaders = (Map<?, ?>) rawNativeHeaders;
	List<?> b3Values = (List<?>) nativeHeaders.get("b3");
	String b3 = b3Values.get(0).toString();

	TraceContext receiveContext = parseB3SingleFormat(b3).context();
	assertThat(receiveContext.parentIdString()).isEqualTo("000000000000000b");
}
 
/**
 * Maps message headers that contain one custom header and a {@code NATIVE_HEADERS} entry, and
 * asserts that the resulting map has exactly one entry: the custom header.
 */
@Test
public void testFilterHeaders() {
	Map<String, Object> source = new HashMap<>();
	source.put("my header", "pantagruel's nativity");
	source.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, "deerhunter");
	MessageHeaders messageHeaders = new MessageHeaders(source);

	PubSubHeaderMapper mapper = new PubSubHeaderMapper();
	Map<String, String> mapped = new HashMap<>();
	mapper.fromHeaders(messageHeaders, mapped);

	assertThat(mapped.size()).isEqualTo(1);
	assertThat(mapped.get("my header")).isEqualTo("pantagruel's nativity");
}
 
/**
 * Stores a plain String (not a map) under {@code NATIVE_HEADERS} and calls
 * {@code MessageHeaderPropagation.INSTANCE.get}. There are no assertions: the test passes as
 * long as the call does not throw.
 */
@Test
public void testSkipWrongValueTypeForGet() {
	String nonMapNativeHeaders = "{spanTraceId=[123], spanId=[456], spanSampled=[0]}";
	MessageHeaderAccessor accessor = carrier();
	accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nonMapNativeHeaders);

	MessageHeaderPropagation.INSTANCE.get(accessor, "b3");
}
 
/**
 * Stores a plain String (not a map) under {@code NATIVE_HEADERS} and calls
 * {@code MessageHeaderPropagation.removeAnyTraceHeaders}. There are no assertions: the test
 * passes as long as the call does not throw.
 */
@Test
public void testSkipWrongValueTypeForRemoval() {
	String nonMapNativeHeaders = "{spanTraceId=[123], spanId=[456], spanSampled=[0]}";
	MessageHeaderAccessor accessor = carrier();
	accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nonMapNativeHeaders);

	MessageHeaderPropagation.removeAnyTraceHeaders(accessor,
			Collections.singletonList("b3"));
}
 
/**
 * Stores a plain String (not a map) under {@code NATIVE_HEADERS} and calls
 * {@code MessageHeaderPropagation.INSTANCE.put}. There are no assertions: the test passes as
 * long as the call does not throw.
 */
@Test
public void testSkipWrongValueTypeForPut() {
	String nonMapNativeHeaders = "{spanTraceId=[123], spanId=[456], spanSampled=[0]}";
	MessageHeaderAccessor accessor = carrier();
	accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nonMapNativeHeaders);

	MessageHeaderPropagation.INSTANCE.put(accessor, "b3", "1234");
}
 
源代码11 项目: spring-analysis-note   文件: StompEncoder.java
/**
 * Writes the native headers found in {@code headers} to {@code output}, one
 * {@code key:value} line per header value, followed by a computed {@code content-length}
 * line when the command requires one.
 *
 * <p>Nothing is written when there is no native-header map. Header names and values are
 * escaped except for CONNECT, STOMP and CONNECTED. For commands that require a content
 * length, any "content-length" native header is skipped in favor of the payload length. For
 * CONNECT and STOMP, the passcode header value is taken from
 * {@code StompHeaderAccessor.getPasscode(headers)} rather than from the native-header map.
 *
 * @param command the STOMP command being encoded
 * @param headers the message headers, expected to hold the native headers under
 * {@code NativeMessageHeaderAccessor.NATIVE_HEADERS}
 * @param payload the message payload; only its length is used
 * @param output the stream the header lines are written to
 * @throws IOException if writing to {@code output} fails
 */
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
		DataOutputStream output) throws IOException {

	@SuppressWarnings("unchecked")
	Map<String,List<String>> nativeHeaders =
			(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);

	if (logger.isTraceEnabled()) {
		logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
	}

	if (nativeHeaders == null) {
		return;
	}

	boolean connectOrStomp = (command == StompCommand.CONNECT || command == StompCommand.STOMP);
	boolean escape = !(connectOrStomp || command == StompCommand.CONNECTED);
	boolean writesContentLength = command.requiresContentLength();

	for (Map.Entry<String, List<String>> header : nativeHeaders.entrySet()) {
		String name = header.getKey();
		if (writesContentLength && "content-length".equals(name)) {
			// The real length is appended after the loop.
			continue;
		}

		List<String> values = header.getValue();
		if (connectOrStomp && StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(name)) {
			values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
		}

		byte[] encodedName = encodeHeaderKey(name, escape);
		for (String value : values) {
			output.write(encodedName);
			output.write(COLON);
			output.write(encodeHeaderValue(value, escape));
			output.write(LF);
		}
	}

	if (writesContentLength) {
		int length = payload.length;
		output.write("content-length:".getBytes(StandardCharsets.UTF_8));
		output.write(Integer.toString(length).getBytes(StandardCharsets.UTF_8));
		output.write(LF);
	}
}
 
/**
 * Returns the value stored under {@code NativeMessageHeaderAccessor.NATIVE_HEADERS} in the
 * message's headers, cast to a header-name to values map.
 *
 * @param message the message to read the header from
 * @return the native-header map, or {@code null} when the header is not set
 */
@SuppressWarnings("unchecked")
@Nullable
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
	Object nativeHeaders = message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
	return (Map<String, List<String>>) nativeHeaders;
}
 
/**
 * Returns the value stored under {@code NativeMessageHeaderAccessor.NATIVE_HEADERS} in the
 * message's headers, cast to a header-name to values map.
 *
 * @param message the message to read the header from
 * @return the native-header map, or {@code null} when the header is not set
 */
@SuppressWarnings("unchecked")
@Nullable
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
	Object nativeHeaders = message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
	return (Map<String, List<String>>) nativeHeaders;
}
 
源代码14 项目: java-technology-stack   文件: StompEncoder.java
/**
 * Writes the native headers found in {@code headers} to {@code output}, one
 * {@code key:value} line per header value, followed by a computed {@code content-length}
 * line when the command requires one.
 *
 * <p>Nothing is written when there is no native-header map. Header names and values are
 * escaped except for CONNECT and CONNECTED. For commands that require a content length, any
 * "content-length" native header is skipped in favor of the payload length. For CONNECT, the
 * passcode header value is taken from {@code StompHeaderAccessor.getPasscode(headers)} rather
 * than from the native-header map.
 *
 * @param command the STOMP command being encoded
 * @param headers the message headers, expected to hold the native headers under
 * {@code NativeMessageHeaderAccessor.NATIVE_HEADERS}
 * @param payload the message payload; only its length is used
 * @param output the stream the header lines are written to
 * @throws IOException if writing to {@code output} fails
 */
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
		DataOutputStream output) throws IOException {

	@SuppressWarnings("unchecked")
	Map<String,List<String>> nativeHeaders =
			(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);

	if (logger.isTraceEnabled()) {
		logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
	}

	if (nativeHeaders == null) {
		return;
	}

	boolean isConnect = (command == StompCommand.CONNECT);
	boolean escape = !(isConnect || command == StompCommand.CONNECTED);
	boolean writesContentLength = command.requiresContentLength();

	for (Map.Entry<String, List<String>> header : nativeHeaders.entrySet()) {
		String name = header.getKey();
		if (writesContentLength && "content-length".equals(name)) {
			// The real length is appended after the loop.
			continue;
		}

		List<String> values = header.getValue();
		if (isConnect && StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(name)) {
			values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
		}

		byte[] encodedName = encodeHeaderKey(name, escape);
		for (String value : values) {
			output.write(encodedName);
			output.write(COLON);
			output.write(encodeHeaderValue(value, escape));
			output.write(LF);
		}
	}

	if (writesContentLength) {
		int length = payload.length;
		output.write("content-length:".getBytes(StandardCharsets.UTF_8));
		output.write(Integer.toString(length).getBytes(StandardCharsets.UTF_8));
		output.write(LF);
	}
}
 
/**
 * Returns the value stored under {@code NativeMessageHeaderAccessor.NATIVE_HEADERS} in the
 * message's headers, cast to a header-name to values map.
 *
 * @param message the message to read the header from
 * @return the native-header map, or {@code null} when the header is not set
 */
@SuppressWarnings("unchecked")
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
	Object nativeHeaders = message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
	return (Map<String, List<String>>) nativeHeaders;
}
 
源代码16 项目: spring4-understanding   文件: StompEncoder.java
/**
 * Writes the native headers found in {@code headers} to {@code output}, one
 * {@code key:value} line per header value, followed by a computed {@code content-length}
 * line when the command requires one.
 *
 * <p>Nothing is written when there is no native-header map. Header names and values are
 * escaped except for CONNECT and CONNECTED. For commands that require a content length, any
 * "content-length" native header is skipped in favor of the payload length. For CONNECT, the
 * passcode header value is taken from {@code StompHeaderAccessor.getPasscode(headers)} rather
 * than from the native-header map.
 *
 * @param command the STOMP command being encoded
 * @param headers the message headers, expected to hold the native headers under
 * {@code NativeMessageHeaderAccessor.NATIVE_HEADERS}
 * @param payload the message payload; only its length is used
 * @param output the stream the header lines are written to
 * @throws IOException if writing to {@code output} fails
 */
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload, DataOutputStream output)
		throws IOException {

	@SuppressWarnings("unchecked")
	Map<String,List<String>> nativeHeaders =
			(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);

	if (logger.isTraceEnabled()) {
		logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
	}

	if (nativeHeaders == null) {
		return;
	}

	boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);

	for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
		if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
			// Skipped: the real length is appended after the loop.
			continue;
		}
		List<String> values = entry.getValue();
		if (StompCommand.CONNECT.equals(command) &&
				StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
			values = Arrays.asList(StompHeaderAccessor.getPasscode(headers));
		}
		// Encode the key only for headers that are actually written.
		byte[] key = encodeHeaderString(entry.getKey(), shouldEscape);
		for (String value : values) {
			output.write(key);
			output.write(COLON);
			output.write(encodeHeaderString(value, shouldEscape));
			output.write(LF);
		}
	}
	if (command.requiresContentLength()) {
		int contentLength = payload.length;
		output.write("content-length:".getBytes(StompDecoder.UTF8_CHARSET));
		output.write(Integer.toString(contentLength).getBytes(StompDecoder.UTF8_CHARSET));
		output.write(LF);
	}
}
 
/**
 * Returns the value stored under {@code NativeMessageHeaderAccessor.NATIVE_HEADERS} in the
 * message's headers, cast to a header-name to values map.
 *
 * @param message the message to read the header from
 * @return the native-header map, or {@code null} when the header is not set
 */
@SuppressWarnings("unchecked")
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
	Object nativeHeaders = message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
	return (Map<String, List<String>>) nativeHeaders;
}
 
/**
 * Reads the values of the native header named {@code key} from the carrier.
 *
 * @param carrier the accessor to read from; it is cast to {@code NativeMessageHeaderAccessor}
 * without a type check
 * @param key the native header name
 * @return whatever {@code getNativeHeader(key)} returns for that name
 */
@Override
protected Iterable<String> read(MessageHeaderAccessor carrier, String key) {
	NativeMessageHeaderAccessor nativeCarrier = (NativeMessageHeaderAccessor) carrier;
	return nativeCarrier.getNativeHeader(key);
}
 
 类方法
 同包方法