下面列出了怎么用org.springframework.messaging.support.NativeMessageHeaderAccessor的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void convertAndSendWithCustomHeaderNonNative() {
Map<String, Object> headers = new HashMap<>();
headers.put("key", "value");
headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, new LinkedMultiValueMap<String, String>());
this.messagingTemplate.convertAndSend("/foo", "data", headers);
List<Message<byte[]>> messages = this.messageChannel.getMessages();
SimpMessageHeaderAccessor headerAccessor =
MessageHeaderAccessor.getAccessor(messages.get(0), SimpMessageHeaderAccessor.class);
assertNotNull(headerAccessor);
assertEquals("value", headerAccessor.toMap().get("key"));
assertNull(headerAccessor.getNativeHeader("key"));
}
@SuppressWarnings("unchecked")
private void handleMessageFrame(StompHeaderAccessor stompHeaderAccessor, Message<byte[]> stompMsg) {
String subscriptionId = stompHeaderAccessor.getFirstNativeHeader("subscription-id");
// custom properties, eventId is in native header
Map<String, String> extensions = new HashMap<>();
for (Map.Entry<String, List<String>> entry : ((Map<String, List<String>>) stompMsg.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS)).entrySet()) {
if (entry.getKey().startsWith("weevent-")) {
extensions.put(entry.getKey(), entry.getValue().get(0));
}
}
WeEvent event = new WeEvent(stompHeaderAccessor.getDestination(), stompMsg.getPayload(), extensions);
event.setEventId(stompHeaderAccessor.getFirstNativeHeader("eventId"));
log.info("received: {}", event);
if (this.subscription2EventCache.containsKey(subscriptionId)) {
IWeEventClient.EventListener listener = this.subscription2EventCache.get(subscriptionId).getSecond();
listener.onEvent(event);
// update the cache eventId
this.subscription2EventCache.get(subscriptionId).getFirst().setOffset(event.getEventId());
}
}
@Test
public void convertAndSendWithCustomHeaderNonNative() {
Map<String, Object> headers = new HashMap<>();
headers.put("key", "value");
headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, new LinkedMultiValueMap<String, String>());
this.messagingTemplate.convertAndSend("/foo", "data", headers);
List<Message<byte[]>> messages = this.messageChannel.getMessages();
SimpMessageHeaderAccessor headerAccessor =
MessageHeaderAccessor.getAccessor(messages.get(0), SimpMessageHeaderAccessor.class);
assertNotNull(headerAccessor);
assertEquals("value", headerAccessor.toMap().get("key"));
assertNull(headerAccessor.getNativeHeader("key"));
}
@SuppressWarnings("unchecked")
private Message<?> preSendServerSpan(final Message<?> message) {
final String destination = (String)message.getHeaders().get(SIMP_DESTINATION);
final SpanBuilder spanBuilder = tracer
.buildSpan(destination != null ? destination : UNKNOWN_DESTINATION)
.withTag(Tags.SPAN_KIND.getKey(), spanKind)
.withTag(Tags.COMPONENT.getKey(), WEBSOCKET);
final Map<String,List<String>> nativeHeaders = (Map<String,List<String>>)message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
SpanContext spanContext = null;
if (nativeHeaders != null)
spanContext = tracer.extract(Builtin.TEXT_MAP, new NativeHeadersExtractAdapter(nativeHeaders));
if (spanContext == null)
spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapExtractAdapter(message.getHeaders()));
if (spanContext != null)
spanBuilder.asChildOf(spanContext);
final Span span = spanBuilder.start();
return MessageBuilder.fromMessage(message).setHeader(OPENTRACING_SPAN, span).build();
}
@Test
public void convertAndSendWithCustomHeaderNonNative() {
Map<String, Object> headers = new HashMap<>();
headers.put("key", "value");
headers.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, new LinkedMultiValueMap<String, String>());
this.messagingTemplate.convertAndSend("/foo", "data", headers);
List<Message<byte[]>> messages = this.messageChannel.getMessages();
SimpMessageHeaderAccessor headerAccessor =
MessageHeaderAccessor.getAccessor(messages.get(0), SimpMessageHeaderAccessor.class);
assertNotNull(headerAccessor);
assertEquals("value", headerAccessor.toMap().get("key"));
assertNull(headerAccessor.getNativeHeader("key"));
}
@Test
public void producerConsidersOldSpanIds_nativeHeaders() {
this.channel.addInterceptor(producerSideOnly(this.interceptor));
NativeMessageHeaderAccessor accessor = new NativeMessageHeaderAccessor() {
};
accessor.setNativeHeader("b3",
"000000000000000a-000000000000000b-1-000000000000000a");
this.channel.send(MessageBuilder.withPayload("foo")
.copyHeaders(accessor.toMessageHeaders()).build());
TraceContext receiveContext = parseB3SingleFormat(
((List) ((Map) this.channel.receive().getHeaders().get(NATIVE_HEADERS))
.get("b3")).get(0).toString()).context();
assertThat(receiveContext.parentIdString()).isEqualTo("000000000000000b");
}
@Test
public void testFilterHeaders() {
PubSubHeaderMapper mapper = new PubSubHeaderMapper();
Map<String, Object> originalHeaders = new HashMap<>();
originalHeaders.put("my header", "pantagruel's nativity");
originalHeaders.put(NativeMessageHeaderAccessor.NATIVE_HEADERS, "deerhunter");
MessageHeaders internalHeaders = new MessageHeaders(originalHeaders);
Map<String, String> filteredHeaders = new HashMap<>();
mapper.fromHeaders(internalHeaders, filteredHeaders);
assertThat(filteredHeaders.size()).isEqualTo(1);
assertThat(filteredHeaders.get("my header")).isEqualTo("pantagruel's nativity");
}
@Test
public void testSkipWrongValueTypeForGet() {
MessageHeaderAccessor carrier = carrier();
carrier.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS,
"{spanTraceId=[123], spanId=[456], spanSampled=[0]}");
MessageHeaderPropagation.INSTANCE.get(carrier, "b3");
}
@Test
public void testSkipWrongValueTypeForRemoval() {
MessageHeaderAccessor carrier = carrier();
carrier.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS,
"{spanTraceId=[123], spanId=[456], spanSampled=[0]}");
MessageHeaderPropagation.removeAnyTraceHeaders(carrier,
Collections.singletonList("b3"));
}
@Test
public void testSkipWrongValueTypeForPut() {
MessageHeaderAccessor carrier = carrier();
carrier.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS,
"{spanTraceId=[123], spanId=[456], spanSampled=[0]}");
MessageHeaderPropagation.INSTANCE.put(carrier, "b3", "1234");
}
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
DataOutputStream output) throws IOException {
@SuppressWarnings("unchecked")
Map<String,List<String>> nativeHeaders =
(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (logger.isTraceEnabled()) {
logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
}
if (nativeHeaders == null) {
return;
}
boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.STOMP
&& command != StompCommand.CONNECTED);
for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
continue;
}
List<String> values = entry.getValue();
if ((StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) &&
StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
}
byte[] encodedKey = encodeHeaderKey(entry.getKey(), shouldEscape);
for (String value : values) {
output.write(encodedKey);
output.write(COLON);
output.write(encodeHeaderValue(value, shouldEscape));
output.write(LF);
}
}
if (command.requiresContentLength()) {
int contentLength = payload.length;
output.write("content-length:".getBytes(StandardCharsets.UTF_8));
output.write(Integer.toString(contentLength).getBytes(StandardCharsets.UTF_8));
output.write(LF);
}
}
@SuppressWarnings("unchecked")
@Nullable
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
return (Map<String, List<String>>) message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
}
@SuppressWarnings("unchecked")
@Nullable
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
return (Map<String, List<String>>) message.getHeaders().get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
}
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload,
DataOutputStream output) throws IOException {
@SuppressWarnings("unchecked")
Map<String,List<String>> nativeHeaders =
(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (logger.isTraceEnabled()) {
logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
}
if (nativeHeaders == null) {
return;
}
boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);
for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
continue;
}
List<String> values = entry.getValue();
if (StompCommand.CONNECT.equals(command) &&
StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
values = Collections.singletonList(StompHeaderAccessor.getPasscode(headers));
}
byte[] encodedKey = encodeHeaderKey(entry.getKey(), shouldEscape);
for (String value : values) {
output.write(encodedKey);
output.write(COLON);
output.write(encodeHeaderValue(value, shouldEscape));
output.write(LF);
}
}
if (command.requiresContentLength()) {
int contentLength = payload.length;
output.write("content-length:".getBytes(StandardCharsets.UTF_8));
output.write(Integer.toString(contentLength).getBytes(StandardCharsets.UTF_8));
output.write(LF);
}
}
@SuppressWarnings("unchecked")
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
return (Map<String, List<String>>) message.getHeaders().get(
NativeMessageHeaderAccessor.NATIVE_HEADERS);
}
private void writeHeaders(StompCommand command, Map<String, Object> headers, byte[] payload, DataOutputStream output)
throws IOException {
@SuppressWarnings("unchecked")
Map<String,List<String>> nativeHeaders =
(Map<String, List<String>>) headers.get(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (logger.isTraceEnabled()) {
logger.trace("Encoding STOMP " + command + ", headers=" + nativeHeaders);
}
if (nativeHeaders == null) {
return;
}
boolean shouldEscape = (command != StompCommand.CONNECT && command != StompCommand.CONNECTED);
for (Entry<String, List<String>> entry : nativeHeaders.entrySet()) {
byte[] key = encodeHeaderString(entry.getKey(), shouldEscape);
if (command.requiresContentLength() && "content-length".equals(entry.getKey())) {
continue;
}
List<String> values = entry.getValue();
if (StompCommand.CONNECT.equals(command) &&
StompHeaderAccessor.STOMP_PASSCODE_HEADER.equals(entry.getKey())) {
values = Arrays.asList(StompHeaderAccessor.getPasscode(headers));
}
for (String value : values) {
output.write(key);
output.write(COLON);
output.write(encodeHeaderString(value, shouldEscape));
output.write(LF);
}
}
if (command.requiresContentLength()) {
int contentLength = payload.length;
output.write("content-length:".getBytes(StompDecoder.UTF8_CHARSET));
output.write(Integer.toString(contentLength).getBytes(StompDecoder.UTF8_CHARSET));
output.write(LF);
}
}
@SuppressWarnings("unchecked")
private Map<String, List<String>> getNativeHeaders(Message<?> message) {
return (Map<String, List<String>>) message.getHeaders().get(
NativeMessageHeaderAccessor.NATIVE_HEADERS);
}
@Override
protected Iterable<String> read(MessageHeaderAccessor carrier, String key) {
return ((NativeMessageHeaderAccessor) carrier).getNativeHeader(key);
}