下面列出了 io.netty.handler.codec.http2.Http2FrameListener#software.amazon.awssdk.http.Protocol 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds the client from the builder's settings and the supplied attribute map.
 *
 * <p>The {@link NettyConfiguration} and the protocol are both derived from {@code serviceDefaultsMap};
 * everything else handed to the channel pool map comes from {@code builder}.
 *
 * @param builder            source of the event loop group, channel options, HTTP/2, SSL provider and proxy settings
 * @param serviceDefaultsMap attribute map the configuration is built from and the protocol is read from
 */
private NettyNioAsyncHttpClient(DefaultBuilder builder, AttributeMap serviceDefaultsMap) {
    this.configuration = new NettyConfiguration(serviceDefaultsMap);
    Protocol resolvedProtocol = serviceDefaultsMap.get(SdkHttpConfigurationOption.PROTOCOL);
    this.sdkEventLoopGroup = eventLoopGroup(builder);

    // HTTP/2 specific values are resolved up front so the pool map builder chain stays declarative.
    Http2Configuration h2Config = builder.http2Configuration;
    long streamLimit = resolveMaxHttp2Streams(builder.maxHttp2Streams, h2Config);
    int windowSize = resolveInitialWindowSize(h2Config);

    this.pools = AwaitCloseChannelPoolMap.builder()
                                         .sdkChannelOptions(builder.sdkChannelOptions)
                                         .configuration(configuration)
                                         .protocol(resolvedProtocol)
                                         .maxStreams(streamLimit)
                                         .initialWindowSize(windowSize)
                                         .healthCheckPingPeriod(resolveHealthCheckPingPeriod(h2Config))
                                         .sdkEventLoopGroup(sdkEventLoopGroup)
                                         .sslProvider(resolveSslProvider(builder))
                                         .proxyConfiguration(builder.proxyConfiguration)
                                         .build();
}
/**
 * Creates an initializer that keeps every supplied argument in the field of the same name.
 * No validation or transformation is performed here.
 */
public ChannelPipelineInitializer(Protocol protocol,
                                  SslContext sslCtx,
                                  SslProvider sslProvider,
                                  long clientMaxStreams,
                                  int clientInitialWindowSize,
                                  Duration healthCheckPingPeriod,
                                  AtomicReference<ChannelPool> channelPoolRef,
                                  NettyConfiguration configuration,
                                  URI poolKey) {
    // Pool identity and shared configuration.
    this.poolKey = poolKey;
    this.configuration = configuration;
    this.channelPoolRef = channelPoolRef;

    // Protocol and TLS settings.
    this.protocol = protocol;
    this.sslProvider = sslProvider;
    this.sslCtx = sslCtx;

    // HTTP/2 client-side settings.
    this.healthCheckPingPeriod = healthCheckPingPeriod;
    this.clientInitialWindowSize = clientInitialWindowSize;
    this.clientMaxStreams = clientMaxStreams;
}
/**
 * Populates the headers of the outgoing Netty request from the SDK request.
 *
 * <p>The Host header is always written first. When the protocol is HTTP/2 and the request URI
 * has a non-blank scheme, the scheme extension header is added as well. Finally every header of
 * the SDK request whose name is not in {@code IGNORE_HEADERS} is copied over, value by value.
 *
 * @param httpRequest the Netty request whose headers are written
 * @param request     the SDK request the headers are read from
 */
private void addHeadersToRequest(DefaultHttpRequest httpRequest, SdkHttpRequest request) {
    httpRequest.headers().add(HOST, getHostHeaderValue(request));

    String uriScheme = request.getUri().getScheme();
    if (Protocol.HTTP2 == protocol) {
        if (!StringUtils.isBlank(uriScheme)) {
            httpRequest.headers().add(ExtensionHeaderNames.SCHEME.text(), uriScheme);
        }
    }

    // Copy over the remaining headers. Names listed in IGNORE_HEADERS (e.g. Host) are skipped so
    // they are not sent twice, which would interfere with some signing schemes.
    request.headers().forEach((name, values) -> {
        if (IGNORE_HEADERS.contains(name)) {
            return;
        }
        values.forEach(value -> httpRequest.headers().add(name, value));
    });
}
/**
 * Prepares a mock channel carrying the protocol future, request context and execute future,
 * then feeds an initial mocked HTTP/1.1 "201 Created" response head through the response handler.
 */
@Before
public void setUp() throws Exception {
    executeFuture = new CompletableFuture<>();

    // Content mock that always reports an empty buffer.
    fullHttpResponse = mock(DefaultHttpContent.class);
    when(fullHttpResponse.content()).thenReturn(new EmptyByteBuf(ByteBufAllocator.DEFAULT));

    AsyncExecuteRequest executeRequest = AsyncExecuteRequest.builder()
                                                            .responseHandler(responseHandler)
                                                            .build();
    requestContext = new RequestContext(channelPool, eventLoopGroup, executeRequest, null);

    // Channel attributes read by the handler under test.
    channel = new MockChannel();
    channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
    channel.attr(REQUEST_CONTEXT_KEY).set(requestContext);
    channel.attr(EXECUTE_FUTURE_KEY).set(executeFuture);
    when(ctx.channel()).thenReturn(channel);

    nettyResponseHandler = ResponseHandler.getInstance();

    // Response head delivered before each test runs.
    DefaultHttpResponse responseHead = mock(DefaultHttpResponse.class);
    when(responseHead.headers()).thenReturn(EmptyHttpHeaders.INSTANCE);
    when(responseHead.status()).thenReturn(HttpResponseStatus.CREATED);
    when(responseHead.protocolVersion()).thenReturn(HttpVersion.HTTP_1_1);
    nettyResponseHandler.channelRead0(ctx, responseHead);
}
/**
 * Creates several pools through the pool map, closes the map, and checks that the close future
 * of every underlying simple channel pool is done and completed with {@code true}.
 */
@Test
public void close_underlyingPoolsShouldBeClosed() {
    channelPoolMap = AwaitCloseChannelPoolMap.builder()
                                             .sdkChannelOptions(new SdkChannelOptions())
                                             .sdkEventLoopGroup(SdkEventLoopGroup.builder().build())
                                             .configuration(new NettyConfiguration(GLOBAL_HTTP_DEFAULTS))
                                             .protocol(Protocol.HTTP1_1)
                                             .maxStreams(100)
                                             .sslProvider(SslProvider.OPENSSL)
                                             .build();

    int numberOfChannelPools = 5;
    List<SimpleChannelPoolAwareChannelPool> createdPools = new ArrayList<>();

    for (int index = 0; index < numberOfChannelPools; index++) {
        // A random prefix plus the index gives every pool a distinct key.
        URI poolKey = URI.create(
            "http://" + RandomStringUtils.randomAlphabetic(2) + index + "localhost:" + numberOfChannelPools);
        createdPools.add(channelPoolMap.get(poolKey));
    }

    assertThat(channelPoolMap.pools().size()).isEqualTo(numberOfChannelPools);

    channelPoolMap.close();

    for (SimpleChannelPoolAwareChannelPool pool : createdPools) {
        assertThat(pool.underlyingSimpleChannelPool().closeFuture()).isDone();
        assertThat(pool.underlyingSimpleChannelPool().closeFuture().join()).isTrue();
    }
}
/**
 * Configures a proxy with host and port only (no scheme) and checks that acquiring a channel
 * for an HTTPS endpoint results in a CONNECT request being recorded.
 */
@Test
public void usingProxy_noSchemeGiven_defaultsToHttp() {
    ProxyConfiguration schemelessProxy = ProxyConfiguration.builder()
                                                           .host("localhost")
                                                           .port(mockProxy.port())
                                                           .build();

    channelPoolMap = AwaitCloseChannelPoolMap.builder()
                                             .proxyConfiguration(schemelessProxy)
                                             .sdkChannelOptions(new SdkChannelOptions())
                                             .sdkEventLoopGroup(SdkEventLoopGroup.builder().build())
                                             .configuration(new NettyConfiguration(GLOBAL_HTTP_DEFAULTS))
                                             .protocol(Protocol.HTTP1_1)
                                             .maxStreams(100)
                                             .sslProvider(SslProvider.OPENSSL)
                                             .build();

    URI targetEndpoint = URI.create("https://some-awesome-service:443");
    SimpleChannelPoolAwareChannelPool pool = channelPoolMap.newPool(targetEndpoint);

    // Wait for the acquire attempt to finish before inspecting what was recorded.
    pool.acquire().awaitUninterruptibly();

    String recordedRequests = recorder.requests().toString();
    assertThat(recordedRequests).contains("CONNECT some-awesome-service:443");
}
/**
 * Trial-level setup: starts the mock H2 server, builds an HTTP/2 Netty client using the
 * configured SSL provider, and issues one request to confirm the wiring works.
 */
@Setup(Level.Trial)
public void setup() throws Exception {
    mockServer = new MockH2Server(false);
    mockServer.start();

    SslProvider provider = getSslProvider(sslProviderValue);

    sdkHttpClient = NettyNioAsyncHttpClient.builder()
                                           .sslProvider(provider)
                                           .buildWithDefaults(trustAllTlsAttributeMapBuilder()
                                                                  .put(PROTOCOL, Protocol.HTTP2)
                                                                  .build());

    client = ProtocolRestJsonAsyncClient.builder()
                                        .endpointOverride(mockServer.getHttpsUri())
                                        .httpClient(sdkHttpClient)
                                        .build();

    // Fail fast here if a request cannot actually succeed.
    client.allTypes().join();
}
/**
 * Generates the body of the method that returns the service-specific HTTP configuration.
 *
 * <p>The generated code first declares {@code result}: either the service's
 * {@code defaultHttpConfig()} (when {@code serviceDefaultFqcn} is non-null) or an empty
 * {@link AttributeMap}. It then returns {@code result} as is, or merged with an attribute map
 * that sets the protocol to HTTP/2 when {@code supportsH2} is true.
 *
 * @param serviceDefaultFqcn class name of the service defaults; only its null-ness is checked here
 * @param supportsH2         whether the generated code should merge in the HTTP/2 protocol option
 * @return the generated method body
 */
private CodeBlock serviceSpecificHttpConfigMethodBody(String serviceDefaultFqcn, boolean supportsH2) {
    CodeBlock.Builder body = CodeBlock.builder();

    if (serviceDefaultFqcn == null) {
        body.addStatement("$1T result = $1T.empty()", AttributeMap.class);
    } else {
        body.addStatement("$T result = $T.defaultHttpConfig()",
                          AttributeMap.class,
                          PoetUtils.classNameFromFqcn(model.getCustomizationConfig().getServiceSpecificHttpConfig()));
    }

    if (!supportsH2) {
        body.addStatement("return result");
        return body.build();
    }

    body.addStatement("return result.merge(AttributeMap.builder().put($T.PROTOCOL, $T.HTTP2).build())",
                      SdkHttpConfigurationOption.class, Protocol.class);
    return body.build();
}
/**
 * Adds the request/response handlers for the channel's protocol and selects the matching
 * request adapter.
 *
 * @return {@code true} when the pipeline was configured and the channel is still active;
 *         {@code false} when the protocol was not recognised or the channel is no longer active,
 *         in which case the channel has been closed and released and the failure reported
 */
private boolean tryConfigurePipeline() {
    Protocol protocol = ChannelAttributeKey.getProtocolNow(channel);
    ChannelPipeline pipeline = channel.pipeline();

    switch (protocol) {
        case HTTP1_1:
            requestAdapter = REQUEST_ADAPTER_HTTP1_1;
            break;
        case HTTP2:
            pipeline.addLast(new Http2ToHttpInboundAdapter());
            pipeline.addLast(new HttpToHttp2OutboundAdapter());
            pipeline.addLast(Http2StreamExceptionHandler.create());
            requestAdapter = REQUEST_ADAPTER_HTTP2;
            break;
        default:
            String unknownProtocolMessage = "Unknown protocol: " + protocol;
            closeAndRelease(channel);
            handleFailure(() -> unknownProtocolMessage, new RuntimeException(unknownProtocolMessage));
            return false;
    }

    pipeline.addLast(LastHttpContentHandler.create());
    if (protocol == Protocol.HTTP2) {
        pipeline.addLast(FlushOnReadHandler.getInstance());
    }
    pipeline.addLast(new HttpStreamsClientHandler());
    pipeline.addLast(ResponseHandler.getInstance());

    // It's possible that the channel could become inactive between checking it out from the pool, and adding our response
    // handler (which will monitor for it going inactive from now on).
    // Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
    if (channel.isActive()) {
        return true;
    }

    String closedMessage = "Channel was closed before it could be written to.";
    closeAndRelease(channel);
    handleFailure(() -> closedMessage, new IOException(closedMessage));
    return false;
}
/**
 * Schedules the periodic ping on the channel's event loop, but only for HTTP/2 and only if no
 * ping task has been scheduled yet.
 *
 * @param protocol protocol of the channel; anything other than HTTP/2 makes this a no-op
 * @param ctx      context whose channel and event loop are used for scheduling
 */
private void start(Protocol protocol, ChannelHandlerContext ctx) {
    boolean alreadyScheduled = periodicPing != null;
    if (protocol != Protocol.HTTP2 || alreadyScheduled) {
        return;
    }

    // First run is immediate (initial delay 0); later runs are pingTimeoutMillis apart.
    periodicPing = ctx.channel()
                      .eventLoop()
                      .scheduleAtFixedRate(() -> doPeriodicPing(ctx.channel()), 0, pingTimeoutMillis, MILLISECONDS);
}
/**
 * Acquires the first stream on a newly established parent connection.
 *
 * <p>The connection must have negotiated HTTP/2 and must carry a positive
 * {@code MAX_CONCURRENT_STREAMS} attribute. A {@link MultiplexedChannelRecord} is created for it,
 * stored in the {@code MULTIPLEXED_CHANNEL} attribute, and used to open a stream. Any validation
 * error or other throwable fails {@code promise} and closes the parent channel.
 *
 * @param promise       completed with the stream channel, or failed if one cannot be created
 * @param parentChannel the freshly acquired connection the stream is opened on
 * @param protocol      the protocol reported for {@code parentChannel}; must be HTTP/2
 */
private void acquireStreamOnFreshConnection(Promise<Channel> promise, Channel parentChannel, Protocol protocol) {
    try {
        Long maxStreams = parentChannel.attr(MAX_CONCURRENT_STREAMS).get();

        // Report the protocol that was actually supplied rather than assuming it was HTTP/1.1.
        Validate.isTrue(protocol == Protocol.HTTP2,
                        "Protocol negotiated on connection (%s) was expected to be HTTP/2, but it "
                        + "was %s.", parentChannel, protocol);
        Validate.isTrue(maxStreams != null,
                        "HTTP/2 was negotiated on the connection (%s), but the maximum number of "
                        + "streams was not initialized.", parentChannel);
        Validate.isTrue(maxStreams > 0, "Maximum streams were not positive on channel (%s).", parentChannel);

        MultiplexedChannelRecord multiplexedChannel = new MultiplexedChannelRecord(parentChannel, maxStreams,
                                                                                   idleConnectionTimeout);
        parentChannel.attr(MULTIPLEXED_CHANNEL).set(multiplexedChannel);

        Promise<Channel> streamPromise = parentChannel.eventLoop().newPromise();

        if (!acquireStreamOnInitializedConnection(multiplexedChannel, streamPromise)) {
            failAndCloseParent(promise, parentChannel,
                               new IOException("Connection was closed while creating a new stream."));
            return;
        }

        // Propagate the stream outcome: a failure fails the caller's promise, a success hands the
        // stream on so the connection can be cached for future streams.
        streamPromise.addListener(f -> {
            if (!streamPromise.isSuccess()) {
                promise.setFailure(streamPromise.cause());
                return;
            }

            Channel stream = streamPromise.getNow();
            cacheConnectionForFutureStreams(stream, multiplexedChannel, promise);
        });
    } catch (Throwable e) {
        failAndCloseParent(promise, parentChannel, e);
    }
}
/**
 * Finishes protocol configuration on the event loop.
 *
 * <p>If the pool has been closed in the meantime the channel is closed and released with an
 * {@link IllegalStateException}. Otherwise the protocol-specific pool is configured and used to
 * complete {@code protocolImplPromise}; any throwable raised while doing so closes and releases
 * the channel with that throwable.
 *
 * @param newChannel the channel that was acquired to determine the protocol
 * @param protocol   the protocol to configure the pool for
 */
private void completeProtocolConfiguration(Channel newChannel, Protocol protocol) {
    doInEventLoop(eventLoop, () -> {
        if (closed) {
            closeAndRelease(newChannel, new IllegalStateException("Pool closed"));
            return;
        }

        try {
            protocolImplPromise.setSuccess(configureProtocol(newChannel, protocol));
        } catch (Throwable failure) {
            closeAndRelease(newChannel, failure);
        }
    });
}
/**
 * Creates the protocol-specific pool, stores it in {@code protocolImpl}, and releases the probe
 * channel back to the delegate pool.
 *
 * <p>For HTTP/1.1 the delegate pool is bounded directly. For any other protocol the delegate pool
 * is first wrapped in an {@link Http2MultiplexedChannelPool}, which receives the idle timeout
 * only when idle connection reaping is enabled (otherwise {@code null}).
 *
 * @param newChannel the channel to hand back to the delegate pool once configuration is done
 * @param protocol   the protocol the pool is configured for
 * @return the configured pool, identical to the new value of {@code protocolImpl}
 */
private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
    ChannelPool poolToBound;
    if (Protocol.HTTP1_1 == protocol) {
        // For HTTP/1.1 we use a traditional channel pool without multiplexing
        poolToBound = delegatePool;
    } else {
        Duration idleTimeout = null;
        if (configuration.reapIdleConnections()) {
            idleTimeout = Duration.ofMillis(configuration.idleTimeoutMillis());
        }
        poolToBound = new Http2MultiplexedChannelPool(delegatePool, eventLoopGroup, idleTimeout);
    }

    // Both protocols share the same bounding settings; only the wrapped pool differs.
    protocolImpl = BetterFixedChannelPool.builder()
                                         .channelPool(poolToBound)
                                         .executor(eventLoop)
                                         .acquireTimeoutAction(BetterFixedChannelPool.AcquireTimeoutAction.FAIL)
                                         .acquireTimeoutMillis(configuration.connectionAcquireTimeoutMillis())
                                         .maxConnections(maxConcurrency)
                                         .maxPendingAcquires(configuration.maxPendingConnectionAcquires())
                                         .build();

    // Give the channel back so it can be acquired again by protocolImpl
    delegatePool.release(newChannel);
    return protocolImpl;
}
/**
 * Sends two sequential GET requests over an HTTP/1.1 client and expects the server to have seen
 * two channels, i.e. the first connection was not reused.
 */
@Test
public void connectionCloseAfterResponse_shouldNotReuseConnection() throws Exception {
    server = new Server();
    server.init();

    SdkEventLoopGroup twoThreadGroup = SdkEventLoopGroup.builder().numberOfThreads(2).build();
    AttributeMap trustAll = AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build();

    netty = NettyNioAsyncHttpClient.builder()
                                   .eventLoopGroup(twoThreadGroup)
                                   .protocol(Protocol.HTTP1_1)
                                   .buildWithDefaults(trustAll);

    // Each request is completed before the next one starts.
    sendGetRequest().join();
    sendGetRequest().join();

    assertThat(server.channels.size()).isEqualTo(2);
}
/**
 * Initializes an embedded channel with a JDK-provider SSL context and checks that the channel's
 * allocator option ends up as {@code UnpooledByteBufAllocator.DEFAULT}.
 */
@Test
public void channelConfigOptionCheck() throws SSLException {
    targetUri = URI.create("https://some-awesome-service-1234.amazonaws.com:8080");

    SslContext jdkSslContext = SslContextBuilder.forClient()
                                                .sslProvider(SslProvider.JDK)
                                                .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
                                                .build();

    AtomicReference<ChannelPool> poolRef = new AtomicReference<>();
    NettyConfiguration defaults = new NettyConfiguration(GLOBAL_HTTP_DEFAULTS);

    pipelineInitializer = new ChannelPipelineInitializer(Protocol.HTTP1_1,
                                                         jdkSslContext,
                                                         SslProvider.JDK,
                                                         100,
                                                         1024,
                                                         Duration.ZERO,
                                                         poolRef,
                                                         defaults,
                                                         targetUri);

    Channel embedded = new EmbeddedChannel();
    pipelineInitializer.channelCreated(embedded);

    assertThat(embedded.config().getOption(ChannelOption.ALLOCATOR), is(UnpooledByteBufAllocator.DEFAULT));
}
/**
 * Builds a pool whose configuration sets MAX_PENDING_CONNECTION_ACQUIRES to 0 and expects that
 * acquiring from it fails with a message containing "maxPendingAcquires: 0 (expected: >= 1)",
 * that the channel is released to the delegate pool, and that the channel is no longer open.
 */
@Test(timeout = 5_000)
public void invalidProtocolConfig_shouldFailPromise() throws Exception {
// Pool under test; the 0 passed for MAX_PENDING_CONNECTION_ACQUIRES is the invalid setting being exercised.
HttpOrHttp2ChannelPool invalidChannelPool = new HttpOrHttp2ChannelPool(mockDelegatePool,
eventLoopGroup,
4,
new NettyConfiguration(AttributeMap.builder()
.put(CONNECTION_ACQUIRE_TIMEOUT, Duration.ofSeconds(1))
.put(MAX_PENDING_CONNECTION_ACQUIRES, 0)
.build()));
// The delegate pool hands out a promise this test completes manually below.
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
Thread.sleep(500);
// Channel that reports HTTP/1.1 through its PROTOCOL_FUTURE attribute.
Channel channel = new MockChannel();
eventLoopGroup.register(channel);
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
// The delegate acquire is already successful by the time the pool under test is asked for a channel.
acquirePromise.setSuccess(channel);
Future<Channel> p = invalidChannelPool.acquire();
assertThat(p.await().cause().getMessage()).contains("maxPendingAcquires: 0 (expected: >= 1)");
verify(mockDelegatePool).release(channel);
assertThat(channel.isOpen()).isFalse();
}
/**
 * Closes the pool while the delegate acquire is still pending, then completes the acquire and
 * expects the channel to be closed, released to the delegate pool, and the delegate pool closed.
 */
@Test(timeout = 5_000)
public void protocolConfigInProgress_poolClosed_closesChannelAndDelegatePoolOnAcquireSuccess() throws InterruptedException {
// The delegate pool returns a promise that stays incomplete until this test completes it.
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
// initiate the configuration
httpOrHttp2ChannelPool.acquire();
// close the pool before the config can complete (we haven't completed acquirePromise yet)
httpOrHttp2ChannelPool.close();
Thread.sleep(500);
Channel channel = new NioSocketChannel();
eventLoopGroup.register(channel);
try {
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
// Completing the acquire only now, after close(), is the scenario under test.
acquirePromise.setSuccess(channel);
// Blocks until the channel's close future completes (bounded by the test timeout).
assertThat(channel.closeFuture().await().isDone()).isTrue();
Thread.sleep(500);
verify(mockDelegatePool).release(eq(channel));
verify(mockDelegatePool).close();
} finally {
// Always close the real socket channel, even if an assertion above failed.
channel.close();
}
}
/**
 * Lets the delegate acquire succeed first, then closes the pool and expects the delegate pool
 * to be closed as well.
 */
@Test(timeout = 5_000)
public void protocolConfigComplete_poolClosed_closesDelegatePool() throws InterruptedException {
// The delegate pool returns a promise this test completes manually below.
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
// initiate the configuration
httpOrHttp2ChannelPool.acquire();
Thread.sleep(500);
Channel channel = new NioSocketChannel();
eventLoopGroup.register(channel);
try {
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
// this should complete the protocol config
acquirePromise.setSuccess(channel);
Thread.sleep(500);
// close the pool
httpOrHttp2ChannelPool.close();
Thread.sleep(500);
verify(mockDelegatePool).close();
} finally {
// Always close the real socket channel, even if the verification above failed.
channel.close();
}
}
/**
 * Creates an embedded channel set up for HTTP/2 tests.
 *
 * <p>The pipeline consists of a client-side {@link Http2FrameCodec} (initial window size
 * {@code INITIAL_WINDOW_SIZE}, DEBUG frame logging) followed by an {@link Http2MultiplexHandler}
 * wrapping {@code channelHandler}. The HTTP/2 connection, initial window size and an already
 * completed HTTP/2 protocol future are stored as channel attributes.
 *
 * @param channelHandler handler passed to the multiplex handler
 * @return the configured embedded channel
 */
public static EmbeddedChannel newHttp2Channel(ChannelHandler channelHandler) {
    Http2Settings clientSettings = Http2Settings.defaultSettings().initialWindowSize(INITIAL_WINDOW_SIZE);
    Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forClient()
                                                       .initialSettings(clientSettings)
                                                       .frameLogger(new Http2FrameLogger(LogLevel.DEBUG))
                                                       .build();

    Http2MultiplexHandler multiplexHandler = new Http2MultiplexHandler(channelHandler);
    EmbeddedChannel embedded = new EmbeddedChannel(frameCodec, multiplexHandler);

    embedded.attr(ChannelAttributeKey.HTTP2_CONNECTION).set(frameCodec.connection());
    embedded.attr(ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE).set(INITIAL_WINDOW_SIZE);
    embedded.attr(ChannelAttributeKey.PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP2));
    return embedded;
}
/**
 * Feeds a settings frame advertising 50 streams to the handler and expects that value to be
 * stored on the channel and the protocol future to complete with HTTP/2.
 */
@Test
public void channelRead_useServerMaxStreams() {
    long advertisedByServer = 50L;

    handler.channelRead0(context, http2SettingsFrame(advertisedByServer));

    assertThat(channel.attr(MAX_CONCURRENT_STREAMS).get()).isEqualTo(advertisedByServer);
    assertThat(protocolCompletableFuture).isDone();
    assertThat(protocolCompletableFuture.join()).isEqualTo(Protocol.HTTP2);
}
/**
 * Feeds a settings frame advertising 10000 streams to the handler and expects the channel to
 * carry {@code clientMaxStreams} instead, with the protocol future completed as HTTP/2.
 */
@Test
public void channelRead_useClientMaxStreams() {
    long advertisedByServer = 10000L;

    handler.channelRead0(context, http2SettingsFrame(advertisedByServer));

    assertThat(channel.attr(MAX_CONCURRENT_STREAMS).get()).isEqualTo(clientMaxStreams);
    assertThat(protocolCompletableFuture).isDone();
    assertThat(protocolCompletableFuture.join()).isEqualTo(Protocol.HTTP2);
}
/**
 * Starts the test server and builds an HTTP/1.1 Netty client with a two-thread event loop group
 * and the trust-all-certificates attribute set.
 */
@Before
public void setup() throws Exception {
    server = new Server();
    server.init();

    SdkEventLoopGroup loopGroup = SdkEventLoopGroup.builder().numberOfThreads(2).build();
    AttributeMap trustAll = AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build();

    netty = NettyNioAsyncHttpClient.builder()
                                   .eventLoopGroup(loopGroup)
                                   .protocol(Protocol.HTTP1_1)
                                   .buildWithDefaults(trustAll);
}
/**
 * Starts the test server and builds an HTTP/2 Netty client with a three-thread event loop group
 * and the trust-all-certificates attribute set.
 */
@Before
public void setup() throws Exception {
    server = new Server();
    server.init();

    SdkEventLoopGroup loopGroup = SdkEventLoopGroup.builder().numberOfThreads(3).build();
    AttributeMap trustAll = AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build();

    netty = NettyNioAsyncHttpClient.builder()
                                   .eventLoopGroup(loopGroup)
                                   .protocol(Protocol.HTTP2)
                                   .buildWithDefaults(trustAll);
}
/**
 * Starts the test server and builds an HTTP/2 Netty client with a 500 ms read timeout, a
 * three-thread event loop group and the trust-all-certificates attribute set.
 */
@Before
public void setup() throws Exception {
    server = new Server();
    server.init();

    Duration shortReadTimeout = Duration.ofMillis(500);
    SdkEventLoopGroup loopGroup = SdkEventLoopGroup.builder().numberOfThreads(3).build();
    AttributeMap trustAll = AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build();

    netty = NettyNioAsyncHttpClient.builder()
                                   .readTimeout(shortReadTimeout)
                                   .eventLoopGroup(loopGroup)
                                   .protocol(Protocol.HTTP2)
                                   .buildWithDefaults(trustAll);
}
/**
 * Starts the test server and builds an HTTP/2 Netty client with a 1000 ms read timeout, a 200 ms
 * health-check ping period, a three-thread event loop group and trust-all-certificates set.
 */
@Before
public void setup() throws Exception {
    server = new Server();
    server.init();

    Duration readTimeout = Duration.ofMillis(1000);
    SdkEventLoopGroup loopGroup = SdkEventLoopGroup.builder().numberOfThreads(3).build();
    AttributeMap trustAll = AttributeMap.builder().put(TRUST_ALL_CERTIFICATES, true).build();

    netty = NettyNioAsyncHttpClient.builder()
                                   .readTimeout(readTimeout)
                                   .eventLoopGroup(loopGroup)
                                   .http2Configuration(h -> h.healthCheckPingPeriod(Duration.ofMillis(200)))
                                   .protocol(Protocol.HTTP2)
                                   .buildWithDefaults(trustAll);
}
/**
 * Provides the primary {@link KinesisAsyncClient} bean for tests.
 *
 * <p>When {@code testEnvironment} equals {@code "local"} the client uses an HTTP/1.1 Netty client,
 * the endpoint {@code http://localhost:4568}, region EU_CENTRAL_1 and the given retry policy;
 * otherwise only the credentials provider is configured. In both cases CBOR is disabled through
 * a system property and the integration test channel is created if it does not exist.
 *
 * @param testEnvironment     value of {@code test.environment}, defaulting to {@code local}
 * @param credentialsProvider credentials used by the client
 * @param kinesisRetryPolicy  retry policy applied to the local client
 * @return the configured client
 */
@Bean
@Primary
public KinesisAsyncClient kinesisAsyncClient(final @Value("${test.environment:local}") String testEnvironment,
                                             final AwsCredentialsProvider credentialsProvider,
                                             final RetryPolicy kinesisRetryPolicy) {
    // kinesalite does not support cbor at the moment (v1.11.6)
    System.setProperty("aws.cborEnabled", "false");
    LOG.info("kinesis client for local tests");

    final KinesisAsyncClient client;
    if (!testEnvironment.equals("local")) {
        client = KinesisAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .build();
    } else {
        final ClientOverrideConfiguration retryOverride =
                ClientOverrideConfiguration.builder().retryPolicy(kinesisRetryPolicy).build();
        client = KinesisAsyncClient.builder()
                .httpClient(
                        // Disables HTTP2 because of problems with LocalStack
                        NettyNioAsyncHttpClient.builder()
                                .protocol(Protocol.HTTP1_1)
                                .build())
                .endpointOverride(URI.create("http://localhost:4568"))
                .region(Region.EU_CENTRAL_1)
                .credentialsProvider(credentialsProvider)
                .overrideConfiguration(retryOverride)
                .build();
    }

    createChannelIfNotExists(client, KINESIS_INTEGRATION_TEST_CHANNEL, EXPECTED_NUMBER_OF_SHARDS);
    return client;
}
/**
 * Provides the primary {@link KinesisAsyncClient} bean for tests.
 *
 * <p>When {@code testEnvironment} equals {@code "local"} the client uses an HTTP/1.1 Netty client,
 * the endpoint {@code http://localhost:4568} and region EU_CENTRAL_1; otherwise only the
 * credentials provider is configured. In both cases CBOR is disabled through a system property
 * and the integration test stream is created with 2 shards if it does not exist.
 *
 * @param testEnvironment     value of {@code test.environment}, defaulting to {@code local}
 * @param credentialsProvider credentials used by the client
 * @return the configured client
 */
@Bean
@Primary
public KinesisAsyncClient kinesisAsyncClient(final @Value("${test.environment:local}") String testEnvironment,
                                             final AwsCredentialsProvider credentialsProvider) {
    // kinesalite does not support cbor at the moment (v1.11.6)
    System.setProperty("aws.cborEnabled", "false");
    LOG.info("kinesis client for local tests");

    final KinesisAsyncClient client;
    if (!testEnvironment.equals("local")) {
        client = KinesisAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .build();
    } else {
        final URI localEndpoint = URI.create("http://localhost:4568");
        client = KinesisAsyncClient.builder()
                .httpClient(
                        // Disables HTTP2 because of problems with LocalStack
                        NettyNioAsyncHttpClient.builder()
                                .protocol(Protocol.HTTP1_1)
                                .build())
                .endpointOverride(localEndpoint)
                .region(Region.EU_CENTRAL_1)
                .credentialsProvider(credentialsProvider)
                .build();
    }

    createChannelIfNotExists(client, INTEGRATION_TEST_STREAM, 2);
    return client;
}
/**
 * Bean-style setter that delegates to {@code protocol(Protocol)} and discards any return value.
 *
 * @param protocol the protocol to forward
 */
public void setProtocol(Protocol protocol) {
    this.protocol(protocol);
}
/**
 * Builds the pipeline of a newly created channel.
 *
 * <p>In order: a fresh PROTOCOL_FUTURE attribute is set; SSL handlers are added when an SSL
 * context is present; HTTP/2 or HTTP/1.1 specific configuration runs; optional idle and
 * old-connection reapers are added; then the future-cancel handler, the unused-channel exception
 * handler (HTTP/1.1 only) and finally a DEBUG logging handler.
 *
 * @param ch the channel that was just created
 */
@Override
public void channelCreated(Channel ch) {
// Every new channel gets its own, not yet completed, protocol future.
ch.attr(PROTOCOL_FUTURE).set(new CompletableFuture<>());
ChannelPipeline pipeline = ch.pipeline();
if (sslCtx != null) {
// Need to provide host and port to enable SNI
// https://github.com/netty/netty/issues/3801#issuecomment-104274440
SslHandler sslHandler = sslCtx.newHandler(ch.alloc(), poolKey.getHost(), poolKey.getPort());
configureSslEngine(sslHandler.engine());
pipeline.addLast(sslHandler);
pipeline.addLast(SslCloseCompletionEventHandler.getInstance());
// Use unpooled allocator to avoid increased heap memory usage from Netty 4.1.43.
// See https://github.com/netty/netty/issues/9768
if (sslProvider == SslProvider.JDK) {
ch.config().setOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
}
}
// Any protocol other than HTTP/2 is configured as HTTP/1.1.
if (protocol == Protocol.HTTP2) {
configureHttp2(ch, pipeline);
} else {
configureHttp11(ch, pipeline);
}
// Reapers are optional: the idle reaper depends on configuration, the TTL reaper on a positive TTL.
if (configuration.reapIdleConnections()) {
pipeline.addLast(new IdleConnectionReaperHandler(configuration.idleTimeoutMillis()));
}
if (configuration.connectionTtlMillis() > 0) {
pipeline.addLast(new OldConnectionReaperHandler(configuration.connectionTtlMillis()));
}
pipeline.addLast(FutureCancelHandler.getInstance());
// Only add it for h1 channel because it does not apply to
// h2 connection channel. It will be attached
// to stream channels when they are created.
if (protocol == Protocol.HTTP1_1) {
pipeline.addLast(UnusedChannelExceptionHandler.getInstance());
}
// The logging handler goes last in the pipeline.
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
}
/**
 * Configures the channel for HTTP/1.1: appends an {@link HttpClientCodec} to the pipeline and
 * completes the channel's protocol future with {@code Protocol.HTTP1_1}.
 *
 * @param ch       channel whose PROTOCOL_FUTURE attribute is completed
 * @param pipeline pipeline the codec is appended to
 */
private void configureHttp11(Channel ch, ChannelPipeline pipeline) {
    HttpClientCodec http1Codec = new HttpClientCodec();
    pipeline.addLast(http1Codec);

    // No negotiation is needed for HTTP/1.1, so the protocol is known immediately.
    ch.attr(PROTOCOL_FUTURE).get().complete(Protocol.HTTP1_1);
}