下面列出了org.springframework.http.server.reactive.AbstractServerHttpResponse#org.springframework.web.reactive.socket.WebSocketHandler 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Opens a client connection to {@code url} through the WebSocket container.
 *
 * <p>The connect call is wrapped in a callable subscribed on the elastic
 * scheduler; once it has produced its result the returned {@code Mono}
 * continues with the completion processor handed to the created endpoint.
 *
 * @param url the URL to connect to
 * @param requestHeaders headers passed to the {@code DefaultConfigurator}
 * @param handler supplies the sub-protocols and is passed to the endpoint
 * @return a {@code Mono} that follows the completion processor
 */
private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
    MonoProcessor<Void> completion = MonoProcessor.create();
    return Mono.fromCallable(() -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
                List<String> subProtocols = handler.getSubProtocols();
                DefaultConfigurator headerConfigurator = new DefaultConfigurator(requestHeaders);
                Endpoint clientEndpoint = createEndpoint(url, handler, completion, headerConfigurator);
                ClientEndpointConfig endpointConfig = createEndpointConfig(headerConfigurator, subProtocols);
                return this.webSocketContainer.connectToServer(clientEndpoint, endpointConfig, url);
            })
            // connectToServer is blocking, hence the dedicated scheduler
            .subscribeOn(Schedulers.elastic())
            .then(completion);
}
/**
 * Upgrades the exchange to a WebSocket connection using the native Reactor
 * Netty response, and hands a new session to {@code handler}.
 *
 * @param exchange the current exchange; its response must be an
 *        {@code AbstractServerHttpResponse} backed by a {@code NettyDataBufferFactory}
 * @param handler the handler invoked with the created session
 * @param subProtocol the sub-protocol passed to {@code sendWebsocket}, may be {@code null}
 * @param handshakeInfoFactory supplies the handshake info for the session
 * @return the result of {@code sendWebsocket} on the native response
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyBufferFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

    return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, (inbound, outbound) -> {
        ReactorNettyWebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
                inbound, outbound, info, nettyBufferFactory, this.maxFramePayloadLength);
        // Tag the handler's Mono with the request URI to ease troubleshooting.
        URI requestUri = exchange.getRequest().getURI();
        return handler.handle(webSocketSession)
                .checkpoint(requestUri + " [ReactorNettyRequestUpgradeStrategy]");
    });
}
/**
 * Performs the WebSocket handshake on the native Undertow exchange.
 *
 * <p>A single {@code Hybi13Handshake} is offered, restricted to
 * {@code subProtocol} when one is given. Any exception raised while creating
 * the callback or handling the request is returned as an error {@code Mono}.
 *
 * @param exchange the current exchange; its request must be an {@code AbstractServerHttpRequest}
 * @param handler the handler passed to the handshake callback
 * @param subProtocol the selected sub-protocol, may be {@code null}
 * @param handshakeInfoFactory supplies the handshake info for the callback
 * @return an empty {@code Mono}, or an error {@code Mono} on failure
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    Assert.isInstanceOf(AbstractServerHttpRequest.class, httpRequest);
    HttpServerExchange nativeExchange = ((AbstractServerHttpRequest) httpRequest).getNativeRequest();

    Set<String> protocols;
    if (subProtocol == null) {
        protocols = Collections.emptySet();
    }
    else {
        protocols = Collections.singleton(subProtocol);
    }
    List<Handshake> handshakes = Collections.singletonList(new Hybi13Handshake(protocols, false));

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();

    try {
        DefaultCallback connectionCallback = new DefaultCallback(info, handler, dataBufferFactory);
        WebSocketProtocolHandshakeHandler handshakeHandler =
                new WebSocketProtocolHandshakeHandler(handshakes, connectionCallback);
        handshakeHandler.handleRequest(nativeExchange);
        return Mono.empty();
    }
    catch (Exception ex) {
        return Mono.error(ex);
    }
}
/**
 * Registers a {@code WebSocketMessagingHandler} under {@code /messaging/**}
 * in a URL handler mapping with the highest precedence.
 *
 * @param messagingManager passed to the messaging handler
 * @param userTokenManager passed to the messaging handler
 * @param authenticationManager passed to the messaging handler
 * @return the configured handler mapping
 */
@Bean
public HandlerMapping webSocketMessagingHandlerMapping(MessagingManager messagingManager,
                                                       UserTokenManager userTokenManager,
                                                       ReactiveAuthenticationManager authenticationManager) {
    final Map<String, WebSocketHandler> handlersByPath = new HashMap<>(1);
    handlersByPath.put("/messaging/**",
            new WebSocketMessagingHandler(messagingManager, userTokenManager, authenticationManager));

    final SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
    urlMapping.setUrlMap(handlersByPath);
    return urlMapping;
}
/**
 * Connects to the given URL with the WebSocket container and returns a
 * {@code Mono} tied to the completion processor given to the endpoint.
 *
 * @param url the target URL
 * @param requestHeaders headers used to build the {@code DefaultConfigurator}
 * @param handler the handler whose sub-protocols are requested
 * @return a {@code Mono} that continues with the completion processor once
 *         the connect callable has finished
 */
private Mono<Void> executeInternal(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
    MonoProcessor<Void> sessionCompletion = MonoProcessor.create();
    return Mono
            .fromCallable(() -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
                List<String> requestedProtocols = handler.getSubProtocols();
                DefaultConfigurator cfg = new DefaultConfigurator(requestHeaders);
                Endpoint ep = createEndpoint(url, handler, sessionCompletion, cfg);
                ClientEndpointConfig epConfig = createEndpointConfig(cfg, requestedProtocols);
                return this.webSocketContainer.connectToServer(ep, epConfig, url);
            })
            .subscribeOn(Schedulers.elastic()) // connectToServer is blocking
            .then(sessionCompletion);
}
/**
 * Connects to {@code url} with the Reactor Netty HTTP client and lets
 * {@code handler} handle the resulting WebSocket session.
 *
 * <p>The handler's sub-protocols are sent as a comma-delimited string, and
 * the protocol reported back in the {@code Sec-WebSocket-Protocol} response
 * header is recorded in the session's {@code HandshakeInfo}.
 *
 * @param url the URL to connect to
 * @param requestHeaders headers copied onto the Netty request
 * @param handler the handler for the WebSocket session
 * @return the first signal of the handled connection
 */
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
    return getHttpClient()
            .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
            .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
            .uri(url.toString())
            .handle((in, out) -> {
                HttpHeaders handshakeHeaders = toHttpHeaders(in);
                String selectedProtocol = handshakeHeaders.getFirst("Sec-WebSocket-Protocol");
                HandshakeInfo handshakeInfo =
                        new HandshakeInfo(url, handshakeHeaders, Mono.empty(), selectedProtocol);
                NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(out.alloc());
                WebSocketSession webSocketSession =
                        new ReactorNettyWebSocketSession(in, out, handshakeInfo, bufferFactory);
                if (logger.isDebugEnabled()) {
                    logger.debug("Started session '" + webSocketSession.getId() + "' for " + url);
                }
                return handler.handle(webSocketSession);
            })
            .doOnRequest(demand -> {
                // Logged when downstream demand is signalled.
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
            })
            .next();
}
/**
 * Upgrades the exchange to a WebSocket connection via the native Reactor
 * Netty response and passes the created session to {@code handler}.
 *
 * @param exchange the current exchange; its response must be an
 *        {@code AbstractServerHttpResponse} backed by a {@code NettyDataBufferFactory}
 * @param handler the handler invoked with the created session
 * @param subProtocol the sub-protocol passed to {@code sendWebsocket}, may be {@code null}
 * @param handshakeInfoFactory supplies the handshake info for the session
 * @return the result of {@code sendWebsocket} on the native response
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse serverResponse = exchange.getResponse();
    HttpServerResponse nettyResponse = ((AbstractServerHttpResponse) serverResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) serverResponse.bufferFactory();

    return nettyResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, (inbound, outbound) -> {
        ReactorNettyWebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
                inbound, outbound, info, nettyFactory, this.maxFramePayloadLength);
        return handler.handle(webSocketSession);
    });
}
/**
 * Runs the Undertow WebSocket handshake for the current request.
 *
 * <p>Only a {@code Hybi13Handshake} is offered; it is limited to
 * {@code subProtocol} if one was selected. Exceptions thrown while building
 * the callback or handling the request surface as an error {@code Mono}.
 *
 * @param exchange the current exchange; its request must be an {@code AbstractServerHttpRequest}
 * @param handler the handler passed to the handshake callback
 * @param subProtocol the selected sub-protocol, may be {@code null}
 * @param handshakeInfoFactory supplies the handshake info for the callback
 * @return an empty {@code Mono} on success, otherwise an error {@code Mono}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest serverRequest = exchange.getRequest();
    Assert.isInstanceOf(AbstractServerHttpRequest.class, serverRequest);
    HttpServerExchange undertowExchange = ((AbstractServerHttpRequest) serverRequest).getNativeRequest();

    Set<String> offeredProtocols = Collections.emptySet();
    if (subProtocol != null) {
        offeredProtocols = Collections.singleton(subProtocol);
    }
    Hybi13Handshake hybi13 = new Hybi13Handshake(offeredProtocols, false);
    List<Handshake> handshakeList = Collections.singletonList(hybi13);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory buffers = exchange.getResponse().bufferFactory();

    try {
        DefaultCallback callback = new DefaultCallback(info, handler, buffers);
        new WebSocketProtocolHandshakeHandler(handshakeList, callback).handleRequest(undertowExchange);
        return Mono.empty();
    }
    catch (Exception ex) {
        return Mono.error(ex);
    }
}
/**
 * Maps the {@code /echo}, {@code /sink}, {@code /double-producer} and
 * {@code /close} paths to the matching handler methods of this class, with
 * order {@code -1} and a CORS configuration for {@code /sink}.
 *
 * @return the configured handler mapping
 */
@Bean
public HandlerMapping handlerMapping() {
    Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/echo", this::echoHandler);
    handlersByPath.put("/sink", this::sinkHandler);
    handlersByPath.put("/double-producer", this::doubleProducerHandler);
    handlersByPath.put("/close", this::closeHandler);

    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setUrlMap(handlersByPath);
    urlMapping.setOrder(-1);

    // Only the /sink endpoint carries a CORS configuration.
    CorsConfiguration sinkCors = new CorsConfiguration();
    sinkCors.addAllowedOrigin("http://snowdrop.dev");
    urlMapping.setCorsConfigurations(Collections.singletonMap("/sink", sinkCors));

    return urlMapping;
}
/**
 * Bridges the given session with a client session opened against
 * {@code url}: messages received on either side are sent to the other.
 *
 * @param session the incoming session to bridge
 * @return the result of the client's {@code execute} call
 */
@NonNull
@Override
public Mono<Void> handle(@NonNull final WebSocketSession session) {
    // Headers are forwarded so that custom headers reach the target.
    return client.execute(url, this.headers, new WebSocketHandler() {

        @NonNull
        @Override
        public Mono<Void> handle(@NonNull final WebSocketSession webSocketSession) {
            // Each message is retained before being re-sent (needed for Reactor Netty).
            Mono<Void> forwardToTarget = webSocketSession.send(
                    session.receive().doOnNext(WebSocketMessage::retain));
            Mono<Void> forwardToCaller = session.send(
                    webSocketSession.receive().doOnNext(WebSocketMessage::retain));
            return Mono.zip(forwardToTarget, forwardToCaller).then();
        }

        @NonNull
        @Override
        public List<String> getSubProtocols() {
            // Expose the enclosing handler's sub-protocols to the client.
            return SoulWebSocketHandler.this.subProtocols;
        }
    });
}
/**
 * Maps {@code /topic/comments.new} to the comment service, with order
 * {@code 10} and a CORS configuration for that same path.
 *
 * @param commentService the handler registered for the comments topic
 * @return the configured handler mapping
 */
@Bean
HandlerMapping webSocketMapping(CommentService commentService) {
    Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/topic/comments.new", commentService);

    CorsConfiguration cors = new CorsConfiguration();
    cors.addAllowedOrigin("http://localhost:8080");
    Map<String, CorsConfiguration> corsByPath = new HashMap<>();
    corsByPath.put("/topic/comments.new", cors);

    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setOrder(10);
    urlMapping.setUrlMap(handlersByPath);
    urlMapping.setCorsConfigurations(corsByPath);
    return urlMapping;
}
/**
 * Proxies the given session to {@code url}: a client session is opened and
 * messages received on either session are sent to the other one.
 *
 * @param session the incoming session to proxy
 * @return the result of the client's {@code execute} call
 */
@Override
public Mono<Void> handle(WebSocketSession session) {
    // Headers are passed along so custom headers can be sent through.
    return client.execute(url, this.headers, new WebSocketHandler() {

        @Override
        public Mono<Void> handle(WebSocketSession proxySession) {
            // Each message is retained before being re-sent (needed for Reactor Netty).
            Mono<Void> toProxy = proxySession.send(
                    session.receive().doOnNext(WebSocketMessage::retain));
            // .log("proxySessionSend", Level.FINE);
            Mono<Void> toCaller = session.send(
                    proxySession.receive().doOnNext(WebSocketMessage::retain));
            // .log("sessionSend", Level.FINE);
            return Mono.zip(toProxy, toCaller).then();
        }

        /**
         * Exposes the enclosing handler's sub-protocols so they are
         * available downstream.
         * @return the sub-protocols of the enclosing {@code ProxyWebSocketHandler}
         */
        @Override
        public List<String> getSubProtocols() {
            return ProxyWebSocketHandler.this.subProtocols;
        }
    });
}
/**
 * Creates an adapter that delegates to the given handler.
 *
 * @param handler the handler to delegate to; must not be {@code null}
 * @param sessionFactory creates a {@code JettyWebSocketSession} from a
 *        {@code Session}; must not be {@code null}
 */
public JettyWebSocketHandlerAdapter(
        WebSocketHandler handler,
        Function<Session, JettyWebSocketSession> sessionFactory) {

    // Validate both arguments before any field is assigned.
    Assert.notNull(handler, "WebSocketHandler is required");
    Assert.notNull(sessionFactory, "'sessionFactory' is required");

    this.delegateHandler = handler;
    this.sessionFactory = sessionFactory;
}
/**
 * Creates an adapter that delegates to the given handler.
 *
 * @param handler the handler to delegate to; must not be {@code null}
 * @param sessionFactory creates a {@code StandardWebSocketSession} from a
 *        {@code Session}; must not be {@code null}
 */
public StandardWebSocketHandlerAdapter(
        WebSocketHandler handler,
        Function<Session, StandardWebSocketSession> sessionFactory) {

    // Validate both arguments before any field is assigned.
    Assert.notNull(handler, "WebSocketHandler is required");
    Assert.notNull(sessionFactory, "'sessionFactory' is required");

    this.delegateHandler = handler;
    this.sessionFactory = sessionFactory;
}
/**
 * Maps {@code /echo} to the given echo handler in a URL handler mapping
 * with the highest precedence.
 *
 * @param echoHandler the handler registered for {@code /echo}
 * @return the configured handler mapping
 */
@Autowired
@Bean
public HandlerMapping webSocketMapping(final EchoHandler echoHandler) {
    final Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/echo", echoHandler);

    final SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
    urlMapping.setUrlMap(handlersByPath);
    return urlMapping;
}
/**
 * Wires a connected Undertow channel to the handler: creates the session,
 * registers a receive adapter on the channel, resumes receives and finally
 * subscribes the session to the handler's result.
 *
 * @param url the connected URL, also used in the checkpoint description
 * @param handler the handler to invoke with the new session
 * @param completion the completion processor given to the session
 * @param negotiation used to create the handshake info
 * @param channel the connected WebSocket channel
 */
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
        DefaultNegotiation negotiation, WebSocketChannel channel) {

    HandshakeInfo handshakeInfo = createHandshakeInfo(url, negotiation);
    UndertowWebSocketSession webSocketSession =
            new UndertowWebSocketSession(channel, handshakeInfo, this.bufferFactory, completion);
    UndertowWebSocketHandlerAdapter receiveAdapter = new UndertowWebSocketHandlerAdapter(webSocketSession);

    channel.getReceiveSetter().set(receiveAdapter);
    channel.resumeReceives();

    Mono<Void> handling = handler.handle(webSocketSession)
            .checkpoint(url + " [UndertowWebSocketClient]");
    handling.subscribe(webSocketSession);
}
/**
 * Offers two sub-protocols to the {@code /sub-protocol} endpoint and checks
 * that the first one is reported in the handshake headers, in the handshake
 * info and in the text received from the server. Currently ignored.
 */
@Test
@Ignore
public void subProtocol() throws Exception {
    String protocol = "echo-v1";
    String protocol2 = "echo-v2";
    AtomicReference<HandshakeInfo> handshakeRef = new AtomicReference<>();
    MonoProcessor<Object> received = MonoProcessor.create();

    WebSocketHandler clientHandler = new WebSocketHandler() {

        @Override
        public List<String> getSubProtocols() {
            return Arrays.asList(protocol, protocol2);
        }

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            // Capture the handshake info and feed received text into the processor.
            handshakeRef.set(session.getHandshakeInfo());
            return session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .subscribeWith(received)
                    .then();
        }
    };
    client.execute(getUrl("/sub-protocol"), clientHandler).block(Duration.ofMillis(5000));

    HandshakeInfo handshake = handshakeRef.get();
    assertThat(handshake.getHeaders().getFirst("Upgrade"))
            .isEqualToIgnoringCase("websocket");
    assertThat(handshake.getHeaders().getFirst("Sec-WebSocket-Protocol"))
            .isEqualTo(protocol);
    assertThat(handshake.getSubProtocol())
            .as("Wrong protocol accepted")
            .isEqualTo(protocol);
    assertThat(received.block(Duration.ofSeconds(5)))
            .as("Wrong protocol detected on the server side")
            .isEqualTo(protocol);
}
/**
 * Connects to {@code url} with the Jetty client and returns a {@code Mono}
 * tied to the completion processor handed to the Jetty handler.
 *
 * @param url the URL to connect to
 * @param headers headers passed to the {@code DefaultUpgradeListener}
 * @param handler supplies the sub-protocols and is wrapped in the Jetty handler
 * @return a {@code Mono} that continues with the completion processor once
 *         the connect callable has finished
 */
private Mono<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
    MonoProcessor<Void> completion = MonoProcessor.create();
    return Mono
            .fromCallable(() -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
                Object endpoint = createHandler(url, handler, completion);
                ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
                upgradeRequest.setSubProtocols(handler.getSubProtocols());
                UpgradeListener listener = new DefaultUpgradeListener(headers);
                return this.jettyClient.connect(endpoint, url, upgradeRequest, listener);
            })
            .then(completion);
}
/**
 * Accepts the WebSocket handshake through the Jetty
 * {@code WebSocketServerFactory}.
 *
 * <p>The adapter and sub-protocol are published in {@code adapterHolder}
 * only for the duration of the {@code acceptWebSocket} call and are always
 * removed afterwards.
 *
 * @param exchange the current exchange
 * @param handler the handler wrapped by the Jetty adapter
 * @param subProtocol the selected sub-protocol, may be {@code null}
 * @param handshakeInfoFactory supplies the handshake info for the session
 * @return an empty {@code Mono}, or an error {@code Mono} on {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    ServerHttpResponse httpResponse = exchange.getResponse();

    HttpServletRequest servletRequest = getHttpServletRequest(httpRequest);
    HttpServletResponse servletResponse = getHttpServletResponse(httpResponse);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory bufferFactory = httpResponse.bufferFactory();

    JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
            handler, jettySession -> new JettyWebSocketSession(jettySession, info, bufferFactory));

    startLazily(servletRequest);

    Assert.state(this.factory != null, "No WebSocketServerFactory available");
    boolean upgradeRequested = this.factory.isUpgradeRequest(servletRequest, servletResponse);
    Assert.isTrue(upgradeRequested, "Not a WebSocket handshake");

    try {
        adapterHolder.set(new WebSocketHandlerContainer(handlerAdapter, subProtocol));
        this.factory.acceptWebSocket(servletRequest, servletResponse);
    }
    catch (IOException ex) {
        return Mono.error(ex);
    }
    finally {
        // Always clear the holder, whether or not the handshake succeeded.
        adapterHolder.remove();
    }

    return Mono.empty();
}
/**
 * Maps {@code /echo} and {@code /posts} to newly created handlers.
 *
 * @return the configured handler mapping
 */
@Bean
public HandlerMapping handlerMapping() {
    Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/echo", new EchoWebSocketHandler());
    handlersByPath.put("/posts", new PostsWebSocketHandler(this.posts));
    // map.put("/custom-header", new CustomHeaderHandler());

    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setUrlMap(handlersByPath);
    return urlMapping;
}
/**
 * Performs the WebSocket upgrade through Tomcat's {@code WsServerContainer}.
 *
 * <p>The endpoint config is created for the servlet request URI and is
 * limited to {@code subProtocol} when one is given.
 *
 * @param exchange the current exchange
 * @param handler the handler wrapped by the endpoint adapter
 * @param subProtocol the selected sub-protocol, may be {@code null}
 * @param handshakeInfoFactory supplies the handshake info for the session
 * @return an empty {@code Mono}, or an error {@code Mono} on
 *         {@code ServletException} or {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    ServerHttpResponse httpResponse = exchange.getResponse();

    HttpServletRequest servletRequest = getHttpServletRequest(httpRequest);
    HttpServletResponse servletResponse = getHttpServletResponse(httpResponse);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();

    Endpoint endpoint = new StandardWebSocketHandlerAdapter(
            handler, wsSession -> new TomcatWebSocketSession(wsSession, info, dataBufferFactory));

    String path = servletRequest.getRequestURI();
    DefaultServerEndpointConfig endpointConfig = new DefaultServerEndpointConfig(path, endpoint);
    if (subProtocol == null) {
        endpointConfig.setSubprotocols(Collections.emptyList());
    }
    else {
        endpointConfig.setSubprotocols(Collections.singletonList(subProtocol));
    }

    try {
        WsServerContainer serverContainer = getContainer(servletRequest);
        serverContainer.doUpgrade(servletRequest, servletResponse, endpointConfig, Collections.emptyMap());
        return Mono.empty();
    }
    catch (ServletException | IOException ex) {
        return Mono.error(ex);
    }
}
/**
 * Maps {@code /employee-feed} to a new {@code EmployeeWebSocketHandler},
 * with order {@code 10}.
 *
 * @return the configured handler mapping
 */
@Bean
public HandlerMapping handlerMapping() {
    Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/employee-feed", new EmployeeWebSocketHandler());

    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setUrlMap(handlersByPath);
    urlMapping.setOrder(10);
    return urlMapping;
}
/**
 * Validates the WebSocket handshake request and, if it is acceptable,
 * delegates the upgrade to the configured upgrade strategy.
 *
 * <p>The request is rejected when it is not a {@code GET}, when the
 * {@code Upgrade} header is not "WebSocket" (ignoring case), when the
 * {@code Connection} header has no "Upgrade" token (ignoring case), or when
 * the {@code Sec-WebSocket-Key} header is missing.
 *
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @return an error {@code Mono} with {@code MethodNotAllowedException} for a
 *         non-GET request, the result of {@code handleBadRequest} for an
 *         invalid handshake, otherwise the result of the upgrade
 */
@Override
public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) {
    ServerHttpRequest request = exchange.getRequest();
    HttpMethod method = request.getMethod();
    HttpHeaders headers = request.getHeaders();

    if (HttpMethod.GET != method) {
        return Mono.error(new MethodNotAllowedException(
                request.getMethodValue(), Collections.singleton(HttpMethod.GET)));
    }

    if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
        return handleBadRequest(exchange, "Invalid 'Upgrade' header: " + headers);
    }

    // The "Upgrade" connection token must be matched case-insensitively
    // (RFC 6455, section 4.2.1), so variants such as "UPGRADE" are accepted
    // instead of only the two spellings "Upgrade" and "upgrade".
    List<String> connectionValue = headers.getConnection();
    if (connectionValue.stream().noneMatch("Upgrade"::equalsIgnoreCase)) {
        return handleBadRequest(exchange, "Invalid 'Connection' header: " + headers);
    }

    String key = headers.getFirst(SEC_WEBSOCKET_KEY);
    if (key == null) {
        return handleBadRequest(exchange, "Missing \"Sec-WebSocket-Key\" header");
    }

    String protocol = selectProtocol(headers, handler);

    // The handshake info is created lazily, only when the strategy asks for it.
    return initAttributes(exchange).flatMap(attributes ->
            this.upgradeStrategy.upgrade(exchange, handler, protocol,
                    () -> createHandshakeInfo(exchange, request, protocol, attributes))
    );
}
/**
 * Picks the first sub-protocol requested by the client that the handler
 * also supports.
 *
 * <p>The first {@code Sec-WebSocket-Protocol} header value is split on
 * commas. Each candidate is trimmed before comparison, because clients
 * commonly separate values with ", " and an untrimmed " name" would never
 * match a supported protocol.
 *
 * @param headers the handshake request headers
 * @param handler the handler whose supported sub-protocols are consulted
 * @return the selected sub-protocol, or {@code null} if the header is
 *         absent or no requested protocol is supported
 */
@Nullable
private String selectProtocol(HttpHeaders headers, WebSocketHandler handler) {
    String protocolHeader = headers.getFirst(SEC_WEBSOCKET_PROTOCOL);
    if (protocolHeader != null) {
        List<String> supportedProtocols = handler.getSubProtocols();
        for (String candidate : StringUtils.commaDelimitedListToStringArray(protocolHeader)) {
            String protocol = candidate.trim();
            if (supportedProtocols.contains(protocol)) {
                return protocol;
            }
        }
    }
    return null;
}
/**
 * Verifies that only the session attributes accepted by the configured
 * predicate ({@code a1}, {@code a3}, {@code a5}) end up in the
 * {@code HandshakeInfo} attributes.
 */
@Test
public void sessionAttributePredicate() {
    MockWebSession webSession = new MockWebSession();
    webSession.getAttributes().put("a1", "v1");
    webSession.getAttributes().put("a2", "v2");
    webSession.getAttributes().put("a3", "v3");
    webSession.getAttributes().put("a4", "v4");
    webSession.getAttributes().put("a5", "v5");

    MockServerHttpRequest handshakeRequest = initHandshakeRequest();
    MockServerWebExchange exchange =
            MockServerWebExchange.builder(handshakeRequest).session(webSession).build();

    TestRequestUpgradeStrategy strategy = new TestRequestUpgradeStrategy();
    HandshakeWebSocketService webSocketService = new HandshakeWebSocketService(strategy);
    webSocketService.setSessionAttributePredicate(
            attributeName -> Arrays.asList("a1", "a3", "a5").contains(attributeName));

    webSocketService.handleRequest(exchange, mock(WebSocketHandler.class)).block();

    HandshakeInfo handshakeInfo = strategy.handshakeInfo;
    assertNotNull(handshakeInfo);

    Map<String, Object> copied = handshakeInfo.getAttributes();
    assertEquals(3, copied.size());
    assertThat(copied, Matchers.hasEntry("a1", "v1"));
    assertThat(copied, Matchers.hasEntry("a3", "v3"));
    assertThat(copied, Matchers.hasEntry("a5", "v5"));
}
/**
 * Maps {@code /objectecho} to the given echo handler in a URL handler
 * mapping with the highest precedence.
 *
 * @param echoHandler the handler registered for {@code /objectecho}
 * @return the configured handler mapping
 */
@Autowired
@Bean
public HandlerMapping webSocketMapping(final EchoHandler echoHandler) {
    final Map<String, WebSocketHandler> handlersByPath = new HashMap<>(1);
    handlersByPath.put("/objectecho", echoHandler);

    final SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
    urlMapping.setUrlMap(handlersByPath);
    return urlMapping;
}
/**
 * Builds the adapter around a delegate handler and a session factory.
 *
 * @param handler the delegate handler; rejected if {@code null}
 * @param sessionFactory maps a {@code Session} to a
 *        {@code JettyWebSocketSession}; rejected if {@code null}
 */
public JettyWebSocketHandlerAdapter(
        WebSocketHandler handler,
        Function<Session, JettyWebSocketSession> sessionFactory) {

    // Both arguments are checked up front, then stored.
    Assert.notNull(handler, "WebSocketHandler is required");
    Assert.notNull(sessionFactory, "'sessionFactory' is required");

    this.delegateHandler = handler;
    this.sessionFactory = sessionFactory;
}
/**
 * Builds the adapter around a delegate handler and a session factory.
 *
 * @param handler the delegate handler; rejected if {@code null}
 * @param sessionFactory maps a {@code Session} to a
 *        {@code StandardWebSocketSession}; rejected if {@code null}
 */
public StandardWebSocketHandlerAdapter(
        WebSocketHandler handler,
        Function<Session, StandardWebSocketSession> sessionFactory) {

    // Both arguments are checked up front, then stored.
    Assert.notNull(handler, "WebSocketHandler is required");
    Assert.notNull(sessionFactory, "'sessionFactory' is required");

    this.delegateHandler = handler;
    this.sessionFactory = sessionFactory;
}
/**
 * Builds the URL mapping for the comment and chat endpoints, with order
 * {@code 10}.
 *
 * @param commentService handler for {@code /topic/comments.new}
 * @param inboundChatService handler for {@code /app/chatMessage.new}
 * @param outboundChatService handler for {@code /topic/chatMessage.new}
 * @return the configured URL handler mapping
 */
private static SimpleUrlHandlerMapping configureUrlMappings(CommentService commentService,
                                                            InboundChatService inboundChatService,
                                                            OutboundChatService outboundChatService) {
    Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/topic/comments.new", commentService);
    handlersByPath.put("/app/chatMessage.new", inboundChatService);
    handlersByPath.put("/topic/chatMessage.new", outboundChatService);

    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setOrder(10);
    urlMapping.setUrlMap(handlersByPath);
    return urlMapping;
}
/**
 * Maps the {@code /echo}, {@code /echoForHttp}, {@code /sub-protocol},
 * {@code /custom-header} and {@code /close} paths to newly created handlers.
 *
 * @return the configured handler mapping
 */
@Bean
public HandlerMapping handlerMapping() {
    Map<String, WebSocketHandler> handlersByPath = new HashMap<>();
    handlersByPath.put("/echo", new EchoWebSocketHandler());
    handlersByPath.put("/echoForHttp", new EchoWebSocketHandler());
    handlersByPath.put("/sub-protocol", new SubProtocolWebSocketHandler());
    handlersByPath.put("/custom-header", new CustomHeaderHandler());
    handlersByPath.put("/close", new SessionClosingHandler());

    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setUrlMap(handlersByPath);
    return urlMapping;
}