下面列出了io.grpc.ServerCallHandler#startCall ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
if (!fitRequestHandler.isPresent()) {
return next.startCall(call, headers);
}
FitInjection fitInjection = fitRequestHandler.get();
String injectionPoint = call.getMethodDescriptor().getFullMethodName();
// Request failure
try {
fitInjection.beforeImmediate(injectionPoint);
} catch (Exception e) {
call.close(Status.UNAVAILABLE.withDescription("FIT server failure"), new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}
// Increased latency.
return fitInjection.findAction(FitLatencyAction.ACTION_ID)
.map(action -> {
int latencyMs = ExceptionExt.doTry(() -> Integer.parseInt(action.getProperties().get("latency"))).orElse(100);
return new LatencyHandler<>(call, headers, next, latencyMs).getLatencyListener();
})
.orElse(next.startCall(call, headers));
}
@Test
public void clientSendsOne_errorMissingRequest_unary() {
ServerCallRecorder serverCall = new ServerCallRecorder(UNARY_METHOD);
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncUnaryCall(
new ServerCalls.UnaryMethod<Integer, Integer>() {
@Override
public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
fail("should not be reached");
}
});
ServerCall.Listener<Integer> listener = callHandler.startCall(serverCall, new Metadata());
listener.onHalfClose();
assertThat(serverCall.responses).isEmpty();
assertEquals(Status.Code.INTERNAL, serverCall.status.getCode());
assertEquals(ServerCalls.MISSING_REQUEST, serverCall.status.getDescription());
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
if (Objects.isNull(SecurityContextHolder.getContext().getAuthentication())) {
SecurityContextHolder.getContext().setAuthentication(new AnonymousAuthenticationToken(key,
"anonymousUser", Collections.singletonList(new SimpleGrantedAuthority("ROLE_ANONYMOUS"))));
log.debug("Populated SecurityContextHolder with anonymous token: {}",
SecurityContextHolder.getContext().getAuthentication());
} else {
log.debug("SecurityContextHolder not populated with anonymous token, as it already contained: {}",
SecurityContextHolder.getContext().getAuthentication());
}
return next.startCall(call, headers);
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
long startCallMillis = System.currentTimeMillis();
String fullMethodName = call.getMethodDescriptor().getFullMethodName();
String serviceName = MethodDescriptor.extractFullServiceName(fullMethodName);
String methodName = fullMethodName.substring(fullMethodName.indexOf("/") + 1);
return next.startCall(
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
GrpcMetrics.requestLatency
.labels(serviceName, methodName, status.getCode().name())
.observe((System.currentTimeMillis() - startCallMillis) / 1000f);
super.close(status, trailers);
}
},
headers);
}
/**
* Echoes request headers with the specified key(s) from a client into response trailers only.
*/
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
final Set<Metadata.Key<?>> keySet = new HashSet<>(Arrays.asList(keys));
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
super.sendHeaders(responseHeaders);
}
@Override
public void close(Status status, Metadata trailers) {
trailers.merge(requestHeaders, keySet);
super.close(status, trailers);
}
}, requestHeaders);
}
};
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
AdmissionControllerResponse result;
try {
AdmissionControllerRequest request = AdmissionControllerRequest.newBuilder()
.withCallerId(callerIdResolver.get())
.withEndpointName(call.getMethodDescriptor().getFullMethodName())
.build();
result = admissionController.apply(request);
} catch (Exception e) {
logger.warn("Admission controller error: {}", e.getMessage());
logger.debug("Stack trace", e);
return next.startCall(call, headers);
}
if (result.isAllowed()) {
return next.startCall(call, headers);
}
call.close(Status.RESOURCE_EXHAUSTED.withDescription(result.getReasonMessage()), new Metadata());
return (ServerCall.Listener<ReqT>) NO_OP_LISTENER;
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
if (SupervisorServiceGrpc.getServiceDescriptor().getMethods().contains(call.getMethodDescriptor())) {
// Supervisor API calls are not restricted to the active leader.
return next.startCall(call, headers);
}
if (leaderActivator.isLeader()) {
if (leaderActivator.isActivated()) {
return next.startCall(call, headers);
} else {
call.close(Status.UNAVAILABLE.withDescription("Titus Master is initializing and not yet available."), new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}
} else {
call.close(Status.UNAVAILABLE.withDescription("Titus Master is not leader."), new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
ServerCall<ReqT, RespT> wrappedCall = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
super.sendMessage(message);
}
@Override
public void close(Status status, Metadata trailers) {
Throwable exception;
Status newStatus;
if (
status.getCode() == Status.Code.UNKNOWN
&& status.getDescription() == null
&& (exception = status.getCause()) != null
&& (newStatus = statusForException(exception)) != null
) {
status = newStatus
.withCause(exception)
.withDescription(stacktraceToString(exception));
}
super.close(status, trailers);
}
};
return next.startCall(wrappedCall, headers);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
MethodDescriptor<ReqT, RespT> methodDescriptor = call.getMethodDescriptor();
if (notProtectedMethods.contains(methodDescriptor)) {
return next.startCall(call, headers);
}
if (leaderActivationStatus.isActivatedLeader()) {
return next.startCall(call, headers);
} else {
call.close(Status.UNAVAILABLE.withDescription("Not a leader or not ready yet."), new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
return serverCallHandler
.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void close(Status status, Metadata trailers) {
callTime = System.nanoTime();
super.close(status, trailers);
}
}, metadata);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
logger.info("服务端拦截器:header received from client:" + requestHeaders);
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
super.sendHeaders(responseHeaders);
}
}, requestHeaders);
}
/**
* Capture the request headers from a client. Useful for testing metadata propagation.
*/
public static ServerInterceptor recordRequestHeadersInterceptor(
final AtomicReference<Metadata> headersCapture) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
headersCapture.set(requestHeaders);
return next.startCall(call, requestHeaders);
}
};
}
@Test
public void binaryLogInstalled() throws Exception {
final SettableFuture<Boolean> intercepted = SettableFuture.create();
final ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
intercepted.set(true);
return next.startCall(call, headers);
}
};
builder.binlog = new BinaryLog() {
@Override
public void close() throws IOException {
// noop
}
@Override
public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
ServerMethodDefinition<ReqT, RespT> oMethodDef) {
return ServerMethodDefinition.create(
oMethodDef.getMethodDescriptor(),
InternalServerInterceptors.interceptCallHandlerCreate(
interceptor,
oMethodDef.getServerCallHandler()));
}
@Override
public Channel wrapChannel(Channel channel) {
return channel;
}
};
createAndStartServer();
basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50);
assertTrue(intercepted.get());
}
protected ServerInterceptor getInterceptor() {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return new TestServerListener<ReqT>(next.startCall(call, headers), listenerExceptionMethod);
}
};
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
return serverCallHandler
.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void close(Status status, Metadata trailers) {
callTime = System.nanoTime();
super.close(status, trailers);
}
}, metadata);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
accessLogger.info("gRPC [" +
serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR) +
"]: " + serverCall.getMethodDescriptor().getFullMethodName());
ServerCall.Listener<ReqT> delegate = serverCallHandler.startCall(serverCall, metadata);
return new SimpleForwardingServerCallListener<ReqT>(delegate) {
@Override
public void onHalfClose() {
try {
super.onHalfClose();
} catch (Exception e) {
logger.error("Caught an unexpected error.", e);
serverCall.close(Status.INTERNAL
.withCause(e)
.withDescription(e.toString() + "\n" + e.getMessage()),
new Metadata());
}
}
@Override
public void onMessage(ReqT request) {
accessLogger.info("Request Data: " + request);
super.onMessage(request);
}
};
}
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
@Test
public void runtimeStreamObserverIsServerCallStreamObserver() throws Exception {
final AtomicBoolean invokeCalled = new AtomicBoolean();
final AtomicBoolean onCancelCalled = new AtomicBoolean();
final AtomicBoolean onReadyCalled = new AtomicBoolean();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override
public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
assertTrue(responseObserver instanceof ServerCallStreamObserver);
ServerCallStreamObserver<Integer> serverCallObserver =
(ServerCallStreamObserver<Integer>) responseObserver;
callObserver.set(serverCallObserver);
serverCallObserver.setOnCancelHandler(new Runnable() {
@Override
public void run() {
onCancelCalled.set(true);
}
});
serverCallObserver.setOnReadyHandler(new Runnable() {
@Override
public void run() {
onReadyCalled.set(true);
}
});
invokeCalled.set(true);
return new ServerCalls.NoopStreamObserver<Integer>();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(serverCall, new Metadata());
serverCall.isReady = true;
serverCall.isCancelled = false;
assertTrue(callObserver.get().isReady());
assertFalse(callObserver.get().isCancelled());
callListener.onReady();
callListener.onMessage(1);
callListener.onCancel();
assertTrue(invokeCalled.get());
assertTrue(onReadyCalled.get());
assertTrue(onCancelCalled.get());
serverCall.isReady = false;
serverCall.isCancelled = true;
assertFalse(callObserver.get().isReady());
assertTrue(callObserver.get().isCancelled());
// Is called twice, once to permit the first message and once again after the first message
// has been processed (auto flow control)
assertThat(serverCall.requestCalls).containsExactly(1, 1).inOrder();
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
log.info(serverCall.getMethodDescriptor().getFullMethodName());
return serverCallHandler.startCall(serverCall, metadata);
}