下面列出了io.netty.channel.pool.AbstractChannelPoolMap#io.netty.util.concurrent.Promise 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Schedules {@code command} on the fake clock's scheduled executor and completes this future
 * with the outcome of running it.
 *
 * @param eventLoop event loop handed to the superclass constructor
 * @param command task to run once the delay has elapsed
 * @param delay how long to wait before running {@code command}
 * @param timeUnit unit of {@code delay}
 */
FakeClockScheduledNettyFuture(
        EventLoop eventLoop, final Runnable command, long delay, TimeUnit timeUnit) {
    super(eventLoop);
    Runnable completingTask = new Runnable() {
        @Override
        public void run() {
            try {
                command.run();
            } catch (Throwable failure) {
                setFailure(failure);
                return;
            }
            // The command itself (a shutdown task, for example) might already have cancelled
            // all the scheduled tasks; in that case this future is done and is left alone.
            if (isDone()) {
                return;
            }
            Promise<Void> unused = setSuccess(null);
        }
    };
    future = fakeClock.getScheduledExecutorService().schedule(completingTask, delay, timeUnit);
}
/**
 * Builds a {@link BiConsumer} that reports failures to the promise, whether the failure arrives as the
 * throwable argument of the BiConsumer or is thrown synchronously by {@code successConsumer}. On success the
 * promise is NOT completed here: {@code successConsumer} receives the promise and is expected to notify it.
 *
 * @param successConsumer BiConsumer invoked with the successful result and the promise; it must notify the
 * promise itself when it finishes successfully.
 * @param promise Promise to notify.
 * @param <SuccessT> Success type.
 * @param <PromiseT> Type being fulfilled by the Promise.
 * @return BiConsumer that can be used in a {@link CompletableFuture#whenComplete(BiConsumer)} method.
 */
public static <SuccessT, PromiseT> BiConsumer<SuccessT, ? super Throwable> asyncPromiseNotifyingBiConsumer(
        BiConsumer<SuccessT, Promise<PromiseT>> successConsumer, Promise<PromiseT> promise) {
    return (result, error) -> {
        if (error != null) {
            promise.setFailure(error);
            return;
        }
        try {
            successConsumer.accept(result, promise);
        } catch (Throwable thrown) {
            // Only synchronous failures of the successConsumer can be reported from here; asynchronous
            // failures are the successConsumer's own responsibility.
            promise.setFailure(thrown);
        }
    };
}
/**
 * When the caller's acquire promise has already been completed (here: failed) before the delegate pool
 * supplies a channel, the channel must be released back to the delegate pool and closed.
 */
@Test
public void promiseCancelledBeforeAcquireComplete_closesAndReleasesChannel() throws InterruptedException {
    // Complete the caller's promise before the acquire is even attempted.
    Promise<Channel> callerPromise = eventExecutor.newPromise();
    callerPromise.setFailure(new RuntimeException("Changed my mind!"));

    // The delegate pool immediately succeeds the promise it is given with the test channel.
    when(mockDelegatePool.acquire(any(Promise.class))).thenAnswer((Answer<Promise>) invocation -> {
        Promise delegatePromise = invocation.getArgumentAt(0, Promise.class);
        delegatePromise.setSuccess(channel);
        return delegatePromise;
    });

    cancellableAcquireChannelPool.acquire(callerPromise);

    // Wait before verifying; presumably the release/close happens asynchronously on the executor.
    Thread.sleep(500);
    verify(mockDelegatePool).release(eq(channel));
    assertThat(channel.closeFuture().isDone()).isTrue();
}
/**
 * Creates a response promise and wires up its two internal callbacks: a promise listener that
 * forwards to {@code handlePromise} and a connection-failure handler that forwards to
 * {@code handleRetry}.
 *
 * @param retryPolicy the policy for retries
 * @param connectionState which contains current connection details
 * @param retryHandler handler for retries
 */
public ResponsePromise(RetryPolicy retryPolicy, ConnectionState connectionState, RetryHandler retryHandler) {
    this.retryPolicy = retryPolicy;
    this.retryHandler = retryHandler;
    this.connectionState = connectionState;
    // Invoked when a promise this listener is registered on completes.
    promiseHandler = new GenericFutureListener<Promise<T>>() {
        @Override
        public void operationComplete(Promise<T> completed) throws Exception {
            handlePromise(completed);
        }
    };
    // Routes I/O failures reported through this handler into the retry logic.
    this.connectionFailHandler = new ConnectionFailHandler() {
        @Override
        public void catchException(IOException cause) {
            handleRetry(cause);
        }
    };
}
/**
 * Acquires a new parent connection from {@code connectionPool} and, once that connection's protocol
 * future completes, continues with {@code acquireStreamOnFreshConnection}. A failed connection
 * acquire fails {@code promise} directly; later failures are handed to {@code failAndCloseParent}.
 *
 * @param promise promise for the stream channel being acquired
 */
private void acquireStreamOnNewConnection(Promise<Channel> promise) {
    Future<Channel> parentAcquire = connectionPool.acquire();
    parentAcquire.addListener(ignored -> {
        if (parentAcquire.isSuccess()) {
            Channel parent = parentAcquire.getNow();
            try {
                // Remember which multiplexed pool owns this parent connection.
                parent.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);
                // Streams can only be added to the new connection after its protocol future completes.
                parent.attr(ChannelAttributeKey.PROTOCOL_FUTURE).get()
                        .thenAccept(protocol -> acquireStreamOnFreshConnection(promise, parent, protocol))
                        .exceptionally(throwable -> failAndCloseParent(promise, parent, throwable));
            } catch (Throwable e) {
                failAndCloseParent(promise, parent, e);
            }
        } else {
            promise.setFailure(parentAcquire.cause());
        }
    });
}
/**
 * A record whose only stream has been closed and released must stay open at first and be closed
 * once the idle timeout has passed.
 */
@Test
public void recordsWithoutReservedStreamsAreClosedAfterTimeout() throws InterruptedException {
    int idleMillis = 1000;
    EmbeddedChannel parentChannel = newHttp2Channel();
    MultiplexedChannelRecord channelRecord =
            new MultiplexedChannelRecord(parentChannel, 1, Duration.ofMillis(idleMillis));

    // Reserve a single stream on the record.
    Promise<Channel> acquirePromise = parentChannel.eventLoop().newPromise();
    channelRecord.acquireStream(acquirePromise);
    parentChannel.runPendingTasks();
    assertThat(acquirePromise.isSuccess()).isTrue();
    assertThat(parentChannel.isOpen()).isTrue();

    // Giving the stream back must not close the parent right away.
    channelRecord.closeAndReleaseChild(acquirePromise.getNow());
    assertThat(parentChannel.isOpen()).isTrue();

    // After twice the idle timeout, running the pending tasks must have closed the parent.
    Thread.sleep(idleMillis * 2);
    parentChannel.runPendingTasks();
    assertThat(parentChannel.isOpen()).isFalse();
}
/**
 * Acquires a stream channel: fails immediately if the pool is closed, tries the existing parent
 * connections (in random order) once enough of them exist, and otherwise opens a new connection.
 *
 * @param promise promise to complete with the acquired stream channel
 * @return the same {@code promise} (or the result of failing it when the pool is closed)
 */
@Override
public Future<Channel> acquire(Promise<Channel> promise) {
    http2ClientMetrics.http2NewStreamCount.inc();
    if (closed.get()) {
        return promise.setFailure(new IOException("Channel pool is closed!"));
    }

    // Connections are reused only after their number has reached http2MinConnectionPerPort.
    boolean enoughConnections = parentConnections.size() >= http2ClientConfig.http2MinConnectionPerPort;
    if (enoughConnections) {
        List<MultiplexedChannelRecord> candidates = new ArrayList<>(parentConnections);
        Collections.shuffle(candidates);
        // Each candidate is tried at most once; a failed attempt ("slip acquire") is not expected.
        for (MultiplexedChannelRecord candidate : candidates) {
            if (!acquireStreamOnInitializedConnection(candidate, promise)) {
                log.warn("Stream slip acquire on {}", inetSocketAddress);
                http2ClientMetrics.http2StreamSlipAcquireCount.inc();
                continue;
            }
            return promise;
        }
    }

    // Either there are too few connections or none of them had a free stream: open a new one.
    acquireStreamOnNewConnection(promise);
    return promise;
}
/**
 * Removes a handler whose {@code handlerRemoved} throws and waits for the promise given to the
 * {@code CheckExceptionHandler} (registered on a separate executor group) to complete.
 */
@Test(timeout = 3000)
public void testHandlerRemovedExceptionFromChildHandlerIsPropagated() {
    final EventExecutorGroup handlerExecutors = new DefaultEventExecutorGroup(1);
    try {
        final Promise<Void> observed = handlerExecutors.next().newPromise();
        String throwingHandlerName = "foo";
        final Exception expected = new RuntimeException();

        ChannelPipeline pipeline = new LocalChannel().pipeline();
        // This handler throws as soon as it is removed from the pipeline.
        pipeline.addLast(throwingHandlerName, new ChannelHandlerAdapter() {
            @Override
            public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                throw expected;
            }
        });
        pipeline.addLast(handlerExecutors, new CheckExceptionHandler(expected, observed));

        group.register(pipeline.channel()).syncUninterruptibly();
        pipeline.remove(throwingHandlerName);
        // Blocks until the promise completes; the test timeout bounds the wait.
        observed.syncUninterruptibly();
    } finally {
        handlerExecutors.shutdownGracefully();
    }
}
/**
 * Resolves the host name of {@code unresolvedAddress} through {@code nameResolver} and completes
 * {@code promise} with a socket address that combines the resolved address and the original port.
 *
 * @param unresolvedAddress address whose host name is to be resolved
 * @param promise promise to succeed with the resolved socket address or fail with the lookup's cause
 */
@Override
protected void doResolve(final InetSocketAddress unresolvedAddress, final Promise<InetSocketAddress> promise)
        throws Exception {
    // getHostName() never incurs a reverse lookup here, because an unresolved address
    // always has a host name.
    final String hostName = unresolvedAddress.getHostName();
    nameResolver.resolve(hostName).addListener(new FutureListener<InetAddress>() {
        @Override
        public void operationComplete(Future<InetAddress> lookup) throws Exception {
            if (!lookup.isSuccess()) {
                promise.setFailure(lookup.cause());
                return;
            }
            promise.setSuccess(new InetSocketAddress(lookup.getNow(), unresolvedAddress.getPort()));
        }
    });
}
/**
 * Submits {@code doExecute(request)} to the event executor and reports its outcome through
 * {@code promise}.
 *
 * @param promise promise to complete with the response or the failure; must not be {@code null}
 * @param request arguments passed through to {@code doExecute}
 * @return the same {@code promise} that was passed in
 * @throws IllegalArgumentException if {@code promise} is {@code null}
 */
@Override
public Future<T> asyncExecute(Promise<T> promise, Object... request) {
    if (promise == null) {
        throw new IllegalArgumentException("promise should not be null");
    }
    // async execute
    eventExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                T response = doExecute(request);
                promise.setSuccess(response);
            } catch (Throwable t) {
                // Catch Throwable, not just Exception: an Error thrown by doExecute would otherwise
                // escape this task and leave the promise incomplete forever, hanging every waiter.
                promise.setFailure(t);
            }
        }
    });
    // return the promise back
    return promise;
}
/**
 * Releases {@code channel} via {@code doReleaseChannel}, running it directly when already on the
 * channel's event loop and scheduling it there otherwise.
 *
 * @param channel channel to release; must not be {@code null}
 * @param promise promise notified of the release outcome; must not be {@code null}
 * @return the same {@code promise}
 */
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
    checkNotNull(channel, "channel");
    checkNotNull(promise, "promise");
    try {
        // The actual release work, shared by the direct and the scheduled path.
        Runnable releaseTask = new Runnable() {
            @Override
            public void run() {
                doReleaseChannel(channel, promise);
            }
        };
        EventLoop channelLoop = channel.eventLoop();
        if (channelLoop.inEventLoop()) {
            releaseTask.run();
        } else {
            channelLoop.execute(releaseTask);
        }
    } catch (Throwable cause) {
        // Close the channel and fail the promise.
        closeAndFail(channel, cause, promise);
    }
    return promise;
}
/**
 * Cancelling the execute future after the channel has been acquired must submit a
 * {@link Runnable} to the channel's event loop.
 */
@Test
public void cancelExecuteFuture_channelAcquired_submitsRunnable() {
    EventLoop loop = mock(EventLoop.class);
    Channel acquiredChannel = mock(Channel.class);
    when(acquiredChannel.eventLoop()).thenReturn(loop);

    // The pool immediately succeeds the promise it is given with the mocked channel.
    when(mockChannelPool.acquire(any(Promise.class))).thenAnswer((Answer<Promise>) invocation -> {
        Promise acquirePromise = invocation.getArgumentAt(0, Promise.class);
        acquirePromise.setSuccess(acquiredChannel);
        return acquirePromise;
    });

    CompletableFuture<Void> result = nettyRequestExecutor.execute();
    result.cancel(true);

    verify(loop).submit(any(Runnable.class));
}
/**
 * Resolves every address for the host name of {@code unresolvedAddress} through
 * {@code nameResolver} and completes {@code promise} with the matching socket addresses, each
 * combining one resolved address with the original port.
 *
 * @param unresolvedAddress address whose host name is to be resolved
 * @param promise promise to succeed with the socket addresses or fail with the lookup's cause
 */
@Override
protected void doResolveAll(final InetSocketAddress unresolvedAddress,
        final Promise<List<InetSocketAddress>> promise) throws Exception {
    // getHostName() never incurs a reverse lookup here, because an unresolved address
    // always has a host name.
    final String hostName = unresolvedAddress.getHostName();
    nameResolver.resolveAll(hostName).addListener(new FutureListener<List<InetAddress>>() {
        @Override
        public void operationComplete(Future<List<InetAddress>> lookup) throws Exception {
            if (!lookup.isSuccess()) {
                promise.setFailure(lookup.cause());
                return;
            }
            List<InetAddress> resolved = lookup.getNow();
            List<InetSocketAddress> withPort = new ArrayList<InetSocketAddress>(resolved.size());
            for (InetAddress address : resolved) {
                withPort.add(new InetSocketAddress(address, unresolvedAddress.getPort()));
            }
            promise.setSuccess(withPort);
        }
    });
}
/**
 * Performs the release on the channel's event loop: rejects channels that were not acquired from
 * this pool, otherwise releases the channel with or without a health check.
 *
 * @param channel channel being released
 * @param promise promise notified of the release outcome
 */
private void doReleaseChannel(Channel channel, Promise<Void> promise) {
    assert channel.eventLoop().inEventLoop();
    // Remove the POOL_KEY attribute from the Channel and check whether it was acquired from this pool.
    boolean acquiredFromThisPool = channel.attr(POOL_KEY).getAndSet(null) == this;
    if (!acquiredFromThisPool) {
        // Better include a stacktrace here as this is a user error.
        closeAndFail(channel,
                new IllegalArgumentException(
                        "Channel " + channel + " was not acquired from this ChannelPool"),
                promise);
        return;
    }
    try {
        if (releaseHealthCheck) {
            // Health-check the channel as part of the release.
            doHealthCheckOnRelease(channel, promise);
        } else {
            // Release directly, without a health check.
            releaseAndOffer(channel, promise);
        }
    } catch (Throwable cause) {
        closeAndFail(channel, cause, promise);
    }
}
/**
 * Reacts to the completion of a connect attempt: updates counters and stats and either continues
 * with {@code createConnection} or fails the caller's promise with an {@code OriginConnectException}.
 *
 * @param cf future of the connect attempt
 * @param callerPromise promise of the caller waiting for a pooled connection
 * @param passport passport that records the connected state on success
 */
protected void handleConnectCompletion(
        ChannelFuture cf, Promise<PooledConnection> callerPromise, CurrentPassport passport) {
    // The attempt is over either way.
    connCreationsInProgress.decrementAndGet();

    if (!cf.isSuccess()) {
        stats.incrementSuccessiveConnectionFailureCount();
        stats.addToFailureCount();
        stats.decrementActiveRequestsCount();
        createConnFailedCounter.increment();
        callerPromise.setFailure(
                new OriginConnectException(cf.cause().getMessage(), OutboundErrorType.CONNECT_ERROR));
        return;
    }

    passport.add(PassportState.ORIGIN_CH_CONNECTED);
    stats.incrementOpenConnectionsCount();
    createConnSucceededCounter.increment();
    connsInUse.incrementAndGet();
    createConnection(cf, callerPromise, passport);
}
/**
 * Opens a new stream channel via {@code open0}, on the executor of the
 * {@code Http2MultiplexCodec} context found in the channel's pipeline. If the pipeline has no such
 * context, the promise is failed instead.
 *
 * @param promise promise notified about the new stream channel
 * @return the same {@code promise}
 */
public Future<Http2StreamChannel> open(final Promise<Http2StreamChannel> promise) {
    final ChannelHandlerContext ctx = channel.pipeline().context(Http2MultiplexCodec.class);
    if (ctx != null) {
        EventExecutor codecExecutor = ctx.executor();
        if (codecExecutor.inEventLoop()) {
            open0(ctx, promise);
        } else {
            codecExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    open0(ctx, promise);
                }
            });
        }
        return promise;
    }

    // No codec in the pipeline: an active channel means it is misconfigured, otherwise it is closed.
    Throwable failure = channel.isActive()
            ? new IllegalStateException(StringUtil.simpleClassName(Http2MultiplexCodec.class)
                    + " must be in the ChannelPipeline of Channel " + channel)
            : new ClosedChannelException();
    promise.setFailure(failure);
    return promise;
}
/**
 * Calls {@code client.close(promise)} from inside an active-stream visitor for every visited stream
 * and expects each resulting future to notify its listener within five seconds.
 */
@Test
public void removeAllStreamsWhileIteratingActiveStreams() throws InterruptedException, Http2Exception {
    final Endpoint<Http2RemoteFlowController> remoteEndpoint = client.remote();
    final Endpoint<Http2LocalFlowController> localEndpoint = client.local();
    // Create many streams: odd ids on the local endpoint, even ids on the remote endpoint.
    for (int localId = 3, remoteId = 2; localId < 5000; localId += 2, remoteId += 2) {
        localEndpoint.createStream(localId, false);
        remoteEndpoint.createStream(remoteId, false);
    }

    final Promise<Void> closePromise = group.next().newPromise();
    // One count per stream that is active before iteration starts.
    final CountDownLatch pendingNotifications = new CountDownLatch(client.numActiveStreams());
    client.forEachActiveStream(new Http2StreamVisitor() {
        @Override
        public boolean visit(Http2Stream stream) {
            client.close(closePromise).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    assertTrue(closePromise.isDone());
                    pendingNotifications.countDown();
                }
            });
            return true;
        }
    });

    assertTrue(pendingNotifications.await(5, TimeUnit.SECONDS));
}
/**
 * Adds a handler whose {@code handlerAdded} throws, registers the channel and waits for the promise
 * given to the {@code CheckExceptionHandler} (registered on a separate executor group) to complete.
 */
@Test(timeout = 3000)
public void testHandlerAddedExceptionFromChildHandlerIsPropagated() {
    final EventExecutorGroup handlerExecutors = new DefaultEventExecutorGroup(1);
    try {
        final Promise<Void> observed = handlerExecutors.next().newPromise();
        final AtomicBoolean addedCallbackRan = new AtomicBoolean();
        final Exception expected = new RuntimeException();

        ChannelPipeline pipeline = new LocalChannel().pipeline();
        pipeline.addLast(handlerExecutors, new CheckExceptionHandler(expected, observed));
        pipeline.addFirst(new ChannelHandlerAdapter() {
            @Override
            public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
                addedCallbackRan.set(true);
                throw expected;
            }
        });

        // The callback must not have run before the channel is registered.
        assertFalse(addedCallbackRan.get());
        group.register(pipeline.channel());
        // Blocks until the promise completes; the test timeout bounds the wait.
        observed.syncUninterruptibly();
    } finally {
        handlerExecutors.shutdownGracefully();
    }
}
/**
 * Performs TLS renegotiation by invoking {@code handshake(promise)} on the executor of this
 * handler's context.
 *
 * @param promise promise passed on to the handshake; must not be {@code null}
 * @return the same {@code promise}
 * @throws NullPointerException if {@code promise} is {@code null}
 * @throws IllegalStateException if this handler has no context yet
 */
public Future<Channel> renegotiate(final Promise<Channel> promise) {
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    // Read the field once into a local before checking and using it.
    ChannelHandlerContext handlerCtx = this.ctx;
    if (handlerCtx == null) {
        throw new IllegalStateException();
    }

    EventExecutor executor = handlerCtx.executor();
    if (executor.inEventLoop()) {
        handshake(promise);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                handshake(promise);
            }
        });
    }
    return promise;
}
/**
 * Acquires a channel via {@code acquire0}, running it directly when already on the executor's
 * event loop and scheduling it there otherwise. Anything thrown while doing so fails the promise.
 *
 * @param promise promise to complete with the acquired channel
 * @return the same {@code promise}
 */
@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
    try {
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    acquire0(promise);
                }
            });
        } else {
            // Already on the event loop: acquire the channel right away.
            acquire0(promise);
        }
    } catch (Throwable cause) {
        promise.setFailure(cause);
    }
    return promise;
}
/**
 * Releases {@code channel} through the delegate pool using an internal promise, then translates the
 * delegate's outcome onto the caller's {@code promise}.
 *
 * @param channel channel to release
 * @param promise promise notified of the release outcome; must not be {@code null}
 * @return the same {@code promise}
 */
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    Promise<Void> delegatePromise = executor.newPromise();
    FutureListener<Void> onDelegateRelease = new FutureListener<Void>() {
        @Override
        public void operationComplete(Future<Void> released) throws Exception {
            assert executor.inEventLoop();

            if (closed) {
                // Since the pool is closed, we have no choice but to close the channel
                channel.close();
                promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
                return;
            }

            if (released.isSuccess()) {
                decrementAndRunTaskQueue();
                promise.setSuccess(null);
                return;
            }

            Throwable cause = released.cause();
            // Skip the decrement only when the failure is an IllegalArgumentException, i.e. the
            // Channel was passed to the wrong pool.
            if (!(cause instanceof IllegalArgumentException)) {
                decrementAndRunTaskQueue();
            }
            promise.setFailure(released.cause());
        }
    };
    delegateChannelPool.release(channel, delegatePromise.addListener(onDelegateRelease));
    return promise;
}
/**
 * Closes {@code channel} and mirrors the outcome of the close onto {@code promise}.
 *
 * @param promise promise to succeed when the close succeeds, or to fail with the close's cause
 */
private void closeChannel(final Promise<Void> promise) {
    channel.close().addListener(closeResult -> {
        if (!closeResult.isSuccess()) {
            promise.setFailure(closeResult.cause());
            return;
        }
        promise.setSuccess(null);
    });
}
/**
 * Builds a {@link BiConsumer} that completes the promise: it fails the promise with the {@link Throwable}
 * passed into the BiConsumer or with anything thrown while running {@code successFunction}, and otherwise
 * succeeds it with the value {@code successFunction} returns.
 *
 * @param successFunction Function called to process the successful result and map it into the result to notify
 * the promise with.
 * @param promise Promise to notify of success or failure.
 * @param <SuccessT> Success type.
 * @param <PromiseT> Type being fulfilled by the promise.
 * @return BiConsumer that can be used in a {@link CompletableFuture#whenComplete(BiConsumer)} method.
 */
public static <SuccessT, PromiseT> BiConsumer<SuccessT, ? super Throwable> promiseNotifyingBiConsumer(
        Function<SuccessT, PromiseT> successFunction, Promise<PromiseT> promise) {
    return (result, error) -> {
        if (error != null) {
            promise.setFailure(error);
            return;
        }
        try {
            promise.setSuccess(successFunction.apply(result));
        } catch (Throwable thrown) {
            promise.setFailure(thrown);
        }
    };
}
/**
 * Starts the request: asks the channel pool for a channel, creates the execute future from the
 * acquire promise and registers {@code makeRequestListener} to run once the acquire completes.
 *
 * @return the future stored in {@code executeFuture}
 */
@SuppressWarnings("unchecked")
public CompletableFuture<Void> execute() {
    Promise<Channel> acquirePromise = context.eventLoopGroup().next().newPromise();
    context.channelPool().acquire(acquirePromise);
    executeFuture = createExecuteFuture(acquirePromise);
    // The raw cast is what makes the unchecked suppression on this method necessary.
    acquirePromise.addListener((GenericFutureListener) this::makeRequestListener);
    return executeFuture;
}
/**
 * Registers each of the given listeners on the delegate, first wrapping it with
 * {@code RequestContextAwareFutureListener.of(context, listener)}.
 *
 * @param listeners listeners to wrap and register
 * @return this promise
 */
@Override
@SafeVarargs
public final Promise<T> addListeners(
        GenericFutureListener<? extends Future<? super T>>... listeners) {
    for (int i = 0; i < listeners.length; i++) {
        GenericFutureListener<? extends Future<? super T>> listener = listeners[i];
        delegate.addListeners(RequestContextAwareFutureListener.of(context, listener));
    }
    return this;
}
/**
 * Removes all subscriptions for the given topic.
 * To unsubscribe one specific handler only, use {@link #off(String, MqttHandler)}.
 *
 * @param topic The topic to unsubscribe from
 * @return The future handed to {@code checkSubscribtions}; intended to complete when the server
 *         acknowledges the unsubscribe request
 */
@Override
public Future<Void> off(String topic) {
    Promise<Void> unsubscribed = new DefaultPromise<>(this.eventLoop.next());
    // Iterate over a snapshot, because the live collection is modified inside the loop.
    ImmutableSet<MqttSubscription> snapshot = ImmutableSet.copyOf(this.subscriptions.get(topic));
    for (MqttSubscription topicSubscription : snapshot) {
        for (MqttSubscription handlerSubscription
                : this.handlerToSubscribtion.get(topicSubscription.getHandler())) {
            this.subscriptions.remove(topic, handlerSubscription);
        }
        this.handlerToSubscribtion.remove(topicSubscription.getHandler(), topicSubscription);
    }
    this.checkSubscribtions(topic, unsubscribed);
    return unsubscribed;
}
/**
 * Attaches a Netty promise: registers the internal promise handler on it, makes it the current
 * promise, and detaches and cancels the promise that was attached before, if any.
 *
 * @param promise netty promise to set up response promise with
 */
public void attachNettyPromise(Promise<T> promise) {
    promise.addListener(promiseHandler);
    Promise<T> previous = this.promise;
    this.promise = promise;
    if (previous == null) {
        return;
    }
    // Remove the handler before cancelling the replaced promise.
    previous.removeListener(promiseHandler);
    previous.cancel(true);
}
/**
 * When the write issued by the init handler fails, the promise handed to the handler must
 * complete unsuccessfully.
 */
@Test
public void requestWriteFails_failsPromise() {
    // Every writeAndFlush on the mocked channel returns an already-failed promise.
    DefaultChannelPromise failedWrite = new DefaultChannelPromise(mockChannel, GROUP.next());
    failedWrite.setFailure(new IOException("boom"));
    when(mockChannel.writeAndFlush(anyObject())).thenReturn(failedWrite);

    Promise<Channel> initPromise = GROUP.next().newPromise();
    ProxyTunnelInitHandler initHandler = new ProxyTunnelInitHandler(mockChannelPool, REMOTE_HOST, initPromise);
    initHandler.handlerAdded(mockCtx);

    assertThat(initPromise.awaitUninterruptibly().isSuccess()).isFalse();
}
/**
 * Stores the promise and the per-step arrays. {@code multipleMessage} is copied, while
 * {@code minimalWaitBetween} and {@code autoRead} are kept by reference.
 *
 * @param promise promise kept by this handler
 * @param minimalWaitBetween array stored as-is
 * @param multipleMessage array of which a private copy is stored
 * @param autoRead array stored as-is
 */
ClientHandler(Promise<Boolean> promise, long[] minimalWaitBetween, int[] multipleMessage,
        int[] autoRead) {
    this.promise = promise;
    this.minimalWaitBetween = minimalWaitBetween;
    this.autoRead = autoRead;
    // Private copy, so that this handler and the caller do not share the same array.
    this.multipleMessage = multipleMessage.clone();
}
/**
 * Prepares {@code ch} for use. If its tunnel is already established, the acquire promise succeeds
 * immediately. Otherwise an optional SSL handler and a tunnel init handler are added to the
 * pipeline, and the acquire promise is completed from the tunnel promise's outcome.
 *
 * @param ch channel obtained from the delegate pool
 * @param acquirePromise promise to complete with the tunnelled channel or with the failure cause
 */
private void setupChannel(Channel ch, Promise<Channel> acquirePromise) {
    if (isTunnelEstablished(ch)) {
        log.debug(() -> String.format("Tunnel already established for %s", ch.id().asShortText()));
        acquirePromise.setSuccess(ch);
        return;
    }

    log.debug(() -> String.format("Tunnel not yet established for channel %s. Establishing tunnel now.",
                                  ch.id().asShortText()));

    Promise<Channel> tunnelPromise = eventLoop.newPromise();

    SslHandler ssl = createSslHandlerIfNeeded(ch.alloc());
    if (ssl != null) {
        ch.pipeline().addLast(ssl);
    }
    ch.pipeline().addLast(initHandlerSupplier.newInitHandler(delegate, remoteAddress, tunnelPromise));

    tunnelPromise.addListener((Future<Channel> outcome) -> {
        if (!outcome.isSuccess()) {
            // Tunnel setup failed: close the channel, give it back to the delegate pool, report the cause.
            ch.close();
            delegate.release(ch);
            Throwable cause = outcome.cause();
            log.error(() -> String.format("Unable to establish tunnel for channel %s", ch.id().asShortText()), cause);
            acquirePromise.setFailure(cause);
            return;
        }
        Channel tunnel = outcome.getNow();
        handler.channelCreated(tunnel);
        // Mark the channel so that later acquires take the fast path above.
        tunnel.attr(TUNNEL_ESTABLISHED_KEY).set(true);
        acquirePromise.setSuccess(tunnel);
    });
}