下面列出了 org.springframework.http.server.reactive.AbstractServerHttpResponse 与 org.springframework.web.reactive.socket.HandshakeInfo 的实例代码;您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Initialize the state shared by all WebSocket session implementations.
 * <p>Each argument is checked with {@code Assert.notNull} before any field
 * is assigned. The attributes exposed by the given {@link HandshakeInfo}
 * are copied into this session's own attribute map, and the log prefix is
 * derived from the handshake info and the session id.
 * @param delegate the native session to wrap
 * @param id the session id
 * @param info the handshake details associated with the session
 * @param bufferFactory the factory to use for data buffers
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
    Assert.notNull(delegate, "Native session is required.");
    Assert.notNull(id, "Session id is required.");
    Assert.notNull(info, "HandshakeInfo is required.");
    Assert.notNull(bufferFactory, "DataBuffer factory is required.");

    this.delegate = delegate;
    this.id = id;
    this.handshakeInfo = info;
    this.bufferFactory = bufferFactory;
    this.attributes.putAll(info.getAttributes());
    this.logPrefix = initLogPrefix(info, id);

    // Nothing more to do unless debug logging is switched on.
    if (!logger.isDebugEnabled()) {
        return;
    }
    String message = getLogPrefix() + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri();
    logger.debug(message);
}
/**
 * Open a WebSocket connection to the given URL and pass the resulting
 * session to the handler.
 * <p>The handler's sub-protocols are sent as a comma-delimited string, and
 * the sub-protocol recorded in the {@link HandshakeInfo} is the first
 * {@code Sec-WebSocket-Protocol} value found in the response headers.
 * @param url the URL to connect to
 * @param requestHeaders headers handed to {@code setNettyHeaders} for the request
 * @param handler the handler invoked with the new session
 * @return the result of calling {@code next()} on the handling pipeline
 */
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
    String requestedProtocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
    return getHttpClient()
            .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
            .websocket(requestedProtocols, getMaxFramePayloadLength())
            .uri(url.toString())
            .handle((in, out) -> {
                HttpHeaders handshakeHeaders = toHttpHeaders(in);
                HandshakeInfo handshakeInfo = new HandshakeInfo(
                        url, handshakeHeaders, Mono.empty(),
                        handshakeHeaders.getFirst("Sec-WebSocket-Protocol"));
                WebSocketSession wsSession = new ReactorNettyWebSocketSession(
                        in, out, handshakeInfo,
                        new NettyDataBufferFactory(out.alloc()), getMaxFramePayloadLength());
                if (logger.isDebugEnabled()) {
                    logger.debug("Started session '" + wsSession.getId() + "' for " + url);
                }
                // The checkpoint description ties assembly traces back to this client and URL.
                return handler.handle(wsSession).checkpoint(url + " [ReactorNettyWebSocketClient]");
            })
            .doOnRequest(demand -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
            })
            .next();
}
/**
 * Upgrade the exchange to a WebSocket connection through the native
 * Reactor Netty response.
 * <p>The handshake info is obtained from the supplier before the upgrade is
 * requested; the session itself is only created once the WebSocket
 * inbound/outbound pair is available.
 * @param exchange the current exchange
 * @param handler the handler invoked with the new session
 * @param subProtocol the sub-protocol passed to {@code sendWebsocket}, possibly {@code null}
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return the {@code Mono} returned by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyBufferFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

    return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, (inbound, outbound) -> {
        ReactorNettyWebSocketSession wsSession = new ReactorNettyWebSocketSession(
                inbound, outbound, info, nettyBufferFactory, this.maxFramePayloadLength);
        String description = exchange.getRequest().getURI() + " [ReactorNettyRequestUpgradeStrategy]";
        return handler.handle(wsSession).checkpoint(description);
    });
}
/**
 * Upgrade the exchange to a WebSocket connection using Undertow's
 * {@code WebSocketProtocolHandshakeHandler} with a single
 * {@code Hybi13Handshake}.
 * <p>The request must be an {@code AbstractServerHttpRequest}; its native
 * request is used as the Undertow {@code HttpServerExchange}.
 * @param exchange the current exchange
 * @param handler the handler passed on to the upgrade callback
 * @param subProtocol the single sub-protocol to offer, or {@code null} for none
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} wrapping any
 * exception raised while handling the handshake request
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    Assert.isInstanceOf(AbstractServerHttpRequest.class, httpRequest);
    HttpServerExchange nativeExchange = ((AbstractServerHttpRequest) httpRequest).getNativeRequest();

    Set<String> supportedProtocols;
    if (subProtocol == null) {
        supportedProtocols = Collections.emptySet();
    }
    else {
        supportedProtocols = Collections.singleton(subProtocol);
    }
    List<Handshake> handshakeList =
            Collections.singletonList(new Hybi13Handshake(supportedProtocols, false));

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();

    try {
        DefaultCallback upgradeCallback = new DefaultCallback(info, handler, dataBufferFactory);
        new WebSocketProtocolHandshakeHandler(handshakeList, upgradeCallback).handleRequest(nativeExchange);
        return Mono.empty();
    }
    catch (Exception ex) {
        return Mono.error(ex);
    }
}
/**
 * Initialize the state shared by all WebSocket session implementations.
 * <p>Each argument is checked with {@code Assert.notNull} before any field
 * is assigned. The attributes exposed by the given {@link HandshakeInfo}
 * are copied into this session's own attribute map, and the log prefix is
 * derived from the handshake info and the session id.
 * @param delegate the native session to wrap
 * @param id the session id
 * @param info the handshake details associated with the session
 * @param bufferFactory the factory to use for data buffers
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
    Assert.notNull(delegate, "Native session is required.");
    Assert.notNull(id, "Session id is required.");
    Assert.notNull(info, "HandshakeInfo is required.");
    Assert.notNull(bufferFactory, "DataBuffer factory is required.");

    this.delegate = delegate;
    this.id = id;
    this.handshakeInfo = info;
    this.bufferFactory = bufferFactory;
    this.attributes.putAll(info.getAttributes());
    this.logPrefix = initLogPrefix(info, id);

    // Nothing more to do unless debug logging is switched on.
    if (!logger.isDebugEnabled()) {
        return;
    }
    String message = getLogPrefix() + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri();
    logger.debug(message);
}
/**
 * Open a WebSocket connection to the given URL and pass the resulting
 * session to the handler.
 * <p>The handler's sub-protocols are sent as a comma-delimited string, and
 * the sub-protocol recorded in the {@link HandshakeInfo} is the first
 * {@code Sec-WebSocket-Protocol} value found in the response headers.
 * @param url the URL to connect to
 * @param requestHeaders headers handed to {@code setNettyHeaders} for the request
 * @param handler the handler invoked with the new session
 * @return the result of calling {@code next()} on the handling pipeline
 */
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
    String requestedProtocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
    return getHttpClient()
            .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
            .websocket(requestedProtocols)
            .uri(url.toString())
            .handle((in, out) -> {
                HttpHeaders handshakeHeaders = toHttpHeaders(in);
                HandshakeInfo handshakeInfo = new HandshakeInfo(
                        url, handshakeHeaders, Mono.empty(),
                        handshakeHeaders.getFirst("Sec-WebSocket-Protocol"));
                WebSocketSession wsSession = new ReactorNettyWebSocketSession(
                        in, out, handshakeInfo, new NettyDataBufferFactory(out.alloc()));
                if (logger.isDebugEnabled()) {
                    logger.debug("Started session '" + wsSession.getId() + "' for " + url);
                }
                return handler.handle(wsSession);
            })
            .doOnRequest(demand -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Connecting to " + url);
                }
            })
            .next();
}
/**
 * Upgrade the exchange to a WebSocket connection through the native
 * Reactor Netty response.
 * <p>The handshake info is obtained from the supplier before the upgrade is
 * requested; the session itself is only created once the WebSocket
 * inbound/outbound pair is available.
 * @param exchange the current exchange
 * @param handler the handler invoked with the new session
 * @param subProtocol the sub-protocol passed to {@code sendWebsocket}, possibly {@code null}
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return the {@code Mono} returned by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpResponse httpResponse = exchange.getResponse();
    HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
    HandshakeInfo info = handshakeInfoFactory.get();
    NettyDataBufferFactory nettyBufferFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

    return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, (inbound, outbound) -> {
        ReactorNettyWebSocketSession wsSession = new ReactorNettyWebSocketSession(
                inbound, outbound, info, nettyBufferFactory, this.maxFramePayloadLength);
        return handler.handle(wsSession);
    });
}
/**
 * Upgrade the exchange to a WebSocket connection using Undertow's
 * {@code WebSocketProtocolHandshakeHandler} with a single
 * {@code Hybi13Handshake}.
 * <p>The request must be an {@code AbstractServerHttpRequest}; its native
 * request is used as the Undertow {@code HttpServerExchange}.
 * @param exchange the current exchange
 * @param handler the handler passed on to the upgrade callback
 * @param subProtocol the single sub-protocol to offer, or {@code null} for none
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} wrapping any
 * exception raised while handling the handshake request
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest httpRequest = exchange.getRequest();
    Assert.isInstanceOf(AbstractServerHttpRequest.class, httpRequest);
    HttpServerExchange nativeExchange = ((AbstractServerHttpRequest) httpRequest).getNativeRequest();

    Set<String> supportedProtocols;
    if (subProtocol == null) {
        supportedProtocols = Collections.emptySet();
    }
    else {
        supportedProtocols = Collections.singleton(subProtocol);
    }
    List<Handshake> handshakeList =
            Collections.singletonList(new Hybi13Handshake(supportedProtocols, false));

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();

    try {
        DefaultCallback upgradeCallback = new DefaultCallback(info, handler, dataBufferFactory);
        new WebSocketProtocolHandshakeHandler(handshakeList, upgradeCallback).handleRequest(nativeExchange);
        return Mono.empty();
    }
    catch (Exception ex) {
        return Mono.error(ex);
    }
}
/**
* Constructor with an additional maxFramePayloadLength argument.
* <p>Wraps the given inbound and outbound in a {@code WebSocketConnection},
* which is passed to the superclass constructor together with the handshake
* info and buffer factory, and then stores {@code maxFramePayloadLength}
* in the field of the same name.
* @param inbound the inbound side, wrapped in the {@code WebSocketConnection}
* @param outbound the outbound side, wrapped in the {@code WebSocketConnection}
* @param info the handshake info, passed to the superclass
* @param bufferFactory the buffer factory, passed to the superclass
* @param maxFramePayloadLength the value to store on this session
* (presumably a maximum frame payload size; units are not visible here)
* @since 5.1
*/
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory,
int maxFramePayloadLength) {
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
this.maxFramePayloadLength = maxFramePayloadLength;
}
/**
* Alternative constructor with completion {@code Mono<Void>} to propagate
* the session completion (success or error) (for client-side use).
* <p>Passes the first four arguments to the superclass constructor, creates
* a new {@code WebSocketReceivePublisher} for this session, and stores the
* given completion processor.
* @param delegate the native session, passed to the superclass
* @param id the session id, passed to the superclass
* @param info the handshake info, passed to the superclass
* @param bufferFactory the buffer factory, passed to the superclass
* @param completionMono the processor stored on this session; may be {@code null}
*/
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {
super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher();
this.completionMono = completionMono;
}
/**
* Create a session around the given native {@code Session}.
* <p>The session id passed to the superclass is the identity hex string of
* the native session, as returned by {@code ObjectUtils.getIdentityHexString}.
* @param session the native session to wrap
* @param info the handshake info, passed to the superclass
* @param factory the buffer factory, passed to the superclass
* @param completionMono the completion processor passed to the superclass;
* may be {@code null}
*/
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable MonoProcessor<Void> completionMono) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
// TODO: suspend causes failures if invoked at this stage
// suspendReceiving();
}
/**
 * Wire a newly connected Undertow channel to a WebSocket session and start
 * the handler.
 * <p>The adapter is installed as the channel's receive listener and
 * receives are resumed before the handler is invoked; the session itself
 * subscribes to the handler's result.
 * @param url the URL that was connected to
 * @param handler the handler to invoke with the new session
 * @param completion the completion processor passed to the session
 * @param negotiation the negotiation used to build the handshake info
 * @param channel the connected WebSocket channel
 */
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
        DefaultNegotiation negotiation, WebSocketChannel channel) {

    HandshakeInfo handshakeInfo = createHandshakeInfo(url, negotiation);
    UndertowWebSocketSession wsSession =
            new UndertowWebSocketSession(channel, handshakeInfo, this.bufferFactory, completion);
    UndertowWebSocketHandlerAdapter receiveListener = new UndertowWebSocketHandlerAdapter(wsSession);

    channel.getReceiveSetter().set(receiveListener);
    channel.resumeReceives();

    Mono<Void> handling = handler.handle(wsSession).checkpoint(url + " [UndertowWebSocketClient]");
    handling.subscribe(wsSession);
}
/**
 * Upgrade the exchange to a WebSocket connection through the Jetty
 * {@code WebSocketServerFactory} held in {@code this.factory}.
 * <p>The handler adapter is published through {@code adapterHolder} only
 * for the duration of the {@code acceptWebSocket} call and is always
 * removed afterwards.
 * @param exchange the current exchange
 * @param handler the handler wrapped by the Jetty adapter
 * @param subProtocol the sub-protocol placed in the handler container, possibly {@code null}
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if accepting the
 * WebSocket raised an {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(request);
    HttpServletResponse nativeResponse = getHttpServletResponse(response);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory bufferFactory = response.bufferFactory();
    JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
            handler, nativeSession -> new JettyWebSocketSession(nativeSession, info, bufferFactory));

    startLazily(nativeRequest);

    Assert.state(this.factory != null, "No WebSocketServerFactory available");
    Assert.isTrue(this.factory.isUpgradeRequest(nativeRequest, nativeResponse), "Not a WebSocket handshake");

    try {
        adapterHolder.set(new WebSocketHandlerContainer(handlerAdapter, subProtocol));
        this.factory.acceptWebSocket(nativeRequest, nativeResponse);
    }
    catch (IOException ex) {
        return Mono.error(ex);
    }
    finally {
        adapterHolder.remove();
    }
    return Mono.empty();
}
/**
 * Upgrade the exchange to a WebSocket connection through the
 * {@code WsServerContainer} obtained for the servlet request.
 * <p>An endpoint config is created for the request URI, carrying either the
 * single given sub-protocol or an empty list, and {@code doUpgrade} is
 * invoked with an empty map as its last argument.
 * @param exchange the current exchange
 * @param handler the handler wrapped by the endpoint adapter
 * @param subProtocol the sub-protocol to configure, or {@code null} for none
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if the upgrade
 * raised a {@code ServletException} or {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(request);
    HttpServletResponse nativeResponse = getHttpServletResponse(response);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory dataBufferFactory = response.bufferFactory();
    Endpoint wsEndpoint = new StandardWebSocketHandlerAdapter(
            handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, dataBufferFactory));

    DefaultServerEndpointConfig endpointConfig =
            new DefaultServerEndpointConfig(nativeRequest.getRequestURI(), wsEndpoint);
    endpointConfig.setSubprotocols(subProtocol != null ?
            Collections.singletonList(subProtocol) : Collections.emptyList());

    try {
        getContainer(nativeRequest)
                .doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
        return Mono.empty();
    }
    catch (ServletException | IOException ex) {
        return Mono.error(ex);
    }
}
/**
 * Build the {@link HandshakeInfo} for a handshake request.
 * <p>The URI, remote address and a copy of the headers come from the
 * request; the principal and log prefix come from the exchange.
 * @param exchange the current exchange
 * @param request the handshake request
 * @param protocol the selected sub-protocol, possibly {@code null}
 * @param attributes the attributes to expose through the handshake info
 * @return the new {@code HandshakeInfo}
 */
private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request,
        @Nullable String protocol, Map<String, Object> attributes) {

    // Copy request headers, as they might be pooled and recycled by
    // the server implementation once the handshake HTTP exchange is done.
    HttpHeaders headersCopy = new HttpHeaders();
    headersCopy.addAll(request.getHeaders());

    return new HandshakeInfo(
            request.getURI(),
            headersCopy,
            exchange.getPrincipal(),
            protocol,
            request.getRemoteAddress(),
            attributes,
            exchange.getLogPrefix());
}
// Verifies that only the session attributes accepted by the configured
// predicate (a1, a3, a5 out of a1..a5) end up in the HandshakeInfo.
@Test
public void sessionAttributePredicate() {
    MockWebSession session = new MockWebSession();
    Map<String, Object> sessionAttributes = session.getAttributes();
    sessionAttributes.put("a1", "v1");
    sessionAttributes.put("a2", "v2");
    sessionAttributes.put("a3", "v3");
    sessionAttributes.put("a4", "v4");
    sessionAttributes.put("a5", "v5");

    MockServerWebExchange exchange =
            MockServerWebExchange.builder(initHandshakeRequest()).session(session).build();

    TestRequestUpgradeStrategy upgradeStrategy = new TestRequestUpgradeStrategy();
    HandshakeWebSocketService service = new HandshakeWebSocketService(upgradeStrategy);
    service.setSessionAttributePredicate(Arrays.asList("a1", "a3", "a5")::contains);
    service.handleRequest(exchange, mock(WebSocketHandler.class)).block();

    HandshakeInfo capturedInfo = upgradeStrategy.handshakeInfo;
    assertNotNull(capturedInfo);

    Map<String, Object> handshakeAttributes = capturedInfo.getAttributes();
    assertEquals(3, handshakeAttributes.size());
    assertThat(handshakeAttributes, Matchers.hasEntry("a1", "v1"));
    assertThat(handshakeAttributes, Matchers.hasEntry("a3", "v3"));
    assertThat(handshakeAttributes, Matchers.hasEntry("a5", "v5"));
}
/**
 * Record the {@link HandshakeInfo} produced by the given supplier in the
 * {@code handshakeInfo} field and complete immediately; no actual upgrade
 * is performed.
 * @return an empty {@code Mono}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    HandshakeInfo captured = handshakeInfoFactory.get();
    this.handshakeInfo = captured;
    return Mono.empty();
}
/**
* Constructor with an additional maxFramePayloadLength argument.
* <p>Wraps the given inbound and outbound in a {@code WebSocketConnection},
* which is passed to the superclass constructor together with the handshake
* info and buffer factory, and then stores {@code maxFramePayloadLength}
* in the field of the same name.
* @param inbound the inbound side, wrapped in the {@code WebSocketConnection}
* @param outbound the outbound side, wrapped in the {@code WebSocketConnection}
* @param info the handshake info, passed to the superclass
* @param bufferFactory the buffer factory, passed to the superclass
* @param maxFramePayloadLength the value to store on this session
* (presumably a maximum frame payload size; units are not visible here)
* @since 5.1
*/
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory,
int maxFramePayloadLength) {
super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
this.maxFramePayloadLength = maxFramePayloadLength;
}
/**
* Alternative constructor with completion {@code Mono<Void>} to propagate
* the session completion (success or error) (for client-side use).
* <p>Passes the first four arguments to the superclass constructor, creates
* a new {@code WebSocketReceivePublisher} for this session, and stores the
* given completion processor.
* @param delegate the native session, passed to the superclass
* @param id the session id, passed to the superclass
* @param info the handshake info, passed to the superclass
* @param bufferFactory the buffer factory, passed to the superclass
* @param completionMono the processor stored on this session; may be {@code null}
*/
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {
super(delegate, id, info, bufferFactory);
this.receivePublisher = new WebSocketReceivePublisher();
this.completionMono = completionMono;
}
/**
* Create a session around the given native {@code Session}.
* <p>The session id passed to the superclass is the identity hex string of
* the native session, as returned by {@code ObjectUtils.getIdentityHexString}.
* @param session the native session to wrap
* @param info the handshake info, passed to the superclass
* @param factory the buffer factory, passed to the superclass
* @param completionMono the completion processor passed to the superclass;
* may be {@code null}
*/
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
@Nullable MonoProcessor<Void> completionMono) {
super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
// TODO: suspend causes failures if invoked at this stage
// suspendReceiving();
}
/**
 * Wire a newly connected Undertow channel to a WebSocket session and start
 * the handler.
 * <p>The adapter is installed as the channel's receive listener and
 * receives are resumed before the handler is invoked; the session itself
 * subscribes to the handler's result.
 * @param url the URL that was connected to
 * @param handler the handler to invoke with the new session
 * @param completion the completion processor passed to the session
 * @param negotiation the negotiation used to build the handshake info
 * @param channel the connected WebSocket channel
 */
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
        DefaultNegotiation negotiation, WebSocketChannel channel) {

    HandshakeInfo handshakeInfo = createHandshakeInfo(url, negotiation);
    UndertowWebSocketSession wsSession =
            new UndertowWebSocketSession(channel, handshakeInfo, this.bufferFactory, completion);
    UndertowWebSocketHandlerAdapter receiveListener = new UndertowWebSocketHandlerAdapter(wsSession);

    channel.getReceiveSetter().set(receiveListener);
    channel.resumeReceives();

    Mono<Void> handling = handler.handle(wsSession);
    handling.subscribe(wsSession);
}
/**
 * Upgrade the exchange to a WebSocket connection through the Jetty
 * {@code WebSocketServerFactory} held in {@code this.factory}.
 * <p>The handler adapter is published through {@code adapterHolder} only
 * for the duration of the {@code acceptWebSocket} call and is always
 * removed afterwards.
 * @param exchange the current exchange
 * @param handler the handler wrapped by the Jetty adapter
 * @param subProtocol the sub-protocol placed in the handler container, possibly {@code null}
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if accepting the
 * WebSocket raised an {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(request);
    HttpServletResponse nativeResponse = getHttpServletResponse(response);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory bufferFactory = response.bufferFactory();
    JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
            handler, nativeSession -> new JettyWebSocketSession(nativeSession, info, bufferFactory));

    startLazily(nativeRequest);

    Assert.state(this.factory != null, "No WebSocketServerFactory available");
    Assert.isTrue(this.factory.isUpgradeRequest(nativeRequest, nativeResponse), "Not a WebSocket handshake");

    try {
        adapterHolder.set(new WebSocketHandlerContainer(handlerAdapter, subProtocol));
        this.factory.acceptWebSocket(nativeRequest, nativeResponse);
    }
    catch (IOException ex) {
        return Mono.error(ex);
    }
    finally {
        adapterHolder.remove();
    }
    return Mono.empty();
}
/**
 * Upgrade the exchange to a WebSocket connection through the
 * {@code WsServerContainer} obtained for the servlet request.
 * <p>An endpoint config is created for the request URI, carrying either the
 * single given sub-protocol or an empty list, and {@code doUpgrade} is
 * invoked with an empty map as its last argument.
 * @param exchange the current exchange
 * @param handler the handler wrapped by the endpoint adapter
 * @param subProtocol the sub-protocol to configure, or {@code null} for none
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if the upgrade
 * raised a {@code ServletException} or {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    ServerHttpRequest request = exchange.getRequest();
    ServerHttpResponse response = exchange.getResponse();
    HttpServletRequest nativeRequest = getHttpServletRequest(request);
    HttpServletResponse nativeResponse = getHttpServletResponse(response);

    HandshakeInfo info = handshakeInfoFactory.get();
    DataBufferFactory dataBufferFactory = response.bufferFactory();
    Endpoint wsEndpoint = new StandardWebSocketHandlerAdapter(
            handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, dataBufferFactory));

    DefaultServerEndpointConfig endpointConfig =
            new DefaultServerEndpointConfig(nativeRequest.getRequestURI(), wsEndpoint);
    endpointConfig.setSubprotocols(subProtocol != null ?
            Collections.singletonList(subProtocol) : Collections.emptyList());

    try {
        getContainer(nativeRequest)
                .doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
        return Mono.empty();
    }
    catch (ServletException | IOException ex) {
        return Mono.error(ex);
    }
}
/**
 * Build the {@link HandshakeInfo} for a handshake request.
 * <p>The URI, remote address and a copy of the headers come from the
 * request; the principal and log prefix come from the exchange.
 * @param exchange the current exchange
 * @param request the handshake request
 * @param protocol the selected sub-protocol, possibly {@code null}
 * @param attributes the attributes to expose through the handshake info
 * @return the new {@code HandshakeInfo}
 */
private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request,
        @Nullable String protocol, Map<String, Object> attributes) {

    // Copy request headers, as they might be pooled and recycled by
    // the server implementation once the handshake HTTP exchange is done.
    HttpHeaders headersCopy = new HttpHeaders();
    headersCopy.addAll(request.getHeaders());

    return new HandshakeInfo(
            request.getURI(),
            headersCopy,
            exchange.getPrincipal(),
            protocol,
            request.getRemoteAddress(),
            attributes,
            exchange.getLogPrefix());
}
// Verifies that only the session attributes accepted by the configured
// predicate (a1, a3, a5 out of a1..a5) end up in the HandshakeInfo.
@Test
public void sessionAttributePredicate() {
    MockWebSession session = new MockWebSession();
    Map<String, Object> sessionAttributes = session.getAttributes();
    sessionAttributes.put("a1", "v1");
    sessionAttributes.put("a2", "v2");
    sessionAttributes.put("a3", "v3");
    sessionAttributes.put("a4", "v4");
    sessionAttributes.put("a5", "v5");

    MockServerWebExchange exchange =
            MockServerWebExchange.builder(initHandshakeRequest()).session(session).build();

    TestRequestUpgradeStrategy upgradeStrategy = new TestRequestUpgradeStrategy();
    HandshakeWebSocketService service = new HandshakeWebSocketService(upgradeStrategy);
    service.setSessionAttributePredicate(Arrays.asList("a1", "a3", "a5")::contains);
    service.handleRequest(exchange, mock(WebSocketHandler.class)).block();

    HandshakeInfo capturedInfo = upgradeStrategy.handshakeInfo;
    assertNotNull(capturedInfo);

    Map<String, Object> handshakeAttributes = capturedInfo.getAttributes();
    assertEquals(3, handshakeAttributes.size());
    assertThat(handshakeAttributes, Matchers.hasEntry("a1", "v1"));
    assertThat(handshakeAttributes, Matchers.hasEntry("a3", "v3"));
    assertThat(handshakeAttributes, Matchers.hasEntry("a5", "v5"));
}
/**
 * Record the {@link HandshakeInfo} produced by the given supplier in the
 * {@code handshakeInfo} field and complete immediately; no actual upgrade
 * is performed.
 * @return an empty {@code Mono}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    HandshakeInfo captured = handshakeInfoFactory.get();
    this.handshakeInfo = captured;
    return Mono.empty();
}
/**
 * Upgrade the native Vert.x request to a WebSocket and pass a new session
 * to the handler.
 * <p>The request is cast to {@code AbstractServerHttpRequest} without a
 * prior type check, so any other request type fails with a
 * {@code ClassCastException}.
 * @param exchange the current exchange
 * @param handler the handler invoked with the new session
 * @param subProtocol not used by this implementation
 * @param handshakeInfoFactory supplier of the {@link HandshakeInfo} for the session
 * @return the {@code Mono} returned by the handler
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
        @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    LOGGER.debug("Upgrading request to web socket");

    HttpServerRequest nativeRequest = ((AbstractServerHttpRequest) exchange.getRequest()).getNativeRequest();
    ServerWebSocket serverSocket = nativeRequest.upgrade();
    HandshakeInfo info = handshakeInfoFactory.get();

    return handler.handle(new VertxWebSocketSession(serverSocket, info, bufferConverter));
}
// Currently ignored. Connects to "/sub-protocol" offering two sub-protocols
// and expects the first one to be selected in the handshake headers, in the
// HandshakeInfo, and in the first text message received from the server.
@Test
@Ignore
public void subProtocol() throws Exception {
    String firstProtocol = "echo-v1";
    String secondProtocol = "echo-v2";
    AtomicReference<HandshakeInfo> handshakeRef = new AtomicReference<>();
    MonoProcessor<Object> serverReply = MonoProcessor.create();

    WebSocketHandler clientHandler = new WebSocketHandler() {
        @Override
        public List<String> getSubProtocols() {
            return Arrays.asList(firstProtocol, secondProtocol);
        }
        @Override
        public Mono<Void> handle(WebSocketSession session) {
            handshakeRef.set(session.getHandshakeInfo());
            return session.receive().map(WebSocketMessage::getPayloadAsText)
                    .subscribeWith(serverReply).then();
        }
    };
    client.execute(getUrl("/sub-protocol"), clientHandler).block(Duration.ofMillis(5000));

    HandshakeInfo handshake = handshakeRef.get();
    assertThat(handshake.getHeaders().getFirst("Upgrade"))
            .isEqualToIgnoringCase("websocket");
    assertThat(handshake.getHeaders().getFirst("Sec-WebSocket-Protocol"))
            .isEqualTo(firstProtocol);
    assertThat(handshake.getSubProtocol()).as("Wrong protocol accepted")
            .isEqualTo(firstProtocol);
    assertThat(serverReply.block(Duration.ofSeconds(5)))
            .as("Wrong protocol detected on the server side").isEqualTo(firstProtocol);
}
/**
* Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value.
* <p>Delegates to the five-argument constructor, passing
* {@code DEFAULT_FRAME_MAX_SIZE} as the last argument.
* @param inbound the inbound side, passed through unchanged
* @param outbound the outbound side, passed through unchanged
* @param info the handshake info, passed through unchanged
* @param bufferFactory the buffer factory, passed through unchanged
*/
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE);
}
/**
* Create a session around the given delegate.
* <p>The session id passed to the superclass is the identity hex string of
* the delegate, as returned by {@code ObjectUtils.getIdentityHexString}.
* @param delegate the native object to wrap
* @param info the handshake info, passed to the superclass
* @param factory the buffer factory, passed to the superclass
*/
protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) {
super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory);
}