io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker # handshake ( ) 源码实例Demo

下面列出了 io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker#handshake() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。


/**
 * Answers a WebSocket upgrade request and, when the handshake can be
 * started, creates the socket session for the channel.
 *
 * <p>If no handshaker can be created for the request, the
 * unsupported-version response is sent and nothing else happens.
 *
 * @param req the HTTP request that asked for the upgrade
 * @param ctx the handler context whose channel and pipeline are used
 * @throws Exception if any collaborator invoked here fails
 */
@Override
public void sendResponse(FullHttpRequest req, ChannelHandlerContext ctx) throws Exception {
   Client client = clientFactory.get(ctx.channel());

   WebSocketServerHandshaker wsHandshaker = createWebSocketHandshaker(ctx, req);
   if (wsHandshaker == null) {
      WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
      return;
   }
   wsHandshaker.handshake(ctx.channel(), req);

   // We are switching this channel to a websocket; the chunked write
   // handler interferes with large websocket messages, so drop it from
   // the pipeline.
   ctx.pipeline().remove(Bridge10ChannelInitializer.CHUNKED_WRITE_HANDLER);

   // The session is created only once the handshake has been issued. It is
   // not fully initialized yet because no message identifying the device
   // has arrived, so it is registered only if it reports itself as
   // initialized after the listeners have run.
   metrics.incSocketCounter();
   Session newSession = createAndSetSocketSession(client, ctx.channel(), metrics);
   updateClientInfo(req, newSession);
   try (MdcContextReference ref = BridgeMdcUtil.captureAndInitializeContext(newSession)) {
      logger.trace("Getting ready to call session listeners [{}]", sessionListeners);
      sessionListeners.forEach(listener -> listener.onSessionCreated(newSession));
      if (newSession.isInitialized()) {
         sessionRegistry.putSession(newSession);
      }
   }
}
 

/**
 * Handles a WebSocket upgrade request so that subsequent RPC traffic can
 * happen over the WebSocket.
 *
 * <p>Sends the unsupported-version response when no handshaker matches the
 * request. Otherwise starts the handshake, attaches the optional listener
 * returned by {@code websocketHandshakeListener}, and - unless a
 * {@code WebsocketFrameHandler} is already in the pipeline - installs one
 * after this handler, removes any {@code AwsProxyProtocolDecoder}, and
 * removes this handler.
 *
 * @param ctx     context of the current handler; supplies the channel and pipeline
 * @param request the HTTP request asking for the WebSocket upgrade
 */
protected void handleWebSocket(final ChannelHandlerContext ctx, FullHttpRequest request) {
	if (logger.isDebugEnabled()) {
		logger.debug("handleWebSocket request: uri={}", request.uri());
	}

	// Factory arguments: request URI as the WebSocket URL, no subprotocols, extensions allowed.
	WebSocketServerHandshaker handshaker =
			new WebSocketServerHandshakerFactory(request.uri(), null, true).newHandshaker(request);
	if (handshaker == null) {
		WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
		return;
	}

	// The listener is obtained before the handshake is started, then attached to its future.
	ChannelFutureListener handshakeListener = websocketHandshakeListener(ctx, request);
	ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), request);
	if (handshakeListener != null) {
		handshakeFuture.addListener(handshakeListener);
	}

	ChannelPipeline pipeline = ctx.pipeline();
	if (pipeline.get(WebsocketFrameHandler.class) != null) {
		// A frame handler is already installed; leave the pipeline untouched.
		return;
	}
	pipeline.addAfter(ctx.name(), "wsFrameHandler", new WebsocketFrameHandler(handshaker));
	ChannelHandler awsDecoder = pipeline.get(AwsProxyProtocolDecoder.class);
	if (awsDecoder != null) {
		pipeline.remove(awsDecoder);
	}
	// Finally take the current handler out of the pipeline.
	pipeline.remove(ctx.name());
}
 

/**
 * Writes the response to a WebSocket upgrade carried by {@code out}.
 *
 * <p>When no handshaker can be created for {@code lastRequest}, a 426
 * (Upgrade Required) response advertising WebSocket version 13 is written
 * and the WebSocket response is subscribed to with a
 * {@code CancelledSubscriber}. Otherwise publisher/subscriber handlers are
 * inserted after this handler, this handler is removed, the handshake is
 * performed, and the response is wired to the new handlers.
 *
 * @param ctx the context of this handler
 * @param out the outgoing message; its {@code message} must be a {@code WebSocketHttpResponse}
 */
private void handleWebSocketResponse(ChannelHandlerContext ctx, Outgoing out) {
    WebSocketHttpResponse wsResponse = (WebSocketHttpResponse) out.message;
    WebSocketServerHandshaker handshaker = wsResponse.handshakerFactory().newHandshaker(lastRequest);

    if (handshaker == null) {
        // No handshaker for this request: reply 426 with an empty body and the supported version.
        HttpResponse upgradeRequired = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.UPGRADE_REQUIRED);
        upgradeRequired.headers().set(
                HttpHeaderNames.SEC_WEBSOCKET_VERSION,
                WebSocketVersion.V13.toHttpHeaderValue());
        HttpUtil.setContentLength(upgradeRequired, 0);
        super.unbufferedWrite(ctx, new Outgoing(upgradeRequired, out.promise));
        wsResponse.subscribe(new CancelledSubscriber<>());
        return;
    }

    // Install the websocket handlers directly after this handler. Both are added
    // relative to this handler, so the publisher ends up ahead of the subscriber.
    ChannelPipeline pipeline = ctx.pipeline();
    String selfName = ctx.name();
    HandlerPublisher<WebSocketFrame> framePublisher =
            new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class);
    HandlerSubscriber<WebSocketFrame> frameSubscriber = new HandlerSubscriber<>(ctx.executor());
    pipeline.addAfter(ctx.executor(), selfName, "websocket-subscriber", frameSubscriber);
    pipeline.addAfter(ctx.executor(), selfName, "websocket-publisher", framePublisher);

    // This handler is no longer needed in the chain.
    pipeline.remove(selfName);

    // Perform the handshake with the request wrapped in an EmptyHttpRequest:
    // the body has already been handled, so the handshaker does not need to
    // deal with it.
    handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest));

    // Connect the response to the freshly installed subscriber and publisher.
    wsResponse.subscribe(frameSubscriber);
    framePublisher.subscribe(wsResponse);
}
 

/**
 * Performs the WebSocket opening handshake for {@code req}, or sends the
 * unsupported-version response when no handshaker matches the request.
 *
 * @param ctx the handler context whose channel receives the response
 * @param req the HTTP request asking for the WebSocket upgrade
 */
private void handleHandshake(ChannelHandlerContext ctx, HttpRequest req) {
  // Factory arguments: WebSocket URL derived from the request, no subprotocols, extensions allowed.
  WebSocketServerHandshaker handshaker =
      new WebSocketServerHandshakerFactory(getWebSocketURL(req), null, true).newHandshaker(req);
  if (handshaker != null) {
    handshaker.handshake(ctx.channel(), req);
    return;
  }
  WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}