下面列出了 io.grpc.ServerCall 类 API 的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Stores the incoming call in {@code TL} before delegating to the next handler, and removes it
 * again once the call has been closed.
 *
 * <p>NOTE(review): {@code TL} is presumably a thread-local holder declared elsewhere in this
 * class - confirm.
 *
 * @param call the call being intercepted
 * @param headers the request headers, forwarded unchanged
 * @param next the next handler in the chain
 * @return the listener produced by {@code next}
 */
@SuppressWarnings("checkstyle:MethodTypeParameterName")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    final ServerCall<ReqT, RespT> call, final Metadata headers,
    final ServerCallHandler<ReqT, RespT> next) {
  TL.set(call);
  // Wrap the call so that closing it also clears the holder.
  final ServerCall<ReqT, RespT> clearingCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(final Status status, final Metadata trailers) {
          super.close(status, trailers);
          TL.remove();
        }
      };
  return next.startCall(clearingCall, headers);
}
/**
 * Builds a Netty server on port 0 with the test's compressor/decompressor registries and an
 * interceptor that turns message compression on for every call.
 *
 * @return the configured server builder
 */
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
  ServerInterceptor compressionEnabler = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      // Start the call first, then enable compression on it.
      Listener<ReqT> delegate = next.startCall(call, headers);
      // TODO(carl-mastrangelo): check that encoding was set.
      call.setMessageCompression(true);
      return delegate;
    }
  };
  return NettyServerBuilder.forPort(0)
      .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
      .compressorRegistry(compressors)
      .decompressorRegistry(decompressors)
      .intercept(compressionEnabler);
}
/**
 * Checks that closing the stream with {@code Status.OK} does not cancel the call or its context
 * immediately: both report cancelled only after the executor has run its one due task.
 */
@Test
public void testStreamClose_clientOkTriggersDelayedCancellation() throws Exception {
  AtomicReference<ServerCall<String, Integer>> callRef = new AtomicReference<>();
  AtomicReference<Context> contextRef = new AtomicReference<>();
  AtomicBoolean cancelObserved = new AtomicBoolean();
  ServerStreamListener listenerUnderTest =
      testStreamClose_setup(callRef, contextRef, cancelObserved, null);

  // Nothing is cancelled before the stream is closed.
  assertFalse(callRef.get().isCancelled());
  assertFalse(contextRef.get().isCancelled());

  listenerUnderTest.closed(Status.OK);

  // With an OK close, cancellation is deferred until the pending work has run.
  assertFalse(callRef.get().isCancelled());
  assertFalse(contextRef.get().isCancelled());

  assertEquals(1, executor.runDueTasks());

  assertTrue(callRef.get().isCancelled());
  assertTrue(contextRef.get().isCancelled());
  assertTrue(cancelObserved.get());
}
/**
 * Starts a call on the handler of {@code methodDef} using a stub {@code ServerCall} that appends
 * every sent message to {@code serializedResp}.
 *
 * @param methodDef the method definition whose handler is invoked
 * @param serializedResp sink that receives each message passed to {@code sendMessage}
 * @return the listener returned by the handler's {@code startCall}
 */
private static <ReqT, RespT> ServerCall.Listener<ReqT> startServerCallHelper(
    final ServerMethodDefinition<ReqT, RespT> methodDef,
    final List<Object> serializedResp) {
  final ServerCall<ReqT, RespT> recordingCall = new NoopServerCall<ReqT, RespT>() {
    @Override
    public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
      return methodDef.getMethodDescriptor();
    }

    @Override
    public void sendMessage(RespT message) {
      // Record the response instead of sending it anywhere.
      serializedResp.add(message);
    }
  };
  final Metadata emptyHeaders = new Metadata();
  return methodDef.getServerCallHandler().startCall(recordingCall, emptyHeaders);
}
/**
 * Verifies that calling {@code disableAutoRequest()} on the response observer after the service
 * method has been invoked is rejected with an {@link IllegalStateException}.
 */
@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
  final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
      new AtomicReference<>();
  ServerCallHandler<Integer, Integer> callHandler =
      ServerCalls.asyncBidiStreamingCall(
          new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
            @Override
            public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
              // Capture the observer so the test can use it after invoke() has returned.
              callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
              return new ServerCalls.NoopStreamObserver<>();
            }
          });
  ServerCall.Listener<Integer> callListener =
      callHandler.startCall(serverCall, new Metadata());
  callListener.onMessage(1);
  try {
    callObserver.get().disableAutoRequest();
    // The failure message now names the operation this test actually exercises.
    fail("Cannot disable auto request after service invocation");
  } catch (IllegalStateException expected) {
    // Expected
  }
}
/**
 * Logs the method and headers of the incoming request, then starts the call through a
 * forwarding {@code ServerCall} that also logs response headers and response messages.
 *
 * <p>NOTE(review): this excerpt ends at the opening brace of the anonymous
 * {@code SimpleForwardingServerCallListener} body; the listener overrides are not visible here.
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Request side: log the invoked method and the inbound headers.
logMethod(REQUEST, call.getMethodDescriptor());
logHeaders(REQUEST, headers);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
// Response side: log method and outbound headers before forwarding them.
logMethod(RESPONSE, call.getMethodDescriptor());
logHeaders(RESPONSE, responseHeaders);
super.sendHeaders(responseHeaders);
}
@Override
public void sendMessage(RespT message) {
// Log each response message before forwarding it.
logMessage(RESPONSE, message);
super.sendMessage(message);
}
}, headers)) {
/**
 * Runs the security checks around the start of a call: {@code beforeInvocation} before
 * delegating, {@code finallyInvocation} once {@code startCall} has returned or thrown, and
 * {@code afterInvocation} on the resulting listener.
 *
 * @param call the call being intercepted
 * @param headers the request headers
 * @param next the next handler in the chain
 * @return the listener returned by {@code afterInvocation}
 */
@SuppressWarnings("unchecked")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, final Metadata headers,
        final ServerCallHandler<ReqT, RespT> next) {
    final MethodDescriptor<ReqT, RespT> invokedMethod = call.getMethodDescriptor();

    final InterceptorStatusToken statusToken;
    try {
        statusToken = beforeInvocation(invokedMethod);
    } catch (final AuthenticationException | AccessDeniedException denied) {
        // Log the rejection and let the original exception propagate.
        log.debug("Access denied");
        throw denied;
    }
    log.debug("Access granted");

    final Listener<ReqT> delegate;
    try {
        delegate = next.startCall(call, headers);
    } finally {
        // Runs whether or not startCall succeeded.
        finallyInvocation(statusToken);
    }

    // TODO: Call that here or in onHalfClose?
    final Object checkedListener = afterInvocation(statusToken, delegate);
    return (Listener<ReqT>) checkedListener;
}
/**
 * Creates the {@code ServerCallImpl} for a new stream, hands it to the method's call handler
 * and wraps the handler's listener as a {@code ServerStreamListener}.
 *
 * @param fullMethodName the method name, used only in the error message
 * @param methodDef definition supplying the method descriptor and call handler
 * @param stream the server stream backing the call
 * @param headers the request headers
 * @param context the cancellable context passed to the call
 * @param tag tag passed through to the call
 * @return the stream listener created from the handler's listener
 * @throws NullPointerException if the handler's {@code startCall} returns {@code null}
 */
private <WReqT, WRespT> ServerStreamListener startWrappedCall(
    String fullMethodName,
    ServerMethodDefinition<WReqT, WRespT> methodDef,
    ServerStream stream,
    Metadata headers,
    Context.CancellableContext context,
    Tag tag) {
  ServerCallImpl<WReqT, WRespT> wrappedCall = new ServerCallImpl<>(
      stream, methodDef.getMethodDescriptor(), headers, context,
      decompressorRegistry, compressorRegistry, serverCallTracer, tag);

  ServerCall.Listener<WReqT> appListener =
      methodDef.getServerCallHandler().startCall(wrappedCall, headers);
  if (appListener != null) {
    return wrappedCall.newServerStreamListener(appListener);
  }
  // A handler must never hand back a null listener.
  throw new NullPointerException(
      "startCall() returned a null listener for method " + fullMethodName);
}
/**
 * Verifies that once the service disables automatic inbound flow control, delivering a message
 * to the listener does not cause any request for more messages on the call.
 */
@Test
public void disablingInboundAutoFlowControlSuppressesRequestsForMoreMessages() throws Exception {
  ServerCalls.BidiStreamingMethod<Integer, Integer> flowControlDisablingMethod =
      new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
        @Override
        public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
          ((ServerCallStreamObserver<Integer>) responseObserver)
              .disableAutoInboundFlowControl();
          return new ServerCalls.NoopStreamObserver<>();
        }
      };
  ServerCall.Listener<Integer> listener =
      ServerCalls.asyncBidiStreamingCall(flowControlDisablingMethod)
          .startCall(serverCall, new Metadata());

  listener.onReady();
  // Transport should not call this if nothing has been requested but forcing it here
  // to verify that message delivery does not trigger a call to request(1).
  listener.onMessage(1);

  // Should never be called
  assertThat(serverCall.requestCalls).isEmpty();
}
/**
 * Verifies that half-closing a server-streaming call without any request message closes the
 * call with {@code INTERNAL} / {@code ServerCalls.MISSING_REQUEST}, sends no responses and
 * never invokes the service method.
 */
@Test
public void clientSendsOne_errorMissingRequest_serverStreaming() {
  ServerCalls.ServerStreamingMethod<Integer, Integer> unreachableMethod =
      new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          fail("should not be reached");
        }
      };
  ServerCallRecorder serverCall = new ServerCallRecorder(SERVER_STREAMING_METHOD);
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncServerStreamingCall(unreachableMethod);

  ServerCall.Listener<Integer> callListener = handler.startCall(serverCall, new Metadata());
  // Half-close without delivering a request message first.
  callListener.onHalfClose();

  assertThat(serverCall.responses).isEmpty();
  assertEquals(Status.Code.INTERNAL, serverCall.status.getCode());
  assertEquals(ServerCalls.MISSING_REQUEST, serverCall.status.getDescription());
}
/**
 * Registers the request tag carried in the headers (when both the descriptor and id headers are
 * present) with the request tracker, then continues the call through a forwarding wrapper.
 *
 * @param call the call being intercepted
 * @param headers the request headers; may be {@code null}
 * @param next the next handler in the chain
 * @return the listener produced by {@code next}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, final Metadata headers,
                                                             ServerCallHandler<ReqT, RespT> next) {
    // Check if this RPC has tags to track request (e.g., older clients).
    final boolean tagged = headers != null
            && headers.containsKey(DESCRIPTOR_HEADER)
            && headers.containsKey(ID_HEADER);
    if (!tagged) {
        log.debug("No tags provided for call {} in headers: {}.", call.getMethodDescriptor().getFullMethodName(),
                headers);
    } else {
        final String descriptor = headers.get(DESCRIPTOR_HEADER);
        final long requestId = Long.parseLong(headers.get(ID_HEADER));
        final RequestTag tag = new RequestTag(descriptor, requestId);
        requestTracker.trackRequest(tag);
        log.debug(tag.getRequestId(), "Received tag from RPC request {}.",
                tag.getRequestDescriptor());
    }

    final ServerCall<ReqT, RespT> forwardingCall =
            new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
                @Override
                public void sendHeaders(Metadata responseHeaders) {
                    super.sendHeaders(responseHeaders);
                }
            };
    return next.startCall(forwardingCall, headers);
}
/**
 * Exit advice for a {@code ServerCall.Listener} method: when a tracer, helper manager,
 * transaction and helper are all available, reports the exit to the helper with the last
 * argument set to {@code false}.
 *
 * @param thrown the exception thrown by the instrumented method, or {@code null}
 * @param listener the instrumented listener instance
 * @param transaction the transaction stored by the enter advice, or {@code null}
 */
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
private static void onExit(@Advice.Thrown @Nullable Throwable thrown,
                           @Advice.This ServerCall.Listener<?> listener,
                           @Advice.Local("transaction") @Nullable Transaction transaction) {
    if (tracer != null && grpcHelperManager != null && transaction != null) {
        GrpcHelper grpcHelper = grpcHelperManager.getForClassLoaderOfClass(ServerCall.Listener.class);
        if (grpcHelper != null) {
            grpcHelper.exitServerListenerMethod(thrown, listener, transaction, false);
        }
    }
}
/**
 * Exit advice for a {@code ServerCall.Listener} method: when a tracer, helper manager and helper
 * are available, reports the exit to the helper with the last argument set to {@code true}.
 * The transaction is passed through as-is and may be {@code null}.
 *
 * @param thrown the exception thrown by the instrumented method, or {@code null}
 * @param listener the instrumented listener instance
 * @param transaction the transaction stored by the enter advice, or {@code null}
 */
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
private static void onExit(@Advice.Thrown @Nullable Throwable thrown,
                           @Advice.This ServerCall.Listener<?> listener,
                           @Advice.Local("transaction") @Nullable Transaction transaction) {
    if (tracer != null && grpcHelperManager != null) {
        GrpcHelper grpcHelper = grpcHelperManager.getForClassLoaderOfClass(ServerCall.Listener.class);
        if (grpcHelper != null) {
            grpcHelper.exitServerListenerMethod(thrown, listener, transaction, true);
        }
    }
}
/**
 * Exit advice for the instrumented call-start method: ends the transaction if the method threw,
 * otherwise registers the transaction with the helper for the given call and listener.
 *
 * @param thrown the exception thrown by the instrumented method, or {@code null}
 * @param serverCall the first argument of the instrumented method
 * @param listener the listener returned by the instrumented method
 * @param transaction the transaction stored by the enter advice, or {@code null}
 */
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
private static void onExit(@Advice.Thrown @Nullable Throwable thrown,
                           @Advice.Argument(0) ServerCall<?, ?> serverCall,
                           @Advice.Return ServerCall.Listener<?> listener,
                           @Advice.Local("transaction") @Nullable Transaction transaction) {
    if (transaction != null) {
        if (thrown == null) {
            GrpcHelper grpcHelper = grpcHelperManager.getForClassLoaderOfClass(ServerCall.class);
            if (grpcHelper != null) {
                grpcHelper.registerTransaction(serverCall, listener, transaction);
            }
        } else {
            // terminate transaction in case of exception as it won't be stored
            transaction.deactivate().end();
        }
    }
}
/**
 * Starts a server span for the incoming call, runs {@code next.startCall} with that span in
 * scope, and wraps both the call and the resulting listener in their tracing counterparts.
 * If {@code startCall} throws, the span is marked with the error and finished before the
 * exception is rethrown.
 *
 * @param call the call being intercepted
 * @param headers the request headers
 * @param next the next handler in the chain
 * @return a tracing listener around the one produced by {@code next}
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
    Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  GrpcServerRequest grpcRequest = new GrpcServerRequest(nameToKey, call, headers);
  Span span = handler.handleReceive(grpcRequest);
  AtomicReference<Span> spanRef = new AtomicReference<>(span);

  Listener<ReqT> delegate;
  // startCall invokes user interceptors, so we place the span in scope here
  try (Scope inScope = currentTraceContext.maybeScope(span.context())) {
    ServerCall<ReqT, RespT> tracedCall = new TracingServerCall<>(call, span, spanRef, grpcRequest);
    delegate = next.startCall(tracedCall, headers);
  } catch (Throwable failure) {
    // Another interceptor may throw an exception during startCall, in which case no other
    // callbacks are called, so go ahead and close the span here.
    //
    // See instrumentation/grpc/RATIONALE.md for why we don't use the handler here
    spanRef.set(null);
    if (span != null) {
      span.error(failure).finish();
    }
    throw failure;
  }
  return new TracingServerCallListener<>(delegate, span, spanRef, grpcRequest);
}
/**
 * Interception point of a call: lets the call proceed only when the token found in the request
 * metadata equals the expected token; otherwise closes the call with
 * {@code Status.PERMISSION_DENIED}.
 *
 * @param serverCall the server call being intercepted
 * @param metadata the request metadata that carries the token
 * @param serverCallHandler the next handler of the call
 * @param <REQUEST> request message type of the call
 * @param <RESPONSE> response message type of the call
 * @return the listener of the call; for a rejected call, the {@code listener} field of this
 *     interceptor
 */
@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> serverCall,
    Metadata metadata, ServerCallHandler<REQUEST, RESPONSE> serverCallHandler) {
  final String presentedToken = metadata.get(AUTH_HEAD_HEADER_NAME);
  if (!expectedToken.equals(presentedToken)) {
    // Missing or wrong token: reject without invoking the handler.
    serverCall.close(Status.PERMISSION_DENIED, new Metadata());
    return listener;
  }
  return serverCallHandler.startCall(serverCall, metadata);
}
/**
 * Logs the request headers and continues the call through a wrapper that adds
 * {@code CUSTOM_HEADER_KEY} to the response headers before they are sent.
 *
 * @param call the call being intercepted
 * @param requestHeaders the headers received from the client
 * @param next the next handler in the chain
 * @return the listener produced by {@code next}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("header received from client:" + requestHeaders);
  ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
    @Override
    public void sendHeaders(Metadata responseHeaders) {
      // Attach the custom header, then let the real call send them.
      responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
      super.sendHeaders(responseHeaders);
    }
  };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Builds a {@code SampleContext} from the {@code CONTEXT_KEY} header (or uses
 * {@code CONTEXT_UNDEFINED} when the header is absent) and runs the call with that value bound
 * to {@code CALLER_ID_CONTEXT_KEY} in the gRPC context.
 *
 * @param call the call being intercepted
 * @param headers the request headers
 * @param next the next handler in the chain
 * @return the listener returned by {@code Contexts.interceptCall}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
    final String headerValue = headers.get(CONTEXT_KEY);
    final SampleContext callerContext =
            (headerValue == null) ? CONTEXT_UNDEFINED : new SampleContext(headerValue);
    final Context grpcContext = Context.current().withValue(CALLER_ID_CONTEXT_KEY, callerContext);
    return Contexts.interceptCall(grpcContext, call, headers, next);
}
/**
 * Authenticates a call from the bearer token in the authorization metadata. When the token
 * parses and verifies, the call proceeds with the token's subject bound to
 * {@code Constant.CLIENT_ID_CONTEXT_KEY}; otherwise the call is closed with the computed status
 * and a no-op listener is returned.
 *
 * @param serverCall the call being intercepted
 * @param metadata the request metadata carrying the authorization value
 * @param serverCallHandler the next handler in the chain
 * @return the listener for the authenticated call, or a no-op listener for a rejected one
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
    Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
  final String authorization = metadata.get(Constant.AUTHORIZATION_METADATA_KEY);

  Status closeStatus = Status.OK;
  if (authorization == null) {
    closeStatus = Status.UNAUTHENTICATED.withDescription("Authorization token is missing");
  } else if (authorization.startsWith(Constant.BEARER_TYPE)) {
    // remove authorization type prefix
    final String token = authorization.substring(Constant.BEARER_TYPE.length()).trim();
    Jws<Claims> parsedClaims = null;
    try {
      // verify token signature and parse claims
      parsedClaims = parser.parseClaimsJws(token);
    } catch (JwtException e) {
      closeStatus = Status.UNAUTHENTICATED.withDescription(e.getMessage()).withCause(e);
    }
    if (parsedClaims != null) {
      // set client id into current context
      final String clientId = parsedClaims.getBody().getSubject();
      final Context ctx = Context.current().withValue(Constant.CLIENT_ID_CONTEXT_KEY, clientId);
      return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
    }
  } else {
    closeStatus = Status.UNAUTHENTICATED.withDescription("Unknown authorization type");
  }

  // Reaching this point means the call was not handed to the handler.
  serverCall.close(closeStatus, new Metadata());
  return new ServerCall.Listener<ReqT>() {
    // noop
  };
}
/**
 * Reads basic-auth credentials from the authorization header of a call.
 *
 * @param call the call being authenticated (not used by this reader)
 * @param headers the request headers
 * @return a {@code UsernamePasswordAuthenticationToken} built from the decoded header, or
 *     {@code null} when the header is absent or does not start with {@code PREFIX}
 * @throws AuthenticationException declared by the reader contract; presumably raised by
 *     {@code extractAndDecodeHeader} for malformed headers - confirm
 */
@Override
public Authentication readAuthentication(final ServerCall<?, ?> call, final Metadata headers)
        throws AuthenticationException {
    final String header = headers.get(AUTHORIZATION_HEADER);
    // Case-insensitive prefix check that does not depend on the JVM's default locale.
    // String.toLowerCase() without a locale maps 'I' to a dotless 'ı' under e.g. tr-TR, so a
    // header such as "BASIC ..." would never match a lower-case prefix there.
    if (header == null || !header.regionMatches(true, 0, PREFIX, 0, PREFIX.length())) {
        log.debug("No basic auth header found");
        return null;
    }
    final String[] decoded = extractAndDecodeHeader(header);
    return new UsernamePasswordAuthenticationToken(decoded[0], decoded[1]);
}
/**
 * Logs the request headers and continues the call through a wrapper that adds
 * {@code CUSTOM_HEADER_KEY} to the response headers before they are sent.
 *
 * @param call the call being intercepted
 * @param requestHeaders the headers received from the client
 * @param next the next handler in the chain
 * @return the listener produced by {@code next}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("header received from client:" + requestHeaders);
  ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
    @Override
    public void sendHeaders(Metadata responseHeaders) {
      // Attach the custom header, then let the real call send them.
      responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
      super.sendHeaders(responseHeaders);
    }
  };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Server-side interceptor: logs the request headers and continues the call through a wrapper
 * that adds {@code CUSTOM_HEADER_KEY} to the response headers before they are sent.
 *
 * @param call the call being intercepted
 * @param requestHeaders the headers received from the client
 * @param next the next handler in the chain
 * @return the listener produced by {@code next}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("服务端拦截器:header received from client:" + requestHeaders);
  ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
    @Override
    public void sendHeaders(Metadata responseHeaders) {
      // Attach the custom header, then let the real call send them.
      responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
      super.sendHeaders(responseHeaders);
    }
  };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Starts a call that sends empty headers, requests one message, and answers every inbound
 * message with a slice of {@code genericResponse}. The call is closed with {@code Status.OK}
 * when the client half-closes.
 *
 * @param call the call to serve
 * @param headers the request headers (not read)
 * @return the listener driving the call
 */
@Override
public ServerCall.Listener<ByteBuf> startCall(
    final ServerCall<ByteBuf, ByteBuf> call, Metadata headers) {
  call.sendHeaders(new Metadata());
  call.request(1);

  ServerCall.Listener<ByteBuf> respondingListener = new ServerCall.Listener<ByteBuf>() {
    @Override
    public void onMessage(ByteBuf message) {
      // The request payload is not inspected: release it, ask for the next one, and reply.
      message.release();
      call.request(1);
      call.sendMessage(genericResponse.slice());
    }

    @Override
    public void onHalfClose() {
      call.close(Status.OK, new Metadata());
    }

    @Override
    public void onComplete() {
    }

    @Override
    public void onCancel() {
    }
  };
  return respondingListener;
}
/**
 * Set up the registry: creates {@code serviceCount} services with randomly named
 * {@code methodCountPerService} methods each, registers them, and records every full method
 * name in {@code fullMethodNames}.
 */
@Setup(Level.Trial)
public void setup() throws Exception {
  registry = new MutableHandlerRegistry();
  fullMethodNames = new ArrayList<>(serviceCount * methodCountPerService);

  for (int s = 0; s < serviceCount; s++) {
    String serviceName = randomString();
    ServerServiceDefinition.Builder definition = ServerServiceDefinition.builder(serviceName);

    for (int m = 0; m < methodCountPerService; m++) {
      String fullName = MethodDescriptor.generateFullMethodName(serviceName, randomString());
      MethodDescriptor<Void, Void> descriptor = MethodDescriptor.<Void, Void>newBuilder()
          .setType(MethodDescriptor.MethodType.UNKNOWN)
          .setFullMethodName(fullName)
          .setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
          .setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
          .build();

      // The handler is never started here; it only has to exist for registration.
      definition.addMethod(descriptor,
          new ServerCallHandler<Void, Void>() {
            @Override
            public Listener<Void> startCall(ServerCall<Void, Void> call,
                Metadata headers) {
              return null;
            }
          });
      fullMethodNames.add(descriptor.getFullMethodName());
    }

    registry.addService(definition.build());
  }
}
/**
 * Verifies that an exception thrown from a handler's {@code startCall} results in the stream
 * being closed with the status carried by that exception.
 */
@Test
public void exceptionInStartCallPropagatesToStream() throws Exception {
createAndStartServer();
final Status status = Status.ABORTED.withDescription("Oh, no!");
// Register a method whose handler always throws the status above from startCall.
mutableFallbackRegistry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("Waiter", METHOD))
.addMethod(METHOD,
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
ServerCall<String, Integer> call,
Metadata headers) {
throw status.asRuntimeException();
}
}).build());
ServerTransportListener transportListener
= transportServer.registerNewServerTransport(new SimpleServerTransport());
transportListener.transportReady(Attributes.EMPTY);
Metadata requestHeaders = new Metadata();
StatsTraceContext statsTraceCtx =
StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders);
when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
// Announce the new stream to the server.
transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
verify(stream).setListener(streamListenerCaptor.capture());
ServerStreamListener streamListener = streamListenerCaptor.getValue();
assertNotNull(streamListener);
verify(stream, atLeast(1)).statsTraceContext();
// Up to here the stream has only had its listener set and its stats context read.
verifyNoMoreInteractions(stream);
// The method lookup has not happened yet; it runs as the single due executor task.
verify(fallbackRegistry, never()).lookupMethod(any(String.class), any(String.class));
assertEquals(1, executor.runDueTasks());
verify(fallbackRegistry).lookupMethod("Waiter/serve", AUTHORITY);
// The stream is closed with the very same Status instance the handler threw.
verify(stream).close(same(status), notNull(Metadata.class));
verify(stream, atLeast(1)).statsTraceContext();
}
/**
 * Verifies that {@code client.greet} sends a bearer token in the authorization metadata and
 * returns the expected greeting.
 */
@Test
public void greet() {
  ArgumentCaptor<Metadata> headersCaptor = ArgumentCaptor.forClass(Metadata.class);

  String greeting = client.greet("John");

  // Capture the metadata the server-side interceptor received.
  verify(mockServerInterceptor).interceptCall(
      ArgumentMatchers.<ServerCall<HelloRequest, HelloReply>>any(),
      headersCaptor.capture(),
      ArgumentMatchers.<ServerCallHandler<HelloRequest, HelloReply>>any());
  Metadata sentHeaders = headersCaptor.getValue();
  String authorization = sentHeaders.get(Constant.AUTHORIZATION_METADATA_KEY);

  assertNotNull(authorization);
  assertTrue(authorization.startsWith("Bearer"));
  assertEquals("AuthClientTest user=John", greeting);
}
/**
 * Asserts that the request metadata carried in the headers has the expected invocation ids,
 * a non-empty action id and the expected tool details, then lets the call proceed unchanged.
 *
 * @param call the call being intercepted
 * @param headers the request headers holding the {@code RequestMetadata}
 * @param next the next handler in the chain
 * @return the listener produced by {@code next}
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  final RequestMetadata requestMetadata = headers.get(TracingMetadataUtils.METADATA_KEY);

  // Invocation identifiers.
  assertThat(requestMetadata.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
  assertThat(requestMetadata.getToolInvocationId()).isEqualTo("command-id");
  assertThat(requestMetadata.getActionId()).isNotEmpty();

  // Tool details.
  assertThat(requestMetadata.getToolDetails().getToolName()).isEqualTo("bazel");
  assertThat(requestMetadata.getToolDetails().getToolVersion())
      .isEqualTo(BlazeVersionInfo.instance().getVersion());

  return next.startCall(call, headers);
}
/**
 * Starts a call for a method whose client sends exactly one message.
 *
 * @param call the call to serve
 * @param headers the request headers (not read)
 * @return a {@code UnaryServerCallListener} bound to a fresh response observer
 * @throws IllegalArgumentException if the method type is not client-sends-one-message
 */
@Override
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
  final boolean singleRequest = call.getMethodDescriptor().getType().clientSendsOneMessage();
  Preconditions.checkArgument(
      singleRequest,
      "asyncUnaryRequestCall is only for clientSendsOneMessage methods");

  final ServerCallStreamObserverImpl<ReqT, RespT> observer =
      new ServerCallStreamObserverImpl<ReqT, RespT>(call);

  // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
  // sends more than 1 requests, ServerCall will catch it. Note that disabling auto
  // inbound flow control has no effect on unary calls.
  call.request(2);

  return new UnaryServerCallListener(observer, call);
}
/**
 * Verifies that installing an onCancel handler on the response observer after the service
 * method has been invoked is rejected with an {@link IllegalStateException}.
 */
@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
  final AtomicReference<ServerCallStreamObserver<Integer>> observerRef =
      new AtomicReference<ServerCallStreamObserver<Integer>>();
  ServerCalls.BidiStreamingMethod<Integer, Integer> capturingMethod =
      new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
        @Override
        public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
          // Capture the observer so the test can use it after invoke() has returned.
          observerRef.set((ServerCallStreamObserver<Integer>) responseObserver);
          return new ServerCalls.NoopStreamObserver<Integer>();
        }
      };
  ServerCall.Listener<Integer> listener =
      ServerCalls.asyncBidiStreamingCall(capturingMethod)
          .startCall(serverCall, new Metadata());
  listener.onMessage(1);

  Runnable noopHandler = new Runnable() {
    @Override
    public void run() {
    }
  };
  try {
    observerRef.get().setOnCancelHandler(noopHandler);
    fail("Cannot set onCancel handler after service invocation");
  } catch (IllegalStateException expected) {
    // Expected
  }
}
/**
 * Verifies that an exception thrown from a handler's {@code startCall} results in the stream
 * being closed with the status carried by that exception.
 */
@Test
public void exceptionInStartCallPropagatesToStream() throws Exception {
createAndStartServer();
final Status status = Status.ABORTED.withDescription("Oh, no!");
// Register a method whose handler always throws the status above from startCall.
mutableFallbackRegistry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("Waiter", METHOD))
.addMethod(METHOD,
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
ServerCall<String, Integer> call,
Metadata headers) {
throw status.asRuntimeException();
}
}).build());
ServerTransportListener transportListener
= transportServer.registerNewServerTransport(new SimpleServerTransport());
transportListener.transportReady(Attributes.EMPTY);
Metadata requestHeaders = new Metadata();
StatsTraceContext statsTraceCtx =
StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/serve", requestHeaders);
when(stream.statsTraceContext()).thenReturn(statsTraceCtx);
// Announce the new stream to the server.
transportListener.streamCreated(stream, "Waiter/serve", requestHeaders);
verify(stream).streamId();
verify(stream).setListener(streamListenerCaptor.capture());
ServerStreamListener streamListener = streamListenerCaptor.getValue();
assertNotNull(streamListener);
verify(stream, atLeast(1)).statsTraceContext();
// Up to here the stream has only had its id read, its listener set and its stats context read.
verifyNoMoreInteractions(stream);
// The method lookup has not happened yet; it runs as the single due executor task.
verify(fallbackRegistry, never()).lookupMethod(any(String.class), any(String.class));
assertEquals(1, executor.runDueTasks());
verify(fallbackRegistry).lookupMethod("Waiter/serve", AUTHORITY);
// The stream is closed with the very same Status instance the handler threw.
verify(stream).close(same(status), ArgumentMatchers.<Metadata>notNull());
verify(stream, atLeast(1)).statsTraceContext();
}