org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.RSocketFactory源码实例Demo

下面列出了 org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#io.rsocket.RSocketFactory 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Connects over the given transport and wraps the resulting RSocket in a
 * {@code DefaultRSocketRequester}.
 *
 * <p>Everything, including creation and configuration of the client factory,
 * happens inside {@code Mono.defer}, i.e. once per subscription.
 *
 * @param transport the client transport to connect with
 * @return a {@code Mono} emitting the connected requester
 */
@Override
public Mono<RSocketRequester> connect(ClientTransport transport) {
	return Mono.defer(() -> {
		RSocketStrategies rsocketStrategies = getRSocketStrategies();
		MimeType defaultMimeType = getDefaultDataMimeType(rsocketStrategies);

		RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect();
		// Only override the factory's data MIME type when a default could be determined.
		if (defaultMimeType != null) {
			String mimeTypeValue = defaultMimeType.toString();
			clientFactory.dataMimeType(mimeTypeValue);
		}
		// Registered configurers run after the default MIME type has been applied.
		this.factoryConfigurers.forEach(customizer -> customizer.accept(clientFactory));

		return clientFactory
				.transport(transport)
				.start()
				.map(connected -> new DefaultRSocketRequester(connected, defaultMimeType, rsocketStrategies));
	});
}
 
/**
 * One-time fixture: creates the application context, starts a TCP RSocket server on
 * localhost:7000 and connects a requester to it. The same {@code payloadInterceptor}
 * is registered as a server plugin and as a client plugin.
 */
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
	context = new AnnotationConfigApplicationContext(ServerConfig.class);

	MessageHandlerAcceptor serverAcceptor = context.getBean(MessageHandlerAcceptor.class);
	server = RSocketFactory.receive()
			.frameDecoder(PayloadDecoder.ZERO_COPY)
			.addServerPlugin(payloadInterceptor) // intercept responding
			.acceptor(serverAcceptor)
			.transport(TcpServerTransport.create("localhost", 7000))
			.start()
			.block();

	RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
	requester = RSocketRequester.builder()
			.rsocketFactory(clientFactory -> {
				clientFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
				// intercept outgoing requests
				clientFactory.addClientPlugin(payloadInterceptor);
			})
			.rsocketStrategies(strategies)
			.connectTcp("localhost", 7000)
			.block();
}
 
/**
 * Resets the server controller, opens a client connection whose setup payload carries
 * {@code destination} as metadata, and waits up to five seconds on the controller.
 * The client socket is disposed afterwards whether or not the wait succeeds.
 *
 * @param destination value placed in the metadata of the setup payload
 */
private static void connectAndVerify(String destination) {
	ServerController controller = context.getBean(ServerController.class);
	controller.reset();

	RSocket client = null;
	try {
		MessageHandlerAcceptor clientAcceptor =
				context.getBean("clientAcceptor", MessageHandlerAcceptor.class);

		client = RSocketFactory.connect()
				.setupPayload(DefaultPayload.create("", destination))
				.dataMimeType("text/plain")
				.frameDecoder(PayloadDecoder.ZERO_COPY)
				.acceptor(clientAcceptor)
				.transport(TcpClientTransport.create("localhost", 7000))
				.start()
				.block();

		controller.await(Duration.ofSeconds(5));
	}
	finally {
		// The socket is null only if connecting failed, in which case there is nothing to release.
		if (client != null) {
			client.dispose();
		}
	}
}
 
/**
 * One-time fixture: creates the application context, starts a TCP RSocket server on
 * localhost:7000 with {@code interceptor} registered as a server plugin, and connects
 * a requester to it.
 */
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
	context = new AnnotationConfigApplicationContext(ServerConfig.class);

	MessageHandlerAcceptor serverAcceptor = context.getBean(MessageHandlerAcceptor.class);
	server = RSocketFactory.receive()
			.addServerPlugin(interceptor)
			.frameDecoder(PayloadDecoder.ZERO_COPY)
			.acceptor(serverAcceptor)
			.transport(TcpServerTransport.create("localhost", 7000))
			.start()
			.block();

	RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
	requester = RSocketRequester.builder()
			.rsocketFactory(clientFactory -> clientFactory.frameDecoder(PayloadDecoder.ZERO_COPY))
			.rsocketStrategies(strategies)
			.connectTcp("localhost", 7000)
			.block();
}
 
源代码5 项目: rsc   文件: Rsc.java
/**
 * Builds a client RSocket from the parsed command-line arguments and issues the
 * request selected by the interaction model.
 *
 * @param args parsed command-line arguments
 * @return the stream produced by the chosen interaction model
 */
static Flux<?> run(Args args) {
	if (args.debug()) {
		configureDebugLevel("io.rsocket.FrameLogger");
	}
	args.log().ifPresent(Rsc::configureDebugLevel);

	final ClientTransport transport = args.clientTransport();
	final RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect();

	// Optional settings are applied by mutating the factory in place.
	args.resume().ifPresent(sessionDuration -> clientFactory.resume()
			.resumeSessionDuration(sessionDuration)
			.resumeStrategy(() -> new PeriodicResumeStrategy(Duration.ofSeconds(5))));
	args.setup()
			.map(DefaultPayload::create)
			.ifPresent(clientFactory::setupPayload);

	return clientFactory
			.frameDecoder(PayloadDecoder.ZERO_COPY)
			.metadataMimeType(args.composeMetadata().getT1())
			.dataMimeType(args.dataMimeType())
			.transport(transport)
			.start()
			.flatMapMany(connected -> args.interactionModel().request(connected, args));
}
 
源代码6 项目: spring-cloud-rsocket   文件: PingPongApp.java
/**
 * Starts the "pong" side once the application is ready.
 *
 * <p>Sleeps for {@code pong.delay} milliseconds (5000 by default), then connects to the
 * port given by {@code spring.rsocket.server.port} (7002 by default) and blocks until
 * the connection is established.
 *
 * <p>If the thread is interrupted during the delay, the interrupt flag is restored and
 * startup is abandoned rather than silently swallowing the interruption.
 *
 * @param event the application-ready event that supplies the environment
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
	ConfigurableEnvironment env = event.getApplicationContext().getEnvironment();
	Integer pongDelay = env.getProperty("pong.delay", Integer.class, 5000);
	try {
		Thread.sleep(pongDelay);
	}
	catch (InterruptedException e) {
		// Restore the flag so the caller can observe the interruption. The blocking
		// connect below would fail on an interrupted thread anyway, so stop here.
		Thread.currentThread().interrupt();
		log.warn("Interrupted while waiting to start Pong", e);
		return;
	}
	log.info("Starting Pong");
	Integer gatewayPort = env.getProperty("spring.rsocket.server.port",
			Integer.class, 7002);
	MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(
			meterRegistry, Tag.of("component", "pong"));

	ByteBuf announcementMetadata = getRouteSetupMetadata(strategies, "pong", 3L);
	RSocketFactory.connect().metadataMimeType(COMPOSITE_MIME_TYPE.toString())
			.setupPayload(
					DefaultPayload.create(EMPTY_BUFFER, announcementMetadata))
			.addRequesterPlugin(interceptor).acceptor(this::accept)
			.transport(TcpClientTransport.create(gatewayPort)) // proxy
			.start().block();
}
 
/**
 * Creates the (not yet started) RSocket server that shares an HTTP server with the
 * given HTTP handler: requests whose path, prefixed with "/", differs from
 * {@code path} are routed to the HTTP handler adapter.
 *
 * @param httpHandler handler for the non-RSocket HTTP traffic
 * @return the starter for the configured RSocket server
 */
@SuppressWarnings("unchecked")
private RSocketFactory.Start<CloseableChannel> createRSocketStarter(HttpHandler httpHandler) {
	RSocketFactory.ServerRSocketFactory customizedFactory = applyCustomizers(RSocketFactory.receive());

	HttpServer server = createHttpServer();
	ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);

	WebsocketRouteTransport routeTransport = new WebsocketRouteTransport(
		server,
		routes -> routes.route(request -> !("/" + request.path()).equals(path), adapter),
		path
	);

	return customizedFactory
		.acceptor(socketAcceptor)
		.transport((ServerTransport) routeTransport);
}
 
源代码8 项目: spring-boot-rsocket   文件: DemoApplication.java
/**
 * Boots the Spring application, then connects a WebSocket RSocket client to
 * localhost:8080 at "/rsocket", sends a single request-response and logs the
 * reply decoded as UTF-8.
 */
public static void main(String[] args) {
	SpringApplication.run(DemoApplication.class, args);

	WebsocketClientTransport transport = WebsocketClientTransport.create(
		HttpClient.from(TcpClient.create().host("localhost").port(8080)),
		"/rsocket"
	);

	RSocket rSocket = RSocketFactory.connect()
		.transport(transport)
		.start()
		.block();

	String reply = rSocket.requestResponse(DefaultPayload.create("HelloWorld"))
		.map(Payload::getDataUtf8)
		.block();
	logger.info(reply);
}
 
源代码9 项目: spring-boot-rsocket   文件: DemoApplication.java
/**
 * Boots the Spring application, then connects a WebSocket RSocket client to
 * localhost:8080 at "/rsocket-rpc" and makes two calls through the
 * {@code GreeterClient}: a streaming greet followed by a single-response greet.
 */
public static void main(String[] args) {
	SpringApplication.run(DemoApplication.class, args);

	WebsocketClientTransport transport = WebsocketClientTransport.create(
		HttpClient.from(TcpClient.create().host("localhost").port(8080)),
		"/rsocket-rpc"
	);

	RSocket rSocket = RSocketFactory.connect()
		.transport(transport)
		.start()
		.block();

	GreeterClient greeter = new GreeterClient(rSocket);

	// Streaming call: block until the last element has arrived.
	greeter.streamGreet(HelloRequest.newBuilder().setName("Jon Doe").build())
		.log()
		.blockLast();

	// Single-response call.
	greeter.requestGreet(HelloRequest.newBuilder().setName("Arthur Conan Doyle").build())
		.log()
		.block();
}
 
/**
 * Opens a new RSocket connection, requests a stream with an empty payload and
 * deserializes every response payload (read as UTF-8) into a {@code Reservation}.
 * The connection is disposed when the stream terminates for any reason.
 *
 * @return the reservations streamed by the server
 */
public Flux<Reservation> getAllReservations() {
	return RSocketFactory
		.connect()
		.transport(this.localhost)
		.start()
		.flatMapMany(connection ->
			connection
				.requestStream(DefaultPayload.create(new byte[0]))
				.map(Payload::getDataUtf8)
				.map(json -> {
					try {
						return this.objectMapper.readValue(json, Reservation.class);
					}
					catch (IOException ex) {
						// Rethrown unchecked so it surfaces as a stream error.
						throw new RuntimeException(ex);
					}
				})
				.doFinally(signalType -> connection.dispose())
		);
}
 
/**
 * Starts the RSocket server when the application is ready. A single handler
 * instance is handed to every accepted connection; it answers request-stream
 * calls with all reservations from the repository, converted via {@code toJson}.
 */
@EventListener(ApplicationReadyEvent.class)
public void serve() throws Exception {

	var streamingHandler = new AbstractRSocket() {

		@Override
		public Flux<Payload> requestStream(Payload payload) {
			// The incoming payload is ignored; every request gets the full list.
			return reservationRepository.findAll()
				.map(RsocketServer.this::toJson)
				.map(DefaultPayload::create);
		}
	};

	SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(streamingHandler);

	RSocketFactory.receive()
		.acceptor(acceptor)
		.transport(this.tcp)
		.start()
		.subscribe();
}
 
/**
 * Starts the RSocket server if it is not already running.
 *
 * <p>The running check and the start happen while holding {@code lifecycleMonitor}.
 * The lock is acquired before entering the {@code try} block so that {@code unlock()}
 * in {@code finally} only ever runs when the lock is actually held.
 */
@Override
public void start() {
	this.lifecycleMonitor.lock();
	try {
		if (!running) {
			logger.info("Starting RSocket server using transport: {} ", this.transport.getClass().getName());
			this.disposable = RSocketFactory.receive()
					.acceptor(acceptor)
					.transport(transport)
					.start()
					.subscribe();
			running = true;
		}
	}
	finally {
		this.lifecycleMonitor.unlock();
	}
}
 
源代码13 项目: rpc-thunderdome   文件: RSocketRpcServer.java
/**
 * Starts the RSocket RPC server on the host and port taken from the {@code host} and
 * {@code port} system properties (defaults 127.0.0.1 and 8001) and blocks until the
 * server is closed.
 */
public static void main(String... args) {
  String host = System.getProperty("host", "127.0.0.1");
  int port = Integer.getInteger("port", 8001);
  TcpServerTransport transport = TcpServerTransport.create(host, port);

  RSocketFactory.receive()
      .frameDecoder(Frame::retain)
      .acceptor(
          // A fresh service server is created for every accepted connection.
          (setupPayload, requesterSocket) ->
              Mono.just(
                  new SimpleServiceServer(
                      new DefaultService(), Optional.empty(), Optional.empty())))
      .transport(transport)
      .start()
      .block()
      .onClose()
      .doOnSubscribe(subscription -> logger.info("server started"))
      .block();
}
 
/**
 * One-time fixture: creates the application context and starts a TCP RSocket server
 * on localhost:7000 using the "serverAcceptor" bean.
 */
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
	context = new AnnotationConfigApplicationContext(RSocketConfig.class);

	MessageHandlerAcceptor serverAcceptor =
			context.getBean("serverAcceptor", MessageHandlerAcceptor.class);

	server = RSocketFactory.receive()
			.frameDecoder(PayloadDecoder.ZERO_COPY)
			.acceptor(serverAcceptor)
			.transport(TcpServerTransport.create("localhost", 7000))
			.start()
			.block();
}
 
/**
 * Building a requester and calling {@code connect} without subscribing must not touch
 * the transport or either configurer.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizationsAtSubscription() {
	Consumer<RSocketFactory.ClientRSocketFactory> factoryCustomizer = mock(Consumer.class);
	Consumer<RSocketStrategies.Builder> strategiesCustomizer = mock(Consumer.class);

	RSocketRequester.Builder builder = RSocketRequester.builder()
			.rsocketFactory(factoryCustomizer)
			.rsocketStrategies(strategiesCustomizer);
	// The returned Mono is deliberately left unsubscribed.
	builder.connect(this.transport);

	verifyZeroInteractions(this.transport, factoryCustomizer, strategiesCustomizer);
}
 
/**
 * Subscribing (here via {@code block()}) must connect the transport and invoke both
 * the factory configurer and the strategies configurer.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizations() {
	Consumer<RSocketFactory.ClientRSocketFactory> factoryCustomizer = mock(Consumer.class);
	Consumer<RSocketStrategies.Builder> strategiesCustomizer = mock(Consumer.class);

	RSocketRequester.Builder builder = RSocketRequester.builder()
			.rsocketFactory(factoryCustomizer)
			.rsocketStrategies(strategiesCustomizer);
	builder.connect(this.transport).block();

	verify(this.transport).connect(anyInt());
	verify(factoryCustomizer).accept(any(RSocketFactory.ClientRSocketFactory.class));
	verify(strategiesCustomizer).accept(any(RSocketStrategies.Builder.class));
}
 
源代码17 项目: staccato   文件: RSocketService.java
/**
 * Starts an RSocket TCP server bound to 0.0.0.0 on the configured port. The start is
 * subscribed without blocking; a log line is written once the server channel is emitted.
 *
 * @param configProps        supplies the port to listen on
 * @param itemSocketAcceptor acceptor handling incoming connections
 */
public RSocketService(RSocketConfigProps configProps, ItemSocketAcceptor itemSocketAcceptor) {
    log.info("Starting RSocket");
    TcpServerTransport transport = TcpServerTransport.create("0.0.0.0", configProps.getPort());

    RSocketFactory.receive()
            .acceptor(itemSocketAcceptor)
            .transport(transport)
            .start()
            .log()
            .subscribe(started -> log.info("RSocket initialized on port " + configProps.getPort()));
}
 
源代码18 项目: spring-cloud-rsocket   文件: PingPongApp.java
/**
 * Prepares the "ping" side once the application is ready: builds the ping flux against
 * the port given by {@code spring.rsocket.server.port} (7002 by default) and subscribes
 * to it unless {@code ping.subscribe} is set to false.
 *
 * @param event the application-ready event that supplies the environment
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
	log.info("Starting Ping" + id);
	ConfigurableEnvironment environment = event.getApplicationContext().getEnvironment();
	Integer take = environment.getProperty("ping.take", Integer.class, null);
	Integer gatewayPort = environment.getProperty("spring.rsocket.server.port",
			Integer.class, 7002);

	log.debug("ping.take: " + take);

	MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(
			meterRegistry, Tag.of("component", "ping"));
	ByteBuf routeMetadata = getRouteSetupMetadata(strategies, "ping", id);
	Payload setupPayload = DefaultPayload.create(EMPTY_BUFFER, routeMetadata);

	pongFlux = RSocketFactory.connect()
			.frameDecoder(PayloadDecoder.ZERO_COPY)
			.metadataMimeType(COMPOSITE_MIME_TYPE.toString())
			.setupPayload(setupPayload)
			.addRequesterPlugin(interceptor)
			.transport(TcpClientTransport.create(gatewayPort)) // proxy
			.start()
			.log("startPing" + id)
			.flatMapMany(connected -> doPing(take, connected))
			.cast(String.class)
			.doOnSubscribe(subscription -> {
				if (log.isDebugEnabled()) {
					log.debug("ping doOnSubscribe");
				}
			});

	// Subscription is on by default and can be switched off through configuration.
	boolean shouldSubscribe = environment.getProperty("ping.subscribe", Boolean.class, true);
	if (shouldSubscribe) {
		pongFlux.subscribe();
	}
}
 
源代码19 项目: liiklus   文件: RSocketConfiguration.java
/**
 * Registers the RSocket service bean and the RSocket server channel bean, but only
 * when the "gateway" profile is accepted and the bound server properties are enabled.
 * The channel bean's destroy method is set to "dispose".
 *
 * @param applicationContext the context to register beans in
 */
@Override
public void initialize(GenericApplicationContext applicationContext) {
    var env = applicationContext.getEnvironment();
    if (!env.acceptsProfiles(Profiles.of("gateway"))) {
        return;
    }

    var props = PropertiesUtil.bind(env, new RSocketServerProperties());
    if (!props.isEnabled()) {
        return;
    }

    applicationContext.registerBean(RSocketLiiklusService.class);

    applicationContext.registerBean(
            CloseableChannel.class,
            () -> {
                var service = applicationContext.getBean(LiiklusService.class);
                var transport = TcpServerTransport.create(props.getHost(), props.getPort());

                return RSocketFactory.receive()
                        // A new handling socket wraps the shared service for every connection.
                        .acceptor((setupPayload, requesterSocket) -> Mono.just(
                                new RequestHandlingRSocket(
                                        new LiiklusServiceServer(service, Optional.empty(), Optional.empty()))))
                        .transport(transport)
                        .start()
                        .block();
            },
            definition -> definition.setDestroyMethodName("dispose")
    );
}
 
/**
 * Opens a new RSocket connection, requests a stream with an empty payload and converts
 * every response payload (read as UTF-8) via {@code to}.
 *
 * <p>A connection is created on every subscription, so it is disposed when the stream
 * terminates (completion, error or cancellation) to avoid leaking connections.
 *
 * @return the reservations streamed by the server
 */
Flux<Reservation> getAllReservations() {
	return RSocketFactory
		.connect()
		.transport(this.tcpClientTransport)
		.start()
		.flatMapMany(rs ->
			rs.requestStream(DefaultPayload.create(new byte[0]))
				.map(Payload::getDataUtf8)
				.map(this::to)
				// Release the per-call connection once the stream is done.
				.doFinally(signal -> rs.dispose())
		);
}
 
/**
 * Starts a TCP RSocket server on localhost:7000 when the application is ready. Every
 * accepted connection gets a new handler that answers request-stream calls with all
 * reservations from the repository, each serialized to a JSON string.
 *
 * @param event the application-ready event (not inspected)
 */
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {

	SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {
		@Override
		public Flux<Payload> requestStream(Payload payload) {
			return reservationRepository
				.findAll()
				.map(reservation -> {
					try {
						return mapper.writeValueAsString(reservation);
					}
					catch (JsonProcessingException ex) {
						// Rethrown unchecked so it surfaces as a stream error.
						throw new RuntimeException(ex);
					}
				})
				.map(DefaultPayload::create);
		}
	});

	TcpServerTransport transport = TcpServerTransport.create("localhost", 7000);

	RSocketFactory.receive()
		.acceptor(acceptor)
		.transport(transport)
		.start()
		.onTerminateDetach()
		.subscribe();
}
 
/**
 * Creates the client RSocket bean: a TCP connection to port 7000 with a JSON data MIME
 * type and the zero-copy frame decoder. Blocks until the connection is established.
 */
@Bean
RSocket rSocket() {
	RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory
		.connect()
		.frameDecoder(PayloadDecoder.ZERO_COPY)
		.dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE);

	return clientFactory
		.transport(TcpClientTransport.create(7000))
		.start()
		.block();
}
 
源代码23 项目: rpc-benchmark   文件: Server.java
/**
 * Starts the benchmark RSocket server on TCP port 8080 and blocks the main thread
 * until the server channel is closed.
 */
public static void main(String[] args) {
    UserServiceServer service =
            new UserServiceServer(new UserServiceRsocketServerImpl(), Optional.empty(), Optional.empty());
    TcpServerTransport transport = TcpServerTransport.create(8080);

    CloseableChannel serverChannel = RSocketFactory.receive()
            // Each connection gets its own handling socket around the shared service.
            .acceptor((setupPayload, requesterSocket) -> Mono.just(new RequestHandlingRSocket(service)))
            .transport(transport)
            .start()
            .block();

    // Block so we don't exit
    serverChannel.onClose().block();
}
 
源代码24 项目: tutorials   文件: Server.java
/**
 * Starts a TCP RSocket server on localhost at {@code TCP_PORT} without blocking, logs
 * once it has started, and creates the server-side game controller.
 */
public Server() {
    TcpServerTransport transport = TcpServerTransport.create("localhost", TCP_PORT);

    this.server = RSocketFactory.receive()
      // Every connection is served by a new RSocketImpl.
      .acceptor((setup, sendingSocket) -> Mono.just(new RSocketImpl()))
      .transport(transport)
      .start()
      .doOnNext(started -> LOG.info("Server started"))
      .subscribe();

    this.gameController = new GameController("Server Player");
}
 
源代码25 项目: tutorials   文件: ChannelClient.java
/**
 * Connects to the server on localhost at {@code TCP_PORT}, blocking until the
 * connection is established, then creates the client-side game controller.
 */
public ChannelClient() {
    TcpClientTransport transport = TcpClientTransport.create("localhost", TCP_PORT);

    this.socket = RSocketFactory.connect()
      .transport(transport)
      .start()
      .block();

    this.gameController = new GameController("Client Player");
}
 
源代码26 项目: tutorials   文件: FireNForgetClient.java
/**
 * Connects to the server on localhost at {@code TCP_PORT}, blocking until the
 * connection is established, then stores an unmodifiable view of the generated data.
 */
public FireNForgetClient() {
    TcpClientTransport transport = TcpClientTransport.create("localhost", TCP_PORT);

    this.socket = RSocketFactory.connect()
      .transport(transport)
      .start()
      .block();

    this.data = Collections.unmodifiableList(generateData());
}
 
源代码27 项目: tutorials   文件: ClientConfiguration.java
/**
 * Creates the client RSocket bean: a TCP connection to port 7000 with JSON for both
 * MIME types and the zero-copy frame decoder. Blocks until the connection is established.
 */
@Bean
public RSocket rSocket() {
    RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect()
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY);

    return clientFactory
            .transport(TcpClientTransport.create(7000))
            .start()
            .block();
}
 
/**
 * Creates the lazily initialized client RSocket bean: a TCP connection to port 7000
 * with JSON for both MIME types and the zero-copy frame decoder. Blocks until the
 * connection is established.
 */
@Bean
@Lazy
public RSocket rSocket() {
    RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect()
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY);

    return clientFactory
            .transport(TcpClientTransport.create(7000))
            .start()
            .block();
}
 
/**
 * Registers a configurer for the client RSocket factory. The configurer is only
 * stored here; it is not invoked by this method.
 *
 * @param configurer callback that customizes the client factory
 * @return this builder, for chaining
 */
@Override
public RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer) {
	factoryConfigurers.add(configurer);
	return this;
}
 
/**
 * Runs the server factory through every registered customizer in iteration order,
 * feeding each customizer the result of the previous one.
 *
 * @param server the factory to customize
 * @return the factory returned by the last customizer, or {@code server} itself when
 *         there are no customizers
 */
private RSocketFactory.ServerRSocketFactory applyCustomizers(RSocketFactory.ServerRSocketFactory server) {
	RSocketFactory.ServerRSocketFactory customized = server;
	for (RSocketReceiverCustomizer each : this.rSocketCustomizers) {
		customized = each.apply(customized);
	}
	return customized;
}