下面列出了怎么用 io.netty.handler.codec.http.HttpObjectAggregator 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Prepares the server bootstrap for this connector without binding any port.
 *
 * <p>Both event loop groups are sized from {@code configuration.getIoWorkerThreadCount()}. Every
 * accepted channel receives an HTTP request decoder, an HTTP response encoder, an aggregator
 * limited to {@code configuration.getMaxContentLength()} and this connector instance, after which
 * {@code addChannelHandlers} may append further handlers.
 */
public void initialize() {
    logger.info("Initializing the connector");

    EventLoopGroup bossGroup = new NioEventLoopGroup(configuration.getIoWorkerThreadCount());
    EventLoopGroup workerGroup = new NioEventLoopGroup(configuration.getIoWorkerThreadCount());

    ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            HttpRequestDecoder requestDecoder = new HttpRequestDecoder();
            HttpResponseEncoder responseEncoder = new HttpResponseEncoder();
            HttpObjectAggregator aggregator =
                    new HttpObjectAggregator(configuration.getMaxContentLength());
            // The connector itself is installed as the last of the built-in handlers.
            ch.pipeline().addLast(
                    requestDecoder, responseEncoder, aggregator, AbstractHttpConnector.this);
            addChannelHandlers(ch.pipeline());
        }
    };

    bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.option(ChannelOption.SO_BACKLOG, 100);
    bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    bootstrap.childHandler(childInitializer);
}
/**
 * Builds the pipeline of a newly accepted inbound WebSocket channel.
 *
 * <p>Handler order: optional "ssl" (only when an SSL configuration is set), "codec",
 * "aggregator" (65536-byte limit), "frameAggregator", optional "pipelineHandler" and finally
 * "handler", the configured {@code InboundWebsocketSourceHandler}.
 *
 * @param websocketChannel the channel being initialized
 * @throws Exception if the SSL handler or the custom pipeline handler cannot be created
 */
@Override
protected void initChannel(SocketChannel websocketChannel) throws Exception {
    if (sslConfiguration != null) {
        SslHandler ssl = new SSLHandlerFactory(sslConfiguration).create();
        websocketChannel.pipeline().addLast("ssl", ssl);
    }

    ChannelPipeline pipeline = websocketChannel.pipeline();
    pipeline.addLast("codec", new HttpServerCodec());
    pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
    pipeline.addLast("frameAggregator", new WebSocketFrameAggregator(Integer.MAX_VALUE));

    InboundWebsocketSourceHandler inboundHandler = new InboundWebsocketSourceHandler();
    inboundHandler.setClientBroadcastLevel(clientBroadcastLevel);
    inboundHandler.setDispatchToCustomSequence(dispatchToCustomSequence);
    inboundHandler.setPortOffset(portOffset);

    // Optional settings are applied only when they have been configured.
    if (outflowDispatchSequence != null) {
        inboundHandler.setOutflowDispatchSequence(outflowDispatchSequence);
    }
    if (outflowErrorSequence != null) {
        inboundHandler.setOutflowErrorSequence(outflowErrorSequence);
    }
    if (subprotocolHandlers != null) {
        inboundHandler.setSubprotocolHandlers(subprotocolHandlers);
    }

    if (pipelineHandler != null) {
        // A new instance of the configured handler's class is created via its no-arg
        // constructor for this channel instead of reusing the configured instance.
        pipeline.addLast("pipelineHandler", pipelineHandler.getClass().getConstructor().newInstance());
    }
    pipeline.addLast("handler", inboundHandler);
}
/**
 * Builds the pipeline for the static file server: HTTP encoder and decoder, an 8 MB aggregator,
 * a CORS handler that accepts any origin, and the static file handler.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    CorsConfig anyOriginCors = CorsConfig.withAnyOrigin().build();
    ChannelPipeline handlers = ch.pipeline();

    // HTTPS is disabled. To enable it, install an SslHandler named "ssl" here, e.g.:
    //   SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
    //   engine.setUseClientMode(false);
    //   handlers.addLast("ssl", new SslHandler(engine));

    handlers.addLast("encoder", new HttpResponseEncoder())
            .addLast("decoder", new HttpRequestDecoder())
            .addLast("aggregator", new HttpObjectAggregator(8388608)) // 8 * 1024 * 1024 bytes
            // A "chunkedWriter" ChunkedWriteHandler is intentionally not installed here.
            .addLast("cors", new CorsHandler(anyOriginCors))
            .addLast("handler", new HttpStaticFileServerHandler());
}
/**
 * Starts the debug HTTP/WebSocket interface.
 *
 * <p>Each accepted channel gets an HTTP encoder and decoder, a 65536-byte aggregator, a WebSocket
 * protocol handler for the path "/debug-session" and the debug protocol handler.
 *
 * @return the future returned by binding the bootstrap to {@code port}
 */
public ChannelFuture run() {
    final ChannelInitializer<SocketChannel> debugSessionInitializer =
            new ChannelInitializer<SocketChannel>() {
                public void initChannel(final SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                            new HttpResponseEncoder(),
                            new HttpRequestDecoder(),
                            new HttpObjectAggregator(65536),
                            new WebSocketServerProtocolHandler("/debug-session"),
                            new DebugProtocolHandler(debugWebsocketConfiguration));
                }
            };

    final ServerBootstrap httpServerBootstrap = new ServerBootstrap();
    httpServerBootstrap.group(bossGroup, workerGroup);
    httpServerBootstrap.channel(NioServerSocketChannel.class);
    httpServerBootstrap.localAddress(new InetSocketAddress(port));
    httpServerBootstrap.childHandler(debugSessionInitializer);

    LOGG.log(Level.INFO, "starting camunda BPM debug HTTP websocket interface on port "+port+".");

    return httpServerBootstrap.bind(port);
}
/**
 * Builds the WebSocket server pipeline.
 *
 * <p>When the {@code jees.jsts.websocket.ssl.enable} setting is true (default false), an SSL
 * handler in server mode is placed at the head of the pipeline. The remaining handlers are an
 * idle-state handler, the HTTP server codec, a chunked-write handler, an 8192-byte aggregator,
 * the WebSocket protocol handler and the application's {@code WebSocketHandler} bean.
 *
 * @param _channel the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel( SocketChannel _channel ) throws Exception {
    ChannelPipeline handlers = _channel.pipeline();

    // Whether SSL is enabled for the WebSocket endpoint.
    boolean sslEnabled = CommonConfig.getBoolean( "jees.jsts.websocket.ssl.enable", false );
    if( sslEnabled ){
        SSLEngine serverEngine = sslContext1.createSSLEngine();
        // Run the engine in server mode (not client mode).
        serverEngine.setUseClientMode(false);
        // Client certificate verification is left at the engine default:
        // serverEngine.setNeedClientAuth(false);
        handlers.addFirst("ssl", new SslHandler( serverEngine ));
    }

    // Reader-idle timeout of 100 seconds; writer-idle and all-idle detection are disabled.
    handlers.addLast( new IdleStateHandler( 100 , 0 , 0 , TimeUnit.SECONDS ) );
    handlers.addLast( new HttpServerCodec() );
    handlers.addLast( new ChunkedWriteHandler() );
    handlers.addLast( new HttpObjectAggregator( 8192 ) );

    // The WebSocket path comes from configuration and falls back to "/".
    String websocketPath = CommonConfig.getString( ISocketBase.Netty_WebSocket_Url, "/" );
    handlers.addLast( new WebSocketServerProtocolHandler( websocketPath ) );
    handlers.addLast( CommonContextHolder.getBean( WebSocketHandler.class ) );
}
/**
 * Creates an HTTP client bootstrap backed by a new NIO event loop group.
 *
 * <p>The pipeline consists of the HTTP client codec, an aggregator with an
 * {@code Integer.MAX_VALUE} limit and a placeholder handler named "clientResponseHandler" that
 * throws on any inbound message.
 *
 * @return the configured, not yet connected bootstrap
 */
public static Bootstrap createNettyHttpClientBootstrap() {
    ChannelInitializer<SocketChannel> clientInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpClientCodec());
            pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
            // Placeholder: fails loudly if a response arrives while it is still installed.
            pipeline.addLast("clientResponseHandler", new SimpleChannelInboundHandler<HttpObject>() {
                @Override
                protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                    throw new RuntimeException("Client response handler was not setup before the call");
                }
            });
        }
    };

    Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(new NioEventLoopGroup());
    clientBootstrap.channel(NioSocketChannel.class);
    clientBootstrap.handler(clientInitializer);
    return clientBootstrap;
}
/**
 * Builds the default HTTP server pipeline on the given channel.
 *
 * <p>Handler order: optional "ssl" (only when {@code configureServerSSLOnDemand()} returns a
 * handler), "decoder", "encoder", "aggregator" (limited to {@code maxChunkContentSize}),
 * "deflater" for automatic content compression, and "idle".
 *
 * @param channel the channel whose pipeline is configured
 * @return the configured pipeline of {@code channel}
 * @throws Exception if the SSL handler cannot be configured
 */
protected ChannelPipeline getDefaulHttpChannelPipeline(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();

    SslHandler sslHandler = configureServerSSLOnDemand();
    if (sslHandler != null) {
        // java.util.logging only substitutes MessageFormat-style "{0}" parameters;
        // a bare "{}" would be written out literally and the handler never shown.
        LOG.log(Level.FINE,
                "Server SSL handler configured and added as an interceptor against the ChannelPipeline: {0}",
                sslHandler);
        pipeline.addLast("ssl", sslHandler);
    }

    pipeline.addLast("decoder", new HttpRequestDecoder());
    pipeline.addLast("encoder", new HttpResponseEncoder());
    pipeline.addLast("aggregator", new HttpObjectAggregator(maxChunkContentSize));

    // Remove the following line if you don't want automatic content compression.
    pipeline.addLast("deflater", new HttpContentCompressor());

    // Idle detection uses the engine's read/write idle times; all-idle detection is disabled (0).
    pipeline.addLast("idle", new IdleStateHandler(nettyHttpServerEngine.getReadIdleTime(),
            nettyHttpServerEngine.getWriteIdleTime(), 0));

    return pipeline;
}
/**
 * Installs the handlers that match the current {@code testMode}.
 *
 * <p>INTERMEDIARY and TERMINAL both use the HTTP server codec followed by an aggregator with a
 * 1-byte limit and differ only in the final handler; UNRESPONSIVE installs just the shared
 * {@code UnresponsiveHandler} instance. Any other mode leaves the pipeline untouched.
 *
 * @param ch the channel being configured
 * @throws Exception declared by the overridden method
 */
@Override
protected void configure(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    switch (testMode) {
        case INTERMEDIARY:
            pipeline.addLast(new HttpServerCodec())
                    .addLast(new HttpObjectAggregator(1))
                    .addLast(new HttpIntermediaryHandler());
            break;
        case TERMINAL:
            pipeline.addLast(new HttpServerCodec())
                    .addLast(new HttpObjectAggregator(1))
                    .addLast(new HttpTerminalHandler());
            break;
        case UNRESPONSIVE:
            pipeline.addLast(UnresponsiveHandler.INSTANCE);
            break;
        default:
            // No handlers for any other mode.
            break;
    }
}
/**
 * Reconfigures the channel's pipeline for HTTP and removes this handler from it.
 *
 * <p>Appends "http-decoder", "http-aggregator" (limit {@code Integer.MAX_VALUE}), "http-encoder",
 * "http-handler" and a {@code ProtocolDecoder}. The shared keep-alive runnable is created and
 * scheduled the first time this method needs it.
 *
 * @param ctx the context of this handler
 */
private void switchToHttp(ChannelHandlerContext ctx) {
    ChannelPipeline pipeline = ctx.pipeline();
    pipeline.addLast("http-decoder", new HttpRequestDecoder());
    pipeline.addLast("http-aggregator", new HttpObjectAggregator(Integer.MAX_VALUE));
    pipeline.addLast("http-encoder", new HttpResponseEncoder());

    // Lazily create and schedule the keep-alive task on first use.
    if (httpKeepAliveRunnable == null) {
        long scanPeriodMillis = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, TransportConstants.DEFAULT_HTTP_SERVER_SCAN_PERIOD, nettyAcceptor.getConfiguration());
        httpKeepAliveRunnable = new HttpKeepAliveRunnable();
        Future<?> scheduled = scheduledThreadPool.scheduleAtFixedRate(httpKeepAliveRunnable, scanPeriodMillis, scanPeriodMillis, TimeUnit.MILLISECONDS);
        httpKeepAliveRunnable.setFuture(scheduled);
    }

    long responseTime = ConfigurationHelper.getLongProperty(TransportConstants.HTTP_RESPONSE_TIME_PROP_NAME, TransportConstants.DEFAULT_HTTP_RESPONSE_TIME, nettyAcceptor.getConfiguration());
    HttpAcceptorHandler acceptorHandler = new HttpAcceptorHandler(httpKeepAliveRunnable, responseTime, ctx.channel());
    pipeline.addLast("http-handler", acceptorHandler);
    pipeline.addLast(new ProtocolDecoder(false, true));

    // This handler's job is done once the HTTP handlers are in place.
    pipeline.remove(this);
}
/**
 * Installs the handlers for the application protocol that was negotiated.
 *
 * @param ctx the handler context whose pipeline is extended
 * @param protocol the negotiated protocol name
 * @throws IllegalStateException if the protocol is neither HTTP/2 nor HTTP/1.1
 */
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
    if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
        ctx.pipeline().addLast(new Http2HandlerBuilder().build());
    } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
        ctx.pipeline().addLast(
                new HttpServerCodec(),
                new HttpObjectAggregator(MAX_CONTENT_LENGTH),
                new Http1Handler("ALPN Negotiation"));
    } else {
        throw new IllegalStateException("unknown protocol: " + protocol);
    }
}
/**
 * Returns an upload channel to the pool, first removing the upload-specific handlers when the
 * channel is still open.
 *
 * @param ch the channel to release
 */
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseUploadChannel(Channel ch) {
    if (ch.isOpen()) {
        stripUploadHandlers(ch);
    }
    channelPool.release(ch);
}

/** Removes the upload handlers from the channel's pipeline, tolerating ones already gone. */
private void stripUploadHandlers(Channel ch) {
    try {
        ch.pipeline().remove(IdleTimeoutHandler.class);
        ch.pipeline().remove(HttpResponseDecoder.class);
        ch.pipeline().remove(HttpObjectAggregator.class);
        ch.pipeline().remove(HttpRequestEncoder.class);
        ch.pipeline().remove(ChunkedWriteHandler.class);
        ch.pipeline().remove(HttpUploadHandler.class);
    } catch (NoSuchElementException ignored) {
        // A channel that is closing but not yet closed may already have lost some handlers.
        // Handlers are removed in reverse order, so once one is missing the ones that follow
        // should already have been removed as well; nothing more to do.
    }
}
/**
 * Configures channel options and the handler chain for a new connection.
 *
 * <p>Attempts to set the IP type-of-service option to 0x18, switches to the pooled buffer
 * allocator, then installs the HTTP codec, a 65536-byte aggregator, the handshaker, the WebSocket
 * handler, the packet decoder and encoder, and the client handler.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    try {
        ch.config().setOption(ChannelOption.IP_TOS, 0x18);
    } catch (ChannelException ignored) {
        // IP_TOS is not supported on this platform; carry on without it.
    }
    ch.config().setAllocator(PooledByteBufAllocator.DEFAULT);

    ch.pipeline()
            .addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new Handshaker())
            .addLast(new WebSocketHandler())
            .addLast(new PacketDecoder())
            .addLast(new PacketEncoder())
            .addLast(new ClientHandler(server));
}
/**
 * {@link Provides} the ordered list of {@link ChannelHandler} providers used for the https
 * protocol: SSL client initializer, HTTP client codec, HTTP object aggregator, message handler
 * and action handler.
 */
@Provides
@HttpsWhoisProtocol
static ImmutableList<Provider<? extends ChannelHandler>> providerHttpsWhoisHandlerProviders(
        @HttpsWhoisProtocol
        Provider<SslClientInitializer<NioSocketChannel>> sslClientInitializerProvider,
        Provider<HttpClientCodec> httpClientCodecProvider,
        Provider<HttpObjectAggregator> httpObjectAggregatorProvider,
        Provider<WebWhoisMessageHandler> messageHandlerProvider,
        Provider<WebWhoisActionHandler> webWhoisActionHandlerProvider) {
    // The list order is the order in which the providers are handed out.
    ImmutableList<Provider<? extends ChannelHandler>> httpsHandlerProviders =
            ImmutableList.of(
                    sslClientInitializerProvider,
                    httpClientCodecProvider,
                    httpObjectAggregatorProvider,
                    messageHandlerProvider,
                    webWhoisActionHandlerProvider);
    return httpsHandlerProviders;
}
/**
 * Configures a newly created client channel.
 *
 * <p>Enables TCP_NODELAY and SO_KEEPALIVE, adds an SSL handler when an SSL context is set, then
 * installs the HTTP client codec and aggregator, all with {@code Integer.MAX_VALUE} limits.
 *
 * @param channel the new channel; must be a {@code NioSocketChannel}
 */
@Override
public void channelCreated(Channel channel) {
    NioSocketChannel socketChannel = (NioSocketChannel) channel;
    socketChannel.config().setTcpNoDelay(true);
    socketChannel.config().setKeepAlive(true);

    final ChannelPipeline pipeline = socketChannel.pipeline();
    // HTTPS: only when an SSL context has been configured.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(channel.alloc()));
    }
    pipeline.addLast(new HttpClientCodec(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE))
            .addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
/**
 * Creates the channel initializer for the secure WebSocket endpoint.
 *
 * <p>Handler order: "ssl", "httpServer", "aggregator" (65536 bytes), "sessionExtractor",
 * "idle-handler", "ws-protocol", "wsDecoder", "httpRelay" and "error".
 *
 * @param balancerConfig supplies the WebSocket timeout and the security settings
 * @param sslCtx SSL context used to create the per-channel SSL handler
 * @param metricResolver passed to the relay handler
 * @param wsClientPool passed to the relay handler
 * @return an initializer that installs the handlers above on each new channel
 */
protected ChannelHandler setupWSChannel(BalancerConfiguration balancerConfig, SslContext sslCtx,
        MetricResolver metricResolver, WsClientPool wsClientPool) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast("ssl", sslCtx.newHandler(ch.alloc()))
                    .addLast("httpServer", new HttpServerCodec())
                    .addLast("aggregator", new HttpObjectAggregator(65536))
                    .addLast("sessionExtractor", new WebSocketFullRequestHandler())
                    // Reader-idle timeout from configuration; writer/all idle disabled.
                    .addLast("idle-handler",
                            new IdleStateHandler(balancerConfig.getWebsocket().getTimeout(), 0, 0))
                    .addLast("ws-protocol",
                            new WebSocketServerProtocolHandler(WS_PATH, null, true, 65536, false, true))
                    .addLast("wsDecoder", new WebSocketRequestDecoder(balancerConfig.getSecurity()))
                    .addLast("httpRelay", new WsRelayHandler(balancerConfig, metricResolver, wsClientPool))
                    .addLast("error", new WSTimelyExceptionHandler());
        }
    };
}
/**
 * Builds the HTTP server pipeline: optional SSL handler, response encoder, request decoder,
 * a 65536-byte aggregator, "readTimeoutHandler", "myHandler" and the final "handler".
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline handlers = ch.pipeline();
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }

    handlers.addLast(new HttpResponseEncoder())
            .addLast(new HttpRequestDecoder())
            // Optional handlers that are currently disabled:
            //   handlers.addLast("chunkedWriter", new ChunkedWriteHandler());
            //   handlers.addLast(new HttpContentCompressor()); // automatic content compression
            // The aggregator is what is installed between the decoder and the later handlers.
            .addLast(new HttpObjectAggregator(65536))
            .addLast("readTimeoutHandler", new ReadTimeoutHandler(READ_TIMEOUT))
            .addLast("myHandler", new MyHandler())
            .addLast("handler", new HttpServerHandler(listener));
}
/**
 * Creates the server: sets up the command executor, the two event loop groups and a bootstrap
 * whose child channels get idle detection, the HTTP codec, a 64 KiB aggregator, a chunked-write
 * handler and the {@code WsServerHandler}. No port is bound here.
 */
public WsServer() {
    executor = new SimpleExecutor<Command, CommandResponse>();
    loadExecutor(executor);

    // Event loop group passed as the parent group (accepts connections).
    parentGroup = new NioEventLoopGroup();
    // Event loop group passed as the child group (serves accepted channels).
    childGroup = new NioEventLoopGroup();

    ChannelInitializer<SocketChannel> channelSetup = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    // Idle detection: 30 s reader idle, writer idle disabled, 60 s all idle.
                    .addLast(new IdleStateHandler(30, 0, 60))
                    // Encodes and decodes HTTP requests and responses.
                    .addLast(new HttpServerCodec())
                    // Merges HttpMessage and HttpContent parts into one full HTTP message;
                    // the accepted size is 64 * 1024 bytes (64 KiB).
                    .addLast(new HttpObjectAggregator(64 * 1024))
                    // Writes large outgoing payloads in chunks.
                    .addLast(new ChunkedWriteHandler())
                    // A WebSocketServerProtocolHandler for "/akaxin/ws" is currently disabled.
                    // Application handler.
                    .addLast(new WsServerHandler(executor));
        }
    };

    bootstrap = new ServerBootstrap();
    bootstrap.group(parentGroup, childGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.childHandler(channelSetup);
}
/**
 * Creates the channel initializer for the HTTP endpoint.
 *
 * <p>Each channel gets a request decoder, a content decompressor, an aggregator with an
 * {@code Integer.MAX_VALUE} limit, the shared {@code decoder}, a per-channel exception handler,
 * the shared {@code handler} and finally a response encoder.
 *
 * @return the initializer
 */
@Override
public final ChannelInitializer<Channel> initializer() {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            final ChannelInboundHandlerAdapter errorResponder =
                    new ChannelInboundHandlerAdapter() {
                        @Override
                        public void exceptionCaught(
                                final ChannelHandlerContext ctx, final Throwable cause
                        ) throws Exception {
                            if (cause instanceof HttpException) {
                                // Application errors carry the status to respond with.
                                sendResponse(ctx, ((HttpException) cause).getStatus());
                            } else if (cause instanceof DecoderException) {
                                // Unwrap and handle the underlying cause instead.
                                exceptionCaught(ctx, cause.getCause());
                            } else {
                                log.error("error in pipeline: ", cause);
                                sendResponse(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                            }
                        }
                    };

            ch.pipeline().addLast(
                    new HttpRequestDecoder(),
                    new HttpContentDecompressor(),
                    new HttpObjectAggregator(Integer.MAX_VALUE),
                    decoder,
                    errorResponder,
                    handler);
            ch.pipeline().addLast(new HttpResponseEncoder());
        }
    };
}
/**
 * Builds the WebSocket server pipeline: optional SSL handler, HTTP server codec, a 65536-byte
 * aggregator and the {@code WebSocketServerHandler}.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline handlers = ch.pipeline();
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }
    handlers.addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebSocketServerHandler());
}
/**
 * Adds HTTP protocol support to the pipeline.
 *
 * @param pipeline the pipeline to extend
 */
private void appendHttpCodec(ChannelPipeline pipeline) {
    pipeline
            // HTTP support: WebSocket is established on top of HTTP.
            .addLast(new HttpClientCodec())
            // HTTP messages may arrive in fragments; the aggregator merges them into one
            // complete HTTP message (limit 65535 bytes).
            .addLast(new HttpObjectAggregator(65535));
}
/**
 * Starts the server, registers a session in the auth cache and opens a WebSocket client
 * connection over SSL to 127.0.0.1, blocking until the WebSocket handshake has completed.
 *
 * @throws Exception if the server, the connector or the client connection cannot be set up
 */
@Before
public void setup() throws Exception {
    s = new Server(conf);
    s.run();

    Connector connector = mac.getConnector("root", "secret");
    connector.securityOperations().changeUserAuthorizations("root", new Authorizations("A", "B", "C", "D", "E", "F"));

    this.sessionId = UUID.randomUUID().toString();
    AuthCache.getCache().put(sessionId, token);

    group = new NioEventLoopGroup();
    // The client trusts any server certificate.
    SslContext ssl = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();

    // The session id travels in a cookie on the handshake request.
    HttpHeaders handshakeHeaders = new DefaultHttpHeaders();
    handshakeHeaders.add(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(Constants.COOKIE_NAME, sessionId));
    WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(LOCATION,
            WebSocketVersion.V13, (String) null, false, handshakeHeaders);
    handler = new ClientHandler(handshaker);

    ChannelInitializer<SocketChannel> clientInitializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast("ssl", ssl.newHandler(ch.alloc(), "127.0.0.1", WS_PORT))
                    .addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(8192))
                    .addLast(handler);
        }
    };
    Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(group).channel(NioSocketChannel.class).handler(clientInitializer);
    ch = clientBootstrap.connect("127.0.0.1", WS_PORT).sync().channel();

    // Poll every 500 ms until the handshake is complete.
    while (!handshaker.isHandshakeComplete()) {
        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
        LOG.debug("Waiting for Handshake to complete");
    }
}
/**
 * Runs the HTTP server on {@code port} and blocks until its server socket is closed.
 *
 * <p>Each connection gets the HTTP server codec, a 64 KiB aggregator, a chunked-write handler and
 * an {@code HttpHandler} that delegates to {@code requestHandler}. Both event loop groups are shut
 * down when this method exits; an interruption restores the thread's interrupt flag.
 *
 * @param requestHandler function producing the asynchronous response for each request
 */
public void run(Function<Request, CompletableFuture<Response>> requestHandler) {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(64 * 1024))
                        .addLast(new ChunkedWriteHandler())
                        .addLast(new HttpHandler(requestHandler, executorService, timeoutMillis));
            }
        };

        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup, workerGroup);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(connectionSetup);
        server.option(ChannelOption.SO_BACKLOG, 128);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);

        ChannelFuture bound = server.bind(port).sync();
        logger.info("listening on port " + port);

        // Block until the server socket is closed.
        bound.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
/**
 * Adds the HTTP/1 handlers: a server codec registered under {@code HTTP_CODEC_HANDLER_NAME},
 * built from the configured initial-line, header and chunk size limits, followed by an
 * 8192-byte aggregator.
 *
 * @param pipeline the pipeline to extend
 */
@Override
protected void addHttp1Handlers(ChannelPipeline pipeline) {
    pipeline
            .addLast(
                    HTTP_CODEC_HANDLER_NAME,
                    new HttpServerCodec(
                            MAX_INITIAL_LINE_LENGTH.get(),
                            MAX_HEADER_SIZE.get(),
                            MAX_CHUNK_SIZE.get(),
                            false))
            .addLast(new HttpObjectAggregator(8192));
}
/**
 * Sets up a WebSocket server channel: an SSL handler when an SSL context is configured, then the
 * HTTP server codec, a 65536-byte aggregator and the {@code WebSocketServerHandler}.
 *
 * @param ch the channel being initialized
 * @throws Exception declared by the overridden method
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline chain = ch.pipeline();
    if (sslCtx != null) {
        chain.addLast(sslCtx.newHandler(ch.alloc()));
    }
    chain.addLast(new HttpServerCodec())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new WebSocketServerHandler());
}
/**
 * Runs the health-check HTTP server on 0.0.0.0:{@code port} and blocks until its server socket
 * is closed.
 *
 * <p>Each connection gets the HTTP server codec, an aggregator with an
 * {@code Integer.MAX_VALUE} limit and the {@code HealthHandler}. Failures are logged rather than
 * propagated. If the thread is interrupted while waiting, the interrupt flag is restored so the
 * owner of the thread can still observe it. Both event loop groups are shut down on exit.
 */
@Override
public void run() {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggegator", new HttpObjectAggregator(Integer.MAX_VALUE));
                        ch.pipeline().addLast("harness", new HealthHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        ChannelFuture f = b.bind("0.0.0.0", port).sync();
        // Block until the server socket is closed.
        f.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        // Catching the exception clears the interrupt status; restore it instead of
        // silently swallowing the interruption.
        Thread.currentThread().interrupt();
        logger.error("Exception in HTTP server", e);
    } catch (Exception e) {
        logger.error("Exception in HTTP server", e);
    } finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
/**
 * Starts the Whirlpool server on {@code PORT} and blocks until its server socket is closed,
 * then shuts down both event loop groups.
 *
 * @param args unused
 * @throws Exception if binding or waiting for the close fails
 */
public static void main(String[] args) throws Exception {
    // One thread accepts connections; the worker group uses the default thread count.
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast("encoder", new HttpResponseEncoder())
                        .addLast("decoder", new HttpRequestDecoder())
                        .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
                        .addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
                        .addLast("aggregator", new HttpObjectAggregator(65536))
                        .addLast("handler", new WhirlpoolServerHandler());
            }
        };

        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup, workerGroup);
        server.channel(NioServerSocketChannel.class);
        server.handler(new LoggingHandler(LogLevel.INFO));
        server.childHandler(connectionSetup);

        // Bind and wait for the bind to complete.
        ChannelFuture bound = server.bind(PORT).sync();
        logger.info("Whirlpool Server started");

        // Block until the server socket is closed.
        bound.channel().closeFuture().sync();
    } finally {
        logger.info("Whirlpool Server shutdown started");
        // Shutting down the event loops terminates all their threads.
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        logger.info("Whirlpool Server shutdown completed");
    }
}
/**
 * {@link Provides} the ordered list of {@link ChannelHandler} providers used for the http
 * protocol: HTTP client codec, HTTP object aggregator, message handler and action handler.
 */
@Provides
@HttpWhoisProtocol
static ImmutableList<Provider<? extends ChannelHandler>> providerHttpWhoisHandlerProviders(
        Provider<HttpClientCodec> httpClientCodecProvider,
        Provider<HttpObjectAggregator> httpObjectAggregatorProvider,
        Provider<WebWhoisMessageHandler> messageHandlerProvider,
        Provider<WebWhoisActionHandler> webWhoisActionHandlerProvider) {
    // The list order is the order in which the providers are handed out.
    ImmutableList<Provider<? extends ChannelHandler>> httpHandlerProviders =
            ImmutableList.of(
                    httpClientCodecProvider,
                    httpObjectAggregatorProvider,
                    messageHandlerProvider,
                    webWhoisActionHandlerProvider);
    return httpHandlerProviders;
}
/**
 * Verifies that {@code HttpServerInitializer#initChannel(SocketChannel)} adds exactly four
 * handlers to the pipeline, in the order request decoder, aggregator, response encoder,
 * server handler.
 */
@Test
public void testInitChannel() throws Exception {
    // Test doubles: the initializer is a mock so only initChannel runs real code.
    HttpServerInitializer initializerUnderTest = mock(HttpServerInitializer.class);
    SocketChannel mockedChannel = mock(SocketChannel.class);
    ChannelPipeline mockedPipeline = mock(ChannelPipeline.class);

    // The mocked channel hands out the mocked pipeline.
    when(mockedChannel.pipeline()).thenReturn(mockedPipeline);
    // Route initChannel(SocketChannel) to the real implementation.
    doCallRealMethod().when(initializerUnderTest).initChannel(mockedChannel);

    // Exercise.
    initializerUnderTest.initChannel(mockedChannel);

    // addLast must have been called four times in total ...
    verify(mockedPipeline, times(4)).addLast(any(ChannelHandler.class));

    // ... and in this exact order.
    InOrder callOrder = inOrder(mockedPipeline);
    callOrder.verify(mockedPipeline).addLast(any(HttpRequestDecoder.class));
    callOrder.verify(mockedPipeline).addLast(any(HttpObjectAggregator.class));
    callOrder.verify(mockedPipeline).addLast(any(HttpResponseEncoder.class));
    callOrder.verify(mockedPipeline).addLast(any(HttpServerHandler.class));
}
/**
 * Builds a CORS-enabled HTTP pipeline: optional SSL handler, response encoder, request decoder,
 * a 65536-byte aggregator, a chunked-write handler, the CORS handler and the
 * {@code OkResponseHandler}.
 *
 * <p>The CORS configuration accepts any origin, allows the null origin and allows credentials.
 *
 * @param ch the channel being initialized
 */
@Override
public void initChannel(SocketChannel ch) {
    CorsConfig permissiveCors =
            CorsConfigBuilder.forAnyOrigin().allowNullOrigin().allowCredentials().build();

    ChannelPipeline handlers = ch.pipeline();
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }
    handlers.addLast(new HttpResponseEncoder())
            .addLast(new HttpRequestDecoder())
            .addLast(new HttpObjectAggregator(65536))
            .addLast(new ChunkedWriteHandler())
            .addLast(new CorsHandler(permissiveCors))
            .addLast(new OkResponseHandler());
}
/**
 * Starts the HTTP connector on {@code port}.
 *
 * <p>Each accepted channel gets the HTTP server codec, an "aggregator" limited to
 * {@code maxContentLength}, and then every handler from {@code handlerList} in list order.
 * Any exception, including the one raised for an empty handler list, is logged and not
 * propagated.
 */
@Override
public void afterStart() {
    try {
        if (handlerList.isEmpty()) {
            throw new Exception("ChannelHandler is null");
        }

        ChannelInitializer<SocketChannel> connectionSetup = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new HttpServerCodec());
                ch.pipeline().addLast("aggregator", new HttpObjectAggregator(maxContentLength));
                // NOTE(review): the same handler instances are added to every channel;
                // presumably they are sharable - confirm.
                for (ChannelHandler handler : handlerList) {
                    ch.pipeline().addLast(handler);
                }
            }
        };

        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup, workerGroup);
        server.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE);
        server.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
        server.option(ChannelOption.SO_BACKLOG, 12000);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(connectionSetup);

        // The result of bind is not awaited or inspected here.
        server.bind(port);
        LOGGER.info("-----> connector started: http://127.0.0.1:{}/", port);
    } catch (Exception e) {
        LOGGER.error("{}", e);
    }
}