下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.transport.netty.server.CloseableChannel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
RSocketFactory.ServerRSocketFactory rSocketFactory = applyCustomizers(RSocketFactory.receive());
HttpServer httpServer = createHttpServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
return rSocketFactory
.acceptor(socketAcceptor)
.transport((ServerTransport) new WebsocketRouteTransport(
httpServer,
r -> r.route(hsr -> !("/" + hsr.path()).equals(path), handlerAdapter),
path
));
}
@Test
void timeoutOnPermanentDisconnect() {
CloseableChannel closeable = newServerRSocket().block();
DisconnectableClientTransport clientTransport =
new DisconnectableClientTransport(clientTransport(closeable.address()));
int sessionDurationSeconds = 5;
RSocket rSocket = newClientRSocket(clientTransport, sessionDurationSeconds).block();
Mono.delay(Duration.ofSeconds(1)).subscribe(v -> clientTransport.disconnectPermanently());
StepVerifier.create(
rSocket.requestChannel(testRequest()).then().doFinally(s -> closeable.dispose()))
.expectError(ClosedChannelException.class)
.verify(Duration.ofSeconds(7));
}
@Test
void serverMissingResume() {
CloseableChannel closeableChannel =
RSocketServer.create(SocketAcceptor.with(new TestResponderRSocket()))
.bind(serverTransport(SERVER_HOST, SERVER_PORT))
.block();
RSocket rSocket =
RSocketConnector.create()
.resume(new Resume())
.connect(clientTransport(closeableChannel.address()))
.block();
StepVerifier.create(rSocket.onClose().doFinally(s -> closeableChannel.dispose()))
.expectErrorMatches(
err ->
err instanceof UnsupportedSetupException
&& "resume not supported".equals(err.getMessage()))
.verify(Duration.ofSeconds(5));
Assertions.assertThat(rSocket.isDisposed()).isTrue();
}
@Override
public void initialize(GenericApplicationContext applicationContext) {
var environment = applicationContext.getEnvironment();
if (!environment.acceptsProfiles(Profiles.of("gateway"))) {
return;
}
var serverProperties = PropertiesUtil.bind(environment, new RSocketServerProperties());
if (!serverProperties.isEnabled()) {
return;
}
applicationContext.registerBean(RSocketLiiklusService.class);
applicationContext.registerBean(
CloseableChannel.class,
() -> {
var liiklusService = applicationContext.getBean(LiiklusService.class);
return RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(new LiiklusServiceServer(liiklusService, Optional.empty(), Optional.empty()))))
.transport(TcpServerTransport.create(serverProperties.getHost(), serverProperties.getPort()))
.start()
.block();
},
it -> {
it.setDestroyMethodName("dispose");
}
);
}
public static void main(String[] args){
UserServiceServer userServiceServer = new UserServiceServer(new UserServiceRsocketServerImpl(), Optional.empty(), Optional.empty());
CloseableChannel closeableChannel =
RSocketFactory.receive()
.acceptor(
(setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(userServiceServer)))
.transport(TcpServerTransport.create(8080))
.start()
.block();
// Block so we don't exit
closeableChannel.onClose().block();
}
@Test
@SlowTest
public void channel() {
CloseableChannel server =
RSocketServer.create(SocketAcceptor.with(new EchoRSocket()))
.bind(TcpServerTransport.create("localhost", 0))
.block(Duration.ofSeconds(10));
RSocket clientRSocket =
RSocketConnector.connectWith(TcpClientTransport.create(server.address()))
.block(Duration.ofSeconds(10));
int concurrency = 16;
Flux.range(1, concurrency)
.flatMap(
v ->
clientRSocket
.requestChannel(
input().onBackpressureDrop().map(iv -> DefaultPayload.create("foo")))
.limitRate(10000),
concurrency)
.timeout(Duration.ofSeconds(5))
.doOnNext(
p -> {
String data = p.getDataUtf8();
if (!data.equals("bar")) {
throw new IllegalStateException("Channel Client Bad message: " + data);
}
})
.window(Duration.ofSeconds(1))
.flatMap(Flux::count)
.doOnNext(d -> System.out.println("Got: " + d))
.take(Duration.ofMinutes(1))
.doOnTerminate(server::dispose)
.subscribe();
server.onClose().block();
}
@Test
public void reconnectOnDisconnect() {
CloseableChannel closeable = newServerRSocket().block();
DisconnectableClientTransport clientTransport =
new DisconnectableClientTransport(clientTransport(closeable.address()));
int sessionDurationSeconds = 15;
RSocket rSocket = newClientRSocket(clientTransport, sessionDurationSeconds).block();
Flux.just(3, 20, 40, 75)
.flatMap(v -> Mono.delay(Duration.ofSeconds(v)))
.subscribe(v -> clientTransport.disconnectFor(Duration.ofSeconds(7)));
AtomicInteger counter = new AtomicInteger(-1);
StepVerifier.create(
rSocket
.requestChannel(testRequest())
.take(Duration.ofSeconds(600))
.map(Payload::getDataUtf8)
.timeout(Duration.ofSeconds(12))
.doOnNext(x -> throwOnNonContinuous(counter, x))
.then()
.doFinally(s -> closeable.dispose()))
.expectComplete()
.verify();
}
@Test
public void reconnectOnMissingSession() {
int serverSessionDuration = 2;
CloseableChannel closeable = newServerRSocket(serverSessionDuration).block();
DisconnectableClientTransport clientTransport =
new DisconnectableClientTransport(clientTransport(closeable.address()));
int clientSessionDurationSeconds = 10;
RSocket rSocket = newClientRSocket(clientTransport, clientSessionDurationSeconds).block();
Mono.delay(Duration.ofSeconds(1))
.subscribe(v -> clientTransport.disconnectFor(Duration.ofSeconds(3)));
StepVerifier.create(
rSocket.requestChannel(testRequest()).then().doFinally(s -> closeable.dispose()))
.expectError()
.verify(Duration.ofSeconds(5));
StepVerifier.create(rSocket.onClose())
.expectErrorMatches(
err ->
err instanceof RejectedResumeException
&& "unknown resume token".equals(err.getMessage()))
.verify(Duration.ofSeconds(5));
}
private static Mono<CloseableChannel> newServerRSocket(int sessionDurationSeconds) {
return RSocketServer.create(SocketAcceptor.with(new TestResponderRSocket()))
.resume(
new Resume()
.sessionDuration(Duration.ofSeconds(sessionDurationSeconds))
.cleanupStoreOnKeepAlive()
.storeFactory(t -> new InMemoryResumableFramesStore("server", 500_000)))
.bind(serverTransport(SERVER_HOST, SERVER_PORT));
}
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithEnabledFragmentationOnSufficientMtu(
ServerTransport<CloseableChannel> serverTransport) {
Mono<CloseableChannel> server =
RSocketServer.create(mockAcceptor())
.fragment(100)
.bind(serverTransport)
.doOnNext(CloseableChannel::dispose);
StepVerifier.create(server).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
Mono<CloseableChannel> server =
RSocketServer.create(mockAcceptor())
.bind(serverTransport)
.doOnNext(CloseableChannel::dispose);
StepVerifier.create(server).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithEnabledFragmentationOnSufficientMtu(
ServerTransport<CloseableChannel> serverTransport) {
CloseableChannel server =
RSocketServer.create(mockAcceptor()).fragment(100).bind(serverTransport).block();
Mono<RSocket> rSocket =
RSocketConnector.create()
.fragment(100)
.connect(TcpClientTransport.create(server.address()))
.doFinally(s -> server.dispose());
StepVerifier.create(rSocket).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
CloseableChannel server = RSocketServer.create(mockAcceptor()).bind(serverTransport).block();
Mono<RSocket> rSocket =
RSocketConnector.connectWith(TcpClientTransport.create(server.address()))
.doFinally(s -> server.dispose());
StepVerifier.create(rSocket).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
void rejectSetupTcp(
Function<InetSocketAddress, ServerTransport<CloseableChannel>> serverTransport,
Function<InetSocketAddress, ClientTransport> clientTransport) {
String errorMessage = "error";
RejectingAcceptor acceptor = new RejectingAcceptor(errorMessage);
Mono<RSocket> serverRequester = acceptor.requesterRSocket();
CloseableChannel channel =
RSocketServer.create(acceptor)
.bind(serverTransport.apply(new InetSocketAddress("localhost", 0)))
.block(Duration.ofSeconds(5));
ErrorConsumer errorConsumer = new ErrorConsumer();
RSocket clientRequester =
RSocketConnector.connectWith(clientTransport.apply(channel.address()))
.doOnError(errorConsumer)
.block(Duration.ofSeconds(5));
StepVerifier.create(errorConsumer.errors().next())
.expectNextMatches(
err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
.expectComplete()
.verify(Duration.ofSeconds(5));
StepVerifier.create(clientRequester.onClose()).expectComplete().verify(Duration.ofSeconds(5));
StepVerifier.create(serverRequester.flatMap(socket -> socket.onClose()))
.expectComplete()
.verify(Duration.ofSeconds(5));
StepVerifier.create(clientRequester.requestResponse(DefaultPayload.create("test")))
.expectErrorMatches(
err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
.verify(Duration.ofSeconds(5));
channel.dispose();
}
static Stream<Arguments> transports() {
Function<InetSocketAddress, ServerTransport<CloseableChannel>> tcpServer =
TcpServerTransport::create;
Function<InetSocketAddress, ServerTransport<CloseableChannel>> wsServer =
WebsocketServerTransport::create;
Function<InetSocketAddress, ClientTransport> tcpClient = TcpClientTransport::create;
Function<InetSocketAddress, ClientTransport> wsClient = WebsocketClientTransport::create;
return Stream.of(Arguments.of(tcpServer, tcpClient), Arguments.of(wsServer, wsClient));
}
public RSocketWebServer(RSocketFactory.Start<CloseableChannel> rSocketServer) {
Assert.notNull(rSocketServer, "HttpServer must not be null");
this.rSocketServer = rSocketServer;
}
private CloseableChannel startHttpServer() {
return rSocketServer.start()
.block();
}
public static void main(String[] args) {
Resume resume =
new Resume()
.sessionDuration(Duration.ofMinutes(5))
.retry(
Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))
.doBeforeRetry(s -> logger.debug("Disconnected. Trying to resume...")));
RequestCodec codec = new RequestCodec();
CloseableChannel server =
RSocketServer.create(
SocketAcceptor.forRequestStream(
payload -> {
Request request = codec.decode(payload);
payload.release();
String fileName = request.getFileName();
int chunkSize = request.getChunkSize();
Flux<Long> ticks = Flux.interval(Duration.ofMillis(500)).onBackpressureDrop();
return Files.fileSource(fileName, chunkSize)
.map(DefaultPayload::create)
.zipWith(ticks, (p, tick) -> p);
}))
.resume(resume)
.bind(TcpServerTransport.create("localhost", 8000))
.block();
RSocket client =
RSocketConnector.create()
.resume(resume)
.connect(TcpClientTransport.create("localhost", 8001))
.block();
client
.requestStream(codec.encode(new Request(16, "lorem.txt")))
.doFinally(s -> server.dispose())
.subscribe(Files.fileSink("rsocket-examples/out/lorem_output.txt", PREFETCH_WINDOW_SIZE));
server.onClose().block();
}
public static void main(String[] args) {
// Queue for incoming messages represented as Flux
// Imagine that every fireAndForget that is pushed is processed by a worker
int queueCapacity = 50;
BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
// emulating a worker that process data from the queue
Thread workerThread =
new Thread(
() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
String message = messagesQueue.take();
logger.info("Process message {}", message);
Thread.sleep(500); // emulating processing
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
workerThread.start();
CloseableChannel server =
RSocketServer.create(
(setup, sendingSocket) ->
Mono.just(
new RSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
// add element. if overflows errors and terminates execution
// specifically to show that lease can limit rate of fnf requests in
// that example
try {
if (!messagesQueue.offer(payload.getDataUtf8())) {
logger.error("Queue has been overflowed. Terminating execution");
sendingSocket.dispose();
workerThread.interrupt();
}
} finally {
payload.release();
}
return Mono.empty();
}
}))
.lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
.bindNow(TcpServerTransport.create("localhost", 7000));
LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
RSocket clientRSocket =
RSocketConnector.create()
.lease(() -> Leases.create().receiver(receiver))
.connect(TcpClientTransport.create(server.address()))
.block();
Objects.requireNonNull(clientRSocket);
// generate stream of fnfs
Flux.generate(
() -> 0L,
(state, sink) -> {
sink.next(state);
return state + 1;
})
// here we wait for the first lease for the responder side and start execution
// on if there is allowance
.delaySubscription(receiver.notifyWhenNewLease().then())
.concatMap(
tick -> {
logger.info("Requesting FireAndForget({})", tick);
return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
.retryWhen(
Retry.indefinitely()
// ensures that error is the result of missed lease
.filter(t -> t instanceof MissingLeaseException)
.doBeforeRetryAsync(
rs -> {
// here we create a mechanism to delay the retry until
// the new lease allowance comes in.
logger.info("Ran out of leases {}", rs);
return receiver.notifyWhenNewLease().then();
}));
})
.blockLast();
clientRSocket.onClose().block();
server.dispose();
}
static ServerTransport<CloseableChannel> serverTransport(String host, int port) {
return TcpServerTransport.create(host, port);
}
private static Mono<CloseableChannel> newServerRSocket() {
return newServerRSocket(15);
}
static Stream<? extends ServerTransport<CloseableChannel>> arguments() {
return Stream.of(TcpServerTransport.create(0), WebsocketServerTransport.create(0));
}