下面列出了怎么用io.netty.channel.pool.ChannelPool的API类实例代码及写法,或者点击链接到github查看源代码。
public ChannelPipelineInitializer(Protocol protocol,
SslContext sslCtx,
SslProvider sslProvider,
long clientMaxStreams,
int clientInitialWindowSize,
Duration healthCheckPingPeriod,
AtomicReference<ChannelPool> channelPoolRef,
NettyConfiguration configuration,
URI poolKey) {
this.protocol = protocol;
this.sslCtx = sslCtx;
this.sslProvider = sslProvider;
this.clientMaxStreams = clientMaxStreams;
this.clientInitialWindowSize = clientInitialWindowSize;
this.healthCheckPingPeriod = healthCheckPingPeriod;
this.channelPoolRef = channelPoolRef;
this.configuration = configuration;
this.poolKey = poolKey;
}
private void acquire0(Promise<Channel> promise) {
if (closed) {
promise.setFailure(new IllegalStateException("Channel pool is closed!"));
return;
}
if (protocolImpl != null) {
protocolImpl.acquire(promise);
return;
}
if (protocolImplPromise == null) {
initializeProtocol();
}
protocolImplPromise.addListener((GenericFutureListener<Future<ChannelPool>>) future -> {
if (future.isSuccess()) {
future.getNow().acquire(promise);
} else {
// Couldn't negotiate protocol, fail this acquire.
promise.setFailure(future.cause());
}
});
}
private void close0() {
if (closed) {
return;
}
closed = true;
if (protocolImpl != null) {
protocolImpl.close();
} else if (protocolImplPromise != null) {
protocolImplPromise.addListener((Future<ChannelPool> f) -> {
if (f.isSuccess()) {
f.getNow().close();
} else {
delegatePool.close();
}
});
} else {
delegatePool.close();
}
}
@Test
public void acquire_shouldAcquireAgainIfExistingNotReusable() throws Exception {
Channel channel = new EmbeddedChannel();
try {
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);
Mockito.when(connectionPool.acquire()).thenReturn(channelPromise);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.emptySet(), null);
h2Pool.acquire().awaitUninterruptibly();
h2Pool.acquire().awaitUninterruptibly();
Mockito.verify(connectionPool, Mockito.times(2)).acquire();
} finally {
channel.close();
}
}
@Test
public void usesProvidedKeyManagersProvider() {
TlsKeyManagersProvider provider = mock(TlsKeyManagersProvider.class);
AttributeMap config = AttributeMap.builder()
.put(TLS_KEY_MANAGERS_PROVIDER, provider)
.build();
channelPoolMap = AwaitCloseChannelPoolMap.builder()
.sdkChannelOptions(new SdkChannelOptions())
.sdkEventLoopGroup(SdkEventLoopGroup.builder().build())
.configuration(new NettyConfiguration(config.merge(GLOBAL_HTTP_DEFAULTS)))
.build();
ChannelPool channelPool = channelPoolMap.newPool(URI.create("https://localhost:" + mockProxy.port()));
channelPool.acquire().awaitUninterruptibly();
verify(provider).keyManagers();
}
@Test
public void closeClient_shouldCloseUnderlyingResources() {
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.builder().build();
ChannelPool channelPool = mock(ChannelPool.class);
SdkChannelPoolMap<URI, ChannelPool> sdkChannelPoolMap = new SdkChannelPoolMap<URI, ChannelPool>() {
@Override
protected ChannelPool newPool(URI key) {
return channelPool;
}
};
sdkChannelPoolMap.get(URI.create("http://blah"));
NettyConfiguration nettyConfiguration = new NettyConfiguration(AttributeMap.empty());
SdkAsyncHttpClient customerClient =
new NettyNioAsyncHttpClient(eventLoopGroup, sdkChannelPoolMap, nettyConfiguration);
customerClient.close();
assertThat(eventLoopGroup.eventLoopGroup().isShuttingDown()).isTrue();
assertThat(eventLoopGroup.eventLoopGroup().isTerminated()).isTrue();
assertThat(sdkChannelPoolMap).isEmpty();
Mockito.verify(channelPool).close();
}
StreamingChannel(
Channel channel,
ChannelPool pool,
ObjectHolder<Boolean> callActiveHolder,
ObjectHolder<Boolean> downstreamLastChunkSentHolder,
Deque<Span> distributedTracingSpanStack,
Map<String, String> distributedTracingMdcInfo,
Span spanForDownstreamCall,
ProxyRouterSpanNamingAndTaggingStrategy<Span> proxySpanTaggingStrategy
) {
this.channel = channel;
this.pool = pool;
this.callActiveHolder = callActiveHolder;
this.downstreamLastChunkSentHolder = downstreamLastChunkSentHolder;
this.distributedTracingSpanStack = distributedTracingSpanStack;
this.distributedTracingMdcInfo = distributedTracingMdcInfo;
this.spanForDownstreamCall = spanForDownstreamCall;
this.proxySpanTaggingStrategy = proxySpanTaggingStrategy;
}
protected static void releaseChannelBackToPoolIfCallIsActive(Channel ch, ChannelPool pool,
ObjectHolder<Boolean> callActiveHolder,
String contextReason,
Deque<Span> distributedTracingStack,
Map<String, String> distributedTracingMdcInfo) {
if (callActiveHolder.heldObject) {
if (logger.isDebugEnabled()) {
runnableWithTracingAndMdc(
() -> logger.debug(
"Marking call as inactive and releasing channel back to pool. "
+ "channel_release_reason=\"{}\"", contextReason
),
distributedTracingStack, distributedTracingMdcInfo
).run();
}
callActiveHolder.heldObject = false;
pool.release(ch);
}
}
/**
* <p>Acquires a {@link Channel} from the {@link ChannelPool}</p>
*
* @param message
* An {@link AbstractRequest} that will be used as the lookup reference for the {@link
* io.netty.channel.pool.ChannelPoolMap} key
*
* @return A {@link CompletableFuture} containing the acquired {@link Channel}
*/
@Override
public CompletableFuture<Channel> getChannel(M message) {
final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
//Retrieve our channel pool based on the message
final ChannelPool pool = poolMap.get(message);
log.debug("Acquiring channel from pool '{}' for message : {}", pool, message);
//Acquire a channel from the pool and listen for completion
pool.acquire().addListener((Future<Channel> future) -> {
if (future.isSuccess()) {
log.debug("Successfully acquired Channel from pool");
Channel channel = future.get();
channel.attr(ChannelAttributes.CHANNEL_POOL).set(pool);
channelFuture.complete(channel);
} else {
log.debug("Failed to acquire Channel from Pool");
channelFuture.completeExceptionally(new ConnectException(future.cause()));
}
});
return channelFuture;
}
@SdkTestInternalApi
NettyNioAsyncHttpClient(SdkEventLoopGroup sdkEventLoopGroup,
SdkChannelPoolMap<URI, ? extends ChannelPool> pools,
NettyConfiguration configuration) {
this.sdkEventLoopGroup = sdkEventLoopGroup;
this.pools = pools;
this.configuration = configuration;
}
@SdkTestInternalApi
public ProxyTunnelInitHandler(ChannelPool sourcePool, URI remoteHost, Promise<Channel> initPromise,
Supplier<HttpClientCodec> httpCodecSupplier) {
this.sourcePool = sourcePool;
this.remoteHost = remoteHost;
this.initPromise = initPromise;
this.httpCodecSupplier = httpCodecSupplier;
}
@Override
protected SimpleChannelPoolAwareChannelPool newPool(URI key) {
SslContext sslContext = sslContext(key);
Bootstrap bootstrap = createBootstrap(key);
AtomicReference<ChannelPool> channelPoolRef = new AtomicReference<>();
ChannelPipelineInitializer pipelineInitializer = new ChannelPipelineInitializer(protocol,
sslContext,
sslProvider,
maxStreams,
initialWindowSize,
healthCheckPingPeriod,
channelPoolRef,
configuration,
key);
BetterSimpleChannelPool tcpChannelPool;
ChannelPool baseChannelPool;
if (shouldUseProxyForHost(key)) {
tcpChannelPool = new BetterSimpleChannelPool(bootstrap, NOOP_HANDLER);
baseChannelPool = new Http1TunnelConnectionPool(bootstrap.config().group().next(), tcpChannelPool,
sslContext, proxyAddress(key), key, pipelineInitializer);
} else {
tcpChannelPool = new BetterSimpleChannelPool(bootstrap, pipelineInitializer);
baseChannelPool = tcpChannelPool;
}
ChannelPool wrappedPool = wrapBaseChannelPool(bootstrap, baseChannelPool);
channelPoolRef.set(wrappedPool);
return new SimpleChannelPoolAwareChannelPool(wrappedPool, tcpChannelPool);
}
private ChannelPool wrapBaseChannelPool(Bootstrap bootstrap, ChannelPool channelPool) {
// Wrap the channel pool such that the ChannelAttributeKey.CLOSE_ON_RELEASE flag is honored.
channelPool = new HonorCloseOnReleaseChannelPool(channelPool);
// Wrap the channel pool such that HTTP 2 channels won't be released to the underlying pool while they're still in use.
channelPool = new HttpOrHttp2ChannelPool(channelPool,
bootstrap.config().group(),
configuration.maxConnections(),
configuration);
// Wrap the channel pool such that we remove request-specific handlers with each request.
channelPool = new HandlerRemovingChannelPool(channelPool);
// Wrap the channel pool such that an individual channel can only be released to the underlying pool once.
channelPool = new ReleaseOnceChannelPool(channelPool);
// Wrap the channel pool to guarantee all channels checked out are healthy, and all unhealthy channels checked in are
// closed.
channelPool = new HealthCheckedChannelPool(bootstrap.config().group(), configuration, channelPool);
// Wrap the channel pool such that if the Promise given to acquire(Promise) is done when the channel is acquired
// from the underlying pool, the channel is closed and released.
channelPool = new CancellableAcquireChannelPool(bootstrap.config().group().next(), channelPool);
return channelPool;
}
/**
* @param connectionPool Connection pool for parent channels (i.e. the socket channel).
*/
Http2MultiplexedChannelPool(ChannelPool connectionPool, EventLoopGroup eventLoopGroup, Duration idleConnectionTimeout) {
this.connectionPool = connectionPool;
this.eventLoopGroup = eventLoopGroup;
this.connections = ConcurrentHashMap.newKeySet();
this.idleConnectionTimeout = idleConnectionTimeout;
}
@SdkTestInternalApi
Http2MultiplexedChannelPool(ChannelPool connectionPool,
EventLoopGroup eventLoopGroup,
Set<MultiplexedChannelRecord> connections,
Duration idleConnectionTimeout) {
this(connectionPool, eventLoopGroup, idleConnectionTimeout);
this.connections.addAll(connections);
}
public HttpOrHttp2ChannelPool(ChannelPool delegatePool,
EventLoopGroup group,
int maxConcurrency,
NettyConfiguration configuration) {
this.delegatePool = delegatePool;
this.maxConcurrency = maxConcurrency;
this.eventLoopGroup = group;
this.eventLoop = group.next();
this.configuration = configuration;
}
private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
if (Protocol.HTTP1_1 == protocol) {
// For HTTP/1.1 we use a traditional channel pool without multiplexing
protocolImpl = BetterFixedChannelPool.builder()
.channelPool(delegatePool)
.executor(eventLoop)
.acquireTimeoutAction(BetterFixedChannelPool.AcquireTimeoutAction.FAIL)
.acquireTimeoutMillis(configuration.connectionAcquireTimeoutMillis())
.maxConnections(maxConcurrency)
.maxPendingAcquires(configuration.maxPendingConnectionAcquires())
.build();
} else {
Duration idleConnectionTimeout = configuration.reapIdleConnections()
? Duration.ofMillis(configuration.idleTimeoutMillis()) : null;
ChannelPool h2Pool = new Http2MultiplexedChannelPool(delegatePool, eventLoopGroup, idleConnectionTimeout);
protocolImpl = BetterFixedChannelPool.builder()
.channelPool(h2Pool)
.executor(eventLoop)
.acquireTimeoutAction(BetterFixedChannelPool.AcquireTimeoutAction.FAIL)
.acquireTimeoutMillis(configuration.connectionAcquireTimeoutMillis())
.maxConnections(maxConcurrency)
.maxPendingAcquires(configuration.maxPendingConnectionAcquires())
.build();
}
// Give the channel back so it can be acquired again by protocolImpl
delegatePool.release(newChannel);
return protocolImpl;
}
public HealthCheckedChannelPool(EventLoopGroup eventLoopGroup,
NettyConfiguration configuration,
ChannelPool delegate) {
this.eventLoopGroup = eventLoopGroup;
this.acquireTimeoutMillis = configuration.connectionAcquireTimeoutMillis();
this.delegate = delegate;
}
@SdkTestInternalApi
Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext,
URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler,
InitHandlerSupplier initHandlerSupplier) {
this.eventLoop = eventLoop;
this.delegate = delegate;
this.sslContext = sslContext;
this.proxyAddress = proxyAddress;
this.remoteAddress = remoteAddress;
this.handler = handler;
this.initHandlerSupplier = initHandlerSupplier;
}
public RequestContext(ChannelPool channelPool,
EventLoopGroup eventLoopGroup,
AsyncExecuteRequest executeRequest,
NettyConfiguration configuration) {
this.channelPool = channelPool;
this.eventLoopGroup = eventLoopGroup;
this.executeRequest = executeRequest;
this.configuration = configuration;
}
@Test
public void channelConfigOptionCheck() throws SSLException {
targetUri = URI.create("https://some-awesome-service-1234.amazonaws.com:8080");
SslContext sslContext = SslContextBuilder.forClient()
.sslProvider(SslProvider.JDK)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.build();
AtomicReference<ChannelPool> channelPoolRef = new AtomicReference<>();
NettyConfiguration nettyConfiguration = new NettyConfiguration(GLOBAL_HTTP_DEFAULTS);
pipelineInitializer = new ChannelPipelineInitializer(Protocol.HTTP1_1,
sslContext,
SslProvider.JDK,
100,
1024,
Duration.ZERO,
channelPoolRef,
nettyConfiguration,
targetUri);
Channel channel = new EmbeddedChannel();
pipelineInitializer.channelCreated(channel);
assertThat(channel.config().getOption(ChannelOption.ALLOCATOR), is(UnpooledByteBufAllocator.DEFAULT));
}
@Test
public void releaseDoesntCloseIfNotFlagged() throws Exception {
ChannelPool channelPool = Mockito.mock(ChannelPool.class);
MockChannel channel = new MockChannel();
channel.attr(ChannelAttributeKey.CLOSE_ON_RELEASE).set(false);
new HonorCloseOnReleaseChannelPool(channelPool).release(channel);
channel.runAllPendingTasks();
assertThat(channel.isOpen()).isTrue();
Mockito.verify(channelPool, new Times(0)).release(any());
Mockito.verify(channelPool, new Times(1)).release(any(), any());
}
@Test
public void releaseClosesIfFlagged() throws Exception {
ChannelPool channelPool = Mockito.mock(ChannelPool.class);
MockChannel channel = new MockChannel();
channel.attr(ChannelAttributeKey.CLOSE_ON_RELEASE).set(true);
new HonorCloseOnReleaseChannelPool(channelPool).release(channel);
channel.runAllPendingTasks();
assertThat(channel.isOpen()).isFalse();
Mockito.verify(channelPool, new Times(0)).release(any());
Mockito.verify(channelPool, new Times(1)).release(any(), any());
}
@Test
public void failedConnectionAcquireNotifiesPromise() throws InterruptedException {
IOException exception = new IOException();
ChannelPool connectionPool = mock(ChannelPool.class);
when(connectionPool.acquire()).thenReturn(new FailedFuture<>(loopGroup.next(), exception));
ChannelPool pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), null);
Future<Channel> acquirePromise = pool.acquire().await();
assertThat(acquirePromise.isSuccess()).isFalse();
assertThat(acquirePromise.cause()).isEqualTo(exception);
}
@Test
public void releaseParentChannelIfReleasingLastChildChannelOnGoAwayChannel() {
SocketChannel channel = new NioSocketChannel();
try {
loopGroup.register(channel).awaitUninterruptibly();
ChannelPool connectionPool = mock(ChannelPool.class);
ArgumentCaptor<Promise> releasePromise = ArgumentCaptor.forClass(Promise.class);
when(connectionPool.release(eq(channel), releasePromise.capture())).thenAnswer(invocation -> {
Promise<?> promise = releasePromise.getValue();
promise.setSuccess(null);
return promise;
});
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 8, null);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.singleton(record), null);
h2Pool.close();
InOrder inOrder = Mockito.inOrder(connectionPool);
inOrder.verify(connectionPool).release(eq(channel), isA(Promise.class));
inOrder.verify(connectionPool).close();
} finally {
channel.close().awaitUninterruptibly();
}
}
@Test
public void acquireAfterCloseFails() throws InterruptedException {
ChannelPool connectionPool = mock(ChannelPool.class);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup.next(), null);
h2Pool.close();
Future<Channel> acquireResult = h2Pool.acquire().await();
assertThat(acquireResult.isSuccess()).isFalse();
assertThat(acquireResult.cause()).isInstanceOf(IOException.class);
}
@Test
public void closeWaitsForConnectionToBeReleasedBeforeClosingConnectionPool() {
SocketChannel channel = new NioSocketChannel();
try {
loopGroup.register(channel).awaitUninterruptibly();
ChannelPool connectionPool = mock(ChannelPool.class);
ArgumentCaptor<Promise> releasePromise = ArgumentCaptor.forClass(Promise.class);
when(connectionPool.release(eq(channel), releasePromise.capture())).thenAnswer(invocation -> {
Promise<?> promise = releasePromise.getValue();
promise.setSuccess(null);
return promise;
});
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 8, null);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.singleton(record), null);
h2Pool.close();
InOrder inOrder = Mockito.inOrder(connectionPool);
inOrder.verify(connectionPool).release(eq(channel), isA(Promise.class));
inOrder.verify(connectionPool).close();
} finally {
channel.close().awaitUninterruptibly();
}
}
@Test(timeout = 5_000)
public void interruptDuringClosePreservesFlag() throws InterruptedException {
SocketChannel channel = new NioSocketChannel();
try {
loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);
ChannelPool connectionPool = mock(ChannelPool.class);
Promise<Void> releasePromise = Mockito.spy(new DefaultPromise<>(loopGroup.next()));
when(connectionPool.release(eq(channel))).thenReturn(releasePromise);
MultiplexedChannelRecord record = new MultiplexedChannelRecord(channel, 8, null);
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.singleton(record), null);
CompletableFuture<Boolean> interrupteFlagPreserved = new CompletableFuture<>();
Thread t = new Thread(() -> {
try {
h2Pool.close();
} catch (Exception e) {
if (e.getCause() instanceof InterruptedException && Thread.currentThread().isInterrupted()) {
interrupteFlagPreserved.complete(true);
}
}
});
t.start();
t.interrupt();
t.join();
assertThat(interrupteFlagPreserved.join()).isTrue();
} finally {
channel.close().awaitUninterruptibly();
}
}
@Before
public void setup() {
mockChannelPool = mock(ChannelPool.class);
eventLoopGroup = new NioEventLoopGroup();
requestContext = new RequestContext(mockChannelPool,
eventLoopGroup,
AsyncExecuteRequest.builder().build(),
new NettyConfiguration(AttributeMap.empty()));
nettyRequestExecutor = new NettyRequestExecutor(requestContext);
}
@Before
public void beforeMethod() {
channelMock = mock(Channel.class);
channelPoolMock = mock(ChannelPool.class);
eventLoopMock = mock(EventLoop.class);
contentChunkMock = mock(HttpContent.class);
callActiveHolder = new ObjectHolder<>();
callActiveHolder.heldObject = true;
downstreamLastChunkSentHolder = new ObjectHolder<>();
downstreamLastChunkSentHolder.heldObject = false;
spanForDownstreamCallMock = mock(Span.class);
proxySpanTaggingStrategyMock = mock(ProxyRouterSpanNamingAndTaggingStrategy.class);
streamingChannelSpy = spy(new StreamingChannel(
channelMock, channelPoolMock, callActiveHolder, downstreamLastChunkSentHolder, null, null,
spanForDownstreamCallMock, proxySpanTaggingStrategyMock
));
writeAndFlushChannelFutureMock = mock(ChannelFuture.class);
doReturn(eventLoopMock).when(channelMock).eventLoop();
doReturn(writeAndFlushChannelFutureMock).when(channelMock).writeAndFlush(contentChunkMock);
channelIsBrokenAttrMock = mock(Attribute.class);
doReturn(channelIsBrokenAttrMock).when(channelMock).attr(CHANNEL_IS_BROKEN_ATTR);
streamChunkChannelPromiseMock = mock(ChannelPromise.class);
doReturn(streamChunkChannelPromiseMock).when(channelMock).newPromise();
failedFutureMock = mock(ChannelFuture.class);
doReturn(failedFutureMock).when(channelMock).newFailedFuture(any(Throwable.class));
resetTracing();
}