下面列出了 org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.RSocketFactory 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Connects over the given transport and wraps the resulting RSocket in a
 * {@code DefaultRSocketRequester}.
 *
 * <p>All setup work (resolving the strategies, creating the client factory and
 * applying the registered factory configurers) happens inside the supplier
 * passed to {@code Mono.defer}, not when this method is invoked.
 *
 * @param transport the client transport handed to the RSocket client factory
 * @return a {@code Mono} emitting the requester built from the started RSocket
 */
@Override
public Mono<RSocketRequester> connect(ClientTransport transport) {
    return Mono.defer(() -> {
        RSocketStrategies rsocketStrategies = getRSocketStrategies();
        MimeType mimeType = getDefaultDataMimeType(rsocketStrategies);

        RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect();
        // Only set a data MIME type on the factory when a default was found.
        if (mimeType != null) {
            clientFactory.dataMimeType(mimeType.toString());
        }
        this.factoryConfigurers.forEach(customizer -> customizer.accept(clientFactory));

        return clientFactory
                .transport(transport)
                .start()
                .map(connected -> new DefaultRSocketRequester(connected, mimeType, rsocketStrategies));
    });
}
/**
 * One-time fixture setup: creates the application context from
 * {@code ServerConfig}, starts an RSocket server on localhost:7000 and
 * connects a requester to it. The same {@code payloadInterceptor} is
 * registered as a server plugin and as a client plugin.
 */
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
    context = new AnnotationConfigApplicationContext(ServerConfig.class);

    server = RSocketFactory.receive()
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            // intercept responding
            .addServerPlugin(payloadInterceptor)
            .acceptor(context.getBean(MessageHandlerAcceptor.class))
            .transport(TcpServerTransport.create("localhost", 7000))
            .start()
            .block();

    requester = RSocketRequester.builder()
            .rsocketFactory(clientFactory -> {
                clientFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
                // intercept outgoing requests
                clientFactory.addClientPlugin(payloadInterceptor);
            })
            .rsocketStrategies(context.getBean(RSocketStrategies.class))
            .connectTcp("localhost", 7000)
            .block();
}
/**
 * Resets the server controller, opens a client connection whose setup payload
 * carries {@code destination} as metadata, and waits up to five seconds on the
 * controller. The connection is disposed afterwards if it was established.
 *
 * @param destination the metadata string placed in the setup payload
 */
private static void connectAndVerify(String destination) {
    ServerController controller = context.getBean(ServerController.class);
    controller.reset();

    RSocket connection = null;
    try {
        connection = RSocketFactory.connect()
                .setupPayload(DefaultPayload.create("", destination))
                .dataMimeType("text/plain")
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .acceptor(context.getBean("clientAcceptor", MessageHandlerAcceptor.class))
                .transport(TcpClientTransport.create("localhost", 7000))
                .start()
                .block();

        controller.await(Duration.ofSeconds(5));
    }
    finally {
        // connection stays null when connecting itself failed
        if (connection != null) {
            connection.dispose();
        }
    }
}
/**
 * One-time fixture setup: creates the application context from
 * {@code ServerConfig}, starts an RSocket server on localhost:7000 with
 * {@code interceptor} registered as a server plugin, and connects a
 * requester to that server.
 */
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
    context = new AnnotationConfigApplicationContext(ServerConfig.class);

    server = RSocketFactory.receive()
            .addServerPlugin(interceptor)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .acceptor(context.getBean(MessageHandlerAcceptor.class))
            .transport(TcpServerTransport.create("localhost", 7000))
            .start()
            .block();

    requester = RSocketRequester.builder()
            .rsocketFactory(clientFactory -> clientFactory.frameDecoder(PayloadDecoder.ZERO_COPY))
            .rsocketStrategies(context.getBean(RSocketStrategies.class))
            .connectTcp("localhost", 7000)
            .block();
}
/**
 * Builds an RSocket client from the parsed arguments and issues the request
 * selected by {@code args.interactionModel()}.
 *
 * <p>Debug logging is switched on for {@code io.rsocket.FrameLogger} when
 * {@code args.debug()} is set, and for the logger named by {@code args.log()}
 * when present. Resumption and a setup payload are configured only when the
 * corresponding optional argument is present.
 *
 * @param args the parsed command-line arguments
 * @return the flux produced by the selected interaction model
 */
static Flux<?> run(Args args) {
    if (args.debug()) {
        configureDebugLevel("io.rsocket.FrameLogger");
    }
    args.log().ifPresent(Rsc::configureDebugLevel);

    final ClientTransport transport = args.clientTransport();
    final RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect();

    args.resume().ifPresent(sessionDuration -> {
        clientFactory.resume()
                .resumeSessionDuration(sessionDuration)
                .resumeStrategy(() -> new PeriodicResumeStrategy(Duration.ofSeconds(5)));
    });
    args.setup().map(DefaultPayload::create).ifPresent(clientFactory::setupPayload);

    return clientFactory
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .metadataMimeType(args.composeMetadata().getT1())
            .dataMimeType(args.dataMimeType())
            .transport(transport)
            .start()
            .flatMapMany(connected -> args.interactionModel().request(connected, args));
}
/**
 * Starts the Pong client once the application is ready.
 *
 * <p>Sleeps for {@code pong.delay} milliseconds (default 5000), then connects
 * to the port given by {@code spring.rsocket.server.port} (default 7002) with
 * route setup metadata for "pong" and blocks until the connection is started.
 *
 * <p>If the thread is interrupted during the delay, the interrupt flag is
 * restored and the method returns without connecting; the blocking connect
 * is skipped because the thread has been asked to stop.
 *
 * @param event the application-ready event supplying the environment
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    ConfigurableEnvironment env = event.getApplicationContext().getEnvironment();
    Integer pongDelay = env.getProperty("pong.delay", Integer.class, 5000);
    try {
        Thread.sleep(pongDelay);
    }
    catch (InterruptedException e) {
        // Preserve the interruption for callers instead of swallowing it.
        Thread.currentThread().interrupt();
        log.warn("Interrupted while delaying Pong startup; not connecting", e);
        return;
    }

    log.info("Starting Pong");

    Integer gatewayPort = env.getProperty("spring.rsocket.server.port",
            Integer.class, 7002);
    MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(
            meterRegistry, Tag.of("component", "pong"));
    ByteBuf announcementMetadata = getRouteSetupMetadata(strategies, "pong", 3L);

    RSocketFactory.connect()
            .metadataMimeType(COMPOSITE_MIME_TYPE.toString())
            .setupPayload(DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
            .addRequesterPlugin(interceptor)
            .acceptor(this::accept)
            .transport(TcpClientTransport.create(gatewayPort)) // proxy
            .start()
            .block();
}
/**
 * Creates the (not yet started) RSocket server that shares an HTTP server
 * with the given HTTP handler.
 *
 * <p>Requests whose path, prefixed with "/", differs from {@code path} are
 * routed to the HTTP handler adapter; {@code path} itself is passed to the
 * websocket route transport.
 *
 * @param httpHandler the handler for non-RSocket HTTP requests
 * @return the starter returned by the server factory's {@code transport} call
 */
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
    RSocketFactory.ServerRSocketFactory serverFactory = applyCustomizers(RSocketFactory.receive());
    HttpServer server = createHttpServer();
    ReactorHttpHandlerAdapter httpAdapter = new ReactorHttpHandlerAdapter(httpHandler);

    return serverFactory
            .acceptor(socketAcceptor)
            .transport((ServerTransport) new WebsocketRouteTransport(
                    server,
                    routes -> routes.route(
                            request -> !("/" + request.path()).equals(path),
                            httpAdapter),
                    path));
}
/**
 * Starts the Spring application, connects an RSocket client over websocket
 * to localhost:8080 at "/rsocket", sends a single request/response with the
 * payload "HelloWorld" and logs the UTF-8 data of the reply.
 *
 * @param args command-line arguments forwarded to {@code SpringApplication.run}
 */
public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);

    RSocket connection = RSocketFactory.connect()
            .transport(WebsocketClientTransport.create(
                    HttpClient.from(TcpClient.create()
                            .host("localhost")
                            .port(8080)),
                    "/rsocket"))
            .start()
            .block();

    String reply = connection.requestResponse(DefaultPayload.create("HelloWorld"))
            .map(Payload::getDataUtf8)
            .block();
    logger.info(reply);
}
/**
 * Starts the Spring application, connects an RSocket client over websocket
 * to localhost:8080 at "/rsocket-rpc" and calls the greeter service twice:
 * first {@code streamGreet} (blocking until the last element), then
 * {@code requestGreet} (blocking for the result). Both calls are logged via
 * {@code log()}.
 *
 * @param args command-line arguments forwarded to {@code SpringApplication.run}
 */
public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);

    RSocket connection = RSocketFactory.connect()
            .transport(WebsocketClientTransport.create(
                    HttpClient.from(TcpClient.create()
                            .host("localhost")
                            .port(8080)),
                    "/rsocket-rpc"))
            .start()
            .block();

    GreeterClient greeter = new GreeterClient(connection);

    HelloRequest streamRequest = HelloRequest.newBuilder().setName("Jon Doe").build();
    greeter.streamGreet(streamRequest)
            .log()
            .blockLast();

    HelloRequest singleRequest = HelloRequest.newBuilder().setName("Arthur Conan Doyle").build();
    greeter.requestGreet(singleRequest)
            .log()
            .block();
}
/**
 * Opens a new RSocket connection over {@code this.localhost}, requests a
 * stream with an empty payload and maps each payload's UTF-8 data to a
 * {@code Reservation} using the object mapper. The connection is disposed
 * when the stream finishes.
 *
 * @return a flux of the deserialized reservations
 */
public Flux<Reservation> getAllReservations() {
    return RSocketFactory
            .connect()
            .transport(this.localhost)
            .start()
            .flatMapMany(rsocket -> rsocket
                    .requestStream(DefaultPayload.create(new byte[0]))
                    .map(Payload::getDataUtf8)
                    .map(json -> {
                        try {
                            return this.objectMapper.readValue(json, Reservation.class);
                        }
                        catch (IOException ex) {
                            // surface deserialization failures as an error signal
                            throw new RuntimeException(ex);
                        }
                    })
                    .doFinally(signalType -> rsocket.dispose()));
}
/**
 * Starts the RSocket server when the application is ready.
 *
 * <p>Every connection is answered with the same responder instance, whose
 * {@code requestStream} emits all reservations from the repository,
 * converted with {@code toJson} and wrapped in payloads.
 *
 * @throws Exception declared by the signature; nothing in the body throws a
 *                   checked exception directly
 */
@EventListener(ApplicationReadyEvent.class)
public void serve() throws Exception {
    var streamingResponder = new AbstractRSocket() {
        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return reservationRepository.findAll()
                    .map(RsocketServer.this::toJson)
                    .map(DefaultPayload::create);
        }
    };

    SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(streamingResponder);

    RSocketFactory
            .receive()
            .acceptor(acceptor)
            .transport(this.tcp)
            .start()
            .subscribe();
}
/**
 * Starts the RSocket server if it is not already running.
 *
 * <p>The check-and-start sequence runs while holding
 * {@code lifecycleMonitor}. The monitor is acquired before entering the
 * {@code try} block so that {@code unlock()} in {@code finally} only runs
 * when the acquisition actually succeeded.
 */
@Override
public void start() {
    this.lifecycleMonitor.lock();
    try {
        if (!running) {
            logger.info("Starting RSocket server using transport: {} ", this.transport.getClass().getName());
            this.disposable = RSocketFactory.receive()
                    .acceptor(acceptor)
                    .transport(transport)
                    .start()
                    .subscribe();
            running = true;
        }
    } finally {
        this.lifecycleMonitor.unlock();
    }
}
/**
 * Starts an RSocket server over TCP and blocks until it is closed.
 *
 * <p>Host and port come from the system properties {@code host} (default
 * "127.0.0.1") and {@code port} (default 8001). Each connection gets a new
 * {@code SimpleServiceServer} wrapping a new {@code DefaultService}.
 *
 * @param args unused
 */
public static void main(String... args) {
    String host = System.getProperty("host", "127.0.0.1");
    int port = Integer.getInteger("port", 8001);

    RSocketFactory.receive()
            .frameDecoder(Frame::retain)
            .acceptor((setup, sendingSocket) -> Mono.just(
                    new SimpleServiceServer(new DefaultService(), Optional.empty(), Optional.empty())))
            .transport(TcpServerTransport.create(host, port))
            .start()
            .block()
            .onClose()
            .doOnSubscribe(subscription -> logger.info("server started"))
            .block();
}
/**
 * One-time fixture setup: creates the application context from
 * {@code RSocketConfig} and starts an RSocket server on localhost:7000
 * using the bean named "serverAcceptor".
 */
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
    context = new AnnotationConfigApplicationContext(RSocketConfig.class);

    server = RSocketFactory.receive()
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .acceptor(context.getBean("serverAcceptor", MessageHandlerAcceptor.class))
            .transport(TcpServerTransport.create("localhost", 7000))
            .start()
            .block();
}
/**
 * Verifies that merely calling {@code connect} (without subscribing to the
 * returned publisher) touches neither the transport nor the configurers.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizationsAtSubscription() {
    Consumer<RSocketFactory.ClientRSocketFactory> factoryCustomizer = mock(Consumer.class);
    Consumer<RSocketStrategies.Builder> strategiesCustomizer = mock(Consumer.class);

    RSocketRequester.builder()
            .rsocketFactory(factoryCustomizer)
            .rsocketStrategies(strategiesCustomizer)
            .connect(this.transport);

    verifyZeroInteractions(this.transport, factoryCustomizer, strategiesCustomizer);
}
/**
 * Verifies that blocking on the result of {@code connect} calls the
 * transport's {@code connect} and invokes both configurers.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizations() {
    Consumer<RSocketFactory.ClientRSocketFactory> factoryCustomizer = mock(Consumer.class);
    Consumer<RSocketStrategies.Builder> strategiesCustomizer = mock(Consumer.class);

    RSocketRequester.builder()
            .rsocketFactory(factoryCustomizer)
            .rsocketStrategies(strategiesCustomizer)
            .connect(this.transport)
            .block();

    verify(this.transport).connect(anyInt());
    verify(factoryCustomizer).accept(any(RSocketFactory.ClientRSocketFactory.class));
    verify(strategiesCustomizer).accept(any(RSocketStrategies.Builder.class));
}
/**
 * Starts an RSocket server on all interfaces ("0.0.0.0") at the configured
 * port, using the given acceptor. Startup is subscribed to without blocking;
 * a message is logged once the server channel is emitted.
 *
 * @param configProps        supplies the port to listen on
 * @param itemSocketAcceptor the acceptor handling incoming connections
 */
public RSocketService(RSocketConfigProps configProps, ItemSocketAcceptor itemSocketAcceptor) {
    log.info("Starting RSocket");

    TcpServerTransport serverTransport = TcpServerTransport.create("0.0.0.0", configProps.getPort());

    RSocketFactory.receive()
            .acceptor(itemSocketAcceptor)
            .transport(serverTransport)
            .start()
            .log()
            .subscribe(started -> log.info("RSocket initialized on port " + configProps.getPort()));
}
/**
 * Prepares the Ping client once the application is ready.
 *
 * <p>Reads {@code ping.take} (no default) and {@code spring.rsocket.server.port}
 * (default 7002) from the environment, assembles {@code pongFlux} from a
 * client connection carrying route setup metadata for "ping", and subscribes
 * to it unless {@code ping.subscribe} is set to false.
 *
 * @param event the application-ready event supplying the environment
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    log.info("Starting Ping" + id);

    ConfigurableEnvironment environment = event.getApplicationContext().getEnvironment();
    Integer take = environment.getProperty("ping.take", Integer.class, null);
    Integer port = environment.getProperty("spring.rsocket.server.port", Integer.class, 7002);
    log.debug("ping.take: " + take);

    MicrometerRSocketInterceptor metricsInterceptor = new MicrometerRSocketInterceptor(
            meterRegistry, Tag.of("component", "ping"));
    ByteBuf routeMetadata = getRouteSetupMetadata(strategies, "ping", id);
    Payload setup = DefaultPayload.create(EMPTY_BUFFER, routeMetadata);

    pongFlux = RSocketFactory.connect()
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .metadataMimeType(COMPOSITE_MIME_TYPE.toString())
            .setupPayload(setup)
            .addRequesterPlugin(metricsInterceptor)
            .transport(TcpClientTransport.create(port)) // proxy
            .start()
            .log("startPing" + id)
            .flatMapMany(socket -> doPing(take, socket))
            .cast(String.class)
            .doOnSubscribe(subscription -> {
                if (log.isDebugEnabled()) {
                    log.debug("ping doOnSubscribe");
                }
            });

    // Subscription is on by default; "ping.subscribe" can turn it off.
    if (environment.getProperty("ping.subscribe", Boolean.class, true)) {
        pongFlux.subscribe();
    }
}
/**
 * Registers the RSocket beans when the "gateway" profile is accepted and the
 * bound {@code RSocketServerProperties} report the server as enabled.
 *
 * <p>Registers {@code RSocketLiiklusService} and a {@code CloseableChannel}
 * bean whose supplier starts a TCP RSocket server on the configured host and
 * port and blocks until it is started. The channel bean's destroy method is
 * set to "dispose".
 *
 * @param applicationContext the context to register beans with
 */
@Override
public void initialize(GenericApplicationContext applicationContext) {
    var env = applicationContext.getEnvironment();
    if (!env.acceptsProfiles(Profiles.of("gateway"))) {
        return;
    }

    var properties = PropertiesUtil.bind(env, new RSocketServerProperties());
    if (!properties.isEnabled()) {
        return;
    }

    applicationContext.registerBean(RSocketLiiklusService.class);

    applicationContext.registerBean(
            CloseableChannel.class,
            () -> {
                var service = applicationContext.getBean(LiiklusService.class);
                return RSocketFactory.receive()
                        .acceptor((setup, sendingSocket) -> Mono.just(
                                new RequestHandlingRSocket(
                                        new LiiklusServiceServer(service, Optional.empty(), Optional.empty()))))
                        .transport(TcpServerTransport.create(properties.getHost(), properties.getPort()))
                        .start()
                        .block();
            },
            definition -> definition.setDestroyMethodName("dispose")
    );
}
/**
 * Opens a new RSocket connection over {@code this.tcpClientTransport},
 * requests a stream with an empty payload and converts each payload's UTF-8
 * data with {@code to}.
 *
 * <p>A fresh connection is started for every subscription, so it is disposed
 * when the stream terminates or is cancelled; otherwise each call would leave
 * a connection open.
 *
 * @return a flux of the converted reservations
 */
Flux<Reservation> getAllReservations() {
    return RSocketFactory
            .connect()
            .transport(this.tcpClientTransport)
            .start()
            .flatMapMany(rs ->
                    rs.requestStream(DefaultPayload.create(new byte[0]))
                            .map(Payload::getDataUtf8)
                            .map(this::to)
                            // release the per-call connection once the stream is done
                            .doFinally(signal -> rs.dispose())
            );
}
/**
 * Starts an RSocket server on localhost:7000 once the application is ready.
 *
 * <p>Each connection gets a new responder whose {@code requestStream} emits
 * every reservation from the repository, serialized to a JSON string by the
 * mapper and wrapped in a payload.
 *
 * @param event the application-ready event (not otherwise used)
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
    SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return reservationRepository
                    .findAll()
                    .map(reservation -> {
                        try {
                            return mapper.writeValueAsString(reservation);
                        }
                        catch (JsonProcessingException ex) {
                            // surface serialization failures as an error signal
                            throw new RuntimeException(ex);
                        }
                    })
                    .map(DefaultPayload::create);
        }
    });

    RSocketFactory
            .receive()
            .acceptor(acceptor)
            .transport(TcpServerTransport.create("localhost", 7000))
            .start()
            .onTerminateDetach()
            .subscribe();
}
/**
 * Creates the client {@code RSocket} bean: connects over TCP to port 7000
 * with a JSON data MIME type and blocks until the connection is started.
 *
 * @return the started client RSocket
 */
@Bean
RSocket rSocket() {
    TcpClientTransport clientTransport = TcpClientTransport.create(7000);
    return RSocketFactory
            .connect()
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
            .transport(clientTransport)
            .start()
            .block();
}
/**
 * Starts an RSocket server on TCP port 8080 that serves every connection
 * with the same {@code UserServiceServer}, then blocks until the server
 * channel is closed.
 *
 * @param args unused
 */
public static void main(String[] args) {
    UserServiceServer serviceServer = new UserServiceServer(
            new UserServiceRsocketServerImpl(), Optional.empty(), Optional.empty());

    CloseableChannel serverChannel = RSocketFactory.receive()
            .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(serviceServer)))
            .transport(TcpServerTransport.create(8080))
            .start()
            .block();

    // Keep the JVM alive for as long as the server channel is open.
    serverChannel.onClose().block();
}
/**
 * Starts an RSocket server on localhost at {@code TCP_PORT} without blocking
 * (the startup is only subscribed to) and creates the game controller for
 * the "Server Player". Each connection is handled by a new
 * {@code RSocketImpl}.
 */
public Server() {
    this.server = RSocketFactory.receive()
            .acceptor((setup, sendingSocket) -> Mono.just(new RSocketImpl()))
            .transport(TcpServerTransport.create("localhost", TCP_PORT))
            .start()
            .doOnNext(started -> LOG.info("Server started"))
            .subscribe();

    this.gameController = new GameController("Server Player");
}
/**
 * Connects to the RSocket server on localhost at {@code TCP_PORT}, blocking
 * until the connection is started, and creates the game controller for the
 * "Client Player".
 */
public ChannelClient() {
    this.socket = RSocketFactory
            .connect()
            .transport(TcpClientTransport.create("localhost", TCP_PORT))
            .start()
            .block();

    this.gameController = new GameController("Client Player");
}
/**
 * Connects to the RSocket server on localhost at {@code TCP_PORT}, blocking
 * until the connection is started, and stores an unmodifiable view of the
 * list returned by {@code generateData()}.
 */
public FireNForgetClient() {
    this.socket = RSocketFactory
            .connect()
            .transport(TcpClientTransport.create("localhost", TCP_PORT))
            .start()
            .block();

    this.data = Collections.unmodifiableList(generateData());
}
/**
 * Creates the client {@code RSocket} bean: connects over TCP to port 7000,
 * passing the JSON MIME type value for both arguments of {@code mimeType},
 * and blocks until the connection is started.
 *
 * @return the started client RSocket
 */
@Bean
public RSocket rSocket() {
    TcpClientTransport clientTransport = TcpClientTransport.create(7000);
    return RSocketFactory
            .connect()
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(clientTransport)
            .start()
            .block();
}
/**
 * Creates the lazily initialized client {@code RSocket} bean: connects over
 * TCP to port 7000, passing the JSON MIME type value for both arguments of
 * {@code mimeType}, and blocks until the connection is started.
 *
 * @return the started client RSocket
 */
@Bean
@Lazy
public RSocket rSocket() {
    TcpClientTransport clientTransport = TcpClientTransport.create(7000);
    return RSocketFactory
            .connect()
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(clientTransport)
            .start()
            .block();
}
/**
 * Registers a configurer for the client RSocket factory. Configurers are
 * collected in registration order in {@code factoryConfigurers}.
 *
 * @param configurer the callback that customizes the client factory
 * @return this builder, for chaining
 */
@Override
public RSocketRequester.Builder rsocketFactory(
        Consumer<RSocketFactory.ClientRSocketFactory> configurer) {

    this.factoryConfigurers.add(configurer);
    return this;
}
/**
 * Runs the server factory through every registered customizer in iteration
 * order, feeding each customizer the result of the previous one.
 *
 * @param server the server factory to start from
 * @return the factory returned by the last customizer, or {@code server}
 *         itself when there are no customizers
 */
private RSocketFactory.ServerRSocketFactory applyCustomizers(RSocketFactory.ServerRSocketFactory server) {
    RSocketFactory.ServerRSocketFactory customized = server;
    for (RSocketReceiverCustomizer step : this.rSocketCustomizers) {
        customized = step.apply(customized);
    }
    return customized;
}