下面列出了怎么用org.springframework.messaging.tcp.TcpConnectionHandler的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "TcpConnectionHandler is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
Mono<Void> connectMono = this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure)
.then();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
Assert.notNull(handler, "TcpConnectionHandler is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
Mono<Void> connectMono = this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnError(handler::afterConnectFailure)
.then();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
return handleShuttingDownConnectFailure(handler);
}
// Report first connect to the ListenableFuture
MonoProcessor<Void> connectMono = MonoProcessor.create();
this.tcpClient
.handle(new ReactorNettyHandler(handler))
.connect()
.doOnNext(updateConnectMono(connectMono))
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.subscribe();
return new MonoToListenableFutureAdapter<>(connectMono);
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler, ReconnectStrategy strategy) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
Assert.notNull(strategy, "ReconnectStrategy must not be null");
TcpClient<Message<P>, Message<P>> tcpClient;
synchronized (this.tcpClients) {
if (this.stopping) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
connectionHandler.afterConnectFailure(ex);
return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>error(ex));
}
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
this.tcpClients.add(tcpClient);
}
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
new MessageChannelStreamHandler<P>(connectionHandler),
new ReactorReconnectAdapter(strategy));
return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
}
private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
}
ReactorNettyHandler(TcpConnectionHandler<P> handler) {
this.connectionHandler = handler;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
this.connectionHandler = connectionHandler;
}
private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
IllegalStateException ex = new IllegalStateException("Shutting down.");
handler.afterConnectFailure(ex);
return new MonoToListenableFutureAdapter<>(Mono.error(ex));
}
ReactorNettyHandler(TcpConnectionHandler<P> handler) {
this.connectionHandler = handler;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
this.connectionHandler = connectionHandler;
}
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler) {
this.connectionHandler = connectionHandler;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
public WebSocketTcpConnectionHandlerAdapter(TcpConnectionHandler<byte[]> connectionHandler) {
Assert.notNull(connectionHandler);
this.connectionHandler = connectionHandler;
}