类org.springframework.messaging.tcp.ReconnectStrategy源码实例Demo

下面列出了 org.springframework.messaging.tcp.ReconnectStrategy 这个 API 类的用法示例代码，也可以点击链接到 GitHub 查看源代码。

@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
	Assert.notNull(handler, "TcpConnectionHandler is required");
	Assert.notNull(strategy, "ReconnectStrategy is required");

	if (this.stopping) {
		return handleShuttingDownConnectFailure(handler);
	}

	// Report first connect to the ListenableFuture
	MonoProcessor<Void> connectMono = MonoProcessor.create();

	this.tcpClient
			.handle(new ReactorNettyHandler(handler))
			.connect()
			.doOnNext(updateConnectMono(connectMono))
			.doOnError(updateConnectMono(connectMono))
			.doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
			.flatMap(Connection::onDispose)             // post-connect issues
			.retryWhen(reconnectFunction(strategy))
			.repeatWhen(reconnectFunction(strategy))
			.subscribe();

	return new MonoToListenableFutureAdapter<>(connectMono);
}
 
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
	Assert.notNull(handler, "TcpConnectionHandler is required");
	Assert.notNull(strategy, "ReconnectStrategy is required");

	if (this.stopping) {
		return handleShuttingDownConnectFailure(handler);
	}

	// Report first connect to the ListenableFuture
	MonoProcessor<Void> connectMono = MonoProcessor.create();

	this.tcpClient
			.handle(new ReactorNettyHandler(handler))
			.connect()
			.doOnNext(updateConnectMono(connectMono))
			.doOnError(updateConnectMono(connectMono))
			.doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
			.flatMap(Connection::onDispose)             // post-connect issues
			.retryWhen(reconnectFunction(strategy))
			.repeatWhen(reconnectFunction(strategy))
			.subscribe();

	return new MonoToListenableFutureAdapter<>(connectMono);
}
 
源代码3 项目: spring4-understanding   文件: Reactor2TcpClient.java
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
	Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
	Assert.notNull(strategy, "ReconnectStrategy must not be null");

	TcpClient<Message<P>, Message<P>> tcpClient;
	synchronized (this.tcpClients) {
		if (this.stopping) {
			IllegalStateException ex = new IllegalStateException("Shutting down.");
			connectionHandler.afterConnectFailure(ex);
			return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex));
		}
		tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
		this.tcpClients.add(tcpClient);
	}

	Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
			new MessageChannelStreamHandler<P>(connectionHandler),
			new ReactorReconnectAdapter(strategy));

	return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
}
 
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
	return flux -> flux
			.scan(1, (count, element) -> count++)
			.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
					.map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
					.orElse(Mono.empty()));
}
 
private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
	return flux -> flux
			.scan(1, (count, element) -> count++)
			.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
					.map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
					.orElse(Mono.empty()));
}
 
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
	this.connectionHandler = handler;
	handler.afterConnected(this.connection);
	return getVoidFuture();
}
 
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
	this.connectionHandler = handler;
	handler.afterConnected(this.connection);
	return getVoidFuture();
}
 
源代码8 项目: spring4-understanding   文件: Reactor2TcpClient.java
public ReactorReconnectAdapter(ReconnectStrategy strategy) {
	this.strategy = strategy;
}
 
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
	this.connectionHandler = handler;
	handler.afterConnected(this.connection);
	return getVoidFuture();
}
 
 类方法
 同包方法