下面列出了怎么用 io.netty.handler.codec.http.HttpContentDecompressor 的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Populates the pipeline of a new upload-client channel: an optional TLS
 * handler, the HTTP client codec, content decompression, chunked writing and
 * finally {@code HttpUploadClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    // TLS comes first, and only when an SSL context has been supplied.
    if (sslCtx != null) {
        p.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }

    p.addLast("codec", new HttpClientCodec())
            // Drop the "inflater" entry to turn off automatic content decompression.
            .addLast("inflater", new HttpContentDecompressor())
            // The chunked writer is required for transferring very large files.
            .addLast("chunkedWriter", new ChunkedWriteHandler())
            .addLast("handler", new HttpUploadClientHandler());
}
/**
 * Assembles the snoop-client pipeline: optional TLS, the HTTP client codec,
 * content decompression and {@code HttpSnoopClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline pipeline = ch.pipeline();

    // HTTPS is switched on only when an SSL context is available.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }

    pipeline.addLast(new HttpClientCodec());

    // Automatic content decompression; delete this handler to opt out.
    pipeline.addLast(new HttpContentDecompressor());

    // To avoid dealing with individual HttpContents, enable the aggregator:
    // pipeline.addLast(new HttpObjectAggregator(1048576));

    pipeline.addLast(new HttpSnoopClientHandler());
}
/**
 * Builds the snoop-client pipeline for a new channel: optional TLS, the HTTP
 * client codec, content decompression and {@code HttpSnoopClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline handlers = ch.pipeline();

    // Enable HTTPS when an SSL context is configured.
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }

    handlers.addLast(new HttpClientCodec())
            // Remove this handler if compressed content should not be decompressed automatically.
            .addLast(new HttpContentDecompressor())
            // Enable the aggregator below if individual HttpContents should not be handled:
            // .addLast(new HttpObjectAggregator(1048576))
            .addLast(new HttpSnoopClientHandler());
}
/**
 * Response-side interceptor hook.
 *
 * <p>When {@code match(httpResponse, pipeline)} is true the response is flagged as
 * matched and forwarded straight to the pipeline's default interceptor. A gzip
 * encoded response additionally gets an {@link HttpContentDecompressor} installed
 * on the proxy channel and is re-fired through that channel's pipeline. When the
 * response does not match, it is simply passed on to the next interceptor.
 *
 * @param clientChannel channel forwarded unchanged to the next interceptor
 * @param proxyChannel  channel whose pipeline receives the decompressor
 * @param httpResponse  the response head being intercepted
 * @param pipeline      the intercept pipeline used to continue processing
 * @throws Exception propagated from the downstream interceptors
 */
@Override
public void afterResponse(Channel clientChannel, Channel proxyChannel, HttpResponse httpResponse,
HttpProxyInterceptPipeline pipeline) throws Exception {
if (match(httpResponse, pipeline)) {
isMatch = true;
// Decompress gzip-encoded responses.
if ("gzip".equalsIgnoreCase(httpResponse.headers().get(HttpHeaderNames.CONTENT_ENCODING))) {
isGzip = true;
// NOTE(review): reset3() is defined elsewhere; presumably it rewinds the intercept
// pipeline so the re-fired response below is intercepted again - confirm.
pipeline.reset3();
// Install the decompressor right after the handler named "httpCodec", then push the
// response back through the proxy channel's pipeline as an inbound read.
proxyChannel.pipeline().addAfter("httpCodec", "decompress", new HttpContentDecompressor());
proxyChannel.pipeline().fireChannelRead(httpResponse);
} else {
// Content-Encoding is not gzip here. If isGzip is already set (presumably by an
// earlier pass through the branch above - confirm), put the gzip header back.
if (isGzip) {
httpResponse.headers().set(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
}
// Fresh pooled buffer assigned to contentBuf.
contentBuf = PooledByteBufAllocator.DEFAULT.buffer();
}
// Call the default interceptor directly, skipping the download interceptor.
pipeline.getDefault()
.afterResponse(clientChannel, proxyChannel, httpResponse, pipeline);
} else {
isMatch = false;
pipeline.afterResponse(clientChannel, proxyChannel, httpResponse);
}
}
/**
 * Prepares a freshly created channel: enables keep-alive and TCP_NODELAY and
 * installs the HTTP client handlers, ending with a {@code BackHandler}.
 *
 * @param ch the new channel; it is cast to {@link NioSocketChannel}
 * @throws Exception declared by the overridden method
 */
@Override
public void channelCreated(Channel ch) throws Exception {
    logger.debug("channelCreated. Channel ID: {}", ch.id());

    final NioSocketChannel nioChannel = (NioSocketChannel) ch;
    nioChannel.config().setKeepAlive(true);
    nioChannel.config().setTcpNoDelay(true);

    final ChannelPipeline handlers = nioChannel.pipeline();

    // TLS is installed only when an SSL context is configured.
    if (sslCtx != null) {
        handlers.addLast(sslCtx.newHandler(ch.alloc()));
    }

    handlers.addLast(new HttpClientCodec())
            .addLast(new HttpContentDecompressor())
            // Aggregated messages may be up to 64 MiB.
            .addLast(new HttpObjectAggregator(1024 * 1024 * 64))
            .addLast(new ChunkedWriteHandler())
            // A per-route read timeout is currently disabled:
            // .addLast(new ReadTimeoutHandler(requestHolder.route.getTimeoutInMilliseconds(), TimeUnit.MILLISECONDS))
            .addLast(new BackHandler());
}
/**
 * Creates a client {@link Bootstrap} on a single-threaded NIO event loop with a
 * 10 second connect timeout. Each channel gets a read timeout, the HTTP client
 * codec, content decompression, chunked writing, an aggregator and the supplied
 * handler.
 *
 * @param handler the terminal handler added to every channel's pipeline
 * @return the configured, not yet connected bootstrap
 */
private Bootstrap bootstrap(ClientHandler handler) {
    Bootstrap b = new Bootstrap();
    b.group(new NioEventLoopGroup(1))
            .channel(NioSocketChannel.class)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000)
            .handler(
                    new ChannelInitializer<Channel>() {
                        @Override
                        public void initChannel(Channel ch) {
                            ChannelPipeline p = ch.pipeline();
                            // ReadTimeoutHandler(int) takes SECONDS. The previous value
                            // (10 * 60 * 1000) reads like milliseconds and produced a
                            // timeout of roughly a week; 10 * 60 gives ten minutes.
                            p.addLast(new ReadTimeoutHandler(10 * 60));
                            p.addLast(new HttpClientCodec());
                            p.addLast(new HttpContentDecompressor());
                            p.addLast(new ChunkedWriteHandler());
                            p.addLast(new HttpObjectAggregator(6553600));
                            p.addLast(handler);
                        }
                    });
    return b;
}
/**
 * Wires up the upload-client pipeline for a new channel. Handlers are added
 * in this order: optional "ssl", "codec", "inflater", "chunkedWriter",
 * "handler".
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline handlers = ch.pipeline();

    final boolean tlsEnabled = sslCtx != null;
    if (tlsEnabled) {
        handlers.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }

    handlers.addLast("codec", new HttpClientCodec());

    // Automatic content decompression; remove this entry to opt out.
    handlers.addLast("inflater", new HttpContentDecompressor());

    // Needed for huge file transfers.
    handlers.addLast("chunkedWriter", new ChunkedWriteHandler());

    handlers.addLast("handler", new HttpUploadClientHandler());
}
/**
 * Initializes a snoop-client channel with optional TLS, the HTTP client codec,
 * content decompression and {@code HttpSnoopClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline chain = ch.pipeline();

    // Enable HTTPS if an SSL context was provided.
    if (sslCtx != null) {
        chain.addLast(sslCtx.newHandler(ch.alloc()));
    }

    chain.addLast(new HttpClientCodec())
            // Take this handler out to disable automatic content decompression.
            .addLast(new HttpContentDecompressor())
            // Optional aggregation, for callers that do not want to handle HttpContents:
            // .addLast(new HttpObjectAggregator(1048576))
            .addLast(new HttpSnoopClientHandler());
}
/**
 * Builds the channel initializer for the HTTP endpoint. The pipeline is:
 * non-SSL redirect, request decoder, content decompressor, aggregator
 * (8192 bytes), the project's query decoder and the {@code httpRequests}
 * handler registered under the name "capture".
 *
 * @param config configuration supplying the HTTP and security settings
 * @param sslCtx SSL context handed to the redirect handler
 * @return the initializer that installs the handlers on each new channel
 */
@Override
protected ChannelHandler setupHttpChannel(Configuration config, SslContext sslCtx) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("ssl", new NonSslRedirectHandler(config.getHttp(), sslCtx));
            // The decoder must come BEFORE the decompressor: HttpContentDecompressor only
            // acts on decoded HTTP objects. Placed ahead of the decoder it sees raw bytes,
            // forwards them untouched, and compressed request bodies are never inflated.
            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
            ch.pipeline().addLast("decompressor", new HttpContentDecompressor());
            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
            ch.pipeline().addLast("queryDecoder",
                    new timely.netty.http.HttpRequestDecoder(config.getSecurity(), config.getHttp()));
            ch.pipeline().addLast("capture", httpRequests);
        }
    };
}
/**
 * Sets up a plain (non-TLS) HTTP client pipeline: client codec, content
 * decompression and an aggregator limited to {@code 65536 * 3} bytes.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline p = ch.pipeline();

    // HTTPS support is currently disabled:
    // if (sslCtx != null) {
    //     p.addLast(sslCtx.newHandler(ch.alloc()));
    // }

    p.addLast(new HttpClientCodec())
            // Automatic content decompression; remove this handler to opt out.
            .addLast(new HttpContentDecompressor())
            // Aggregation is enabled, so downstream handlers do not see individual
            // HttpContents. The limit was raised from 65536 to 65536 * 3.
            .addLast(new HttpObjectAggregator(65536 * 3));

    // No terminal handler is installed here; the former one is disabled:
    // p.addLast(new HttpClientHandler(null, mHttpClientListener));
}
/**
 * Channel setup for the upload client. Installs, in order: the TLS handler
 * (when an SSL context exists), the HTTP client codec, the content
 * decompressor, the chunked write handler and {@code HttpUploadClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline chain = ch.pipeline();

    if (sslCtx != null) {
        chain.addLast("ssl", sslCtx.newHandler(ch.alloc()));
    }

    chain
            .addLast("codec", new HttpClientCodec())
            // Omit "inflater" if automatic content decompression is unwanted.
            .addLast("inflater", new HttpContentDecompressor())
            // Used because huge files are transferred.
            .addLast("chunkedWriter", new ChunkedWriteHandler())
            .addLast("handler", new HttpUploadClientHandler());
}
/**
 * Configures the snoop client's channel: optional TLS, HTTP client codec,
 * content decompression, then {@code HttpSnoopClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline stages = ch.pipeline();

    // HTTPS, when required.
    final boolean secure = sslCtx != null;
    if (secure) {
        stages.addLast(sslCtx.newHandler(ch.alloc()));
    }

    stages.addLast(new HttpClientCodec());

    // Remove to stop decompressing response content automatically.
    stages.addLast(new HttpContentDecompressor());

    // Enable to receive aggregated messages rather than HttpContents:
    // stages.addLast(new HttpObjectAggregator(1048576));

    stages.addLast(new HttpSnoopClientHandler());
}
/**
 * Builds a non-TLS HTTP client pipeline consisting of the client codec, the
 * content decompressor and an aggregator limited to {@code 65536 * 3} bytes.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline handlers = ch.pipeline();

    // TLS is turned off for now:
    // if (sslCtx != null) {
    //     handlers.addLast(sslCtx.newHandler(ch.alloc()));
    // }

    handlers.addLast(new HttpClientCodec());

    // Delete this handler to disable automatic content decompression.
    handlers.addLast(new HttpContentDecompressor());

    // Messages are aggregated (previous limit: 65536), so HttpContents are not
    // delivered individually to later handlers.
    handlers.addLast(new HttpObjectAggregator(65536 * 3));

    // The response handler is currently not installed:
    // handlers.addLast(new HttpClientHandler(null, mHttpClientListener));
}
/**
 * Initializes an HTTP client channel: optional TLS, client codec, content
 * decompression, a 1 MiB aggregator and the shared
 * {@code nettyHttpClientHandler}.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
public void initChannel(SocketChannel ch) {
    final ChannelPipeline pipeline = ch.pipeline();

    // Enable HTTPS if an SSL context is present.
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }

    pipeline.addLast(new HttpClientCodec())
            // Remove this handler to disable automatic content decompression.
            .addLast(new HttpContentDecompressor())
            // Aggregation is on, so the final handler does not have to deal with
            // individual HttpContents.
            .addLast(new HttpObjectAggregator(1048576))
            .addLast(nettyHttpClientHandler);
}
/**
 * Strips the download-specific handlers from the channel (when it is still
 * open) and hands the channel back to {@code channelPool}.
 *
 * <p>The channel is released to the pool in every case, open or not, and the
 * result of {@code channelPool.release} is intentionally ignored.
 *
 * @param ch the channel that was used for a download
 */
@SuppressWarnings("FutureReturnValueIgnored")
private void releaseDownloadChannel(Channel ch) {
if (ch.isOpen()) {
// The channel might have been closed due to an error, in which case its pipeline
// has already been cleared. Closed channels can't be reused.
try {
ch.pipeline().remove(IdleTimeoutHandler.class);
ch.pipeline().remove(HttpClientCodec.class);
ch.pipeline().remove(HttpContentDecompressor.class);
ch.pipeline().remove(HttpDownloadHandler.class);
} catch (NoSuchElementException e) {
// Deliberately ignored (best-effort cleanup).
// If the channel is in the process of closing but not yet closed, some handlers could have
// been removed and would cause NoSuchElement exceptions to be thrown. Because handlers are
// removed in reverse-order, if we get a NoSuchElement exception, the following handlers
// should have been removed.
}
}
channelPool.release(ch);
}
/**
 * Prepares the given bootstrap for NIO socket channels: creates the event
 * loop group, registers the BouncyCastle security provider, and installs an
 * initializer that adds the HTTP client codec and content decompressor to
 * every channel.
 *
 * @param bootstrap          the bootstrap to configure
 * @param dockerClientConfig not used by this implementation
 * @return the event loop group created for the bootstrap
 */
@Override
public EventLoopGroup init(Bootstrap bootstrap, final DockerClientConfig dockerClientConfig) {
    EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(0, new DefaultThreadFactory(threadPrefix));

    Security.addProvider(new BouncyCastleProvider());

    // Channels are created through configure(...) rather than by reflection.
    ChannelFactory<NioSocketChannel> factory = () -> configure(new NioSocketChannel());

    bootstrap.group(nioEventLoopGroup).channelFactory(factory)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(final SocketChannel channel) throws Exception {
                    channel.pipeline().addLast(new HttpClientCodec());
                    channel.pipeline().addLast(new HttpContentDecompressor());
                }
            });

    return nioEventLoopGroup;
}
/**
 * Test fixture: connects a client channel to {@code HOST}:{@code PORT}. The
 * channel decodes and decompresses HTTP responses and feeds every
 * {@code HttpObject} into {@code prepareResponse}, storing the result in
 * {@code response}.
 *
 * @throws Exception if the connection attempt fails or is interrupted
 */
@Before
public void setup() throws Exception {
    final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast(new HttpClientCodec())
                    .addLast(new HttpContentDecompressor())
                    .addLast(new SimpleChannelInboundHandler<HttpObject>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
                            // Accumulate the response piece by piece.
                            response = prepareResponse(ctx, msg, response);
                        }
                    });
        }
    };

    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(initializer);

    // Block until the connection is established.
    channel = bootstrap.connect(HOST, PORT).sync().channel();
}
/**
 * Opens a client channel to the listener selected by {@code management}.
 *
 * <p>The client trusts any server certificate (insecure trust manager) and only
 * installs TLS when the connector reports SSL. On any failure the error is
 * logged and {@code null} is returned; in that case the event loop group created
 * for the attempt is shut down, and the thread's interrupt flag is restored if
 * the wait was interrupted.
 *
 * @param management    passed to {@code configManager.getListener} to pick the listener
 * @param configManager source of the connector to connect to
 * @param readTimeOut   value handed to {@code ReadTimeoutHandler(int)}, i.e. seconds
 * @return the connected channel, or {@code null} if connecting failed
 */
public static Channel connect(
        boolean management, ConfigManager configManager, int readTimeOut) {
    Logger logger = LoggerFactory.getLogger(ModelServerTest.class);
    final Connector connector = configManager.getListener(management);
    // Declared outside the try block so the catch block can reach its event loop group.
    final Bootstrap b = new Bootstrap();
    try {
        final SslContext sslCtx =
                SslContextBuilder.forClient()
                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                        .build();
        b.group(Connector.newEventLoopGroup(1))
                .channel(connector.getClientChannel())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .handler(
                        new ChannelInitializer<Channel>() {
                            @Override
                            public void initChannel(Channel ch) {
                                ChannelPipeline p = ch.pipeline();
                                if (connector.isSsl()) {
                                    p.addLast(sslCtx.newHandler(ch.alloc()));
                                }
                                p.addLast(new ReadTimeoutHandler(readTimeOut));
                                p.addLast(new HttpClientCodec());
                                p.addLast(new HttpContentDecompressor());
                                p.addLast(new ChunkedWriteHandler());
                                p.addLast(new HttpObjectAggregator(6553600));
                                p.addLast(new TestHandler());
                            }
                        });
        return b.connect(connector.getSocketAddress()).sync().channel();
    } catch (Throwable t) {
        if (t instanceof InterruptedException) {
            // Do not swallow the interruption; let callers observe it.
            Thread.currentThread().interrupt();
        }
        logger.warn("Connect error.", t);
        // No channel is handed out, so nothing can use the group: release its threads.
        // The group is still null when the failure happened before b.group(...).
        if (b.config().group() != null) {
            b.config().group().shutdownGracefully();
        }
    }
    return null;
}
/**
 * Because this application impersonates a {@code Zipkin} server it has to offer the same feature set.
 * Request compression is one of those features; it is supported by registering a
 * {@link HttpContentDecompressor} on every {@code Netty} connection.
 */
@Bean
public ReactiveWebServerFactory reactiveWebServerFactory() {
    final NettyReactiveWebServerFactory nettyFactory = new NettyReactiveWebServerFactory();
    nettyFactory.addServerCustomizers(httpServer ->
            httpServer.tcpConfiguration(tcp ->
                    tcp.doOnConnection(conn ->
                            conn.addHandler("decompressor", new HttpContentDecompressor()))));
    return nettyFactory;
}
/**
 * Installs the HTTP client codec followed by the content decompressor on the
 * new channel.
 *
 * @param ch the channel whose pipeline is being configured
 */
@Override
protected void initChannel(SocketChannel ch) {
    ch.pipeline()
            .addLast(new HttpClientCodec())
            // Leave this handler out to disable automatic content decompression.
            .addLast(new HttpContentDecompressor());
}
/**
 * Opens a client channel to the listener selected by {@code management}.
 *
 * <p>The client trusts any server certificate (insecure trust manager) and only
 * installs TLS when the connector reports SSL. Reads time out after 30 seconds.
 * On any failure the error is logged and {@code null} is returned; in that case
 * the event loop group created for the attempt is shut down, and the thread's
 * interrupt flag is restored if the wait was interrupted.
 *
 * @param management passed to {@code configManager.getListener} to pick the listener
 * @return the connected channel, or {@code null} if connecting failed
 */
private Channel connect(boolean management) {
    Logger logger = LoggerFactory.getLogger(ModelServerTest.class);
    final Connector connector = configManager.getListener(management);
    // Declared outside the try block so the catch block can reach its event loop group.
    final Bootstrap b = new Bootstrap();
    try {
        final SslContext sslCtx =
                SslContextBuilder.forClient()
                        .trustManager(InsecureTrustManagerFactory.INSTANCE)
                        .build();
        b.group(Connector.newEventLoopGroup(1))
                .channel(connector.getClientChannel())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                .handler(
                        new ChannelInitializer<Channel>() {
                            @Override
                            public void initChannel(Channel ch) {
                                ChannelPipeline p = ch.pipeline();
                                if (connector.isSsl()) {
                                    p.addLast(sslCtx.newHandler(ch.alloc()));
                                }
                                p.addLast(new ReadTimeoutHandler(30));
                                p.addLast(new HttpClientCodec());
                                p.addLast(new HttpContentDecompressor());
                                p.addLast(new ChunkedWriteHandler());
                                p.addLast(new HttpObjectAggregator(6553600));
                                p.addLast(new TestHandler());
                            }
                        });
        return b.connect(connector.getSocketAddress()).sync().channel();
    } catch (Throwable t) {
        if (t instanceof InterruptedException) {
            // Do not swallow the interruption; let callers observe it.
            Thread.currentThread().interrupt();
        }
        logger.warn("Connect error.", t);
        // No channel is handed out, so nothing can use the group: release its threads.
        // The group is still null when the failure happened before b.group(...).
        if (b.config().group() != null) {
            b.config().group().shutdownGracefully();
        }
    }
    return null;
}
/**
 * Adds the client-side handlers to the channel's pipeline: an optional "ssl"
 * handler, the "http-codec" sized from {@code httpConfig}, and - when
 * {@code httpConfig.compress()} is true - a "decompressor".
 *
 * @param channel    the channel to configure
 * @param httpConfig supplies the codec limits and the compression flag
 * @param sslContext TLS context, or {@code null} for no TLS handler
 * @param sendSni    whether the TLS handler is created with the target host name
 * @param targetHost host name passed to the TLS handler when {@code sendSni} is set
 */
private static void addChannelHandlers(Channel channel, HttpConfig httpConfig, SslContext sslContext, boolean sendSni, String targetHost) {
    final ChannelPipeline handlers = channel.pipeline();

    if (sslContext != null) {
        final SslHandler tls;
        if (sendSni) {
            // Created with peer host information; the port value is a placeholder constant.
            tls = sslContext.newHandler(channel.alloc(), targetHost, IGNORED_PORT_NUMBER);
        } else {
            tls = sslContext.newHandler(channel.alloc());
        }
        handlers.addLast("ssl", tls);
    }

    final HttpClientCodec codec = new HttpClientCodec(
            httpConfig.maxInitialLength(),
            httpConfig.maxHeadersSize(),
            httpConfig.maxChunkSize());
    handlers.addLast("http-codec", codec);

    if (httpConfig.compress()) {
        handlers.addLast("decompressor", new HttpContentDecompressor());
    }
}
/**
 * Creates the initializer for the HTTP endpoint. Each channel receives, in
 * order: the non-SSL redirect handler, HTTP encoder/decoder, content
 * compressor/decompressor, an 8192-byte aggregator, the chunked writer, a
 * CORS handler built from the configuration, and the project's query decoder,
 * strict-transport, login and error handlers.
 *
 * @param config configuration supplying the HTTP and CORS settings
 * @param sslCtx SSL context handed to the redirect handler
 * @return the initializer that installs the handlers on each new channel
 */
protected ChannelHandler setupHttpChannel(Configuration config, SslContext sslCtx) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // Transport-level and HTTP framing handlers.
            ch.pipeline()
                    .addLast("ssl", new NonSslRedirectHandler(config, sslCtx))
                    .addLast("encoder", new HttpResponseEncoder())
                    .addLast("decoder", new HttpRequestDecoder())
                    .addLast("compressor", new HttpContentCompressor())
                    .addLast("decompressor", new HttpContentDecompressor())
                    .addLast("aggregator", new HttpObjectAggregator(8192))
                    .addLast("chunker", new ChunkedWriteHandler());

            // Translate the configured CORS settings into a Netty CorsConfig.
            final Configuration.Cors corsSettings = config.getHttp().getCors();
            final CorsConfigBuilder corsBuilder = corsSettings.isAllowAnyOrigin()
                    ? CorsConfigBuilder.forAnyOrigin()
                    : CorsConfigBuilder.forOrigins(
                            corsSettings.getAllowedOrigins().stream().toArray(String[]::new));
            if (corsSettings.isAllowNullOrigin()) {
                corsBuilder.allowNullOrigin();
            }
            if (corsSettings.isAllowCredentials()) {
                corsBuilder.allowCredentials();
            }
            corsSettings.getAllowedMethods().stream()
                    .map(HttpMethod::valueOf)
                    .forEach(corsBuilder::allowedRequestMethods);
            corsSettings.getAllowedHeaders().forEach(corsBuilder::allowedRequestHeaders);
            final CorsConfig corsConfig = corsBuilder.build();
            LOG.trace("Cors configuration: {}", corsConfig);

            // Application-level handlers.
            ch.pipeline()
                    .addLast("cors", new CorsHandler(corsConfig))
                    .addLast("queryDecoder", new qonduit.netty.http.HttpRequestDecoder(config))
                    .addLast("strict", new StrictTransportHandler(config))
                    .addLast("login", new X509LoginRequestHandler(config))
                    .addLast("doLogin", new BasicAuthLoginRequestHandler(config))
                    .addLast("error", new HttpExceptionHandler());
        }
    };
}
/**
 * Builds the channel initializer for the HTTP endpoint. The pipeline is:
 * non-SSL redirect, request decoder, content decompressor, aggregator
 * (8192 bytes), the project's query decoder and the {@code httpRequests}
 * handler registered under the name "capture".
 *
 * @param config configuration handed to the redirect handler and query decoder
 * @param sslCtx SSL context handed to the redirect handler
 * @return the initializer that installs the handlers on each new channel
 */
@Override
protected ChannelHandler setupHttpChannel(Configuration config, SslContext sslCtx) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast("ssl", new NonSslRedirectHandler(config, sslCtx));
            // The decoder must come BEFORE the decompressor: HttpContentDecompressor only
            // acts on decoded HTTP objects. Placed ahead of the decoder it sees raw bytes,
            // forwards them untouched, and compressed request bodies are never inflated.
            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
            ch.pipeline().addLast("decompressor", new HttpContentDecompressor());
            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
            ch.pipeline().addLast("queryDecoder", new qonduit.netty.http.HttpRequestDecoder(config));
            ch.pipeline().addLast("capture", httpRequests);
        }
    };
}
/**
 * Configures a server channel. The terminal handler depends on which mode
 * {@code Handlers.isActive} reports: "capture", "passthrough", or neither
 * (serving). Request content is decompressed in every mode except passthrough.
 *
 * @param channel the channel whose pipeline is being configured
 */
@Override
protected void initChannel(final SocketChannel channel) {
    // "passthrough" is only consulted when "capture" is not active.
    final boolean capturing = Handlers.isActive("capture");
    final boolean passthrough = !capturing && Handlers.isActive("passthrough");

    final ChannelHandlerAdapter handler;
    if (capturing) {
        handler = new DefaultResponseLocatorCapturingHandler(api);
    } else if (passthrough) {
        handler = new PassthroughHandler(api);
    } else {
        handler = new ServingProxyHandler(api);
    }
    final boolean degzip = !passthrough;

    final ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast("logging", new LoggingHandler(LogLevel.valueOf(api.getLogLevel())));
    pipeline.addLast("http-decoder", new HttpRequestDecoder());
    if (degzip) {
        pipeline.addLast("gzip-decompressor", new HttpContentDecompressor());
    }
    pipeline.addLast("http-encoder", new HttpResponseEncoder());
    pipeline.addLast("gzip-compressor", new HttpContentCompressor());
    pipeline.addLast("aggregator", new HttpObjectAggregator(Integer.MAX_VALUE));
    pipeline.addLast("chunked-writer", new ChunkedWriteHandler());
    pipeline.addLast("talend-junit-api-server", handler);
}
/**
 * Creates the initializer for the balancer's HTTP endpoint. Each channel
 * receives, in order: non-SSL redirect, HTTP encoder/decoder, content
 * compressor/decompressor, a 65536-byte aggregator, the chunked writer, the
 * query decoder, the static file server, X509 login, the HTTP relay and the
 * error handler.
 *
 * @param balancerConfig configuration supplying the HTTP and security settings
 * @param sslCtx         SSL context handed to the redirect handler
 * @param metricResolver passed to the relay handler
 * @param httpClientPool passed to the relay handler
 * @return the initializer that installs the handlers on each new channel
 */
protected ChannelHandler setupHttpChannel(BalancerConfiguration balancerConfig, SslContext sslCtx,
        MetricResolver metricResolver, HttpClientPool httpClientPool) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    // Transport-level and HTTP framing handlers.
                    .addLast("ssl", new NonSslRedirectHandler(balancerConfig.getHttp(), sslCtx))
                    .addLast("encoder", new HttpResponseEncoder())
                    .addLast("decoder", new HttpRequestDecoder())
                    .addLast("compressor", new HttpContentCompressor())
                    .addLast("decompressor", new HttpContentDecompressor())
                    .addLast("aggregator", new HttpObjectAggregator(65536))
                    .addLast("chunker", new ChunkedWriteHandler())
                    // Application-level handlers.
                    .addLast("queryDecoder",
                            new timely.netty.http.HttpRequestDecoder(
                                    balancerConfig.getSecurity(), balancerConfig.getHttp()))
                    .addLast("fileServer",
                            new HttpStaticFileServerHandler().setIgnoreSslHandshakeErrors(
                                    balancerConfig.getSecurity().getServerSsl().isUseGeneratedKeypair()))
                    .addLast("login",
                            new X509LoginRequestHandler(
                                    balancerConfig.getSecurity(), balancerConfig.getHttp()))
                    .addLast("httpRelay", new HttpRelayHandler(metricResolver, httpClientPool))
                    .addLast("error",
                            new TimelyExceptionHandler().setIgnoreSslHandshakeErrors(
                                    balancerConfig.getSecurity().getServerSsl().isUseGeneratedKeypair()));
        }
    };
}
/**
 * Creates the initializer for the Grafana auth proxy's HTTP endpoint. Each
 * channel receives, in order: non-SSL redirect, HTTP encoder/decoder, content
 * compressor/decompressor, a 2097152-byte aggregator, the chunked writer, the
 * Grafana request decoder, the static file server, X509 login, the Grafana
 * relay and the error handler.
 *
 * @param config         configuration supplying the HTTP and security settings
 * @param sslCtx         SSL context handed to the redirect handler
 * @param httpClientPool passed to the relay handler
 * @return the initializer that installs the handlers on each new channel
 */
protected ChannelHandler setupHttpChannel(GrafanaAuthConfiguration config, SslContext sslCtx,
        HttpClientPool httpClientPool) {
    return new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline()
                    .addLast("ssl", new NonSslRedirectHandler(config.getHttp(), sslCtx))
                    .addLast("encoder", new HttpResponseEncoder())
                    .addLast("decoder", new HttpRequestDecoder())
                    .addLast("compressor", new HttpContentCompressor())
                    .addLast("decompressor", new HttpContentDecompressor())
                    // The content-length limit is high so that Grafana snapshots can be
                    // delivered; it might be unnecessary if inbound chunking (while
                    // proxying) is handled.
                    .addLast("aggregator", new HttpObjectAggregator(2097152))
                    .addLast("chunker", new ChunkedWriteHandler())
                    .addLast("grafanaDecoder",
                            new GrafanaRequestDecoder(config.getSecurity(), config.getHttp()))
                    .addLast("fileServer", new HttpStaticFileServerHandler())
                    .addLast("login",
                            new X509LoginRequestHandler(config.getSecurity(), config.getHttp()))
                    .addLast("httpRelay", new GrafanaRelayHandler(config, httpClientPool))
                    .addLast("error",
                            new TimelyExceptionHandler().setIgnoreSslHandshakeErrors(
                                    config.getSecurity().getServerSsl().isUseGeneratedKeypair()));
        }
    };
}
/**
 * Installs the HTTP/1.1 client handlers into the pipeline, relative to the
 * handlers named by {@code NettyPipeline}: the client codec goes before the
 * reactive bridge, the decompressor (optional) directly after the codec, and
 * the metrics handler (optional) before the reactive bridge.
 *
 * @param p               the pipeline to modify
 * @param acceptGzip      whether to add an {@link HttpContentDecompressor}
 * @param decoder         supplies the settings for the {@link HttpClientCodec}
 * @param metricsRecorder optional recorder supplier; a metrics handler is added
 *                        only when it yields an {@code HttpClientMetricsRecorder}
 * @param uriTagValue     optional function handed to the metrics handler
 */
static void configureHttp11Pipeline(ChannelPipeline p,
        boolean acceptGzip,
        HttpResponseDecoderSpec decoder,
        @Nullable Supplier<? extends ChannelMetricsRecorder> metricsRecorder,
        @Nullable Function<String, String> uriTagValue) {
    final HttpClientCodec codec = new HttpClientCodec(
            decoder.maxInitialLineLength(),
            decoder.maxHeaderSize(),
            decoder.maxChunkSize(),
            decoder.failOnMissingResponse,
            decoder.validateHeaders(),
            decoder.initialBufferSize(),
            decoder.parseHttpAfterConnectRequest);
    p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpCodec, codec);

    if (acceptGzip) {
        p.addAfter(NettyPipeline.HttpCodec, NettyPipeline.HttpDecompressor, new HttpContentDecompressor());
    }

    // Metrics are optional; nothing more to do without a recorder supplier.
    if (metricsRecorder == null) {
        return;
    }

    final ChannelMetricsRecorder recorder = metricsRecorder.get();
    if (recorder instanceof HttpClientMetricsRecorder) {
        final HttpClientMetricsHandler metricsHandler =
                new HttpClientMetricsHandler((HttpClientMetricsRecorder) recorder, uriTagValue);
        p.addBefore(NettyPipeline.ReactiveBridge, NettyPipeline.HttpMetricsHandler, metricsHandler);
    }
}
/**
 * Starts an echo server that has an {@link HttpContentDecompressor} installed
 * first on each connection, posts a compressed "test" body with a
 * {@code Content-Encoding: gzip} header, and expects the plain text "test"
 * to come back.
 */
@Test
public void testIssue525() {
    disposableServer =
            HttpServer.create()
                      .port(0)
                      .doOnConnection(conn ->
                              conn.addHandlerFirst("decompressor", new HttpContentDecompressor()))
                      // Echo the request body back to the caller.
                      .handle((request, reply) -> reply.send(request.receive().retain()))
                      .wiretap(true)
                      .bindNow(Duration.ofSeconds(30));

    final byte[] payload = "test".getBytes(Charset.defaultCharset());

    final String echoed =
            HttpClient.create()
                      .port(disposableServer.port())
                      .wiretap(true)
                      .headers(headers -> headers.add("Content-Encoding", "gzip"))
                      .post()
                      .uri("/")
                      .send(Mono.just(Unpooled.wrappedBuffer(compress(payload))))
                      .responseContent()
                      .aggregate()
                      .asString()
                      .block(Duration.ofSeconds(30));

    assertThat(echoed).isEqualTo("test");
}
/**
 * Returns the initializer for server channels. Each channel gets an HTTP
 * request decoder, content decompressor, aggregator, the {@code decoder} and
 * {@code handler} fields, an exception handler that maps pipeline errors to
 * HTTP responses, and finally an HTTP response encoder.
 *
 * @return the channel initializer
 */
@Override
public final ChannelInitializer<Channel> initializer() {
    return new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            final ChannelInboundHandlerAdapter exceptionHandler =
                    new ChannelInboundHandlerAdapter() {
                        @Override
                        public void exceptionCaught(
                                final ChannelHandlerContext ctx, final Throwable cause
                        ) throws Exception {
                            // Walk down the cause chain for as long as the current
                            // throwable is a DecoderException wrapper.
                            Throwable current = cause;
                            while (true) {
                                if (current instanceof HttpException) {
                                    // Answer with the status carried by the exception.
                                    sendResponse(ctx, ((HttpException) current).getStatus());
                                    return;
                                }
                                if (!(current instanceof DecoderException)) {
                                    break;
                                }
                                current = current.getCause();
                            }

                            // Anything else is reported as an internal server error.
                            log.error("error in pipeline: ", current);
                            sendResponse(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                        }
                    };

            ch.pipeline().addLast(
                    new HttpRequestDecoder(),
                    new HttpContentDecompressor(),
                    new HttpObjectAggregator(Integer.MAX_VALUE),
                    decoder,
                    exceptionHandler,
                    handler);
            ch.pipeline().addLast(new HttpResponseEncoder());
        }
    };
}