下面列出了org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.transport.ServerTransport 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
RSocketFactory.ServerRSocketFactory rSocketFactory = applyCustomizers(RSocketFactory.receive());
HttpServer httpServer = createHttpServer();
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
return rSocketFactory
.acceptor(socketAcceptor)
.transport((ServerTransport) new WebsocketRouteTransport(
httpServer,
r -> r.route(hsr -> !("/" + hsr.path()).equals(path), handlerAdapter),
path
));
}
@Override
public Mono<DuplexConnection> connect() {
return Mono.defer(
() -> {
ServerTransport.ConnectionAcceptor server = LocalServerTransport.findServer(name);
if (server == null) {
return Mono.error(new IllegalArgumentException("Could not find server: " + name));
}
UnboundedProcessor<ByteBuf> in = new UnboundedProcessor<>();
UnboundedProcessor<ByteBuf> out = new UnboundedProcessor<>();
MonoProcessor<Void> closeNotifier = MonoProcessor.create();
server.apply(new LocalDuplexConnection(allocator, out, in, closeNotifier)).subscribe();
return Mono.just(
(DuplexConnection) new LocalDuplexConnection(allocator, in, out, closeNotifier));
});
}
public ClientSetupRule(
Supplier<T> addressSupplier,
BiFunction<T, S, ClientTransport> clientTransportSupplier,
Function<T, ServerTransport<S>> serverTransportSupplier) {
this.addressSupplier = addressSupplier;
this.serverInit =
address ->
RSocketServer.create((setup, rsocket) -> Mono.just(new TestRSocket(data, metadata)))
.bind(serverTransportSupplier.apply(address))
.block();
this.clientConnector =
(address, server) ->
RSocketConnector.connectWith(clientTransportSupplier.apply(address, server))
.doOnError(Throwable::printStackTrace)
.block();
}
public TransportPair(
Supplier<T> addressSupplier,
BiFunction<T, S, ClientTransport> clientTransportSupplier,
Function<T, ServerTransport<S>> serverTransportSupplier) {
T address = addressSupplier.get();
server =
RSocketServer.create((setup, sendingSocket) -> Mono.just(new TestRSocket(data, metadata)))
.bind(serverTransportSupplier.apply(address))
.block();
client =
RSocketConnector.connectWith(clientTransportSupplier.apply(address, server))
.doOnError(Throwable::printStackTrace)
.block();
}
private ServerTransport<?> findServer(String uriString) {
URI uri = URI.create(uriString);
for (UriHandler h : handlers) {
Optional<ServerTransport<?>> r = h.buildServer(uri);
if (r.isPresent()) {
return r.get();
}
}
return FAILED_SERVER_LOOKUP;
}
@Override
public Optional<ServerTransport<?>> buildServer(URI uri) {
Objects.requireNonNull(uri, "uri must not be null");
if (!SCHEME.equals(uri.getScheme())) {
return Optional.empty();
}
return Optional.of(LocalServerTransport.create(uri.getSchemeSpecificPart()));
}
@Override
public Optional<ServerTransport<?>> buildServer(URI uri) {
Objects.requireNonNull(uri, "uri must not be null");
if (!SCHEME.equals(uri.getScheme())) {
return Optional.empty();
}
return Optional.of(
TcpServerTransport.create(TcpServer.create().host(uri.getHost()).port(uri.getPort())));
}
@Override
public Optional<ServerTransport<?>> buildServer(URI uri) {
Objects.requireNonNull(uri, "uri must not be null");
if (SCHEME.stream().noneMatch(scheme -> scheme.equals(uri.getScheme()))) {
return Optional.empty();
}
int port = isSecure(uri) ? getPort(uri, 443) : getPort(uri, 80);
return Optional.of(WebsocketServerTransport.create(uri.getHost(), port));
}
/**
* An alternative to {@link #bind(ServerTransport)} that is useful for installing RSocket on a
* server that is started independently.
*
* @see io.rsocket.examples.transport.ws.WebSocketHeadersSample
*/
public ServerTransport.ConnectionAcceptor asConnectionAcceptor() {
return new ServerTransport.ConnectionAcceptor() {
private final ServerSetup serverSetup = serverSetup();
@Override
public Mono<Void> apply(DuplexConnection connection) {
return acceptor(serverSetup, connection);
}
};
}
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithEnabledFragmentationOnSufficientMtu(
ServerTransport<CloseableChannel> serverTransport) {
Mono<CloseableChannel> server =
RSocketServer.create(mockAcceptor())
.fragment(100)
.bind(serverTransport)
.doOnNext(CloseableChannel::dispose);
StepVerifier.create(server).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@ParameterizedTest
@MethodSource("arguments")
void serverSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
Mono<CloseableChannel> server =
RSocketServer.create(mockAcceptor())
.bind(serverTransport)
.doOnNext(CloseableChannel::dispose);
StepVerifier.create(server).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithEnabledFragmentationOnSufficientMtu(
ServerTransport<CloseableChannel> serverTransport) {
CloseableChannel server =
RSocketServer.create(mockAcceptor()).fragment(100).bind(serverTransport).block();
Mono<RSocket> rSocket =
RSocketConnector.create()
.fragment(100)
.connect(TcpClientTransport.create(server.address()))
.doFinally(s -> server.dispose());
StepVerifier.create(rSocket).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@ParameterizedTest
@MethodSource("arguments")
void clientSucceedsWithDisabledFragmentation(ServerTransport<CloseableChannel> serverTransport) {
CloseableChannel server = RSocketServer.create(mockAcceptor()).bind(serverTransport).block();
Mono<RSocket> rSocket =
RSocketConnector.connectWith(TcpClientTransport.create(server.address()))
.doFinally(s -> server.dispose());
StepVerifier.create(rSocket).expectNextCount(1).expectComplete().verify(Duration.ofSeconds(5));
}
@Test
public void sendStreamOfDataWithExternalHttpServerTest() {
ServerTransport.ConnectionAcceptor acceptor =
RSocketServer.create(
SocketAcceptor.forRequestStream(
payload ->
Flux.range(0, 10).map(i -> DefaultPayload.create(String.valueOf(i)))))
.asConnectionAcceptor();
DisposableServer server =
HttpServer.create()
.host("localhost")
.route(router -> router.ws("/test", WebsocketRouteTransport.newHandler(acceptor)))
.bindNow();
RSocket rsocket =
RSocketConnector.connectWith(
WebsocketClientTransport.create(
URI.create("ws://" + server.host() + ":" + server.port() + "/test")))
.block();
StepVerifier.create(rsocket.requestStream(EmptyPayload.INSTANCE))
.expectSubscription()
.expectNextCount(10)
.expectComplete()
.verify(Duration.ofMillis(1000));
}
void rejectSetupTcp(
Function<InetSocketAddress, ServerTransport<CloseableChannel>> serverTransport,
Function<InetSocketAddress, ClientTransport> clientTransport) {
String errorMessage = "error";
RejectingAcceptor acceptor = new RejectingAcceptor(errorMessage);
Mono<RSocket> serverRequester = acceptor.requesterRSocket();
CloseableChannel channel =
RSocketServer.create(acceptor)
.bind(serverTransport.apply(new InetSocketAddress("localhost", 0)))
.block(Duration.ofSeconds(5));
ErrorConsumer errorConsumer = new ErrorConsumer();
RSocket clientRequester =
RSocketConnector.connectWith(clientTransport.apply(channel.address()))
.doOnError(errorConsumer)
.block(Duration.ofSeconds(5));
StepVerifier.create(errorConsumer.errors().next())
.expectNextMatches(
err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
.expectComplete()
.verify(Duration.ofSeconds(5));
StepVerifier.create(clientRequester.onClose()).expectComplete().verify(Duration.ofSeconds(5));
StepVerifier.create(serverRequester.flatMap(socket -> socket.onClose()))
.expectComplete()
.verify(Duration.ofSeconds(5));
StepVerifier.create(clientRequester.requestResponse(DefaultPayload.create("test")))
.expectErrorMatches(
err -> err instanceof RejectedSetupException && errorMessage.equals(err.getMessage()))
.verify(Duration.ofSeconds(5));
channel.dispose();
}
static Stream<Arguments> transports() {
Function<InetSocketAddress, ServerTransport<CloseableChannel>> tcpServer =
TcpServerTransport::create;
Function<InetSocketAddress, ServerTransport<CloseableChannel>> wsServer =
WebsocketServerTransport::create;
Function<InetSocketAddress, ClientTransport> tcpClient = TcpClientTransport::create;
Function<InetSocketAddress, ClientTransport> wsClient = WebsocketClientTransport::create;
return Stream.of(Arguments.of(tcpServer, tcpClient), Arguments.of(wsServer, wsClient));
}
@Override
public void start() throws Exception {
if (status != 1) {
for (Map.Entry<Integer, String> entry : schemas.entrySet()) {
String schema = entry.getValue();
int port = entry.getKey();
ServerTransport<?> transport;
if (schema.equals("local")) {
transport = LocalServerTransport.create("unittest");
} else if (schema.equals("tcp")) {
transport = TcpServerTransport.create(host, port);
} else if (schema.equals("tcps")) {
TcpServer tcpServer = TcpServer.create()
.host(host)
.port(port)
.secure(ssl -> ssl.sslContext(
SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
.protocols(protocols)
.sslProvider(getSslProvider())
));
transport = TcpServerTransport.create(tcpServer);
} else if (schema.equals("ws")) {
transport = WebsocketServerTransport.create(host, port);
} else if (schema.equals("wss")) {
HttpServer httpServer = HttpServer.create()
.host(host)
.port(port)
.secure(ssl -> ssl.sslContext(
SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
.protocols(protocols)
.sslProvider(getSslProvider())
));
transport = WebsocketServerTransport.create(httpServer);
} else {
transport = TcpServerTransport.create(host, port);
}
RSocketServer rsocketServer = RSocketServer.create();
//acceptor interceptor
for (SocketAcceptorInterceptor acceptorInterceptor : acceptorInterceptors) {
rsocketServer.interceptors(interceptorRegistry -> {
interceptorRegistry.forSocketAcceptor(acceptorInterceptor);
});
}
//connection interceptor
for (DuplexConnectionInterceptor connectionInterceptor : connectionInterceptors) {
rsocketServer.interceptors(interceptorRegistry -> {
interceptorRegistry.forConnection(connectionInterceptor);
});
}
//responder interceptor
for (RSocketInterceptor responderInterceptor : responderInterceptors) {
rsocketServer.interceptors(interceptorRegistry -> {
interceptorRegistry.forResponder(responderInterceptor);
});
}
Disposable disposable = rsocketServer
.acceptor(acceptor)
.bind(transport)
.onTerminateDetach()
.subscribe();
responders.add(disposable);
log.info(RsocketErrorCode.message("RST-100001", schema + "://" + host + ":" + port));
}
status = 1;
}
}
public static ServerTransport<?> serverForUri(String uri) {
return UriTransportRegistry.fromServices().findServer(uri);
}
public ReactiveSocketServer(ServerTransport transport, SocketAcceptor acceptor) {
this.transport = transport;
this.acceptor = acceptor;
}
@Bean
@ConditionalOnMissingBean(ServerTransport.class)
public ServerTransport transport(){
logger.info("Creating transport : {} on [ {}:{} ]", TcpServerTransport.class.getName(), properties.getHost(), properties.getPort());
return TcpServerTransport.create(properties.getHost(), properties.getPort());
}
public static void main(String[] args) {
ServerTransport.ConnectionAcceptor connectionAcceptor =
RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.asConnectionAcceptor();
DisposableServer server =
HttpServer.create()
.host("localhost")
.port(0)
.route(
routes ->
routes.get(
"/",
(req, res) -> {
if (req.requestHeaders().containsValue("Authorization", "test", true)) {
return res.sendWebsocket(
(in, out) ->
connectionAcceptor
.apply(new WebsocketDuplexConnection((Connection) in))
.then(out.neverComplete()));
}
res.status(HttpResponseStatus.UNAUTHORIZED);
return res.send();
}))
.bindNow();
logger.debug(
"\n\nStart of Authorized WebSocket Connection\n----------------------------------\n");
WebsocketClientTransport transport =
WebsocketClientTransport.create(server.host(), server.port())
.header("Authorization", "test");
RSocket clientRSocket =
RSocketConnector.create()
.keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(transport)
.block();
Flux.range(1, 100)
.concatMap(i -> clientRSocket.requestResponse(ByteBufPayload.create("Hello " + i)))
.doOnNext(payload -> logger.debug("Processed " + payload.getDataUtf8()))
.blockLast();
clientRSocket.dispose();
logger.debug(
"\n\nStart of Unauthorized WebSocket Upgrade\n----------------------------------\n");
RSocketConnector.create()
.keepAlive(Duration.ofMinutes(10), Duration.ofMinutes(10))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(WebsocketClientTransport.create(server.host(), server.port()))
.block();
}
static ServerTransport<CloseableChannel> serverTransport(String host, int port) {
return TcpServerTransport.create(host, port);
}
@Override
public ServerTransport.ConnectionAcceptor toConnectionAcceptor() {
return server.asConnectionAcceptor();
}
static Stream<? extends ServerTransport<CloseableChannel>> arguments() {
return Stream.of(TcpServerTransport.create(0), WebsocketServerTransport.create(0));
}
@ParameterizedTest
@MethodSource("provideServerTransport")
void webSocketPingPong(ServerTransport<Closeable> serverTransport) {
server =
RSocketServer.create(SocketAcceptor.forRequestResponse(Mono::just))
.bind(serverTransport)
.block();
String expectedData = "data";
String expectedPing = "ping";
PingSender pingSender = new PingSender();
HttpClient httpClient =
HttpClient.create()
.tcpConfiguration(
tcpClient ->
tcpClient
.doOnConnected(b -> b.addHandlerLast(pingSender))
.host(host)
.port(port));
RSocket rSocket =
RSocketConnector.connectWith(WebsocketClientTransport.create(httpClient, "/")).block();
rSocket
.requestResponse(DefaultPayload.create(expectedData))
.delaySubscription(pingSender.sendPing(expectedPing))
.as(StepVerifier::create)
.expectNextMatches(p -> expectedData.equals(p.getDataUtf8()))
.expectComplete()
.verify(Duration.ofSeconds(5));
pingSender
.receivePong()
.as(StepVerifier::create)
.expectNextMatches(expectedPing::equals)
.expectComplete()
.verify(Duration.ofSeconds(5));
rSocket
.requestResponse(DefaultPayload.create(expectedData))
.delaySubscription(pingSender.sendPong())
.as(StepVerifier::create)
.expectNextMatches(p -> expectedData.equals(p.getDataUtf8()))
.expectComplete()
.verify(Duration.ofSeconds(5));
}
/**
* Start the server on the given transport.
*
* <p>The following transports are available from additional RSocket Java modules:
*
* <ul>
* <li>{@link io.rsocket.transport.netty.client.TcpServerTransport TcpServerTransport} via
* {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.netty.client.WebsocketServerTransport
* WebsocketServerTransport} via {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.local.LocalServerTransport LocalServerTransport} via {@code
* rsocket-transport-local}
* </ul>
*
* @param transport the transport of choice to connect with
* @param <T> the type of {@code Closeable} for the given transport
* @return a {@code Mono} with a {@code Closeable} that can be used to obtain information about
* the server, stop it, or be notified of when it is stopped.
*/
public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
return Mono.defer(
new Supplier<Mono<T>>() {
ServerSetup serverSetup = serverSetup();
@Override
public Mono<T> get() {
return transport
.start(duplexConnection -> acceptor(serverSetup, duplexConnection))
.doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe());
}
});
}
/**
* Returns an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI},
* otherwise {@link Optional#empty()}.
*
* @param uri the uri to map
* @return an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI}, *
* otherwise {@link Optional#empty()}
* @throws NullPointerException if {@code uri} is {@code null}
*/
Optional<ServerTransport<?>> buildServer(URI uri);
/**
* Start the server on the given transport. Effectively is a shortcut for {@code
* .bind(ServerTransport).block()}
*/
public <T extends Closeable> T bindNow(ServerTransport<T> transport) {
return bind(transport).block();
}
ServerTransport.ConnectionAcceptor toConnectionAcceptor();