java.nio.channels.NotYetConnectedException#io.netty.channel.ChannelFuture源码实例Demo

下面列出了java.nio.channels.NotYetConnectedException#io.netty.channel.ChannelFuture 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: netty4.0.27Learn   文件: JZlibEncoder.java
@Override
public ChannelFuture close(final ChannelPromise promise) {
    ChannelHandlerContext ctx = ctx();
    EventExecutor executor = ctx.executor();
    if (executor.inEventLoop()) {
        return finishEncode(ctx, promise);
    } else {
        final ChannelPromise p = ctx.newPromise();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                ChannelFuture f = finishEncode(ctx(), p);
                f.addListener(new ChannelPromiseNotifier(promise));
            }
        });
        return p;
    }
}
 
源代码2 项目: bgpcep   文件: AddPathBasePathsTest.java
@Override
@Before
public void setUp() throws Exception {
    super.setUp();
    final TablesKey tk = new TablesKey(Ipv4AddressFamily.class, UnicastSubsequentAddressFamily.class);
    final Map<TablesKey, PathSelectionMode> pathTables = ImmutableMap.of(tk,
        BasePathSelectionModeFactory.createBestPathSelectionStrategy());

    this.ribImpl = new RIBImpl(this.tableRegistry, new RibId("test-rib"), AS_NUMBER, new BgpId(RIB_ID),
            this.ribExtension,
            this.serverDispatcher, this.codecsRegistry, getDomBroker(), getDataBroker(), this.policies,
            TABLES_TYPE, pathTables);
    this.ribImpl.instantiateServiceInstance();
    final ChannelFuture channelFuture = this.serverDispatcher.createServer(
        new InetSocketAddress(RIB_ID, PORT.toJava()));
    waitFutureSuccess(channelFuture);
    this.serverChannel = channelFuture.channel();
}
 
源代码3 项目: Jantent   文件: HttpClient.java
public void start() throws Exception{
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new HttpClientInitializer());

        // 发起异步连接
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 3560));
        // 当客户端链路关闭
        future.channel().closeFuture().sync();
    }finally {
        // 优雅退出,释放NIO线程组
        group.shutdownGracefully();
    }
}
 
源代码4 项目: crate   文件: Netty4HttpServerTransport.java
private TransportAddress bindAddress(final InetAddress hostAddress) {
    final AtomicReference<Exception> lastException = new AtomicReference<>();
    final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
    boolean success = port.iterate(portNumber -> {
        try {
            synchronized (serverChannels) {
                ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync();
                serverChannels.add(future.channel());
                boundSocket.set((InetSocketAddress) future.channel().localAddress());
            }
        } catch (Exception e) {
            lastException.set(e);
            return false;
        }
        return true;
    });
    if (!success) {
        throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
    }
    return new TransportAddress(boundSocket.get());
}
 
源代码5 项目: aesh-readline   文件: NettyIoAcceptor.java
@Override
public void bind(SocketAddress address) throws IOException {
    InetSocketAddress inetAddress = (InetSocketAddress) address;
    ChannelFuture f = bootstrap.bind(inetAddress);
    Channel channel = f.channel();
    channelGroup.add(channel);
    try {
        f.sync();
        SocketAddress bound = channel.localAddress();
        boundAddresses.put(bound, channel);
        channel.closeFuture().addListener(fut -> {
            boundAddresses.remove(bound);
        });
    } catch (Exception e) {
        throw Helper.toIOException(e);
    }
}
 
源代码6 项目: netty-4.1.22   文件: WebSocketServerHandler.java
private static void sendHttpResponse(
        ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
    // Generate an error page if response getStatus code is not OK (200).
    if (res.status().code() != 200) {
        ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
        res.content().writeBytes(buf);
        buf.release();
        HttpUtil.setContentLength(res, res.content().readableBytes());
    }

    // Send the response and close the connection if necessary.
    ChannelFuture f = ctx.channel().writeAndFlush(res);
    if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
        f.addListener(ChannelFutureListener.CLOSE);
    }
}
 
源代码7 项目: dfactor   文件: DFSocketManager.java
protected ChannelFuture doTcpConntecSync(DFTcpClientCfg cfg, EventLoopGroup ioGroup, ChannelHandler handler){
	if(ioGroup == null){
		return null;
	}
	Bootstrap boot = new Bootstrap();
	boot.group(ioGroup)
		.option(ChannelOption.ALLOCATOR, 
				PooledByteBufAllocator.DEFAULT)
		.option(ChannelOption.SO_KEEPALIVE, cfg.isKeepAlive())
		.option(ChannelOption.SO_RCVBUF, cfg.getSoRecvBufLen())
		.option(ChannelOption.SO_SNDBUF, cfg.getSoSendBufLen())
		.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)cfg.getConnTimeout())
		.option(ChannelOption.TCP_NODELAY, cfg.isTcpNoDelay())
		.handler(new TcpHandlerInit(false, cfg.getTcpProtocol(), 
				cfg.getTcpMsgMaxLength(), 0, 0, cfg.getWsUri(), null, 
				cfg.getDecoder(), cfg.getEncoder(), cfg.getUserHandler(), cfg.getSslCfg()
				, cfg.getReqData(), handler));
	if(ioGroup instanceof EpollEventLoopGroup){
		boot.channel(EpollSocketChannel.class);
	}else{
		boot.channel(NioSocketChannel.class);
	}
	ChannelFuture future = boot.connect(cfg.host, cfg.port);
	return future;
}
 
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // was able to flush out data, start to read the next chunk
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }
}
 
源代码9 项目: netty-4.1.22   文件: StreamBufferingEncoderTest.java
@Test
public void receivingGoAwayFailsBufferedStreams() {
    encoder.writeSettingsAck(ctx, newPromise());
    setMaxConcurrentStreams(5);

    int streamId = 3;
    List<ChannelFuture> futures = new ArrayList<ChannelFuture>();
    for (int i = 0; i < 9; i++) {
        futures.add(encoderWriteHeaders(streamId, newPromise()));
        streamId += 2;
    }
    assertEquals(4, encoder.numBufferedStreams());

    connection.goAwayReceived(11, 8, EMPTY_BUFFER);

    assertEquals(5, connection.numActiveStreams());
    int failCount = 0;
    for (ChannelFuture f : futures) {
        if (f.cause() != null) {
            failCount++;
        }
    }
    assertEquals(9, failCount);
    assertEquals(0, encoder.numBufferedStreams());
}
 
源代码10 项目: armeria   文件: ClientHttp2ObjectEncoder.java
@Override
public ChannelFuture doWriteHeaders(int id, int streamId, RequestHeaders headers, boolean endStream) {
    final Http2Connection conn = encoder().connection();
    if (isStreamPresentAndWritable(streamId)) {
        if (keepAliveHandler != null) {
            keepAliveHandler.onReadOrWrite();
        }
        return encoder().writeHeaders(ctx(), streamId, convertHeaders(headers), 0,
                                      endStream, ctx().newPromise());
    }

    final Endpoint<Http2LocalFlowController> local = conn.local();
    if (local.mayHaveCreatedStream(streamId)) {
        final ClosedStreamException closedStreamException =
                new ClosedStreamException("Cannot create a new stream. streamId: " + streamId +
                                          ", lastStreamCreated: " + local.lastStreamCreated());
        return newFailedFuture(UnprocessedRequestException.of(closedStreamException));
    }

    // Client starts a new stream.
    return encoder().writeHeaders(ctx(), streamId, convertHeaders(headers), 0, endStream,
                                  ctx().newPromise());
}
 
源代码11 项目: netty.book.kor   文件: EchoServerV4.java
public static void main(String[] args) throws Exception {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new EchoServerV4FirstHandler());
                p.addLast(new EchoServerV4SecondHandler());
            }
        });

        ChannelFuture f = b.bind(8888).sync();
        f.channel().closeFuture().sync();
    }
    finally {
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}
 
源代码12 项目: onos   文件: LispControllerBootstrap.java
/**
 * Stitches all channel handlers into server bootstrap.
 */
private void run() {

    try {
        final Bootstrap bootstrap = createServerBootstrap();

        configBootstrapOptions(bootstrap);

        lispPorts.forEach(p -> {
            InetSocketAddress sa = new InetSocketAddress(p);
            channelFutures.add(bootstrap.bind(sa));
            log.info("Listening for LISP router connections on {}", sa);
        });

        for (ChannelFuture f : channelFutures) {
            f.sync();
        }

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
 
源代码13 项目: rocketmq-4.3.0   文件: ProducerManagerTest.java
@Test
public void scanNotActiveChannel() throws Exception {
    producerManager.registerProducer(group, clientInfo);
    assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();

    Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
    field.setAccessible(true);
    long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
    clientInfo.setLastUpdateTimestamp(System.currentTimeMillis() - CHANNEL_EXPIRED_TIMEOUT - 10);
    when(channel.close()).thenReturn(mock(ChannelFuture.class));
    producerManager.scanNotActiveChannel();
    assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
}
 
源代码14 项目: drift   文件: ConnectionFactory.java
private static void notifyConnect(ChannelFuture future, Promise<Channel> promise)
{
    if (future.isSuccess()) {
        Channel channel = future.channel();
        if (!promise.trySuccess(channel)) {
            // Promise was completed in the meantime (likely cancelled), just release the channel again
            channel.close();
        }
    }
    else {
        promise.tryFailure(future.cause());
    }
}
 
源代码15 项目: TakinRPC   文件: RemotingNettyClient.java
private void closeChannel(Channel channel) {
    final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);
    channel.close().addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            logger.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote, future.isSuccess());
        }
    });
}
 
源代码16 项目: netty-4.1.22   文件: StreamBufferingEncoderTest.java
@Test
public void headersAfterCloseShouldImmediatelyFail() {
    encoder.writeSettingsAck(ctx, newPromise());
    encoder.close();

    ChannelFuture f = encoderWriteHeaders(3, newPromise());
    assertNotNull(f.cause());
}
 
源代码17 项目: qonduit   文件: HttpHandler.java
default void sendResponse(ChannelHandlerContext ctx, Object msg) {
    ChannelFuture f = ctx.writeAndFlush(msg);
    LOG.trace(Constants.LOG_RETURNING_RESPONSE, msg);
    if (!f.isSuccess()) {
        LOG.error(Constants.ERR_WRITING_RESPONSE, f.cause());
    }
}
 
源代码18 项目: grpc-nebula-java   文件: NettyClientStream.java
@Override
public void writeFrame(
    WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
  Preconditions.checkArgument(numMessages >= 0);
  ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
  final int numBytes = bytebuf.readableBytes();
  if (numBytes > 0) {
    // Add the bytes to outbound flow control.
    onSendingBytes(numBytes);
    writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush)
        .addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            // If the future succeeds when http2stream is null, the stream has been cancelled
            // before it began and Netty is purging pending writes from the flow-controller.
            if (future.isSuccess() && transportState().http2Stream() != null) {
              // Remove the bytes from outbound flow control, optionally notifying
              // the client that they can send more bytes.
              transportState().onSentBytes(numBytes);
              NettyClientStream.this.getTransportTracer().reportMessageSent(numMessages);
            }
          }
        });
  } else {
    // The frame is empty and will not impact outbound flow control. Just send it.
    writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
  }
}
 
源代码19 项目: rocketmq   文件: NettyRemotingAbstract.java
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
                timeoutMillis, //
                this.semaphoreOneway.getQueueLength(), //
                this.semaphoreOneway.availablePermits()//
            );
            PLOG.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
 
源代码20 项目: netty-learning   文件: SecureChatClient.java
public void run() throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .handler(new SecureChatClientInitializer(sslMode));
        // Start the connection attempt.
        Channel ch = b.connect(host, port).sync().channel();
        // Read commands from the stdin.
        ChannelFuture lastWriteFuture = null;
        BufferedReader in = new BufferedReader(new InputStreamReader(
                System.in));
        for (;;) {
            String line = in.readLine();
            if (line == null) {
                break;
            }

            // Sends the received line to the server.
            lastWriteFuture = ch.writeAndFlush(line + "\r\n");

            // If user typed the 'bye' command, wait until the server closes
            // the connection.
            if ("bye".equals(line.toLowerCase())) {
                ch.closeFuture().sync();
                break;
            }
        }

        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.sync();
        }
    } finally {
        // The connection is closed automatically on shutdown.
        group.shutdownGracefully();
    }
}
 
源代码21 项目: grpc-nebula-java   文件: ProtocolNegotiators.java
/**
 * Do not rely on channel handlers to propagate exceptions to us.
 * {@link NettyClientHandler} is an example of a class that does not propagate exceptions.
 * Add a listener to the connect future directly and do appropriate error handling.
 */
@Override
public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
    SocketAddress localAddress, ChannelPromise promise) throws Exception {
  super.connect(ctx, remoteAddress, localAddress, promise);
  promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (!future.isSuccess()) {
        fail(ctx, future.cause());
      }
    }
  });
}
 
@Test
public void uncaughtReadFails() throws Exception {
  WriteBufferingAndExceptionHandler handler =
      new WriteBufferingAndExceptionHandler(new ChannelHandlerAdapter() {});
  LocalAddress addr = new LocalAddress("local");
  ChannelFuture cf = new Bootstrap()
      .channel(LocalChannel.class)
      .handler(handler)
      .group(group)
      .register();
  chan = cf.channel();
  cf.sync();
  ChannelFuture sf = new ServerBootstrap()
      .channel(LocalServerChannel.class)
      .childHandler(new ChannelHandlerAdapter() {})
      .group(group)
      .bind(addr);
  server = sf.channel();
  sf.sync();

  ChannelFuture wf = chan.writeAndFlush(new Object());
  chan.connect(addr);
  chan.pipeline().fireChannelRead(Unpooled.copiedBuffer(new byte[] {'a'}));

  try {
    wf.sync();
    fail();
  } catch (Exception e) {
    Status status = Status.fromThrowable(e);
    assertThat(status.getCode()).isEqualTo(Code.INTERNAL);
    assertThat(status.getDescription()).contains("channelRead() missed");
  }
}
 
源代码23 项目: ambry   文件: NettyResponseChannel.java
@Override
public void operationComplete(ChannelFuture future) throws Exception {
  Throwable cause = future.cause() == null ? exception : future.cause();
  if (cause != null) {
    handleChannelWriteFailure(cause, false);
  } else {
    cleanupChunks(null);
  }
  logger.debug("Chunk cleanup complete on channel {}", ctx.channel());
}
 
源代码24 项目: netty-learning   文件: HttpXmlClient.java
public void connect(int port) throws Exception {
    // 配置客户端NIO线程组
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)
                            throws Exception {
                        ch.pipeline().addLast("http-decoder",
                                new HttpResponseDecoder());
                        ch.pipeline().addLast("http-aggregator",
                                new HttpObjectAggregator(65536));
                        // XML解码器
                        ch.pipeline().addLast(
                                "xml-decoder",
                                new HttpXmlResponseDecoder(Order.class,
                                        true));
                        ch.pipeline().addLast("http-encoder",
                                new HttpRequestEncoder());
                        ch.pipeline().addLast("xml-encoder",
                                new HttpXmlRequestEncoder());
                        ch.pipeline().addLast("xmlClientHandler",
                                new HttpXmlClientHandler());
                    }
                });

        // 发起异步连接操作
        ChannelFuture f = b.connect(new InetSocketAddress(port)).sync();

        // 当代客户端链路关闭
        f.channel().closeFuture().sync();
    } finally {
        // 优雅退出,释放NIO线程组
        group.shutdownGracefully();
    }
}
 
源代码25 项目: nettice   文件: ActionDispatcher.java
private void writeResponse(boolean forceClose){
	boolean close = isClose();
	if(!close && !forceClose){
		response.headers().add(HttpHeaders.CONTENT_LENGTH, String.valueOf(response.content().readableBytes()));
	}
	ChannelFuture future = channel.write(response);
	if(close || forceClose){
		future.addListener(ChannelFutureListener.CLOSE);
	}
}
 
源代码26 项目: grpc-nebula-java   文件: NettyClientHandlerTest.java
@Test
public void receivedGoAwayShouldRefuseLaterStreamId() throws Exception {
  ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));
  channelRead(goAwayFrame(streamId - 1));
  verify(streamListener).closed(any(Status.class), eq(REFUSED), any(Metadata.class));
  assertTrue(future.isDone());
}
 
源代码27 项目: grpc-nebula-java   文件: NettyClientHandlerTest.java
@Test
public void receivedGoAwayShouldFailUnknownBufferedStreams() throws Exception {
  receiveMaxConcurrentStreams(0);

  ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));

  // Read a GOAWAY that indicates our stream was never processed by the server.
  channelRead(goAwayFrame(0, 8 /* Cancel */, Unpooled.copiedBuffer("this is a test", UTF_8)));
  assertTrue(future.isDone());
  assertFalse(future.isSuccess());
  Status status = Status.fromThrowable(future.cause());
  assertEquals(Status.CANCELLED.getCode(), status.getCode());
  assertEquals("HTTP/2 error code: CANCEL\nReceived Goaway\nthis is a test",
      status.getDescription());
}
 
源代码28 项目: netty4.0.27Learn   文件: NioDatagramChannel.java
@Override
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
    try {
        return joinGroup(
                multicastAddress,
                NetworkInterface.getByInetAddress(localAddress().getAddress()),
                null, promise);
    } catch (SocketException e) {
        promise.setFailure(e);
    }
    return promise;
}
 
@Override
public CompletableFuture<Channel> getChannel(M address) {
    final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
    ChannelFuture f = getBootstrap().connect(address.recipient());
    //Acquire from pool and listen for completion
    f.addListener((ChannelFuture future) -> {
        if (future.isSuccess()) {
            channelFuture.complete(future.channel());
        } else {
            channelFuture.completeExceptionally(future.cause());
        }
    });
    return channelFuture;
}
 
源代码30 项目: incubator-nemo   文件: ByteTransfer.java
/**
 * @param remoteExecutorId id of the remote executor
 * @return {@link ContextManager} for the channel to the specified executor
 */
private CompletableFuture<ContextManager> connectTo(final String remoteExecutorId) {
  final CompletableFuture<ContextManager> completableFuture = new CompletableFuture<>();
  final ChannelFuture channelFuture;
  try {
    channelFuture = executorIdToChannelFutureMap.compute(remoteExecutorId, (executorId, cachedChannelFuture) -> {
      if (cachedChannelFuture != null
        && (cachedChannelFuture.channel().isOpen() || cachedChannelFuture.channel().isActive())) {
        return cachedChannelFuture;
      } else {
        final ChannelFuture future = byteTransport.connectTo(executorId);
        future.channel().closeFuture().addListener(f -> executorIdToChannelFutureMap.remove(executorId, future));
        return future;
      }
    });
  } catch (final RuntimeException e) {
    completableFuture.completeExceptionally(e);
    return completableFuture;
  }
  channelFuture.addListener(future -> {
    if (future.isSuccess()) {
      completableFuture.complete(channelFuture.channel().pipeline().get(ContextManager.class));
    } else {
      executorIdToChannelFutureMap.remove(remoteExecutorId, channelFuture);
      completableFuture.completeExceptionally(future.cause());
    }
  });
  return completableFuture;
}