下面列出了 io.netty.util.concurrent.Future#addListener() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Closes the current channel (when one exists) and then gracefully shuts down the
 * bootstrap's event loop group.
 *
 * <p>The whole operation runs while holding the monitor of {@code clientBoot}. When no
 * channel promise is held, an already-succeeded future stands in for the close future so
 * the group shutdown is still triggered through the same listener.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        // Inspect the promise BEFORE clearing the field. Clearing it first made the
        // close branch unreachable, so the channel was never closed explicitly.
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        this.channelPromise = null;

        // Shut the event loop group down only once the close attempt has completed.
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                NettyClient2.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Acquires a new parent connection from {@code connectionPool} and, once that connection's
 * protocol future completes, continues stream acquisition on it.
 *
 * <p>If the connection cannot be acquired, {@code promise} is failed with the acquisition
 * cause. Any failure after the parent channel is available is routed to
 * {@code failAndCloseParent}.
 *
 * @param promise the promise to fail here or to hand on to the next acquisition step
 */
private void acquireStreamOnNewConnection(Promise<Channel> promise) {
    Future<Channel> connectionFuture = connectionPool.acquire();

    connectionFuture.addListener(ignored -> {
        if (connectionFuture.isSuccess()) {
            Channel parent = connectionFuture.getNow();
            try {
                // Record this pool on the parent channel.
                parent.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);

                // When the protocol future is completed on the new connection, we're ready
                // for new streams to be added to it.
                parent.attr(ChannelAttributeKey.PROTOCOL_FUTURE)
                      .get()
                      .thenAccept(negotiated -> acquireStreamOnFreshConnection(promise, parent, negotiated))
                      .exceptionally(failure -> failAndCloseParent(promise, parent, failure));
            } catch (Throwable t) {
                failAndCloseParent(promise, parent, t);
            }
        } else {
            promise.setFailure(connectionFuture.cause());
        }
    });
}
/**
 * Acquires a new parent connection from {@code parentConnectionPool}, recording metrics for
 * the attempt, and then continues stream acquisition on the fresh connection.
 *
 * <p>On acquisition failure the failure counter is incremented and {@code promise} is failed
 * with the acquisition cause. Any failure after the parent channel is available is routed to
 * {@code failAndCloseParent}.
 *
 * @param promise the promise to fail here or to hand on to the next acquisition step
 */
private void acquireStreamOnNewConnection(Promise<Channel> promise) {
    log.trace("Creating new connection, number of connections: {}", parentConnections.size());
    http2ClientMetrics.http2NewConnectionCount.inc();

    final long acquireStartMs = System.currentTimeMillis();
    final Future<Channel> connectionFuture = parentConnectionPool.acquire();

    connectionFuture.addListener(ignored -> {
        if (connectionFuture.isSuccess()) {
            // Elapsed wall-clock time between requesting the connection and this callback.
            http2ClientMetrics.http2ConnectionAcquireTime.update(System.currentTimeMillis() - acquireStartMs);

            Channel parent = connectionFuture.getNow();
            try {
                // Record this pool on the parent channel before asking for a stream on it.
                parent.attr(HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);
                acquireStreamOnFreshConnection(promise, parent);
            } catch (Throwable t) {
                failAndCloseParent(promise, parent, t);
            }
        } else {
            http2ClientMetrics.http2NewConnectionFailureCount.inc();
            promise.setFailure(connectionFuture.cause());
        }
    });
}
/**
 * Closes the current channel (when one exists) and then gracefully shuts down the
 * bootstrap's event loop group.
 *
 * <p>The whole operation runs while holding the monitor of {@code clientBoot}. When no
 * channel promise is held, an already-succeeded future stands in for the close future so
 * the group shutdown is still triggered through the same listener.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        // Inspect the promise BEFORE clearing the field. Clearing it first made the
        // close branch unreachable, so the channel was never closed explicitly.
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        this.channelPromise = null;

        // Shut the event loop group down only once the close attempt has completed.
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Closes the current channel (when one exists) and then gracefully shuts down the
 * bootstrap's event loop group.
 *
 * <p>The whole operation runs while holding the monitor of {@code clientBoot}. When no
 * channel promise is held, an already-succeeded future stands in for the close future so
 * the group shutdown is still triggered through the same listener.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        // Inspect the promise BEFORE clearing the field. Clearing it first made the
        // close branch unreachable, so the channel was never closed explicitly.
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        this.channelPromise = null;

        // Shut the event loop group down only once the close attempt has completed.
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                NettyClient2.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Closes the current channel (when one exists) and then gracefully shuts down the
 * bootstrap's event loop group.
 *
 * <p>The whole operation runs while holding the monitor of {@code clientBoot}. When no
 * channel promise is held, an already-succeeded future stands in for the close future so
 * the group shutdown is still triggered through the same listener.
 */
public void disconnect() {
    synchronized (this.clientBoot) {
        final Future<Void> channelCloseFuture;
        // Inspect the promise BEFORE clearing the field. Clearing it first made the
        // close branch unreachable, so the channel was never closed explicitly.
        if (this.channelPromise != null) {
            channelCloseFuture = this.channelPromise.channel().close();
        } else {
            channelCloseFuture = new SucceededFuture<Void>(GlobalEventExecutor.INSTANCE, null);
        }
        this.channelPromise = null;

        // Shut the event loop group down only once the close attempt has completed.
        channelCloseFuture.addListener(new GenericFutureListener<Future<Void>>() {
            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                PlatformSSLClient.this.clientBoot.config().group().shutdownGracefully();
            }
        });
    }
}
/**
 * Cancels the current job: acquires a channel from {@code channelPool}, attaches a
 * {@code CancelCommand} for the job's unique name to that acquisition, marks this instance
 * as cancelled, cancels the keep-alive (if one is set) and finally calls {@code signal()}.
 */
private void doCancel() {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Cancelled job " + job.getUniqueName());
    }

    // The cancel command is attached as a listener on the channel acquisition.
    channelPool.acquire().addListener(new CancelCommand(job.getUniqueName()));
    cancelled = true;

    if (keepAlive != null) {
        keepAlive.cancel();
    }
    signal();
}
/**
 * Closes via {@code super.close} and then shuts down the boss and worker groups (whichever
 * are non-null), reporting the combined outcome to {@code consumer}.
 *
 * <p>If there is no group to shut down, the consumer is notified immediately with the result
 * of the parent close. Otherwise it is notified once every group shutdown future has
 * completed, with the first collected failure, if any.
 *
 * @param consumer receives the final result; may be {@code null}, in which case the groups
 *                 are still shut down but nobody is notified
 */
@Override
public void close(final Consumer<AsyncResult<Channel>> consumer) {
    super.close(o -> {
        List<Future<?>> shutdowns = new LinkedList<>();
        if (bossGroup != null) {
            shutdowns.add(bossGroup.shutdownGracefully());
        }
        if (workerGroup != null) {
            shutdowns.add(workerGroup.shutdownGracefully());
        }

        if (consumer == null) {
            return;
        }

        if (shutdowns.isEmpty()) {
            // Nothing to wait for.
            if (o.isSuccess()) {
                consumer.accept(new AsyncResult<>(this));
            } else {
                consumer.accept(new AsyncResult<>(this, o.getThrowable()));
            }
            return;
        }

        // Wait for the groups to finish shutting down.
        LinkedList<Throwable> failures = new LinkedList<>();
        if (!o.isSuccess()) {
            failures.add(o.getThrowable());
        }
        AtomicInteger remaining = new AtomicInteger(shutdowns.size());
        for (Future<?> shutdown : shutdowns) {
            shutdown.addListener(done -> {
                if (!done.isSuccess()) {
                    Throwable cause = done.cause();
                    failures.add(cause == null ? new TransportException(("unknown exception.")) : cause);
                }
                // Only the last shutdown to complete notifies the consumer.
                if (remaining.decrementAndGet() != 0) {
                    return;
                }
                if (failures.isEmpty()) {
                    consumer.accept(new AsyncResult<>(this));
                } else {
                    consumer.accept(new AsyncResult<>(this, failures.peek()));
                }
            });
        }
    });
}
/**
 * Bridges the completion of {@code future} to {@code subscriber}: a completion with a
 * non-null cause is reported through {@code onError}, anything else through
 * {@code onComplete}.
 *
 * @param subscriber the subscriber to signal
 * @param future     the future whose completion is forwarded
 */
static void connectToSubscriber(final Subscriber subscriber, final Future<?> future) {
    future.addListener(done -> {
        final Throwable failure = done.cause();
        if (failure != null) {
            subscriber.onError(failure);
            return;
        }
        subscriber.onComplete();
    });
}
/**
 * Sends the given DNS questions through {@code resolver} and routes the resulting records
 * to {@code onDnsRecords}. Does nothing when {@code isClosing()} reports true.
 *
 * @param questions the questions to send
 */
private void sendQueries(List<DnsQuestion> questions) {
    if (!isClosing()) {
        final Future<List<DnsRecord>> pending = resolver.sendQueries(questions, logPrefix);
        // Count the attempt before registering the listener, matching the original order.
        attemptsSoFar++;
        pending.addListener(this::onDnsRecords);
    }
}
/**
 * Resolves every configured sentinel host to all of its addresses and registers any
 * resolved address that is not already present in {@code sentinels}.
 *
 * <p>A failed resolution is logged and otherwise ignored.
 *
 * @param commonListener optional listener that is additionally attached to each host's
 *                       resolution future; may be {@code null}
 */
private void performSentinelDNSCheck(FutureListener<List<InetSocketAddress>> commonListener) {
    for (RedisURI host : sentinelHosts) {
        Future<List<InetSocketAddress>> resolution = sentinelResolver.resolveAll(
                InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));

        FutureListener<List<InetSocketAddress>> registerDiscovered = resolved -> {
            if (!resolved.isSuccess()) {
                log.error("Unable to resolve " + host.getHost(), resolved.cause());
                return;
            }

            // Collapse the resolved addresses into distinct sentinel URIs.
            Set<RedisURI> discovered = resolved.getNow().stream()
                    .map(addr -> toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()))
                    .collect(Collectors.toSet());

            for (RedisURI candidate : discovered) {
                if (sentinels.containsKey(candidate)) {
                    continue;
                }
                registerSentinel(candidate, getConfig(), host.getHost());
            }
        };
        resolution.addListener(registerDiscovered);

        if (commonListener != null) {
            resolution.addListener(commonListener);
        }
    }
}
/**
 * Requests a connection for {@code key} from {@code connectionFactory} and arranges for the
 * corresponding cache entry to be removed once the established channel closes.
 *
 * @param key identifies the connection parameters and target address
 * @return the future for the requested connection
 */
private Future<Channel> createConnection(ConnectionKey key)
{
    Future<Channel> connection = connectionFactory.getConnection(key.getConnectionParameters(), key.getAddress());

    // remove connection from cache when it is closed
    connection.addListener(connected -> {
        if (!connection.isSuccess()) {
            return;
        }
        Channel channel = connection.getNow();
        // Conditional remove: the entry is dropped only if it still maps to this future.
        channel.closeFuture().addListener(closed -> cachedConnections.asMap().remove(key, connection));
    });

    return connection;
}
/**
 * Closes the channel produced by {@code future} once the future completes successfully.
 * A failed future is left alone.
 *
 * @param future the connection future whose channel should be closed
 */
private static void closeConnection(Future<Channel> future)
{
    future.addListener(ignored -> {
        if (!future.isSuccess()) {
            return;
        }
        future.getNow().close();
    });
}
/**
 * Closes the channel (if any), disposes all modules, and then gracefully shuts down the main
 * event loop group. {@code disposeExecutor} is invoked once the group shutdown future
 * completes.
 *
 * <p>Channel closing and module disposal happen while holding this object's monitor; the
 * group shutdown is started outside of it.
 */
@Override
public void close () throws Exception
{
    synchronized ( this )
    {
        if ( this.channel != null )
        {
            this.channel.close ();
            this.channel = null;
        }

        for ( final Module current : this.modules )
        {
            current.dispose ();
        }
    }

    logger.debug ( "Shutting down main group" );

    final Future<?> groupShutdown = this.group.shutdownGracefully ();
    groupShutdown.addListener ( done -> disposeExecutor () );
}
/**
 * Gracefully shuts down {@code group} when it is set, and forwards the completed shutdown
 * future together with {@code finishListener} to {@code startListenerHandle}.
 */
@Override
public void shutdown() {
    if (group == null) {
        return;
    }
    group.shutdownGracefully()
         .addListener(done -> startListenerHandle(done, finishListener));
}
/**
 * Sends {@code question} to the next name server from {@code nameServerAddrStream} and
 * handles the response asynchronously.
 *
 * <p>No query is sent, and {@code tryToFinishResolve} is called instead, when the stream index
 * has reached the stream size, when {@code allowedQueries} is 0, or when {@code promise} has
 * been cancelled.
 *
 * @param nameServerAddrStream      the stream that supplies name server addresses
 * @param nameServerAddrStreamIndex index of the current attempt within the stream
 * @param question                  the DNS question to send
 * @param queryLifecycleObserver    observer notified of the written, cancelled and failed events
 * @param promise                   the overall resolve promise
 * @param cause                     forwarded to {@code tryToFinishResolve} when no further query
 *                                  is sent (presumably the failure of a previous attempt;
 *                                  confirm against callers)
 */
private void query(final DnsServerAddressStream nameServerAddrStream,
final int nameServerAddrStreamIndex,
final DnsQuestion question,
final DnsQueryLifecycleObserver queryLifecycleObserver,
final Promise<T> promise,
final Throwable cause) {
// Out of servers, out of query budget, or the caller gave up: finish instead of querying.
if (nameServerAddrStreamIndex >= nameServerAddrStream.size() || allowedQueries == 0 || promise.isCancelled()) {
tryToFinishResolve(nameServerAddrStream, nameServerAddrStreamIndex, question, queryLifecycleObserver,
promise, cause);
return;
}
// Consume one unit of the query budget for this attempt.
--allowedQueries;
final InetSocketAddress nameServerAddr = nameServerAddrStream.next();
final ChannelPromise writePromise = parent.ch.newPromise();
final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> f = parent.query0(
nameServerAddr, question, additionals, writePromise,
parent.ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
// Track the in-flight query; the listener below removes it again on completion.
queriesInProgress.add(f);
queryLifecycleObserver.queryWritten(nameServerAddr, writePromise);
f.addListener(new FutureListener<AddressedEnvelope<DnsResponse, InetSocketAddress>>() {
@Override
public void operationComplete(Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> future) {
queriesInProgress.remove(future);
// The overall resolve is already done, or this query was cancelled: report and clean up.
if (promise.isDone() || future.isCancelled()) {
queryLifecycleObserver.queryCancelled(allowedQueries);
// Check if we need to release the envelope itself. If the query was cancelled the getNow() will
// return null as well as the Future will be failed with a CancellationException.
AddressedEnvelope<DnsResponse, InetSocketAddress> result = future.getNow();
if (result != null) {
result.release();
}
return;
}
final Throwable queryCause = future.cause();
try {
if (queryCause == null) {
onResponse(nameServerAddrStream, nameServerAddrStreamIndex, question, future.getNow(),
queryLifecycleObserver, promise);
} else {
// Server did not respond or I/O error occurred; try again.
queryLifecycleObserver.queryFailed(queryCause);
// Retry at the next stream index; note this calls the overload without an observer argument.
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, queryCause);
}
} finally {
tryToFinishResolve(nameServerAddrStream, nameServerAddrStreamIndex, question,
// queryLifecycleObserver has already been terminated at this point so we must
// not allow it to be terminated again by tryToFinishResolve.
NoopDnsQueryLifecycleObserver.INSTANCE,
promise, queryCause);
}
}
});
}
/**
 * Wraps {@code future} in a {@code ListenableFutureAdapter} by registering the adapter as a
 * listener on it.
 *
 * @param future the future to adapt
 * @param <T>    the result type
 * @return the adapter that was registered on {@code future}
 */
static <T> ListenableFuture<T> listenable(final Future<T> future) {
    final ListenableFutureAdapter<T> bridge = new ListenableFutureAdapter<>();
    future.addListener(bridge);
    return bridge;
}
/**
 * Sends {@code questions} through {@code resolver} and completes {@code result} with a
 * {@code CacheEntry} describing the outcome.
 *
 * <p>Each path handled below completes {@code result} normally with a {@code CacheEntry}.
 * Failures are carried inside the entry rather than by completing the future exceptionally.
 *
 * @param questions the DNS questions to send; also stored in the resulting entry
 * @param hostname  the host name being resolved, passed to the resolver and used in messages
 * @param result    the future completed with the resulting cache entry
 */
private void sendQueries(List<DnsQuestion> questions, String hostname,
CompletableFuture<CacheEntry> result) {
final Future<List<DnsRecord>> recordsFuture = resolver.sendQueries(questions, hostname);
recordsFuture.addListener(f -> {
if (!f.isSuccess()) {
final Throwable cause = f.cause();
// TODO(minwoox): In Netty, DnsNameResolver only caches if the failure was not because of an
// IO error / timeout that was caused by the query itself.
// To figure that out, we need to check the cause of the UnknownHostException.
// If it's null, then we can cache the cause. However, this is very fragile
// because Netty can change the behavior while we are not noticing that.
// So sending a PR to upstream would be the best solution.
final boolean hasCacheableCause;
if (cause instanceof UnknownHostException) {
final UnknownHostException unknownHostException = (UnknownHostException) cause;
hasCacheableCause = unknownHostException.getCause() == null;
} else {
hasCacheableCause = false;
}
result.complete(new CacheEntry(null, -1, questions, cause, hasCacheableCause));
return;
}
@SuppressWarnings("unchecked")
final List<DnsRecord> records = (List<DnsRecord>) f.getNow();
InetAddress inetAddress = null;
long ttlMillis = -1;
try {
// Use the first record that yields address bytes; records without them are skipped.
for (DnsRecord r : records) {
final byte[] addrBytes = extractAddressBytes(r, logger, hostname);
if (addrBytes == null) {
continue;
}
try {
inetAddress = InetAddress.getByAddress(hostname, addrBytes);
// Clamp the record TTL (seconds) into [minTtl, maxTtl] before converting to millis.
ttlMillis = TimeUnit.SECONDS.toMillis(
Math.max(Math.min(r.timeToLive(), maxTtl), minTtl));
break;
} catch (UnknownHostException e) {
// Should never reach here because we already validated it in extractAddressBytes.
result.complete(new CacheEntry(null, -1, questions, new IllegalArgumentException(
"Invalid address: " + hostname, e), false));
return;
}
}
} finally {
// Runs on every exit path, including the early return above, so the records are always released.
records.forEach(ReferenceCountUtil::safeRelease);
}
final CacheEntry cacheEntry;
if (inetAddress == null) {
// No record produced an address: report an UnknownHostException inside the entry.
cacheEntry = new CacheEntry(null, -1, questions, new UnknownHostException(
"failed to receive DNS records for " + hostname), true);
} else {
cacheEntry = new CacheEntry(inetAddress, ttlMillis, questions, null, false);
}
result.complete(cacheEntry);
});
}
/**
 * Gracefully shuts down {@code group} when it is set, and forwards the completed shutdown
 * future together with {@code finishListener} to {@code startListenerHandle}.
 */
public void shutdown() {
    if (group == null) {
        return;
    }
    group.shutdownGracefully()
         .addListener(done -> startListenerHandle(done, finishListener));
}
/**
 * Registers a listener on {@code future} that checks, via
 * {@code Preconditions.checkArgument}, that the future completed successfully. The local
 * address and the future's cause are passed as message arguments.
 *
 * @param localAddress the local address the session was started on, used in the message
 * @param future       the future to verify on completion
 * @param <T>          the future's result type
 */
private static <T> void addFutureListener(final InetSocketAddress localAddress, final Future<T> future) {
    future.addListener(completed -> {
        final boolean started = completed.isSuccess();
        final Throwable failure = completed.cause();
        Preconditions.checkArgument(started, "Unable to start bgp session on %s", localAddress, failure);
    });
}