下面列出了 io.netty.handler.codec.http.HttpServerCodec 的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Assembles the pipeline of an inbound WebSocket channel: optional TLS, HTTP codec,
 * HTTP aggregation, WebSocket frame aggregation, an optional custom handler and,
 * last, a freshly configured {@code InboundWebsocketSourceHandler}.
 *
 * @param websocketChannel the channel being initialized
 * @throws Exception if the SSL handler or the custom pipeline handler cannot be created
 */
@Override
protected void initChannel(SocketChannel websocketChannel) throws Exception {
    final ChannelPipeline pipeline = websocketChannel.pipeline();

    // TLS, when configured, sits in front of everything else.
    if (sslConfiguration != null) {
        pipeline.addLast("ssl", new SSLHandlerFactory(sslConfiguration).create());
    }

    pipeline.addLast("codec", new HttpServerCodec());
    pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
    // NOTE(review): the frame aggregator limit is Integer.MAX_VALUE, i.e. effectively unbounded.
    pipeline.addLast("frameAggregator", new WebSocketFrameAggregator(Integer.MAX_VALUE));

    final InboundWebsocketSourceHandler inboundHandler = new InboundWebsocketSourceHandler();
    inboundHandler.setClientBroadcastLevel(clientBroadcastLevel);
    inboundHandler.setDispatchToCustomSequence(dispatchToCustomSequence);
    inboundHandler.setPortOffset(portOffset);
    // The remaining settings are only applied when a value is present.
    if (outflowDispatchSequence != null) {
        inboundHandler.setOutflowDispatchSequence(outflowDispatchSequence);
    }
    if (outflowErrorSequence != null) {
        inboundHandler.setOutflowErrorSequence(outflowErrorSequence);
    }
    if (subprotocolHandlers != null) {
        inboundHandler.setSubprotocolHandlers(subprotocolHandlers);
    }

    // A new instance of the configured handler's class is created reflectively
    // (public no-arg constructor) for this channel.
    if (pipelineHandler != null) {
        pipeline.addLast("pipelineHandler", pipelineHandler.getClass().getConstructor().newInstance());
    }
    pipeline.addLast("handler", inboundHandler);
}
/**
 * Builds the pipeline for an HTTP(S) RPC channel: optional TLS handler, HTTP codec,
 * a 1 MiB aggregator, the dispatcher handler and the RPC handler.
 *
 * @param socketChannel the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    final ChannelPipeline pipeline = socketChannel.pipeline();
    final ChannelConfig config = new ChannelConfig();

    // With an SSL context present, mark the channel as possibly HTTPS and
    // install an SSL handler created from that context.
    if (sslContext != null) {
        config.setPosibleHttps(true);
        pipeline.addLast("sslHandler", sslContext.newHandler(socketChannel.alloc()));
    }

    // Arguments: max initial line length, max header size, max chunk size, validateHeaders.
    pipeline.addLast("httpservercode", new HttpServerCodec(4096, 8192, 8192, false));
    pipeline.addLast(new HttpObjectAggregator(1048576));
    pipeline.addLast(new HttpDispatcherServerHandler(config));
    pipeline.addLast(new HttpRPCHandler());
}
/**
 * Configures a WebSocket server channel: optional TLS, HTTP codec and aggregator,
 * WebSocket compression, an all-idle timer, the WebSocket protocol handler, the
 * application handler, and finally every handler in {@code handlers}.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }
    p.addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebSocketServerCompressionHandler())
            // reader idle and writer idle disabled (0); all-idle after 60 seconds
            .addLast(new IdleStateHandler(0, 0, 60))
            // no subprotocols (null), extensions allowed (true)
            .addLast(new WebSocketServerProtocolHandler(WEB_SOCKET_PATH, null, true))
            .addLast(new WebSocketHandler(litchi));

    // NOTE(review): the same handler instances are added to every channel —
    // confirm they are sharable.
    for (ChannelHandler extra : handlers) {
        p.addLast(extra);
    }
}
/**
 * Registers the MQTT-over-WebSocket transport when it is enabled in the configuration.
 *
 * @param handler the MQTT handler passed on to {@code configureMQTTPipeline}
 * @param conf    supplies {@code websocketEnabled}, {@code websocketHost} and {@code websocketPort}
 */
private void initializeWebSocketTransport(final NewNettyMQTTHandler handler, Config conf) {
    LOG.debug("Configuring Websocket MQTT transport");
    if (!conf.websocketEnabled) {
        return;
    }

    final String bindHost = conf.websocketHost;
    final int bindPort = conf.websocketPort;
    // One idle-timeout handler instance is captured by the initializer below.
    final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();

    final PipelineInitializer webSocketInitializer = new PipelineInitializer() {
        @Override
        void init(SocketChannel channel) {
            ChannelPipeline p = channel.pipeline();
            p.addLast(new HttpServerCodec());
            p.addLast("aggregator", new HttpObjectAggregator(65536));
            p.addLast("webSocketHandler",
                    new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
            // Convert between WebSocket frames and raw buffers in both directions.
            p.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
            p.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
            configureMQTTPipeline(p, timeoutHandler, handler);
        }
    };
    initFactory(bindHost, bindPort, "Websocket MQTT", webSocketInitializer);
}
/**
 * Inserts the HTTP and WebSocket handlers needed for MQTT over WebSocket into the
 * channel's pipeline, in front of {@code AbstractChannelInitializer.FIRST_ABSTRACT_HANDLER}.
 *
 * @param ch the channel whose pipeline is extended
 */
public void addHandlers(final Channel ch) {
    // Listener-derived settings for the WebSocket handshake.
    final String path = websocketListener.getPath();
    final String subprotocolCsv = getSubprotocolString();
    final boolean extensionsAllowed = websocketListener.getAllowExtensions();

    ch.pipeline().addBefore(
            AbstractChannelInitializer.FIRST_ABSTRACT_HANDLER,
            HTTP_SERVER_CODEC,
            new HttpServerCodec());
    ch.pipeline().addAfter(
            HTTP_SERVER_CODEC,
            HTTP_OBJECT_AGGREGATOR,
            new HttpObjectAggregator(WEBSOCKET_MAX_CONTENT_LENGTH));
    ch.pipeline().addAfter(
            HTTP_OBJECT_AGGREGATOR,
            WEBSOCKET_SERVER_PROTOCOL_HANDLER,
            new WebSocketServerProtocolHandler(path, subprotocolCsv, extensionsAllowed, Integer.MAX_VALUE));

    // Each addAfter inserts directly behind its anchor. The continuation and text
    // handlers are both anchored on the binary handler, so the resulting order is
    // binary -> text -> MQTT encoder -> continuation. Do not reorder these calls.
    ch.pipeline().addAfter(
            WEBSOCKET_SERVER_PROTOCOL_HANDLER,
            WEBSOCKET_BINARY_FRAME_HANDLER,
            new WebSocketBinaryFrameHandler());
    ch.pipeline().addAfter(
            WEBSOCKET_BINARY_FRAME_HANDLER,
            WEBSOCKET_CONTINUATION_FRAME_HANDLER,
            new WebSocketContinuationFrameHandler());
    ch.pipeline().addAfter(
            WEBSOCKET_BINARY_FRAME_HANDLER,
            WEBSOCKET_TEXT_FRAME_HANDLER,
            new WebSocketTextFrameHandler());
    ch.pipeline().addAfter(
            WEBSOCKET_TEXT_FRAME_HANDLER,
            MQTT_WEBSOCKET_ENCODER,
            new MQTTWebsocketEncoder());
}
/**
 * Initializes a preview channel: counts the start, then installs IP tracking,
 * optional TLS, HTTP codec/aggregator, chunked writes, client binding, the main
 * handler, and a CORS handler placed right after the aggregator.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    PREVIEW_STARTED.inc();

    final ChannelPipeline p = ch.pipeline();
    p.addLast(inboundIpTracking);

    if (serverTlsContext != null && serverTlsContext.useTls()) {
        // Server-mode engine; client certificates are required only when configured so.
        final SSLEngine sslEngine = serverTlsContext.getContext().newEngine(ch.alloc());
        sslEngine.setNeedClientAuth(serverConfig.isTlsNeedClientAuth());
        sslEngine.setUseClientMode(false);
        p.addLast(FILTER_SSL, new SslHandler(sslEngine));
    }

    p.addLast(FILTER_CODEC, new HttpServerCodec());
    p.addLast(FILTER_HTTP_AGGREGATOR, new HttpObjectAggregator(65536));
    p.addLast("ChunkedWriteHandler", new ChunkedWriteHandler());
    p.addLast("bind-client-context", bindClient);
    p.addLast(FILTER_HANDLER, handlerProvider.get());
    p.addLast(outboundIpTracking);

    // Inserted last, but positioned directly behind the HTTP aggregator.
    p.addAfter(FILTER_HTTP_AGGREGATOR, "corshandler", new CorsHandler(corsConfig.build()));
}
/**
 * Initializes an upload channel: counts the start, then installs IP tracking,
 * optional TLS (with configured handshake and close-notify timeouts), HTTP codec,
 * an aggregator sized by {@code config.getMaxPreviewSize()}, and the main handler.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    UPLOAD_STARTED.inc();
    p.addLast(inboundIpTracking);

    if (serverTlsContext != null && serverTlsContext.useTls()) {
        final SSLEngine sslEngine = serverTlsContext.getContext().newEngine(ch.alloc());
        final SslHandler sslHandler = new SslHandler(sslEngine);
        // Both timeouts are read from the configuration in milliseconds.
        sslHandler.setHandshakeTimeout(config.getSslHandshakeTimeout(), TimeUnit.MILLISECONDS);
        sslHandler.setCloseNotifyTimeout(config.getSslCloseNotifyTimeout(), TimeUnit.MILLISECONDS);
        sslEngine.setNeedClientAuth(serverConfig.isTlsNeedClientAuth());
        sslEngine.setUseClientMode(false);
        p.addLast(FILTER_SSL, sslHandler);
    }

    p.addLast(FILTER_CODEC, new HttpServerCodec());
    p.addLast(FILTER_HTTP_AGGREGATOR, new HttpObjectAggregator(config.getMaxPreviewSize()));
    p.addLast(FILTER_HANDLER, handlerProvider.get());
    p.addLast(outboundIpTracking);
}
/**
 * Initializes a channel with IP tracking, optional TLS, HTTP codec, a 64 KiB
 * aggregator, the OAuth client-context binder and the main handler.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    p.addLast(inboundIpTracking);

    if (serverTlsContext != null && serverTlsContext.useTls()) {
        final SSLEngine sslEngine = serverTlsContext.getContext().newEngine(ch.alloc());
        sslEngine.setNeedClientAuth(serverConfig.isTlsNeedClientAuth());
        sslEngine.setUseClientMode(false);
        p.addLast(FILTER_SSL, new SslHandler(sslEngine));
    }

    p.addLast(FILTER_CODEC, new HttpServerCodec());
    p.addLast(FILTER_HTTP_AGGREGATOR, new HttpObjectAggregator(65536));
    p.addLast("bind-client-context", oauthBindClientContext);
    p.addLast(FILTER_HANDLER, handlerProvider.get());
    p.addLast(outboundIpTracking);
}
/**
 * Initializes a video channel: counts the connection, then installs per-channel IP
 * tracking handlers, optional TLS (timeouts configured in seconds), HTTP codec, an
 * aggregator sized by the video configuration, the main handler, and a CORS handler
 * placed right after the aggregator.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    VIDEO_CONNECTIONS.inc();
    p.addLast(new IPTrackingInboundHandler());

    if (serverTlsContext != null && serverTlsContext.useTls()) {
        final SSLEngine sslEngine = serverTlsContext.getContext().newEngine(ch.alloc());
        sslEngine.setNeedClientAuth(serverConfig.isTlsNeedClientAuth());
        sslEngine.setUseClientMode(false);

        final SslHandler sslHandler = new SslHandler(sslEngine);
        sslHandler.setHandshakeTimeout(videoConfig.getVideoSslHandshakeTimeout(), TimeUnit.SECONDS);
        sslHandler.setCloseNotifyTimeout(videoConfig.getVideoSslCloseNotifyTimeout(), TimeUnit.SECONDS);
        p.addLast(FILTER_SSL, sslHandler);
    }

    p.addLast(FILTER_CODEC, new HttpServerCodec());
    p.addLast(FILTER_HTTP_AGGREGATOR, new HttpObjectAggregator(videoConfig.getVideoHttpMaxContentLength()));
    p.addLast(FILTER_HANDLER, channelInboundProvider.get());
    p.addLast(new IPTrackingOutboundHandler());

    // Inserted last, but positioned directly behind the HTTP aggregator.
    p.addAfter(FILTER_HTTP_AGGREGATOR, "corshandler", new CorsHandler(corsConfig.build()));
}
/**
 * Builds an HTTP pipeline from the configured options. The codec and the final
 * {@code handler} are always installed; the aggregator, chunked-write handler and
 * client-context binder are each optional.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    Preconditions.checkNotNull(handler, "Must specify a channel handler");

    final ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpServerCodec());

    // Aggregation is skipped entirely when no positive size limit is set.
    if (maxRequestSizeBytes > 0) {
        p.addLast(new HttpObjectAggregator(maxRequestSizeBytes));
    }
    if (chunkedWrites) {
        p.addLast(new ChunkedWriteHandler());
    }
    if (clientFactory != null) {
        p.addLast(new BindClientContextHandler(cookieConfig, clientFactory, requestAuthorizer));
    }
    p.addLast(handler);
}
/**
 * Installs the handlers matching the negotiated application protocol.
 *
 * @param ctx      context whose pipeline is extended
 * @param protocol the negotiated protocol name
 * @throws IllegalStateException if the protocol is neither HTTP/2 nor HTTP/1.1
 */
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
    if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
        // HTTP/2: multiplex codec with the hello-world handler.
        ctx.pipeline().addLast(
                Http2MultiplexCodecBuilder.forServer(new HelloWorldHttp2Handler()).build());
    } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
        // HTTP/1.1: codec, aggregator, then the hello-world handler.
        ctx.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(MAX_CONTENT_LENGTH),
                new HelloWorldHttp1Handler("ALPN Negotiation"));
    } else {
        throw new IllegalStateException("unknown protocol: " + protocol);
    }
}
/**
 * Installs the handlers matching the negotiated application protocol.
 *
 * @param ctx      context whose pipeline is extended
 * @param protocol the negotiated protocol name
 * @throws IllegalStateException if the protocol is neither HTTP/2 nor HTTP/1.1
 */
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
    if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
        // HTTP/2: frame codec followed by the hello-world handler.
        ctx.pipeline().addLast(
                Http2FrameCodecBuilder.forServer().build(),
                new HelloWorldHttp2Handler());
    } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
        // HTTP/1.1: codec, aggregator, then the hello-world handler.
        ctx.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(MAX_CONTENT_LENGTH),
                new HelloWorldHttp1Handler("ALPN Negotiation"));
    } else {
        throw new IllegalStateException("unknown protocol: " + protocol);
    }
}
/**
 * Installs the handlers for the current {@code testMode}. No handler is added for
 * a mode that has no case below.
 *
 * @param ch the channel being configured
 * @throws Exception if a handler cannot be created
 */
@Override
protected void configure(SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
    switch (testMode) {
    case UNRESPONSIVE:
        pipeline.addLast(UnresponsiveHandler.INSTANCE);
        break;
    case INTERMEDIARY:
        // The aggregator is deliberately limited to a single byte of content.
        pipeline.addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(1))
                .addLast(new HttpIntermediaryHandler());
        break;
    case TERMINAL:
        pipeline.addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(1))
                .addLast(new HttpTerminalHandler());
        break;
    }
}
/**
 * Sets up an HTTP pipeline that ends in the application's
 * {@code WebSocketServerHandler}.
 *
 * @param e the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel e) throws Exception {
    e.pipeline()
            // Decodes requests and encodes responses as HTTP messages.
            .addLast("http-codec", new HttpServerCodec())
            // Merges the parts of an HTTP message into one complete message (up to 64 KiB).
            .addLast("aggregator", new HttpObjectAggregator(65536))
            // Supports writing chunked content (e.g. files) to the client.
            .addLast("http-chunked", new ChunkedWriteHandler())
            // Application handler that processes the received data.
            .addLast("handler", new WebSocketServerHandler());
}
/**
 * Builds the chat server pipeline: idle detection, HTTP codec, response compression,
 * HTTP aggregation, WebSocket upgrade on {@code /ws}, chat handling and heartbeat handling.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    ch.pipeline()
            // Fires an idle event after 60 seconds without any read or write.
            .addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
            // Bytes <-> HTTP messages.
            .addLast(new HttpServerCodec())
            // Compresses outbound HTTP content.
            .addLast(new HttpContentCompressor())
            // Aggregates HTTP message parts into a single full request (up to 64 KiB).
            .addLast(new HttpObjectAggregator(64 * 1024))
            // Handles the WebSocket upgrade handshake for the /ws endpoint.
            .addLast(new WebSocketServerProtocolHandler("/ws"))
            // Chat message processing.
            .addLast(new ChatServerHandler())
            // Heartbeat processing.
            .addLast(new HeartbeatHandler());
}
/**
 * Peeks at the first readable byte to choose between the HTTP pipeline and the
 * binary pipeline, installs the chosen handlers and then removes this decoder.
 * Nothing happens until at least one byte is readable.
 *
 * @param ctx context whose pipeline is rewritten
 * @param in  inbound buffer; only inspected, never consumed here
 * @param out unused
 * @throws Exception if a handler cannot be created
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 1) {
        return;
    }

    // getByte does not advance the reader index, so the byte stays in the buffer.
    final int firstByte = in.getByte(in.readerIndex());
    final ChannelPipeline pipeline = ctx.pipeline();

    if (isHttp(firstByte)) {
        // HTTP traffic
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(1048576));
        pipeline.addLast(new RpcxHttpHandler(nettyServer));
    } else {
        // Binary protocol traffic
        pipeline.addLast(new NettyEncoder());
        pipeline.addLast(new NettyDecoder());
        pipeline.addLast(new IdleStateHandler(0, 0, serverChannelMaxIdleTimeSeconds));
        pipeline.addLast(new NettyConnetManageHandler(nettyServer));
        pipeline.addLast(new NettyServerHandler(nettyServer));
    }
    // Either way this detector has done its job.
    pipeline.remove(this);
}
/**
 * Builds the terminal-over-WebSocket pipeline: HTTP codec, chunked writes, HTTP
 * aggregation, the HTTP request handler for {@code /ws}, the WebSocket protocol
 * handler and the TTY frame handler.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    p.addLast(new HttpServerCodec());
    p.addLast(new ChunkedWriteHandler());
    p.addLast(new HttpObjectAggregator(64 * 1024));

    // The resource path is passed to the request handler only when one is configured.
    final HttpRequestHandler requestHandler = (httpResourcePath == null)
            ? new HttpRequestHandler("/ws")
            : new HttpRequestHandler("/ws", httpResourcePath);
    p.addLast(requestHandler);

    p.addLast(new WebSocketServerProtocolHandler("/ws"));
    p.addLast(new TtyWebSocketFrameHandler(group, handler, HttpRequestHandler.class));
}
/**
 * Starts an HTTP server on {@code port}, starts {@code DataCenter}, and then blocks
 * until the server channel closes. Both event loop groups are shut down on exit.
 *
 * @param port the TCP port to bind
 * @throws Exception if binding or waiting fails
 */
public static void start(final int port) throws Exception {
    final EventLoopGroup bossGroup = new NioEventLoopGroup();
    final EventLoopGroup workerGroup = new NioEventLoopGroup();
    final ServerBootstrap bootstrap = new ServerBootstrap();
    try {
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // No aggregator is installed here: the handler sees the raw
                        // HTTP objects produced by the codec.
                        ch.pipeline()
                                .addLast("http-decoder", new HttpServerCodec())
                                .addLast(new HttpServerHandler());
                    }
                });

        // Wait for the bind to complete before announcing the server.
        final ChannelFuture bound = bootstrap.bind(port).sync();
        System.out.println("server start ok port is " + port);
        DataCenter.start();
        // Block until the server channel is closed.
        bound.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
/**
 * Installs the handlers matching the negotiated application protocol.
 *
 * @param ctx      context whose pipeline is extended
 * @param protocol the negotiated protocol name
 * @throws IllegalStateException if the protocol is neither HTTP/2 nor HTTP/1.1
 */
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
    if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
        // HTTP/2: a single handler produced by the builder.
        ctx.pipeline().addLast(new Http2HandlerBuilder().build());
    } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
        // HTTP/1.1: codec, aggregator, then the HTTP/1 handler.
        ctx.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(MAX_CONTENT_LENGTH),
                new Http1Handler("ALPN Negotiation"));
    } else {
        throw new IllegalStateException("unknown protocol: " + protocol);
    }
}
/**
 * Creates the server: builds and loads the command executor, creates the two event
 * loop groups, and configures (but does not bind) the server bootstrap.
 */
public WsServer() {
    executor = new SimpleExecutor<Command, CommandResponse>();
    loadExecutor(executor);

    // Event loop group passed as the parent (accepting) group.
    parentGroup = new NioEventLoopGroup();
    // Event loop group passed as the child (per-connection) group.
    childGroup = new NioEventLoopGroup();

    bootstrap = new ServerBootstrap();
    bootstrap.group(parentGroup, childGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Idle detection: 30s reader idle, writer idle disabled, 60s all idle.
                            .addLast(new IdleStateHandler(30, 0, 60))
                            // Decodes requests and encodes responses as HTTP messages.
                            .addLast(new HttpServerCodec())
                            // Aggregates HTTP message parts into one full request.
                            // The limit is 64 * 1024 bytes (64 KiB).
                            // NOTE(review): an earlier comment here spoke of 64M — confirm the intended limit.
                            .addLast(new HttpObjectAggregator(64 * 1024))
                            // Writes large responses in chunks.
                            .addLast(new ChunkedWriteHandler())
                            // Application handler.
                            .addLast(new WsServerHandler(executor));
                }
            });
}
/**
 * Starts the server for UI connections: configures the bootstrap, binds to
 * {@code port}, waits for the bind to finish and stores the bound channel.
 *
 * <p>Each accepted channel gets idle detection, HTTP codec and aggregation, the
 * WebSocket upgrade on {@code /ws}, frame aggregation, request decoding, response
 * encoding and the UI request handlers.
 *
 * @throws RuntimeException (via {@code Throwables.propagate}) if startup fails
 */
@Override
public void start() {
    // NOTE(review): SO_KEEPALIVE, TCP_NODELAY and the write-buffer water marks are set
    // with option(), which configures the server channel; if they are meant for accepted
    // connections they would need childOption() — confirm intent before changing.
    ServerBootstrap bootstrap = new ServerBootstrap()
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATER_MARK)
            .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATER_MARK)
            .channel(NioServerSocketChannel.class)
            .group(BOSS, WORKER)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pip = ch.pipeline();
                    // IdleStateHandler(int, int, int) takes SECONDS. The previous value
                    // 30 * 60 * 1000 was a millisecond figure and amounted to ~20.8 days;
                    // 30 * 60 gives the intended 30-minute all-idle timeout.
                    pip.addLast(new IdleStateHandler(0, 0, 30 * 60))
                            .addLast(new HttpServerCodec())
                            .addLast(new HttpObjectAggregator(1024 * 1024))
                            .addLast(new WebSocketServerProtocolHandler("/ws"))
                            // NOTE(review): frame aggregation allows up to 1 GiB per message.
                            .addLast(new WebSocketFrameAggregator(1024 * 1024 * 1024))
                            .addLast(new RequestDecoder(new DefaultRequestEncryption(new RSAEncryption())))
                            .addLast(new WebSocketEncoder())
                            .addLast(new TabHandler())
                            .addLast(new HostsValidatorHandler(serverFinder))
                            .addLast(new UiRequestHandler(
                                    commandStore,
                                    uiConnectionStore,
                                    agentConnectionStore,
                                    sessionManager));
                }
            });
    try {
        // sync() blocks until the bind has completed (or failed).
        this.channel = bootstrap.bind(port).sync().channel();
        logger.info("client server startup successfully, port {}", port);
    } catch (Exception e) {
        logger.error("netty server for ui start fail", e);
        throw Throwables.propagate(e);
    }
}
/**
 * Builds the remote WebSocket server pipeline: optional TLS, HTTP codec and
 * aggregator, WebSocket compression, the protocol handler and the frame handler.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    final ChannelPipeline p = ch.pipeline();
    if (sslCtx != null) {
        p.addLast(sslCtx.newHandler(ch.alloc()));
    }
    p.addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebSocketServerCompressionHandler())
            // configured subprotocols, extensions allowed
            .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, subProtocols, true))
            .addLast(new WebSocketRemoteServerFrameHandler());
}
/**
 * Builds an HTTP pipeline with a read timeout, codec, aggregator and the request
 * parameter decoder.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast(NetUtils.READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(NetUtils.DEFAULT_READ_TIMEOUT))
            .addLast(new HttpServerCodec())
            // HTTP requests and responses may arrive in several parts; the aggregator
            // merges them into one complete HTTP message.
            .addLast(new HttpObjectAggregator(65535))
            .addLast(new HttpRequestParamDecoder(portExtraInfo));
}
/**
 * Guesses the protocol from the first readable byte and installs either the HTTP
 * or the line-based text handlers, then removes this decoder. A
 * {@code LocalHostPermitHandler} is installed first in both cases.
 *
 * @param ctx context whose pipeline is rewritten
 * @param in  inbound buffer; only inspected, never consumed here
 * @param out unused
 * @throws Exception if a handler cannot be created
 */
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 1) {
        return;
    }

    // Peek at one byte (reader index untouched) to guess the protocol.
    final int firstByte = in.getByte(in.readerIndex());

    final ChannelPipeline pipeline = ctx.pipeline();
    pipeline.addLast(new LocalHostPermitHandler(acceptForeignIp));

    if (isHttp(firstByte)) {
        // HTTP clients get no welcome output: cancel it if it can still be cancelled.
        if (welcomeFuture != null && welcomeFuture.isCancellable()) {
            welcomeFuture.cancel(false);
        }
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(1048576));
        pipeline.addLast(new HttpProcessHandler());
    } else {
        // Text protocol: lines of at most 2048 bytes, UTF-8 in both directions,
        // and an all-idle timeout of five minutes.
        pipeline.addLast(new LineBasedFrameDecoder(2048));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new IdleStateHandler(0, 0, 5 * 60));
        pipeline.addLast(new TelnetProcessHandler());
    }
    // The detector is no longer needed once the real handlers are in place.
    pipeline.remove(this);
}
/**
 * Builds an HTTP pipeline (codec plus 64 KiB aggregator) that ends in an
 * {@code Ipcd10WebSocketServerHandler} constructed with {@code false}.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            .addLast("codec-http", new HttpServerCodec())
            .addLast("aggregator", new HttpObjectAggregator(65536))
            .addLast("handler", new Ipcd10WebSocketServerHandler(false));
}
/**
 * Configures the HTTP connector and starts binding it to {@code port}.
 *
 * <p>If no channel handlers are registered, an error is logged and nothing is
 * started. Any exception raised during setup is logged with its stack trace and
 * not rethrown.
 */
@Override
public void afterStart() {
    try {
        // Report the missing configuration directly instead of throwing a raw
        // Exception only to catch it a few lines below.
        if (handlerList.isEmpty()) {
            LOGGER.error("ChannelHandler is null");
            return;
        }
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                .option(ChannelOption.SO_BACKLOG, 12000).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                .addLast(new HttpServerCodec())
                                .addLast("aggregator", new HttpObjectAggregator(maxContentLength));
                        // NOTE(review): the same handler instances are added to every
                        // channel — confirm they are sharable.
                        for (ChannelHandler handler : handlerList) {
                            ch.pipeline().addLast(handler);
                        }
                    }
                });
        // NOTE(review): bind() is asynchronous and its result is not checked, so the
        // log line below does not prove the port was actually bound.
        b.bind(port);
        LOGGER.info("-----> connector started: http://127.0.0.1:{}/", port);
    } catch (Exception e) {
        // Descriptive message plus the throwable, so the cause and stack trace are logged.
        LOGGER.error("connector start failed", e);
    }
}
/**
 * Builds a minimal HTTP server pipeline.
 *
 * @param ch the channel being initialized
 * @throws Exception if a handler cannot be created
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
            // HTTP request decoder / response encoder
            .addLast(new HttpServerCodec())
            // merges HTTP message parts into one full message (up to 64 KiB)
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new NettyHttpServerHandler());
}
/**
 * {@inheritDoc}
 *
 * <p>Installs optional TLS, the HTTP codec, an aggregator sized from the
 * configuration, and an {@code HttpRequestHandler} wrapping a handler chain. The
 * chain starts with the API description handler, continues with the inference
 * and/or management handlers depending on {@code connectorType}, and ends with
 * the invalid-request handler.
 */
@Override
public void initChannel(Channel ch) {
    final ChannelPipeline p = ch.pipeline();

    final HttpRequestHandlerChain chainHead = new ApiDescriptionRequestHandler(connectorType);
    final HttpRequestHandlerChain chainEnd = new InvalidRequestHandler();
    final int maxRequestSize = ConfigManager.getInstance().getMaxRequestSize();

    if (sslCtx != null) {
        p.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }
    p.addLast("http", new HttpServerCodec());
    p.addLast("aggregator", new HttpObjectAggregator(maxRequestSize));

    final boolean servesBoth = ConnectorType.BOTH.equals(connectorType);

    // 'tail' always points at the most recently appended link of the chain.
    HttpRequestHandlerChain tail = chainHead;
    if (servesBoth || ConnectorType.INFERENCE_CONNECTOR.equals(connectorType)) {
        tail = tail.setNextHandler(
                new InferenceRequestHandler(PluginsManager.getInstance().getInferenceEndpoints()));
    }
    if (servesBoth || ConnectorType.MANAGEMENT_CONNECTOR.equals(connectorType)) {
        tail = tail.setNextHandler(
                new ManagementRequestHandler(PluginsManager.getInstance().getManagementEndpoints()));
    }
    tail.setNextHandler(chainEnd);

    p.addLast("handler", new HttpRequestHandler(chainHead));
}
/**
 * Extends the parent's channel setup with an HTTP pipeline: codec, 64 KiB
 * aggregator, chunked writes and the HTTP server handler.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
    super.initSocketChannel(ch);

    ch.pipeline()
            .addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new ChunkedWriteHandler());

    // The handler is passed through processHttpHandler before being installed.
    final HttpServerHandler serverHandler = new HttpServerHandler(this, httpResource);
    processHttpHandler(serverHandler);
    ch.pipeline().addLast("http", serverHandler);
}
/**
 * Extends the parent's channel setup for MQTT over WebSocket. The HTTP and
 * WebSocket handlers are each inserted immediately before the handler named
 * {@code HANDLER_MQTTDECODER}, so they end up in the order listed below; the HTTP
 * resource handler is appended at the end of the pipeline.
 *
 * @param ch the channel being initialized
 */
@Override
protected void initSocketChannel(SocketChannel ch) {
    super.initSocketChannel(ch);

    ch.pipeline()
            .addBefore(HANDLER_MQTTDECODER, "HttpServerCodec", new HttpServerCodec())
            .addBefore(HANDLER_MQTTDECODER, "HttpObjectAggregator", new HttpObjectAggregator(65536))
            .addBefore(HANDLER_MQTTDECODER, "ChunkedWriteHandler", new ChunkedWriteHandler())
            // NOTE(review): this handler name ends with a space; kept as-is because
            // other code may look the handler up by this exact name.
            .addBefore(HANDLER_MQTTDECODER, "compressor ", new HttpContentCompressor())
            // subprotocols, extensions allowed, max frame payload of 65536 bytes
            .addBefore(HANDLER_MQTTDECODER, "protocol",
                    new WebSocketServerProtocolHandler(websocketPath, "mqtt,mqttv3.1,mqttv3.1.1", true, 65536))
            .addBefore(HANDLER_MQTTDECODER, "mqttWebSocket", new MqttWebSocketCodec())
            .addLast("httpResource", new HttpResourceHander(httpResource));
}