下面列出了 io.grpc.ServerInterceptor 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds an interceptor that echoes selected request metadata back to the client in both the
 * response headers and the response trailers. Useful for testing end-to-end metadata
 * propagation.
 *
 * @param keys the metadata keys whose request values are merged into the response
 * @return an interceptor that merges the selected request entries into headers and trailers
 */
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Decorate the call so outgoing headers and trailers can be amended before being sent.
      final ServerCall<ReqT, RespT> echoingCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata outgoingHeaders) {
              outgoingHeaders.merge(requestHeaders, echoedKeys);
              super.sendHeaders(outgoingHeaders);
            }

            @Override
            public void close(Status status, Metadata outgoingTrailers) {
              outgoingTrailers.merge(requestHeaders, echoedKeys);
              super.close(status, outgoingTrailers);
            }
          };
      return next.startCall(echoingCall, requestHeaders);
    }
  };
}
/**
 * Builds an interceptor that echoes request metadata for the given key(s) into the response
 * headers only; trailers are forwarded untouched.
 *
 * @param keys the metadata keys whose request values are merged into the response headers
 * @return an interceptor that amends outgoing headers with the selected request entries
 */
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Only sendHeaders needs decorating; close() is inherited and forwards unchanged.
      final ServerCall<ReqT, RespT> headerEchoingCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata outgoingHeaders) {
              outgoingHeaders.merge(requestHeaders, echoedKeys);
              super.sendHeaders(outgoingHeaders);
            }
          };
      return next.startCall(headerEchoingCall, requestHeaders);
    }
  };
}
/**
 * Builds an interceptor that echoes request metadata for the given key(s) into the response
 * trailers only; headers are forwarded untouched.
 *
 * @param keys the metadata keys whose request values are merged into the response trailers
 * @return an interceptor that amends outgoing trailers with the selected request entries
 */
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Only close() needs decorating; sendHeaders() is inherited and forwards unchanged.
      final ServerCall<ReqT, RespT> trailerEchoingCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata outgoingTrailers) {
              outgoingTrailers.merge(requestHeaders, echoedKeys);
              super.close(status, outgoingTrailers);
            }
          };
      return next.startCall(trailerEchoingCall, requestHeaders);
    }
  };
}
/**
 * Creates a Netty server builder on port 0, configured with the test's compressor and
 * decompressor registries and an interceptor that enables message compression on every call.
 */
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
  final ServerInterceptor compressionEnabler = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      // The call is started first; compression is switched on afterwards.
      final Listener<ReqT> delegate = next.startCall(call, headers);
      // TODO(carl-mastrangelo): check that encoding was set.
      call.setMessageCompression(true);
      return delegate;
    }
  };
  return NettyServerBuilder.forPort(0)
      .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
      .compressorRegistry(compressors)
      .decompressorRegistry(decompressors)
      .intercept(compressionEnabler);
}
/**
 * Wraps a {@link ServerMethodDefinition} such that it performs binary logging if needed.
 *
 * <p>When no binary-log interceptor is available for the method, the definition is returned
 * unchanged. Otherwise the method is re-expressed over {@code byte[]} messages and its handler
 * is wrapped with the interceptor.
 *
 * @param oMethodDef the original method definition
 * @return {@code oMethodDef} itself, or a byte-array based definition with logging installed
 */
@Override
public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
    ServerMethodDefinition<ReqT, RespT> oMethodDef) {
  final MethodDescriptor<ReqT, RespT> originalMethod = oMethodDef.getMethodDescriptor();
  final ServerInterceptor loggingInterceptor =
      getServerInterceptor(originalMethod.getFullMethodName());
  if (loggingInterceptor == null) {
    // Nothing to log for this method.
    return oMethodDef;
  }

  final MethodDescriptor<byte[], byte[]> rawMethod =
      BinaryLogProvider.toByteBufferMethod(originalMethod);
  final ServerMethodDefinition<byte[], byte[]> rawDefinition =
      InternalServerInterceptors.wrapMethod(oMethodDef, rawMethod);
  final ServerCallHandler<byte[], byte[]> loggingHandler =
      InternalServerInterceptors.interceptCallHandlerCreate(
          loggingInterceptor, rawDefinition.getServerCallHandler());
  return ServerMethodDefinition.create(rawMethod, loggingHandler);
}
/**
 * Construct a server.
 *
 * <p>Copies the configuration held by {@code builder} into this instance and, as its final
 * step, registers the new server with the builder's channelz instance.
 *
 * @param builder builder with configuration for server
 * @param transportServer transport server that will create new incoming transports
 * @param rootContext context that callbacks for new RPCs should be derived from
 */
ServerImpl(
AbstractServerImplBuilder<?> builder,
InternalServer transportServer,
Context rootContext) {
// Required collaborators: each is validated with checkNotNull before being stored.
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
// The handler registry is built from the builder's registryBuilder right here.
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
this.fallbackRegistry =
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
// Fork from the passed in context so that it does not propagate cancellation, it only
// inherits values.
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
// The remaining settings are taken from the builder as-is, without null checks.
this.decompressorRegistry = builder.decompressorRegistry;
this.compressorRegistry = builder.compressorRegistry;
// Snapshot the transport filters into an unmodifiable copy.
this.transportFilters = Collections.unmodifiableList(
new ArrayList<>(builder.transportFilters));
// Snapshot the interceptors into an array.
this.interceptors =
builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
this.binlog = builder.binlog;
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
// NOTE(review): `this` is handed to channelz before the constructor returns — confirm that
// nothing reads the server through channelz until construction has completed.
channelz.addServer(this);
}
/**
 * Reports the call start to the stats/tracing context, wraps the method's handler with the
 * server's interceptors, optionally applies binary logging, and starts the wrapped call.
 * Never returns {@code null}.
 */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
    ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
    Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
  // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
  final ServerCallInfoImpl<ReqT, RespT> callInfo =
      new ServerCallInfoImpl<ReqT, RespT>(
          methodDef.getMethodDescriptor(), // notify with original method descriptor
          stream.getAttributes(),
          stream.getAuthority());
  statsTraceCtx.serverCallStarted(callInfo);

  // Wrap the handler with each interceptor, in iteration order.
  ServerCallHandler<ReqT, RespT> chainedHandler = methodDef.getServerCallHandler();
  for (ServerInterceptor each : interceptors) {
    chainedHandler = InternalServerInterceptors.interceptCallHandler(each, chainedHandler);
  }
  final ServerMethodDefinition<ReqT, RespT> intercepted =
      methodDef.withServerCallHandler(chainedHandler);

  // Binary logging, when configured, gets the chance to wrap the intercepted definition.
  final ServerMethodDefinition<?, ?> effectiveDef;
  if (binlog == null) {
    effectiveDef = intercepted;
  } else {
    effectiveDef = binlog.wrapMethodDefinition(intercepted);
  }
  return startWrappedCall(fullMethodName, effectiveDef, stream, headers, context);
}
/**
 * Starts the gRPC server on the configured port.
 *
 * <p>The service is wrapped with the injected {@code serverInterceptor} when one is present.
 * Otherwise, if an interceptor class is configured in the properties, a new instance of it is
 * created through its no-argument constructor. With neither, the service is registered
 * without an interceptor.
 *
 * @throws Exception if the configured interceptor cannot be instantiated or the server fails
 *     to start
 */
public void start() throws Exception {
  int port = grpcProperties.getPort();

  // Resolve the interceptor once, so the build/start chain is written a single time.
  ServerInterceptor interceptor = serverInterceptor;
  if (interceptor == null) {
    Class<?> interceptorClass = grpcProperties.getServerInterceptor();
    if (interceptorClass != null) {
      // Class.newInstance() is deprecated: it rethrows checked constructor exceptions
      // unwrapped. Constructor.newInstance() wraps them in InvocationTargetException.
      interceptor = interceptorClass
          .asSubclass(ServerInterceptor.class)
          .getDeclaredConstructor()
          .newInstance();
    }
  }

  ServerBuilder<?> builder = ServerBuilder.forPort(port);
  if (interceptor == null) {
    builder.addService(commonService);
  } else {
    builder.addService(ServerInterceptors.intercept(commonService, interceptor));
  }
  server = builder.build().start();

  log.info("gRPC Server started, listening on port " + server.getPort());
  startDaemonAwaitThread();
}
/**
 * Returns the available interceptors sorted by ascending priority. Interceptors that are not
 * {@code Prioritized} are treated as priority 0; interceptors that are equal to each other
 * compare as equal regardless of priority.
 *
 * @return the sorted interceptors, or an empty list when none are available
 */
List<ServerInterceptor> getSortedInterceptors() {
  if (interceptors.isUnsatisfied()) {
    return Collections.emptyList();
  }
  final Comparator<ServerInterceptor> byPriority = new Comparator<ServerInterceptor>() { // NOSONAR
    @Override
    public int compare(ServerInterceptor left, ServerInterceptor right) {
      final int leftPriority =
          left instanceof Prioritized ? ((Prioritized) left).getPriority() : 0;
      final int rightPriority =
          right instanceof Prioritized ? ((Prioritized) right).getPriority() : 0;
      return left.equals(right) ? 0 : Integer.compare(leftPriority, rightPriority);
    }
  };
  return interceptors.stream().sorted(byPriority).collect(Collectors.toList());
}
/**
 * Wraps a service definition with the global interceptors plus the interceptors declared on
 * the service's {@code GrpcService} annotation, by class and by bean name.
 *
 * @param serviceDefinition the service definition to wrap
 * @param grpcServiceAnnotation the annotation naming the service-specific interceptors
 * @param globalServerInterceptorRegistry source of the global interceptors and of the sorting
 * @return the service definition wrapped with the collected interceptors
 */
private ServerServiceDefinition bindInterceptors(final ServerServiceDefinition serviceDefinition,
    final GrpcService grpcServiceAnnotation,
    final GlobalServerInterceptorRegistry globalServerInterceptorRegistry) {
  final List<ServerInterceptor> chain = Lists.newArrayList();
  chain.addAll(globalServerInterceptorRegistry.getServerInterceptors());

  // Interceptors declared by class: use the bean if one exists, otherwise instantiate directly.
  for (final Class<? extends ServerInterceptor> type : grpcServiceAnnotation.interceptors()) {
    final boolean hasBean = this.applicationContext.getBeanNamesForType(type).length > 0;
    if (hasBean) {
      chain.add(this.applicationContext.getBean(type));
      continue;
    }
    final ServerInterceptor created;
    try {
      created = type.getConstructor().newInstance();
    } catch (final Exception e) {
      throw new BeanCreationException("Failed to create interceptor instance", e);
    }
    chain.add(created);
  }

  // Interceptors declared by bean name.
  for (final String beanName : grpcServiceAnnotation.interceptorNames()) {
    chain.add(this.applicationContext.getBean(beanName, ServerInterceptor.class));
  }

  if (grpcServiceAnnotation.sortInterceptors()) {
    globalServerInterceptorRegistry.sortInterceptors(chain);
  }
  return ServerInterceptors.interceptForward(serviceDefinition, chain);
}
/**
 * Verifies that the registry exposes the default interceptors in the expected order and that
 * sorting a shuffled copy restores that same order.
 */
@Test
void testOrderingOfTheDefaultInterceptors() {
  final List<ServerInterceptor> expectedOrder = new ArrayList<>();
  expectedOrder.add(this.applicationContext.getBean(GrpcRequestScope.class));
  expectedOrder.add(this.applicationContext.getBean(MetricCollectingServerInterceptor.class));
  expectedOrder.add(this.applicationContext.getBean(ExceptionTranslatingServerInterceptor.class));
  expectedOrder.add(this.applicationContext.getBean(AuthenticatingServerInterceptor.class));
  expectedOrder.add(this.applicationContext.getBean(AuthorizationCheckingServerInterceptor.class));

  final List<ServerInterceptor> registered =
      new ArrayList<>(this.registry.getServerInterceptors());
  assertEquals(expectedOrder, registered);

  // Scramble the copy, then check that sorting reproduces the expected order.
  Collections.shuffle(registered);
  AnnotationAwareOrderComparator.sort(registered);
  assertEquals(expectedOrder, registered);
}
/** Checks that a successful unary call passes through the interceptor without throwing. */
@Test
public void noExceptionDoesNotInterfere() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      final HelloResponse reply =
          HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build();
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  };

  final ServerInterceptor underTest = new TransmitUnexpectedExceptionInterceptor();
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();
  client.sayHello(request);
}
/**
 * An exception reported through {@code onError} whose type exactly matches the configured type
 * is expected to reach the client as INTERNAL, carrying the original message.
 */
@Test
public void exactTypeMatches() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      responseObserver.onError(new ArithmeticException("Divide by zero"));
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();

  assertThatThrownBy(() -> client.sayHello(request))
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.INTERNAL.getCode()),
          "is Status.INTERNAL")
      .hasMessageContaining("Divide by zero");
}
/**
 * An exception reported through {@code onError} that is a subtype of the configured parent type
 * is expected to reach the client as INTERNAL, carrying the original message.
 */
@Test
public void parentTypeMatches() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      responseObserver.onError(new ArithmeticException("Divide by zero"));
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forParentType(RuntimeException.class);
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();

  assertThatThrownBy(() -> client.sayHello(request))
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.INTERNAL.getCode()),
          "is Status.INTERNAL")
      .hasMessageContaining("Divide by zero");
}
/**
 * An exception reported through {@code onError} whose type is the configured parent type itself
 * is expected to reach the client as INTERNAL, carrying the original message.
 */
@Test
public void parentTypeMatchesExactly() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      responseObserver.onError(new RuntimeException("Divide by zero"));
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forParentType(RuntimeException.class);
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();

  assertThatThrownBy(() -> client.sayHello(request))
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.INTERNAL.getCode()),
          "is Status.INTERNAL")
      .hasMessageContaining("Divide by zero");
}
/**
 * With the interceptor configured for all exceptions, an exception reported through
 * {@code onError} is expected to reach the client as INTERNAL, carrying the original message.
 */
@Test
public void alleMatches() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      responseObserver.onError(new ArithmeticException("Divide by zero"));
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forAllExceptions();
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();

  assertThatThrownBy(() -> client.sayHello(request))
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.INTERNAL.getCode()),
          "is Status.INTERNAL")
      .hasMessageContaining("Divide by zero");
}
/**
 * An exception thrown directly from the service method, of the configured exact type, is
 * expected to reach the client as INTERNAL, carrying the original message.
 */
@Test
public void unexpectedExceptionCanMatch() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      throw new ArithmeticException("Divide by zero");
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();

  assertThatThrownBy(() -> client.sayHello(request))
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.INTERNAL.getCode()),
          "is Status.INTERNAL")
      .hasMessageContaining("Divide by zero");
}
/**
 * An exception thrown from the service method whose type does not match the configured exact
 * type is expected to reach the client as UNKNOWN.
 */
@Test
public void unexpectedExceptionCanNotMatch() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      throw new ArithmeticException("Divide by zero");
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forExactType(NullPointerException.class);
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();

  assertThatThrownBy(() -> client.sayHello(request))
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.UNKNOWN.getCode()),
          "is Status.UNKNOWN")
      .hasMessageContaining("UNKNOWN");
}
/**
 * For a server-streaming call that emits two responses and then throws an exception of the
 * configured exact type, the client is expected to read both responses and then fail with
 * INTERNAL, carrying the original message.
 */
@Test
public void unexpectedExceptionCanMatchStreaming() {
  final GreeterGrpc.GreeterImplBase greeter = new GreeterGrpc.GreeterImplBase() {
    @Override
    public void sayHelloStream(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
      responseObserver.onNext(HelloResponse.getDefaultInstance());
      responseObserver.onNext(HelloResponse.getDefaultInstance());
      throw new ArithmeticException("Divide by zero");
    }
  };

  final ServerInterceptor underTest =
      new TransmitUnexpectedExceptionInterceptor().forExactType(ArithmeticException.class);
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(greeter, underTest));

  final GreeterGrpc.GreeterBlockingStub client = GreeterGrpc.newBlockingStub(serverRule.getChannel());
  final HelloRequest request = HelloRequest.newBuilder().setName("World").build();
  final Iterator<HelloResponse> responses = client.sayHelloStream(request);

  // Consume the two responses that were sent before the failure.
  responses.next();
  responses.next();

  assertThatThrownBy(responses::next)
      .isInstanceOf(StatusRuntimeException.class)
      .matches(
          thrown -> ((StatusRuntimeException) thrown).getStatus().getCode()
              .equals(Status.INTERNAL.getCode()),
          "is Status.INTERNAL")
      .hasMessageContaining("Divide by zero");
}
/**
 * Builds the {@code TitusGrpcServer} from the accumulated configuration.
 *
 * <p>Every service is built with a shared interceptor list: the error-catching interceptor
 * first, then the FIT interceptor when {@code GrpcFitInterceptor.getIfFitEnabled} supplies one,
 * then the interceptors registered on this builder.
 *
 * @return the configured server
 */
public TitusGrpcServer build() {
  Preconditions.checkArgument(port >= 0, "Port number is negative");
  Preconditions.checkNotNull(titusRuntime, "TitusRuntime not set");

  // Interceptors shared by all services, in application order.
  final List<ServerInterceptor> sharedInterceptors = new ArrayList<>();
  final GrpcExceptionMapper exceptionMapper = new GrpcExceptionMapper(serviceExceptionMappers);
  sharedInterceptors.add(new CommonErrorCatchingServerInterceptor(exceptionMapper));
  GrpcFitInterceptor.getIfFitEnabled(titusRuntime).ifPresent(sharedInterceptors::add);
  sharedInterceptors.addAll(interceptors);

  // Let the optional configurer customize (or replace) the underlying gRPC builder.
  ServerBuilder grpcBuilder = ServerBuilder.forPort(port);
  if (serverConfigurer != null) {
    grpcBuilder = serverConfigurer.apply(grpcBuilder);
  }

  for (ServiceBuilder each : serviceBuilders.values()) {
    grpcBuilder.addService(each.build(sharedInterceptors));
  }
  return new TitusGrpcServer(grpcBuilder.build(), shutdownTime);
}
/**
 * Wraps a service definition with the global interceptors plus the interceptors declared on
 * the service's {@code GrpcService} annotation, by class and by bean name.
 *
 * @param serviceDefinition the service definition to wrap
 * @param grpcServiceAnnotation the annotation naming the service-specific interceptors
 * @param globalServerInterceptorRegistry source of the global interceptors and of the sorting
 * @return the service definition wrapped with the collected interceptors
 */
private ServerServiceDefinition bindInterceptors(final ServerServiceDefinition serviceDefinition,
    final GrpcService grpcServiceAnnotation,
    final GlobalServerInterceptorRegistry globalServerInterceptorRegistry) {
  final List<ServerInterceptor> chain = Lists.newArrayList();
  chain.addAll(globalServerInterceptorRegistry.getServerInterceptors());

  // Interceptors declared by class: use the bean if one exists, otherwise instantiate directly.
  for (final Class<? extends ServerInterceptor> type : grpcServiceAnnotation.interceptors()) {
    final boolean hasBean = this.applicationContext.getBeanNamesForType(type).length > 0;
    if (hasBean) {
      chain.add(this.applicationContext.getBean(type));
      continue;
    }
    final ServerInterceptor created;
    try {
      created = type.getConstructor().newInstance();
    } catch (final Exception e) {
      throw new BeanCreationException("Failed to create interceptor instance", e);
    }
    chain.add(created);
  }

  // Interceptors declared by bean name.
  for (final String beanName : grpcServiceAnnotation.interceptorNames()) {
    chain.add(this.applicationContext.getBean(beanName, ServerInterceptor.class));
  }

  if (grpcServiceAnnotation.sortInterceptors()) {
    globalServerInterceptorRegistry.sortInterceptors(chain);
  }
  return ServerInterceptors.interceptForward(serviceDefinition, chain);
}
/**
 * Verifies that the registry exposes the default interceptors in the expected order and that
 * sorting a shuffled copy restores that same order.
 */
@Test
void testOrderingOfTheDefaultInterceptors() {
  final List<ServerInterceptor> expectedOrder = new ArrayList<>();
  expectedOrder.add(this.applicationContext.getBean(GrpcRequestScope.class));
  expectedOrder.add(this.applicationContext.getBean(MetricCollectingServerInterceptor.class));
  expectedOrder.add(this.applicationContext.getBean(ExceptionTranslatingServerInterceptor.class));
  expectedOrder.add(this.applicationContext.getBean(AuthenticatingServerInterceptor.class));
  expectedOrder.add(this.applicationContext.getBean(AuthorizationCheckingServerInterceptor.class));

  final List<ServerInterceptor> registered =
      new ArrayList<>(this.registry.getServerInterceptors());
  assertEquals(expectedOrder, registered);

  // Scramble the copy, then check that sorting reproduces the expected order.
  Collections.shuffle(registered);
  AnnotationAwareOrderComparator.sort(registered);
  assertEquals(expectedOrder, registered);
}
/**
 * Builds an interceptor that verifies, for each call, that the client presented the expected
 * TLS certificate.
 *
 * <p>The first certificate of the peer chain is compared byte-for-byte (encoded form) with
 * {@code expectedClientCert}. On a match the flag is set to {@code true}; any mismatch or
 * failure to read the certificate raises an {@link AssertionError}.
 *
 * @param expectedClientCert encoded form of the certificate the client is expected to send
 * @param toggleHandshakeOccured flag set to {@code true} once a matching certificate was seen
 * @return the verifying interceptor
 */
private ServerInterceptor mutualTLSInterceptor(byte[] expectedClientCert, AtomicBoolean toggleHandshakeOccured) {
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
      SSLSession sslSession = serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
      // Without TLS the attribute is absent; report that explicitly rather than via an NPE.
      Assert.assertNotNull("No SSL session attached to the call", sslSession);
      try {
        // getPeerCertificates() replaces the deprecated getPeerCertificateChain(), which
        // relies on javax.security.cert and is not supported by every SSLSession.
        java.security.cert.Certificate[] certChain = sslSession.getPeerCertificates();
        Assert.assertFalse("Client didn't send TLS certificate", certChain == null || certChain.length == 0);
        byte[] clientRawCert = certChain[0].getEncoded();
        // Ensure the client TLS cert matches the expected one - the one it was created with
        boolean equalCerts = Arrays.equals(clientRawCert, expectedClientCert);
        Assert.assertTrue("Expected certificate doesn't match actual", equalCerts);
        toggleHandshakeOccured.set(true);
      } catch (Exception e) {
        // Fail with the cause attached; a statement placed after Assert.fail would never run.
        throw new AssertionError(String.format("Uncaught exception: %s", e.toString()), e);
      }
      return serverCallHandler.startCall(serverCall, metadata);
    }
  };
}
/**
 * Starts the test server, unless {@code getServerBuilder()} returns {@code null}, in which case
 * {@code server} is set to {@code null} and nothing is started.
 *
 * <p>The test service is registered behind the recording interceptors followed by the
 * interceptors from {@code TestServiceImpl.interceptors()}; a stream tracer factory and a
 * census stats module are installed before the server is built and started.
 */
private void startServer() {
  final AbstractServerImplBuilder<?> serverBuilder = getServerBuilder();
  if (serverBuilder == null) {
    server = null;
    return;
  }

  testServiceExecutor = Executors.newScheduledThreadPool(2);

  // Recording interceptors come first, followed by the ones the test service itself needs.
  final ImmutableList.Builder<ServerInterceptor> chain = ImmutableList.builder();
  chain.add(recordServerCallInterceptor(serverCallCapture));
  chain.add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture));
  chain.add(recordContextInterceptor(contextCapture));
  chain.addAll(TestServiceImpl.interceptors());
  final List<ServerInterceptor> allInterceptors = chain.build();

  serverBuilder
      .addService(
          ServerInterceptors.intercept(new TestServiceImpl(testServiceExecutor), allInterceptors))
      .addStreamTracerFactory(serverStreamTracerFactory);

  io.grpc.internal.TestingAccessor.setStatsImplementation(
      serverBuilder,
      new CensusStatsModule(
          tagger,
          tagContextBinarySerializer,
          serverStatsRecorder,
          GrpcUtil.STOPWATCH_SUPPLIER,
          true));

  try {
    server = serverBuilder.build().start();
  } catch (IOException startFailure) {
    throw new RuntimeException(startFailure);
  }
}
/**
 * Builds an interceptor that stores each intercepted {@link ServerCall} in the given reference
 * before passing the call on unchanged. Useful for inspecting
 * {@link ServerCall#getAttributes()} in tests.
 *
 * @param serverCallCapture receives the most recently intercepted call
 * @return the recording interceptor
 */
private static ServerInterceptor recordServerCallInterceptor(
    final AtomicReference<ServerCall<?, ?>> serverCallCapture) {
  final ServerInterceptor recorder = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> incomingCall,
        Metadata incomingHeaders,
        ServerCallHandler<ReqT, RespT> downstream) {
      serverCallCapture.set(incomingCall);
      return downstream.startCall(incomingCall, incomingHeaders);
    }
  };
  return recorder;
}
/**
 * Builds an interceptor that stores the {@code Context} current at interception time in the
 * given reference before passing the call on unchanged.
 *
 * @param contextCapture receives the context observed for the most recent call
 * @return the recording interceptor
 */
private static ServerInterceptor recordContextInterceptor(
    final AtomicReference<Context> contextCapture) {
  final ServerInterceptor recorder = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> incomingCall,
        Metadata incomingHeaders,
        ServerCallHandler<ReqT, RespT> downstream) {
      contextCapture.set(Context.current());
      return downstream.startCall(incomingCall, incomingHeaders);
    }
  };
  return recorder;
}
/**
 * Returns interceptors necessary for full service implementation: one echoing
 * {@code Util.METADATA_KEY}, one for {@code Util.ECHO_INITIAL_METADATA_KEY}, and one for
 * {@code Util.ECHO_TRAILING_METADATA_KEY}, in that order.
 */
public static List<ServerInterceptor> interceptors() {
  final ServerInterceptor echoMetadataKey = echoRequestHeadersInterceptor(Util.METADATA_KEY);
  final ServerInterceptor echoInitial =
      echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY);
  final ServerInterceptor echoTrailing =
      echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY);
  return Arrays.asList(echoMetadataKey, echoInitial, echoTrailing);
}
/**
 * Starts a Netty server bound to {@code localhost} on port 0, using the given flow-control
 * window and serving a {@code TestServiceImpl} with no interceptors.
 *
 * @param serverFlowControlWindow the flow-control window passed to the server builder
 */
private void startServer(int serverFlowControlWindow) {
  final InetSocketAddress bindAddress = new InetSocketAddress("localhost", 0);
  final ServerBuilder<?> serverBuilder =
      NettyServerBuilder.forAddress(bindAddress).flowControlWindow(serverFlowControlWindow);

  // NOTE(review): this executor is not retained, so nothing here can shut it down.
  final TestServiceImpl service = new TestServiceImpl(Executors.newScheduledThreadPool(2));
  serverBuilder.addService(
      ServerInterceptors.intercept(service, ImmutableList.<ServerInterceptor>of()));

  try {
    server = serverBuilder.build().start();
  } catch (IOException startFailure) {
    throw new RuntimeException(startFailure);
  }
}
/**
 * Returns the binary-log server interceptor for the given method, or {@code null} when the
 * factory has no log helper for it. The counter is advanced only when a helper exists.
 *
 * @param fullMethodName the fully qualified method name to look up
 * @return the interceptor for the method, or {@code null}
 */
@Nullable
@Override
public ServerInterceptor getServerInterceptor(String fullMethodName) {
  final BinlogHelper methodHelper = factory.getLog(fullMethodName);
  return methodHelper == null
      ? null
      : methodHelper.getServerInterceptor(counter.getAndIncrement());
}
/**
 * Builds an interceptor that stores the request headers of each intercepted call in the given
 * reference before passing the call on unchanged. Useful for testing metadata propagation.
 *
 * @param headersCapture receives the request headers of the most recent call
 * @return the recording interceptor
 */
public static ServerInterceptor recordRequestHeadersInterceptor(
    final AtomicReference<Metadata> headersCapture) {
  final ServerInterceptor recorder = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> incomingCall,
        Metadata incomingHeaders,
        ServerCallHandler<ReqT, RespT> downstream) {
      headersCapture.set(incomingHeaders);
      return downstream.startCall(incomingCall, incomingHeaders);
    }
  };
  return recorder;
}