下面列出了 io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Validates an incoming HTTP request and, when acceptable, starts the WebSocket handshake.
 *
 * <p>Undecodable requests are answered with 400 and non-GET requests with 403. Otherwise a
 * handshaker is created for the request and stored in the {@code handshaker} field.
 *
 * @param ctx the handler context whose channel carries the handshake
 * @param req the HTTP request to inspect
 * @throws Exception propagated from the helpers invoked here
 */
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req)
        throws Exception {
    // The decoder flagged the request as malformed: reply 400 and stop.
    boolean decodedCleanly = req.decoderResult().isSuccess();
    if (!decodedCleanly) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
        return;
    }

    // Only GET is accepted here; everything else is refused with 403.
    if (req.method() != GET) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
        return;
    }

    // Build a handshaker for the location derived from the request.
    String location = getWebSocketLocation(req);
    WebSocketServerHandshakerFactory factory =
            new WebSocketServerHandshakerFactory(location, null, false, Integer.MAX_VALUE);
    handshaker = factory.newHandshaker(req);

    if (handshaker != null) {
        handshaker.handshake(ctx.channel(), req);
        return;
    }
    // No handshaker could be created for this request.
    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}
/**
 * Writes a response, turning a positive {@code UpgradeResponse} into a WebSocket handshake.
 *
 * <p>Responses that are not {@code UpgradeResponse}s are delegated to the superclass unchanged.
 * A declined upgrade is written as an {@code HttpResponse} with alive=false and zip=true.
 *
 * @param response the response to write
 */
@Override
public void write(Response response) {
    if (!(response instanceof UpgradeResponse)) {
        super.write(response);
        return;
    }

    UpgradeResponse upgrade = (UpgradeResponse) response;
    if (!upgrade.isUpgrade()) {
        // Upgrade declined: answer over plain HTTP.
        HttpResponse plainResponse = new HttpResponse(response.getMessage());
        plainResponse.setAlive(false);
        plainResponse.setZip(true);
        super.write(plainResponse);
        return;
    }

    // Upgrade accepted: this object is itself passed as the request to handshake on.
    String location = "ws://" + headers().get(HttpHeaderNames.HOST) + uri();
    WebSocketServerHandshakerFactory factory =
            new WebSocketServerHandshakerFactory(location, null, false);
    webSocketServerHandshaker = factory.newHandshaker(this);
    if (webSocketServerHandshaker != null) {
        webSocketServerHandshaker.handshake(context.channel(), this);
    } else {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel());
    }
}
/**
 * Handles the initial HTTP request of a WebSocket connection and completes the handshake.
 *
 * <p>Note: the first request of a WebSocket connection is a plain HTTP request. Requests that
 * failed decoding, or whose {@code Upgrade} header is not {@code websocket}, are answered with
 * 400 Bad Request. The header value is compared case-insensitively, because RFC 6455 defines
 * the token as case-insensitive and some clients send {@code WebSocket}.
 *
 * @param ctx     the handler context whose channel carries the handshake
 * @param request the HTTP request that should upgrade the connection
 * @throws Exception propagated from the helpers invoked here
 */
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest request)
        throws Exception {
    // Reject requests that failed HTTP decoding or that do not ask for a WebSocket upgrade.
    // equalsIgnoreCase returns false for a missing (null) header, so that case is rejected too.
    if (!request.getDecoderResult().isSuccess()
            || !"websocket".equalsIgnoreCase(request.headers().get("Upgrade"))) {
        sendHttpResponse(ctx, request,
                new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
        return;
    }
    // Regular WebSocket upgrade request: build the handshake response and send it back.
    WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
            "ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
    handshaker = wsFactory.newHandshaker(request);
    if (handshaker == null) {
        // No handshaker could be created for the requested WebSocket version.
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    } else {
        // Send the handshake response to the client, completing the handshake.
        logger.debug("向客户端发送websocket握手,完成握手");
        handshaker.handshake(ctx.channel(), request);
    }
}
/**
 * Intercepts WebSocket upgrade requests and performs the handshake; everything else is passed
 * to the superclass.
 *
 * <p>An upgrade request is consumed here and not forwarded, so this method releases it once
 * the handshake has been started. Without that release the aggregated request buffer leaks.
 *
 * @param ctx the handler context
 * @param msg the inbound message
 * @throws Exception propagated from the handshake helpers or the superclass
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof FullHttpRequest) {
        FullHttpRequest req = (FullHttpRequest) msg;
        // Header-name lookup is case-insensitive in Netty, so one check covers
        // "Upgrade", "upgrade" and any other casing.
        if (req.headers().contains("Upgrade")) {
            try {
                WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                        getWebSocketLocation(req, ctx), null, true, 1024 * 1024 * 1024);
                handshaker = wsFactory.newHandshaker(req);
                if (handshaker == null) {
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                } else {
                    handshaker.handshake(ctx.channel(), req);
                }
            } finally {
                // The request is not propagated further, so this handler owns the release.
                req.release();
            }
            return;
        }
    }
    super.channelRead(ctx, msg);
}
/**
 * Checks an incoming HTTP request and, if it passes, performs the WebSocket handshake.
 *
 * <p>A request that failed decoding gets 400; a request with a method other than GET gets 403.
 * The created handshaker is kept in the {@code handshaker} field.
 *
 * @param ctx the handler context whose channel carries the handshake
 * @param req the HTTP request to inspect
 * @throws Exception propagated from the helpers invoked here
 */
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req)
        throws Exception {
    // Malformed request: answer 400.
    if (req.getDecoderResult().isSuccess() == false) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
        return;
    }

    // Anything but GET: answer 403.
    if (req.getMethod() != GET) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
        return;
    }

    // Create the handshaker for this request.
    String webSocketUrl = getWebSocketLocation(req);
    handshaker = new WebSocketServerHandshakerFactory(webSocketUrl, null, false, Integer.MAX_VALUE)
            .newHandshaker(req);

    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    handshaker.handshake(ctx.channel(), req);
}
/**
 * Performs the WebSocket handshake and, once it succeeded, installs a frame aggregator and
 * connects the session identified by the request path.
 *
 * @param ctx         the handler context whose channel is upgraded
 * @param req         the upgrade request
 * @param requestPath the request path the session id is extracted from
 */
private void handshake(final ChannelHandlerContext ctx, final FullHttpRequest req, final String requestPath) {
    WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
            getWebSocketLocation(req), null, true, maxWebSocketFrameSize);
    WebSocketServerHandshaker wsHandshaker = factory.newHandshaker(req);
    if (wsHandshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    ChannelFuture handshakeFuture = wsHandshaker.handshake(ctx.channel(), req);
    handshakeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            final String sessionId = PipelineUtils.getSessionId(requestPath);
            if (!future.isSuccess()) {
                log.error("Can't handshake: {}", sessionId, future.cause());
                return;
            }
            // Place the frame aggregator ahead of the Socket.IO WebSocket handler.
            ctx.channel().pipeline().addBefore(
                    SocketIOChannelInitializer.SOCKETIO_WEBSOCKET_HANDLER,
                    SocketIOChannelInitializer.WEBSOCKET_FRAME_AGGREGATOR,
                    new WebSocketFrameAggregator(maxWebSocketFrameSize));
            connect(ctx, req, sessionId);
        }
    });
}
/**
 * Creates the handshaker for the given upgrade request.
 *
 * @param ctx     the handler context (not used by this implementation)
 * @param request the upgrade request
 * @return the handshaker from {@code newHandshaker}; callers in this file treat {@code null}
 *         as an unsupported WebSocket version
 */
protected WebSocketServerHandshaker createWebSocketHandshaker(ChannelHandlerContext ctx, FullHttpRequest request) {
    // TODO verify it isn't already open...
    // TODO inject the factory
    return new WebSocketServerHandshakerFactory(
            getWebSocketLocation(request),
            null,
            false,
            serverConfig.getMaxFrameSize())
            .newHandshaker(request);
}
/**
 * Upgrades the connection to a WebSocket and creates the socket session for it.
 *
 * @param req the upgrade request
 * @param ctx the handler context whose channel is upgraded
 * @throws Exception propagated from the session setup helpers
 */
@Override
public void sendResponse(FullHttpRequest req, ChannelHandlerContext ctx) throws Exception {
    Client client = clientFactory.get(ctx.channel());

    // Handshake
    WebSocketServerHandshaker wsHandshaker = createWebSocketHandshaker(ctx, req);
    if (wsHandshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    wsHandshaker.handshake(ctx.channel(), req);

    // The chunked write handler interferes with large websocket messages,
    // so it is dropped from the pipeline now that a websocket is being set up.
    ctx.pipeline().remove(Bridge10ChannelInitializer.CHUNKED_WRITE_HANDLER);

    // The session is only created after the handshake. At this point it is not
    // fully initialized because no message identifying the device has arrived yet;
    // it is only put into the registry once it reports itself as initialized.
    metrics.incSocketCounter();
    Session socketSession = createAndSetSocketSession(client, ctx.channel(), metrics);
    updateClientInfo(req, socketSession);

    try (MdcContextReference mdc = BridgeMdcUtil.captureAndInitializeContext(socketSession)) {
        logger.trace("Getting ready to call session listeners [{}]", sessionListeners);
        sessionListeners.forEach(listener -> listener.onSessionCreated(socketSession));
        if (socketSession.isInitialized()) {
            sessionRegistry.putSession(socketSession);
        }
    }
}
/**
 * Handles a WebSocket upgrade request so that subsequent RPC traffic can run over WebSocket.
 *
 * <p>After starting the handshake, a {@code WebsocketFrameHandler} is installed after this
 * handler (unless one is already present), an {@code AwsProxyProtocolDecoder} is removed if
 * found, and this handler removes itself from the pipeline.
 *
 * @param ctx     the handler context whose channel is upgraded
 * @param request the upgrade request
 */
protected void handleWebSocket(final ChannelHandlerContext ctx, FullHttpRequest request) {
    if (logger.isDebugEnabled()) {
        logger.debug("handleWebSocket request: uri={}", request.uri());
    }

    // Handshake
    WebSocketServerHandshaker handshaker =
            new WebSocketServerHandshakerFactory(request.uri(), null, true).newHandshaker(request);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    // The optional listener is obtained before the handshake is started.
    ChannelFutureListener onHandshake = websocketHandshakeListener(ctx, request);
    ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), request);
    if (onHandshake != null) {
        handshakeFuture.addListener(onHandshake);
    }

    ChannelPipeline pipeline = ctx.pipeline();
    if (pipeline.get(WebsocketFrameHandler.class) != null) {
        // A frame handler is already installed; leave the pipeline untouched.
        return;
    }
    pipeline.addAfter(ctx.name(), "wsFrameHandler", new WebsocketFrameHandler(handshaker));
    ChannelHandler awsDecoder = pipeline.get(AwsProxyProtocolDecoder.class);
    if (awsDecoder != null) {
        pipeline.remove(awsDecoder);
    }
    // Remove the current handler.
    pipeline.remove(ctx.name());
}
/**
 * Validates an HTTP request, lets subclasses veto it, then performs the WebSocket handshake
 * and registers the channel with {@code WebsocketSinkServer.channels}.
 *
 * @param ctx the handler context whose channel is upgraded
 * @param req the HTTP request to inspect
 */
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
    // Malformed request: log it and answer 400.
    boolean decodedCleanly = req.getDecoderResult().isSuccess();
    if (!decodedCleanly) {
        logger.warn(String.format("Bad request: %s", req.getUri()));
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
        return;
    }

    // Anything but GET: log it and answer 403.
    if (req.getMethod() != GET) {
        logger.warn(String.format("Unsupported HTTP method: %s", req.getMethod()));
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
        return;
    }

    // Subclass hook: a false result stops processing here.
    boolean proceed = additionalHttpRequestHandler(ctx, req);
    if (!proceed) {
        return;
    }

    // Handshake
    String location = getWebSocketLocation(req);
    handshaker = new WebSocketServerHandshakerFactory(location, null, true).newHandshaker(req);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    handshaker.handshake(ctx.channel(), req);
    WebsocketSinkServer.channels.add(ctx.channel());
}
/**
 * Creates a response carrying the processor and handshaker factory for the WebSocket.
 *
 * @param version           the HTTP version, passed to the superclass
 * @param status            the response status, passed to the superclass
 * @param processor         the processor stored on this response
 * @param handshakerFactory the handshaker factory stored on this response
 */
public DefaultWebSocketHttpResponse(HttpVersion version, HttpResponseStatus status,
        Processor<WebSocketFrame, WebSocketFrame> processor,
        WebSocketServerHandshakerFactory handshakerFactory) {
    super(version, status);
    this.handshakerFactory = handshakerFactory;
    this.processor = processor;
}
/**
 * Creates a response carrying the processor and handshaker factory for the WebSocket, with
 * explicit control over header validation.
 *
 * @param version           the HTTP version, passed to the superclass
 * @param status            the response status, passed to the superclass
 * @param validateHeaders   header-validation flag, passed to the superclass
 * @param processor         the processor stored on this response
 * @param handshakerFactory the handshaker factory stored on this response
 */
public DefaultWebSocketHttpResponse(HttpVersion version, HttpResponseStatus status, boolean validateHeaders,
        Processor<WebSocketFrame, WebSocketFrame> processor,
        WebSocketServerHandshakerFactory handshakerFactory) {
    super(version, status, validateHeaders);
    this.handshakerFactory = handshakerFactory;
    this.processor = processor;
}
/**
 * Upgrades the given request to a WebSocket connection and fires {@code onOpen}.
 *
 * <p>The handshake is performed on a fresh {@code DefaultFullHttpRequest} built from the
 * original request's protocol version, method, URI and headers.
 *
 * @param request the incoming request wrapper
 */
@Override
public void handle(RakamHttpRequest request) {
    WebSocketServerHandshakerFactory factory =
            new WebSocketServerHandshakerFactory(getWebSocketLocation(request), null, true);
    handshaker = factory.newHandshaker(request.getRequest());
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(request.context().channel());
        return;
    }

    // Copy the request line and headers into a full request for the handshaker.
    HttpRequest original = request.getRequest();
    DefaultFullHttpRequest fullRequest = new DefaultFullHttpRequest(
            original.getProtocolVersion(), original.getMethod(), original.getUri());
    fullRequest.headers().set(original.headers());

    handshaker.handshake(request.context().channel(), fullRequest);
    onOpen(new WebSocketRequest(request));
}
/**
 * Performs the WebSocket handshake for a GET request and, on success, adds a
 * {@code WebSocketFrameEncoder} after the {@code "wsencoder"} pipeline entry.
 *
 * @param ctx the handler context whose channel is upgraded
 * @param req the HTTP request to inspect
 * @throws Exception propagated from the helpers invoked here
 */
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
    // Anything but GET is refused with 403.
    if (req.method() != GET) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
        return;
    }

    // Handshake, offering the configured sub-protocols as a comma-separated list.
    String protocolsCsv = StringUtil.joinStringList(supportedProtocols, ",");
    WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
            this.getWebSocketLocation(req), protocolsCsv, false, maxFramePayloadLength);
    this.httpRequest = req;
    this.handshaker = factory.newHandshaker(req);
    if (this.handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    this.handshaker.handshake(ctx.channel(), req).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                // Handshake failed, fire an exceptionCaught event
                future.channel().pipeline().fireExceptionCaught(future.cause());
                return;
            }
            // we need to insert an encoder that takes the underlying ChannelBuffer of a
            // StompFrame.toActiveMQBuffer and wraps it in a binary web socket frame before
            // letting the wsencoder send it on the wire
            WebSocketFrameEncoder frameEncoder = new WebSocketFrameEncoder(maxFramePayloadLength);
            future.channel().pipeline().addAfter("wsencoder", "websocket-frame-encoder", frameEncoder);
        }
    });
}
/**
 * Performs the WebSocket handshake for the given request, or sends the unsupported-version
 * response when no handshaker could be created.
 *
 * @param ctx the handler context whose channel is upgraded
 * @param req the upgrade request
 */
private void handleHandshake(ChannelHandlerContext ctx, HttpRequest req) {
    String webSocketUrl = getWebSocketURL(req);
    WebSocketServerHandshaker wsHandshaker =
            new WebSocketServerHandshakerFactory(webSocketUrl, null, true).newHandshaker(req);
    if (wsHandshaker != null) {
        wsHandshaker.handshake(ctx.channel(), req);
        return;
    }
    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}
/**
 * Performs the WebSocket handshake and, once it has completed successfully, binds the web
 * session named by the {@code sessionId} query parameter to the channel.
 *
 * <p>Two defects of the earlier version are fixed here:
 * <ul>
 *   <li>The handshake future was tested with {@code isSuccess()} right after the call. The
 *       write is asynchronous, so the future could still be pending and session registration
 *       was silently skipped. A listener is used instead.</li>
 *   <li>A hard-coded test user id overwrote the id looked up from the session cache, which
 *       authenticated every caller as that user. The override is removed.</li>
 * </ul>
 *
 * @param ctx     the handler context whose channel is upgraded
 * @param request the upgrade request
 */
private void doHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
    if (request.decoderResult().isFailure()) {
        ctx.close();
        return;
    }
    // The handshake uses GET, so only GET requests are accepted.
    if (HttpMethod.GET != request.method()) {
        ctx.close();
        return;
    }
    String wsUrl = "ws://" + request.headers().get(HttpHeaderNames.HOST) + AKAXIN_WS_PATH;
    WebSocketServerHandshakerFactory webSocketFactory = new WebSocketServerHandshakerFactory(wsUrl, null, true);
    wsHandshaker = webSocketFactory.newHandshaker(request);
    if (wsHandshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    // Capture the URI up front so the listener does not depend on the request object.
    final String requestUri = request.uri();
    wsHandshaker.handshake(ctx.channel(), request).addListener(future -> {
        if (!future.isSuccess()) {
            // As before, nothing is registered when the handshake did not succeed.
            return;
        }
        // Handshake done: validate the user's web session id.
        QueryStringDecoder queryDecoder = new QueryStringDecoder(requestUri);
        List<String> sessionIds = queryDecoder.parameters().get("sessionId");
        if (sessionIds != null && !sessionIds.isEmpty()) {
            String sessionId = sessionIds.get(0);
            String siteUserId = WebSessionCache.getSiteUserId(sessionId);
            if (StringUtils.isNotBlank(siteUserId)) {
                ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();
                // Store the user id on the channel session and register it.
                channelSession.setUserId(siteUserId);
                WebChannelManager.addChannelSession(siteUserId, channelSession);
            }
            // NOTE(review): an unknown session is left connected but unregistered, matching
            // the earlier code where the close() call was commented out; confirm intent.
        } else {
            ctx.close();
        }
        System.out.println("client handshaker success parm=" + queryDecoder.parameters());
    });
}
/**
 * Routes an HTTP request either to the WebSocket handshake (when its path equals
 * {@code websocketPath}) or to the static resource provider.
 *
 * @param ctx the handler context
 * @param req the HTTP request to handle
 */
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
    // Malformed request: answer 400.
    if (!req.decoderResult().isSuccess()) {
        sendHttpResponse(ctx, req,
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
        return;
    }
    // Anything but GET: answer 403.
    if (req.method() != HttpMethod.GET) {
        sendHttpResponse(ctx, req,
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
        return;
    }

    // Split the URI into path and raw query string at the first '?'.
    String uri = req.uri();
    int queryStart = uri.indexOf("?");
    String path = (queryStart == -1) ? uri : uri.substring(0, queryStart);
    String paramterStr = (queryStart == -1) ? "" : uri.substring(queryStart + 1);

    boolean isWebSocketPath = websocketPath != null
            && websocketPath.trim().length() > 0
            && websocketPath.equals(path);

    if (!isWebSocketPath) {
        // Not the WebSocket endpoint: serve a resource if a provider is configured.
        // With no provider configured, no response is written (unchanged behaviour).
        if (httpResource == null) {
            return;
        }
        ByteBuf res = httpResource.buildWebSocketRes(req, path);
        if (res == null) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND));
        } else {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, res));
        }
        return;
    }

    // Handshake
    WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(
            WebSocketUtil.getWebSocketLocation(ctx.pipeline(), req, websocketPath), null, true, 5 * 1024 * 1024);
    handshaker = factory.newHandshaker(req);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    handshaker.handshake(ctx.channel(), req);

    // Set the WEBSOCKET_KEY attribute (to an empty string) if the channel lacks it.
    if (!ctx.channel().hasAttr(WEBSOCKET_KEY)) {
        Attribute<String> marker = ctx.channel().attr(WEBSOCKET_KEY);
        marker.set("");
    }
    if (webSocketEvent != null) {
        Map<String, Object> params = MapUrlParamsUtil.getUrlParams(paramterStr);
        webSocketEvent.onOpenEvent(baseServer, new WebSocketSession(ctx.channel()), params);
    }
}
/**
 * Performs the WebSocket handshake and, once it has completed successfully, binds the web
 * session named by the {@code sessionId} query parameter to the channel.
 *
 * <p>Two defects of the earlier version are fixed here:
 * <ul>
 *   <li>The handshake future was tested with {@code isSuccess()} right after the call. The
 *       write is asynchronous, so the future could still be pending and session registration
 *       was silently skipped. A listener is used instead.</li>
 *   <li>A hard-coded test user id overwrote the id looked up from the session cache, which
 *       authenticated every caller as that user. The override is removed.</li>
 * </ul>
 *
 * @param ctx     the handler context whose channel is upgraded
 * @param request the upgrade request
 */
private void doHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
    if (request.decoderResult().isFailure()) {
        ctx.close();
        return;
    }
    // The handshake uses GET, so only GET requests are accepted.
    if (HttpMethod.GET != request.method()) {
        ctx.close();
        return;
    }
    String wsUrl = "ws://" + request.headers().get(HttpHeaderNames.HOST) + AKAXIN_WS_PATH;
    WebSocketServerHandshakerFactory webSocketFactory = new WebSocketServerHandshakerFactory(wsUrl, null, true);
    wsHandshaker = webSocketFactory.newHandshaker(request);
    if (wsHandshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    // Capture the URI up front so the listener does not depend on the request object.
    final String requestUri = request.uri();
    wsHandshaker.handshake(ctx.channel(), request).addListener(future -> {
        if (!future.isSuccess()) {
            // As before, nothing is registered when the handshake did not succeed.
            return;
        }
        // Handshake done: validate the user's web session id.
        QueryStringDecoder queryDecoder = new QueryStringDecoder(requestUri);
        List<String> sessionIds = queryDecoder.parameters().get("sessionId");
        if (sessionIds != null && !sessionIds.isEmpty()) {
            String sessionId = sessionIds.get(0);
            String siteUserId = WebSessionCache.getSiteUserId(sessionId);
            if (StringUtils.isNotBlank(siteUserId)) {
                ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();
                // Store the user id on the channel session and register it.
                channelSession.setUserId(siteUserId);
                WebChannelManager.addChannelSession(siteUserId, channelSession);
            }
            // NOTE(review): an unknown session is left connected but unregistered, matching
            // the earlier code where the close() call was commented out; confirm intent.
        } else {
            ctx.close();
        }
        System.out.println("client handshaker success parm=" + queryDecoder.parameters());
    });
}
/**
 * Performs the WebSocket handshake and, once it has completed successfully, binds the web
 * session named by the {@code sessionId} query parameter to the channel.
 *
 * <p>Two defects of the earlier version are fixed here:
 * <ul>
 *   <li>The handshake future was tested with {@code isSuccess()} right after the call. The
 *       write is asynchronous, so the future could still be pending and session registration
 *       was silently skipped. A listener is used instead.</li>
 *   <li>A hard-coded test user id overwrote the id looked up from the session cache, which
 *       authenticated every caller as that user. The override is removed.</li>
 * </ul>
 *
 * @param ctx     the handler context whose channel is upgraded
 * @param request the upgrade request
 */
private void doHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
    if (request.decoderResult().isFailure()) {
        ctx.close();
        return;
    }
    // The handshake uses GET, so only GET requests are accepted.
    if (HttpMethod.GET != request.method()) {
        ctx.close();
        return;
    }
    String wsUrl = "ws://" + request.headers().get(HttpHeaderNames.HOST) + AKAXIN_WS_PATH;
    WebSocketServerHandshakerFactory webSocketFactory = new WebSocketServerHandshakerFactory(wsUrl, null, true);
    wsHandshaker = webSocketFactory.newHandshaker(request);
    if (wsHandshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }
    // Capture the URI up front so the listener does not depend on the request object.
    final String requestUri = request.uri();
    wsHandshaker.handshake(ctx.channel(), request).addListener(future -> {
        if (!future.isSuccess()) {
            // As before, nothing is registered when the handshake did not succeed.
            return;
        }
        // Handshake done: validate the user's web session id.
        QueryStringDecoder queryDecoder = new QueryStringDecoder(requestUri);
        List<String> sessionIds = queryDecoder.parameters().get("sessionId");
        if (sessionIds != null && !sessionIds.isEmpty()) {
            String sessionId = sessionIds.get(0);
            String siteUserId = WebSessionCache.getSiteUserId(sessionId);
            if (StringUtils.isNotBlank(siteUserId)) {
                ChannelSession channelSession = ctx.channel().attr(ChannelConst.CHANNELSESSION).get();
                // Store the user id on the channel session and register it.
                channelSession.setUserId(siteUserId);
                WebChannelManager.addChannelSession(siteUserId, channelSession);
            }
            // NOTE(review): an unknown session is left connected but unregistered, matching
            // the earlier code where the close() call was commented out; confirm intent.
        } else {
            ctx.close();
        }
        System.out.println("client handshaker success parm=" + queryDecoder.parameters());
    });
}
/**
 * Handles the HTTP upgrade request (with an origin allow-list) and subsequent WebSocket frames.
 *
 * <p>Close frames are handed to the handshaker, ping frames are answered with a pong carrying
 * the same content, and all other frames are forwarded down the pipeline.
 *
 * @param ctx the handler context
 * @param req the inbound message
 * @throws Exception propagated from the handshake helpers
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object req) throws Exception {
    if (req instanceof FullHttpRequest) {
        FullHttpRequest request = (FullHttpRequest) req;

        // ----- Client authenticity check code -----
        // A request without an Origin header is let through; otherwise the origin
        // must be one of the entries below or the channel is closed.
        String origin = request.headers().get(HttpHeaders.ORIGIN);
        boolean originAccepted = origin == null
                || "http://slither.io".equals(origin)
                || "https://slither.io".equals(origin)
                || "http://localhost".equals(origin)
                || "https://localhost".equals(origin)
                || "http://127.0.0.1".equals(origin)
                || "https://127.0.0.1".equals(origin);
        if (!originAccepted) {
            ctx.channel().close();
            return;
        }
        // -----/Client authenticity check code -----

        String location = "ws://" + request.headers().get(HttpHeaders.HOST) + "/";
        handshaker = new WebSocketServerHandshakerFactory(location, null, true).newHandshaker(request);
        if (handshaker != null) {
            handshaker.handshake(ctx.channel(), request);
        } else {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }
        return;
    }

    if (!(req instanceof WebSocketFrame)) {
        // Neither an HTTP request nor a WebSocket frame: nothing to do.
        return;
    }

    WebSocketFrame frame = (WebSocketFrame) req;
    if (req instanceof CloseWebSocketFrame) {
        if (handshaker != null) {
            handshaker.close(ctx.channel(), ((CloseWebSocketFrame) req).retain());
        }
        return;
    }
    if (req instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    ctx.fireChannelRead(frame.retain());
}
/**
 * Creates the WebSocket operations that take over from the given HTTP server operations and
 * starts the server-side handshake.
 *
 * <p>When no handshaker can be created for the request, the unsupported-version response is
 * sent and {@code handshakerResult} is left {@code null}. Otherwise the HTTP-specific handlers
 * are removed, the handshake is performed on a copy of the original request line and headers,
 * and once it completes this instance is rebound in place of {@code replaced}.
 *
 * @param wsUrl               the WebSocket URL handed to the handshaker factory
 * @param websocketServerSpec supplies ping handling, sub-protocols, max frame payload length
 *                            and the compression flag
 * @param replaced            the HTTP operations being replaced
 */
@SuppressWarnings("FutureReturnValueIgnored")
WebsocketServerOperations(String wsUrl, WebsocketServerSpec websocketServerSpec, HttpServerOperations replaced) {
super(replaced);
this.proxyPing = websocketServerSpec.handlePing();
Channel channel = replaced.channel();
onCloseState = MonoProcessor.create();
// Handshake
WebSocketServerHandshakerFactory wsFactory =
new WebSocketServerHandshakerFactory(wsUrl, websocketServerSpec.protocols(), true, websocketServerSpec.maxFramePayloadLength());
handshaker = wsFactory.newHandshaker(replaced.nettyRequest);
if (handshaker == null) {
//"FutureReturnValueIgnored" this is deliberate
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel);
handshakerResult = null;
}
else {
// These HTTP-level handlers are removed before the handshake is performed.
removeHandler(NettyPipeline.HttpTrafficHandler);
removeHandler(NettyPipeline.AccessLogHandler);
removeHandler(NettyPipeline.HttpMetricsHandler);
handshakerResult = channel.newPromise();
// Build a body-less full request carrying the original version, method, URI and headers.
HttpRequest request = new DefaultFullHttpRequest(replaced.version(),
replaced.method(),
replaced.uri());
request.headers()
.set(replaced.nettyRequest.headers());
if (websocketServerSpec.compress()) {
removeHandler(NettyPipeline.CompressionHandler);
WebSocketServerCompressionHandler wsServerCompressionHandler =
new WebSocketServerCompressionHandler();
try {
// Feed the request to the compression handler directly, using the ReactiveBridge
// context, and only then add the handler at the head of the pipeline.
wsServerCompressionHandler.channelRead(channel.pipeline()
.context(NettyPipeline.ReactiveBridge),
request);
addHandlerFirst(NettyPipeline.WsCompressionHandler, wsServerCompressionHandler);
} catch (Throwable e) {
// Failures are logged and the handshake below still proceeds.
log.error(format(channel(), ""), e);
}
}
// Handshake with the response headers minus Transfer-Encoding; completion is reported
// through handshakerResult.
handshaker.handshake(channel,
request,
replaced.responseHeaders
.remove(HttpHeaderNames.TRANSFER_ENCODING),
handshakerResult)
.addListener(f -> {
// Once the handshake future completes, swap this instance in for the HTTP operations.
if (replaced.rebind(this)) {
markPersistent(false);
}
else {
log.debug("Cannot bind WebsocketServerOperations after the handshake.");
}
});
}
}
/**
 * Dispatches an HTTP request: error responses for bad or non-GET requests, a redirect for the
 * root path, a WebSocket handshake for upgrade requests, and REST or static-file handling for
 * everything else.
 *
 * @param ctx the handler context
 * @param req the HTTP request to handle
 * @throws Exception propagated from the helpers invoked here
 */
protected void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
        throws Exception {
    // Malformed request: answer 400.
    boolean decodedCleanly = req.getDecoderResult().isSuccess();
    if (!decodedCleanly) {
        httpFileHandler.sendError(ctx, HttpResponseStatus.BAD_REQUEST);
        return;
    }

    // If you're going to do normal HTTP POST authentication before upgrading the
    // WebSocket, the recommendation is to handle it right here
    if (req.getMethod() == HttpMethod.POST) {
        httpFileHandler.sendError(ctx, HttpResponseStatus.FORBIDDEN);
        return;
    }

    // Anything but GET: answer 403.
    if (req.getMethod() != HttpMethod.GET) {
        httpFileHandler.sendError(ctx, HttpResponseStatus.FORBIDDEN);
        return;
    }

    // Root path: redirect to the demo page.
    if ("/".equals(req.getUri())) {
        httpFileHandler.sendRedirect(ctx, "/index.html");
        return;
    }

    // Not a WebSocket upgrade (header absent or a different value): try REST first,
    // then fall back to serving a file.
    String upgradeHeader = req.headers().get("Upgrade");
    if (!"websocket".equalsIgnoreCase(upgradeHeader)) {
        if (!handleREST(ctx, req)) {
            httpFileHandler.sendFile(ctx, req);
        }
        return;
    }

    // Handshake. Ideally you'd want to configure your websocket uri
    String url = "ws://" + req.headers().get("Host") + "/wsticker";
    handshaker = new WebSocketServerHandshakerFactory(url, null, false).newHandshaker(req);
    if (handshaker != null) {
        handshaker.handshake(ctx.channel(), req);
    } else {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
    }
}
/**
 * Returns the handshaker factory held by this object.
 *
 * @return the value of the {@code handshakerFactory} field
 */
@Override
public WebSocketServerHandshakerFactory handshakerFactory() {
    return this.handshakerFactory;
}
/**
 * Handles the HTTP upgrade request (with an origin allow-list) and subsequent WebSocket frames.
 *
 * <p>Close frames are handed to the handshaker, ping frames are answered with a pong carrying
 * the same content, and all other frames are forwarded down the pipeline.
 *
 * @param ctx the handler context
 * @param req the inbound message
 * @throws Exception propagated from the handshake helpers
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object req) throws Exception {
    if (req instanceof FullHttpRequest) {
        FullHttpRequest request = (FullHttpRequest) req;

        // ----- Client authenticity check code -----
        // !!!!! WARNING !!!!!
        // THE BELOW SECTION OF CODE CHECKS TO ENSURE THAT CONNECTIONS ARE COMING
        // FROM THE OFFICIAL AGAR.IO CLIENT. IF YOU REMOVE OR MODIFY THE BELOW
        // SECTION OF CODE TO ALLOW CONNECTIONS FROM A CLIENT ON A DIFFERENT DOMAIN,
        // YOU MAY BE COMMITTING COPYRIGHT INFRINGEMENT AND LEGAL ACTION MAY BE TAKEN
        // AGAINST YOU. THIS SECTION OF CODE WAS ADDED ON JULY 9, 2015 AT THE REQUEST
        // OF THE AGAR.IO DEVELOPERS.
        //
        // A request without an Origin header is let through; otherwise the origin
        // must be one of the entries below or the channel is closed.
        String origin = request.headers().get(HttpHeaders.ORIGIN);
        boolean originAccepted = origin == null
                || "http://agar.io".equals(origin)
                || "https://agar.io".equals(origin)
                || "http://localhost".equals(origin)
                || "https://localhost".equals(origin)
                || "http://127.0.0.1".equals(origin)
                || "https://127.0.0.1".equals(origin);
        if (!originAccepted) {
            ctx.channel().close();
            return;
        }
        // -----/Client authenticity check code -----

        String location = "ws://" + request.headers().get(HttpHeaders.HOST) + "/";
        handshaker = new WebSocketServerHandshakerFactory(location, null, true).newHandshaker(request);
        if (handshaker != null) {
            handshaker.handshake(ctx.channel(), request);
        } else {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        }
        return;
    }

    if (!(req instanceof WebSocketFrame)) {
        // Neither an HTTP request nor a WebSocket frame: nothing to do.
        return;
    }

    WebSocketFrame frame = (WebSocketFrame) req;
    if (req instanceof CloseWebSocketFrame) {
        if (handshaker != null) {
            handshaker.close(ctx.channel(), ((CloseWebSocketFrame) req).retain());
        }
        return;
    }
    if (req instanceof PingWebSocketFrame) {
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
        return;
    }
    ctx.fireChannelRead(frame.retain());
}
/**
 * Handles the initial HTTP upgrade request for a WebSocket and passes every other request on
 * to the rest of the pipeline.
 *
 * <p>A message handler is looked up for the request URI through {@code factory}. When none is
 * available the request is answered with 400. After a successful handshake the handler is
 * stored on the channel under {@code key} if nothing is stored there yet.
 *
 * @param ctx the handler context
 * @param req the HTTP request to inspect
 */
private void handleHttpRequest(ChannelHandlerContext ctx, HttpRequest req) {
    // Malformed request: answer 400 with an empty body.
    if (!req.decoderResult().isSuccess()) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST, ctx.alloc().buffer(0)));
        return;
    }

    // Non-GET requests are left for the rest of the pipeline.
    if (!GET.equals(req.method())) {
        ctx.fireChannelRead(req);
        return;
    }

    // Only the initial HTTP upgrade request is handled here.
    boolean connectionUpgrade = req.headers().contains("Connection", "upgrade", true);
    if (!connectionUpgrade || !req.headers().contains("Sec-WebSocket-Version")) {
        ctx.fireChannelRead(req);
        return;
    }

    // Ask the factory whether something wants to handle this URI; the callback
    // it receives writes outgoing messages to the channel.
    Optional<Consumer<Message>> maybeHandler = factory.apply(
            req.uri(),
            outgoing -> {
                ctx.channel().writeAndFlush(Require.nonNull("Message to send", outgoing));
            });
    if (!maybeHandler.isPresent()) {
        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST, ctx.alloc().buffer(0)));
        return;
    }

    // Handshake
    String location = getWebSocketLocation(req);
    handshaker = new WebSocketServerHandshakerFactory(location, null, false, Integer.MAX_VALUE)
            .newHandshaker(req);
    if (handshaker == null) {
        WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        return;
    }

    ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
    handshakeFuture.addListener((ChannelFutureListener) completed -> {
        if (handshakeFuture.isSuccess()) {
            ctx.channel().attr(key).setIfAbsent(maybeHandler.get());
        } else {
            ctx.fireExceptionCaught(handshakeFuture.cause());
        }
    });
}
/**
 * Returns the handshaker factory to use to reconfigure the channel.
 *
 * @return the handshaker factory
 */
WebSocketServerHandshakerFactory handshakerFactory();