下面列出了 io.grpc.ServerCall#Listener() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Makes sure the security context carries an authentication before the call proceeds: when none
 * is set, an anonymous token with the {@code ROLE_ANONYMOUS} authority is installed. The outcome
 * is logged at debug level either way, and the call is then handed to {@code next}.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  if (SecurityContextHolder.getContext().getAuthentication() != null) {
    // Something upstream already authenticated this call; leave it untouched.
    log.debug("SecurityContextHolder not populated with anonymous token, as it already contained: {}",
        SecurityContextHolder.getContext().getAuthentication());
  } else {
    SecurityContextHolder.getContext().setAuthentication(
        new AnonymousAuthenticationToken(
            key,
            "anonymousUser",
            Collections.singletonList(new SimpleGrantedAuthority("ROLE_ANONYMOUS"))));
    log.debug("Populated SecurityContextHolder with anonymous token: {}",
        SecurityContextHolder.getContext().getAuthentication());
  }
  return next.startCall(call, headers);
}
/**
 * Builds an interceptor that copies the request header entries for the given key(s) into the
 * response headers. Trailers are forwarded unchanged.
 *
 * @param keys metadata keys whose request entries are echoed back in the response headers
 * @return the echoing interceptor
 */
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> echoingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          // Fold the selected request entries in just before the headers go out.
          responseHeaders.merge(requestHeaders, echoedKeys);
          super.sendHeaders(responseHeaders);
        }
      };
      return next.startCall(echoingCall, requestHeaders);
    }
  };
}
/**
 * Checks that calling {@code disableAutoRequest()} on the response observer after the service
 * method has been invoked is rejected with an {@link IllegalStateException}.
 */
@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
  final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
      new AtomicReference<>();
  ServerCallHandler<Integer, Integer> callHandler =
      ServerCalls.asyncBidiStreamingCall(
          new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
            @Override
            public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
              // Keep the observer so the test can poke at it once invoke() has returned.
              callObserver.set((ServerCallStreamObserver<Integer>) responseObserver);
              return new ServerCalls.NoopStreamObserver<>();
            }
          });
  ServerCall.Listener<Integer> callListener =
      callHandler.startCall(serverCall, new Metadata());
  callListener.onMessage(1);
  try {
    callObserver.get().disableAutoRequest();
    // The failure message names the operation under test (it previously mentioned onCancel).
    fail("Cannot disable auto request after service invocation");
  } catch (IllegalStateException expected) {
    // Expected
  }
}
/**
 * Starts a call on the handler of {@code methodDef} using a stub server call that appends every
 * sent message to {@code serializedResp} and reports the definition's method descriptor.
 *
 * @param methodDef method definition supplying the call handler and the method descriptor
 * @param serializedResp list that receives each message passed to {@code sendMessage}
 * @return the listener returned by the handler's {@code startCall}
 */
private static <ReqT, RespT> ServerCall.Listener<ReqT> startServerCallHelper(
    final ServerMethodDefinition<ReqT, RespT> methodDef,
    final List<Object> serializedResp) {
  return methodDef.getServerCallHandler().startCall(
      new NoopServerCall<ReqT, RespT>() {
        @Override
        public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
          return methodDef.getMethodDescriptor();
        }

        @Override
        public void sendMessage(RespT message) {
          // Record the outgoing message instead of sending it anywhere.
          serializedResp.add(message);
        }
      },
      new Metadata());
}
/**
 * Creates an entry span for the incoming call, seeded from the request headers that match the
 * carrier's head keys, then starts the call with tracing wrappers around both the server call and
 * its listener. The span is stopped before this method returns, whether or not starting the call
 * succeeded.
 */
@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
    Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
  final ContextCarrier contextCarrier = new ContextCarrier();
  // Walk the carrier items and copy over each non-empty header value found under the item's key.
  for (CarrierItem item = contextCarrier.items(); item.hasNext(); ) {
    item = item.next();
    final Metadata.Key<String> headerKey =
        Metadata.Key.of(item.getHeadKey(), Metadata.ASCII_STRING_MARSHALLER);
    final String headerValue = headers.get(headerKey);
    if (StringUtil.isEmpty(headerValue)) {
      continue;
    }
    item.setHeadValue(headerValue);
  }

  final String operationName =
      OperationNameFormatUtil.formatOperationName(call.getMethodDescriptor());
  final AbstractSpan entrySpan = ContextManager.createEntrySpan(operationName, contextCarrier);
  entrySpan.setComponent(ComponentsDefine.GRPC);
  entrySpan.setLayer(SpanLayer.RPC_FRAMEWORK);

  try {
    // The server call and the listener each receive their own captured snapshot.
    final ServerCall.Listener<REQUEST> startedListener =
        handler.startCall(new TracingServerCall<>(call, ContextManager.capture()), headers);
    return new TracingServerCallListener<>(
        startedListener, call.getMethodDescriptor(), ContextManager.capture());
  } finally {
    ContextManager.stopSpan();
  }
}
/**
 * Delivers two request messages to a server-streaming handler and checks that the call is closed
 * with {@code INTERNAL} / {@code TOO_MANY_REQUESTS}, that no response was recorded, and that the
 * service method is never invoked, not even on half-close.
 */
@Test
public void clientSendsOne_errorTooManyRequests_serverStreaming() {
  ServerCallRecorder recorder = new ServerCallRecorder(SERVER_STREAMING_METHOD);
  ServerCalls.ServerStreamingMethod<Integer, Integer> unreachableMethod =
      new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
        @Override
        public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
          fail("should not be reached");
        }
      };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncServerStreamingCall(unreachableMethod);
  ServerCall.Listener<Integer> callListener = handler.startCall(recorder, new Metadata());

  // Two messages on a single-request method.
  callListener.onMessage(1);
  callListener.onMessage(1);

  assertThat(recorder.responses).isEmpty();
  assertEquals(Status.Code.INTERNAL, recorder.status.getCode());
  assertEquals(ServerCalls.TOO_MANY_REQUESTS, recorder.status.getDescription());

  // ensure onHalfClose does not invoke
  callListener.onHalfClose();
}
/**
 * Asserts on the {@code RequestMetadata} carried in the request headers (correlated invocations
 * id, tool invocation id, action id, tool name and version) and then lets the call continue.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  final RequestMetadata requestMetadata = headers.get(TracingMetadataUtils.METADATA_KEY);

  // Identifiers.
  assertThat(requestMetadata.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
  assertThat(requestMetadata.getToolInvocationId()).isEqualTo("command-id");
  assertThat(requestMetadata.getActionId()).isNotEmpty();

  // Tool details.
  assertThat(requestMetadata.getToolDetails().getToolName()).isEqualTo("bazel");
  assertThat(requestMetadata.getToolDetails().getToolVersion())
      .isEqualTo(BlazeVersionInfo.instance().getVersion());

  return next.startCall(call, headers);
}
/**
 * Starts the call with the server call wrapped in a {@code StreamResponseServerCall} and returns
 * the resulting listener wrapped in a {@code StreamRequestListener}.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> serverCall,
    Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  return new StreamRequestListener<>(
      next.startCall(new StreamResponseServerCall<>(serverCall), requestHeaders));
}
/**
 * Creates an interceptor that stores the {@code Context} current at interception time into
 * {@code contextCapture} and otherwise passes the call through unchanged.
 *
 * @param contextCapture holder that receives the current context on every intercepted call
 * @return the recording interceptor
 */
private static ServerInterceptor recordContextInterceptor(
    final AtomicReference<Context> contextCapture) {
  final ServerInterceptor recorder = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      final Context active = Context.current();
      contextCapture.set(active);
      return next.startCall(call, requestHeaders);
    }
  };
  return recorder;
}
/**
 * Checks when the on-ready handler registered by a server-streaming method fires: not on an early
 * {@code onReady()}, not on message delivery, once on half-close, and once more on the following
 * {@code onReady()}.
 */
@Test
public void onReadyHandlerCalledForUnaryRequest() throws Exception {
  final AtomicInteger readyCount = new AtomicInteger();
  final Runnable countReady = new Runnable() {
    @Override
    public void run() {
      readyCount.incrementAndGet();
    }
  };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncServerStreamingCall(
          new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
            @Override
            public void invoke(Integer req, StreamObserver<Integer> responseObserver) {
              ((ServerCallStreamObserver<Integer>) responseObserver)
                  .setOnReadyHandler(countReady);
            }
          });
  ServerCall.Listener<Integer> listener = handler.startCall(serverCall, new Metadata());
  serverCall.isReady = true;
  serverCall.isCancelled = false;

  // On ready is not called until the unary request message is delivered
  listener.onReady();
  assertEquals(0, readyCount.get());

  // delivering the message doesn't trigger onReady listener either
  listener.onMessage(1);
  assertEquals(0, readyCount.get());

  // half-closing triggers the unary request delivery and onReady
  listener.onHalfClose();
  assertEquals(1, readyCount.get());

  // Next on ready event from the transport triggers listener
  listener.onReady();
  assertEquals(2, readyCount.get());
}
/**
 * Runs the rest of the call inside a context that carries the transport's remote address under
 * {@code REMOTE_ADDRESS}.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
    final Metadata headers,
    final ServerCallHandler<ReqT, RespT> next) {
  return Contexts.interceptCall(
      Context.current()
          .withValue(REMOTE_ADDRESS, call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)),
      call,
      headers,
      next);
}
/**
 * Checks that once the service has registered an on-cancel handler, the handler runs on
 * cancellation and later {@code onNext}/{@code onCompleted} calls on the cancelled observer do
 * not throw.
 */
@Test
public void noCancellationExceptionIfOnCancelHandlerSet() throws Exception {
  final AtomicBoolean cancelSeen = new AtomicBoolean();
  final AtomicReference<ServerCallStreamObserver<Integer>> observerRef =
      new AtomicReference<>();
  final Runnable markCancelled = new Runnable() {
    @Override
    public void run() {
      cancelSeen.set(true);
    }
  };
  ServerCallHandler<Integer, Integer> handler =
      ServerCalls.asyncBidiStreamingCall(
          new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
            @Override
            public StreamObserver<Integer> invoke(StreamObserver<Integer> responseObserver) {
              ServerCallStreamObserver<Integer> observer =
                  (ServerCallStreamObserver<Integer>) responseObserver;
              observerRef.set(observer);
              observer.setOnCancelHandler(markCancelled);
              return new ServerCalls.NoopStreamObserver<>();
            }
          });
  ServerCall.Listener<Integer> listener = handler.startCall(serverCall, new Metadata());

  listener.onReady();
  listener.onCancel();
  assertTrue(cancelSeen.get());

  serverCall.isCancelled = true;
  assertTrue(observerRef.get().isCancelled());

  // Neither call may throw now that a cancel handler is installed.
  observerRef.get().onNext(null);
  observerRef.get().onCompleted();
}
/** Pass-through interceptor: starts the call on {@code next} with the call and headers as given. */
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  final ServerCall.Listener<ReqT> listener = next.startCall(call, headers);
  return listener;
}
/** Logs the full method name of the incoming call at info level, then continues the call. */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> serverCall,
    Metadata metadata,
    ServerCallHandler<ReqT, RespT> serverCallHandler) {
  final String fullMethodName = serverCall.getMethodDescriptor().getFullMethodName();
  log.info(fullMethodName);
  return serverCallHandler.startCall(serverCall, metadata);
}
/**
 * Runs the binlog server interceptor inside a context with a one-second deadline and verifies
 * that the logged client header carries a timeout within 250 ms of that deadline.
 *
 * <p>The scheduler backing the deadline and the cancellable context are both released in a
 * {@code finally} block so the test does not leave a timer thread or a pending deadline task
 * behind.
 */
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void serverDeadlineLogged() {
  final AtomicReference<ServerCall> interceptedCall =
      new AtomicReference<ServerCall>();
  final ServerCall.Listener mockListener = mock(ServerCall.Listener.class);

  final MethodDescriptor<byte[], byte[]> method =
      MethodDescriptor.<byte[], byte[]>newBuilder()
          .setType(MethodType.UNKNOWN)
          .setFullMethodName("service/method")
          .setRequestMarshaller(BYTEARRAY_MARSHALLER)
          .setResponseMarshaller(BYTEARRAY_MARSHALLER)
          .build();

  // Fully qualified so this block does not depend on an extra import.
  final java.util.concurrent.ScheduledExecutorService deadlineScheduler =
      Executors.newSingleThreadScheduledExecutor();
  // We expect the contents of the "grpc-timeout" header to be installed in the context
  final Context.CancellableContext deadlineContext =
      Context.current().withDeadlineAfter(1, TimeUnit.SECONDS, deadlineScheduler);
  try {
    deadlineContext.run(new Runnable() {
      @Override
      public void run() {
        ServerCall.Listener<byte[]> unused =
            new BinlogHelper(mockSinkWriter)
                .getServerInterceptor(CALL_ID)
                .interceptCall(
                    new NoopServerCall<byte[], byte[]>() {
                      @Override
                      public MethodDescriptor<byte[], byte[]> getMethodDescriptor() {
                        return method;
                      }
                    },
                    new Metadata(),
                    new ServerCallHandler<byte[], byte[]>() {
                      @Override
                      public ServerCall.Listener<byte[]> startCall(
                          ServerCall<byte[], byte[]> call,
                          Metadata headers) {
                        interceptedCall.set(call);
                        return mockListener;
                      }
                    });
      }
    });
    ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class);
    verify(mockSinkWriter).logClientHeader(
        /*seq=*/ eq(1L),
        eq("service/method"),
        isNull(String.class),
        timeoutCaptor.capture(),
        any(Metadata.class),
        eq(Logger.LOGGER_SERVER),
        eq(CALL_ID),
        isNull(SocketAddress.class));
    verifyNoMoreInteractions(mockSinkWriter);
    Duration timeout = timeoutCaptor.getValue();
    assertThat(TimeUnit.SECONDS.toNanos(1) - Durations.toNanos(timeout))
        .isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
  } finally {
    // Cleanup runs after all verification so it cannot influence the recorded interactions.
    deadlineContext.cancel(null);
    deadlineScheduler.shutdownNow();
  }
}
/**
 * Delivers {@code request} to {@code listener} via {@code onMessage}, sidestepping the listener's
 * generic request type.
 *
 * @param listener raw listener that receives the message
 * @param request message object handed to the listener
 */
@SuppressWarnings({"rawtypes", "unchecked"})
private static void onServerMessageHelper(ServerCall.Listener listener, Object request) {
  final ServerCall.Listener<Object> untypedListener = listener;
  untypedListener.onMessage(request);
}
/**
 * Continues the call inside a context in which {@code ctxKey} is bound to
 * {@code "ServerAcceptsContext"}.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    Metadata headers,
    ServerCallHandler<ReqT, RespT> next) {
  return Contexts.interceptCall(
      Context.current().withValue(ctxKey, "ServerAcceptsContext"), call, headers, next);
}
/**
 * Supplies the listener held in {@code delegate}.
 *
 * @return the {@code delegate} listener
 */
@Override
protected ServerCall.Listener<R> delegate() {
return delegate;
}
/**
 * Checks that the status-runtime-exception transmitting interceptor closes a call at most once:
 * a second exception on the same call, and an exception on a call the listener already closed
 * itself, must not produce another close. Both scenarios share one fake call, and exactly one
 * close is expected in total.
 */
@Test
public void statusRuntimeExceptionTransmitterIgnoresClosedCalls() {
  final Status expectedStatus = Status.UNAVAILABLE;
  final Status unexpectedStatus = Status.CANCELLED;
  final Metadata expectedMetadata = new Metadata();
  final StatusRuntimeException thrown =
      new StatusRuntimeException(expectedStatus, expectedMetadata);
  FakeServerCall<Void, Void> fakeCall =
      new FakeServerCall<Void, Void>(expectedStatus, expectedMetadata);

  // Scenario 1: the listener throws from both onMessage and onHalfClose.
  listener = new VoidCallListener() {
    @Override
    public void onMessage(Void message) {
      throw thrown;
    }

    @Override
    public void onHalfClose() {
      throw thrown;
    }
  };

  ServerServiceDefinition interceptedService = ServerInterceptors.intercept(
      serviceDefinition,
      Arrays.asList(TransmitStatusRuntimeExceptionInterceptor.instance()));
  ServerCall.Listener<Void> throwingTwiceListener =
      getSoleMethod(interceptedService).getServerCallHandler().startCall(fakeCall, headers);
  throwingTwiceListener.onMessage(null); // the only close with our exception
  throwingTwiceListener.onHalfClose(); // should not trigger a close

  // Scenario 2: this listener closes the call when it is initialized with startCall
  listener = new VoidCallListener() {
    @Override
    public void onCall(ServerCall<Void, Void> call, Metadata headers) {
      call.close(unexpectedStatus, headers);
    }

    @Override
    public void onHalfClose() {
      throw thrown;
    }
  };

  ServerCall.Listener<Void> alreadyClosedListener =
      getSoleMethod(interceptedService).getServerCallHandler().startCall(fakeCall, headers);
  // call is already closed, does not match exception
  alreadyClosedListener.onHalfClose(); // should not trigger a close

  assertEquals(1, fakeCall.numCloses);
}
/**
 * Activates the transaction when a server call listener method starts.
 *
 * @param listener server call listener whose method is starting
 * @return the transaction, or {@literal null} if there is none
 */
@Nullable
Transaction enterServerListenerMethod(ServerCall.Listener<?> listener);