类io.netty.channel.ConnectTimeoutException源码实例Demo

下面列出了如何使用 io.netty.channel.ConnectTimeoutException 的 API 类实例代码及写法，或者点击链接到 GitHub 查看源代码。

源代码1 项目: netty-4.1.22   文件: OioSocketChannel.java
/**
 * Optionally binds the socket to {@code localAddress}, connects it to {@code remoteAddress}
 * using the configured connect timeout, and activates the channel with the socket's streams.
 *
 * <p>A {@link SocketTimeoutException} raised while connecting is rethrown as a
 * {@link ConnectTimeoutException} that carries the original stack trace. Whenever the
 * connect/activate sequence does not finish, {@code doClose()} is invoked before the
 * exception propagates.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress the address to bind to first, or {@code null} to skip binding
 * @throws Exception if binding, connecting or activating fails
 */
@Override
protected void doConnect(SocketAddress remoteAddress,
        SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        SocketUtils.bind(socket, localAddress);
    }

    boolean connected = false;
    try {
        SocketUtils.connect(socket, remoteAddress, config().getConnectTimeoutMillis());
        activate(socket.getInputStream(), socket.getOutputStream());
        connected = true;
    } catch (SocketTimeoutException timeout) {
        // Surface the timeout as the channel-level exception type, but keep the original trace.
        final ConnectTimeoutException translated =
                new ConnectTimeoutException("connection timed out: " + remoteAddress);
        translated.setStackTrace(timeout.getStackTrace());
        throw translated;
    } finally {
        // Any failure path (including the translated timeout above) must close the channel.
        if (!connected) {
            doClose();
        }
    }
}
 
/**
 * Catch-all exception handler that forwards the throwable to the most specific
 * {@code handle(...)} overload matching its runtime type.
 *
 * <p>Types are checked in this order: {@code ResponseStatusException},
 * {@code ConnectTimeoutException}, {@code NotFoundException}, {@code RuntimeException},
 * {@code Exception}. Anything else yields the generic {@code Result.fail()}.
 *
 * @param throwable the error to translate
 * @return the result produced by the matching overload, or the generic failure result
 */
@ExceptionHandler(value = {Throwable.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Result handle(Throwable throwable) {
    // Created up front, exactly once per call, and returned only when no branch matches.
    final Result fallback = Result.fail();

    if (throwable instanceof ResponseStatusException) {
        return handle((ResponseStatusException) throwable);
    }
    if (throwable instanceof ConnectTimeoutException) {
        return handle((ConnectTimeoutException) throwable);
    }
    if (throwable instanceof NotFoundException) {
        return handle((NotFoundException) throwable);
    }
    if (throwable instanceof RuntimeException) {
        return handle((RuntimeException) throwable);
    }
    if (throwable instanceof Exception) {
        return handle((Exception) throwable);
    }
    return fallback;
}
 
/**
 * Fallback handler for every {@link Throwable}: delegates to the dedicated
 * {@code handle(...)} overload for the first matching type, tested from most to least
 * specific ({@code ResponseStatusException}, {@code ConnectTimeoutException},
 * {@code NotFoundException}, {@code RuntimeException}, {@code Exception}).
 *
 * @param throwable the error that reached this handler
 * @return the overload's result, or {@code Result.fail()} when no type matches
 */
@ExceptionHandler(value = {Throwable.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Result handle(Throwable throwable) {
    // Default answer; still constructed first so the call sequence matches for every input.
    final Result genericFailure = Result.fail();

    if (throwable instanceof ResponseStatusException) {
        return handle((ResponseStatusException) throwable);
    }
    if (throwable instanceof ConnectTimeoutException) {
        return handle((ConnectTimeoutException) throwable);
    }
    if (throwable instanceof NotFoundException) {
        return handle((NotFoundException) throwable);
    }
    if (throwable instanceof RuntimeException) {
        return handle((RuntimeException) throwable);
    }
    if (throwable instanceof Exception) {
        return handle((Exception) throwable);
    }
    return genericFailure;
}
 
源代码4 项目: pulsar   文件: ConnectionTimeoutTest.java
/**
 * Checks that a client built with a 1 ms connection timeout fails producer creation with a
 * {@link ConnectTimeoutException}, while a second client that keeps the default connection
 * timeout has not completed yet at that moment.
 *
 * <p>Both clients target {@code blackholeBroker} (defined outside this method; presumably an
 * endpoint that never accepts connections — confirm against the test fixture).
 */
@Test
public void testLowTimeout() throws Exception {
    try (PulsarClient clientLow = PulsarClient.builder().serviceUrl(blackholeBroker)
            .connectionTimeout(1, TimeUnit.MILLISECONDS)
            .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
         PulsarClient clientDefault = PulsarClient.builder().serviceUrl(blackholeBroker)
             .operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
        CompletableFuture<?> lowFuture = clientLow.newProducer().topic("foo").createAsync();
        CompletableFuture<?> defaultFuture = clientDefault.newProducer().topic("foo").createAsync();

        try {
            lowFuture.get();
            // Assert.fail throws an Error, so it is not swallowed by the catch below.
            Assert.fail("Shouldn't be able to connect to anything");
        } catch (Exception e) {
            // The low-timeout client must fail before the default-timeout client finishes.
            Assert.assertFalse(defaultFuture.isDone());
            // The connect timeout is nested three causes deep in the reported failure.
            Assert.assertEquals(e.getCause().getCause().getCause().getClass(), ConnectTimeoutException.class);
        }
    }
}
 
源代码5 项目: netty4.0.27Learn   文件: OioSocketChannel.java
/**
 * Optionally binds the socket to {@code localAddress}, connects it to {@code remoteAddress}
 * with the configured connect timeout, and activates the channel with the socket's streams.
 *
 * <p>A {@link SocketTimeoutException} from the connect attempt is rethrown as a
 * {@link ConnectTimeoutException} with the original stack trace. If the connect/activate
 * sequence does not complete, {@code doClose()} is called before the exception propagates.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress the address to bind to first, or {@code null} to skip binding
 * @throws Exception if binding, connecting or activating fails
 */
@Override
protected void doConnect(SocketAddress remoteAddress,
        SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        socket.bind(localAddress);
    }

    boolean connected = false;
    try {
        socket.connect(remoteAddress, config().getConnectTimeoutMillis());
        activate(socket.getInputStream(), socket.getOutputStream());
        connected = true;
    } catch (SocketTimeoutException timeout) {
        // Report the timeout with the channel-level exception type, preserving the trace.
        final ConnectTimeoutException translated =
                new ConnectTimeoutException("connection timed out: " + remoteAddress);
        translated.setStackTrace(timeout.getStackTrace());
        throw translated;
    } finally {
        // Close on every failure path, including the translated timeout.
        if (!connected) {
            doClose();
        }
    }
}
 
源代码6 项目: couchbase-jvm-core   文件: AbstractEndpointTest.java
/**
 * When the bootstrap's connect promise never completes, the endpoint's connect observable
 * must still terminate with exactly one error, and that error must be a
 * {@link ConnectTimeoutException}.
 */
@Test
public void shouldForceTimeoutOfSocketConnectDoesNotReturn() {
    // Stub a bootstrap whose connect() hands back a promise that nobody ever completes.
    BootstrapAdapter stalledBootstrap = mock(BootstrapAdapter.class);
    when(stalledBootstrap.connect()).thenReturn(channel.newPromise());
    Endpoint endpoint = new DummyEndpoint(stalledBootstrap, ctx);

    TestSubscriber<LifecycleState> subscriber = new TestSubscriber<LifecycleState>();
    endpoint.connect().subscribe(subscriber);
    subscriber.awaitTerminalEvent();

    List<Throwable> failures = subscriber.getOnErrorEvents();
    assertEquals(1, failures.size());
    assertEquals(ConnectTimeoutException.class, failures.get(0).getClass());

    endpoint.disconnect().subscribe();
}
 
源代码7 项目: netty-4.1.22   文件: EpollSocketTcpMd5Test.java
/**
 * Configures the server with {@code SERVER_KEY} and the client with {@code BAD_KEY} as the
 * TCP_MD5SIG key for the IPv4 loopback address, and expects the connect attempt (1000 ms
 * timeout) to fail with a {@link ConnectTimeoutException}.
 */
@Test(expected = ConnectTimeoutException.class)
public void testKeyMismatch() throws Exception {
    server.config().setOption(EpollChannelOption.TCP_MD5SIG,
            Collections.<InetAddress, byte[]>singletonMap(NetUtil.LOCALHOST4, SERVER_KEY));

    // Client side: same peer address, but signed with a key the server does not know.
    Bootstrap mismatchedClient = new Bootstrap().group(GROUP)
            .channel(EpollSocketChannel.class)
            .handler(new ChannelInboundHandlerAdapter())
            .option(EpollChannelOption.TCP_MD5SIG,
                    Collections.<InetAddress, byte[]>singletonMap(NetUtil.LOCALHOST4, BAD_KEY))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);

    // syncUninterruptibly() rethrows the connect failure, which satisfies expected=...
    EpollSocketChannel client = (EpollSocketChannel) mismatchedClient
            .connect(server.localAddress()).syncUninterruptibly().channel();
    client.close().syncUninterruptibly();
}
 
源代码8 项目: servicetalk   文件: TcpConnector.java
/**
 * Connects to the passed {@code resolvedRemoteAddress} address, resolving the address, if required.
 * <p>
 * The connect attempt is started each time the returned {@link Single} is subscribed to. Failures reported by the
 * connect future are translated before being handed to the connect handler:
 * <ul>
 *     <li>a {@code ConnectTimeoutException} is wrapped in
 *     {@code io.servicetalk.client.api.ConnectTimeoutException} with a message naming the remote (and local)
 *     address;</li>
 *     <li>a {@link ConnectException} is wrapped in a {@code RetryableConnectException};</li>
 *     <li>any other cause is passed through unchanged.</li>
 * </ul>
 *
 * @param localAddress The local address to bind to, or {@code null}.
 * @param resolvedRemoteAddress The address to connect to. This address should already be resolved at this point.
 * @param config The {@link ReadOnlyTcpClientConfig} to use while connecting.
 * @param autoRead if {@code true} auto read will be enabled for new {@link Channel}s.
 * @param executionContext The {@link ExecutionContext} to use for the returned {@link NettyConnection}.
 * @param connectionFactory {@link Function} to create a {@link NettyConnection} asynchronously.
 * @param <C> Type of the created connection.
 * @return A {@link Single} that completes with a new {@link Channel} when connected.
 */
public static <C extends ListenableAsyncCloseable> Single<C> connect(
        final @Nullable SocketAddress localAddress, final Object resolvedRemoteAddress,
        final ReadOnlyTcpClientConfig config, final boolean autoRead, final ExecutionContext executionContext,
        final Function<Channel, Single<? extends C>> connectionFactory) {
    // Fail fast on the mandatory arguments; localAddress is allowed to be null.
    requireNonNull(resolvedRemoteAddress);
    requireNonNull(config);
    requireNonNull(executionContext);
    return new SubscribableSingle<C>() {
        @Override
        protected void handleSubscribe(final Subscriber<? super C> subscriber) {
            ConnectHandler<C> connectHandler = new ConnectHandler<>(subscriber, connectionFactory);
            try {
                Future<?> connectFuture = connect0(localAddress, resolvedRemoteAddress, config, autoRead,
                        executionContext, connectHandler);
                // Give the handler access to the in-flight connect future.
                // NOTE(review): presumably used to cancel the attempt — confirm in ConnectHandler.
                connectHandler.connectFuture(connectFuture);
                connectFuture.addListener(f -> {
                    Throwable cause = f.cause();
                    // Only failures are handled here; a null cause means nothing to report.
                    if (cause != null) {
                        if (cause instanceof ConnectTimeoutException) {
                            // File-descriptor addresses get a "register" message, others a "connect" message.
                            String msg = resolvedRemoteAddress instanceof FileDescriptorSocketAddress ?
                                    "Failed to register: " + resolvedRemoteAddress :
                                    "Failed to connect: " + resolvedRemoteAddress + " (localAddress: " +
                                    localAddress + ")";
                            // Re-wrap in the ServiceTalk API exception type, keeping the original as the cause.
                            cause = new io.servicetalk.client.api.ConnectTimeoutException(msg, cause);
                        } else if (cause instanceof ConnectException) {
                            cause = new RetryableConnectException((ConnectException) cause);
                        }
                        connectHandler.connectFailed(cause);
                    }
                });
            } catch (Throwable t) {
                // Anything thrown synchronously while starting the connect or wiring the listener.
                connectHandler.unexpectedFailure(t);
            }
        }
    };
}
 
源代码9 项目: prebid-server-java   文件: HttpBidderRequester.java
/**
 * Logs the failure and wraps it into a succeeded {@link Future} holding a failed {@link HttpCall}.
 *
 * <p>{@code TimeoutException} and {@code ConnectTimeoutException} are reported with the
 * {@code timeout} error type; every other exception is reported as {@code generic}. The
 * error message is the exception's own message.
 *
 * @param exception the error raised while sending the request
 * @param httpRequest the request that failed
 * @return a succeeded future containing the request and the error description
 */
private static <T> Future<HttpCall<T>> failResponse(Throwable exception, HttpRequest<T> httpRequest) {
    logger.warn("Error occurred while sending HTTP request to a bidder url: {0} with message: {1}",
            httpRequest.getUri(), exception.getMessage());
    logger.debug("Error occurred while sending HTTP request to a bidder url: {0}", exception, httpRequest.getUri());

    final boolean timedOut =
            exception instanceof TimeoutException || exception instanceof ConnectTimeoutException;

    final BidderError.Type errorType;
    if (timedOut) {
        errorType = BidderError.Type.timeout;
    } else {
        errorType = BidderError.Type.generic;
    }

    final BidderError bidderError = BidderError.create(exception.getMessage(), errorType);
    return Future.succeededFuture(HttpCall.failure(httpRequest, bidderError));
}
 
源代码10 项目: prebid-server-java   文件: HttpAdapterConnector.java
/**
 * Logs a request or response error and converts it into a succeeded {@link Future} holding an
 * error {@link ExchangeCall} built from the accumulated {@link BidderDebug} data.
 *
 * <p>{@code TimeoutException} and {@code ConnectTimeoutException} map to a timeout error with
 * the fixed text "Timed out"; any other exception maps to a generic error carrying the
 * exception's message.
 *
 * @param exception the error raised during the exchange call
 * @param bidderDebugBuilder builder with the debug information gathered so far
 * @return a succeeded future containing the debug info and the error description
 */
private static <T, R> Future<ExchangeCall<T, R>> failResponse(Throwable exception,
                                                              BidderDebug.BidderDebugBuilder bidderDebugBuilder) {
    logger.warn("Error occurred while sending bid request to an exchange: {0}", exception.getMessage());
    logger.debug("Error occurred while sending bid request to an exchange", exception);

    final boolean timedOut =
            exception instanceof TimeoutException || exception instanceof ConnectTimeoutException;

    final BidderError error;
    if (timedOut) {
        error = BidderError.timeout("Timed out");
    } else {
        error = BidderError.generic(exception.getMessage());
    }

    return Future.succeededFuture(ExchangeCall.error(bidderDebugBuilder.build(), error));
}
 
/**
 * A {@link ConnectTimeoutException} from the HTTP client must surface in the adapter response
 * as a timeout error with the text "Timed out", both on the response and on its bidder status.
 */
@Test
public void callShouldSubmitTimeOutErrorToAdapterIfConnectTimeoutOccurs() {
    // given: the HTTP client fails with a connect timeout
    givenHttpClientProducesException(new ConnectTimeoutException());

    // when: the adapter is called and the resulting future is unwrapped
    final AdapterResponse response = httpAdapterConnector
            .call(adapter, usersyncer, adapterRequest, preBidRequestContext)
            .result();

    // then: the timeout is reported on both levels
    assertThat(response.getError()).isEqualTo(BidderError.timeout("Timed out"));
    assertThat(response.getBidderStatus().getError()).isEqualTo("Timed out");
}
 
/**
 * Supplies pairs of (exception instance, expected "is retriable" flag).
 *
 * @return one row per exception under test
 */
@DataProvider(name = "exceptionToIsRetriable")
public Object[][] exceptionToIsRetriable() {
    return new Object[][]{
            // exception, is retriable
            {new RuntimeException(), false},
            {new ConnectException(), true},
            {new ConnectTimeoutException(), true},
            {new UnknownHostException(), true},
            {ReadTimeoutException.INSTANCE, false},
            {new SSLHandshakeException("dummy"), true},
            {new NoRouteToHostException(), true},
            {new SSLPeerUnverifiedException("dummy"), true},
            {new SocketTimeoutException(), false},
            {new PoolExhaustedException(), true}
    };
}
 
/**
 * Supplies pairs of (exception instance, expected boolean flag) for the "networkFailure"
 * data provider. Only {@link RuntimeException} is paired with {@code false}.
 *
 * @return one row per exception under test
 */
@DataProvider(name = "networkFailure")
public Object[][] networkFailure() {
    return new Object[][]{
            // exception, is retriable
            {new RuntimeException(), false},
            {new ConnectException(), true},
            {new ConnectTimeoutException(), true},
            {new UnknownHostException(), true},
            {ReadTimeoutException.INSTANCE, true},
            {new SSLHandshakeException("dummy"), true},
            {new NoRouteToHostException(), true},
            {new SSLPeerUnverifiedException("dummy"), true},
            {new SocketTimeoutException(), true},
            {new ChannelException(), true}
    };
}
 
源代码14 项目: xrpc   文件: RetryLoop.java
/**
 * Tells whether the given failure is one of the connection-level errors treated as retryable:
 * {@link ConnectException}, {@link ConnectTimeoutException} or {@link UnknownHostException}.
 *
 * @param exception the failure to classify; {@code null} yields {@code false}
 * @return {@code true} if the failure is an instance of one of the retryable types
 */
public static boolean isRetryException(Throwable exception) {
  return exception instanceof ConnectException
      || exception instanceof ConnectTimeoutException
      || exception instanceof UnknownHostException;
}
 
/**
 * Fails the original promise with a {@link ConnectTimeoutException} and closes the channel when
 * a {@code HandshakeDeadlineEvent} arrives; every other user event is forwarded down the
 * pipeline untouched.
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (!(evt instanceof HandshakeDeadlineEvent)) {
    // Not ours: pass the event on to the next handler.
    ctx.fireUserEventTriggered(evt);
    return;
  }

  // Deadline hit: tryFailure is a no-op if the promise was already completed.
  originalPromise().tryFailure(
      new ConnectTimeoutException("Handshake did not complete before deadline. " + customPortAdvice));
  ctx.close();
}
 
/**
 * Runs the shared async-call scenario against a short-lived connection to 9.9.9.9:9999,
 * passing a {@link ConnectTimeoutException} and {@code false} to {@code asyncCall}.
 */
@Test
public void testNegativeConnectCall() throws Exception {
    ClientBuilder shortLivedClientBuilder = new ClientBuilder() {
        @Override
        public PbrpcClient getClient() {
            return PbrpcClientFactory.buildShortLiveConnection("9.9.9.9", 9999);
        }
    };

    asyncCall(shortLivedClientBuilder, new ConnectTimeoutException(), false);
}
 
源代码17 项目: xio   文件: RetryLoop.java
/**
 * Classifies a failure as retryable when it is a {@link ConnectException}, a
 * {@link ConnectTimeoutException} or an {@link UnknownHostException}.
 *
 * @param exception the failure to inspect; {@code null} yields {@code false}
 * @return {@code true} for the retryable types listed above, {@code false} otherwise
 */
public static boolean isRetryException(Throwable exception) {
  return exception instanceof ConnectException
      || exception instanceof ConnectTimeoutException
      || exception instanceof UnknownHostException;
}
 
源代码18 项目: PacketLib   文件: TcpSession.java
/**
 * Translates a pipeline exception into a human-readable disconnect reason and disconnects the
 * session with it.
 *
 * <p>Connect timeouts (either {@link ConnectTimeoutException} or a {@link ConnectException}
 * whose message mentions "connection timed out"), read timeouts and write timeouts get fixed
 * messages; anything else uses {@code cause.toString()}.
 *
 * @param ctx the channel handler context the exception was raised in
 * @param cause the exception to report
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    final String message;
    // getMessage() may legitimately be null (e.g. new ConnectException()); guard it so that
    // this handler cannot itself throw a NullPointerException and skip the disconnect below.
    if(cause instanceof ConnectTimeoutException
            || (cause instanceof ConnectException
                    && cause.getMessage() != null
                    && cause.getMessage().contains("connection timed out"))) {
        message = "Connection timed out.";
    } else if(cause instanceof ReadTimeoutException) {
        message = "Read timed out.";
    } else if(cause instanceof WriteTimeoutException) {
        message = "Write timed out.";
    } else {
        message = cause.toString();
    }

    this.disconnect(message, cause);
}
 
/**
 * Handles a {@link ConnectTimeoutException}: logs its message at error level and answers with
 * the {@code GATEWAY_CONNECT_TIME_OUT} failure result.
 *
 * @param ex the connect timeout that was raised
 * @return the failure result for a gateway connect timeout
 */
@ExceptionHandler(value = {ConnectTimeoutException.class})
public Result handle(ConnectTimeoutException ex) {
    final String detail = ex.getMessage();
    log.error("connect timeout exception:{}", detail);
    return Result.fail(ErrorType.GATEWAY_CONNECT_TIME_OUT);
}
 
/**
 * Supplies rows of (request, mocked HTTP client behaviour, failure validator).
 *
 * <p>Each row pairs a Create or Read document request with a mocked network failure and the
 * exception type the validator expects to see for it.
 *
 * <p>NOTE(review): the first two rows are identical (Create + {@link UnknownHostException});
 * one of them may have been intended as a Read request — confirm before changing.
 */
@DataProvider(name = "fromMockedNetworkFailureToExpectedDocumentClientException")
public Object[][] fromMockedNetworkFailureToExpectedDocumentClientException() {
    return new Object[][]{
            // create request, retriable network exception
            {
                    createRequestFromName(
                        OperationType.Create, "dbs/db/colls/col", ResourceType.Document),

                    HttpClientMockWrapper.
                            httpClientBehaviourBuilder()
                            .withNetworkFailure(new UnknownHostException()),

                    FailureValidator.builder()
                            .instanceOf(GoneException.class)
            },

            // create request, retriable network exception
            // (same inputs and expectation as the row above)
            {
                    createRequestFromName(
                            OperationType.Create, "dbs/db/colls/col", ResourceType.Document),

                    HttpClientMockWrapper.
                            httpClientBehaviourBuilder()
                            .withNetworkFailure(new UnknownHostException()),

                    FailureValidator.builder()
                            .instanceOf(GoneException.class)
            },

            // create request, retriable network exception
            {
                    createRequestFromName(
                            OperationType.Create, "dbs/db/colls/col", ResourceType.Document),

                    HttpClientMockWrapper.
                            httpClientBehaviourBuilder()
                            .withNetworkFailure(new ConnectTimeoutException()),

                    FailureValidator.builder()
                            .instanceOf(GoneException.class)
            },

            // read request, retriable network exception
            {
                    createRequestFromName(
                            OperationType.Read, "dbs/db/colls/col", ResourceType.Document),

                    HttpClientMockWrapper.
                            httpClientBehaviourBuilder()
                            .withNetworkFailure(new ConnectTimeoutException()),

                    FailureValidator.builder()
                            .instanceOf(GoneException.class)
            },

            // create request, non-retriable network exception
            {
                    createRequestFromName(
                            OperationType.Create, "dbs/db/colls/col", ResourceType.Document),

                    HttpClientMockWrapper.
                            httpClientBehaviourBuilder()
                            .withNetworkFailure(new RuntimeException()),

                    FailureValidator.builder()
                            .instanceOf(ServiceUnavailableException.class)
            },

            // read request, non-retriable network exception
            // (unlike the create case above, a read is expected to fail with GoneException)
            {
                    createRequestFromName(
                            OperationType.Read, "dbs/db/colls/col", ResourceType.Document),

                    HttpClientMockWrapper.
                            httpClientBehaviourBuilder()
                            .withNetworkFailure(new RuntimeException()),

                    FailureValidator.builder()
                            .instanceOf(GoneException.class)
            },
    };
}
 
/**
 * Handles a {@link ConnectTimeoutException}: logs its message at error level and answers with
 * the {@code GATEWAY_CONNECT_TIME_OUT} system failure result.
 *
 * @param ex the connect timeout that was raised
 * @return the failure result for a gateway connect timeout
 */
@ExceptionHandler(value = {ConnectTimeoutException.class})
public Result handle(ConnectTimeoutException ex) {
    final String detail = ex.getMessage();
    log.error("connect timeout exception:{}", detail);
    return Result.fail(SystemErrorType.GATEWAY_CONNECT_TIME_OUT);
}
 
/**
 * Reconciles all the connectors selected by the given connect instance, updating each connector's status with the
 * result.
 * <p>
 * Three paths are visible here:
 * <ul>
 *     <li>connector resources are not in use for this cluster: nothing is done;</li>
 *     <li>the cluster is scaled to zero: every selected connector gets its status updated with the
 *     "zero replicas" error, and the REST API is not contacted;</li>
 *     <li>otherwise: running connectors, desired connectors and connector plugins are listed; connectors that run
 *     but are no longer desired are reconciled for deletion, desired ones for create/update.</li>
 * </ul>
 * If the last path fails with a {@code ConnectTimeoutException}, the connector statuses are updated with that
 * error and the returned future still fails with it.
 *
 * @param reconciliation The reconciliation
 * @param connect The connector
 * @param connectStatus Status of the KafkaConnect or KafkaConnectS2I resource (will be used to set the available
 *                      connector plugins)
 * @param scaledToZero  Indicates whether the related Connect cluster is currently scaled to 0 replicas
 * @return A future, failed if any of the connectors' statuses could not be updated.
 */
protected Future<Void> reconcileConnectors(Reconciliation reconciliation, T connect, S connectStatus, boolean scaledToZero) {
    String connectName = connect.getMetadata().getName();
    String namespace = connect.getMetadata().getNamespace();
    String host = KafkaConnectResources.qualifiedServiceName(connectName, namespace);

    if (!isUseResources(connect))    {
        return Future.succeededFuture();
    }

    if (scaledToZero)   {
        // No REST API to talk to: only mark every connector labelled for this cluster with the zero-replicas error.
        return connectorOperator.listAsync(namespace, Optional.of(new LabelSelectorBuilder().addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName).build()))
                .compose(connectors -> CompositeFuture.join(
                        connectors.stream().map(connector -> maybeUpdateConnectorStatus(reconciliation, connector, null, zeroReplicas(namespace, connectName)))
                                .collect(Collectors.toList())
                ))
                .map((Void) null);
    }

    KafkaConnectApi apiClient = connectClientProvider.apply(vertx);

    // The three lookups run together; the result indexes used below follow this argument order.
    return CompositeFuture.join(
            apiClient.list(host, port),
            connectorOperator.listAsync(namespace, Optional.of(new LabelSelectorBuilder().addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName).build())),
            apiClient.listConnectorPlugins(host, port)
    ).compose(cf -> {
        List<String> runningConnectorNames = cf.resultAt(0);
        List<KafkaConnector> desiredConnectors = cf.resultAt(1);
        List<ConnectorPlugin> connectorPlugins = cf.resultAt(2);

        log.debug("{}: Setting list of connector plugins in Kafka Connect status", reconciliation);
        connectStatus.setConnectorPlugins(connectorPlugins);

        if (connectorsResourceCounter != null)  {
            connectorsResourceCounter.set(desiredConnectors.size());
        }

        // To delete = running in Connect minus the names of the desired KafkaConnector resources.
        Set<String> deleteConnectorNames = new HashSet<>(runningConnectorNames);
        deleteConnectorNames.removeAll(desiredConnectors.stream().map(c -> c.getMetadata().getName()).collect(Collectors.toSet()));
        log.debug("{}: {} cluster: delete connectors: {}", reconciliation, kind(), deleteConnectorNames);
        // Passing null as the connector resource marks the deletion case.
        Stream<Future<Void>> deletionFutures = deleteConnectorNames.stream().map(connectorName ->
                reconcileConnectorAndHandleResult(reconciliation, host, apiClient, true, connectorName, null)
        );

        log.debug("{}: {} cluster: required connectors: {}", reconciliation, kind(), desiredConnectors);
        Stream<Future<Void>> createUpdateFutures = desiredConnectors.stream()
                .map(connector -> reconcileConnectorAndHandleResult(reconciliation, host, apiClient, true, connector.getMetadata().getName(), connector));

        return CompositeFuture.join(Stream.concat(deletionFutures, createUpdateFutures).collect(Collectors.toList())).map((Void) null);
    }).recover(error -> {
        if (error instanceof ConnectTimeoutException) {
            Promise<Void> connectorStatuses = Promise.promise();
            log.warn("{}: Failed to connect to the REST API => trying to update the connector status", reconciliation);

            // Best effort: record the timeout in each connector's status. Whatever the outcome of those updates,
            // the promise is failed with the original error, so the reconciliation is still reported as failed.
            connectorOperator.listAsync(namespace, Optional.of(new LabelSelectorBuilder().addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName).build()))
                    .compose(connectors -> CompositeFuture.join(
                            connectors.stream().map(connector -> maybeUpdateConnectorStatus(reconciliation, connector, null, error))
                                    .collect(Collectors.toList())
                    ))
                    .onComplete(ignore -> connectorStatuses.fail(error));

            return connectorStatuses.future();
        } else {
            // Any other failure is propagated unchanged.
            return Future.failedFuture(error);
        }
    });
}
 
源代码23 项目: strimzi-kafka-operator   文件: ConnectorMockTest.java
/**
 * Creates a KafkaConnect cluster and a KafkaConnector, makes every mocked REST API call fail with a
 * {@code ConnectTimeoutException}, then scales the cluster to 0 replicas.
 * <p>
 * Expected outcome: the KafkaConnect resource still becomes ready, while the connector is reported as not ready
 * with the "has 0 replicas" error instead of the connect timeout.
 */
@Test
public void testConnectScaleToZero() {
    String connectName = "cluster";
    String connectorName = "connector";

    // Create KafkaConnect cluster and wait till it's ready
    Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).createNew()
            .withNewMetadata()
                .withNamespace(NAMESPACE)
                .withName(connectName)
                .addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
            .endMetadata()
            .withNewSpec()
                .withReplicas(1)
            .endSpec()
            .done();
    waitForConnectReady(connectName);

    // triggered twice (creation followed by status update)
    verify(api, times(2)).list(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));

    // No connector resource exists yet, so nothing may have been created through the REST API
    verify(api, never()).createOrUpdatePutRequest(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
            eq(connectorName), any());

    // Create KafkaConnector and wait till it's ready
    Crds.kafkaConnectorOperation(client).inNamespace(NAMESPACE).createNew()
            .withNewMetadata()
                .withName(connectorName)
                .withNamespace(NAMESPACE)
                .addToLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName)
            .endMetadata()
            .withNewSpec()
                .withTasksMax(1)
                .withClassName("Dummy")
            .endSpec()
            .done();
    waitForConnectorReady(connectorName);

    verify(api, times(2)).list(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));
    verify(api, times(2)).createOrUpdatePutRequest(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
            eq(connectorName), any());
    assertThat(runningConnectors.keySet(), is(Collections.singleton(key("cluster-connect-api.ns.svc", connectorName))));

    // From here on every REST API call fails with a connect timeout
    when(api.list(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.listConnectorPlugins(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.createOrUpdatePutRequest(any(), anyInt(), anyString(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.getConnectorConfig(any(), any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.getConnector(any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));

    // Scale the Connect cluster down to zero replicas
    Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).withName(connectName).edit()
            .editSpec()
                .withReplicas(0)
            .endSpec()
            .done();

    // The connector must report the zero-replicas error, not the REST API timeout
    waitForConnectReady(connectName);
    waitForConnectorNotReady(connectorName, "RuntimeException", "Kafka Connect cluster 'cluster' in namespace ns has 0 replicas.");
}
 
源代码24 项目: strimzi-kafka-operator   文件: ConnectorMockTest.java
/**
 * Creates a KafkaConnect cluster and a KafkaConnector, makes every mocked REST API call fail with a
 * {@code ConnectTimeoutException}, then edits the KafkaConnect spec.
 * <p>
 * Expected outcome: both the KafkaConnect resource and the connector are reported as not ready with the
 * "ConnectTimeoutException" / "connection timed out" condition.
 */
@Test
public void testConnectRestAPIIssues() {
    String connectName = "cluster";
    String connectorName = "connector";

    // Create KafkaConnect cluster and wait till it's ready
    Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).createNew()
            .withNewMetadata()
                .withNamespace(NAMESPACE)
                .withName(connectName)
                .addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
            .endMetadata()
            .withNewSpec()
                .withReplicas(1)
            .endSpec()
            .done();
    waitForConnectReady(connectName);

    // triggered twice (creation followed by status update)
    verify(api, times(2)).list(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));

    // No connector resource exists yet, so nothing may have been created through the REST API
    verify(api, never()).createOrUpdatePutRequest(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
            eq(connectorName), any());

    // Create KafkaConnector and wait till it's ready
    Crds.kafkaConnectorOperation(client).inNamespace(NAMESPACE).createNew()
            .withNewMetadata()
                .withName(connectorName)
                .withNamespace(NAMESPACE)
                .addToLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName)
            .endMetadata()
            .withNewSpec()
                .withTasksMax(1)
                .withClassName("Dummy")
            .endSpec()
            .done();
    waitForConnectorReady(connectorName);

    verify(api, times(2)).list(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));
    verify(api, times(2)).createOrUpdatePutRequest(
            eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
            eq(connectorName), any());
    assertThat(runningConnectors.keySet(), is(Collections.singleton(key("cluster-connect-api.ns.svc", connectorName))));

    // From here on every REST API call fails with a connect timeout
    when(api.list(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.listConnectorPlugins(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.createOrUpdatePutRequest(any(), anyInt(), anyString(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.getConnectorConfig(any(), any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
    when(api.getConnector(any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));

    // Edit the spec (adds an empty template) while keeping replicas unchanged
    // NOTE(review): presumably done only to trigger a new reconciliation — confirm
    Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).withName(connectName).edit()
            .editSpec()
                .withNewTemplate()
                .endTemplate()
            .endSpec()
            .done();

    // Wait for Status change due to the broker REST API
    waitForConnectNotReady(connectName, "ConnectTimeoutException", "connection timed out");
    waitForConnectorNotReady(connectorName, "ConnectTimeoutException", "connection timed out");
}
 
 类所在包
 类方法
 同包方法