下面列出了如何使用 io.netty.channel.ConnectTimeoutException 的 API 类实例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Optionally binds the socket to {@code localAddress}, connects it to {@code remoteAddress}
 * using the configured connect timeout, and activates the channel with the socket's streams.
 *
 * <p>When the connect/activate step does not finish successfully, {@code doClose()} is invoked
 * before the exception propagates.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the local address to bind to, or {@code null} to skip binding
 * @throws ConnectTimeoutException if a {@link SocketTimeoutException} is raised while connecting
 *                                 or activating
 * @throws Exception               any other failure raised while binding, connecting or activating
 */
@Override
protected void doConnect(SocketAddress remoteAddress,
                         SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        SocketUtils.bind(socket, localAddress);
    }

    boolean connected = false;
    try {
        SocketUtils.connect(socket, remoteAddress, config().getConnectTimeoutMillis());
        activate(socket.getInputStream(), socket.getOutputStream());
        connected = true;
    } catch (SocketTimeoutException timeout) {
        // Re-thrown as a ConnectTimeoutException that carries the original stack trace.
        ConnectTimeoutException translated =
                new ConnectTimeoutException("connection timed out: " + remoteAddress);
        translated.setStackTrace(timeout.getStackTrace());
        throw translated;
    } finally {
        if (!connected) {
            doClose();
        }
    }
}
/**
 * Catch-all exception handler, annotated to respond with
 * {@code HttpStatus.INTERNAL_SERVER_ERROR}.
 *
 * <p>Delegates to the more specific {@code handle(...)} overload that matches the runtime type
 * of {@code throwable}, checked in this order: {@code ResponseStatusException},
 * {@code ConnectTimeoutException}, {@code NotFoundException}, {@code RuntimeException},
 * {@code Exception}. A throwable matching none of them yields the plain {@code Result.fail()}.
 *
 * @param throwable the failure to translate
 * @return the result produced by the matching overload, or the generic failure result
 */
@ExceptionHandler(value = {Throwable.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Result handle(Throwable throwable) {
    // Created up front so the call to Result.fail() happens on every invocation.
    final Result fallback = Result.fail();

    if (throwable instanceof ResponseStatusException) {
        return handle((ResponseStatusException) throwable);
    }
    if (throwable instanceof ConnectTimeoutException) {
        return handle((ConnectTimeoutException) throwable);
    }
    if (throwable instanceof NotFoundException) {
        return handle((NotFoundException) throwable);
    }
    if (throwable instanceof RuntimeException) {
        return handle((RuntimeException) throwable);
    }
    if (throwable instanceof Exception) {
        return handle((Exception) throwable);
    }
    return fallback;
}
/**
 * Fallback handler for any {@link Throwable}; annotated to answer with
 * {@code HttpStatus.INTERNAL_SERVER_ERROR}.
 *
 * <p>The throwable is forwarded to the first matching typed {@code handle(...)} overload
 * ({@code ResponseStatusException}, {@code ConnectTimeoutException}, {@code NotFoundException},
 * {@code RuntimeException}, then {@code Exception}). If no type matches, the default
 * {@code Result.fail()} is returned.
 *
 * @param throwable the failure being handled
 * @return the result of the matching typed handler, or the default failure result
 */
@ExceptionHandler(value = {Throwable.class})
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Result handle(Throwable throwable) {
    // The default is always constructed first, as the typed handlers only replace it.
    final Result defaultFailure = Result.fail();

    return throwable instanceof ResponseStatusException ? handle((ResponseStatusException) throwable)
            : throwable instanceof ConnectTimeoutException ? handle((ConnectTimeoutException) throwable)
            : throwable instanceof NotFoundException ? handle((NotFoundException) throwable)
            : throwable instanceof RuntimeException ? handle((RuntimeException) throwable)
            : throwable instanceof Exception ? handle((Exception) throwable)
            : defaultFailure;
}
/**
 * Creates two clients against {@code blackholeBroker}: one with a 1 ms connection timeout and
 * one that leaves the connection timeout at its default. Producer creation on the low-timeout
 * client must fail with a {@link ConnectTimeoutException} (three causes deep) while the
 * producer creation on the other client has not completed yet.
 *
 * @throws Exception if building or closing a client fails
 */
@Test
public void testLowTimeout() throws Exception {
    try (PulsarClient clientLow = PulsarClient.builder().serviceUrl(blackholeBroker)
            .connectionTimeout(1, TimeUnit.MILLISECONDS)
            .operationTimeout(1000, TimeUnit.MILLISECONDS).build();
         PulsarClient clientDefault = PulsarClient.builder().serviceUrl(blackholeBroker)
            .operationTimeout(1000, TimeUnit.MILLISECONDS).build()) {
        CompletableFuture<?> lowFuture = clientLow.newProducer().topic("foo").createAsync();
        CompletableFuture<?> defaultFuture = clientDefault.newProducer().topic("foo").createAsync();

        try {
            lowFuture.get();
            // Assert.fail throws an AssertionError, which the catch (Exception) below does not swallow.
            Assert.fail("Shouldn't be able to connect to anything");
        } catch (Exception e) {
            Assert.assertFalse(defaultFuture.isDone());
            Assert.assertEquals(e.getCause().getCause().getCause().getClass(), ConnectTimeoutException.class);
        }
    }
}
/**
 * Binds the socket to {@code localAddress} when one is given, then connects to
 * {@code remoteAddress} with the configured connect timeout and activates the channel using the
 * socket's input and output streams.
 *
 * <p>If connecting or activating does not complete, the channel is closed through
 * {@code doClose()} before the failure is propagated.
 *
 * @param remoteAddress the peer address to connect to
 * @param localAddress  the local address to bind to; {@code null} means no explicit bind
 * @throws ConnectTimeoutException when the attempt raises a {@link SocketTimeoutException}
 * @throws Exception               on any other bind, connect or activation failure
 */
@Override
protected void doConnect(SocketAddress remoteAddress,
                         SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        socket.bind(localAddress);
    }

    boolean established = false;
    try {
        socket.connect(remoteAddress, config().getConnectTimeoutMillis());
        activate(socket.getInputStream(), socket.getOutputStream());
        established = true;
    } catch (SocketTimeoutException socketTimeout) {
        // Keep the stack trace of the underlying socket timeout on the exception we throw.
        ConnectTimeoutException connectTimeout =
                new ConnectTimeoutException("connection timed out: " + remoteAddress);
        connectTimeout.setStackTrace(socketTimeout.getStackTrace());
        throw connectTimeout;
    } finally {
        if (!established) {
            doClose();
        }
    }
}
/**
 * Uses a mocked bootstrap whose {@code connect()} returns a promise that is never completed and
 * checks that {@code endpoint.connect()} terminates with exactly one error of type
 * {@link ConnectTimeoutException}.
 */
@Test
public void shouldForceTimeoutOfSocketConnectDoesNotReturn() {
    BootstrapAdapter stalledBootstrap = mock(BootstrapAdapter.class);
    when(stalledBootstrap.connect()).thenReturn(channel.newPromise()); // this promise never completes
    Endpoint endpoint = new DummyEndpoint(stalledBootstrap, ctx);

    TestSubscriber<LifecycleState> subscriber = new TestSubscriber<LifecycleState>();
    endpoint.connect().subscribe(subscriber);
    subscriber.awaitTerminalEvent();

    List<Throwable> failures = subscriber.getOnErrorEvents();
    assertEquals(1, failures.size());
    assertEquals(ConnectTimeoutException.class, failures.get(0).getClass());

    endpoint.disconnect().subscribe();
}
/**
 * Sets {@code TCP_MD5SIG} on the server to {@code SERVER_KEY} and on the client to
 * {@code BAD_KEY} (both keyed by {@code NetUtil.LOCALHOST4}), then connects with a 1000 ms
 * connect timeout. The test expects a {@link ConnectTimeoutException}.
 *
 * @throws Exception the expected {@link ConnectTimeoutException}, or any setup failure
 */
@Test(expected = ConnectTimeoutException.class)
public void testKeyMismatch() throws Exception {
    server.config().setOption(EpollChannelOption.TCP_MD5SIG,
            Collections.<InetAddress, byte[]>singletonMap(NetUtil.LOCALHOST4, SERVER_KEY));

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(GROUP)
            .channel(EpollSocketChannel.class)
            .handler(new ChannelInboundHandlerAdapter())
            .option(EpollChannelOption.TCP_MD5SIG,
                    Collections.<InetAddress, byte[]>singletonMap(NetUtil.LOCALHOST4, BAD_KEY))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);

    EpollSocketChannel client =
            (EpollSocketChannel) bootstrap.connect(server.localAddress()).syncUninterruptibly().channel();
    client.close().syncUninterruptibly();
}
/**
 * Connects to the passed {@code resolvedRemoteAddress} when the returned {@link Single} is
 * subscribed to.
 *
 * <p>Connect failures are translated before being reported to the subscriber: a
 * {@code ConnectTimeoutException} is wrapped in
 * {@code io.servicetalk.client.api.ConnectTimeoutException} with a descriptive message, and a
 * {@link ConnectException} is wrapped in a {@code RetryableConnectException}. Any other cause is
 * reported unchanged.
 *
 * @param localAddress The local address to bind to, or {@code null}.
 * @param resolvedRemoteAddress The address to connect to. This address should already be resolved at this point.
 * @param config The {@link ReadOnlyTcpClientConfig} to use while connecting.
 * @param autoRead if {@code true} auto read will be enabled for new {@link Channel}s.
 * @param executionContext The {@link ExecutionContext} to use for the returned {@link NettyConnection}.
 * @param connectionFactory {@link Function} to create a {@link NettyConnection} asynchronously.
 * @param <C> Type of the created connection.
 * @return A {@link Single} that completes with a connection of type {@code C} when connected.
 */
public static <C extends ListenableAsyncCloseable> Single<C> connect(
        final @Nullable SocketAddress localAddress, final Object resolvedRemoteAddress,
        final ReadOnlyTcpClientConfig config, final boolean autoRead, final ExecutionContext executionContext,
        final Function<Channel, Single<? extends C>> connectionFactory) {
    requireNonNull(resolvedRemoteAddress);
    requireNonNull(config);
    requireNonNull(executionContext);
    return new SubscribableSingle<C>() {
        @Override
        protected void handleSubscribe(final Subscriber<? super C> subscriber) {
            final ConnectHandler<C> handler = new ConnectHandler<>(subscriber, connectionFactory);
            try {
                final Future<?> pendingConnect = connect0(localAddress, resolvedRemoteAddress, config, autoRead,
                        executionContext, handler);
                handler.connectFuture(pendingConnect);
                pendingConnect.addListener(completed -> {
                    final Throwable failure = completed.cause();
                    if (failure == null) {
                        // Nothing is reported from this listener when the future has no cause.
                        return;
                    }

                    final Throwable reported;
                    if (failure instanceof ConnectTimeoutException) {
                        final String msg;
                        if (resolvedRemoteAddress instanceof FileDescriptorSocketAddress) {
                            msg = "Failed to register: " + resolvedRemoteAddress;
                        } else {
                            msg = "Failed to connect: " + resolvedRemoteAddress + " (localAddress: " +
                                    localAddress + ")";
                        }
                        reported = new io.servicetalk.client.api.ConnectTimeoutException(msg, failure);
                    } else if (failure instanceof ConnectException) {
                        reported = new RetryableConnectException((ConnectException) failure);
                    } else {
                        reported = failure;
                    }
                    handler.connectFailed(reported);
                });
            } catch (Throwable t) {
                handler.unexpectedFailure(t);
            }
        }
    };
}
/**
 * Logs the failure and produces a succeeded {@link Future} holding an {@link HttpCall} built
 * from the request and an error description.
 *
 * <p>The error type is {@code timeout} for {@link TimeoutException} and
 * {@code ConnectTimeoutException}, and {@code generic} for everything else.
 *
 * @param exception   the failure raised while sending the request
 * @param httpRequest the request that failed
 * @return a succeeded future carrying the failed call
 */
private static <T> Future<HttpCall<T>> failResponse(Throwable exception, HttpRequest<T> httpRequest) {
    logger.warn("Error occurred while sending HTTP request to a bidder url: {0} with message: {1}",
            httpRequest.getUri(), exception.getMessage());
    logger.debug("Error occurred while sending HTTP request to a bidder url: {0}", exception, httpRequest.getUri());

    final BidderError.Type errorType;
    if (exception instanceof TimeoutException || exception instanceof ConnectTimeoutException) {
        errorType = BidderError.Type.timeout;
    } else {
        errorType = BidderError.Type.generic;
    }

    final BidderError error = BidderError.create(exception.getMessage(), errorType);
    return Future.succeededFuture(HttpCall.failure(httpRequest, error));
}
/**
 * Handles request (e.g. read timeout) and response (e.g. connection reset) errors by logging
 * them and producing a succeeded {@link Future} with an {@link ExchangeCall} that contains the
 * built {@link BidderDebug} and an error description.
 *
 * <p>{@link TimeoutException} and {@code ConnectTimeoutException} map to a timeout error with
 * the message {@code "Timed out"}; any other exception maps to a generic error carrying the
 * exception's message.
 *
 * @param exception          the failure raised while sending the bid request
 * @param bidderDebugBuilder builder for the debug information attached to the call
 * @return a succeeded future carrying the error call
 */
private static <T, R> Future<ExchangeCall<T, R>> failResponse(Throwable exception,
                                                              BidderDebug.BidderDebugBuilder bidderDebugBuilder) {
    logger.warn("Error occurred while sending bid request to an exchange: {0}", exception.getMessage());
    logger.debug("Error occurred while sending bid request to an exchange", exception);

    final boolean timedOut = exception instanceof TimeoutException
            || exception instanceof ConnectTimeoutException;

    final BidderError error;
    if (timedOut) {
        error = BidderError.timeout("Timed out");
    } else {
        error = BidderError.generic(exception.getMessage());
    }

    return Future.succeededFuture(ExchangeCall.error(bidderDebugBuilder.build(), error));
}
/**
 * When the HTTP client produces a {@link ConnectTimeoutException}, the adapter response must
 * carry the timeout error {@code "Timed out"}, both as its error and in its bidder status.
 */
@Test
public void callShouldSubmitTimeOutErrorToAdapterIfConnectTimeoutOccurs() {
    // given
    givenHttpClientProducesException(new ConnectTimeoutException());

    // when
    final AdapterResponse response =
            httpAdapterConnector.call(adapter, usersyncer, adapterRequest, preBidRequestContext).result();

    // then
    assertThat(response.getError()).isEqualTo(BidderError.timeout("Timed out"));
    assertThat(response.getBidderStatus().getError()).isEqualTo("Timed out");
}
/**
 * Data provider {@code "exceptionToIsRetriable"}: each row pairs an exception instance with the
 * boolean expected for it.
 *
 * @return rows of {@code {exception, is retriable}}
 */
@DataProvider(name = "exceptionToIsRetriable")
public Object[][] exceptionToIsRetriable() {
    return new Object[][]{
            // exception, is retriable
            {new RuntimeException(), false},
            {new ConnectException(), true},
            {new ConnectTimeoutException(), true},
            {new UnknownHostException(), true},
            {ReadTimeoutException.INSTANCE, false},
            {new SSLHandshakeException("dummy"), true},
            {new NoRouteToHostException(), true},
            {new SSLPeerUnverifiedException("dummy"), true},
            {new SocketTimeoutException(), false},
            {new PoolExhaustedException(), true},
    };
}
/**
 * Data provider {@code "networkFailure"}: each row pairs an exception instance with the boolean
 * expected for it.
 *
 * @return rows of {@code {exception, is retriable}}
 */
@DataProvider(name = "networkFailure")
public Object[][] networkFailure() {
    return new Object[][]{
            // exception, is retriable
            {new RuntimeException(), false},
            {new ConnectException(), true},
            {new ConnectTimeoutException(), true},
            {new UnknownHostException(), true},
            {ReadTimeoutException.INSTANCE, true},
            {new SSLHandshakeException("dummy"), true},
            {new NoRouteToHostException(), true},
            {new SSLPeerUnverifiedException("dummy"), true},
            {new SocketTimeoutException(), true},
            {new ChannelException(), true},
    };
}
/**
 * Tells whether the given exception is one of the connection-establishment failures this
 * method treats as retryable.
 *
 * @param exception the failure to classify; {@code null} yields {@code false}
 * @return {@code true} for {@link ConnectException}, {@code ConnectTimeoutException} and
 *         {@link UnknownHostException}; {@code false} otherwise
 */
public static boolean isRetryException(Throwable exception) {
    return exception instanceof ConnectException
            || exception instanceof ConnectTimeoutException
            || exception instanceof UnknownHostException;
}
/**
 * Fails the original promise with a {@link ConnectTimeoutException} and closes the channel when
 * a {@code HandshakeDeadlineEvent} arrives; every other user event is passed on along the
 * pipeline.
 *
 * @param ctx the handler context
 * @param evt the user event
 * @throws Exception declared by the overridden method
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (!(evt instanceof HandshakeDeadlineEvent)) {
        ctx.fireUserEventTriggered(evt);
        return;
    }

    // The deadline event is consumed here and not forwarded.
    originalPromise().tryFailure(
            new ConnectTimeoutException("Handshake did not complete before deadline. " + customPortAdvice));
    ctx.close();
}
/**
 * Runs {@code asyncCall} with a short-lived-connection client pointed at {@code 9.9.9.9:9999},
 * passing a {@link ConnectTimeoutException} and {@code false} (presumably the expected failure
 * and a success flag — confirm against {@code asyncCall}).
 *
 * @throws Exception if {@code asyncCall} fails
 */
@Test
public void testNegativeConnectCall() throws Exception {
    ClientBuilder shortLivedClientBuilder = new ClientBuilder() {
        @Override
        public PbrpcClient getClient() {
            return PbrpcClientFactory.buildShortLiveConnection("9.9.9.9", 9999);
        }
    };
    asyncCall(shortLivedClientBuilder, new ConnectTimeoutException(), false);
}
/**
 * Classifies an exception as retryable when it is a {@link ConnectException}, a
 * {@code ConnectTimeoutException} or an {@link UnknownHostException}.
 *
 * @param exception the failure to inspect; {@code null} is classified as not retryable
 * @return {@code true} if the exception is one of the three types above, otherwise {@code false}
 */
public static boolean isRetryException(Throwable exception) {
    if (exception instanceof ConnectException) {
        return true;
    }
    if (exception instanceof ConnectTimeoutException) {
        return true;
    }
    return exception instanceof UnknownHostException;
}
/**
 * Translates a pipeline failure into a disconnect reason and disconnects with it.
 *
 * <p>Connect, read and write timeouts get fixed messages; any other failure uses
 * {@code cause.toString()}. A {@link ConnectException} counts as a connect timeout only when its
 * message contains {@code "connection timed out"}; a {@link ConnectException} without a message
 * falls through to the generic branch.
 *
 * @param ctx   the handler context
 * @param cause the failure raised in the pipeline
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // getMessage() may be null (e.g. new ConnectException()), so guard before calling contains().
    final String causeMessage = cause.getMessage();
    final boolean connectTimedOut = cause instanceof ConnectTimeoutException
            || (cause instanceof ConnectException
                && causeMessage != null
                && causeMessage.contains("connection timed out"));

    final String message;
    if (connectTimedOut) {
        message = "Connection timed out.";
    } else if (cause instanceof ReadTimeoutException) {
        message = "Read timed out.";
    } else if (cause instanceof WriteTimeoutException) {
        message = "Write timed out.";
    } else {
        message = cause.toString();
    }

    this.disconnect(message, cause);
}
/**
 * Handles {@link ConnectTimeoutException}: logs the exception message at error level and
 * returns a failure result with {@code ErrorType.GATEWAY_CONNECT_TIME_OUT}.
 *
 * @param ex the connect timeout to handle
 * @return the failure result for a gateway connect timeout
 */
@ExceptionHandler(ConnectTimeoutException.class)
public Result handle(ConnectTimeoutException ex) {
    final String reason = ex.getMessage();
    log.error("connect timeout exception:{}", reason);
    return Result.fail(ErrorType.GATEWAY_CONNECT_TIME_OUT);
}
/**
 * Data provider {@code "fromMockedNetworkFailureToExpectedDocumentClientException"}.
 *
 * <p>Each row holds a request built with {@code createRequestFromName}, an HTTP client mock
 * behaviour configured with a network failure, and a {@code FailureValidator} builder naming the
 * exception class expected for that combination.
 *
 * @return rows of {@code {request, mocked client behaviour, failure validator builder}}
 */
@DataProvider(name = "fromMockedNetworkFailureToExpectedDocumentClientException")
public Object[][] fromMockedNetworkFailureToExpectedDocumentClientException() {
    final String documentPath = "dbs/db/colls/col";

    return new Object[][]{
            // create request, retriable network exception
            {
                    createRequestFromName(OperationType.Create, documentPath, ResourceType.Document),
                    HttpClientMockWrapper.httpClientBehaviourBuilder()
                            .withNetworkFailure(new UnknownHostException()),
                    FailureValidator.builder().instanceOf(GoneException.class)
            },

            // create request, retriable network exception
            // NOTE(review): this row is identical to the previous one — confirm whether a
            // different operation type or exception was intended.
            {
                    createRequestFromName(OperationType.Create, documentPath, ResourceType.Document),
                    HttpClientMockWrapper.httpClientBehaviourBuilder()
                            .withNetworkFailure(new UnknownHostException()),
                    FailureValidator.builder().instanceOf(GoneException.class)
            },

            // create request, retriable network exception
            {
                    createRequestFromName(OperationType.Create, documentPath, ResourceType.Document),
                    HttpClientMockWrapper.httpClientBehaviourBuilder()
                            .withNetworkFailure(new ConnectTimeoutException()),
                    FailureValidator.builder().instanceOf(GoneException.class)
            },

            // read request, retriable network exception
            {
                    createRequestFromName(OperationType.Read, documentPath, ResourceType.Document),
                    HttpClientMockWrapper.httpClientBehaviourBuilder()
                            .withNetworkFailure(new ConnectTimeoutException()),
                    FailureValidator.builder().instanceOf(GoneException.class)
            },

            // create request, non-retriable network exception
            {
                    createRequestFromName(OperationType.Create, documentPath, ResourceType.Document),
                    HttpClientMockWrapper.httpClientBehaviourBuilder()
                            .withNetworkFailure(new RuntimeException()),
                    FailureValidator.builder().instanceOf(ServiceUnavailableException.class)
            },

            // read request, non-retriable network exception
            {
                    createRequestFromName(OperationType.Read, documentPath, ResourceType.Document),
                    HttpClientMockWrapper.httpClientBehaviourBuilder()
                            .withNetworkFailure(new RuntimeException()),
                    FailureValidator.builder().instanceOf(GoneException.class)
            },
    };
}
/**
 * Handler for {@link ConnectTimeoutException}: writes the exception message to the error log
 * and answers with a failure result of type {@code SystemErrorType.GATEWAY_CONNECT_TIME_OUT}.
 *
 * @param ex the connect timeout that was raised
 * @return the failure result describing the gateway connect timeout
 */
@ExceptionHandler(ConnectTimeoutException.class)
public Result handle(ConnectTimeoutException ex) {
    final String detail = ex.getMessage();
    log.error("connect timeout exception:{}", detail);
    return Result.fail(SystemErrorType.GATEWAY_CONNECT_TIME_OUT);
}
/**
* Reconciles all the connectors selected by the given connect instance, updating each connector's status with the result.
* <p>Returns immediately with a succeeded future when {@code isUseResources(connect)} is false. When the cluster is
* scaled to zero, only the connector statuses are updated and the Connect REST API is not called.
* @param reconciliation The reconciliation
* @param connect The Connect resource whose name and namespace select the connectors
* @param connectStatus Status of the KafkaConnect or KafkaConnectS2I resource (will be used to set the available
* connector plugins)
* @param scaledToZero Indicates whether the related Connect cluster is currently scaled to 0 replicas
* @return A future, failed if any of the connectors' statuses could not be updated. It is also failed with the
* original error when the REST API calls fail, including the connect-timeout case handled in the recovery step.
*/
protected Future<Void> reconcileConnectors(Reconciliation reconciliation, T connect, S connectStatus, boolean scaledToZero) {
String connectName = connect.getMetadata().getName();
String namespace = connect.getMetadata().getNamespace();
String host = KafkaConnectResources.qualifiedServiceName(connectName, namespace);
// Nothing to reconcile unless isUseResources(connect) is true.
if (!isUseResources(connect)) {
return Future.succeededFuture();
}
// Scaled to zero: list the KafkaConnector resources labelled with this cluster name and update each status,
// passing the result of zeroReplicas(...) in the same argument position the recovery step below uses for the error.
if (scaledToZero) {
return connectorOperator.listAsync(namespace, Optional.of(new LabelSelectorBuilder().addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName).build()))
.compose(connectors -> CompositeFuture.join(
connectors.stream().map(connector -> maybeUpdateConnectorStatus(reconciliation, connector, null, zeroReplicas(namespace, connectName)))
.collect(Collectors.toList())
))
.map((Void) null);
}
KafkaConnectApi apiClient = connectClientProvider.apply(vertx);
// Gather three inputs: connector names from the REST API (index 0), the KafkaConnector resources labelled
// with this cluster name (index 1) and the connector plugins from the REST API (index 2).
return CompositeFuture.join(
apiClient.list(host, port),
connectorOperator.listAsync(namespace, Optional.of(new LabelSelectorBuilder().addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName).build())),
apiClient.listConnectorPlugins(host, port)
).compose(cf -> {
List<String> runningConnectorNames = cf.resultAt(0);
List<KafkaConnector> desiredConnectors = cf.resultAt(1);
List<ConnectorPlugin> connectorPlugins = cf.resultAt(2);
log.debug("{}: Setting list of connector plugins in Kafka Connect status", reconciliation);
connectStatus.setConnectorPlugins(connectorPlugins);
if (connectorsResourceCounter != null) {
connectorsResourceCounter.set(desiredConnectors.size());
}
// Connectors to delete = names reported by the REST API minus the names of the desired KafkaConnector resources.
Set<String> deleteConnectorNames = new HashSet<>(runningConnectorNames);
deleteConnectorNames.removeAll(desiredConnectors.stream().map(c -> c.getMetadata().getName()).collect(Collectors.toSet()));
log.debug("{}: {} cluster: delete connectors: {}", reconciliation, kind(), deleteConnectorNames);
// Deletions are reconciled with a null connector resource.
Stream<Future<Void>> deletionFutures = deleteConnectorNames.stream().map(connectorName ->
reconcileConnectorAndHandleResult(reconciliation, host, apiClient, true, connectorName, null)
);
log.debug("{}: {} cluster: required connectors: {}", reconciliation, kind(), desiredConnectors);
Stream<Future<Void>> createUpdateFutures = desiredConnectors.stream()
.map(connector -> reconcileConnectorAndHandleResult(reconciliation, host, apiClient, true, connector.getMetadata().getName(), connector));
return CompositeFuture.join(Stream.concat(deletionFutures, createUpdateFutures).collect(Collectors.toList())).map((Void) null);
}).recover(error -> {
if (error instanceof ConnectTimeoutException) {
Promise<Void> connectorStatuses = Promise.promise();
log.warn("{}: Failed to connect to the REST API => trying to update the connector status", reconciliation);
// Record the timeout on every labelled connector's status. Whether or not those status updates succeed,
// the returned future is failed with the original error.
connectorOperator.listAsync(namespace, Optional.of(new LabelSelectorBuilder().addToMatchLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName).build()))
.compose(connectors -> CompositeFuture.join(
connectors.stream().map(connector -> maybeUpdateConnectorStatus(reconciliation, connector, null, error))
.collect(Collectors.toList())
))
.onComplete(ignore -> connectorStatuses.fail(error));
return connectorStatuses.future();
} else {
// Any other failure is propagated unchanged.
return Future.failedFuture(error);
}
});
}
/**
* Create connect, create connector, scale to 0.
* <p>After the connector is ready, all stubbed REST API calls are made to fail with a connect timeout and the
* Connect resource is edited to 0 replicas. The test then waits for Connect to be ready and for the connector to
* be not ready with the "0 replicas" message.
*/
@Test
public void testConnectScaleToZero() {
String connectName = "cluster";
String connectorName = "connector";
// Create KafkaConnect cluster and wait till it's ready
Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).createNew()
.withNewMetadata()
.withNamespace(NAMESPACE)
.withName(connectName)
.addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
.endMetadata()
.withNewSpec()
.withReplicas(1)
.endSpec()
.done();
waitForConnectReady(connectName);
// triggered twice (creation followed by status update)
verify(api, times(2)).list(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));
// No connector resource exists yet, so no create/update request may have been sent for it
verify(api, never()).createOrUpdatePutRequest(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
eq(connectorName), any());
// Create KafkaConnector and wait till it's ready
Crds.kafkaConnectorOperation(client).inNamespace(NAMESPACE).createNew()
.withNewMetadata()
.withName(connectorName)
.withNamespace(NAMESPACE)
.addToLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName)
.endMetadata()
.withNewSpec()
.withTasksMax(1)
.withClassName("Dummy")
.endSpec()
.done();
waitForConnectorReady(connectorName);
// list() is still expected to have been called twice in total; the create/update request twice as well
verify(api, times(2)).list(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));
verify(api, times(2)).createOrUpdatePutRequest(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
eq(connectorName), any());
assertThat(runningConnectors.keySet(), is(Collections.singleton(key("cluster-connect-api.ns.svc", connectorName))));
// From here on, every stubbed REST API call fails with a connect timeout
when(api.list(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.listConnectorPlugins(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.createOrUpdatePutRequest(any(), anyInt(), anyString(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.getConnectorConfig(any(), any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.getConnector(any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
// Scale the KafkaConnect resource down to 0 replicas
Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).withName(connectName).edit()
.editSpec()
.withReplicas(0)
.endSpec()
.done();
// Connect is expected to be ready, while the connector reports the zero-replicas error and not the timeout
waitForConnectReady(connectName);
waitForConnectorNotReady(connectorName, "RuntimeException", "Kafka Connect cluster 'cluster' in namespace ns has 0 replicas.");
}
/**
* Create connect, create connector, break the REST API.
* <p>After the connector is ready, all stubbed REST API calls are made to fail with a connect timeout and the
* Connect resource spec is edited. The test then waits for both the Connect resource and the connector to be
* not ready with the "connection timed out" message.
*/
@Test
public void testConnectRestAPIIssues() {
String connectName = "cluster";
String connectorName = "connector";
// Create KafkaConnect cluster and wait till it's ready
Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).createNew()
.withNewMetadata()
.withNamespace(NAMESPACE)
.withName(connectName)
.addToAnnotations(Annotations.STRIMZI_IO_USE_CONNECTOR_RESOURCES, "true")
.endMetadata()
.withNewSpec()
.withReplicas(1)
.endSpec()
.done();
waitForConnectReady(connectName);
// triggered twice (creation followed by status update)
verify(api, times(2)).list(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));
// No connector resource exists yet, so no create/update request may have been sent for it
verify(api, never()).createOrUpdatePutRequest(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
eq(connectorName), any());
// Create KafkaConnector and wait till it's ready
Crds.kafkaConnectorOperation(client).inNamespace(NAMESPACE).createNew()
.withNewMetadata()
.withName(connectorName)
.withNamespace(NAMESPACE)
.addToLabels(Labels.STRIMZI_CLUSTER_LABEL, connectName)
.endMetadata()
.withNewSpec()
.withTasksMax(1)
.withClassName("Dummy")
.endSpec()
.done();
waitForConnectorReady(connectorName);
// list() is still expected to have been called twice in total; the create/update request twice as well
verify(api, times(2)).list(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT));
verify(api, times(2)).createOrUpdatePutRequest(
eq(KafkaConnectResources.qualifiedServiceName(connectName, NAMESPACE)), eq(KafkaConnectCluster.REST_API_PORT),
eq(connectorName), any());
assertThat(runningConnectors.keySet(), is(Collections.singleton(key("cluster-connect-api.ns.svc", connectorName))));
// From here on, every stubbed REST API call fails with a connect timeout
when(api.list(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.listConnectorPlugins(any(), anyInt())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.createOrUpdatePutRequest(any(), anyInt(), anyString(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.getConnectorConfig(any(), any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
when(api.getConnector(any(), anyInt(), any())).thenReturn(Future.failedFuture(new ConnectTimeoutException("connection timed out")));
// Edit the spec by adding an empty template — presumably only to trigger another reconciliation (TODO confirm)
Crds.kafkaConnectOperation(client).inNamespace(NAMESPACE).withName(connectName).edit()
.editSpec()
.withNewTemplate()
.endTemplate()
.endSpec()
.done();
// Wait for Status change due to the broker REST API
waitForConnectNotReady(connectName, "ConnectTimeoutException", "connection timed out");
waitForConnectorNotReady(connectorName, "ConnectTimeoutException", "connection timed out");
}