下面列出了怎么用io.grpc.BindableService的API类实例代码及写法,或者点击链接到github查看源代码。
GrpcStartable(GrpcServerConfig serverConfig, Tracing tracing, BindableService... services) {
ServerBuilder<?> serverBuilder;
if (serverConfig.isSslEnable()) {
serverBuilder = NettyServerBuilder.forAddress(
new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()));
try {
((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build());
} catch (SSLException e) {
throw new IllegalStateException("Unable to setup grpc to use SSL.", e);
}
} else {
serverBuilder = ServerBuilder.forPort(serverConfig.getPort());
}
// Arrays.stream(services).forEach(serverBuilder::addService);
// add interceptor for grpc server By Gannalyo
Arrays.stream(services).forEach(service ->
serverBuilder.addService(ServerInterceptors.intercept(service,
GrpcTracing.create(tracing).newServerInterceptor())));
server = serverBuilder.build();
}
@Override
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
Object ref = providerConfig.getRef();
try {
final ServerServiceDefinition serviceDef;
if (SofaProtoUtils.isProtoClass(ref)) {
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();
} else {
GenericServiceImpl genericService = new GenericServiceImpl(providerConfig);
serviceDef = buildSofaServiceDef(genericService, providerConfig, instance);
}
List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
serviceInfo.put(providerConfig, serviceDefinition);
handlerRegistry.addService(serviceDefinition);
invokerCnt.incrementAndGet();
} catch (Exception e) {
LOGGER.error("Register triple service error", e);
serviceInfo.remove(providerConfig);
}
}
@Bean
@Lazy
InfoContributor grpcInfoContributor(final GrpcServerProperties properties,
final Collection<BindableService> grpcServices, final HealthStatusManager healthStatusManager) {
final Map<String, Object> details = new LinkedHashMap<>();
details.put("port", properties.getPort());
if (properties.isReflectionServiceEnabled()) {
// Only expose services via web-info if we do the same via grpc.
final Map<String, List<String>> services = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
details.put("services", services);
final List<BindableService> mutableGrpcServiceList = new ArrayList<>(grpcServices);
mutableGrpcServiceList.add(ProtoReflectionService.newInstance());
if (properties.isHealthServiceEnabled()) {
mutableGrpcServiceList.add(healthStatusManager.getHealthService());
}
for (final BindableService grpcService : mutableGrpcServiceList) {
final ServiceDescriptor serviceDescriptor = grpcService.bindService().getServiceDescriptor();
final List<String> methods = collectMethodNamesForService(serviceDescriptor);
services.put(serviceDescriptor.getName(), methods);
}
}
return new SimpleInfoContributor("grpc.server", details);
}
/**
* Create a {@link Server} if one isn't already present in the context.
*
* @param port The port this server should listen on
* @param services The gRPC services this server should serve
* @param serverInterceptors The {@link ServerInterceptor} implementations that should be applied to all services
* @return A Netty server instance based on the provided information
*/
@Bean
@ConditionalOnMissingBean(Server.class)
public Server gRpcServer(
@Value("${grpc.server.port:0}") final int port, // TODO: finalize how to get configure this property
final Set<BindableService> services,
final List<ServerInterceptor> serverInterceptors
) {
final NettyServerBuilder builder = NettyServerBuilder.forPort(port);
// Add Service interceptors and add services to the server
services
.stream()
.map(BindableService::bindService)
.map(serviceDefinition -> ServerInterceptors.intercept(serviceDefinition, serverInterceptors))
.forEach(builder::addService);
return builder.build();
}
public GrpcServerStrategy(GrpcURL providerUrl, Object protocolImpl){
if (protocolImpl instanceof BindableService) {
this.exporter = new GrpcStubServerExporter();
this.protocolClass = protocolImpl.getClass();
} else {
Class<?> protocol;
try {
protocol = ReflectUtils.name2class(providerUrl.getServiceInterface());
if (!protocol.isAssignableFrom(protocolImpl.getClass())) {
throw new IllegalStateException("protocolClass " + providerUrl.getServiceInterface()
+ " is not implemented by protocolImpl which is of class "
+ protocolImpl.getClass());
}
} catch (ClassNotFoundException e) {
protocol = protocolImpl.getClass();
}
this.protocolClass = protocol;
this.exporter = new DefaultProxyExporter(providerUrl);
}
this.protocolImpl = protocolImpl;
}
@Bean()
@ConditionalOnProperty(name = "alpha.feature.akka.enabled", havingValue = "false", matchIfMissing = true)
ServerStartable serverStartable(GrpcServerConfig serverConfig, TxConsistentService txConsistentService,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, @Autowired(required = false) GrpcTccEventService grpcTccEventService,
@Qualifier("alphaEventBus") EventBus eventBus) throws IOException {
ServerMeta serverMeta = ServerMeta.newBuilder()
.putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(false)).build();
List<BindableService> bindableServices = new ArrayList();
bindableServices.add(new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks, serverMeta));
if (grpcTccEventService != null) {
LOG.info("alpha.feature.tcc.enable=true, starting the TCC service.");
bindableServices.add(grpcTccEventService);
} else {
LOG.info("alpha.feature.tcc.enable=false, the TCC service is disabled.");
}
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
bindableServices.toArray(new BindableService[0]));
new Thread(bootstrap::start).start();
LOG.info("alpha.feature.akka.enabled=false, starting the saga db service");
return bootstrap;
}
@Bean
@ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, @Autowired(required = false) GrpcTccEventService grpcTccEventService,
@Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException {
ServerMeta serverMeta = ServerMeta.newBuilder()
.putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(true)).build();
List<BindableService> bindableServices = new ArrayList();
bindableServices.add(new GrpcSagaEventService(actorEventChannel, omegaCallbacks, serverMeta));
if (grpcTccEventService != null) {
LOG.info("alpha.feature.tcc.enable=true, starting the TCC service.");
bindableServices.add(grpcTccEventService);
} else {
LOG.info("alpha.feature.tcc.enable=false, the TCC service is disabled.");
}
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus, bindableServices.toArray(new BindableService[0]));
new Thread(bootstrap::start).start();
LOG.info("alpha.feature.akka.enabled=true, starting the saga akka service.");
return bootstrap;
}
@Test
public void getPortReturnsServerPortForRunningServer() throws Exception {
final int configPort = ThreadLocalRandom.current().nextInt(1000, 2000);
final int serverPort = ThreadLocalRandom.current().nextInt(2000, 3000);
final int serviceCount = ThreadLocalRandom.current().nextInt(5, 10);
final long shutdownWaitTimeInMillis = ThreadLocalRandom.current().nextLong(1000, 10000);
final ApplicationContext applicationContext = mock(ApplicationContext.class);
final Server server = mock(Server.class, new TriesToReturnSelf());
final GrpcServerFactory factory = (p, s) -> server;
final Map<String, Object> services = IntStream.range(0, serviceCount)
.mapToObj(i -> mock(BindableService.class))
.collect(Collectors.toMap(s -> UUID.randomUUID().toString(), s -> s));
when(applicationContext.getBeansWithAnnotation(eq(GrpcService.class))).thenReturn(services);
when(server.getPort()).thenReturn(serverPort);
GrpcServerHost runner = new GrpcServerHost(configPort, shutdownWaitTimeInMillis, factory);
runner.setApplicationContext(applicationContext);
runner.start();
assertThat(runner.getPort()).isEqualTo(serverPort);
}
@Override
public void start() throws Exception {
final ConduitServiceRegistryImpl registry = (ConduitServiceRegistryImpl) registryProvider.get();
for (BindableService service : registry.getServiceList()) {
serverBuilder.addService(service);
}
for (CloseableBindableService closeableService : registry.getCloseableServiceList()) {
serverBuilder.addService(closeableService);
closeableServices.add(closeableService);
}
serverBuilder.maxInboundMetadataSize(Integer.MAX_VALUE).maxInboundMessageSize(Integer.MAX_VALUE)
.intercept(TransmitStatusRuntimeExceptionInterceptor.instance());
if (sslEngineFactory.isPresent()) {
final SslContextBuilder contextBuilder = sslEngineFactory.get().newServerContextBuilder();
// add gRPC overrides using #configure
serverBuilder.sslContext(GrpcSslContexts.configure(contextBuilder).build());
}
server = serverBuilder.build();
server.start();
logger.info("ConduitServer is up. Listening on port '{}'", server.getPort());
}
private GRPCServer buildGRPCServer(GRPCServerConfiguration grpcServerConfiguration) {
ImmutableList.Builder<BindableService> services = ImmutableList.<BindableService>builder().add(
healthServiceImpl,
eventServiceImpl,
metadataServiceImpl,
taskServiceImpl,
workflowServiceImpl);
if (grpcServerConfiguration.isReflectionEnabled()) {
services.add(ProtoReflectionService.newInstance());
}
return new GRPCServer(
grpcServerConfiguration.getPort(),
services.build().toArray(new BindableService[]{})
);
}
@Override
public Collection<GrpcServiceDefinition> findGrpcServices() {
Collection<String> beanNames =
Arrays.asList(this.applicationContext.getBeanNamesForAnnotation(GrpcService.class));
List<GrpcServiceDefinition> definitions = Lists.newArrayListWithCapacity(beanNames.size());
GlobalServerInterceptorRegistry globalServerInterceptorRegistry =
applicationContext.getBean(GlobalServerInterceptorRegistry.class);
for (String beanName : beanNames) {
BindableService bindableService = this.applicationContext.getBean(beanName, BindableService.class);
ServerServiceDefinition serviceDefinition = bindableService.bindService();
GrpcService grpcServiceAnnotation = applicationContext.findAnnotationOnBean(beanName, GrpcService.class);
serviceDefinition =
bindInterceptors(serviceDefinition, grpcServiceAnnotation, globalServerInterceptorRegistry);
definitions.add(new GrpcServiceDefinition(beanName, bindableService.getClass(), serviceDefinition));
log.debug("Found gRPC service: " + serviceDefinition.getServiceDescriptor().getName() + ", bean: "
+ beanName + ", class: " + bindableService.getClass().getName());
}
return definitions;
}
@Bean
@Lazy
InfoContributor grpcInfoContributor(final GrpcServerProperties properties,
final Collection<BindableService> grpcServices, final HealthStatusManager healthStatusManager) {
final Map<String, Object> details = new LinkedHashMap<>();
details.put("port", properties.getPort());
if (properties.isReflectionServiceEnabled()) {
// Only expose services via web-info if we do the same via grpc.
final Map<String, List<String>> services = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
details.put("services", services);
final List<BindableService> mutableGrpcServiceList = new ArrayList<>(grpcServices);
mutableGrpcServiceList.add(ProtoReflectionService.newInstance());
if (properties.isHealthServiceEnabled()) {
mutableGrpcServiceList.add(healthStatusManager.getHealthService());
}
for (final BindableService grpcService : mutableGrpcServiceList) {
final ServiceDescriptor serviceDescriptor = grpcService.bindService().getServiceDescriptor();
final List<String> methods = collectMethodNamesForService(serviceDescriptor);
services.put(serviceDescriptor.getName(), methods);
}
}
return new SimpleInfoContributor("grpc.server", details);
}
@Override
public Collection<GrpcServiceDefinition> findGrpcServices() {
Collection<String> beanNames = findGrpcServiceBeanNames();
List<GrpcServiceDefinition> definitions = new ArrayList<GrpcServiceDefinition>(
beanNames.size());
for (String beanName : beanNames) {
Object bean = this.applicationContext.getBean(beanName);
Class<?> beanClazz = bean.getClass();
if (!BindableService.class.isAssignableFrom(beanClazz)) {
throw new IllegalStateException(beanClazz.getName() + " does not seem to extend a generated base implementation nor implements BindableService");
}
definitions.add(new GrpcServiceDefinition(beanName, (BindableService) bean));
}
return definitions;
}
public void run() throws Exception {
GrpcReceiver grpcReceiver = new GrpcReceiver();
grpcReceiver.setEnable(true);
grpcReceiver.setBeanName("AgentServer");
grpcReceiver.setBindIp(IP);
grpcReceiver.setBindPort(PORT);
PingEventHandler pingEventHandler = mock(PingEventHandler.class);
BindableService agentService = new AgentService(new MockDispatchHandler(), pingEventHandler, Executors.newFixedThreadPool(8), serverRequestFactory);
grpcReceiver.setBindableServiceList(Arrays.asList(agentService, new MetadataService(new MockDispatchHandler(), Executors.newFixedThreadPool(8), serverRequestFactory)));
grpcReceiver.setAddressFilter(new MockAddressFilter());
grpcReceiver.setExecutor(Executors.newFixedThreadPool(8));
grpcReceiver.setServerOption(new ServerOption.Builder().build());
grpcReceiver.afterPropertiesSet();
grpcReceiver.blockUntilShutdown();
grpcReceiver.destroy();
}
public TestRemoteExecutionClients(List<BindableService> services) throws IOException {
eventBus = new DefaultBuckEventBus(new DefaultClock(), new BuildId("dontcare"));
String serverName = "uniquish-" + new Random().nextLong();
InProcessServerBuilder serverBuilder =
InProcessServerBuilder.forName(serverName).directExecutor();
for (BindableService service : services) {
serverBuilder.addService(service);
}
server = serverBuilder.build().start();
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
clients =
new GrpcRemoteExecutionClients(
"buck",
channel,
channel,
100,
MetadataProviderFactory.emptyMetadataProvider(),
eventBus,
FakeBuckConfig.builder()
.build()
.getView(RemoteExecutionConfig.class)
.getStrategyConfig());
}
@Override
public final T addService(BindableService bindableService) {
if (bindableService instanceof InternalNotifyOnServerBuild) {
notifyOnBuildList.add((InternalNotifyOnServerBuild) bindableService);
}
return addService(bindableService.bindService());
}
private void start(CommandLine cmd) throws IOException, GeneralSecurityException, SQLException {
// The port on which the server should run.
int port = Integer.valueOf(cmd.getOptionValue(PORT_OPTION));
// The FCM message sender.
FcmSender fcmSender = new FcmSender(
cmd.getOptionValue(FIREBASE_PROJECT_ID_OPTION),
cmd.getOptionValue(SERVICE_ACCOUNT_CREDENTIALS_PATH_OPTION));
// The Capillary encrypter managers.
RsaEcdsaEncrypterManager rsaEcdsaEncrypterManager;
try (FileInputStream senderSigningKey =
new FileInputStream(cmd.getOptionValue(ECDSA_PRIVATE_KEY_PATH_OPTION))) {
rsaEcdsaEncrypterManager = new RsaEcdsaEncrypterManager(senderSigningKey);
}
WebPushEncrypterManager webPushEncrypterManager = new WebPushEncrypterManager();
// The {certificate, private key} pair to use for gRPC TLS.
File tlsCertFile = new File(cmd.getOptionValue(TLS_CERT_PATH_OPTION));
File tlsPrivateKeyFile = new File(cmd.getOptionValue(TLS_PRIVATE_KEY_PATH_OPTION));
// The interface to demo SQLite DB.
DemoDb db = new DemoDb(
"jdbc:sqlite:" + cmd.getOptionValue(DATABASE_PATH_OPTION));
// The demo service.
BindableService demoService =
new DemoServiceImpl(db, rsaEcdsaEncrypterManager, webPushEncrypterManager, fcmSender);
// Create and start the gRPC server instance.
server = ServerBuilder.forPort(port)
.useTransportSecurity(tlsCertFile, tlsPrivateKeyFile)
.addService(demoService)
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
shutdown();
System.err.println("*** server shut down");
}));
}
private void start() throws IOException {
server = ServerBuilder.forPort(port).addService((BindableService) new TestHelloWorldImpl()).build().start();
System.out.println("------------------- 服务端服务已开启,等待客户端访问 -------------------");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.err.println("*** shutting down gRPC server since JVM is shutting down");
TestServer.this.stop();
System.err.println("*** server shut down");
}
});
}
private static List<ServerServiceDefinition> gatherServices(Instance<BindableService> services) {
List<ServerServiceDefinition> definitions = new ArrayList<>();
services.forEach(new Consumer<BindableService>() { // NOSONAR
@Override
public void accept(BindableService bindable) {
ServerServiceDefinition definition = bindable.bindService();
LOGGER.debugf("Registered gRPC service '%s'", definition.getServiceDescriptor().getName());
definitions.add(definition);
}
});
return definitions;
}
@Test
public void simpleLookupWithBindable() {
BindableService bindableService =
new BindableService() {
@Override
public ServerServiceDefinition bindService() {
return basicServiceDefinition;
}
};
assertNull(registry.addService(bindableService));
ServerMethodDefinition<?, ?> method = registry.lookupMethod("basic/flow");
assertSame(flowMethodDefinition, method);
}
@Bean
@ConditionalOnMissingBean
public MetricCollectingServerInterceptor metricCollectingServerInterceptor(final MeterRegistry registry,
final Collection<BindableService> services) {
final MetricCollectingServerInterceptor metricCollector = new MetricCollectingServerInterceptor(registry);
log.debug("Pre-Registering service metrics");
for (final BindableService service : services) {
log.debug("- {}", service);
metricCollector.preregisterService(service);
}
return metricCollector;
}
@Bean
public MetricCollectingServerInterceptor metricCollectingServerInterceptor(final MeterRegistry registry,
final Collection<BindableService> services) {
final MetricCollectingServerInterceptor metricCollector = new MetricCollectingServerInterceptor(registry,
counter -> counter.tag("type", "counter"),
timer -> timer.tag("type", "timer").publishPercentiles(0.5, 0.9, 0.99),
OK, UNKNOWN);
log.debug("Pre-Registering custom service metrics");
for (final BindableService service : services) {
log.debug("- {}", service);
metricCollector.preregisterService(service);
}
return metricCollector;
}
@Override
public Server buildServerForServices(int port, Collection<BindableService> services) {
ServerBuilder builder = ServerBuilder.forPort(port);
setupServer(builder);
services.forEach(service -> registerService(builder, service));
return builder.build();
}
/**
* Start the gRPC {@link Server}.
*
* @throws IOException if unable to bind to server address or port
* @throws IllegalStateException if any non-{@link BindableService} classes are annotated with {@link GrpcService}
*/
public void start() throws IOException {
if (serverFactory == null) {
serverFactory = findServerFactory();
}
final Collection<BindableService> services = getServicesFromApplicationContext();
if (services.isEmpty()) {
throw new IOException("gRPC server not started because no services were found in the application context.");
}
server = serverFactory.buildServerForServices(port, services);
server.start();
}
@Test
public void startStartsServerWithServices() throws Exception {
final int port = ThreadLocalRandom.current().nextInt(1000, 10000);
final int serviceCount = ThreadLocalRandom.current().nextInt(5, 10);
final long shutdownWaitTimeInMillis = ThreadLocalRandom.current().nextLong(1000, 10000);
final ApplicationContext applicationContext = mock(ApplicationContext.class);
final Server server = mock(Server.class, new TriesToReturnSelf());
when(server.getPort()).thenReturn(port);
final Map<String, Object> services = IntStream.range(0, serviceCount)
.mapToObj(i -> mock(BindableService.class))
.collect(Collectors.toMap(s -> UUID.randomUUID().toString(), s -> s));
AtomicBoolean built = new AtomicBoolean(false);
GrpcServerFactory fakeFactory = (p, s) -> {
built.set(true);
assertThat(p).isEqualTo(port);
s.forEach(ss -> assertThat(services.values().contains(ss)).isTrue());
return server;
};
when(applicationContext.getBeansWithAnnotation(eq(GrpcService.class))).thenReturn(services);
GrpcServerHost runner = new GrpcServerHost(port, shutdownWaitTimeInMillis, fakeFactory);
runner.setApplicationContext(applicationContext);
runner.start();
assertThat(built.get()).isTrue();
verify(server).start();
assertThat(runner.server()).isEqualTo(server);
}
protected AbstractTitusGrpcServer(GrpcEndpointConfiguration configuration,
BindableService bindableService,
TitusRuntime runtime) {
this.configuration = configuration;
this.serviceDefinition = bindableService.bindService();
this.runtime = runtime;
this.grpcCallbackExecutor = ExecutorsExt.instrumentedCachedThreadPool(runtime.getRegistry(), "grpcCallbackExecutor");
}
@Bean
@ConditionalOnMissingBean
public MetricCollectingServerInterceptor metricCollectingServerInterceptor(final MeterRegistry registry,
final Collection<BindableService> services) {
final MetricCollectingServerInterceptor metricCollector = new MetricCollectingServerInterceptor(registry);
log.debug("Pre-Registering service metrics");
for (final BindableService service : services) {
log.debug("- {}", service);
metricCollector.preregisterService(service);
}
return metricCollector;
}
@Override
public DropwizardServerBuilder addService(final BindableService bindableService) {
// TODO configure io.grpc.ServerInterceptor to collect dropwizard metrics
// TODO configure io.grpc.ServerInterceptor to send rpc call and exception events to logback
origin.addService(bindableService);
return this;
}
/**
* Adds a gRPC {@link BindableService} to this {@link GrpcServiceBuilder}. Most gRPC service
* implementations are {@link BindableService}s.
*/
public GrpcServiceBuilder addService(BindableService bindableService) {
if (bindableService instanceof ProtoReflectionService) {
checkState(protoReflectionServiceInterceptor == null,
"Attempting to add a ProtoReflectionService but one is already present. " +
"ProtoReflectionService must only be added once.");
protoReflectionServiceInterceptor = new ProtoReflectionServiceInterceptor();
return addService(ServerInterceptors.intercept(bindableService, protoReflectionServiceInterceptor));
}
return addService(bindableService.bindService());
}
private static UnframedGrpcService buildUnframedGrpcService(BindableService bindableService) {
return (UnframedGrpcService) GrpcService.builder()
.addService(bindableService)
.setMaxInboundMessageSizeBytes(MAX_MESSAGE_BYTES)
.setMaxOutboundMessageSizeBytes(MAX_MESSAGE_BYTES)
.supportedSerializationFormats(
GrpcSerializationFormats.values())
.enableUnframedRequests(true)
.build();
}