下面列出了怎么用org.springframework.messaging.tcp.ReconnectStrategy的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
TcpClient<Message<P>, Message<P>> tcpClient;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
connectionHandler.afterConnectFailure(ex);
return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex));
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
}
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
new MessageChannelStreamHandler<P>(connectionHandler),
new ReactorReconnectAdapter(strategy));
return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
}
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
return flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
.map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
.orElse(Mono.empty()));
}
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
return flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
.map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
.orElse(Mono.empty()));
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
public ReactorReconnectAdapter(ReconnectStrategy strategy) {
this.strategy = strategy;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}