下面列出了怎么用io.grpc.ServerCall.Listener的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.compressorRegistry(compressors)
.decompressorRegistry(decompressors)
.intercept(new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Listener<ReqT> listener = next.startCall(call, headers);
// TODO(carl-mastrangelo): check that encoding was set.
call.setMessageCompression(true);
return listener;
}
});
}
private static <ReqT, RespT> ServerCall.Listener<ReqT> startServerCallHelper(
final ServerMethodDefinition<ReqT, RespT> methodDef,
final List<Object> serializedResp) {
ServerCall<ReqT, RespT> serverCall = new NoopServerCall<ReqT, RespT>() {
@Override
public void sendMessage(RespT message) {
serializedResp.add(message);
}
@Override
public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
return methodDef.getMethodDescriptor();
}
};
return methodDef.getServerCallHandler().startCall(serverCall, new Metadata());
}
@Test
public void callNextTwice() {
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Calling next twice is permitted, although should only rarely be useful.
assertSame(listener, next.startCall(call, headers));
return next.startCall(call, headers);
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(serviceDefinition,
interceptor);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(handler, times(2)).startCall(same(call), same(headers));
verifyNoMoreInteractions(handler);
}
@Test
public void argumentsPassed() {
@SuppressWarnings("unchecked")
final ServerCall<String, Integer> call2 = new NoopServerCall<String, Integer>();
@SuppressWarnings("unchecked")
final ServerCall.Listener<String> listener2 = mock(ServerCall.Listener.class);
ServerInterceptor interceptor = new ServerInterceptor() {
@SuppressWarnings("unchecked") // Lot's of casting for no benefit. Not intended use.
@Override
public <R1, R2> ServerCall.Listener<R1> interceptCall(
ServerCall<R1, R2> call,
Metadata headers,
ServerCallHandler<R1, R2> next) {
assertSame(call, ServerInterceptorsTest.this.call);
assertSame(listener,
next.startCall((ServerCall<R1, R2>)call2, headers));
return (ServerCall.Listener<R1>) listener2;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.asList(interceptor));
assertSame(listener2,
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(handler).startCall(call2, headers);
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
long startCallMillis = System.currentTimeMillis();
String fullMethodName = call.getMethodDescriptor().getFullMethodName();
String methodName = fullMethodName.substring(fullMethodName.indexOf("/") + 1);
return next.startCall(
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
Metrics.requestLatency
.labels(methodName)
.observe((System.currentTimeMillis() - startCallMillis) / 1000f);
Metrics.grpcRequestCount.labels(methodName, status.getCode().name()).inc();
super.close(status, trailers);
}
},
headers);
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
long startCallMillis = System.currentTimeMillis();
String fullMethodName = call.getMethodDescriptor().getFullMethodName();
String serviceName = MethodDescriptor.extractFullServiceName(fullMethodName);
String methodName = fullMethodName.substring(fullMethodName.indexOf("/") + 1);
return next.startCall(
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
GrpcMetrics.requestLatency
.labels(serviceName, methodName, status.getCode().name())
.observe((System.currentTimeMillis() - startCallMillis) / 1000f);
super.close(status, trailers);
}
},
headers);
}
@SuppressWarnings("unchecked")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final MethodDescriptor<ReqT, RespT> methodDescriptor = call.getMethodDescriptor();
final InterceptorStatusToken token;
try {
token = beforeInvocation(methodDescriptor);
} catch (final AuthenticationException | AccessDeniedException e) {
log.debug("Access denied");
throw e;
}
log.debug("Access granted");
final Listener<ReqT> result;
try {
result = next.startCall(call, headers);
} finally {
finallyInvocation(token);
}
// TODO: Call that here or in onHalfClose?
return (Listener<ReqT>) afterInvocation(token, result);
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
logMethod(REQUEST, call.getMethodDescriptor());
logHeaders(REQUEST, headers);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
logMethod(RESPONSE, call.getMethodDescriptor());
logHeaders(RESPONSE, responseHeaders);
super.sendHeaders(responseHeaders);
}
@Override
public void sendMessage(RespT message) {
logMessage(RESPONSE, message);
super.sendMessage(message);
}
}, headers)) {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
if (SupervisorServiceGrpc.getServiceDescriptor().getMethods().contains(call.getMethodDescriptor())) {
// Supervisor API calls are not restricted to the active leader.
return next.startCall(call, headers);
}
if (leaderActivator.isLeader()) {
if (leaderActivator.isActivated()) {
return next.startCall(call, headers);
} else {
call.close(Status.UNAVAILABLE.withDescription("Titus Master is initializing and not yet available."), new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}
} else {
call.close(Status.UNAVAILABLE.withDescription("Titus Master is not leader."), new Metadata());
return new ServerCall.Listener<ReqT>() {
};
}
}
@SuppressWarnings("unchecked")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
final MethodDescriptor<ReqT, RespT> methodDescriptor = call.getMethodDescriptor();
final InterceptorStatusToken token;
try {
token = beforeInvocation(methodDescriptor);
} catch (final AuthenticationException | AccessDeniedException e) {
log.debug("Access denied");
throw e;
}
log.debug("Access granted");
final Listener<ReqT> result;
try {
result = next.startCall(call, headers);
} finally {
finallyInvocation(token);
}
// TODO: Call that here or in onHalfClose?
return (Listener<ReqT>) afterInvocation(token, result);
}
@SuppressWarnings("checkstyle:MethodTypeParameterName")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
final ServerCall<ReqT, RespT> call, final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {
TL.set(call);
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(final Status status, final Metadata trailers) {
super.close(status, trailers);
TL.remove();
}
}, headers);
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,ServerCallHandler<ReqT, RespT> next) {
logger.info("Call intercepted "+headers.toString());
String token = headers.get(authKey);
if (StringUtils.notEmpty(token))
{
try
{
logger.info("Token "+token);
ConsumerBean consumer = resourceServer.validateResourceFromToken(token);
logger.info("Setting call to client "+consumer.getShort_name());
return new SeldonServerCallListener<ReqT>(next.startCall(call, headers),consumer.getShort_name(),this);
}
catch (APIException e)
{
logger.warn("API exception on getting token ",e);
return next.startCall(call, headers);
}
}
else
{
logger.warn("Empty token ignoring call");
return next.startCall(call, headers);
}
}
@Override
public <REQ, RESP> Listener<REQ> interceptCall(ServerCall<REQ, RESP> call, Metadata headers,
ServerCallHandler<REQ, RESP> next) {
if (!call.getMethodDescriptor().equals(UnitTestServiceGrpc.getErrorReplaceExceptionMethod())) {
return next.startCall(call, headers);
}
return next.startCall(new SimpleForwardingServerCall<REQ, RESP>(call) {
@Override
public void close(Status status, Metadata trailers) {
if (status.getCause() instanceof IllegalStateException &&
status.getCause().getMessage().equals("This error should be replaced")) {
status = status.withDescription("Error was replaced");
}
delegate().close(status, trailers);
}
}, headers);
}
@Override
public <HelloRequestT, HelloReplyT> Listener<HelloRequestT> interceptCall(
ServerCall<HelloRequestT, HelloReplyT> call,
Metadata headers, ServerCallHandler<HelloRequestT, HelloReplyT> next) {
int random = new Random().nextInt(100);
long delay = 0;
if (random < 1) {
delay = 10_000;
} else if (random < 5) {
delay = 5_000;
} else if (random < 10) {
delay = 2_000;
}
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return next.startCall(call, headers);
}
@Test
public void callNextTwice() {
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Calling next twice is permitted, although should only rarely be useful.
assertSame(listener, next.startCall(call, headers));
return next.startCall(call, headers);
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(serviceDefinition,
interceptor);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(handler, times(2)).startCall(same(call), same(headers));
verifyNoMoreInteractions(handler);
}
@Test
public void argumentsPassed() {
@SuppressWarnings("unchecked")
final ServerCall<String, Integer> call2 = new NoopServerCall<>();
@SuppressWarnings("unchecked")
final ServerCall.Listener<String> listener2 = mock(ServerCall.Listener.class);
ServerInterceptor interceptor = new ServerInterceptor() {
@SuppressWarnings("unchecked") // Lot's of casting for no benefit. Not intended use.
@Override
public <R1, R2> ServerCall.Listener<R1> interceptCall(
ServerCall<R1, R2> call,
Metadata headers,
ServerCallHandler<R1, R2> next) {
assertSame(call, ServerInterceptorsTest.this.call);
assertSame(listener,
next.startCall((ServerCall<R1, R2>)call2, headers));
return (ServerCall.Listener<R1>) listener2;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.asList(interceptor));
assertSame(listener2,
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(handler).startCall(call2, headers);
}
@Override
protected AbstractServerImplBuilder<?> getServerBuilder() {
return NettyServerBuilder.forPort(0)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.compressorRegistry(compressors)
.decompressorRegistry(decompressors)
.intercept(new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Listener<ReqT> listener = next.startCall(call, headers);
// TODO(carl-mastrangelo): check that encoding was set.
call.setMessageCompression(true);
return listener;
}
});
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
GrpcServerRequest request = new GrpcServerRequest(nameToKey, call, headers);
Span span = handler.handleReceive(request);
AtomicReference<Span> spanRef = new AtomicReference<>(span);
// startCall invokes user interceptors, so we place the span in scope here
Listener<ReqT> result;
try (Scope scope = currentTraceContext.maybeScope(span.context())) {
result = next.startCall(new TracingServerCall<>(call, span, spanRef, request), headers);
} catch (Throwable e) {
// Another interceptor may throw an exception during startCall, in which case no other
// callbacks are called, so go ahead and close the span here.
//
// See instrumentation/grpc/RATIONALE.md for why we don't use the handler here
spanRef.set(null);
if (span != null) span.error(e).finish();
throw e;
}
return new TracingServerCallListener<>(result, span, spanRef, request);
}
@Test
public void wrapMethodDefinition_methodDescriptor() throws Exception {
ServerMethodDefinition<String, Integer> methodDef =
ServerMethodDefinition.create(
method,
new ServerCallHandler<String, Integer>() {
@Override
public Listener<String> startCall(
ServerCall<String, Integer> call, Metadata headers) {
throw new UnsupportedOperationException();
}
});
ServerMethodDefinition<?, ?> wMethodDef = binlogProvider.wrapMethodDefinition(methodDef);
validateWrappedMethod(wMethodDef.getMethodDescriptor());
}
/**
* Set up the registry.
*/
@Setup(Level.Trial)
public void setup() throws Exception {
registry = new MutableHandlerRegistry();
fullMethodNames = new ArrayList<>(serviceCount * methodCountPerService);
for (int serviceIndex = 0; serviceIndex < serviceCount; ++serviceIndex) {
String serviceName = randomString();
ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(serviceName);
for (int methodIndex = 0; methodIndex < methodCountPerService; ++methodIndex) {
String methodName = randomString();
MethodDescriptor<Void, Void> methodDescriptor = MethodDescriptor.<Void, Void>newBuilder()
.setType(MethodDescriptor.MethodType.UNKNOWN)
.setFullMethodName(MethodDescriptor.generateFullMethodName(serviceName, methodName))
.setRequestMarshaller(TestMethodDescriptors.voidMarshaller())
.setResponseMarshaller(TestMethodDescriptors.voidMarshaller())
.build();
serviceBuilder.addMethod(methodDescriptor,
new ServerCallHandler<Void, Void>() {
@Override
public Listener<Void> startCall(ServerCall<Void, Void> call,
Metadata headers) {
return null;
}
});
fullMethodNames.add(methodDescriptor.getFullMethodName());
}
registry.addService(serviceBuilder.build());
}
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> serverCall,
Metadata metadata,
ServerCallHandler<ReqT, RespT> nextHandler) {
serverCall.close(status, new Metadata());
return new Listener<ReqT>() {};
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
TracingServerCallListener(
Listener<RespT> delegate,
Span span,
AtomicReference<Span> spanRef,
GrpcServerRequest request
) {
super(delegate);
this.context = span.context();
this.spanRef = spanRef;
this.request = request;
}
@Test
public void binaryLogInstalled() throws Exception {
final SettableFuture<Boolean> intercepted = SettableFuture.create();
final ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
intercepted.set(true);
return next.startCall(call, headers);
}
};
builder.binlog = new BinaryLog() {
@Override
public void close() throws IOException {
// noop
}
@Override
public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
ServerMethodDefinition<ReqT, RespT> oMethodDef) {
return ServerMethodDefinition.create(
oMethodDef.getMethodDescriptor(),
InternalServerInterceptors.interceptCallHandlerCreate(
interceptor,
oMethodDef.getServerCallHandler()));
}
@Override
public Channel wrapChannel(Channel channel) {
return channel;
}
};
createAndStartServer();
basicExchangeHelper(METHOD, "Lots of pizza, please", 314, 50);
assertTrue(intercepted.get());
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
String resourceName = serverCall.getMethodDescriptor().getFullMethodName();
// Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
Entry entry = null;
try {
ContextUtil.enter(resourceName);
entry = SphU.entry(resourceName, EntryType.IN);
// Allow access, forward the call.
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
serverCallHandler.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
// Record the exception metrics.
if (!status.isOk()) {
recordException(status.asRuntimeException());
}
}
}, metadata)) {};
} catch (BlockException e) {
serverCall.close(FLOW_CONTROL_BLOCK, new Metadata());
return new ServerCall.Listener<ReqT>() {};
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ScopedBeansContainer container = new ScopedBeansContainer();
Context context = Context.current().withValue(GRPC_REQUEST_KEY, container);
context.addListener(this, MoreExecutors.directExecutor());
return Contexts.interceptCall(context, call, headers, next);
}
@Test
public void debugClientInterceptTest() {
LinkedList<String> logs = new LinkedList<String>();
Metadata requestHeaders = new Metadata();
requestHeaders.put(Metadata.Key.of("request_header", Metadata.ASCII_STRING_MARSHALLER), "request_header_value");
// Setup
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.put(Metadata.Key.of("response_header", Metadata.ASCII_STRING_MARSHALLER),
"response_header_value");
super.sendHeaders(responseHeaders);
}
}, headers);
}
}));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel())
.withInterceptors(new DebugClientInterceptor(Level.STATUS, Level.HEADERS, Level.MESSAGE) {
@Override
protected void log(String message) {
logs.add(message);
}
}, MetadataUtils.newAttachHeadersInterceptor(requestHeaders));
stub.sayHello(HelloRequest.newBuilder().setName("World").build());
assertThat(logs.poll()).contains("SayHello"); // request method name
assertThat(logs.poll()).contains(requestHeaders.toString()); // request header value
assertThat(logs.poll()).contains("World"); // request message
assertThat(logs.poll()).contains("response_header_value"); // response header
assertThat(logs.poll()).contains("Hello World"); // response message
assertThat(logs.poll()).contains("0 OK"); // response status
}
@Test
public void debugServerInterceptTest() {
LinkedList<String> logs = new LinkedList<String>();
Metadata requestHeaders = new Metadata();
requestHeaders.put(Metadata.Key.of("request_header", Metadata.ASCII_STRING_MARSHALLER), "request_header_value");
// Setup
serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.put(Metadata.Key.of("response_header", Metadata.ASCII_STRING_MARSHALLER),
"response_header_value");
super.sendHeaders(responseHeaders);
}
}, headers);
}
}, new DebugServerInterceptor(DebugServerInterceptor.Level.METHOD, DebugServerInterceptor.Level.MESSAGE,
DebugServerInterceptor.Level.HEADERS) {
@Override
protected void log(String logmessage) {
logs.add(logmessage);
}
}));
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel())
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(requestHeaders));
stub.sayHello(HelloRequest.newBuilder().setName("World").build());
assertThat(logs.poll()).contains("SayHello"); // request method name
assertThat(logs.poll()).contains("request_header_value"); // request header value
assertThat(logs.poll()).contains("World"); // request message
assertThat(logs.poll()).contains("SayHello"); // response method name
assertThat(logs.poll()).contains("response_header_value"); // response header
assertThat(logs.poll()).contains("Hello World"); // response message
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
RequestMetadata meta = requestMetadataFromHeaders(headers);
if (meta == null) {
meta = RequestMetadata.getDefaultInstance();
}
Context ctx = Context.current().withValue(CONTEXT_KEY, meta);
return Contexts.interceptCall(ctx, call, headers, next);
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String clientId = headers.get(CLIENT_ID_HEADER_KEY);
if (clientId == null || !authenticator.authenticate(clientId)) {
call.close(Status.UNAUTHENTICATED.withDescription("Invalid or unknown client: " + clientId), headers);
return NOOP_LISTENER;
}
Context context = Context.current().withValue(CLIENT_ID_CONTEXT_KEY, clientId);
return Contexts.interceptCall(context, call, headers, next);
}