下面列出了怎么用io.grpc.ServerCallHandler的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Wraps the call so that, right before the response headers are sent, one
 * {@code CACHE_CONTROL_METADATA_KEY} entry is added per configured cache-control directive.
 *
 * @param call the call being intercepted
 * @param requestHeaders the headers received from the client, passed on unchanged
 * @param next the next handler in the chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  ServerCall<ReqT, RespT> cacheControlCall =
      new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata headers) {
          // Append every directive before delegating to the real call.
          for (String directive : cacheControlDirectives) {
            headers.put(CACHE_CONTROL_METADATA_KEY, directive);
          }
          super.sendHeaders(headers);
        }
      };
  return next.startCall(cacheControlCall, requestHeaders);
}
/**
 * Creates an interceptor that merges the selected request headers into both the response
 * headers and the trailers of every call. Useful for testing end-to-end metadata propagation.
 *
 * @param keys the metadata keys whose request values are echoed back
 * @return an interceptor performing the echo
 */
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> echoingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          // Echo into the initial response headers.
          responseHeaders.merge(requestHeaders, echoedKeys);
          super.sendHeaders(responseHeaders);
        }

        @Override
        public void close(Status status, Metadata trailers) {
          // Echo into the trailers as well.
          trailers.merge(requestHeaders, echoedKeys);
          super.close(status, trailers);
        }
      };
      return next.startCall(echoingCall, requestHeaders);
    }
  };
}
// Logs the request method and headers, then wraps the call so that the response headers and
// every response message are logged before being forwarded. The listener returned by the next
// handler is itself wrapped in a forwarding listener.
// NOTE(review): the anonymous listener body opened on the last line of this snippet is not
// visible here; the excerpt appears to be truncated.
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Log the inbound side before handing the call to the next handler.
logMethod(REQUEST, call.getMethodDescriptor());
logHeaders(REQUEST, headers);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
// Log the outbound method and headers, then forward them.
logMethod(RESPONSE, call.getMethodDescriptor());
logHeaders(RESPONSE, responseHeaders);
super.sendHeaders(responseHeaders);
}
@Override
public void sendMessage(RespT message) {
// Log each response message before it is sent.
logMessage(RESPONSE, message);
super.sendMessage(message);
}
}, headers)) {
/**
 * Builds a Netty server builder on port 0 with the test's message size limit, the custom
 * compressor/decompressor registries, and an interceptor that enables message compression on
 * every call.
 */
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
  ServerInterceptor compressionEnabler = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      // Start the call first; compression is switched on afterwards.
      Listener<ReqT> delegate = next.startCall(call, headers);
      // TODO(carl-mastrangelo): check that encoding was set.
      call.setMessageCompression(true);
      return delegate;
    }
  };
  return NettyServerBuilder.forPort(0)
      .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
      .compressorRegistry(compressors)
      .decompressorRegistry(decompressors)
      .intercept(compressionEnabler);
}
/**
 * Wraps a {@link ServerMethodDefinition} so that it goes through the binary-logging interceptor
 * registered for its full method name. When no interceptor is available the definition is
 * returned as-is.
 *
 * @param oMethodDef the original method definition
 * @return {@code oMethodDef} itself, or a {@code byte[]}-based definition whose handler is
 *     intercepted by the binary-logging interceptor
 */
@Override
public final <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
    ServerMethodDefinition<ReqT, RespT> oMethodDef) {
  MethodDescriptor<ReqT, RespT> originalDescriptor = oMethodDef.getMethodDescriptor();
  ServerInterceptor loggingInterceptor =
      getServerInterceptor(originalDescriptor.getFullMethodName());
  if (loggingInterceptor == null) {
    // Nothing to log for this method.
    return oMethodDef;
  }

  // Re-express the method in terms of raw bytes, then intercept the byte-level handler.
  MethodDescriptor<byte[], byte[]> byteDescriptor =
      BinaryLogProvider.toByteBufferMethod(originalDescriptor);
  ServerMethodDefinition<byte[], byte[]> byteDefinition =
      InternalServerInterceptors.wrapMethod(oMethodDef, byteDescriptor);
  ServerCallHandler<byte[], byte[]> interceptedHandler =
      InternalServerInterceptors.interceptCallHandlerCreate(
          loggingInterceptor, byteDefinition.getServerCallHandler());
  return ServerMethodDefinition.create(byteDescriptor, interceptedHandler);
}
/**
 * Lets a call through only when this node is the activated leader, except for Supervisor
 * service methods, which are always forwarded. Rejected calls are closed with
 * {@code UNAVAILABLE} and receive a no-op listener.
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  // Supervisor API calls are not restricted to the active leader.
  boolean supervisorCall =
      SupervisorServiceGrpc.getServiceDescriptor().getMethods().contains(call.getMethodDescriptor());
  if (supervisorCall) {
    return next.startCall(call, headers);
  }

  if (!leaderActivator.isLeader()) {
    call.close(Status.UNAVAILABLE.withDescription("Titus Master is not leader."), new Metadata());
    return new ServerCall.Listener<ReqT>() {
    };
  }

  if (!leaderActivator.isActivated()) {
    call.close(Status.UNAVAILABLE.withDescription("Titus Master is initializing and not yet available."), new Metadata());
    return new ServerCall.Listener<ReqT>() {
    };
  }

  return next.startCall(call, headers);
}
/**
 * Notifies the stats/trace context that the call started, chains all configured interceptors
 * around the method's handler, optionally wraps the result for binary logging, and starts the
 * wrapped call. Never returns {@code null}.
 */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
    ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
    Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
  // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
  ServerCallInfoImpl<ReqT, RespT> callInfo =
      new ServerCallInfoImpl<ReqT, RespT>(
          methodDef.getMethodDescriptor(), // notify with original method descriptor
          stream.getAttributes(),
          stream.getAuthority());
  statsTraceCtx.serverCallStarted(callInfo);

  // Each interceptor wraps the handler built so far.
  ServerCallHandler<ReqT, RespT> chain = methodDef.getServerCallHandler();
  for (ServerInterceptor interceptor : interceptors) {
    chain = InternalServerInterceptors.interceptCallHandler(interceptor, chain);
  }
  ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(chain);

  final ServerMethodDefinition<?, ?> wrappedDef;
  if (binlog == null) {
    wrappedDef = interceptedDef;
  } else {
    wrappedDef = binlog.wrapMethodDefinition(interceptedDef);
  }
  return startWrappedCall(fullMethodName, wrappedDef, stream, headers, context);
}
/**
 * Verifies that calling {@code disableAutoInboundFlowControl()} after the listener has already
 * received a message fails with an {@link IllegalStateException}.
 */
@Test
public void cannotDisableAutoFlowControlAfterServiceInvocation() throws Exception {
  // Captures the response observer handed to the service so it can be used after invocation.
  final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
      new AtomicReference<ServerCallStreamObserver<Integer>>();
  ServerCallHandler<Integer, Integer> callHandler =
      ServerCalls.asyncBidiStreamingCall(
          new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
            @Override
            public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
              callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
              return new ServerCalls.NoopStreamObserver<Integer>();
            }
          });
  ServerCall.Listener<Integer> callListener =
      callHandler.startCall(serverCall, new Metadata());
  callListener.onMessage(1);
  try {
    callObserver.get().disableAutoInboundFlowControl();
    // The failure message names the operation actually under test (it previously referred to
    // setting an onCancel handler).
    fail("Cannot disable auto flow control after service invocation");
  } catch (IllegalStateException expected) {
    // Expected
  }
}
/**
 * Verifies that once the service disables automatic inbound flow control, delivering a message
 * does not cause any request for more messages to be recorded on the server call.
 */
@Test
public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() throws Exception {
  ServerCalls.BidiStreamingMethod<Integer, Integer> flowControlDisablingMethod =
      new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
        @Override
        public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
          ((ServerCallStreamObserver<Integer>) responseObserver).disableAutoInboundFlowControl();
          return new ServerCalls.NoopStreamObserver<Integer>();
        }
      };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncBidiStreamingCall(flowControlDisablingMethod);
  ServerCall.Listener<Integer> listener = handler.startCall(serverCall, new Metadata());

  listener.onReady();
  // Transport should not call this if nothing has been requested but forcing it here
  // to verify that message delivery does not trigger a call to request(1).
  listener.onMessage(1);

  // Should never be called
  assertThat(serverCall.requestCalls).isEmpty();
}
/**
 * Verifies that for a unary call exactly one request for two messages is recorded when the call
 * is started, even though the service method would disable automatic inbound flow control.
 */
@Test
public void disablingInboundAutoFlowControlForUnaryHasNoEffect() throws Exception {
  ServerCalls.UnaryMethod<Integer, Integer> flowControlDisablingMethod =
      new ServerCalls.UnaryMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          ((ServerCallStreamObserver<Integer>) responseObserver).disableAutoInboundFlowControl();
        }
      };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncUnaryCall(flowControlDisablingMethod);

  handler.startCall(serverCall, new Metadata());

  // Auto inbound flow-control always requests 2 messages for unary to detect a violation
  // of the unary semantic.
  assertThat(serverCall.requestCalls).containsExactly(2);
}
/**
 * Verifies that half-closing a unary call without any request message closes the call with
 * {@code INTERNAL} / {@code MISSING_REQUEST} and never invokes the service method.
 */
@Test
public void clientSendsOne_errorMissingRequest_unary() {
  ServerCallRecorder recorder = new ServerCallRecorder(UNARY_METHOD);
  ServerCalls.UnaryMethod<Integer, Integer> unreachableMethod =
      new ServerCalls.UnaryMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          fail("should not be reached");
        }
      };
  ServerCallHandler<Integer, Integer> handler = ServerCalls.asyncUnaryCall(unreachableMethod);
  ServerCall.Listener<Integer> callListener = handler.startCall(recorder, new Metadata());

  // Half-close with no preceding onMessage.
  callListener.onHalfClose();

  assertThat(recorder.responses).isEmpty();
  assertEquals(Status.Code.INTERNAL, recorder.status.getCode());
  assertEquals(ServerCalls.MISSING_REQUEST, recorder.status.getDescription());
}
/**
 * Verifies that half-closing a server-streaming call without any request message closes the call
 * with {@code INTERNAL} / {@code MISSING_REQUEST} and never invokes the service method.
 */
@Test
public void clientSendsOne_errorMissingRequest_serverStreaming() {
  ServerCallRecorder recorder = new ServerCallRecorder(SERVER_STREAMING_METHOD);
  ServerCalls.ServerStreamingMethod<Integer, Integer> unreachableMethod =
      new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          fail("should not be reached");
        }
      };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncServerStreamingCall(unreachableMethod);
  ServerCall.Listener<Integer> callListener = handler.startCall(recorder, new Metadata());

  // Half-close with no preceding onMessage.
  callListener.onHalfClose();

  assertThat(recorder.responses).isEmpty();
  assertEquals(Status.Code.INTERNAL, recorder.status.getCode());
  assertEquals(ServerCalls.MISSING_REQUEST, recorder.status.getDescription());
}
/**
 * Verifies that delivering two request messages to a unary call closes the call with
 * {@code INTERNAL} / {@code TOO_MANY_REQUESTS}, and that a later half-close does not invoke the
 * service method.
 */
@Test
public void clientSendsOne_errorTooManyRequests_unary() {
  ServerCallRecorder recorder = new ServerCallRecorder(UNARY_METHOD);
  ServerCalls.UnaryMethod<Integer, Integer> unreachableMethod =
      new ServerCalls.UnaryMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          fail("should not be reached");
        }
      };
  ServerCallHandler<Integer, Integer> handler = ServerCalls.asyncUnaryCall(unreachableMethod);
  ServerCall.Listener<Integer> callListener = handler.startCall(recorder, new Metadata());

  // Two messages where only one is allowed.
  callListener.onMessage(1);
  callListener.onMessage(1);

  assertThat(recorder.responses).isEmpty();
  assertEquals(Status.Code.INTERNAL, recorder.status.getCode());
  assertEquals(ServerCalls.TOO_MANY_REQUESTS, recorder.status.getDescription());
  // ensure onHalfClose does not invoke
  callListener.onHalfClose();
}
/**
 * Verifies that delivering two request messages to a server-streaming call closes the call with
 * {@code INTERNAL} / {@code TOO_MANY_REQUESTS}, and that a later half-close does not invoke the
 * service method.
 */
@Test
public void clientSendsOne_errorTooManyRequests_serverStreaming() {
  ServerCallRecorder recorder = new ServerCallRecorder(SERVER_STREAMING_METHOD);
  ServerCalls.ServerStreamingMethod<Integer, Integer> unreachableMethod =
      new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          fail("should not be reached");
        }
      };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncServerStreamingCall(unreachableMethod);
  ServerCall.Listener<Integer> callListener = handler.startCall(recorder, new Metadata());

  // Two messages where only one is allowed.
  callListener.onMessage(1);
  callListener.onMessage(1);

  assertThat(recorder.responses).isEmpty();
  assertEquals(Status.Code.INTERNAL, recorder.status.getCode());
  assertEquals(ServerCalls.TOO_MANY_REQUESTS, recorder.status.getDescription());
  // ensure onHalfClose does not invoke
  callListener.onHalfClose();
}
/**
 * Ensures the security context carries an authentication before the call proceeds: when none is
 * present, an anonymous token with the {@code ROLE_ANONYMOUS} authority is installed. An
 * existing authentication is left untouched. The outcome is logged at debug level either way.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  if (SecurityContextHolder.getContext().getAuthentication() != null) {
    log.debug("SecurityContextHolder not populated with anonymous token, as it already contained: {}",
        SecurityContextHolder.getContext().getAuthentication());
  } else {
    AnonymousAuthenticationToken anonymousToken = new AnonymousAuthenticationToken(
        key,
        "anonymousUser",
        Collections.singletonList(new SimpleGrantedAuthority("ROLE_ANONYMOUS")));
    SecurityContextHolder.getContext().setAuthentication(anonymousToken);
    log.debug("Populated SecurityContextHolder with anonymous token: {}",
        SecurityContextHolder.getContext().getAuthentication());
  }
  return next.startCall(call, headers);
}
/**
 * Records per-method metrics when the call is closed: the elapsed wall-clock time in seconds
 * since interception, and a request counter labelled with the method and status code name.
 * The method label is the text after the first {@code '/'} of the full method name.
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  long interceptedAtMillis = System.currentTimeMillis();
  String qualifiedName = call.getMethodDescriptor().getFullMethodName();
  String methodLabel = qualifiedName.substring(qualifiedName.indexOf("/") + 1);

  ServerCall<ReqT, RespT> meteredCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(Status status, Metadata trailers) {
          // Report the metrics first, then let the real call close.
          Metrics.requestLatency
              .labels(methodLabel)
              .observe((System.currentTimeMillis() - interceptedAtMillis) / 1000f);
          Metrics.grpcRequestCount.labels(methodLabel, status.getCode().name()).inc();
          super.close(status, trailers);
        }
      };
  return next.startCall(meteredCall, headers);
}
/**
 * Observes the request latency when the call is closed, labelled with the service name, the
 * method name (text after the first {@code '/'} of the full method name) and the status code
 * name. Latency is the wall-clock time in seconds since interception.
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  long interceptedAtMillis = System.currentTimeMillis();
  String qualifiedName = call.getMethodDescriptor().getFullMethodName();
  String serviceLabel = MethodDescriptor.extractFullServiceName(qualifiedName);
  String methodLabel = qualifiedName.substring(qualifiedName.indexOf("/") + 1);

  ServerCall<ReqT, RespT> meteredCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(Status status, Metadata trailers) {
          // Report the latency first, then let the real call close.
          GrpcMetrics.requestLatency
              .labels(serviceLabel, methodLabel, status.getCode().name())
              .observe((System.currentTimeMillis() - interceptedAtMillis) / 1000f);
          super.close(status, trailers);
        }
      };
  return next.startCall(meteredCall, headers);
}
/**
 * Builds a service definition for the provider by reusing the call handler of the first method
 * bound by the generic service for every method descriptor derived from the provider config.
 *
 * @param genericService the service whose bound definition supplies the template handler
 * @param providerConfig the provider configuration the method descriptors are derived from
 * @param instance not used by this method
 * @return the assembled service definition
 */
private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,
                                                    ProviderConfig providerConfig, Invoker instance) {
    ServerServiceDefinition templateDefinition = genericService.bindService();

    // Take the handler of the first bound method as the template for all methods.
    ServerMethodDefinition<?, ?> firstMethod = templateDefinition.getMethods().iterator().next();
    ServerCallHandler<Request, Response> sharedHandler =
        (ServerCallHandler<Request, Response>) firstMethod.getServerCallHandler();

    List<MethodDescriptor<Request, Response>> descriptors = getMethodDescriptor(providerConfig);
    List<ServerMethodDefinition<Request, Response>> definitions =
        getMethodDefinitions(sharedHandler, descriptors);

    ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(
        getServiceDescriptor(templateDefinition, providerConfig, descriptors));
    for (ServerMethodDefinition<Request, Response> definition : definitions) {
        serviceBuilder.addMethod(definition);
    }
    return serviceBuilder.build();
}
/**
 * Applies failure injection (FIT) to an incoming call when a FIT request handler is configured.
 *
 * <p>Without a handler the call is forwarded untouched. Otherwise the immediate injection point
 * (the full method name) is evaluated first; if it throws, the call is closed with
 * {@code UNAVAILABLE} and a no-op listener is returned. If a latency action is registered, the
 * call is handed to a {@code LatencyHandler} using the action's {@code "latency"} property
 * (falling back to 100 when it cannot be parsed). In all remaining cases the call is forwarded
 * to {@code next}.
 *
 * @param call the call being intercepted
 * @param headers the request headers
 * @param next the next handler in the chain
 * @return the listener for the call
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    if (!fitRequestHandler.isPresent()) {
        return next.startCall(call, headers);
    }

    FitInjection fitInjection = fitRequestHandler.get();
    String injectionPoint = call.getMethodDescriptor().getFullMethodName();

    // Request failure
    try {
        fitInjection.beforeImmediate(injectionPoint);
    } catch (Exception e) {
        call.close(Status.UNAVAILABLE.withDescription("FIT server failure"), new Metadata());
        return new ServerCall.Listener<ReqT>() {
        };
    }

    // Increased latency.
    return fitInjection.findAction(FitLatencyAction.ACTION_ID)
            .map(action -> {
                int latencyMs = ExceptionExt.doTry(() -> Integer.parseInt(action.getProperties().get("latency"))).orElse(100);
                return new LatencyHandler<>(call, headers, next, latencyMs).getLatencyListener();
            })
            // Lazy fallback: orElse(next.startCall(...)) evaluated its argument eagerly, so the
            // call was started on `next` even when the latency handler had been selected.
            .orElseGet(() -> next.startCall(call, headers));
}
/**
 * Runs the security interceptor life cycle around starting the call: {@code beforeInvocation}
 * on the method descriptor, {@code finallyInvocation} once the next handler has been started
 * (even if it throws), and {@code afterInvocation} on the resulting listener.
 *
 * <p>Authentication and access-denied failures from {@code beforeInvocation} are logged at
 * debug level and rethrown unchanged.
 */
@SuppressWarnings("unchecked")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, final Metadata headers,
        final ServerCallHandler<ReqT, RespT> next) {
    final MethodDescriptor<ReqT, RespT> securedMethod = call.getMethodDescriptor();

    final InterceptorStatusToken statusToken;
    try {
        statusToken = beforeInvocation(securedMethod);
    } catch (final AuthenticationException | AccessDeniedException e) {
        log.debug("Access denied");
        throw e;
    }
    log.debug("Access granted");

    final Listener<ReqT> delegate;
    try {
        delegate = next.startCall(call, headers);
    } finally {
        // Always runs, whether or not starting the call succeeded.
        finallyInvocation(statusToken);
    }

    // TODO: Call that here or in onHalfClose?
    final Object processed = afterInvocation(statusToken, delegate);
    return (Listener<ReqT>) processed;
}
/**
 * Wraps the call so that, on close, an {@code UNKNOWN} status without description but with a
 * cause is replaced by the status returned from {@code statusForException} for that cause (if
 * any). The replacement keeps the cause and uses the cause's stack trace text as description.
 * Every other status is passed through unchanged.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    ServerCall<ReqT, RespT> translatingCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(Status status, Metadata trailers) {
            Status effectiveStatus = status;
            if (status.getCode() == Status.Code.UNKNOWN && status.getDescription() == null) {
                Throwable cause = status.getCause();
                if (cause != null) {
                    Status mapped = statusForException(cause);
                    if (mapped != null) {
                        effectiveStatus = mapped
                                .withCause(cause)
                                .withDescription(stacktraceToString(cause));
                    }
                }
            }
            super.close(effectiveStatus, trailers);
        }
    };
    return next.startCall(translatingCall, headers);
}
/**
 * Logs the request headers received from the client and wraps the call so that the custom
 * header is added to the response headers before they are sent.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("header received from client:" + requestHeaders);

  ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
    @Override
    public void sendHeaders(Metadata responseHeaders) {
      // Attach the custom response header, then forward.
      responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
      super.sendHeaders(responseHeaders);
    }
  };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Verifies that the custom header added by {@code HeaderClientInterceptor} reaches the server
 * interceptor, using an in-process server and channel.
 */
@Test
public void clientHeaderDeliveredToServer() throws Exception {
  // Generate a unique in-process server name.
  String inProcessName = InProcessServerBuilder.generateName();

  // Create a server, add service, start, and register for automatic graceful shutdown.
  grpcCleanup.register(
      InProcessServerBuilder.forName(inProcessName)
          .directExecutor()
          .addService(ServerInterceptors.intercept(new GreeterImplBase() {}, mockServerInterceptor))
          .build()
          .start());

  // Create a client channel and register for automatic graceful shutdown.
  ManagedChannel clientChannel = grpcCleanup.register(
      InProcessChannelBuilder.forName(inProcessName).directExecutor().build());
  GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(
      ClientInterceptors.intercept(clientChannel, new HeaderClientInterceptor()));
  ArgumentCaptor<Metadata> capturedHeaders = ArgumentCaptor.forClass(Metadata.class);

  try {
    stub.sayHello(HelloRequest.getDefaultInstance());
    fail();
  } catch (StatusRuntimeException expected) {
    // expected because the method is not implemented at server side
  }

  verify(mockServerInterceptor).interceptCall(
      Matchers.<ServerCall<HelloRequest, HelloReply>>any(),
      capturedHeaders.capture(),
      Matchers.<ServerCallHandler<HelloRequest, HelloReply>>any());
  String receivedValue =
      capturedHeaders.getValue().get(HeaderClientInterceptor.CUSTOM_HEADER_KEY);
  assertEquals("customRequestValue", receivedValue);
}
/**
 * Creates an interceptor that stores each intercepted {@link ServerCall} in the given reference
 * before forwarding it unchanged. Useful for inspecting the call afterwards, for example
 * {@link ServerCall#getAttributes()}.
 *
 * @param serverCallCapture receives the most recently intercepted call
 */
private static ServerInterceptor recordServerCallInterceptor(
    final AtomicReference<ServerCall<?, ?>> serverCallCapture) {
  ServerInterceptor recorder = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Remember the call, then continue down the chain.
      serverCallCapture.set(call);
      return next.startCall(call, requestHeaders);
    }
  };
  return recorder;
}
/**
 * Creates an interceptor that stores the {@link Context} current at interception time in the
 * given reference before forwarding the call unchanged.
 *
 * @param contextCapture receives the context observed for the most recent call
 */
private static ServerInterceptor recordContextInterceptor(
    final AtomicReference<Context> contextCapture) {
  ServerInterceptor recorder = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Remember the current context, then continue down the chain.
      contextCapture.set(Context.current());
      return next.startCall(call, requestHeaders);
    }
  };
  return recorder;
}
/**
 * Extracts the request metadata from the headers (falling back to the default instance when
 * none is found) and continues the call inside a context that carries it under
 * {@code CONTEXT_KEY}.
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  RequestMetadata parsed = requestMetadataFromHeaders(headers);
  RequestMetadata effective = parsed != null ? parsed : RequestMetadata.getDefaultInstance();
  Context contextWithMetadata = Context.current().withValue(CONTEXT_KEY, effective);
  return Contexts.interceptCall(contextWithMetadata, call, headers, next);
}
/**
 * Configures compression on the call and keeps a copy of the incoming headers.
 *
 * <p>When {@code serverEncoding} is set the call's compression is {@code "fzip"}; message
 * compression follows {@code enableServerMessageCompression}. A fresh copy of the incoming
 * headers is stored in {@code serverResponseHeaders} before the call is forwarded.
 */
@Override
public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  if (serverEncoding) {
    call.setCompression("fzip");
  }
  call.setMessageCompression(enableServerMessageCompression);

  // Snapshot the incoming headers so later changes to `headers` are not reflected.
  Metadata snapshot = new Metadata();
  snapshot.merge(headers);
  serverResponseHeaders = snapshot;

  return next.startCall(call, headers);
}
/**
 * Logs the request headers received from the client and wraps the call so that the custom
 * header is added to the response headers before they are sent.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("服务端拦截器:header received from client:" + requestHeaders);

  ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
    @Override
    public void sendHeaders(Metadata responseHeaders) {
      // Attach the custom response header, then forward.
      responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
      super.sendHeaders(responseHeaders);
    }
  };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Verifies the method descriptor of a definition wrapped by the binlog provider. The handler is
 * never expected to run, so it throws if started.
 */
@Test
public void wrapMethodDefinition_methodDescriptor() throws Exception {
  ServerCallHandler<String, Integer> neverStartedHandler =
      new ServerCallHandler<String, Integer>() {
        @Override
        public Listener<String> startCall(
            ServerCall<String, Integer> call, Metadata headers) {
          throw new UnsupportedOperationException();
        }
      };
  ServerMethodDefinition<String, Integer> originalDef =
      ServerMethodDefinition.create(method, neverStartedHandler);

  ServerMethodDefinition<?, ?> wrappedDef = binlogProvider.wrapMethodDefinition(originalDef);

  validateWrappedMethod(wrappedDef.getMethodDescriptor());
}
/**
 * Closes every intercepted call immediately with the configured {@code status} and empty
 * trailers; the next handler is never invoked and a no-op listener is returned.
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> serverCall,
    Metadata metadata,
    ServerCallHandler<ReqT, RespT> nextHandler) {
  Metadata emptyTrailers = new Metadata();
  serverCall.close(status, emptyTrailers);
  Listener<ReqT> noopListener = new Listener<ReqT>() {};
  return noopListener;
}