下面列出了 io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Runs the WebSocket upgrade handshake for {@code req} on the channel of {@code ctx}.
 *
 * <p>If the factory yields no handshaker for the request, an "unsupported version"
 * response is sent instead. Otherwise the handshake is started and, once it completes
 * successfully, a {@link WebSocketFrameAggregator} is inserted in front of the
 * Socket.IO WebSocket handler and {@code connect} is invoked with the session id
 * derived from {@code requestPath}. A failed handshake is only logged.
 *
 * @param ctx         context whose channel performs the handshake
 * @param req         the HTTP upgrade request
 * @param requestPath request path from which the session id is extracted
 */
private void handshake(final ChannelHandlerContext ctx, final FullHttpRequest req, final String requestPath) {
    final WebSocketServerHandshaker handshaker = new WebSocketServerHandshakerFactory(
            getWebSocketLocation(req), null, true, maxWebSocketFrameSize).newHandshaker(req);

    if (handshaker == null) {
        // No handshaker could be built for this request.
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
    handshakeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // The session id is resolved up front because both outcomes need it.
            final String sessionId = PipelineUtils.getSessionId(requestPath);

            if (!future.isSuccess()) {
                log.error("Can't handshake: {}", sessionId, future.cause());
                return;
            }

            // Install the frame aggregator ahead of the WebSocket handler, then connect.
            ctx.channel().pipeline().addBefore(
                    SocketIOChannelInitializer.SOCKETIO_WEBSOCKET_HANDLER,
                    SocketIOChannelInitializer.WEBSOCKET_FRAME_AGGREGATOR,
                    new WebSocketFrameAggregator(maxWebSocketFrameSize));
            connect(ctx, req, sessionId);
        }
    });
}
/**
 * Builds a handshaker for the given upgrade request.
 *
 * <p>A new {@link WebSocketServerHandshakerFactory} is created on every call, using the
 * location computed by {@code getWebSocketLocation(request)}, no subprotocols,
 * extensions disabled, and the maximum frame size from {@code serverConfig}.
 *
 * @param ctx     channel handler context (not used by this implementation)
 * @param request the HTTP upgrade request
 * @return the result of {@code newHandshaker(request)}; callers in this file treat
 *         {@code null} as "unsupported WebSocket version"
 */
protected WebSocketServerHandshaker createWebSocketHandshaker(ChannelHandlerContext ctx, FullHttpRequest request) {
    // TODO verify it isn't already open...
    // TODO inject the factory
    final String webSocketUrl = getWebSocketLocation(request);
    return new WebSocketServerHandshakerFactory(webSocketUrl, null, false, serverConfig.getMaxFrameSize())
            .newHandshaker(request);
}
/**
 * Closes the WebSocket associated with {@code ctx}.
 *
 * <p>The handshaker stored under {@code ATTR_WEBSOCKET_HANDLER} is removed from the
 * context; if one was present it is asked to close the channel with {@code frame}.
 * The context is closed in every case.
 *
 * @param ctx   context holding the handshaker attribute
 * @param frame the close frame handed to the handshaker
 */
protected void closeWebSocket(ChannelHandlerContext ctx, CloseWebSocketFrame frame) {
    final WebSocketServerHandshaker registered = ctx.attr(ATTR_WEBSOCKET_HANDLER).getAndRemove();
    if (registered != null) {
        registered.close(ctx.channel(), frame);
    }
    ctx.close();
}
/**
 * Answers a WebSocket upgrade request and creates the socket session for it.
 *
 * <p>When no handshaker can be created, an "unsupported version" response is sent
 * and nothing else happens. Otherwise the handshake is started, the chunked write
 * handler is removed from the pipeline, the socket counter is incremented, a session
 * is created and announced to the session listeners, and the session is registered
 * only if it already reports itself as initialized.
 *
 * @param req the HTTP upgrade request
 * @param ctx context of the channel being upgraded
 * @throws Exception propagated from any of the collaborators invoked here
 */
@Override
public void sendResponse(FullHttpRequest req, ChannelHandlerContext ctx) throws Exception {
    Client client = clientFactory.get(ctx.channel());

    WebSocketServerHandshaker handshaker = createWebSocketHandshaker(ctx, req);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    handshaker.handshake(ctx.channel(), req);

    // The chunked write handler interferes with large websocket messages, so it is
    // dropped from the pipeline now that this channel is becoming a websocket.
    ctx.pipeline().remove(Bridge10ChannelInitializer.CHUNKED_WRITE_HANDLER);

    // The session is created only after the handshake. It is not fully initialized
    // yet because no identifying message has arrived from the device, so it goes
    // into the registry only once it reports itself as initialized.
    metrics.incSocketCounter();
    Session session = createAndSetSocketSession(client, ctx.channel(), metrics);
    updateClientInfo(req, session);

    try (MdcContextReference ref = BridgeMdcUtil.captureAndInitializeContext(session)) {
        logger.trace("Getting ready to call session listeners [{}]", sessionListeners);
        sessionListeners.forEach(listener -> {
            listener.onSessionCreated(session);
        });
        if (session.isInitialized()) {
            sessionRegistry.putSession(session);
        }
    }
}
/**
 * Upgrades the given servlet request to a WebSocket connection for {@code endpoint}.
 *
 * <p>The servlet request is unwrapped via {@code ServletUtil.unWrapper}; if that
 * yields {@code null} the upgrade fails immediately. Otherwise an endpoint
 * registration is prepared (wildcard subprotocol plus the selected one, and the
 * selected extensions when present) and the work is delegated to
 * {@code handshakeToWebsocket}.
 *
 * @param request            the current request
 * @param response           the current response (not used by this implementation)
 * @param selectedProtocol   the negotiated subprotocol
 * @param selectedExtensions the negotiated extensions, may be {@code null}
 * @param endpoint           the endpoint that will handle the connection
 * @throws HandshakeFailureException if the request cannot be unwrapped or the
 *                                   delegated handshake throws any exception
 */
@Override
protected void upgradeInternal(ServerHttpRequest request, ServerHttpResponse response, String selectedProtocol,
        List<Extension> selectedExtensions, Endpoint endpoint) throws HandshakeFailureException {
    final HttpServletRequest rawRequest = getHttpServletRequest(request);
    final ServletHttpServletRequest unwrappedRequest = ServletUtil.unWrapper(rawRequest);
    if (unwrappedRequest == null) {
        throw new HandshakeFailureException(
                "Servlet request failed to upgrade to WebSocket: " + rawRequest.getRequestURL());
    }

    final WebSocketServerContainer container = getContainer(rawRequest);
    final Principal user = request.getPrincipal();
    final Map<String, String> pathParams = new LinkedHashMap<>(3);

    final ServerEndpointRegistration registration =
            new ServerEndpointRegistration(rawRequest.getRequestURI(), endpoint);
    final List<String> subprotocols =
            Arrays.asList(WebSocketServerHandshaker.SUB_PROTOCOL_WILDCARD, selectedProtocol);
    registration.setSubprotocols(subprotocols);
    if (selectedExtensions != null) {
        registration.setExtensions(selectedExtensions);
    }

    try {
        handshakeToWebsocket(unwrappedRequest, selectedProtocol, maxFramePayloadLength, user,
                selectedExtensions, pathParams, endpoint,
                registration, container);
    } catch (Exception e) {
        // Any failure is reported as a handshake failure, keeping the original cause.
        throw new HandshakeFailureException(
                "Servlet request failed to upgrade to WebSocket: " + rawRequest.getRequestURL(), e);
    }
}
/**
 * Handles a WebSocket upgrade request; per the original author, RPC can then take
 * place over the WebSocket.
 *
 * <p>If no handshaker can be created for the request, an "unsupported version"
 * response is sent and the method returns. Otherwise the handshake is started, the
 * listener returned by {@code websocketHandshakeListener} (if any) is attached to
 * it, and — unless a {@code WebsocketFrameHandler} is already in the pipeline — a new
 * frame handler is inserted after this handler, any {@code AwsProxyProtocolDecoder}
 * is removed, and this handler removes itself.
 *
 * @param ctx     context of the handler processing the request
 * @param request the HTTP upgrade request
 */
protected void handleWebSocket(final ChannelHandlerContext ctx, FullHttpRequest request) {
    if (logger.isDebugEnabled()) {
        logger.debug("handleWebSocket request: uri={}", request.uri());
    }

    // Handshake
    final WebSocketServerHandshaker handshaker =
            new WebSocketServerHandshakerFactory(request.uri(), null, true).newHandshaker(request);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    final ChannelFutureListener handshakeListener = websocketHandshakeListener(ctx, request);
    final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), request);
    if (handshakeListener != null) {
        handshakeFuture.addListener(handshakeListener);
    }

    final ChannelPipeline pipeline = ctx.pipeline();
    if (pipeline.get(WebsocketFrameHandler.class) != null) {
        // A frame handler is already installed; leave the pipeline untouched.
        return;
    }

    pipeline.addAfter(ctx.name(), "wsFrameHandler", new WebsocketFrameHandler(handshaker));
    final ChannelHandler awsDecoder = pipeline.get(AwsProxyProtocolDecoder.class);
    if (awsDecoder != null) {
        pipeline.remove(awsDecoder);
    }
    // Remove the current handler.
    pipeline.remove(ctx.name());
}
/**
 * Handles an outgoing {@code WebSocketHttpResponse} by either completing the
 * WebSocket handshake for {@code lastRequest} or rejecting the upgrade.
 *
 * <p>Rejection path (no handshaker available): an empty {@code 426 Upgrade Required}
 * response advertising WebSocket version 13 is written, and the response is
 * subscribed with a {@code CancelledSubscriber}.
 *
 * <p>Upgrade path: a frame subscriber and a frame publisher are inserted after this
 * handler, this handler removes itself, the handshake is performed, and the
 * response is wired to the subscriber and publisher.
 *
 * @param ctx context of this handler
 * @param out the outgoing message; its {@code message} must be a {@code WebSocketHttpResponse}
 */
private void handleWebSocketResponse(ChannelHandlerContext ctx, Outgoing out) {
    final WebSocketHttpResponse wsResponse = (WebSocketHttpResponse) out.message;
    final WebSocketServerHandshaker handshaker = wsResponse.handshakerFactory().newHandshaker(lastRequest);

    if (handshaker == null) {
        HttpResponse upgradeRequired = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.UPGRADE_REQUIRED);
        upgradeRequired.headers().set(
                HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue());
        HttpUtil.setContentLength(upgradeRequired, 0);
        super.unbufferedWrite(ctx, new Outgoing(upgradeRequired, out.promise));
        wsResponse.subscribe(new CancelledSubscriber<>());
        return;
    }

    // First, add the websocket handlers to the chain right after this handler.
    // Both are added after ctx.name(), subscriber first and publisher second.
    final ChannelPipeline pipeline = ctx.pipeline();
    final HandlerPublisher<WebSocketFrame> framePublisher =
            new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class);
    final HandlerSubscriber<WebSocketFrame> frameSubscriber = new HandlerSubscriber<>(ctx.executor());
    pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-subscriber", frameSubscriber);
    pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-publisher", framePublisher);

    // Then take this handler out of the chain.
    pipeline.remove(ctx.name());

    // Then perform the handshake. The request is wrapped in an empty request
    // because its body has already been handled here.
    handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest));

    // Finally connect the response to the subscriber and the publisher.
    wsResponse.subscribe(frameSubscriber);
    framePublisher.subscribe(wsResponse);
}
/**
 * Starts the WebSocket handshake for {@code req}, or replies with an
 * "unsupported version" response when no handshaker can be created for it.
 *
 * @param ctx context whose channel performs the handshake
 * @param req the HTTP upgrade request
 */
private void handleHandshake(ChannelHandlerContext ctx, HttpRequest req) {
    final WebSocketServerHandshaker handshaker =
            new WebSocketServerHandshakerFactory(getWebSocketURL(req), null, true).newHandshaker(req);

    if (handshaker != null) {
        handshaker.handshake(ctx.channel(), req);
        return;
    }
    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}
/**
 * Looks up the handshaker stored on the context under {@code ATTR_WEBSOCKET_HANDLER}.
 *
 * @param ctx context to read the attribute from
 * @return the current value of the attribute, without removing it
 */
protected WebSocketServerHandshaker getWebSocketHandshaker(ChannelHandlerContext ctx) {
    final WebSocketServerHandshaker current = ctx.attr(ATTR_WEBSOCKET_HANDLER).get();
    return current;
}
/**
 * Creates a frame handler bound to the given handshaker.
 *
 * @param handshaker the handshaker stored on this handler; it is kept as passed,
 *                   without a null check
 */
protected WebsocketFrameHandler(WebSocketServerHandshaker handshaker) {
this.handshaker = handshaker;
}