下面列出了 io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker # handshake ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void sendResponse(FullHttpRequest req, ChannelHandlerContext ctx) throws Exception {
Client client = clientFactory.get(ctx.channel());
// Handshake
WebSocketServerHandshaker handshaker = createWebSocketHandshaker(ctx, req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
handshaker.handshake(ctx.channel(), req);
// The chunked write handler interferes with large websocket messages
// so it needs to be removed from the pipeline since we are setting up
// a websocket here.
ctx.pipeline().remove(Bridge10ChannelInitializer.CHUNKED_WRITE_HANDLER);
// Only create the session after the handshake.
// at this point the session is not fully initialized
// because we haven't gotten a message from the device that
// can identify it. We only put the session in the registry when it
// is initialized
metrics.incSocketCounter();
Session socketSession = createAndSetSocketSession(client, ctx.channel(), metrics);
updateClientInfo(req, socketSession);
try(MdcContextReference ref = BridgeMdcUtil.captureAndInitializeContext(socketSession)) {
logger.trace("Getting ready to call session listeners [{}]", sessionListeners);
sessionListeners.forEach((l) -> { l.onSessionCreated(socketSession); });
if(socketSession.isInitialized()) {
sessionRegistry.putSession(socketSession);
}
}
}
}
/**
* handle WebSocket request,then, the the RPC could happen in WebSocket.
*
* @param ctx
* @param request
*/
protected void handleWebSocket(final ChannelHandlerContext ctx, FullHttpRequest request) {
if (logger.isDebugEnabled()) {
logger.debug("handleWebSocket request: uri={}", request.uri());
}
// Handshake
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(request.uri(), null, true);
WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
return;
}
ChannelFutureListener callback = websocketHandshakeListener(ctx, request);
ChannelFuture future = handshaker.handshake(ctx.channel(), request);
if (callback != null) {
future.addListener(callback);
}
ChannelPipeline pipe = ctx.pipeline();
if (pipe.get(WebsocketFrameHandler.class) == null) {
pipe.addAfter(ctx.name(), "wsFrameHandler", new WebsocketFrameHandler(handshaker));
ChannelHandler handlerAws = pipe.get(AwsProxyProtocolDecoder.class);
if (handlerAws != null) {
pipe.remove(handlerAws);
}
pipe.remove(ctx.name());// Remove current Handler
}
}
private void handleWebSocketResponse(ChannelHandlerContext ctx, Outgoing out) {
WebSocketHttpResponse response = (WebSocketHttpResponse) out.message;
WebSocketServerHandshaker handshaker = response.handshakerFactory().newHandshaker(lastRequest);
if (handshaker == null) {
HttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
HttpResponseStatus.UPGRADE_REQUIRED);
res.headers().set(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());
HttpUtil.setContentLength(res, 0);
super.unbufferedWrite(ctx, new Outgoing(res, out.promise));
response.subscribe(new CancelledSubscriber<>());
} else {
// First, insert new handlers in the chain after us for handling the websocket
ChannelPipeline pipeline = ctx.pipeline();
HandlerPublisher<WebSocketFrame> publisher = new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class);
HandlerSubscriber<WebSocketFrame> subscriber = new HandlerSubscriber<>(ctx.executor());
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-subscriber", subscriber);
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-publisher", publisher);
// Now remove ourselves from the chain
ctx.pipeline().remove(ctx.name());
// Now do the handshake
// Wrap the request in an empty request because we don't need the WebSocket handshaker ignoring the body,
// we already have handled the body.
handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest));
// And hook up the subscriber/publishers
response.subscribe(subscriber);
publisher.subscribe(response);
}
}
private void handleHandshake(ChannelHandlerContext ctx, HttpRequest req) {
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketURL(req), null, true);
WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}
else {
handshaker.handshake(ctx.channel(), req);
}
}