org.springframework.http.server.reactive.AbstractServerHttpResponse#org.springframework.web.reactive.socket.HandshakeInfo源码实例Demo

下面列出了 org.springframework.http.server.reactive.AbstractServerHttpResponse#org.springframework.web.reactive.socket.HandshakeInfo 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Create a new WebSocket session.
 * <p>All four arguments are required. The attributes of the given
 * {@code HandshakeInfo} are copied into this session's attributes, and the
 * log prefix is initialized from the handshake info and the session id.
 * @param delegate the native session to wrap
 * @param id the session id
 * @param info the handshake info, also the source of the initial attributes
 * @param bufferFactory the buffer factory kept for this session
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
	// Validate every argument before any state is assigned.
	Assert.notNull(delegate, "Native session is required.");
	Assert.notNull(id, "Session id is required.");
	Assert.notNull(info, "HandshakeInfo is required.");
	Assert.notNull(bufferFactory, "DataBuffer factory is required.");

	this.bufferFactory = bufferFactory;
	this.handshakeInfo = info;
	this.id = id;
	this.delegate = delegate;

	this.attributes.putAll(info.getAttributes());
	this.logPrefix = initLogPrefix(info, id);

	if (!logger.isDebugEnabled()) {
		return;
	}
	String prefix = getLogPrefix();
	logger.debug(prefix + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri());
}
 
/**
 * Perform a WebSocket request to the given URL through the client returned
 * by {@code getHttpClient()} and run the handler against a new
 * {@code ReactorNettyWebSocketSession} built from the exchange.
 * <p>The sub-protocols of the handler are sent as a comma-delimited string;
 * the negotiated protocol is read from the "Sec-WebSocket-Protocol" header
 * of the response.
 * @param url the URL to connect to
 * @param requestHeaders the headers to apply to the handshake request
 * @param handler the handler for the resulting session
 * @return the first signal of the handling pipeline as a {@code Mono<Void>}
 */
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
	String requestedProtocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
	return getHttpClient()
			.headers(target -> setNettyHeaders(requestHeaders, target))
			.websocket(requestedProtocols, getMaxFramePayloadLength())
			.uri(url.toString())
			.handle((in, out) -> {
				HttpHeaders handshakeHeaders = toHttpHeaders(in);
				String acceptedProtocol = handshakeHeaders.getFirst("Sec-WebSocket-Protocol");
				HandshakeInfo handshakeInfo =
						new HandshakeInfo(url, handshakeHeaders, Mono.empty(), acceptedProtocol);
				NettyDataBufferFactory nettyFactory = new NettyDataBufferFactory(out.alloc());
				WebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
						in, out, handshakeInfo, nettyFactory, getMaxFramePayloadLength());
				if (logger.isDebugEnabled()) {
					logger.debug("Started session '" + webSocketSession.getId() + "' for " + url);
				}
				Mono<Void> handling = handler.handle(webSocketSession);
				// Tag the pipeline so assembly traces name this client and URL.
				return handling.checkpoint(url + " [ReactorNettyWebSocketClient]");
			})
			.doOnRequest(demand -> {
				if (logger.isDebugEnabled()) {
					logger.debug("Connecting to " + url);
				}
			})
			.next();
}
 
/**
 * Upgrade to WebSocket through the native Reactor Netty response and run
 * the handler against a new {@code ReactorNettyWebSocketSession}.
 * <p>The exchange's response is cast to {@code AbstractServerHttpResponse}
 * and its buffer factory to {@code NettyDataBufferFactory}.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the sub-protocol passed to {@code sendWebsocket}, may be {@code null}
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return the {@code Mono} returned by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpResponse httpResponse = exchange.getResponse();
	HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
	HandshakeInfo info = handshakeInfoFactory.get();
	NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

	return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
			(inbound, outbound) -> {
				ReactorNettyWebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
						inbound, outbound, info, nettyFactory, this.maxFramePayloadLength);
				URI requestUri = exchange.getRequest().getURI();
				Mono<Void> handling = handler.handle(webSocketSession);
				// Tag the pipeline so assembly traces name this strategy and URI.
				return handling.checkpoint(requestUri + " [ReactorNettyRequestUpgradeStrategy]");
			});
}
 
/**
 * Upgrade to WebSocket by handing the native {@code HttpServerExchange} to
 * a {@code WebSocketProtocolHandshakeHandler} configured with a single
 * {@code Hybi13Handshake}.
 * @param exchange the current exchange; its request must be an
 * {@code AbstractServerHttpRequest}
 * @param handler the handler for the WebSocket session
 * @param subProtocol the only sub-protocol offered to the handshake, or
 * {@code null} to offer none
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return an empty {@code Mono} once the request was handed over, or an
 * error {@code Mono} carrying any exception raised while doing so
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpRequest httpRequest = exchange.getRequest();
	Assert.isInstanceOf(AbstractServerHttpRequest.class, httpRequest);
	HttpServerExchange nativeExchange = ((AbstractServerHttpRequest) httpRequest).getNativeRequest();

	Set<String> offeredProtocols;
	if (subProtocol == null) {
		offeredProtocols = Collections.emptySet();
	}
	else {
		offeredProtocols = Collections.singleton(subProtocol);
	}
	List<Handshake> handshakeList =
			Collections.singletonList(new Hybi13Handshake(offeredProtocols, false));

	HandshakeInfo info = handshakeInfoFactory.get();
	DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();

	try {
		DefaultCallback connectionCallback = new DefaultCallback(info, handler, dataBufferFactory);
		WebSocketProtocolHandshakeHandler upgradeHandler =
				new WebSocketProtocolHandshakeHandler(handshakeList, connectionCallback);
		upgradeHandler.handleRequest(nativeExchange);
		return Mono.empty();
	}
	catch (Exception ex) {
		return Mono.error(ex);
	}
}
 
/**
 * Create a new WebSocket session.
 * <p>All four arguments are required. The attributes of the given
 * {@code HandshakeInfo} are copied into this session's attributes, and the
 * log prefix is initialized from the handshake info and the session id.
 * @param delegate the native session to wrap
 * @param id the session id
 * @param info the handshake info, also the source of the initial attributes
 * @param bufferFactory the buffer factory kept for this session
 */
protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) {
	// Validate every argument before any state is assigned.
	Assert.notNull(delegate, "Native session is required.");
	Assert.notNull(id, "Session id is required.");
	Assert.notNull(info, "HandshakeInfo is required.");
	Assert.notNull(bufferFactory, "DataBuffer factory is required.");

	this.bufferFactory = bufferFactory;
	this.handshakeInfo = info;
	this.id = id;
	this.delegate = delegate;

	this.attributes.putAll(info.getAttributes());
	this.logPrefix = initLogPrefix(info, id);

	if (!logger.isDebugEnabled()) {
		return;
	}
	String prefix = getLogPrefix();
	logger.debug(prefix + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri());
}
 
/**
 * Perform a WebSocket request to the given URL through the client returned
 * by {@code getHttpClient()} and run the handler against a new
 * {@code ReactorNettyWebSocketSession} built from the exchange.
 * <p>The sub-protocols of the handler are sent as a comma-delimited string;
 * the negotiated protocol is read from the "Sec-WebSocket-Protocol" header
 * of the response.
 * @param url the URL to connect to
 * @param requestHeaders the headers to apply to the handshake request
 * @param handler the handler for the resulting session
 * @return the first signal of the handling pipeline as a {@code Mono<Void>}
 */
@Override
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
	return getHttpClient()
			.headers(target -> setNettyHeaders(requestHeaders, target))
			.websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
			.uri(url.toString())
			.handle((in, out) -> {
				HttpHeaders handshakeHeaders = toHttpHeaders(in);
				String acceptedProtocol = handshakeHeaders.getFirst("Sec-WebSocket-Protocol");
				HandshakeInfo handshakeInfo =
						new HandshakeInfo(url, handshakeHeaders, Mono.empty(), acceptedProtocol);
				NettyDataBufferFactory nettyFactory = new NettyDataBufferFactory(out.alloc());
				WebSocketSession webSocketSession =
						new ReactorNettyWebSocketSession(in, out, handshakeInfo, nettyFactory);
				if (logger.isDebugEnabled()) {
					logger.debug("Started session '" + webSocketSession.getId() + "' for " + url);
				}
				return handler.handle(webSocketSession);
			})
			.doOnRequest(demand -> {
				if (logger.isDebugEnabled()) {
					logger.debug("Connecting to " + url);
				}
			})
			.next();
}
 
/**
 * Upgrade to WebSocket through the native Reactor Netty response and run
 * the handler against a new {@code ReactorNettyWebSocketSession}.
 * <p>The exchange's response is cast to {@code AbstractServerHttpResponse}
 * and its buffer factory to {@code NettyDataBufferFactory}.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the sub-protocol passed to {@code sendWebsocket}, may be {@code null}
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return the {@code Mono} returned by {@code sendWebsocket}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpResponse httpResponse = exchange.getResponse();
	HttpServerResponse nativeResponse = ((AbstractServerHttpResponse) httpResponse).getNativeResponse();
	HandshakeInfo info = handshakeInfoFactory.get();
	NettyDataBufferFactory nettyFactory = (NettyDataBufferFactory) httpResponse.bufferFactory();

	return nativeResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
			(inbound, outbound) -> {
				ReactorNettyWebSocketSession webSocketSession = new ReactorNettyWebSocketSession(
						inbound, outbound, info, nettyFactory, this.maxFramePayloadLength);
				return handler.handle(webSocketSession);
			});
}
 
/**
 * Upgrade to WebSocket by handing the native {@code HttpServerExchange} to
 * a {@code WebSocketProtocolHandshakeHandler} configured with a single
 * {@code Hybi13Handshake}.
 * @param exchange the current exchange; its request must be an
 * {@code AbstractServerHttpRequest}
 * @param handler the handler for the WebSocket session
 * @param subProtocol the only sub-protocol offered to the handshake, or
 * {@code null} to offer none
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return an empty {@code Mono} once the request was handed over, or an
 * error {@code Mono} carrying any exception raised while doing so
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpRequest httpRequest = exchange.getRequest();
	Assert.isInstanceOf(AbstractServerHttpRequest.class, httpRequest);
	HttpServerExchange nativeExchange = ((AbstractServerHttpRequest) httpRequest).getNativeRequest();

	Set<String> offeredProtocols;
	if (subProtocol == null) {
		offeredProtocols = Collections.emptySet();
	}
	else {
		offeredProtocols = Collections.singleton(subProtocol);
	}
	List<Handshake> handshakeList =
			Collections.singletonList(new Hybi13Handshake(offeredProtocols, false));

	HandshakeInfo info = handshakeInfoFactory.get();
	DataBufferFactory dataBufferFactory = exchange.getResponse().bufferFactory();

	try {
		DefaultCallback connectionCallback = new DefaultCallback(info, handler, dataBufferFactory);
		WebSocketProtocolHandshakeHandler upgradeHandler =
				new WebSocketProtocolHandshakeHandler(handshakeList, connectionCallback);
		upgradeHandler.handleRequest(nativeExchange);
		return Mono.empty();
	}
	catch (Exception ex) {
		return Mono.error(ex);
	}
}
 
/**
 * Constructor with an additional maxFramePayloadLength argument.
 * <p>The inbound and outbound are wrapped in a new {@code WebSocketConnection}
 * that is passed to the superclass constructor together with the handshake
 * info and the buffer factory.
 * @param inbound the inbound side of the connection
 * @param outbound the outbound side of the connection
 * @param info the handshake info for the session
 * @param bufferFactory the buffer factory for the session
 * @param maxFramePayloadLength the value stored in {@code this.maxFramePayloadLength}
 * @since 5.1
 */
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
		HandshakeInfo info, NettyDataBufferFactory bufferFactory,
		int maxFramePayloadLength) {

	super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
	this.maxFramePayloadLength = maxFramePayloadLength;
}
 
/**
 * Alternative constructor with completion {@code Mono<Void>} to propagate
 * the session completion (success or error) (for client-side use).
 * <p>Also creates the {@code WebSocketReceivePublisher} for this session.
 * @param delegate the native session, passed to the superclass
 * @param id the session id, passed to the superclass
 * @param info the handshake info, passed to the superclass
 * @param bufferFactory the buffer factory, passed to the superclass
 * @param completionMono the processor stored as the completion {@code Mono};
 * may be {@code null}
 */
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
		DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {

	super(delegate, id, info, bufferFactory);
	this.receivePublisher = new WebSocketReceivePublisher();
	this.completionMono = completionMono;
}
 
/**
 * Create a session around the given native {@code Session}, using
 * {@code ObjectUtils.getIdentityHexString(session)} as the session id.
 * @param session the native session to wrap
 * @param info the handshake info, passed to the superclass
 * @param factory the buffer factory, passed to the superclass
 * @param completionMono optional completion processor passed to the
 * superclass; may be {@code null}
 */
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
		@Nullable MonoProcessor<Void> completionMono) {

	super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
	// TODO: suspend causes failures if invoked at this stage
	// suspendReceiving();
}
 
/**
 * Wrap the given channel in an {@code UndertowWebSocketSession}, register a
 * handler adapter as the channel's receive listener, resume receives, and
 * subscribe the session to the result of the handler.
 * @param url the URL used for the handshake info and the checkpoint description
 * @param handler the handler to run against the new session
 * @param completion the completion processor passed to the session
 * @param negotiation the negotiation used to create the handshake info
 * @param channel the channel to wrap
 */
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
		DefaultNegotiation negotiation, WebSocketChannel channel) {

	HandshakeInfo handshakeInfo = createHandshakeInfo(url, negotiation);
	UndertowWebSocketSession webSocketSession =
			new UndertowWebSocketSession(channel, handshakeInfo, this.bufferFactory, completion);
	UndertowWebSocketHandlerAdapter receiveListener = new UndertowWebSocketHandlerAdapter(webSocketSession);

	// The listener is registered before reads are resumed on the channel.
	channel.getReceiveSetter().set(receiveListener);
	channel.resumeReceives();

	Mono<Void> handling = handler.handle(webSocketSession)
			.checkpoint(url + " [UndertowWebSocketClient]");
	handling.subscribe(webSocketSession);
}
 
/**
 * Upgrade to WebSocket through {@code this.factory}, exposing the handler
 * adapter and sub-protocol via {@code adapterHolder} for the duration of the
 * {@code acceptWebSocket} call.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the sub-protocol placed in the handler container, may be {@code null}
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if accepting the
 * WebSocket raised an {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpRequest httpRequest = exchange.getRequest();
	ServerHttpResponse httpResponse = exchange.getResponse();

	HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
	HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

	HandshakeInfo info = handshakeInfoFactory.get();
	DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();

	JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
			handler, nativeSession -> new JettyWebSocketSession(nativeSession, info, dataBufferFactory));

	startLazily(nativeRequest);

	Assert.state(this.factory != null, "No WebSocketServerFactory available");
	boolean upgradeRequested = this.factory.isUpgradeRequest(nativeRequest, nativeResponse);
	Assert.isTrue(upgradeRequested, "Not a WebSocket handshake");

	try {
		WebSocketHandlerContainer container = new WebSocketHandlerContainer(handlerAdapter, subProtocol);
		adapterHolder.set(container);
		this.factory.acceptWebSocket(nativeRequest, nativeResponse);
	}
	catch (IOException ex) {
		return Mono.error(ex);
	}
	finally {
		// Always clear the holder, whether or not the upgrade succeeded.
		adapterHolder.remove();
	}

	return Mono.empty();
}
 
/**
 * Upgrade to WebSocket through the {@code WsServerContainer} obtained for
 * the servlet request, using an endpoint config built for the request URI.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the only sub-protocol set on the endpoint config, or
 * {@code null} for an empty list
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if the upgrade
 * raised a {@code ServletException} or {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpRequest httpRequest = exchange.getRequest();
	ServerHttpResponse httpResponse = exchange.getResponse();

	HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
	HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

	HandshakeInfo info = handshakeInfoFactory.get();
	DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();

	Endpoint handlerEndpoint = new StandardWebSocketHandlerAdapter(
			handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, dataBufferFactory));

	String path = nativeRequest.getRequestURI();
	DefaultServerEndpointConfig endpointConfig = new DefaultServerEndpointConfig(path, handlerEndpoint);
	if (subProtocol != null) {
		endpointConfig.setSubprotocols(Collections.singletonList(subProtocol));
	}
	else {
		endpointConfig.setSubprotocols(Collections.emptyList());
	}

	try {
		WsServerContainer serverContainer = getContainer(nativeRequest);
		serverContainer.doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
	}
	catch (ServletException | IOException ex) {
		return Mono.error(ex);
	}

	return Mono.empty();
}
 
/**
 * Build a {@code HandshakeInfo} from the request URI, a copy of the request
 * headers, the exchange principal and log prefix, and the remote address.
 * @param exchange the source of the principal and the log prefix
 * @param request the source of the URI, headers, and remote address
 * @param protocol the sub-protocol to record, may be {@code null}
 * @param attributes the attributes to pass to the handshake info
 * @return the new handshake info
 */
private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request,
		@Nullable String protocol, Map<String, Object> attributes) {

	URI requestUri = request.getURI();

	// Copy request headers, as they might be pooled and recycled by
	// the server implementation once the handshake HTTP exchange is done.
	HttpHeaders headersCopy = new HttpHeaders();
	headersCopy.addAll(request.getHeaders());

	Mono<Principal> user = exchange.getPrincipal();
	String prefix = exchange.getLogPrefix();
	InetSocketAddress remote = request.getRemoteAddress();

	return new HandshakeInfo(
			requestUri, headersCopy, user, protocol, remote, attributes, prefix);
}
 
/**
 * With a session attribute predicate accepting only "a1", "a3" and "a5",
 * the {@code HandshakeInfo} handed to the upgrade strategy must contain
 * exactly those three of the five session attributes.
 */
@Test
public void sessionAttributePredicate() {
	MockWebSession webSession = new MockWebSession();
	// Populate a1..a5 with values v1..v5.
	for (int i = 1; i <= 5; i++) {
		webSession.getAttributes().put("a" + i, "v" + i);
	}

	MockServerHttpRequest handshakeRequest = initHandshakeRequest();
	MockServerWebExchange webExchange =
			MockServerWebExchange.builder(handshakeRequest).session(webSession).build();

	TestRequestUpgradeStrategy strategy = new TestRequestUpgradeStrategy();
	HandshakeWebSocketService webSocketService = new HandshakeWebSocketService(strategy);
	webSocketService.setSessionAttributePredicate(
			attributeName -> Arrays.asList("a1", "a3", "a5").contains(attributeName));

	webSocketService.handleRequest(webExchange, mock(WebSocketHandler.class)).block();

	HandshakeInfo capturedInfo = strategy.handshakeInfo;
	assertNotNull(capturedInfo);

	Map<String, Object> copied = capturedInfo.getAttributes();
	assertEquals(3, copied.size());
	assertThat(copied, Matchers.hasEntry("a1", "v1"));
	assertThat(copied, Matchers.hasEntry("a3", "v3"));
	assertThat(copied, Matchers.hasEntry("a5", "v5"));
}
 
/**
 * Does not perform an actual upgrade: stores the {@code HandshakeInfo}
 * obtained from the factory in {@code this.handshakeInfo} and completes
 * immediately.
 * @param exchange not used
 * @param webSocketHandler not used
 * @param subProtocol not used
 * @param handshakeInfoFactory invoked once to obtain the handshake info
 * @return an empty {@code Mono}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler,
		@Nullable  String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	this.handshakeInfo = handshakeInfoFactory.get();
	return Mono.empty();
}
 
/**
 * Constructor with an additional maxFramePayloadLength argument.
 * <p>The inbound and outbound are wrapped in a new {@code WebSocketConnection}
 * that is passed to the superclass constructor together with the handshake
 * info and the buffer factory.
 * @param inbound the inbound side of the connection
 * @param outbound the outbound side of the connection
 * @param info the handshake info for the session
 * @param bufferFactory the buffer factory for the session
 * @param maxFramePayloadLength the value stored in {@code this.maxFramePayloadLength}
 * @since 5.1
 */
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
		HandshakeInfo info, NettyDataBufferFactory bufferFactory,
		int maxFramePayloadLength) {

	super(new WebSocketConnection(inbound, outbound), info, bufferFactory);
	this.maxFramePayloadLength = maxFramePayloadLength;
}
 
/**
 * Alternative constructor with completion {@code Mono<Void>} to propagate
 * the session completion (success or error) (for client-side use).
 * <p>Also creates the {@code WebSocketReceivePublisher} for this session.
 * @param delegate the native session, passed to the superclass
 * @param id the session id, passed to the superclass
 * @param info the handshake info, passed to the superclass
 * @param bufferFactory the buffer factory, passed to the superclass
 * @param completionMono the processor stored as the completion {@code Mono};
 * may be {@code null}
 */
public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info,
		DataBufferFactory bufferFactory, @Nullable MonoProcessor<Void> completionMono) {

	super(delegate, id, info, bufferFactory);
	this.receivePublisher = new WebSocketReceivePublisher();
	this.completionMono = completionMono;
}
 
/**
 * Create a session around the given native {@code Session}, using
 * {@code ObjectUtils.getIdentityHexString(session)} as the session id.
 * @param session the native session to wrap
 * @param info the handshake info, passed to the superclass
 * @param factory the buffer factory, passed to the superclass
 * @param completionMono optional completion processor passed to the
 * superclass; may be {@code null}
 */
public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory,
		@Nullable MonoProcessor<Void> completionMono) {

	super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionMono);
	// TODO: suspend causes failures if invoked at this stage
	// suspendReceiving();
}
 
/**
 * Wrap the given channel in an {@code UndertowWebSocketSession}, register a
 * handler adapter as the channel's receive listener, resume receives, and
 * subscribe the session to the result of the handler.
 * @param url the URL used to create the handshake info
 * @param handler the handler to run against the new session
 * @param completion the completion processor passed to the session
 * @param negotiation the negotiation used to create the handshake info
 * @param channel the channel to wrap
 */
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
		DefaultNegotiation negotiation, WebSocketChannel channel) {

	HandshakeInfo handshakeInfo = createHandshakeInfo(url, negotiation);
	UndertowWebSocketSession webSocketSession =
			new UndertowWebSocketSession(channel, handshakeInfo, this.bufferFactory, completion);
	UndertowWebSocketHandlerAdapter receiveListener = new UndertowWebSocketHandlerAdapter(webSocketSession);

	// The listener is registered before reads are resumed on the channel.
	channel.getReceiveSetter().set(receiveListener);
	channel.resumeReceives();

	Mono<Void> handling = handler.handle(webSocketSession);
	handling.subscribe(webSocketSession);
}
 
/**
 * Upgrade to WebSocket through {@code this.factory}, exposing the handler
 * adapter and sub-protocol via {@code adapterHolder} for the duration of the
 * {@code acceptWebSocket} call.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the sub-protocol placed in the handler container, may be {@code null}
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if accepting the
 * WebSocket raised an {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpRequest httpRequest = exchange.getRequest();
	ServerHttpResponse httpResponse = exchange.getResponse();

	HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
	HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

	HandshakeInfo info = handshakeInfoFactory.get();
	DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();

	JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(
			handler, nativeSession -> new JettyWebSocketSession(nativeSession, info, dataBufferFactory));

	startLazily(nativeRequest);

	Assert.state(this.factory != null, "No WebSocketServerFactory available");
	boolean upgradeRequested = this.factory.isUpgradeRequest(nativeRequest, nativeResponse);
	Assert.isTrue(upgradeRequested, "Not a WebSocket handshake");

	try {
		WebSocketHandlerContainer container = new WebSocketHandlerContainer(handlerAdapter, subProtocol);
		adapterHolder.set(container);
		this.factory.acceptWebSocket(nativeRequest, nativeResponse);
	}
	catch (IOException ex) {
		return Mono.error(ex);
	}
	finally {
		// Always clear the holder, whether or not the upgrade succeeded.
		adapterHolder.remove();
	}

	return Mono.empty();
}
 
/**
 * Upgrade to WebSocket through the {@code WsServerContainer} obtained for
 * the servlet request, using an endpoint config built for the request URI.
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol the only sub-protocol set on the endpoint config, or
 * {@code null} for an empty list
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return an empty {@code Mono}, or an error {@code Mono} if the upgrade
 * raised a {@code ServletException} or {@code IOException}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
		@Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	ServerHttpRequest httpRequest = exchange.getRequest();
	ServerHttpResponse httpResponse = exchange.getResponse();

	HttpServletRequest nativeRequest = getHttpServletRequest(httpRequest);
	HttpServletResponse nativeResponse = getHttpServletResponse(httpResponse);

	HandshakeInfo info = handshakeInfoFactory.get();
	DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();

	Endpoint handlerEndpoint = new StandardWebSocketHandlerAdapter(
			handler, nativeSession -> new TomcatWebSocketSession(nativeSession, info, dataBufferFactory));

	String path = nativeRequest.getRequestURI();
	DefaultServerEndpointConfig endpointConfig = new DefaultServerEndpointConfig(path, handlerEndpoint);
	if (subProtocol != null) {
		endpointConfig.setSubprotocols(Collections.singletonList(subProtocol));
	}
	else {
		endpointConfig.setSubprotocols(Collections.emptyList());
	}

	try {
		WsServerContainer serverContainer = getContainer(nativeRequest);
		serverContainer.doUpgrade(nativeRequest, nativeResponse, endpointConfig, Collections.emptyMap());
	}
	catch (ServletException | IOException ex) {
		return Mono.error(ex);
	}

	return Mono.empty();
}
 
/**
 * Build a {@code HandshakeInfo} from the request URI, a copy of the request
 * headers, the exchange principal and log prefix, and the remote address.
 * @param exchange the source of the principal and the log prefix
 * @param request the source of the URI, headers, and remote address
 * @param protocol the sub-protocol to record, may be {@code null}
 * @param attributes the attributes to pass to the handshake info
 * @return the new handshake info
 */
private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request,
		@Nullable String protocol, Map<String, Object> attributes) {

	URI requestUri = request.getURI();

	// Copy request headers, as they might be pooled and recycled by
	// the server implementation once the handshake HTTP exchange is done.
	HttpHeaders headersCopy = new HttpHeaders();
	headersCopy.addAll(request.getHeaders());

	Mono<Principal> user = exchange.getPrincipal();
	String prefix = exchange.getLogPrefix();
	InetSocketAddress remote = request.getRemoteAddress();

	return new HandshakeInfo(
			requestUri, headersCopy, user, protocol, remote, attributes, prefix);
}
 
/**
 * With a session attribute predicate accepting only "a1", "a3" and "a5",
 * the {@code HandshakeInfo} handed to the upgrade strategy must contain
 * exactly those three of the five session attributes.
 */
@Test
public void sessionAttributePredicate() {
	MockWebSession webSession = new MockWebSession();
	// Populate a1..a5 with values v1..v5.
	for (int i = 1; i <= 5; i++) {
		webSession.getAttributes().put("a" + i, "v" + i);
	}

	MockServerHttpRequest handshakeRequest = initHandshakeRequest();
	MockServerWebExchange webExchange =
			MockServerWebExchange.builder(handshakeRequest).session(webSession).build();

	TestRequestUpgradeStrategy strategy = new TestRequestUpgradeStrategy();
	HandshakeWebSocketService webSocketService = new HandshakeWebSocketService(strategy);
	webSocketService.setSessionAttributePredicate(
			attributeName -> Arrays.asList("a1", "a3", "a5").contains(attributeName));

	webSocketService.handleRequest(webExchange, mock(WebSocketHandler.class)).block();

	HandshakeInfo capturedInfo = strategy.handshakeInfo;
	assertNotNull(capturedInfo);

	Map<String, Object> copied = capturedInfo.getAttributes();
	assertEquals(3, copied.size());
	assertThat(copied, Matchers.hasEntry("a1", "v1"));
	assertThat(copied, Matchers.hasEntry("a3", "v3"));
	assertThat(copied, Matchers.hasEntry("a5", "v5"));
}
 
/**
 * Does not perform an actual upgrade: stores the {@code HandshakeInfo}
 * obtained from the factory in {@code this.handshakeInfo} and completes
 * immediately.
 * @param exchange not used
 * @param webSocketHandler not used
 * @param subProtocol not used
 * @param handshakeInfoFactory invoked once to obtain the handshake info
 * @return an empty {@code Mono}
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler,
		@Nullable  String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

	this.handshakeInfo = handshakeInfoFactory.get();
	return Mono.empty();
}
 
/**
 * Upgrade the native request to a {@code ServerWebSocket} and run the
 * handler against a new {@code VertxWebSocketSession} wrapping it.
 * <p>The exchange's request is cast to {@code AbstractServerHttpRequest}
 * to reach the native request.
 *
 * @param exchange the current exchange
 * @param handler the handler for the WebSocket session
 * @param subProtocol not used by this implementation
 * @param handshakeInfoFactory supplies the {@code HandshakeInfo} for the session
 * @return the {@code Mono} returned by the handler
 */
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
    @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

    LOGGER.debug("Upgrading request to web socket");

    ServerHttpRequest httpRequest = exchange.getRequest();
    HttpServerRequest nativeRequest = ((AbstractServerHttpRequest) httpRequest).getNativeRequest();
    ServerWebSocket upgradedSocket = nativeRequest.upgrade();

    // The handshake info is requested only after the native upgrade call.
    HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
    VertxWebSocketSession webSocketSession =
        new VertxWebSocketSession(upgradedSocket, handshakeInfo, bufferConverter);

    return handler.handle(webSocketSession);
}
 
/**
 * Connect to "/sub-protocol" offering two sub-protocols and verify that the
 * first one is reported in the handshake headers, in the handshake info,
 * and in the first text payload received from the server.
 * <p>Currently disabled via {@code @Ignore}.
 */
@Test
@Ignore
public void subProtocol() throws Exception {
	String preferred = "echo-v1";
	String alternative = "echo-v2";
	AtomicReference<HandshakeInfo> capturedInfo = new AtomicReference<>();
	MonoProcessor<Object> serverReply = MonoProcessor.create();

	WebSocketHandler clientHandler = new WebSocketHandler() {
		@Override
		public List<String> getSubProtocols() {
			return Arrays.asList(preferred, alternative);
		}

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			// Capture the handshake info, then forward received text to the processor.
			capturedInfo.set(session.getHandshakeInfo());
			return session.receive()
					.map(WebSocketMessage::getPayloadAsText)
					.subscribeWith(serverReply)
					.then();
		}
	};
	client.execute(getUrl("/sub-protocol"), clientHandler).block(Duration.ofMillis(5000));

	HandshakeInfo info = capturedInfo.get();
	assertThat(info.getHeaders().getFirst("Upgrade")).isEqualToIgnoringCase("websocket");
	assertThat(info.getHeaders().getFirst("Sec-WebSocket-Protocol")).isEqualTo(preferred);
	assertThat(info.getSubProtocol())
			.as("Wrong protocol accepted")
			.isEqualTo(preferred);
	assertThat(serverReply.block(Duration.ofSeconds(5)))
			.as("Wrong protocol detected on the server side")
			.isEqualTo(preferred);
}
 
/**
 * Constructor for the session, using the {@link #DEFAULT_FRAME_MAX_SIZE} value.
 * <p>Delegates to the five-argument constructor.
 * @param inbound the inbound side of the connection
 * @param outbound the outbound side of the connection
 * @param info the handshake info for the session
 * @param bufferFactory the buffer factory for the session
 */
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
		HandshakeInfo info, NettyDataBufferFactory bufferFactory) {

	this(inbound, outbound, info, bufferFactory, DEFAULT_FRAME_MAX_SIZE);
}
 
/**
 * Create a session around the given delegate, using
 * {@code ObjectUtils.getIdentityHexString(delegate)} as the session id.
 * @param delegate the native object to wrap
 * @param info the handshake info, passed to the superclass
 * @param factory the buffer factory, passed to the superclass
 */
protected NettyWebSocketSessionSupport(T delegate, HandshakeInfo info, NettyDataBufferFactory factory) {
	super(delegate, ObjectUtils.getIdentityHexString(delegate), info, factory);
}