下面列出了 io.netty.util.concurrent.FutureListener 类的 API 实例代码及典型用法，也可以点击链接到 GitHub 查看完整源代码。
@Test
public void removeAllStreamsWhileIteratingActiveStreams() throws InterruptedException, Http2Exception {
    // Populate the connection with many streams on both endpoints
    // (odd ids created locally, even ids created remotely).
    final Endpoint<Http2RemoteFlowController> remote = client.remote();
    final Endpoint<Http2LocalFlowController> local = client.local();
    for (int c = 3, s = 2; c < 5000; c += 2, s += 2) {
        local.createStream(c, false);
        remote.createStream(s, false);
    }
    final Promise<Void> promise = group.next().newPromise();
    final CountDownLatch latch = new CountDownLatch(client.numActiveStreams());
    // Close the connection from inside the active-stream iteration: the
    // iteration must tolerate all streams being removed underneath it.
    client.forEachActiveStream(new Http2StreamVisitor() {
        @Override
        public boolean visit(Http2Stream stream) {
            client.close(promise).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    // By the time any close listener fires the promise must be done.
                    assertTrue(promise.isDone());
                    latch.countDown();
                }
            });
            return true; // keep iterating over the remaining streams
        }
    });
    assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Override
protected void doResolve(final String inetHost, final Promise<InetAddress> promise) throws Exception {
    // Hijack the single-address request and run a resolveAll underneath.
    // Note that InetSocketAddress.getHostName() will never incur a reverse lookup here,
    // because an unresolved address always has a host name.
    nameResolver.resolveAll(inetHost).addListener(new FutureListener<List<InetAddress>>() {
        @Override
        public void operationComplete(Future<List<InetAddress>> future) throws Exception {
            if (!future.isSuccess()) {
                promise.setFailure(future.cause());
                return;
            }
            final List<InetAddress> addresses = future.getNow();
            final int count = addresses.size();
            if (count == 0) {
                promise.setFailure(new UnknownHostException(inetHost));
            } else {
                // Several candidates: pick one at random to support
                // round-robin distribution across calls.
                promise.setSuccess(addresses.get(randomIndex(count)));
            }
        }
    });
}
@Override
protected void doResolveAll(String inetHost, final Promise<List<InetAddress>> promise) throws Exception {
    nameResolver.resolveAll(inetHost).addListener(new FutureListener<List<InetAddress>>() {
        @Override
        public void operationComplete(Future<List<InetAddress>> future) throws Exception {
            if (!future.isSuccess()) {
                promise.setFailure(future.cause());
                return;
            }
            final List<InetAddress> resolved = future.getNow();
            if (resolved.isEmpty()) {
                // Nothing to rotate; pass the empty result through.
                promise.setSuccess(resolved);
                return;
            }
            // Copy into a modifiable random-access list, then rotate by a
            // random distance so successive calls distribute round-robin.
            final List<InetAddress> rotated = new ArrayList<InetAddress>(resolved);
            Collections.rotate(rotated, randomIndex(resolved.size()));
            promise.setSuccess(rotated);
        }
    });
}
@Override
protected void doResolve(final InetSocketAddress unresolvedAddress, final Promise<InetSocketAddress> promise)
        throws Exception {
    // getHostName() never triggers a reverse lookup here: an unresolved
    // address always carries its original host name.
    nameResolver.resolve(unresolvedAddress.getHostName())
            .addListener(new FutureListener<InetAddress>() {
                @Override
                public void operationComplete(Future<InetAddress> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.setFailure(future.cause());
                        return;
                    }
                    // Re-attach the original port to the resolved address.
                    promise.setSuccess(new InetSocketAddress(future.getNow(), unresolvedAddress.getPort()));
                }
            });
}
@Override
protected void doResolveAll(final InetSocketAddress unresolvedAddress,
        final Promise<List<InetSocketAddress>> promise) throws Exception {
    // getHostName() never triggers a reverse lookup here: an unresolved
    // address always carries its original host name.
    nameResolver.resolveAll(unresolvedAddress.getHostName())
            .addListener(new FutureListener<List<InetAddress>>() {
                @Override
                public void operationComplete(Future<List<InetAddress>> future) throws Exception {
                    if (!future.isSuccess()) {
                        promise.setFailure(future.cause());
                        return;
                    }
                    // Pair every resolved address with the original port.
                    final List<InetAddress> resolved = future.getNow();
                    final List<InetSocketAddress> withPort =
                            new ArrayList<InetSocketAddress>(resolved.size());
                    for (InetAddress address : resolved) {
                        withPort.add(new InetSocketAddress(address, unresolvedAddress.getPort()));
                    }
                    promise.setSuccess(withPort);
                }
            });
}
private void doResolveRec(final String inetHost,
                          final Promise<T> promise,
                          final int resolverIndex,
                          Throwable lastFailure) throws Exception {
    // Every delegate has been tried: surface the most recent failure.
    if (resolverIndex >= resolvers.length) {
        promise.setFailure(lastFailure);
        return;
    }
    resolvers[resolverIndex].resolve(inetHost).addListener(new FutureListener<T>() {
        @Override
        public void operationComplete(Future<T> future) throws Exception {
            if (future.isSuccess()) {
                promise.setSuccess(future.getNow());
            } else {
                // Fall through to the next resolver, remembering this failure.
                doResolveRec(inetHost, promise, resolverIndex + 1, future.cause());
            }
        }
    });
}
private void doResolveAllRec(final String inetHost,
                             final Promise<List<T>> promise,
                             final int resolverIndex,
                             Throwable lastFailure) throws Exception {
    // Every delegate has been tried: surface the most recent failure.
    if (resolverIndex >= resolvers.length) {
        promise.setFailure(lastFailure);
        return;
    }
    resolvers[resolverIndex].resolveAll(inetHost).addListener(new FutureListener<List<T>>() {
        @Override
        public void operationComplete(Future<List<T>> future) throws Exception {
            if (future.isSuccess()) {
                promise.setSuccess(future.getNow());
            } else {
                // This resolver failed; move on to the next one.
                doResolveAllRec(inetHost, promise, resolverIndex + 1, future.cause());
            }
        }
    });
}
// Issues (or re-issues) the DNS query for this publisher. Must run on the
// event loop; reports a terminal error instead if the client was closed.
private void doQuery0() {
    assertInEventloop();
    if (closed) {
        // best effort check to cleanup state after close.
        handleTerminalError0(new ClosedServiceDiscovererException(DefaultDnsClient.this +
                " has been closed!"));
    } else {
        final DnsResolutionObserver resolutionObserver = newResolutionObserver();
        LOGGER.trace("DnsClient {}, querying DNS for {}", DefaultDnsClient.this, AbstractDnsPublisher.this);
        final Future<DnsAnswer<T>> addressFuture = doDnsQuery();
        // Remember how to cancel the in-flight query.
        cancellableForQuery = () -> addressFuture.cancel(true);
        if (addressFuture.isDone()) {
            // Completed synchronously: handle inline rather than via a listener.
            handleResolveDone0(addressFuture, resolutionObserver);
        } else {
            addressFuture.addListener((FutureListener<DnsAnswer<T>>) f ->
                    handleResolveDone0(f, resolutionObserver));
        }
    }
}
@Override
public void writeAndFlush(final Object obj) {
    // Writes and flushes `obj` on the underlying channel, logging any failure.
    // Use the typed ChannelFuture/ChannelFutureListener pair instead of the
    // original raw Future/FutureListener: identical behavior, but no raw-type
    // (unchecked) warnings and no accidental mismatch with the channel API.
    ChannelFuture future = channel.writeAndFlush(obj);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture completed) throws Exception {
            // Only failures need attention; a successful flush is silent.
            if (!completed.isSuccess()) {
                Throwable throwable = completed.cause();
                LOGGER.error("Failed to send to "
                        + NetUtils.channelToString(localAddress(), remoteAddress())
                        + " for msg : " + obj
                        + ", Cause by:", throwable);
            }
        }
    });
}
// Schedules a periodic DNS re-resolution of all sentinel hosts. A shared
// listener counts down one per host; the last completion reschedules the check.
private void scheduleSentinelDNSCheck() {
    monitorFuture = group.schedule(new Runnable() {
        @Override
        public void run() {
            // One countdown per sentinel host resolution.
            AtomicInteger sentinelsCounter = new AtomicInteger(sentinelHosts.size());
            FutureListener<List<InetSocketAddress>> commonListener = new FutureListener<List<InetSocketAddress>>() {
                @Override
                public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
                    // Reschedule only after the last resolution finished,
                    // regardless of individual success/failure.
                    if (sentinelsCounter.decrementAndGet() == 0) {
                        scheduleSentinelDNSCheck();
                    }
                }
            };
            performSentinelDNSCheck(commonListener);
        }
    }, config.getDnsMonitoringInterval(), TimeUnit.MILLISECONDS);
}
// Acquires a channel from the pool, retrying failed acquisitions through the
// retry loop until it succeeds or the retry budget is exhausted.
private void acquireWithRetry(AsyncRetryLoop retry, DefaultPromise<Channel> result) {
    simpleChannelPool.acquire().addListener((FutureListener<Channel>) acquired -> {
        if (acquired.isSuccess()) {
            result.setSuccess(acquired.getNow());
            return;
        }
        // deal with connection failure here.
        if (retry.canRetry()) {
            retry.attempt(() -> acquireWithRetry(retry, result));
        } else {
            result.setFailure(acquired.cause());
        }
    });
}
// Resolves `hostname`, applying the resolver's search-domain list per the
// ndots rule: names with a trailing dot, or when there are no search domains
// or ndots is 0, are tried verbatim; otherwise search domains are appended
// one by one, with the bare name tried last.
void resolve(final Promise<T> promise) {
    final String[] searchDomains = parent.searchDomains();
    if (searchDomains.length == 0 || parent.ndots() == 0 || StringUtil.endsWith(hostname, '.')) {
        internalResolve(promise);
    } else {
        // Enough dots -> try the bare name first; otherwise begin with the
        // first search domain already appended.
        final boolean startWithoutSearchDomain = hasNDots();
        final String initialHostname = startWithoutSearchDomain ? hostname : hostname + '.' + searchDomains[0];
        final int initialSearchDomainIdx = startWithoutSearchDomain ? 0 : 1;
        doSearchDomainQuery(initialHostname, new FutureListener<T>() {
            // Index of the NEXT search domain to try; the listener re-registers
            // itself for each successive attempt.
            private int searchDomainIdx = initialSearchDomainIdx;
            @Override
            public void operationComplete(Future<T> future) throws Exception {
                Throwable cause = future.cause();
                if (cause == null) {
                    promise.trySuccess(future.getNow());
                } else {
                    if (DnsNameResolver.isTransportOrTimeoutError(cause)) {
                        // Hard transport failure: trying more domains is pointless.
                        promise.tryFailure(new SearchDomainUnknownHostException(cause, hostname));
                    } else if (searchDomainIdx < searchDomains.length) {
                        // Try the next search domain, reusing this listener.
                        doSearchDomainQuery(hostname + '.' + searchDomains[searchDomainIdx++], this);
                    } else if (!startWithoutSearchDomain) {
                        // All search domains exhausted: finally try the bare name.
                        internalResolve(promise);
                    } else {
                        promise.tryFailure(new SearchDomainUnknownHostException(cause, hostname));
                    }
                }
            }
        });
    }
}
// Resolves one candidate name on a fresh context/promise and forwards the
// outcome to the supplied listener.
private void doSearchDomainQuery(String hostname, FutureListener<T> listener) {
    Promise<T> nextPromise = parent.executor().newPromise();
    DnsNameResolverContext<T> nextContext = newResolverContext(parent, hostname, additionals, resolveCache,
            nameServerAddrs);
    nextContext.internalResolve(nextPromise);
    nextPromise.addListener(listener);
}
@Test
public void removeAllStreamsWhileIteratingActiveStreamsAndExceptionOccurs()
        throws InterruptedException, Http2Exception {
    // Populate the connection with many streams on both endpoints
    // (odd ids created locally, even ids created remotely).
    final Endpoint<Http2RemoteFlowController> remote = client.remote();
    final Endpoint<Http2LocalFlowController> local = client.local();
    for (int c = 3, s = 2; c < 5000; c += 2, s += 2) {
        local.createStream(c, false);
        remote.createStream(s, false);
    }
    final Promise<Void> promise = group.next().newPromise();
    final CountDownLatch latch = new CountDownLatch(1);
    try {
        client.forEachActiveStream(new Http2StreamVisitor() {
            @Override
            public boolean visit(Http2Stream stream) throws Http2Exception {
                // This close call is basically a noop, because the following statement will throw an exception.
                client.close(promise);
                // Do an invalid operation while iterating.
                remote.createStream(3, false);
                return true;
            }
        });
    } catch (Http2Exception ignored) {
        // Expected: the invalid createStream aborts the iteration. Closing
        // again afterwards must still complete the promise cleanly.
        client.close(promise).addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                assertTrue(promise.isDone());
                latch.countDown();
            }
        });
    }
    assertTrue(latch.await(5, TimeUnit.SECONDS));
}
// Closes the connection and verifies the close promise is completed before
// its listeners run, within a 5-second deadline.
private void testRemoveAllStreams() throws InterruptedException {
    final Promise<Void> promise = group.next().newPromise();
    final CountDownLatch closed = new CountDownLatch(1);
    client.close(promise).addListener(new FutureListener<Void>() {
        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            // The promise must already be done when its listener fires.
            assertTrue(promise.isDone());
            closed.countDown();
        }
    });
    assertTrue(closed.await(5, TimeUnit.SECONDS));
}
// Arms the handshake timeout: if the handshake promise is not done within
// handshakeTimeoutMillis, the handshake is failed and pending writes are
// released. The timer is cancelled as soon as the handshake completes.
private void applyHandshakeTimeout(Promise<Channel> p) {
    final Promise<Channel> promise = p == null ? handshakePromise : p;
    // Set timeout if necessary.
    final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
    if (handshakeTimeoutMillis <= 0 || promise.isDone()) {
        // Timeouts disabled, or handshake already finished: nothing to arm.
        return;
    }
    final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
        @Override
        public void run() {
            if (promise.isDone()) {
                // Handshake finished between scheduling and firing: no-op.
                return;
            }
            try {
                // Only notify the pipeline if we actually failed the promise
                // (tryFailure loses the race if it completed concurrently).
                if (handshakePromise.tryFailure(HANDSHAKE_TIMED_OUT)) {
                    SslUtils.handleHandshakeFailure(ctx, HANDSHAKE_TIMED_OUT, true);
                }
            } finally {
                // Always fail pending writes, even if notification threw.
                releaseAndFailAll(HANDSHAKE_TIMED_OUT);
            }
        }
    }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
    // Cancel the handshake timeout when handshake is finished.
    promise.addListener(new FutureListener<Channel>() {
        @Override
        public void operationComplete(Future<Channel> f) throws Exception {
            timeoutFuture.cancel(false);
        }
    });
}
// Runs the health check for a channel being released and forwards the check
// future to releaseAndOfferIfHealthy, either inline (if already done) or via
// a listener once the check completes.
private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
    final Future<Boolean> healthy = healthCheck.isHealthy(channel);
    if (!healthy.isDone()) {
        // Defer until the asynchronous health check finishes.
        healthy.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                releaseAndOfferIfHealthy(channel, promise, healthy);
            }
        });
    } else {
        releaseAndOfferIfHealthy(channel, promise, healthy);
    }
}
/**
 * Returns the {@link AddressResolver} associated with the specified {@link EventExecutor}. If there's no associated
 * resolved found, this method creates and returns a new resolver instance created by
 * {@link #newResolver(EventExecutor)} so that the new resolver is reused on another
 * {@link #getResolver(EventExecutor)} call with the same {@link EventExecutor}.
 *
 * @param executor the executor to look up or create a resolver for; must not be {@code null}
 * @return the cached or newly created resolver
 * @throws IllegalStateException if the executor is shutting down or the resolver cannot be created
 */
public AddressResolver<T> getResolver(final EventExecutor executor) {
    if (executor == null) {
        throw new NullPointerException("executor");
    }
    if (executor.isShuttingDown()) {
        throw new IllegalStateException("executor not accepting a task");
    }
    AddressResolver<T> r;
    synchronized (resolvers) {
        r = resolvers.get(executor);
        if (r == null) {
            final AddressResolver<T> newResolver;
            try {
                newResolver = newResolver(executor);
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a new resolver", e);
            }
            resolvers.put(executor, newResolver);
            // When the executor terminates, drop the cache entry and close
            // the resolver so neither can leak.
            executor.terminationFuture().addListener(new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    synchronized (resolvers) {
                        resolvers.remove(executor);
                    }
                    newResolver.close();
                }
            });
            r = newResolver;
        }
    }
    return r;
}
// Releases a channel back to the pool, completing `promise` with the outcome.
// Runs the accounting on the pool's executor via an intermediate promise.
@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
    // We do not call this.throwIfClosed because a channel may be released back to the pool during close
    super.release(channel, this.executor.<Void>newPromise().addListener((FutureListener<Void>)future -> {
        checkState(this.executor.inEventLoop());
        if (this.isClosed()) {
            // Since the pool is closed, we have no choice but to close the channel
            promise.setFailure(POOL_CLOSED_ON_RELEASE);
            channel.close();
            return;
        }
        if (future.isSuccess()) {
            // Channel returned to the pool: free a permit and run queued acquires.
            this.decrementAndRunTaskQueue();
            promise.setSuccess(null);
        } else {
            final Throwable cause = future.cause();
            // NOTE(review): IllegalArgumentException presumably means the channel
            // did not come from this pool, so no permit should be returned for
            // it — confirm against the super.release contract.
            if (!(cause instanceof IllegalArgumentException)) {
                this.decrementAndRunTaskQueue();
            }
            promise.setFailure(cause);
        }
    }));
    return promise;
}
// Demo: a per-remote-address channel-pool map. Acquires a channel from the
// pool for addr1 and releases it back when done.
public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap cb = new Bootstrap();
    cb.group(group).channel(NioSocketChannel.class);
    InetSocketAddress addr1 = new InetSocketAddress("10.0.0.10", 8888);
    InetSocketAddress addr2 = new InetSocketAddress("10.0.0.11", 8888);
    // Map of connection pools, one per remote address.
    ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
        @Override
        protected SimpleChannelPool newPool(InetSocketAddress key) {
            return new SimpleChannelPool(cb.remoteAddress(key), new TestChannelPoolHandler());
        }
    };
    final SimpleChannelPool pool1 = poolMap.get(addr1); // pool of connections to addr1
    final SimpleChannelPool pool2 = poolMap.get(addr2); // pool of connections to addr2
    Future<Channel> f1 = pool1.acquire(); // acquire one connection
    f1.addListener(new FutureListener<Channel>() {
        @Override
        public void operationComplete(Future<Channel> f) {
            if (f.isSuccess()) {
                Channel ch = f.getNow();
                // A channel connected to addr1.
                // Use the connection to send messages,
                // e.g. ch.write(msg)
                // then release it back to the pool when done.
                pool1.release(ch);
            }
        }
    });
}
// Handles an HTTP CONNECT request: dials the requested host, replies 200 OK
// (or 502 Bad Gateway on failure), then strips the HTTP handlers and hands
// the connection over to raw TCP proxying.
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpRequest request) {
    Promise<Channel> promise = ctx.executor().newPromise();
    Bootstrap bootstrap = initBootStrap(promise, ctx.channel().eventLoop());
    var address = HostPort.parse(request.uri());
    bootstrap.connect(address.host(), address.ensurePort()).addListener((ChannelFutureListener) future -> {
        // Connect failed outright: tell the client and close.
        if (!future.isSuccess()) {
            ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, BAD_GATEWAY));
            NettyUtils.closeOnFlush(ctx.channel());
        }
    });
    // `promise` is completed by the bootstrap's handlers once the outbound
    // channel is fully ready.
    promise.addListener((FutureListener<Channel>) future -> {
        if (!future.isSuccess()) {
            ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, BAD_GATEWAY));
            NettyUtils.closeOnFlush(ctx.channel());
            return;
        }
        Channel outboundChannel = future.getNow();
        ChannelFuture responseFuture = ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, OK));
        responseFuture.addListener((ChannelFutureListener) channelFuture -> {
            logger.debug("try to remove HttpConnectProxyInitializer, pipeline: {}", ctx.pipeline());
            //FIXME: throw NoSuchElementException
            if (removed) {
                // Another path already removed this handler; abort the tunnel.
                logger.warn("HttpConnectProxyInitializer removed by others?");
                ctx.close();
                return;
            }
            ctx.pipeline().remove(HttpConnectProxyInitializer.this);
            ctx.pipeline().remove(HttpServerCodec.class);
            initTcpProxyHandlers(ctx, address, outboundChannel);
        });
    });
}
// Handles a SOCKS4 CONNECT request: dials the destination, replies SUCCESS
// (or REJECTED_OR_FAILED), then strips the SOCKS handlers and hands the
// connection over to raw TCP proxying.
@Override
public void channelRead0(ChannelHandlerContext ctx, Socks4Message socksRequest) {
    Socks4CommandRequest command = (Socks4CommandRequest) socksRequest;
    if (command.type() != Socks4CommandType.CONNECT) {
        NettyUtils.closeOnFlush(ctx.channel());
        logger.error("unsupported socks4 command: {}", command.type());
        return;
    }
    Promise<Channel> promise = ctx.executor().newPromise();
    Bootstrap bootstrap = initBootStrap(promise, ctx.channel().eventLoop());
    bootstrap.connect(command.dstAddr(), command.dstPort()).addListener((ChannelFutureListener) future -> {
        // BUG FIX: the rejection must be sent when the connect FAILS, not when
        // it succeeds (compare the analogous HTTP CONNECT handler). The success
        // path is driven by `promise` below.
        if (!future.isSuccess()) {
            ctx.channel().writeAndFlush(new DefaultSocks4CommandResponse(REJECTED_OR_FAILED));
            NettyUtils.closeOnFlush(ctx.channel());
        }
    });
    promise.addListener((FutureListener<Channel>) future -> {
        if (!future.isSuccess()) {
            ctx.channel().writeAndFlush(new DefaultSocks4CommandResponse(REJECTED_OR_FAILED));
            NettyUtils.closeOnFlush(ctx.channel());
            return;
        }
        // Read the channel only after the success check: getNow() is null on failure.
        Channel outboundChannel = future.getNow();
        ChannelFuture responseFuture = ctx.channel().writeAndFlush(new DefaultSocks4CommandResponse(SUCCESS));
        responseFuture.addListener((ChannelFutureListener) channelFuture -> {
            // Switch the pipeline from SOCKS to plain TCP relaying.
            ctx.pipeline().remove(Socks4ProxyHandler.this);
            ctx.pipeline().remove(Socks4ServerEncoder.class);
            ctx.pipeline().remove(Socks4ServerDecoder.class);
            var address = HostPort.of(command.dstAddr(), command.dstPort());
            initTcpProxyHandlers(ctx, address, outboundChannel);
        });
    });
}
// Closes all channel pools and the address resolver group, optionally shuts
// down the worker group, and completes `future` when everything has finished.
private void closeAsync(CompletableFuture<?> future) {
    final List<CompletableFuture<?>> dependencies = new ArrayList<>(pools.size());
    for (final Iterator<HttpChannelPool> i = pools.values().iterator(); i.hasNext();) {
        // Collect each pool's close future, removing the entry as we go
        // (Iterator.remove is the safe way to mutate during iteration).
        dependencies.add(i.next().closeAsync());
        i.remove();
    }
    addressResolverGroup.close();
    CompletableFuture.allOf(dependencies.toArray(EMPTY_FUTURES)).handle((unused, cause) -> {
        if (cause != null) {
            // Best effort: log and keep shutting down.
            logger.warn("Failed to close {}s:", HttpChannelPool.class.getSimpleName(), cause);
        }
        if (shutdownWorkerGroupOnClose) {
            workerGroup.shutdownGracefully().addListener((FutureListener<Object>) f -> {
                if (f.cause() != null) {
                    logger.warn("Failed to shut down a worker group:", f.cause());
                }
                // Complete regardless of the shutdown outcome.
                future.complete(null);
            });
        } else {
            future.complete(null);
        }
        return null;
    });
}
// Submits `task` on the shared executor and bridges the resulting Netty
// future into a CompletableFuture-based CompletionStage.
private static <T, U> CompletionStage<U> execute(ThrowingFunction<T, U> task, @Nullable T arg) {
    final CompletableFuture<U> result = new CompletableFuture<>();
    rule.get().submit(() -> task.run(arg)).addListener((FutureListener<U>) done -> {
        if (!done.isSuccess()) {
            result.completeExceptionally(done.cause());
        } else {
            result.complete(done.getNow());
        }
    });
    return result;
}
// Asynchronously shuts down: registers a listener on the group-close future,
// then asks every live connection to close; the actual shutdown runs once all
// channels are gone (or immediately when there are none).
public RFuture<Void> shutdownAsync() {
    RPromise<Void> result = new RedissonPromise<Void>();
    if (channels.isEmpty()) {
        shutdown(result);
        return result;
    }
    // Attach the listener before triggering any closes so no completion is missed.
    ChannelGroupFuture allClosed = channels.newCloseFuture();
    allClosed.addListener(new FutureListener<Void>() {
        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            if (future.isSuccess()) {
                shutdown(result);
            } else {
                result.tryFailure(future.cause());
            }
        }
    });
    for (Channel channel : channels) {
        RedisConnection connection = RedisConnection.getFrom(channel);
        if (connection != null) {
            connection.closeAsync();
        }
    }
    return result;
}
// Re-resolves every known master host name; when the IP behind a host has
// changed, switches the corresponding master entry to the new address.
// `counter` tracks outstanding resolutions; the last one reschedules the scan.
private void monitorMasters(AtomicInteger counter) {
    for (Entry<RedisURI, InetSocketAddress> entry : masters.entrySet()) {
        log.debug("Request sent to resolve ip address for master host: {}", entry.getKey().getHost());
        Future<InetSocketAddress> resolveFuture = resolver.resolve(InetSocketAddress.createUnresolved(entry.getKey().getHost(), entry.getKey().getPort()));
        resolveFuture.addListener(new FutureListener<InetSocketAddress>() {
            @Override
            public void operationComplete(Future<InetSocketAddress> future) throws Exception {
                // Decrement first so the next scan is scheduled even when this
                // particular lookup failed.
                if (counter.decrementAndGet() == 0) {
                    monitorDnsChange();
                }
                if (!future.isSuccess()) {
                    log.error("Unable to resolve " + entry.getKey().getHost(), future.cause());
                    return;
                }
                log.debug("Resolved ip: {} for master host: {}", future.getNow().getAddress(), entry.getKey().getHost());
                InetSocketAddress currentMasterAddr = entry.getValue();
                InetSocketAddress newMasterAddr = future.getNow();
                if (!newMasterAddr.getAddress().equals(currentMasterAddr.getAddress())) {
                    // DNS now points at a different IP: migrate the master entry.
                    log.info("Detected DNS change. Master {} has changed ip from {} to {}",
                            entry.getKey(), currentMasterAddr.getAddress().getHostAddress(), newMasterAddr.getAddress().getHostAddress());
                    MasterSlaveEntry masterSlaveEntry = connectionManager.getEntry(currentMasterAddr);
                    if (masterSlaveEntry == null) {
                        if (connectionManager instanceof SingleConnectionManager) {
                            log.error("Unable to find master entry for {}. Switch Redisson configuration to proxy mode to use multiple IPs resolved by Redis hostname. More details: https://github.com/redisson/redisson/wiki/2.-Configuration#29-proxy-mode", currentMasterAddr);
                        } else {
                            log.error("Unable to find master entry for {}", currentMasterAddr);
                        }
                        return;
                    }
                    masterSlaveEntry.changeMaster(newMasterAddr, entry.getKey());
                    masters.put(entry.getKey(), newMasterAddr);
                }
            }
        });
    }
}
// Resolves every IP behind each sentinel host and registers any sentinel URI
// not seen before. `commonListener` (may be null) is additionally attached to
// every resolution, e.g. to reschedule the periodic DNS check.
private void performSentinelDNSCheck(FutureListener<List<InetSocketAddress>> commonListener) {
    for (RedisURI host : sentinelHosts) {
        Future<List<InetSocketAddress>> allNodes = sentinelResolver.resolveAll(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
        allNodes.addListener(new FutureListener<List<InetSocketAddress>>() {
            @Override
            public void operationComplete(Future<List<InetSocketAddress>> future) throws Exception {
                if (!future.isSuccess()) {
                    log.error("Unable to resolve " + host.getHost(), future.cause());
                    return;
                }
                Set<RedisURI> newUris = future.getNow().stream()
                        .map(addr -> toURI(addr.getAddress().getHostAddress(), "" + addr.getPort()))
                        .collect(Collectors.toSet());
                for (RedisURI uri : newUris) {
                    // Register only sentinels we have not connected to yet.
                    if (!sentinels.containsKey(uri)) {
                        registerSentinel(uri, getConfig(), host.getHost());
                    }
                }
            }
        });
        if (commonListener != null) {
            allNodes.addListener(commonListener);
        }
    }
}
// Acquires a channel from the pool, writes `message`, and mirrors the
// acquisition/write outcome onto the returned promise.
public Future<Void> send(Object message) {
    DefaultPromise<Void> promise = new DefaultPromise<>(eventLoopGroup.next());
    log.debug("Acquiring Node: " + this);
    connectionPool.acquire().addListener((FutureListener<Channel>) acquired -> {
        if (!acquired.isSuccess()) {
            log.error("Could not connect to client for write: " + acquired.cause());
            promise.setFailure(acquired.cause());
            return;
        }
        // Channel is ours: write the message and translate the write outcome
        // onto the returned promise.
        acquired.getNow().writeAndFlush(message).addListener((ChannelFutureListener) write -> {
            if (write.isSuccess()) {
                log.debug("write finished for " + message);
                promise.setSuccess(null);
            } else {
                log.error("Write error: ", write.cause());
                promise.setFailure(write.cause());
            }
        });
    });
    return promise;
}
// Registers `result` as the pending response for this message id, acquires a
// channel and writes the message. Failures in either acquisition or the write
// now fail the caller's promise instead of being silently dropped.
public Future<WriteResult> send(ChicagoMessage message, Promise<WriteResult> result) {
    // TODO schedule a timeout to fail this write
    resultMap.put(message.id, result);
    Future<Channel> channelResult = connectionPool.acquire();
    System.out.println("Acquiring Node");
    channelResult.addListener(
        new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) {
                if (future.isSuccess()) {
                    System.out.println("Node acquired!");
                    Channel channel = future.getNow();
                    channel
                        .writeAndFlush(message)
                        .addListener(
                            new ChannelFutureListener() {
                                @Override
                                public void operationComplete(ChannelFuture channelFuture) {
                                    if (channelFuture.isSuccess()) {
                                        System.out.println("write finished for " + message.id);
                                    } else {
                                        // BUG FIX: a failed flush used to be reported as
                                        // "write finished" and the caller's promise would
                                        // hang forever; fail it like the acquire path does.
                                        result.setFailure(channelFuture.cause());
                                    }
                                }
                            });
                } else {
                    result.setFailure(future.cause());
                }
            }
        });
    return result;
}
// Sends `message` to the peer, records the pending request under `requestId`,
// and optionally arms an RPC timeout. The returned future completes when the
// peer responds or the request is failed.
final synchronized ListenableFuture<OperationResult> sendMessage(final Message message, final S requestId,
        final Metadata metadata) {
    final io.netty.util.concurrent.Future<Void> f = this.session.sendMessage(message);
    this.listenerState.updateStatefulSentMsg(message);
    final PCEPRequest req = new PCEPRequest(metadata);
    this.requests.put(requestId, req);
    final short rpcTimeout = this.serverSessionManager.getRpcTimeout();
    LOG.trace("RPC response timeout value is {} seconds", rpcTimeout);
    if (rpcTimeout > 0) {
        setupTimeoutHandler(requestId, req, rpcTimeout);
    }
    f.addListener((FutureListener<Void>) future -> {
        if (!future.isSuccess()) {
            // Transmission failed: forget the pending request (under the outer
            // object's lock, since the listener runs asynchronously) and mark
            // the operation unsent.
            synchronized (AbstractTopologySessionListener.this) {
                AbstractTopologySessionListener.this.requests.remove(requestId);
            }
            req.done(OperationResults.UNSENT);
            LOG.info("Failed to send request {}, instruction cancelled", requestId, future.cause());
        } else {
            req.sent();
            LOG.trace("Request {} sent to peer (object {})", requestId, req);
        }
    });
    return req.getFuture();
}