org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.transport.ServerTransport源码实例Demo

下面列出了 org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.transport.ServerTransport 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。

/**
 * Assembles the RSocket starter from the customized server factory, the configured
 * {@code socketAcceptor} and a {@link WebsocketRouteTransport} on a freshly created HTTP server.
 *
 * <p>Requests whose {@code "/"}-prefixed path differs from {@code path} are routed to the
 * adapted {@code httpHandler}.
 *
 * @param httpHandler handler that receives the requests not matching {@code path}
 * @return the starter produced by the server factory for the route transport
 */
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
	RSocketFactory.ServerRSocketFactory serverFactory = applyCustomizers(RSocketFactory.receive());
	HttpServer server = createHttpServer();
	ReactorHttpHandlerAdapter httpAdapter = new ReactorHttpHandlerAdapter(httpHandler);

	// Deliberately raw-typed (hence the unchecked suppression), exactly like the cast it replaces.
	ServerTransport routeTransport = new WebsocketRouteTransport(
		server,
		routes -> routes.route(request -> !("/" + request.path()).equals(path), httpAdapter),
		path
	);

	return serverFactory.acceptor(socketAcceptor).transport(routeTransport);
}
 
源代码2 项目: rsocket-java   文件: LocalClientTransport.java
/**
 * Connects to the local server registered under {@code name}.
 *
 * <p>On each subscription the server is looked up; if none is found the returned {@code Mono}
 * fails with an {@link IllegalArgumentException}. Otherwise two {@code LocalDuplexConnection}s
 * are created over the same pair of processors (passed in swapped order) and sharing one close
 * notifier: one is handed to the server, the other is emitted to the caller.
 *
 * @return a {@code Mono} emitting the client end of the connection
 */
@Override
public Mono<DuplexConnection> connect() {
  return Mono.defer(
      () -> {
        ServerTransport.ConnectionAcceptor acceptor = LocalServerTransport.findServer(name);
        if (acceptor == null) {
          return Mono.error(new IllegalArgumentException("Could not find server: " + name));
        }

        UnboundedProcessor<ByteBuf> first = new UnboundedProcessor<>();
        UnboundedProcessor<ByteBuf> second = new UnboundedProcessor<>();
        MonoProcessor<Void> onClose = MonoProcessor.create();

        // The server end receives the processors in the opposite order to the client end.
        DuplexConnection serverEnd = new LocalDuplexConnection(allocator, second, first, onClose);
        acceptor.apply(serverEnd).subscribe();

        DuplexConnection clientEnd = new LocalDuplexConnection(allocator, first, second, onClose);
        return Mono.just(clientEnd);
      });
}
 
源代码3 项目: rsocket-java   文件: ClientSetupRule.java
/**
 * Creates the rule and prepares, without running them yet, the two steps it needs: binding a
 * server that answers with a {@code TestRSocket}, and connecting a client to it.
 *
 * @param addressSupplier supplies the address; stored as-is
 * @param clientTransportSupplier builds the client transport from the address and the bound server
 * @param serverTransportSupplier builds the server transport from the address
 */
public ClientSetupRule(
    Supplier<T> addressSupplier,
    BiFunction<T, S, ClientTransport> clientTransportSupplier,
    Function<T, ServerTransport<S>> serverTransportSupplier) {
  this.addressSupplier = addressSupplier;

  // Binds the server and blocks until it is available.
  this.serverInit =
      bindAddress -> {
        ServerTransport<S> serverTransport = serverTransportSupplier.apply(bindAddress);
        return RSocketServer.create(
                (setupPayload, requester) -> Mono.just(new TestRSocket(data, metadata)))
            .bind(serverTransport)
            .block();
      };

  // Connects the client and blocks until connected; connection errors are printed.
  this.clientConnector =
      (connectAddress, boundServer) -> {
        ClientTransport clientTransport =
            clientTransportSupplier.apply(connectAddress, boundServer);
        return RSocketConnector.connectWith(clientTransport)
            .doOnError(error -> error.printStackTrace())
            .block();
      };
}
 
源代码4 项目: rsocket-java   文件: TransportTest.java
public TransportPair(
    Supplier<T> addressSupplier,
    BiFunction<T, S, ClientTransport> clientTransportSupplier,
    Function<T, ServerTransport<S>> serverTransportSupplier) {

  T address = addressSupplier.get();

  server =
      RSocketServer.create((setup, sendingSocket) -> Mono.just(new TestRSocket(data, metadata)))
          .bind(serverTransportSupplier.apply(address))
          .block();

  client =
      RSocketConnector.connectWith(clientTransportSupplier.apply(address, server))
          .doOnError(Throwable::printStackTrace)
          .block();
}
 
/**
 * Resolves a server transport for a URI string by asking each registered handler in turn.
 *
 * @param uriString the URI to resolve; parsed with {@link URI#create(String)}
 * @return the transport built by the first handler that yields one, otherwise
 *     {@code FAILED_SERVER_LOOKUP}
 */
private ServerTransport<?> findServer(String uriString) {
  URI target = URI.create(uriString);

  for (UriHandler handler : handlers) {
    ServerTransport<?> candidate = handler.buildServer(target).orElse(null);
    if (candidate != null) {
      return candidate;
    }
  }

  // No handler recognised the URI.
  return FAILED_SERVER_LOOKUP;
}
 
源代码6 项目: alibaba-rsocket-broker   文件: LocalUriHandler.java
/**
 * Builds a local server transport when the URI scheme equals {@code SCHEME}.
 *
 * @param uri the URI to map; its scheme-specific part is passed to the transport factory
 * @return the transport, or {@link Optional#empty()} for any other scheme
 * @throws NullPointerException if {@code uri} is {@code null}
 */
@Override
public Optional<ServerTransport<?>> buildServer(URI uri) {
  Objects.requireNonNull(uri, "uri must not be null");

  if (SCHEME.equals(uri.getScheme())) {
    return Optional.of(LocalServerTransport.create(uri.getSchemeSpecificPart()));
  }

  return Optional.empty();
}
 
源代码7 项目: alibaba-rsocket-broker   文件: TcpUriHandler.java
/**
 * Builds a TCP server transport when the URI scheme equals {@code SCHEME}.
 *
 * <p>Host and port are taken from the URI unchanged. Note that {@link URI#getPort()} returns
 * {@code -1} when the URI does not define a port; that value is passed through as-is.
 *
 * @param uri the URI to map
 * @return the transport, or {@link Optional#empty()} for any other scheme
 * @throws NullPointerException if {@code uri} is {@code null}
 */
@Override
public Optional<ServerTransport<?>> buildServer(URI uri) {
  Objects.requireNonNull(uri, "uri must not be null");

  if (SCHEME.equals(uri.getScheme())) {
    TcpServer tcpServer = TcpServer.create().host(uri.getHost()).port(uri.getPort());
    return Optional.of(TcpServerTransport.create(tcpServer));
  }

  return Optional.empty();
}
 
/**
 * Builds a WebSocket server transport when the URI scheme is one of the entries in
 * {@code SCHEME}.
 *
 * <p>The port is resolved through {@code getPort}, with 443 as the fallback when
 * {@code isSecure(uri)} holds and 80 otherwise.
 *
 * @param uri the URI to map
 * @return the transport, or {@link Optional#empty()} when the scheme is not supported
 * @throws NullPointerException if {@code uri} is {@code null}
 */
@Override
public Optional<ServerTransport<?>> buildServer(URI uri) {
    Objects.requireNonNull(uri, "uri must not be null");

    String requestedScheme = uri.getScheme();
    boolean supported = SCHEME.stream().anyMatch(known -> known.equals(requestedScheme));
    if (!supported) {
        return Optional.empty();
    }

    int fallbackPort = isSecure(uri) ? 443 : 80;
    int port = getPort(uri, fallbackPort);

    return Optional.of(WebsocketServerTransport.create(uri.getHost(), port));
}
 
源代码9 项目: rsocket-java   文件: RSocketServer.java
/**
 * Exposes this server as a connection acceptor, as an alternative to
 * {@link #bind(ServerTransport)}, for installing RSocket on a server that is started
 * independently.
 *
 * <p>A single {@code ServerSetup} is created when this method is called and is reused for every
 * connection passed to the returned acceptor.
 *
 * @return an acceptor that hands each connection to this server
 * @see io.rsocket.examples.transport.ws.WebSocketHeadersSample
 */
public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
  final ServerSetup sharedSetup = serverSetup();
  return connection -> acceptor(sharedSetup, connection);
}
 
/**
 * Binding a server with {@code fragment(100)} must emit exactly one channel and complete within
 * five seconds; the channel is disposed as soon as it is emitted.
 */
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithEnabledFragmentationOnSufficientMtu(
    ServerTransport<CloseableChannel> serverTransport) {
  Duration timeout = Duration.ofSeconds(5);

  Mono<CloseableChannel> boundServer =
      RSocketServer.create(mockAcceptor())
          .fragment(100)
          .bind(serverTransport)
          .doOnNext(channel -> channel.dispose());

  StepVerifier.create(boundServer)
      .expectNextCount(1)
      .expectComplete()
      .verify(timeout);
}
 
/**
 * Binding a server without configuring fragmentation must emit exactly one channel and complete
 * within five seconds; the channel is disposed as soon as it is emitted.
 */
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
  Duration timeout = Duration.ofSeconds(5);

  Mono<CloseableChannel> boundServer =
      RSocketServer.create(mockAcceptor())
          .bind(serverTransport)
          .doOnNext(channel -> channel.dispose());

  StepVerifier.create(boundServer)
      .expectNextCount(1)
      .expectComplete()
      .verify(timeout);
}
 
/**
 * With {@code fragment(100)} on both sides, a TCP client connecting to the bound server's
 * address must emit exactly one {@code RSocket} and complete within five seconds. The server is
 * disposed when the connect {@code Mono} terminates.
 */
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithEnabledFragmentationOnSufficientMtu(
    ServerTransport<CloseableChannel> serverTransport) {
  CloseableChannel boundServer =
      RSocketServer.create(mockAcceptor()).fragment(100).bind(serverTransport).block();
  TcpClientTransport clientTransport = TcpClientTransport.create(boundServer.address());

  Mono<RSocket> connection =
      RSocketConnector.create()
          .fragment(100)
          .connect(clientTransport)
          .doFinally(signal -> boundServer.dispose());

  Duration timeout = Duration.ofSeconds(5);
  StepVerifier.create(connection)
      .expectNextCount(1)
      .expectComplete()
      .verify(timeout);
}
 
/**
 * Without fragmentation configured, a TCP client connecting to the bound server's address must
 * emit exactly one {@code RSocket} and complete within five seconds. The server is disposed when
 * the connect {@code Mono} terminates.
 */
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
  CloseableChannel boundServer =
      RSocketServer.create(mockAcceptor()).bind(serverTransport).block();
  TcpClientTransport clientTransport = TcpClientTransport.create(boundServer.address());

  Mono<RSocket> connection =
      RSocketConnector.connectWith(clientTransport)
          .doFinally(signal -> boundServer.dispose());

  Duration timeout = Duration.ofSeconds(5);
  StepVerifier.create(connection)
      .expectNextCount(1)
      .expectComplete()
      .verify(timeout);
}
 
/**
 * Mounts an RSocket connection acceptor on an independently started HTTP server (WebSocket route
 * {@code /test}) and verifies that a WebSocket client receives the ten payloads of a request
 * stream.
 *
 * <p>The HTTP server and the client connection are local to this test, so they are released in
 * {@code finally} blocks; otherwise the listening socket would stay open after the test,
 * whether it passes or fails.
 */
@Test
public void sendStreamOfDataWithExternalHttpServerTest() {
  ServerTransport.ConnectionAcceptor acceptor =
      RSocketServer.create(
              SocketAcceptor.forRequestStream(
                  payload ->
                      Flux.range(0, 10).map(i -> DefaultPayload.create(String.valueOf(i)))))
          .asConnectionAcceptor();

  DisposableServer server =
      HttpServer.create()
          .host("localhost")
          .route(router -> router.ws("/test", WebsocketRouteTransport.newHandler(acceptor)))
          .bindNow();

  try {
    RSocket rsocket =
        RSocketConnector.connectWith(
                WebsocketClientTransport.create(
                    URI.create("ws://" + server.host() + ":" + server.port() + "/test")))
            .block();

    try {
      StepVerifier.create(rsocket.requestStream(EmptyPayload.INSTANCE))
          .expectSubscription()
          .expectNextCount(10)
          .expectComplete()
          .verify(Duration.ofMillis(1000));
    } finally {
      // Close the client connection even when verification fails.
      rsocket.dispose();
    }
  } finally {
    // Release the listening socket even when connecting or verification fails.
    server.disposeNow();
  }
}
 
源代码15 项目: rsocket-java   文件: SetupRejectionTest.java
/**
 * Verifies setup rejection end to end: the server uses a {@code RejectingAcceptor}, and the test
 * checks that the client observes a {@code RejectedSetupException} carrying the acceptor's
 * message, that both requesters close, and that a later request on the client fails with the
 * same error.
 *
 * <p>The server channel is disposed in a {@code finally} block so that a failed verification
 * does not leave it bound.
 *
 * @param serverTransport builds the server transport for {@code localhost} on port 0
 * @param clientTransport builds the client transport for the bound server address
 */
void rejectSetupTcp(
    Function<InetSocketAddress, ServerTransport<CloseableChannel>> serverTransport,
    Function<InetSocketAddress, ClientTransport> clientTransport) {

  String errorMessage = "error";
  RejectingAcceptor acceptor = new RejectingAcceptor(errorMessage);
  Mono<RSocket> serverRequester = acceptor.requesterRSocket();

  CloseableChannel channel =
      RSocketServer.create(acceptor)
          .bind(serverTransport.apply(new InetSocketAddress("localhost", 0)))
          .block(Duration.ofSeconds(5));

  try {
    ErrorConsumer errorConsumer = new ErrorConsumer();

    RSocket clientRequester =
        RSocketConnector.connectWith(clientTransport.apply(channel.address()))
            .doOnError(errorConsumer)
            .block(Duration.ofSeconds(5));

    // The rejection must surface on the client as the first recorded error.
    StepVerifier.create(errorConsumer.errors().next())
        .expectNextMatches(
            err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
        .expectComplete()
        .verify(Duration.ofSeconds(5));

    // Both ends of the rejected connection must close.
    StepVerifier.create(clientRequester.onClose()).expectComplete().verify(Duration.ofSeconds(5));

    StepVerifier.create(serverRequester.flatMap(socket -> socket.onClose()))
        .expectComplete()
        .verify(Duration.ofSeconds(5));

    // Requests issued after the rejection fail with the same error.
    StepVerifier.create(clientRequester.requestResponse(DefaultPayload.create("test")))
        .expectErrorMatches(
            err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
        .verify(Duration.ofSeconds(5));
  } finally {
    channel.dispose();
  }
}
 
源代码16 项目: rsocket-java   文件: SetupRejectionTest.java
/**
 * Supplies the (server transport factory, client transport factory) pairs for the parameterized
 * tests: one TCP pair and one WebSocket pair.
 *
 * @return a stream with two argument sets
 */
static Stream<Arguments> transports() {
  Function<InetSocketAddress, ServerTransport<CloseableChannel>> tcpServerFactory =
      address -> TcpServerTransport.create(address);
  Function<InetSocketAddress, ClientTransport> tcpClientFactory =
      address -> TcpClientTransport.create(address);

  Function<InetSocketAddress, ServerTransport<CloseableChannel>> wsServerFactory =
      address -> WebsocketServerTransport.create(address);
  Function<InetSocketAddress, ClientTransport> wsClientFactory =
      address -> WebsocketClientTransport.create(address);

  return Stream.of(
      Arguments.of(tcpServerFactory, tcpClientFactory),
      Arguments.of(wsServerFactory, wsClientFactory));
}
 
/**
 * Starts one RSocket server per entry of {@code schemas} (key: port, value: schema name), unless
 * {@code status} is already {@code 1}.
 *
 * <p>Schema to transport mapping:
 *
 * <ul>
 *   <li>{@code local}: local transport named {@code "unittest"}
 *   <li>{@code tcps}: TCP transport secured with the configured key, certificate and protocols
 *   <li>{@code ws}: WebSocket transport
 *   <li>{@code wss}: WebSocket transport over an HTTP server secured the same way as {@code tcps}
 *   <li>{@code tcp} and anything else: plain TCP transport
 * </ul>
 *
 * <p>Each server gets all configured acceptor, connection and responder interceptors. The
 * subscription of each bind is recorded in {@code responders}, and {@code status} is set to
 * {@code 1} after all entries were processed.
 *
 * @throws Exception declared by the overridden method
 */
@Override
public void start() throws Exception {
    if (status == 1) {
        return;
    }

    for (Map.Entry<Integer, String> schemaByPort : schemas.entrySet()) {
        String schema = schemaByPort.getValue();
        int port = schemaByPort.getKey();

        ServerTransport<?> transport;
        switch (schema) {
            case "local":
                transport = LocalServerTransport.create("unittest");
                break;
            case "tcps": {
                TcpServer secureTcpServer = TcpServer.create()
                        .host(host)
                        .port(port)
                        .secure(ssl -> ssl.sslContext(
                                SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
                                        .protocols(protocols)
                                        .sslProvider(getSslProvider())
                        ));
                transport = TcpServerTransport.create(secureTcpServer);
                break;
            }
            case "ws":
                transport = WebsocketServerTransport.create(host, port);
                break;
            case "wss": {
                HttpServer secureHttpServer = HttpServer.create()
                        .host(host)
                        .port(port)
                        .secure(ssl -> ssl.sslContext(
                                SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
                                        .protocols(protocols)
                                        .sslProvider(getSslProvider())
                        ));
                transport = WebsocketServerTransport.create(secureHttpServer);
                break;
            }
            case "tcp":
            default:
                // Unknown schemas fall back to plain TCP.
                transport = TcpServerTransport.create(host, port);
                break;
        }

        RSocketServer rsocketServer = RSocketServer.create();
        // acceptor interceptors
        for (SocketAcceptorInterceptor interceptor : acceptorInterceptors) {
            rsocketServer.interceptors(registry -> registry.forSocketAcceptor(interceptor));
        }
        // connection interceptors
        for (DuplexConnectionInterceptor interceptor : connectionInterceptors) {
            rsocketServer.interceptors(registry -> registry.forConnection(interceptor));
        }
        // responder interceptors
        for (RSocketInterceptor interceptor : responderInterceptors) {
            rsocketServer.interceptors(registry -> registry.forResponder(interceptor));
        }

        Disposable serverHandle = rsocketServer
                .acceptor(acceptor)
                .bind(transport)
                .onTerminateDetach()
                .subscribe();
        responders.add(serverHandle);
        log.info(RsocketErrorCode.message("RST-100001", schema + "://" + host + ":" + port));
    }

    status = 1;
}
 
/**
 * Looks up a server transport for the given URI string.
 *
 * <p>Delegates to {@code findServer} on the registry returned by
 * {@code UriTransportRegistry.fromServices()}.
 *
 * @param uri the URI string to resolve
 * @return the transport returned by the registry lookup
 */
public static ServerTransport<?> serverForUri(String uri) {
  return UriTransportRegistry.fromServices().findServer(uri);
}
 
/**
 * Creates a server from the given transport and acceptor. Both arguments are stored as-is; no
 * validation is performed here.
 *
 * @param transport the server transport to keep (declared as a raw type)
 * @param acceptor the socket acceptor to keep
 */
public ReactiveSocketServer(ServerTransport transport, SocketAcceptor acceptor) {
	this.transport = transport;
	this.acceptor = acceptor;
}
 
/**
 * Default {@link ServerTransport} bean, registered only when no other {@code ServerTransport}
 * bean is present: a TCP transport on the host and port read from {@code properties}.
 *
 * @return the TCP server transport
 */
@Bean
@ConditionalOnMissingBean(ServerTransport.class)
public ServerTransport transport(){
	String transportType = TcpServerTransport.class.getName();
	logger.info("Creating transport : {} on [ {}:{} ]", transportType, properties.getHost(), properties.getPort());
	return TcpServerTransport.create(properties.getHost(), properties.getPort());
}
 
源代码21 项目: rsocket-java   文件: WebSocketHeadersSample.java
/**
 * Sample: RSocket over WebSocket where the HTTP server only upgrades requests that carry the
 * header {@code Authorization: test}.
 *
 * <p>Steps: start an HTTP server on an ephemeral port of {@code localhost}; connect a client
 * that sends the header and run 100 sequential request/response calls; then attempt a second
 * connection without the header, which the server answers with {@code 401 UNAUTHORIZED}.
 *
 * @param args unused
 */
public static void main(String[] args) {

  ServerTransport.ConnectionAcceptor rsocketAcceptor =
      RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just))
          .payloadDecoder(PayloadDecoder.ZERO_COPY)
          .asConnectionAcceptor();

  DisposableServer httpServer =
      HttpServer.create()
          .host("localhost")
          .port(0)
          .route(
              routes ->
                  routes.get(
                      "/",
                      (request, response) -> {
                        boolean authorized =
                            request.requestHeaders().containsValue("Authorization", "test", true);
                        if (!authorized) {
                          response.status(HttpResponseStatus.UNAUTHORIZED);
                          return response.send();
                        }
                        // Authorized: upgrade and hand the connection to RSocket.
                        return response.sendWebsocket(
                            (inbound, outbound) ->
                                rsocketAcceptor
                                    .apply(new WebsocketDuplexConnection((Connection) inbound))
                                    .then(outbound.neverComplete()));
                      }))
          .bindNow();

  logger.debug(
      "\n\nStart of Authorized WebSocket Connection\n----------------------------------\n");

  WebsocketClientTransport authorizedTransport =
      WebsocketClientTransport.create(httpServer.host(), httpServer.port())
          .header("Authorization", "test");

  RSocket authorizedClient =
      RSocketConnector.create()
          .keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
          .payloadDecoder(PayloadDecoder.ZERO_COPY)
          .connect(authorizedTransport)
          .block();

  Flux.range(1, 100)
      .concatMap(i -> authorizedClient.requestResponse(ByteBufPayload.create("Hello " + i)))
      .doOnNext(payload -> logger.debug("Processed " + payload.getDataUtf8()))
      .blockLast();
  authorizedClient.dispose();

  logger.debug(
      "\n\nStart of Unauthorized WebSocket Upgrade\n----------------------------------\n");

  // No Authorization header this time.
  WebsocketClientTransport anonymousTransport =
      WebsocketClientTransport.create(httpServer.host(), httpServer.port());

  RSocketConnector.create()
      .keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
      .payloadDecoder(PayloadDecoder.ZERO_COPY)
      .connect(anonymousTransport)
      .block();
}
 
源代码22 项目: rsocket-java   文件: ResumeIntegrationTest.java
/**
 * Creates the TCP server transport used by the tests.
 *
 * @param host the host passed to {@code TcpServerTransport.create}
 * @param port the port passed to {@code TcpServerTransport.create}
 * @return the TCP server transport
 */
static ServerTransport<CloseableChannel> serverTransport(String host, int port) {
  return TcpServerTransport.create(host, port);
}
 
源代码23 项目: rsocket-java   文件: RSocketFactory.java
/**
 * Returns the connection acceptor of the wrapped {@code server} by delegating to its
 * {@code asConnectionAcceptor()}.
 */
@Override
public ServerTransport.ConnectionAcceptor toConnectionAcceptor() {
  return server.asConnectionAcceptor();
}
 
/**
 * Supplies the server transports for the parameterized tests: one TCP and one WebSocket
 * transport, each created with port {@code 0}.
 *
 * @return a stream of the two transports
 */
static Stream<? extends ServerTransport<CloseableChannel>> arguments() {
  ServerTransport<CloseableChannel> tcp = TcpServerTransport.create(0);
  ServerTransport<CloseableChannel> websocket = WebsocketServerTransport.create(0);
  return Stream.of(tcp, websocket);
}
 
/**
 * Checks that request/response keeps working around WebSocket ping and pong frames sent by the
 * client.
 *
 * <p>Sequence: a request delayed until a ping was sent must still return the echoed data; the
 * matching pong must then be received; finally a request delayed until a pong was sent must
 * also return the echoed data. Each verification is limited to five seconds.
 */
@ParameterizedTest
@MethodSource("provideServerTransport")
void webSocketPingPong(ServerTransport<Closeable> serverTransport) {
  server =
      RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just))
          .bind(serverTransport)
          .block();

  String expectedData = "data";
  String expectedPing = "ping";
  Duration timeout = Duration.ofSeconds(5);

  PingSender pingSender = new PingSender();

  // The ping sender is added as the last handler once the client connects.
  HttpClient httpClient =
      HttpClient.create()
          .tcpConfiguration(
              tcpClient ->
                  tcpClient
                      .doOnConnected(connection -> connection.addHandlerLast(pingSender))
                      .host(host)
                      .port(port));

  RSocket rSocket =
      RSocketConnector.connectWith(WebsocketClientTransport.create(httpClient, "/")).block();

  StepVerifier.create(
          rSocket
              .requestResponse(DefaultPayload.create(expectedData))
              .delaySubscription(pingSender.sendPing(expectedPing)))
      .expectNextMatches(response -> expectedData.equals(response.getDataUtf8()))
      .expectComplete()
      .verify(timeout);

  StepVerifier.create(pingSender.receivePong())
      .expectNextMatches(pong -> expectedPing.equals(pong))
      .expectComplete()
      .verify(timeout);

  StepVerifier.create(
          rSocket
              .requestResponse(DefaultPayload.create(expectedData))
              .delaySubscription(pingSender.sendPong()))
      .expectNextMatches(response -> expectedData.equals(response.getDataUtf8()))
      .expectComplete()
      .verify(timeout);
}
 
源代码26 项目: rsocket-java   文件: RSocketServer.java
/**
 * Start the server on the given transport.
 *
 * <p>The following transports are available from additional RSocket Java modules:
 *
 * <ul>
 *   <li>{@link io.rsocket.transport.netty.server.TcpServerTransport TcpServerTransport} via
 *       {@code rsocket-transport-netty}.
 *   <li>{@link io.rsocket.transport.netty.server.WebsocketServerTransport
 *       WebsocketServerTransport} via {@code rsocket-transport-netty}.
 *   <li>{@link io.rsocket.transport.local.LocalServerTransport LocalServerTransport} via {@code
 *       rsocket-transport-local}
 * </ul>
 *
 * <p>The {@code ServerSetup} is created when this method is called, not per subscription. It is
 * disposed once the started server's {@code onClose()} terminates.
 *
 * @param transport the transport to start the server on
 * @param <T> the type of {@code Closeable} for the given transport
 * @return a {@code Mono} with a {@code Closeable} that can be used to obtain information about
 *     the server, stop it, or be notified of when it is stopped.
 */
public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
  final ServerSetup sharedSetup = serverSetup();

  return Mono.defer(
      () ->
          transport
              .start(connection -> acceptor(sharedSetup, connection))
              .doOnNext(
                  started ->
                      started
                          .onClose()
                          .doFinally(signal -> sharedSetup.dispose())
                          .subscribe()));
}
 
源代码27 项目: alibaba-rsocket-broker   文件: UriHandler.java
/**
 * Returns an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI},
 * otherwise {@link Optional#empty()}.
 *
 * @param uri the uri to map
 * @return an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI},
 *     otherwise {@link Optional#empty()}
 * @throws NullPointerException if {@code uri} is {@code null}
 */
Optional<ServerTransport<?>> buildServer(URI uri);
 
源代码28 项目: rsocket-java   文件: RSocketServer.java
/**
 * Blocking variant of {@link #bind(ServerTransport)}: starts the server on the given transport
 * and waits for the result. Equivalent to {@code bind(transport).block()}.
 *
 * @param transport the transport to start the server on
 * @param <T> the type of {@code Closeable} for the given transport
 * @return the value produced by {@code bind(transport)}
 */
public <T extends Closeable> T bindNow(ServerTransport<T> transport) {
  Mono<T> started = bind(transport);
  return started.block();
}
 
源代码29 项目: rsocket-java   文件: RSocketFactory.java
/**
 * Returns a {@link ServerTransport.ConnectionAcceptor} for this server configuration.
 *
 * @return the connection acceptor
 */
ServerTransport.ConnectionAcceptor toConnectionAcceptor();