下面列出了io.grpc.protobuf.services.ProtoReflectionService#io.grpc.ServerInterceptors 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void start() throws IOException {
server = ServerBuilder.forPort(PORT)
.addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderServerInterceptor()))
.build()
.start();
logger.info("Server started, listening on " + PORT);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
CustomHeaderServer.this.stop();
System.err.println("*** server shut down");
}
});
}
@Before
public void setUp() throws Exception {
GreeterImplBase greeterImplBase =
new GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(HelloReply.getDefaultInstance());
responseObserver.onCompleted();
}
};
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(greeterImplBase, new HeaderServerInterceptor()))
.build().start());
// Create a client channel and register for automatic graceful shutdown.
channel =
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
private void start() throws IOException {
server = ServerBuilder.forPort(PORT)
.addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderServerInterceptor()))
.build()
.start();
logger.info("Server started, listening on " + PORT);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
CustomHeaderServer.this.stop();
System.err.println("*** server shut down");
}
});
}
GrpcStartable(GrpcServerConfig serverConfig, Tracing tracing, BindableService... services) {
ServerBuilder<?> serverBuilder;
if (serverConfig.isSslEnable()) {
serverBuilder = NettyServerBuilder.forAddress(
new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort()));
try {
((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build());
} catch (SSLException e) {
throw new IllegalStateException("Unable to setup grpc to use SSL.", e);
}
} else {
serverBuilder = ServerBuilder.forPort(serverConfig.getPort());
}
// Arrays.stream(services).forEach(serverBuilder::addService);
// add interceptor for grpc server By Gannalyo
Arrays.stream(services).forEach(service ->
serverBuilder.addService(ServerInterceptors.intercept(service,
GrpcTracing.create(tracing).newServerInterceptor())));
server = serverBuilder.build();
}
/**
* 启动服务
* @throws Exception 异常
*/
public void start() throws Exception{
int port = grpcProperties.getPort();
if (serverInterceptor != null){
server = ServerBuilder.forPort(port).addService(ServerInterceptors.intercept(commonService, serverInterceptor)).build().start();
}else {
Class clazz = grpcProperties.getServerInterceptor();
if (clazz == null){
server = ServerBuilder.forPort(port).addService(commonService).build().start();
}else {
server = ServerBuilder.forPort(port).addService(ServerInterceptors.intercept(commonService, (ServerInterceptor) clazz.newInstance())).build().start();
}
}
log.info("gRPC Server started, listening on port " + server.getPort());
startDaemonAwaitThread();
}
/**
* Create a {@link Server} if one isn't already present in the context.
*
* @param port The port this server should listen on
* @param services The gRPC services this server should serve
* @param serverInterceptors The {@link ServerInterceptor} implementations that should be applied to all services
* @return A Netty server instance based on the provided information
*/
@Bean
@ConditionalOnMissingBean(Server.class)
public Server gRpcServer(
@Value("${grpc.server.port:0}") final int port, // TODO: finalize how to get configure this property
final Set<BindableService> services,
final List<ServerInterceptor> serverInterceptors
) {
final NettyServerBuilder builder = NettyServerBuilder.forPort(port);
// Add Service interceptors and add services to the server
services
.stream()
.map(BindableService::bindService)
.map(serviceDefinition -> ServerInterceptors.intercept(serviceDefinition, serverInterceptors))
.forEach(builder::addService);
return builder.build();
}
private void startServer(ServerCalls.UnaryMethod<String, String> method) {
try {
server = NettyServerBuilder.forPort(0)
.addService(ServerInterceptors.intercept(
ServerServiceDefinition.builder("service")
.addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method))
.build(),
ConcurrencyLimitServerInterceptor.newBuilder(limiter)
.build())
)
.build()
.start();
channel = NettyChannelBuilder.forAddress("localhost", server.getPort())
.usePlaintext(true)
.build();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private ServerServiceDefinition bindInterceptors(final ServerServiceDefinition serviceDefinition,
final GrpcService grpcServiceAnnotation,
final GlobalServerInterceptorRegistry globalServerInterceptorRegistry) {
final List<ServerInterceptor> interceptors = Lists.newArrayList();
interceptors.addAll(globalServerInterceptorRegistry.getServerInterceptors());
for (final Class<? extends ServerInterceptor> interceptorClass : grpcServiceAnnotation.interceptors()) {
final ServerInterceptor serverInterceptor;
if (this.applicationContext.getBeanNamesForType(interceptorClass).length > 0) {
serverInterceptor = this.applicationContext.getBean(interceptorClass);
} else {
try {
serverInterceptor = interceptorClass.getConstructor().newInstance();
} catch (final Exception e) {
throw new BeanCreationException("Failed to create interceptor instance", e);
}
}
interceptors.add(serverInterceptor);
}
for (final String interceptorName : grpcServiceAnnotation.interceptorNames()) {
interceptors.add(this.applicationContext.getBean(interceptorName, ServerInterceptor.class));
}
if (grpcServiceAnnotation.sortInterceptors()) {
globalServerInterceptorRegistry.sortInterceptors(interceptors);
}
return ServerInterceptors.interceptForward(serviceDefinition, interceptors);
}
private void start() throws IOException {
server = ServerBuilder.forPort(PORT)
.addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderServerInterceptor()))
.build()
.start();
logger.info("Server started, listening on " + PORT);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
CustomHeaderServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
@Test
public void exactTypeMatches() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
responseObserver.onError(new ArithmeticException("Divide by zero"));
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.INTERNAL.getCode()), "is Status.INTERNAL")
.hasMessageContaining("Divide by zero");
}
@Test
public void parentTypeMatches() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
responseObserver.onError(new ArithmeticException("Divide by zero"));
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forParentType(RuntimeException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.INTERNAL.getCode()), "is Status.INTERNAL")
.hasMessageContaining("Divide by zero");
}
@Test
public void parentTypeMatchesExactly() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
responseObserver.onError(new RuntimeException("Divide by zero"));
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forParentType(RuntimeException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.INTERNAL.getCode()), "is Status.INTERNAL")
.hasMessageContaining("Divide by zero");
}
@Test
public void unknownTypeDoesNotMatch() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
responseObserver.onError(new NullPointerException("Bananas!"));
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.UNKNOWN.getCode()), "is Status.UNKNOWN")
.hasMessageContaining("UNKNOWN");
}
@Test
public void unexpectedExceptionCanMatch() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
throw new ArithmeticException("Divide by zero");
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.INTERNAL.getCode()), "is Status.INTERNAL")
.hasMessageContaining("Divide by zero");
}
@Test
public void unexpectedExceptionCanNotMatch() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
throw new ArithmeticException("Divide by zero");
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forExactType(NullPointerException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.UNKNOWN.getCode()), "is Status.UNKNOWN")
.hasMessageContaining("UNKNOWN");
}
@Test
public void unexpectedExceptionCanMatchStreaming() {
GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHelloStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
responseObserver.onNext(HelloResponse.getDefaultInstance());
responseObserver.onNext(HelloResponse.getDefaultInstance());
throw new ArithmeticException("Divide by zero");
}
};
ServerInterceptor interceptor = new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, interceptor));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
Iterator<HelloResponse> it = stub.sayHelloStream(HelloRequest.newBuilder().setName("World").build());
it.next();
it.next();
assertThatThrownBy(it::next)
.isInstanceOf(StatusRuntimeException.class)
.matches(sre -> ((StatusRuntimeException) sre).getStatus().getCode().equals(Status.INTERNAL.getCode()), "is Status.INTERNAL")
.hasMessageContaining("Divide by zero");
}
@Test
public void interceptorShouldFreezeContext() {
TestService svc = new TestService();
// Plumbing
serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc,
new AmbientContextServerInterceptor("ctx-"),
new AmbientContextFreezeServerInterceptor()));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
.newBlockingStub(serverRule.getChannel())
.withInterceptors(new AmbientContextClientInterceptor("ctx-"));
// Test
Metadata.Key<String> key = Metadata.Key.of("ctx-k", Metadata.ASCII_STRING_MARSHALLER);
AmbientContext.initialize(Context.current()).run(() -> {
AmbientContext.current().put(key, "value");
stub.sayHello(HelloRequest.newBuilder().setName("World").build());
});
assertThat(svc.frozen).isTrue();
}
@Before
public void setUp() throws Exception {
GreeterImplBase greeterImplBase =
new GreeterImplBase() {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
responseObserver.onNext(HelloReply.getDefaultInstance());
responseObserver.onCompleted();
}
};
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(greeterImplBase, new HeaderServerInterceptor()))
.build().start());
// Create a client channel and register for automatic graceful shutdown.
channel =
grpcCleanup.register(InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
@PostConstruct
public void start() {
if (started.getAndSet(true)) {
return;
}
this.server = configure(ServerBuilder.forPort(port).executor(grpcCallbackExecutor))
.addService(ServerInterceptors.intercept(
serviceDefinition,
createInterceptors(HealthGrpc.getServiceDescriptor())
))
.build();
logger.info("Starting {} on port {}.", getClass().getSimpleName(), configuration.getPort());
try {
this.server.start();
this.port = server.getPort();
} catch (final IOException e) {
throw new RuntimeException(e);
}
logger.info("Started {} on port {}.", getClass().getSimpleName(), port);
}
@Before
public void setUp() throws Exception {
this.sampleService = new SampleServiceImpl();
this.server = NettyServerBuilder.forPort(0)
.addService(ServerInterceptors.intercept(sampleService, new SampleContextServerInterceptor()))
.build()
.start();
this.channel = NettyChannelBuilder.forTarget("localhost:" + server.getPort())
.negotiationType(NegotiationType.PLAINTEXT)
.build();
this.client = ReactorToGrpcClientBuilder.newBuilder(SampleServiceReactorClient.class, SampleServiceGrpc.newStub(channel), SampleServiceGrpc.getServiceDescriptor(), SampleContext.class)
.withTimeout(TIMEOUT_DURATION)
.withStreamingTimeout(Duration.ofMillis(ReactorToGrpcClientBuilder.DEFAULT_STREAMING_TIMEOUT_MS))
.withGrpcStubDecorator(SampleContextServerInterceptor::attachClientContext)
.build();
}
private ServerServiceDefinition bindInterceptors(final ServerServiceDefinition serviceDefinition,
final GrpcService grpcServiceAnnotation,
final GlobalServerInterceptorRegistry globalServerInterceptorRegistry) {
final List<ServerInterceptor> interceptors = Lists.newArrayList();
interceptors.addAll(globalServerInterceptorRegistry.getServerInterceptors());
for (final Class<? extends ServerInterceptor> interceptorClass : grpcServiceAnnotation.interceptors()) {
final ServerInterceptor serverInterceptor;
if (this.applicationContext.getBeanNamesForType(interceptorClass).length > 0) {
serverInterceptor = this.applicationContext.getBean(interceptorClass);
} else {
try {
serverInterceptor = interceptorClass.getConstructor().newInstance();
} catch (final Exception e) {
throw new BeanCreationException("Failed to create interceptor instance", e);
}
}
interceptors.add(serverInterceptor);
}
for (final String interceptorName : grpcServiceAnnotation.interceptorNames()) {
interceptors.add(this.applicationContext.getBean(interceptorName, ServerInterceptor.class));
}
if (grpcServiceAnnotation.sortInterceptors()) {
globalServerInterceptorRegistry.sortInterceptors(interceptors);
}
return ServerInterceptors.interceptForward(serviceDefinition, interceptors);
}
/**
* Create gRPC server on the specified port.
*
* @param controllerService The controller service implementation.
* @param serverConfig The RPC Server config.
* @param requestTracker Cache to track and access to client request identifiers.
*/
public GRPCServer(ControllerService controllerService, GRPCServerConfig serverConfig, RequestTracker requestTracker) {
this.objectId = "gRPCServer";
this.config = serverConfig;
GrpcAuthHelper authHelper = new GrpcAuthHelper(serverConfig.isAuthorizationEnabled(),
serverConfig.getTokenSigningKey(), serverConfig.getAccessTokenTTLInSeconds());
ServerBuilder<?> builder = ServerBuilder
.forPort(serverConfig.getPort())
.addService(ServerInterceptors.intercept(new ControllerServiceImpl(controllerService, authHelper, requestTracker,
serverConfig.isReplyWithStackTraceOnError()),
RPCTracingHelpers.getServerInterceptor(requestTracker)));
if (serverConfig.isAuthorizationEnabled()) {
this.authHandlerManager = new AuthHandlerManager(serverConfig);
this.authHandlerManager.registerInterceptors(builder);
} else {
this.authHandlerManager = null;
}
if (serverConfig.isTlsEnabled() && !Strings.isNullOrEmpty(serverConfig.getTlsCertFile())) {
builder = builder.useTransportSecurity(new File(serverConfig.getTlsCertFile()),
new File(serverConfig.getTlsKeyFile()));
}
this.server = builder.build();
}
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.maxRequestLength(0);
sb.serviceUnder("/",
GrpcService.builder()
.setMaxInboundMessageSizeBytes(MAX_MESSAGE_SIZE)
.addService(ServerInterceptors.intercept(
new UnitTestServiceImpl(),
REPLACE_EXCEPTION, ADD_TO_CONTEXT))
.enableUnframedRequests(true)
.supportedSerializationFormats(
GrpcSerializationFormats.values())
.useBlockingTaskExecutor(true)
.build()
.decorate(LoggingService.newDecorator())
.decorate((delegate, ctx, req) -> {
ctx.log().whenComplete().thenAccept(requestLogQueue::add);
return delegate.serve(ctx, req);
}));
}
public Server startServer() throws IOException {
ServerInterceptor headersInterceptor = new TracingMetadataUtils.ServerHeadersInterceptor();
NettyServerBuilder b =
NettyServerBuilder.forPort(workerOptions.listenPort)
.addService(ServerInterceptors.intercept(actionCacheServer, headersInterceptor))
.addService(ServerInterceptors.intercept(bsServer, headersInterceptor))
.addService(ServerInterceptors.intercept(casServer, headersInterceptor))
.addService(ServerInterceptors.intercept(capabilitiesServer, headersInterceptor));
if (workerOptions.tlsCertificate != null) {
b.sslContext(getSslContextBuilder(workerOptions).build());
}
if (execServer != null) {
b.addService(ServerInterceptors.intercept(execServer, headersInterceptor));
} else {
logger.atInfo().log("Execution disabled, only serving cache requests");
}
Server server = b.build();
logger.atInfo().log("Starting gRPC server on port %d", workerOptions.listenPort);
server.start();
return server;
}
@Before
public void setUp() throws IOException {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(
new GreeterGrpc.GreeterImplBase() {
@Override
public void sayHello(
HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder()
.setMessage("AuthClientTest user=" + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
},
mockServerInterceptor))
.build().start());
CallCredentials credentials = new JwtCredential("test-client");
ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
client = new AuthClient(credentials, channel);
}
@Test
public void clientHeaderDeliveredToServer() throws Exception {
// Generate a unique in-process server name.
String serverName = InProcessServerBuilder.generateName();
// Create a server, add service, start, and register for automatic graceful shutdown.
grpcCleanup.register(InProcessServerBuilder.forName(serverName).directExecutor()
.addService(ServerInterceptors.intercept(new GreeterImplBase() {}, mockServerInterceptor))
.build().start());
// Create a client channel and register for automatic graceful shutdown.
ManagedChannel channel = grpcCleanup.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(
ClientInterceptors.intercept(channel, new HeaderClientInterceptor()));
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
try {
blockingStub.sayHello(HelloRequest.getDefaultInstance());
fail();
} catch (StatusRuntimeException expected) {
// expected because the method is not implemented at server side
}
verify(mockServerInterceptor).interceptCall(
Matchers.<ServerCall<HelloRequest, HelloReply>>any(),
metadataCaptor.capture(),
Matchers.<ServerCallHandler<HelloRequest, HelloReply>>any());
assertEquals(
"customRequestValue",
metadataCaptor.getValue().get(HeaderClientInterceptor.CUSTOM_HEADER_KEY));
}
@Before
public void setUp() throws Exception {
grpcServerRule
.getServiceRegistry()
.addService(
ServerInterceptors.intercept(greeterServiceImpl, injectCacheControlInterceptor));
grpcServerRule.getServiceRegistry().addService(anotherGreeterServiceImpl);
baseChannel = grpcServerRule.getChannel();
SafeMethodCachingInterceptor interceptor =
SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache);
channelToUse = ClientInterceptors.intercept(baseChannel, interceptor);
}
private void startServer() {
AbstractServerImplBuilder<?> builder = getServerBuilder();
if (builder == null) {
server = null;
return;
}
testServiceExecutor = Executors.newScheduledThreadPool(2);
List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder()
.add(recordServerCallInterceptor(serverCallCapture))
.add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture))
.add(recordContextInterceptor(contextCapture))
.addAll(TestServiceImpl.interceptors())
.build();
builder
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(testServiceExecutor),
allInterceptors))
.addStreamTracerFactory(serverStreamTracerFactory);
io.grpc.internal.TestingAccessor.setStatsImplementation(
builder,
new CensusStatsModule(
tagger,
tagContextBinarySerializer,
serverStatsRecorder,
GrpcUtil.STOPWATCH_SUPPLIER,
true));
try {
server = builder.build().start();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@VisibleForTesting
void start() throws Exception {
executor = Executors.newSingleThreadScheduledExecutor();
SslContext sslContext = null;
if (useAlts) {
server =
AltsServerBuilder.forPort(port)
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(executor), TestServiceImpl.interceptors()))
.build()
.start();
} else {
if (useTls) {
sslContext =
GrpcSslContexts.forServer(
TestUtils.loadCert("server1.pem"), TestUtils.loadCert("server1.key"))
.build();
}
server =
NettyServerBuilder.forPort(port)
.sslContext(sslContext)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(executor), TestServiceImpl.interceptors()))
.build()
.start();
}
}
private void startServer(int serverFlowControlWindow) {
ServerBuilder<?> builder =
NettyServerBuilder.forAddress(new InetSocketAddress("localhost", 0))
.flowControlWindow(serverFlowControlWindow);
builder.addService(ServerInterceptors.intercept(
new TestServiceImpl(Executors.newScheduledThreadPool(2)),
ImmutableList.<ServerInterceptor>of()));
try {
server = builder.build().start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}