org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.transport.netty.server.CloseableChannel源码实例Demo

下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.transport.netty.server.CloseableChannel 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
	RSocketFactory.ServerRSocketFactory rSocketFactory = applyCustomizers(RSocketFactory.receive());


	HttpServer httpServer = createHttpServer();
	ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);

	return rSocketFactory
		.acceptor(socketAcceptor)
		.transport((ServerTransport) new WebsocketRouteTransport(
			httpServer,
			r -> r.route(hsr -> !("/" + hsr.path()).equals(path), handlerAdapter),
			path
		));
}
 
源代码2 项目: rsocket-java   文件: ResumeIntegrationTest.java
@Test
void timeoutOnPermanentDisconnect() {
  CloseableChannel closeable = newServerRSocket().block();

  DisconnectableClientTransport clientTransport =
      new DisconnectableClientTransport(clientTransport(closeable.address()));

  int sessionDurationSeconds = 5;
  RSocket rSocket = newClientRSocket(clientTransport, sessionDurationSeconds).block();

  Mono.delay(Duration.ofSeconds(1)).subscribe(v -> clientTransport.disconnectPermanently());

  StepVerifier.create(
          rSocket.requestChannel(testRequest()).then().doFinally(s -> closeable.dispose()))
      .expectError(ClosedChannelException.class)
      .verify(Duration.ofSeconds(7));
}
 
源代码3 项目: rsocket-java   文件: ResumeIntegrationTest.java
@Test
void serverMissingResume() {
  CloseableChannel closeableChannel =
      RSocketServer.create(SocketAcceptor.with(new TestResponderRSocket()))
          .bind(serverTransport(SERVER_HOST, SERVER_PORT))
          .block();

  RSocket rSocket =
      RSocketConnector.create()
          .resume(new Resume())
          .connect(clientTransport(closeableChannel.address()))
          .block();

  StepVerifier.create(rSocket.onClose().doFinally(s -> closeableChannel.dispose()))
      .expectErrorMatches(
          err ->
              err instanceof UnsupportedSetupException
                  && "resume not supported".equals(err.getMessage()))
      .verify(Duration.ofSeconds(5));

  Assertions.assertThat(rSocket.isDisposed()).isTrue();
}
 
源代码4 项目: liiklus   文件: RSocketConfiguration.java
@Override
public void initialize(GenericApplicationContext applicationContext) {
    var environment = applicationContext.getEnvironment();

    if (!environment.acceptsProfiles(Profiles.of("gateway"))) {
        return;
    }

    var serverProperties = PropertiesUtil.bind(environment, new RSocketServerProperties());

    if (!serverProperties.isEnabled()) {
        return;
    }

    applicationContext.registerBean(RSocketLiiklusService.class);

    applicationContext.registerBean(
            CloseableChannel.class,
            () -> {
                var liiklusService = applicationContext.getBean(LiiklusService.class);

                return RSocketFactory.receive()
                        .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(new LiiklusServiceServer(liiklusService, Optional.empty(), Optional.empty()))))
                        .transport(TcpServerTransport.create(serverProperties.getHost(), serverProperties.getPort()))
                        .start()
                        .block();
            },
            it -> {
                it.setDestroyMethodName("dispose");
            }
    );
}
 
源代码5 项目: rpc-benchmark   文件: Server.java
public static void main(String[] args){
    UserServiceServer userServiceServer = new UserServiceServer(new UserServiceRsocketServerImpl(), Optional.empty(), Optional.empty());
    CloseableChannel closeableChannel =
            RSocketFactory.receive()
                    .acceptor(
                            (setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(userServiceServer)))
                    .transport(TcpServerTransport.create(8080))
                    .start()
                    .block();

    // Block so we don't exit
    closeableChannel.onClose().block();
}
 
源代码6 项目: rsocket-java   文件: InteractionsLoadTest.java
@Test
@SlowTest
public void channel() {
  CloseableChannel server =
      RSocketServer.create(SocketAcceptor.with(new EchoRSocket()))
          .bind(TcpServerTransport.create("localhost", 0))
          .block(Duration.ofSeconds(10));

  RSocket clientRSocket =
      RSocketConnector.connectWith(TcpClientTransport.create(server.address()))
          .block(Duration.ofSeconds(10));

  int concurrency = 16;
  Flux.range(1, concurrency)
      .flatMap(
          v ->
              clientRSocket
                  .requestChannel(
                      input().onBackpressureDrop().map(iv -> DefaultPayload.create("foo")))
                  .limitRate(10000),
          concurrency)
      .timeout(Duration.ofSeconds(5))
      .doOnNext(
          p -> {
            String data = p.getDataUtf8();
            if (!data.equals("bar")) {
              throw new IllegalStateException("Channel Client Bad message: " + data);
            }
          })
      .window(Duration.ofSeconds(1))
      .flatMap(Flux::count)
      .doOnNext(d -> System.out.println("Got: " + d))
      .take(Duration.ofMinutes(1))
      .doOnTerminate(server::dispose)
      .subscribe();

  server.onClose().block();
}
 
源代码7 项目: rsocket-java   文件: ResumeIntegrationTest.java
@Test
public void reconnectOnDisconnect() {
  CloseableChannel closeable = newServerRSocket().block();

  DisconnectableClientTransport clientTransport =
      new DisconnectableClientTransport(clientTransport(closeable.address()));

  int sessionDurationSeconds = 15;
  RSocket rSocket = newClientRSocket(clientTransport, sessionDurationSeconds).block();

  Flux.just(3, 20, 40, 75)
      .flatMap(v -> Mono.delay(Duration.ofSeconds(v)))
      .subscribe(v -> clientTransport.disconnectFor(Duration.ofSeconds(7)));

  AtomicInteger counter = new AtomicInteger(-1);
  StepVerifier.create(
          rSocket
              .requestChannel(testRequest())
              .take(Duration.ofSeconds(600))
              .map(Payload::getDataUtf8)
              .timeout(Duration.ofSeconds(12))
              .doOnNext(x -> throwOnNonContinuous(counter, x))
              .then()
              .doFinally(s -> closeable.dispose()))
      .expectComplete()
      .verify();
}
 
源代码8 项目: rsocket-java   文件: ResumeIntegrationTest.java
@Test
public void reconnectOnMissingSession() {

  int serverSessionDuration = 2;

  CloseableChannel closeable = newServerRSocket(serverSessionDuration).block();

  DisconnectableClientTransport clientTransport =
      new DisconnectableClientTransport(clientTransport(closeable.address()));
  int clientSessionDurationSeconds = 10;

  RSocket rSocket = newClientRSocket(clientTransport, clientSessionDurationSeconds).block();

  Mono.delay(Duration.ofSeconds(1))
      .subscribe(v -> clientTransport.disconnectFor(Duration.ofSeconds(3)));

  StepVerifier.create(
          rSocket.requestChannel(testRequest()).then().doFinally(s -> closeable.dispose()))
      .expectError()
      .verify(Duration.ofSeconds(5));

  StepVerifier.create(rSocket.onClose())
      .expectErrorMatches(
          err ->
              err instanceof RejectedResumeException
                  && "unknown resume token".equals(err.getMessage()))
      .verify(Duration.ofSeconds(5));
}
 
源代码9 项目: rsocket-java   文件: ResumeIntegrationTest.java
private static Mono<CloseableChannel> newServerRSocket(int sessionDurationSeconds) {
  return RSocketServer.create(SocketAcceptor.with(new TestResponderRSocket()))
      .resume(
          new Resume()
              .sessionDuration(Duration.ofSeconds(sessionDurationSeconds))
              .cleanupStoreOnKeepAlive()
              .storeFactory(t -> new InMemoryResumableFramesStore("server", 500_000)))
      .bind(serverTransport(SERVER_HOST, SERVER_PORT));
}
 
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithEnabledFragmentationOnSufficientMtu(
    ServerTransport<CloseableChannel> serverTransport) {
  Mono<CloseableChannel> server =
      RSocketServer.create(mockAcceptor())
          .fragment(100)
          .bind(serverTransport)
          .doOnNext(CloseableChannel::dispose);
  StepVerifier.create(server).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
 
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
  Mono<CloseableChannel> server =
      RSocketServer.create(mockAcceptor())
          .bind(serverTransport)
          .doOnNext(CloseableChannel::dispose);
  StepVerifier.create(server).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
 
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithEnabledFragmentationOnSufficientMtu(
    ServerTransport<CloseableChannel> serverTransport) {
  CloseableChannel server =
      RSocketServer.create(mockAcceptor()).fragment(100).bind(serverTransport).block();

  Mono<RSocket> rSocket =
      RSocketConnector.create()
          .fragment(100)
          .connect(TcpClientTransport.create(server.address()))
          .doFinally(s -> server.dispose());
  StepVerifier.create(rSocket).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
 
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
  CloseableChannel server = RSocketServer.create(mockAcceptor()).bind(serverTransport).block();

  Mono<RSocket> rSocket =
      RSocketConnector.connectWith(TcpClientTransport.create(server.address()))
          .doFinally(s -> server.dispose());
  StepVerifier.create(rSocket).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
 
源代码14 项目: rsocket-java   文件: SetupRejectionTest.java
void rejectSetupTcp(
    Function<InetSocketAddress, ServerTransport<CloseableChannel>> serverTransport,
    Function<InetSocketAddress, ClientTransport> clientTransport) {

  String errorMessage = "error";
  RejectingAcceptor acceptor = new RejectingAcceptor(errorMessage);
  Mono<RSocket> serverRequester = acceptor.requesterRSocket();

  CloseableChannel channel =
      RSocketServer.create(acceptor)
          .bind(serverTransport.apply(new InetSocketAddress("localhost", 0)))
          .block(Duration.ofSeconds(5));

  ErrorConsumer errorConsumer = new ErrorConsumer();

  RSocket clientRequester =
      RSocketConnector.connectWith(clientTransport.apply(channel.address()))
          .doOnError(errorConsumer)
          .block(Duration.ofSeconds(5));

  StepVerifier.create(errorConsumer.errors().next())
      .expectNextMatches(
          err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
      .expectComplete()
      .verify(Duration.ofSeconds(5));

  StepVerifier.create(clientRequester.onClose()).expectComplete().verify(Duration.ofSeconds(5));

  StepVerifier.create(serverRequester.flatMap(socket -> socket.onClose()))
      .expectComplete()
      .verify(Duration.ofSeconds(5));

  StepVerifier.create(clientRequester.requestResponse(DefaultPayload.create("test")))
      .expectErrorMatches(
          err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
      .verify(Duration.ofSeconds(5));

  channel.dispose();
}
 
源代码15 项目: rsocket-java   文件: SetupRejectionTest.java
static Stream<Arguments> transports() {
  Function<InetSocketAddress, ServerTransport<CloseableChannel>> tcpServer =
      TcpServerTransport::create;
  Function<InetSocketAddress, ServerTransport<CloseableChannel>> wsServer =
      WebsocketServerTransport::create;
  Function<InetSocketAddress, ClientTransport> tcpClient = TcpClientTransport::create;
  Function<InetSocketAddress, ClientTransport> wsClient = WebsocketClientTransport::create;

  return Stream.of(Arguments.of(tcpServer, tcpClient), Arguments.of(wsServer, wsClient));
}
 
源代码16 项目: spring-boot-rsocket   文件: RSocketWebServer.java
public RSocketWebServer(RSocketFactory.Start<CloseableChannel> rSocketServer) {
    Assert.notNull(rSocketServer, "HttpServer must not be null");
    this.rSocketServer = rSocketServer;
}
 
源代码17 项目: spring-boot-rsocket   文件: RSocketWebServer.java
private CloseableChannel startHttpServer() {
    return rSocketServer.start()
                        .block();
}
 
源代码18 项目: rsocket-java   文件: ResumeFileTransfer.java
public static void main(String[] args) {

    Resume resume =
        new Resume()
            .sessionDuration(Duration.ofMinutes(5))
            .retry(
                Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))
                    .doBeforeRetry(s -> logger.debug("Disconnected. Trying to resume...")));

    RequestCodec codec = new RequestCodec();

    CloseableChannel server =
        RSocketServer.create(
                SocketAcceptor.forRequestStream(
                    payload -> {
                      Request request = codec.decode(payload);
                      payload.release();
                      String fileName = request.getFileName();
                      int chunkSize = request.getChunkSize();

                      Flux<Long> ticks = Flux.interval(Duration.ofMillis(500)).onBackpressureDrop();

                      return Files.fileSource(fileName, chunkSize)
                          .map(DefaultPayload::create)
                          .zipWith(ticks, (p, tick) -> p);
                    }))
            .resume(resume)
            .bind(TcpServerTransport.create("localhost", 8000))
            .block();

    RSocket client =
        RSocketConnector.create()
            .resume(resume)
            .connect(TcpClientTransport.create("localhost", 8001))
            .block();

    client
        .requestStream(codec.encode(new Request(16, "lorem.txt")))
        .doFinally(s -> server.dispose())
        .subscribe(Files.fileSink("rsocket-examples/out/lorem_output.txt", PREFETCH_WINDOW_SIZE));

    server.onClose().block();
  }
 
源代码19 项目: rsocket-java   文件: LeaseExample.java
public static void main(String[] args) {
  // Queue for incoming messages represented as Flux
  // Imagine that every fireAndForget that is pushed is processed by a worker

  int queueCapacity = 50;
  BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);

  // emulating a worker that process data from the queue
  Thread workerThread =
      new Thread(
          () -> {
            try {
              while (!Thread.currentThread().isInterrupted()) {
                String message = messagesQueue.take();
                logger.info("Process message {}", message);
                Thread.sleep(500); // emulating processing
              }
            } catch (InterruptedException e) {
              throw new RuntimeException(e);
            }
          });

  workerThread.start();

  CloseableChannel server =
      RSocketServer.create(
              (setup, sendingSocket) ->
                  Mono.just(
                      new RSocket() {
                        @Override
                        public Mono<Void> fireAndForget(Payload payload) {
                          // add element. if overflows errors and terminates execution
                          // specifically to show that lease can limit rate of fnf requests in
                          // that example
                          try {
                            if (!messagesQueue.offer(payload.getDataUtf8())) {
                              logger.error("Queue has been overflowed. Terminating execution");
                              sendingSocket.dispose();
                              workerThread.interrupt();
                            }
                          } finally {
                            payload.release();
                          }
                          return Mono.empty();
                        }
                      }))
          .lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
          .bindNow(TcpServerTransport.create("localhost", 7000));

  LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
  RSocket clientRSocket =
      RSocketConnector.create()
          .lease(() -> Leases.create().receiver(receiver))
          .connect(TcpClientTransport.create(server.address()))
          .block();

  Objects.requireNonNull(clientRSocket);

  // generate stream of fnfs
  Flux.generate(
          () -> 0L,
          (state, sink) -> {
            sink.next(state);
            return state + 1;
          })
      // here we wait for the first lease for the responder side and start execution
      // on if there is allowance
      .delaySubscription(receiver.notifyWhenNewLease().then())
      .concatMap(
          tick -> {
            logger.info("Requesting FireAndForget({})", tick);
            return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
                .retryWhen(
                    Retry.indefinitely()
                        // ensures that error is the result of missed lease
                        .filter(t -> t instanceof MissingLeaseException)
                        .doBeforeRetryAsync(
                            rs -> {
                              // here we create a mechanism to delay the retry until
                              // the new lease allowance comes in.
                              logger.info("Ran out of leases {}", rs);
                              return receiver.notifyWhenNewLease().then();
                            }));
          })
      .blockLast();

  clientRSocket.onClose().block();
  server.dispose();
}
 
源代码20 项目: rsocket-java   文件: ResumeIntegrationTest.java
static ServerTransport<CloseableChannel> serverTransport(String host, int port) {
  return TcpServerTransport.create(host, port);
}
 
源代码21 项目: rsocket-java   文件: ResumeIntegrationTest.java
private static Mono<CloseableChannel> newServerRSocket() {
  return newServerRSocket(15);
}
 
static Stream<? extends ServerTransport<CloseableChannel>> arguments() {
  return Stream.of(TcpServerTransport.create(0), WebsocketServerTransport.create(0));
}