下面列出了怎么用org.springframework.messaging.rsocket.RSocketStrategies的API类实例代码及写法,或者点击链接到github查看源代码。
@BeforeAll
public static void setupOnce(@Autowired RSocketRequester.Builder builder,
@LocalRSocketServerPort Integer port,
@Autowired RSocketStrategies strategies) {
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
mimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());
// ******* The user 'test' is NOT in the required 'USER' role! **********
credentials = new UsernamePasswordMetadata("test", "pass");
requester = builder
.setupRoute("shell-client")
.setupData(UUID.randomUUID().toString())
.setupMetadata(credentials, mimeType)
.rsocketStrategies(b ->
b.encoder(new SimpleAuthenticationEncoder()))
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", port)
.block();
}
@BeforeAll
public static void setupOnce(@Autowired RSocketRequester.Builder builder,
@LocalRSocketServerPort Integer port,
@Autowired RSocketStrategies strategies) {
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
credentials = new UsernamePasswordMetadata("user", "pass");
mimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());
requester = builder
.setupRoute("shell-client")
.setupData(UUID.randomUUID().toString())
.setupMetadata(credentials, mimeType)
.rsocketStrategies(b ->
b.encoder(new SimpleAuthenticationEncoder()))
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", port)
.block();
}
@Bean
public RSocketRequester rsocketRequester(UpstreamManager upstreamManager) {
LoadBalancedRSocket loadBalancedRSocket = upstreamManager.findBroker().getLoadBalancedRSocket();
RSocketStrategies rSocketStrategies = RSocketStrategies.builder()
.encoder(new HessianEncoder())
.decoder(new HessianDecoder())
.build();
return RSocketRequester.wrap(loadBalancedRSocket,
MimeType.valueOf("application/x-hessian"),
MimeType.valueOf("message/x.rsocket.composite-metadata.v0"),
rSocketStrategies);
}
/**
* This private method is used to establish a connection to our fake RSocket server.
* It also controls the state of our test controller. This method is reusable by many tests.
*
* @param connectionRoute
*/
private void connectAndRunTest(String connectionRoute) {
ServerController controller = context.getBean(ServerController.class);
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
RSocketRequester requester = null;
try {
controller.reset();
// Add our ClientHandler as a responder
SocketAcceptor responder = RSocketMessageHandler.responder(strategies, new ClientHandler());
// Create an RSocket requester that includes our responder
requester = RSocketRequester.builder()
.setupRoute(connectionRoute)
.setupData(clientId)
.rsocketStrategies(strategies)
.rsocketConnector(connector -> connector.acceptor(responder))
.connectTcp("localhost", server.address().getPort())
.block();
// Give the test time to run, wait for the server's call.
controller.await(Duration.ofSeconds(10));
} finally {
if (requester != null) {
requester.rsocket().dispose();
}
}
}
@Bean
RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.getArgumentResolverConfigurer().addCustomResolver(new AuthenticationPrincipalArgumentResolver());
handler.setRSocketStrategies(strategies);
return handler;
}
@BeforeAll
public static void setupOnce(@Autowired RSocketRequester.Builder builder,
@LocalRSocketServerPort Integer port,
@Autowired RSocketStrategies strategies) {
mimeType = MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_AUTHENTICATION.getString());
reqbuilder = builder;
theport = port;
// ******* The user 'fake' is NOT in the user list! **********
credentials = new UsernamePasswordMetadata("fake", "pass");
}
@Override
public void afterPropertiesSet() {
RSocketStrategies rSocketStrategies = this.context
.getBean(RSocketStrategies.class);
MetadataExtractor metadataExtractor = rSocketStrategies.metadataExtractor();
// TODO: see if possible to make easier in framework.
if (metadataExtractor instanceof DefaultMetadataExtractor) {
DefaultMetadataExtractor extractor = (DefaultMetadataExtractor) metadataExtractor;
extractor.metadataToExtract(FORWARDING_MIME_TYPE, Forwarding.class,
Forwarding.METADATA_KEY);
extractor.metadataToExtract(ROUTE_SETUP_MIME_TYPE, RouteSetup.class,
RouteSetup.METADATA_KEY);
}
}
public MetadataEncoder(MimeType metadataMimeType, RSocketStrategies strategies) {
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
Assert.notNull(strategies, "RSocketStrategies is required");
this.metadataMimeType = metadataMimeType;
this.strategies = strategies;
this.isComposite = this.metadataMimeType.toString()
.equals(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
this.allocator = bufferFactory() instanceof NettyDataBufferFactory
? ((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator()
: ByteBufAllocator.DEFAULT;
}
@Bean
public PendingRequestRSocketFactory pendingRequestRSocketFactory(
RoutingTable routingTable, Routes routes,
RSocketStrategies rSocketStrategies) {
return new PendingRequestRSocketFactory(routingTable, routes,
rSocketStrategies.metadataExtractor());
}
@Bean
public GatewayRSocketFactory gatewayRSocketFactory(RoutingTable routingTable,
Routes routes, PendingRequestRSocketFactory pendingFactory,
LoadBalancerFactory loadBalancerFactory, MeterRegistry meterRegistry,
BrokerProperties properties, RSocketStrategies rSocketStrategies) {
return new GatewayRSocketFactory(routingTable, routes, pendingFactory,
loadBalancerFactory, meterRegistry, properties,
rSocketStrategies.metadataExtractor());
}
@Bean
public GatewaySocketAcceptor socketAcceptor(GatewayRSocketFactory rsocketFactory,
List<SocketAcceptorFilter> filters, MeterRegistry meterRegistry,
BrokerProperties properties, RSocketStrategies rSocketStrategies) {
return new GatewaySocketAcceptor(rsocketFactory, filters, meterRegistry,
properties, rSocketStrategies.metadataExtractor());
}
@Bean
public ClusterJoinListener clusterJoinListener(ClusterService clusterService,
BrokerProperties properties, RSocketStrategies strategies,
GatewayRSocketFactory gatewayRSocketFactory) {
return new ClusterJoinListener(clusterService, properties, strategies,
gatewayRSocketFactory);
}
public ClusterJoinListener(ClusterService clusterService, BrokerProperties properties,
RSocketStrategies strategies, GatewayRSocketFactory gatewayRSocketFactory) {
this.clusterService = clusterService;
this.properties = properties;
this.strategies = strategies;
this.gatewayRSocketFactory = gatewayRSocketFactory;
}
static ByteBuf getRouteSetupMetadata(RSocketStrategies strategies, String name,
long id) {
RouteSetup routeSetup = RouteSetup.of(id, name)
.with("current-time", String.valueOf(System.currentTimeMillis())).build();
LinkedHashMap<TagsMetadata.Key, String> tags = new LinkedHashMap<>();
tags.put(new TagsMetadata.Key(WellKnownKey.TIME_ZONE),
System.currentTimeMillis() + "");
DataBuffer dataBuffer = new MetadataEncoder(COMPOSITE_MIME_TYPE, strategies)
.metadata(routeSetup, ROUTE_SETUP_MIME_TYPE).encode();
return TagsMetadata.asByteBuf(dataBuffer);
}
static ByteBuf getForwardingMetadata(RSocketStrategies strategies, String name,
long id) {
Forwarding metadata = Forwarding.of(id).serviceName(name).build();
DataBuffer dataBuffer = new MetadataEncoder(COMPOSITE_MIME_TYPE, strategies)
.metadata(metadata, Forwarding.FORWARDING_MIME_TYPE).encode();
return TagsMetadata.asByteBuf(dataBuffer);
}
@Bean
@Scope("prototype") // TODO: I don't think prototype works here
@ConditionalOnMissingBean
public RSocketRequester.Builder gatewayRSocketRequesterBuilder(
RSocketStrategies strategies, ClientProperties properties,
MeterRegistry meterRegistry) {
RouteSetup.Builder routeSetup = RouteSetup.of(properties.getRouteId(),
properties.getServiceName());
properties.getTags().forEach((key, value) -> {
if (key.getWellKnownKey() != null) {
routeSetup.with(key.getWellKnownKey(), value);
}
else if (key.getCustomKey() != null) {
routeSetup.with(key.getCustomKey(), value);
}
});
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(
meterRegistry, Tag.of("servicename", properties.getServiceName()));
RSocketRequester.Builder builder = RSocketRequester.builder()
.setupMetadata(routeSetup.build(), RouteSetup.ROUTE_SETUP_MIME_TYPE)
.rsocketStrategies(strategies).rsocketFactory(configurer(interceptor));
return new ClientRSocketRequesterBuilder(builder, properties,
strategies.routeMatcher());
}
@Autowired
public RSocketShellClient(RSocketRequester.Builder builder,
@Qualifier("rSocketStrategies") RSocketStrategies strategies) {
this.rsocketRequesterBuilder = builder;
this.rsocketStrategies = strategies;
}
@Bean
public RSocketMessageHandler serverMessageHandler(@Qualifier("testStrategies") RSocketStrategies strategies) {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(strategies);
return handler;
}
@Bean("testStrategies")
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.create();
}
public static DataBuffer emptyDataBuffer(RSocketStrategies rSocketStrategies) {
return rSocketStrategies.dataBufferFactory().wrap(new byte[0]);
}
@Override
public RSocketRequester.Builder rsocketStrategies(RSocketStrategies strategies) {
return delegate.rsocketStrategies(strategies);
}
@Override
public RSocketRequester.Builder rsocketStrategies(
Consumer<RSocketStrategies.Builder> configurer) {
return delegate.rsocketStrategies(configurer);
}
@Bean
RSocketRequester requester(RSocketStrategies rSocketStrategies) {
return RSocketRequester
.create(this.rSocket(), MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
@Bean
@Lazy
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}