下面列出了怎么用io.grpc.ForwardingServerCall.SimpleForwardingServerCall的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Builds a server interceptor that merges the selected request headers into both the response
 * headers and the response trailers. Handy for verifying that metadata survives a full round
 * trip between client and server.
 *
 * @param keys the metadata keys whose request values are echoed back to the client
 * @return an interceptor echoing the selected keys in response headers and trailers
 */
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Wrap the call so outgoing headers and trailers can be amended before they are sent.
      ServerCall<ReqT, RespT> echoingCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
              responseHeaders.merge(requestHeaders, echoedKeys);
              super.sendHeaders(responseHeaders);
            }

            @Override
            public void close(Status status, Metadata trailers) {
              trailers.merge(requestHeaders, echoedKeys);
              super.close(status, trailers);
            }
          };
      return next.startCall(echoingCall, requestHeaders);
    }
  };
}
/**
 * Builds a server interceptor that merges the selected request headers into the response
 * headers only; trailers are forwarded untouched.
 *
 * @param keys the metadata keys whose request values are echoed back to the client
 * @return an interceptor echoing the selected keys in response headers
 */
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Only sendHeaders is overridden; close() falls through to the forwarding default.
      ServerCall<ReqT, RespT> headerEchoingCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
              responseHeaders.merge(requestHeaders, echoedKeys);
              super.sendHeaders(responseHeaders);
            }
          };
      return next.startCall(headerEchoingCall, requestHeaders);
    }
  };
}
/**
 * Builds a server interceptor that merges the selected request headers into the response
 * trailers only; response headers are forwarded untouched.
 *
 * @param keys the metadata keys whose request values are echoed back to the client
 * @return an interceptor echoing the selected keys in response trailers
 */
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> echoedKeys = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // Only close is overridden; sendHeaders() falls through to the forwarding default.
      ServerCall<ReqT, RespT> trailerEchoingCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata trailers) {
              trailers.merge(requestHeaders, echoedKeys);
              super.close(status, trailers);
            }
          };
      return next.startCall(trailerEchoingCall, requestHeaders);
    }
  };
}
/**
 * Records per-method latency and a per-method/per-status request count when the call closes.
 *
 * <p>The method label is the part of the full method name after the first {@code '/'}.
 *
 * @param call the server call being intercepted
 * @param headers the request metadata, passed through unchanged
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  // System.nanoTime() is monotonic, so the measured duration cannot go negative or jump when
  // the wall clock is adjusted (NTP step, manual change) while the call is in flight.
  final long startNanos = System.nanoTime();
  final String fullMethodName = call.getMethodDescriptor().getFullMethodName();
  final String methodName = fullMethodName.substring(fullMethodName.indexOf('/') + 1);
  return next.startCall(
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(Status status, Metadata trailers) {
          // Latency is observed in seconds; double arithmetic avoids float rounding artifacts.
          double elapsedSeconds = (System.nanoTime() - startNanos) / 1_000_000_000.0;
          Metrics.requestLatency.labels(methodName).observe(elapsedSeconds);
          Metrics.grpcRequestCount.labels(methodName, status.getCode().name()).inc();
          super.close(status, trailers);
        }
      },
      headers);
}
/**
 * Records request latency, labelled by service, method and final status code, when the call
 * closes.
 *
 * @param call the server call being intercepted
 * @param headers the request metadata, passed through unchanged
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  // System.nanoTime() is monotonic, so the measured duration cannot go negative or jump when
  // the wall clock is adjusted while the call is in flight.
  final long startNanos = System.nanoTime();
  final String fullMethodName = call.getMethodDescriptor().getFullMethodName();
  final String serviceName = MethodDescriptor.extractFullServiceName(fullMethodName);
  final String methodName = fullMethodName.substring(fullMethodName.indexOf('/') + 1);
  return next.startCall(
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(Status status, Metadata trailers) {
          // Latency is observed in seconds; double arithmetic avoids float rounding artifacts.
          double elapsedSeconds = (System.nanoTime() - startNanos) / 1_000_000_000.0;
          GrpcMetrics.requestLatency
              .labels(serviceName, methodName, status.getCode().name())
              .observe(elapsedSeconds);
          super.close(status, trailers);
        }
      },
      headers);
}
/**
 * Publishes the current {@link ServerCall} through {@code TL} for the duration of the call and
 * clears it again once the call is closed.
 *
 * @param call the server call being intercepted
 * @param headers the request metadata, passed through unchanged
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@SuppressWarnings("checkstyle:MethodTypeParameterName")
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    final ServerCall<ReqT, RespT> call, final Metadata headers,
    final ServerCallHandler<ReqT, RespT> next) {
  TL.set(call);
  return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
    @Override
    public void close(final Status status, final Metadata trailers) {
      try {
        super.close(status, trailers);
      } finally {
        // Always clear the holder, even when closing the delegate throws, so a stale call
        // reference is never left behind.
        // NOTE(review): if TL is a ThreadLocal, this only clears the thread that runs close();
        // confirm close() is invoked on the same thread as interceptCall().
        TL.remove();
      }
    }
  }, headers);
}
/**
 * For the "error replace exception" unit-test method only, rewrites the status description to
 * {@code "Error was replaced"} when the status cause is an {@link IllegalStateException} whose
 * message is {@code "This error should be replaced"}. All other methods pass through untouched.
 *
 * @param call the server call being intercepted
 * @param headers the request metadata, passed through unchanged
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next}
 */
@Override
public <REQ, RESP> Listener<REQ> interceptCall(ServerCall<REQ, RESP> call, Metadata headers,
    ServerCallHandler<REQ, RESP> next) {
  if (!call.getMethodDescriptor().equals(UnitTestServiceGrpc.getErrorReplaceExceptionMethod())) {
    return next.startCall(call, headers);
  }
  return next.startCall(new SimpleForwardingServerCall<REQ, RESP>(call) {
    @Override
    public void close(Status status, Metadata trailers) {
      final Throwable cause = status.getCause();
      Status statusToSend = status;
      // Compare with the literal on the left: an exception created without a message returns
      // null from getMessage(), which previously caused a NullPointerException here.
      if (cause instanceof IllegalStateException
          && "This error should be replaced".equals(cause.getMessage())) {
        statusToSend = status.withDescription("Error was replaced");
      }
      delegate().close(statusToSend, trailers);
    }
  }, headers);
}
/**
 * Creates an interceptor that reflects the chosen request headers back to the client in both
 * the response headers and the response trailers. Intended for end-to-end metadata propagation
 * tests.
 *
 * @param keys the metadata keys to copy from the request
 * @return an interceptor that echoes those keys in headers and trailers
 */
private static ServerInterceptor echoRequestHeadersInterceptor(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> keysToEcho = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> wrappedCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
              // Amend the outgoing headers before handing them to the real call.
              responseHeaders.merge(requestHeaders, keysToEcho);
              super.sendHeaders(responseHeaders);
            }

            @Override
            public void close(Status status, Metadata trailers) {
              // Amend the outgoing trailers before handing them to the real call.
              trailers.merge(requestHeaders, keysToEcho);
              super.close(status, trailers);
            }
          };
      return next.startCall(wrappedCall, requestHeaders);
    }
  };
}
/**
 * Creates an interceptor that reflects the chosen request headers back to the client in the
 * response headers only; trailers are left as the application produced them.
 *
 * @param keys the metadata keys to copy from the request
 * @return an interceptor that echoes those keys in response headers
 */
private static ServerInterceptor echoRequestMetadataInHeaders(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> keysToEcho = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // close() is intentionally not overridden: the forwarding default already delegates.
      ServerCall<ReqT, RespT> wrappedCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void sendHeaders(Metadata responseHeaders) {
              responseHeaders.merge(requestHeaders, keysToEcho);
              super.sendHeaders(responseHeaders);
            }
          };
      return next.startCall(wrappedCall, requestHeaders);
    }
  };
}
/**
 * Creates an interceptor that reflects the chosen request headers back to the client in the
 * response trailers only; response headers are left as the application produced them.
 *
 * @param keys the metadata keys to copy from the request
 * @return an interceptor that echoes those keys in response trailers
 */
private static ServerInterceptor echoRequestMetadataInTrailers(final Metadata.Key<?>... keys) {
  final Set<Metadata.Key<?>> keysToEcho = new HashSet<>(Arrays.asList(keys));
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call,
        final Metadata requestHeaders,
        ServerCallHandler<ReqT, RespT> next) {
      // sendHeaders() is intentionally not overridden: the forwarding default already delegates.
      ServerCall<ReqT, RespT> wrappedCall =
          new SimpleForwardingServerCall<ReqT, RespT>(call) {
            @Override
            public void close(Status status, Metadata trailers) {
              trailers.merge(requestHeaders, keysToEcho);
              super.close(status, trailers);
            }
          };
      return next.startCall(wrappedCall, requestHeaders);
    }
  };
}
/**
 * Installs a user interceptor whose {@code sendMessage} always throws, then checks that the
 * client sees a {@link StatusRuntimeException} and that the server span carries the error
 * message.
 */
@Test public void userInterceptor_throwsOnSendMessage() throws IOException {
  ServerInterceptor throwingInterceptor = new ServerInterceptor() {
    @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call, Metadata metadata, ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> failingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override public void sendMessage(RespT message) {
          throw new IllegalStateException("I'm a bad interceptor.");
        }
      };
      return next.startCall(failingCall, metadata);
    }
  };
  init(throwingInterceptor);

  assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
      .isInstanceOf(StatusRuntimeException.class);

  testSpanHandler.takeRemoteSpanWithErrorMessage(Span.Kind.SERVER, "I'm a bad interceptor.");
}
/**
 * Installs a user interceptor whose {@code close} always throws, then checks that the client
 * sees a {@link StatusRuntimeException} and that the server span carries the error message.
 */
@Test public void userInterceptor_throwsOnClose() throws IOException {
  ServerInterceptor throwingInterceptor = new ServerInterceptor() {
    @Override public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call, Metadata metadata, ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> failingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override public void close(Status status, Metadata trailers) {
          throw new IllegalStateException("I'm a bad interceptor.");
        }
      };
      return next.startCall(failingCall, metadata);
    }
  };
  init(throwingInterceptor);

  assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
      .isInstanceOf(StatusRuntimeException.class);

  testSpanHandler.takeRemoteSpanWithErrorMessage(Span.Kind.SERVER, "I'm a bad interceptor.");
}
/**
 * Logs the incoming request headers and adds a custom header to the response headers.
 *
 * @param call the server call being intercepted
 * @param requestHeaders the metadata received from the client
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("header received from client:" + requestHeaders);
  // Wrap the call so the custom header is attached just before headers go out.
  ServerCall<ReqT, RespT> headerAddingCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
          super.sendHeaders(responseHeaders);
        }
      };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Logs the incoming request headers and adds a custom header to the response headers.
 *
 * @param call the server call being intercepted
 * @param requestHeaders the metadata received from the client
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("服务端拦截器:header received from client:" + requestHeaders);
  // Wrap the call so the custom header is attached just before headers go out.
  ServerCall<ReqT, RespT> headerAddingCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
          super.sendHeaders(responseHeaders);
        }
      };
  return next.startCall(headerAddingCall, requestHeaders);
}
/**
 * Runs one SayHello call through a {@code DebugClientInterceptor} whose log output is captured,
 * and checks the captured lines in order: method, request headers, request message, response
 * header, response message, status.
 */
@Test
public void debugClientInterceptTest() {
  LinkedList<String> logs = new LinkedList<>();
  Metadata.Key<String> requestKey =
      Metadata.Key.of("request_header", Metadata.ASCII_STRING_MARSHALLER);
  Metadata requestHeaders = new Metadata();
  requestHeaders.put(requestKey, "request_header_value");

  // Setup: the server attaches a response header so the client-side logger has one to report.
  ServerInterceptor responseHeaderInterceptor = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
        ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          Metadata.Key<String> responseKey =
              Metadata.Key.of("response_header", Metadata.ASCII_STRING_MARSHALLER);
          responseHeaders.put(responseKey, "response_header_value");
          super.sendHeaders(responseHeaders);
        }
      };
      return next.startCall(headerAddingCall, headers);
    }
  };
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, responseHeaderInterceptor));

  GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel())
      .withInterceptors(new DebugClientInterceptor(Level.STATUS, Level.HEADERS, Level.MESSAGE) {
        @Override
        protected void log(String message) {
          logs.add(message);
        }
      }, MetadataUtils.newAttachHeadersInterceptor(requestHeaders));

  stub.sayHello(HelloRequest.newBuilder().setName("World").build());

  String[] expectedInOrder = {
      "SayHello",                  // request method name
      requestHeaders.toString(),   // request header value
      "World",                     // request message
      "response_header_value",     // response header
      "Hello World",               // response message
      "0 OK",                      // response status
  };
  for (String expected : expectedInOrder) {
    assertThat(logs.poll()).contains(expected);
  }
}
/**
 * Runs one SayHello call through a {@code DebugServerInterceptor} whose log output is captured,
 * and checks the captured lines in order: request method, request header, request message,
 * response method, response header, response message.
 */
@Test
public void debugServerInterceptTest() {
  LinkedList<String> logs = new LinkedList<>();
  Metadata.Key<String> requestKey =
      Metadata.Key.of("request_header", Metadata.ASCII_STRING_MARSHALLER);
  Metadata requestHeaders = new Metadata();
  requestHeaders.put(requestKey, "request_header_value");

  // Setup: one interceptor attaches a response header, the other records debug log lines.
  serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, new ServerInterceptor() {
    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
        ServerCallHandler<ReqT, RespT> next) {
      ServerCall<ReqT, RespT> headerAddingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          Metadata.Key<String> responseKey =
              Metadata.Key.of("response_header", Metadata.ASCII_STRING_MARSHALLER);
          responseHeaders.put(responseKey, "response_header_value");
          super.sendHeaders(responseHeaders);
        }
      };
      return next.startCall(headerAddingCall, headers);
    }
  }, new DebugServerInterceptor(DebugServerInterceptor.Level.METHOD, DebugServerInterceptor.Level.MESSAGE,
      DebugServerInterceptor.Level.HEADERS) {
    @Override
    protected void log(String logmessage) {
      logs.add(logmessage);
    }
  }));

  GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel())
      .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(requestHeaders));

  stub.sayHello(HelloRequest.newBuilder().setName("World").build());

  String[] expectedInOrder = {
      "SayHello",               // request method name
      "request_header_value",   // request header value
      "World",                  // request message
      "SayHello",               // response method name
      "response_header_value",  // response header
      "Hello World",            // response message
  };
  for (String expected : expectedInOrder) {
    assertThat(logs.poll()).contains(expected);
  }
}
/**
 * Logs the incoming request headers and adds a custom header to the response headers.
 *
 * @param call the server call being intercepted
 * @param requestHeaders the metadata received from the client
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the wrapped call
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    final Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  logger.info("header received from client:" + requestHeaders);
  // Decorate the call; the custom header is injected right before headers are flushed.
  ServerCall<ReqT, RespT> decoratedCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void sendHeaders(Metadata responseHeaders) {
          responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
          super.sendHeaders(responseHeaders);
        }
      };
  return next.startCall(decoratedCall, requestHeaders);
}
/**
 * Ensures a {@code CallMetricRecorder} is present in the call's {@link Context} and, when the
 * call closes, attaches any recorded metric values to the trailers as an ORCA load report.
 *
 * @param call the server call being intercepted
 * @param headers the request metadata, passed through unchanged
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code Contexts.interceptCall} for the wrapped call
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  Context callContext = Context.current();
  final CallMetricRecorder existingRecorder =
      InternalCallMetricRecorder.CONTEXT_KEY.get(callContext);
  final CallMetricRecorder recorder;
  if (existingRecorder != null) {
    recorder = existingRecorder;
  } else {
    // No recorder in the current context yet: create one and bind it to a derived context.
    recorder = InternalCallMetricRecorder.newCallMetricRecorder();
    callContext = callContext.withValue(InternalCallMetricRecorder.CONTEXT_KEY, recorder);
  }

  ServerCall<ReqT, RespT> reportingCall =
      new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override
        public void close(Status status, Metadata trailers) {
          Map<String, Double> recordedValues =
              InternalCallMetricRecorder.finalizeAndDump(recorder);
          // Only attach a metric report if there are some metric values to be reported.
          if (!recordedValues.isEmpty()) {
            trailers.put(
                ORCA_ENDPOINT_LOAD_METRICS_KEY,
                OrcaLoadReport.newBuilder().putAllRequestCost(recordedValues).build());
          }
          super.close(status, trailers);
        }
      };
  return Contexts.interceptCall(callContext, reportingCall, headers, next);
}
/**
 * Captures the K8s service-account JWT token header of the latest request, decoded as UTF-8,
 * into {@code lastK8sJwtTokenValue}; the field is reset to {@code null} on every call first.
 *
 * @param call the server call being intercepted
 * @param requestHeaders the metadata received from the client
 * @param next the next handler in the interceptor chain
 * @return the listener produced by {@code next} for the pass-through call
 */
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call,
    Metadata requestHeaders,
    ServerCallHandler<ReqT, RespT> next) {
  // Reset before reading so a request without the header never exposes a stale token.
  lastK8sJwtTokenValue = null;
  final byte[] rawToken =
      requestHeaders.get(SdsClientFileBasedMetadataTest.K8S_SA_JWT_TOKEN_HEADER_METADATA_KEY);
  if (rawToken != null) {
    lastK8sJwtTokenValue = new String(rawToken, StandardCharsets.UTF_8);
  }
  // The forwarding wrapper overrides nothing; it simply delegates every operation.
  ServerCall<ReqT, RespT> passThroughCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {};
  return next.startCall(passThroughCall, requestHeaders);
}
/**
 * Configures the test server: sets transport options on the builder, wraps
 * {@code TestServiceImpl} in an interceptor that captures client headers and echoes them back
 * as trailers, and mounts the resulting gRPC service under {@code "/"} behind a decorator that
 * captures the server's trailers.
 *
 * @param sb the server builder to configure
 */
@Override
protected void configure(ServerBuilder sb) {
// Transport setup. NOTE(review): the 0 arguments below presumably mean "no idle timeout" and
// "pick an ephemeral port" — confirm against the ServerBuilder documentation.
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.maxRequestLength(MAX_MESSAGE_SIZE);
sb.idleTimeoutMillis(0);
sb.http(0);
sb.https(0);
sb.tlsSelfSigned();
// Service under test, wrapped in an interceptor that records and echoes request metadata.
final ServerServiceDefinition interceptService =
ServerInterceptors.intercept(
new TestServiceImpl(Executors.newSingleThreadScheduledExecutor()),
new ServerInterceptor() {
@Override
public <REQ, RESP> Listener<REQ> interceptCall(
ServerCall<REQ, RESP> call,
Metadata requestHeaders,
ServerCallHandler<REQ, RESP> next) {
// Convert the incoming gRPC metadata to HTTP headers and stash them for assertions.
final HttpHeadersBuilder fromClient = HttpHeaders.builder();
MetadataUtil.fillHeaders(requestHeaders, fromClient);
CLIENT_HEADERS_CAPTURE.set(fromClient.build());
return next.startCall(
new SimpleForwardingServerCall<REQ, RESP>(call) {
@Override
public void close(Status status, Metadata trailers) {
// Echo every request header back to the client in the trailers.
trailers.merge(requestHeaders);
super.close(status, trailers);
}
}, requestHeaders);
}
});
sb.serviceUnder("/",
GrpcService.builder()
.addService(interceptService)
.setMaxInboundMessageSizeBytes(MAX_MESSAGE_SIZE)
.setMaxOutboundMessageSizeBytes(MAX_MESSAGE_SIZE)
.useClientTimeoutHeader(false)
.build()
.decorate((client, ctx, req) -> {
final HttpResponse res = client.serve(ctx, req);
return new FilteredHttpResponse(res) {
// Tracks whether an HttpHeaders object has already passed through this filter.
private boolean headersReceived;
@Override
protected HttpObject filter(HttpObject obj) {
if (obj instanceof HttpHeaders) {
if (!headersReceived) {
// First HttpHeaders object seen: treated as the response headers, not captured.
headersReceived = true;
} else {
// Any later HttpHeaders object is stored as the server trailers.
SERVER_TRAILERS_CAPTURE.set((HttpHeaders) obj);
}
}
// The response stream itself is passed through unmodified.
return obj;
}
};
}));
}
/**
 * This shows that a {@link ServerInterceptor} can see the server span when processing the
 * request and response: every received and sent message is tagged onto the current span.
 */
@Test public void bodyTaggingExample() throws IOException {
  SpanCustomizer customizer = CurrentSpanCustomizer.create(tracing);
  AtomicInteger sends = new AtomicInteger();
  AtomicInteger recvs = new AtomicInteger();

  ServerInterceptor taggingInterceptor = new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
      // Tag each outgoing message after it has been handed to the real call.
      ServerCall<ReqT, RespT> taggingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
        @Override public void sendMessage(RespT message) {
          delegate().sendMessage(message);
          customizer.tag("grpc.message_send." + sends.getAndIncrement(), message.toString());
        }
      };
      ServerCall.Listener<ReqT> downstream = next.startCall(taggingCall, headers);
      // Tag each incoming message before it is delivered to the application.
      return new SimpleForwardingServerCallListener<ReqT>(downstream) {
        @Override public void onMessage(ReqT message) {
          customizer.tag("grpc.message_recv." + recvs.getAndIncrement(), message.toString());
          delegate().onMessage(message);
        }
      };
    }
  };
  init(taggingInterceptor);

  GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);

  assertThat(testSpanHandler.takeRemoteSpan(Span.Kind.SERVER).tags()).containsKeys(
      "grpc.message_recv.0", "grpc.message_send.0"
  );

  Iterator<HelloReply> replies = GreeterGrpc.newBlockingStub(client)
      .sayHelloWithManyReplies(HELLO_REQUEST);
  assertThat(replies).toIterable().hasSize(10);

  // Intentionally verbose here to show that only one recv and 10 replies
  assertThat(testSpanHandler.takeRemoteSpan(Span.Kind.SERVER).tags()).containsKeys(
      "grpc.message_recv.1",
      "grpc.message_send.1",
      "grpc.message_send.2",
      "grpc.message_send.3",
      "grpc.message_send.4",
      "grpc.message_send.5",
      "grpc.message_send.6",
      "grpc.message_send.7",
      "grpc.message_send.8",
      "grpc.message_send.9",
      "grpc.message_send.10"
  );
}