下面列出了 io.grpc.ServerServiceDefinition 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Finishes configuration when this service is added to a server.
 *
 * <p>If the inbound message size limit still holds the
 * {@code NO_MAX_INBOUND_MESSAGE_SIZE} sentinel, it is replaced with the service
 * config's max request length, capped at {@link Integer#MAX_VALUE}. If a proto
 * reflection interceptor is present, it is given a dummy server built from the
 * service definitions of every {@code FramedGrpcService} found across the
 * server's virtual hosts.
 *
 * @param cfg the configuration this service was added with
 */
@Override
public void serviceAdded(ServiceConfig cfg) {
    if (maxInboundMessageSizeBytes == ArmeriaMessageDeframer.NO_MAX_INBOUND_MESSAGE_SIZE) {
        final long requestLengthLimit = cfg.maxRequestLength();
        maxInboundMessageSizeBytes = (int) Math.min(requestLengthLimit, Integer.MAX_VALUE);
    }

    if (protoReflectionServiceInterceptor == null) {
        return;
    }

    // The same service may be registered several times at different paths, while
    // proto reflection supports only one instance per service name, so the first
    // definition seen for each name is kept and later duplicates are dropped.
    final Map<String, ServerServiceDefinition> grpcServices =
            cfg.server().config().virtualHosts().stream()
               .flatMap(virtualHost -> virtualHost.serviceConfigs().stream())
               .map(hostedConfig -> hostedConfig.service().as(FramedGrpcService.class))
               .filter(Objects::nonNull)
               .flatMap(framedService -> framedService.services().stream())
               .collect(toImmutableMap(definition -> definition.getServiceDescriptor().getName(),
                                       Function.identity(),
                                       (first, duplicate) -> first));
    protoReflectionServiceInterceptor.setServer(newDummyServer(grpcServices));
}
/**
 * Builds the service definition for the Greeter service, wiring the
 * {@code METHOD_SAY_HELLO} descriptor from {@code HelloJsonClient.HelloJsonStub}
 * to {@code sayHello}.
 *
 * @return a definition named after the Greeter service descriptor, containing
 *     the single unary method
 */
@Override
public ServerServiceDefinition bindService() {
    final String serviceName = GreeterGrpc.getServiceDescriptor().getName();

    // Adapter that forwards each unary invocation to sayHello.
    final UnaryMethod<HelloRequest, HelloReply> sayHelloMethod =
            new UnaryMethod<HelloRequest, HelloReply>() {
                @Override
                public void invoke(
                        HelloRequest request, StreamObserver<HelloReply> responseObserver) {
                    sayHello(request, responseObserver);
                }
            };

    return ServerServiceDefinition
            .builder(serviceName)
            .addMethod(HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,
                       asyncUnaryCall(sayHelloMethod))
            .build();
}
/**
 * Checks that a method registered through the builder is served by its own
 * call handler, and that a method name not registered there is looked up in
 * the fallback registry.
 */
@Test
public void handlerRegistryPriorities() throws Exception {
    fallbackRegistry = mock(HandlerRegistry.class);

    final ServerServiceDefinition waiterService =
            ServerServiceDefinition.builder(new ServiceDescriptor("Waiter", METHOD))
                    .addMethod(METHOD, callHandler)
                    .build();
    builder.addService(waiterService);

    transportServer = new SimpleServer();
    createAndStartServer();

    final ServerTransportListener listener =
            transportServer.registerNewServerTransport(new SimpleServerTransport());
    listener.transportReady(Attributes.EMPTY);

    final Metadata headers = new Metadata();
    final StatsTraceContext tracerContext =
            StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", headers);
    when(stream.statsTraceContext()).thenReturn(tracerContext);

    // "Waiter/serve" was registered through the builder, so callHandler must receive it.
    listener.streamCreated(stream, "Waiter/serve", headers);
    assertEquals(1, executor.runDueTasks());
    verify(callHandler).startCall(Matchers.<ServerCall<String, Integer>>anyObject(),
            Matchers.<Metadata>anyObject());

    // "Service1/Method2" is not registered in the internal registry, so the lookup
    // must be delegated to the fallback registry.
    listener.streamCreated(stream, "Service1/Method2", headers);
    assertEquals(1, executor.runDueTasks());
    verify(fallbackRegistry).lookupMethod("Service1/Method2", AUTHORITY);

    verifyNoMoreInteractions(callHandler);
    verifyNoMoreInteractions(fallbackRegistry);
}
/**
 * Creates the secret discovery service and starts a server that exposes it.
 *
 * @param name UDS pathname or server name for {@link InProcessServerBuilder}
 * @param useUds if true, starts a Netty server bound to a domain socket at
 *     {@code name}; otherwise starts an in-process server named {@code name}
 * @param useInterceptor if true, wraps the service with an
 *     {@link SdsServerInterceptor} to grab and save the Jwt token
 * @throws IOException if the server fails to start
 */
void startServer(String name, boolean useUds, boolean useInterceptor) throws IOException {
    checkNotNull(name, "name");
    discoveryService = new SecretDiscoveryServiceImpl();

    final ServerServiceDefinition boundService = discoveryService.bindService();
    final ServerServiceDefinition exposedService =
            useInterceptor
                    ? ServerInterceptors.intercept(boundService, new SdsServerInterceptor())
                    : boundService;

    if (!useUds) {
        server =
                InProcessServerBuilder.forName(name)
                        .addService(exposedService)
                        .directExecutor()
                        .build()
                        .start();
        return;
    }

    // Domain-socket transport: worker and boss event loop groups are created here
    // and kept in fields.
    elg = new EpollEventLoopGroup();
    boss = new EpollEventLoopGroup(1);
    server =
            NettyServerBuilder.forAddress(new DomainSocketAddress(name))
                    .bossEventLoopGroup(boss)
                    .workerEventLoopGroup(elg)
                    .channelType(EpollServerDomainSocketChannel.class)
                    .addService(exposedService)
                    .directExecutor()
                    .build()
                    .start();
}
/**
 * Starts an in-process server whose streaming method never responds, issues a
 * blocking server-streaming call through an {@code InterruptInterceptor}, and
 * asserts that the call fails with an {@link InterruptedException} cause, that
 * the interceptor saw {@code onClose}, and that the server-side observer
 * reports cancellation.
 */
@Test
public void blockingServerStreamingCall_interruptedWaitsForOnClose() throws Exception {
    final Integer requestValue = 2;

    // Server method that only remembers the response observer and never answers.
    class ObserverCapturingMethod implements ServerStreamingMethod<Integer, Integer> {
        ServerCallStreamObserver<Integer> observer;

        @Override
        public void invoke(Integer request, StreamObserver<Integer> responseObserver) {
            observer = (ServerCallStreamObserver<Integer>) responseObserver;
        }
    }

    final ObserverCapturingMethod capturingMethod = new ObserverCapturingMethod();
    final ServerServiceDefinition someService =
            ServerServiceDefinition.builder("some")
                    .addMethod(SERVER_STREAMING_METHOD,
                               ServerCalls.asyncServerStreamingCall(capturingMethod))
                    .build();
    server = InProcessServerBuilder.forName("noop")
            .directExecutor()
            .addService(someService)
            .build()
            .start();

    final InterruptInterceptor interruptInterceptor = new InterruptInterceptor();
    channel = InProcessChannelBuilder.forName("noop")
            .directExecutor()
            .intercept(interruptInterceptor)
            .build();

    final Iterator<Integer> responses = ClientCalls.blockingServerStreamingCall(
            channel.newCall(SERVER_STREAMING_METHOD, CallOptions.DEFAULT), requestValue);
    try {
        responses.next();
        fail();
    } catch (StatusRuntimeException ex) {
        // Thread.interrupted() also clears the interrupt flag for the rest of the test.
        assertTrue(Thread.interrupted());
        assertTrue("interrupted", ex.getCause() instanceof InterruptedException);
    }

    assertTrue("onCloseCalled", interruptInterceptor.onCloseCalled);
    assertTrue("context not cancelled", capturingMethod.observer.isCancelled());
}
/**
 * Builds a gRPC server on {@code port}, registers every entry of
 * {@code serviceDefinitions} with it, and starts it.
 *
 * <p>Fails the {@code checkState} precondition if {@code server} is already set.
 *
 * @throws RuntimeException wrapping the {@link IOException} if the server cannot
 *     be started
 */
@PostConstruct
public void start() {
    checkState(server == null);
    try {
        // ServerBuilder is generic; the wildcard avoids a raw type and the
        // unchecked warnings that come with it.
        final ServerBuilder<?> builder = ServerBuilder.forPort(port);
        for (final ServerServiceDefinition serviceDefinition : serviceDefinitions) {
            LOG.info("Starting {} service on port {}.", serviceDefinition.getServiceDescriptor().getName(), port);
            builder.addService(serviceDefinition);
        }
        server = builder.build().start();
        LOG.info("Server started.");
    } catch (IOException e) {
        // Keep the cause and name the port so a startup failure can be diagnosed.
        throw new RuntimeException("Failed to start gRPC server on port " + port, e);
    }
}
/**
 * Assembles a service definition named {@code GrpcRpcProtocol.SERVICE} with
 * one method per endpoint reported by {@code container.getEndpoints()}.
 *
 * @return the built service definition
 */
private ServerServiceDefinition bindService() {
    final ServerServiceDefinition.Builder definition =
            ServerServiceDefinition.builder(GrpcRpcProtocol.SERVICE);

    for (final GrpcEndpointHandle<?, ?> endpoint : container.getEndpoints()) {
        // NOTE(review): unchecked cast from wildcard to <Object, Object>; presumably
        // safe because the handler works on raw byte[] payloads - confirm.
        final ServerCallHandler<byte[], byte[]> callHandler =
                serverCallHandlerFor((GrpcEndpointHandle<Object, Object>) endpoint);
        definition.addMethod(endpoint.descriptor(), callHandler);
    }

    return definition.build();
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Tests registering a new service on an already running server.
 *
 * <p>Sleeps for 15 seconds, binds a {@code GreeterPartImpl}, registers it via
 * {@code Registry.registerNewService}, logs the outcome, and on success looks
 * the service up through a {@code Provider} and logs the first returned URL.
 *
 * @param testServer the test fixture supplying the server to register on
 * @throws Exception if the sleep is interrupted or any called API fails
 * @author sxp
 * @since 2019/7/10
 */
private static void testRegisterNewService(RegistryServiceTest testServer) throws Exception {
    logger.info("sleep 15 seconds...");
    TimeUnit.SECONDS.sleep(15);

    logger.info("测试注册新服务...");

    String ip = IpUtils.getIP4WithPriority();
    Provider provider = new Provider();

    Server server = testServer.getServer();
    int port = server.getPort();

    logger.info("开始注册服务...");

    // Build the ServerServiceDefinition argument.
    GreeterGrpc.GreeterImplBase greeterImpl = new GreeterPartImpl();
    ServerServiceDefinition serviceDefinition = greeterImpl.bindService();

    // Call the API that registers a new service.
    BusinessResult result = Registry.registerNewService(server, serviceDefinition);
    String serviceName = serviceDefinition.getServiceDescriptor().getName();

    if (result.isSuccess()) {
        logger.info("注册新服务[" + serviceName + "]成功");

        Map<String, String> parameters = new HashMap<>(MapUtils.capacity(2));
        parameters.put(GlobalConstants.Consumer.Key.INTERFACE, serviceName);
        parameters.put(GlobalConstants.CommonKey.CATEGORY, RegistryConstants.PROVIDERS_CATEGORY);

        URL queryUrl = new URL(RegistryConstants.GRPC_PROTOCOL, ip, port, parameters);
        List<URL> urls = provider.lookup(queryUrl);

        // The lookup may return nothing; guard so get(0) cannot throw.
        if (urls == null || urls.isEmpty()) {
            logger.info("未查询到服务[" + serviceName + "]的注册信息");
        } else {
            logger.info("服务注册信息为[" + urls.get(0).toString() + "]");
        }
    } else {
        logger.info("注册新服务[" + serviceName + "]失败," + result.getMessage());
    }

    logger.info("测试注册新服务: work done.");
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Creates an index made of two {@code FileDescriptorIndex} instances, one for
 * each of the supplied service lists.
 *
 * @param immutableServices services used to build {@code immutableServicesIndex}
 * @param mutableServices services used to build {@code mutableServicesIndex}
 */
public ServerReflectionIndex(
        List<ServerServiceDefinition> immutableServices,
        List<ServerServiceDefinition> mutableServices) {
    this.immutableServicesIndex = new FileDescriptorIndex(immutableServices);
    this.mutableServicesIndex = new FileDescriptorIndex(mutableServices);
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Creates a {@code StatService} from {@code dispatchHandler} and
 * {@code serverRequestFactory} and returns its bound service definition.
 *
 * @return the definition produced by the new service's {@code bindService()}
 */
@Override
protected ServerServiceDefinition newServerServiceDefinition() {
    final BindableService statService = new StatService(dispatchHandler, serverRequestFactory);
    final ServerServiceDefinition definition = statService.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Creates a {@code SpanService} from {@code dispatchHandler} and
 * {@code serverRequestFactory} and returns its bound service definition.
 *
 * @return the definition produced by the new service's {@code bindService()}
 */
@Override
protected ServerServiceDefinition newServerServiceDefinition() {
    final BindableService service = new SpanService(dispatchHandler, serverRequestFactory);
    final ServerServiceDefinition definition = service.bindService();
    return definition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Stores the given service definition and interceptor list in this builder.
 * Both references are kept as passed; no copy is made.
 *
 * @param serviceDefinition the service definition to hold
 * @param interceptors the interceptor list to hold
 */
private ServiceBuilder(
        ServerServiceDefinition serviceDefinition,
        List<ServerInterceptor> interceptors) {
    this.interceptors = interceptors;
    this.serviceDefinition = serviceDefinition;
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Accepts a service definition and does nothing with it.
 *
 * @param definition the service definition passed by the caller; ignored here
 */
@Override
public void addHandler(ServerServiceDefinition definition) {
// NOTE(review): empty body - presumably an intentional no-op; confirm against the overridden contract.
}
/**
 * Returns the service definition obtained by binding {@code serviceImpl}.
 *
 * @return the result of {@code serviceImpl.bindService()}
 */
@Override
public ServerServiceDefinition getServiceDefinition() {
    final ServerServiceDefinition definition = serviceImpl.bindService();
    return definition;
}
/**
 * Forwards to {@code delegate} and returns whatever list it reports.
 *
 * @return the result of {@code delegate.getImmutableServices()}
 */
@Override
public List<ServerServiceDefinition> getImmutableServices() {
    final List<ServerServiceDefinition> delegated = delegate.getImmutableServices();
    return delegated;
}
/**
 * Adds a service definition to {@code bindableServices} after passing it
 * through {@code Assert.requireNonNull}.
 *
 * @param serverServiceDefinition the service definition to add
 */
public void addService(ServerServiceDefinition serverServiceDefinition) {
    // Validate before touching the collection so a rejected argument leaves it unchanged.
    Assert.requireNonNull(serverServiceDefinition, "serverServiceDefinition");

    bindableServices.add(serverServiceDefinition);
}
/**
 * Reports no services.
 *
 * @return the empty list from {@link Collections#emptyList()}
 */
@Override
public List<ServerServiceDefinition> getServices() {
    final List<ServerServiceDefinition> noServices = Collections.emptyList();
    return noServices;
}
/**
 * Adds the service to {@code registryBuilder} and returns the value of
 * {@code thisT()} so calls can be chained.
 *
 * @param service the service definition to register
 * @return the result of {@code thisT()}
 */
@Override
public final T addService(ServerServiceDefinition service) {
    registryBuilder.addService(service);

    final T self = thisT();
    return self;
}