下面列出了 io.grpc.ClientCall.Listener 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看源代码。
/**
 * Drives one call through a newly constructed {@code LoggingInterceptor} and returns the
 * listener that was passed to {@code nextCall.start(...)}.
 *
 * <p>The interceptor is built from {@code requestLogger}, the raw request headers of
 * {@code detail} and the endpoint of {@code summary}. The intercepted call is started with an
 * empty anonymous listener and fresh {@code Metadata}, then {@code detail.getRequest()} is sent.
 *
 * @param requestLogger logger handed to the interceptor
 * @param detail supplies the raw request headers and the request message
 * @param summary supplies the endpoint handed to the interceptor
 * @param methodDescriptor method descriptor passed to {@code interceptCall}
 * @param nextChannel channel passed to {@code interceptCall}
 * @param nextCall mock whose {@code start} invocation is verified and captured
 * @return the listener captured from {@code nextCall.start(...)}
 */
private static Listener simulateCall(
    RequestLogger requestLogger,
    Detail detail,
    Summary summary,
    MethodDescriptor methodDescriptor,
    Channel nextChannel,
    ClientCall nextCall) {
  // Simulate a call (mocked channel doesn't actually make a call). Call options are null here.
  ClientCall interceptedCall =
      new LoggingInterceptor(requestLogger, detail.getRawRequestHeaders(), summary.getEndpoint())
          .interceptCall(methodDescriptor, null, nextChannel);

  Metadata requestMetadata = new Metadata();
  interceptedCall.start(new Listener() {}, requestMetadata);
  interceptedCall.sendMessage(detail.getRequest());

  // Grab the response listener given to the next call, so callers can feed it different
  // responses.
  ArgumentCaptor<Listener> captured = ArgumentCaptor.forClass(Listener.class);
  verify(nextCall).start(captured.capture(), eq(requestMetadata));
  return captured.getValue();
}
/**
 * Configures compression on the server call, snapshots the incoming headers, and hands the call
 * to the next handler.
 *
 * <p>When {@code serverEncoding} is set, the call compression is set to {@code "fzip"}. Message
 * compression is always set from {@code enableServerMessageCompression}. A merged copy of
 * {@code headers} is stored in {@code serverResponseHeaders}.
 *
 * @param call the server call being intercepted
 * @param headers the metadata received with the call
 * @param next the handler that continues processing
 * @return the listener returned by {@code next.startCall(call, headers)}
 */
@Override
public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  if (serverEncoding) {
    call.setCompression("fzip");
  }
  call.setMessageCompression(enableServerMessageCompression);

  // Store a separate Metadata instance rather than the one passed on to the next handler.
  Metadata snapshot = new Metadata();
  snapshot.merge(headers);
  serverResponseHeaders = snapshot;

  return next.startCall(call, headers);
}
/**
 * Applies the configured compression settings to {@code call}, records a copy of the incoming
 * {@code headers} in {@code serverResponseHeaders}, and delegates to {@code next}.
 *
 * @param call the server call being intercepted; its compression is set to {@code "fzip"} when
 *     {@code serverEncoding} is set
 * @param headers the metadata received with the call
 * @param next the handler that continues processing
 * @return the listener returned by {@code next.startCall(call, headers)}
 */
@Override
public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  if (serverEncoding) {
    call.setCompression("fzip");
  }
  call.setMessageCompression(enableServerMessageCompression);

  // Merge into a fresh Metadata so the stored value is a distinct object from `headers`.
  Metadata receivedHeaders = new Metadata();
  receivedHeaders.merge(headers);
  serverResponseHeaders = receivedHeaders;

  return next.startCall(call, headers);
}
/**
 * Starts the delegate call with a tracing listener wrapped around {@code responseListener}.
 *
 * <p>A {@code GrpcClientRequest} is built for this call, a span is obtained from
 * {@code handler.handleSendWithParent} and published through {@code spanRef}, and the delegate is
 * started inside the scope returned by {@code currentTraceContext.maybeScope}.
 *
 * <p>If anything is thrown, {@code propagateIfFatal} is consulted first, {@code spanRef} is
 * cleared, the span (when non-null) records the error and is finished, and the throwable is
 * rethrown.
 *
 * @param responseListener the caller's listener, wrapped before being passed on
 * @param headers request metadata, passed both to the request wrapper and to the delegate
 */
@Override public void start(Listener<RespT> responseListener, Metadata headers) {
  GrpcClientRequest clientRequest =
    new GrpcClientRequest(nameToKey, method, callOptions, delegate(), headers);

  Span clientSpan = handler.handleSendWithParent(clientRequest, invocationContext);
  spanRef.set(clientSpan);

  Listener<RespT> tracingListener = new TracingClientCallListener<>(
    responseListener, invocationContext, spanRef, clientRequest);

  try (Scope ignored = currentTraceContext.maybeScope(clientSpan.context())) {
    super.start(tracingListener, headers);
  } catch (Throwable failure) {
    propagateIfFatal(failure);
    // Another interceptor may throw an exception during start, in which case no other
    // callbacks are called, so go ahead and close the span here.
    //
    // See instrumentation/grpc/RATIONALE.md for why we don't use the handler here
    spanRef.set(null);
    // The null guard also covers a failure raised while opening the scope above.
    if (clientSpan != null) {
      clientSpan.error(failure).finish();
    }
    throw failure;
  }
}
/**
 * Creates a listener that forwards to {@code delegate} and keeps the tracing state it is given.
 *
 * @param delegate listener passed to the superclass constructor
 * @param invocationContext trace context stored on this listener; may be null
 * @param spanRef shared reference to the span, stored as-is
 * @param request the client request wrapper, stored as-is
 */
TracingClientCallListener(
  Listener<RespT> delegate,
  @Nullable TraceContext invocationContext,
  AtomicReference<Span> spanRef,
  GrpcClientRequest request
) {
  super(delegate);
  this.request = request;
  this.spanRef = spanRef;
  this.invocationContext = invocationContext;
}
/**
 * Starts the underlying call with {@code responseListener} wrapped in a
 * {@code ClientHeadersCapture}, then sets message compression from
 * {@code enableClientMessageCompression}.
 *
 * @param responseListener the caller's listener, wrapped before being passed on
 * @param headers request metadata passed through unchanged
 */
@Override
public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) {
  io.grpc.ClientCall.Listener<RespT> capturingListener =
      new ClientHeadersCapture<RespT>(responseListener);
  super.start(capturingListener, headers);
  // Message compression is configured only after the call has been started.
  setMessageCompression(enableClientMessageCompression);
}
/**
 * Creates an instance around {@code delegate}, which is handed unchanged to the superclass
 * constructor.
 *
 * <p>NOTE(review): presumably the superclass is a forwarding listener and this class records
 * response headers; the rest of the class is not visible here, so confirm.
 *
 * @param delegate the listener to wrap
 */
private ClientHeadersCapture(Listener<RespT> delegate) {
super(delegate);
}
/**
 * Runs a simulated call using a {@code Detail} from {@code createDetail()} and a
 * {@code Summary} from {@code createSummary()}.
 *
 * @param callback receives the captured response listener and the detail used for the call
 */
private void runCall(BiConsumer<Listener, Detail> callback) {
  // Arguments are evaluated left to right, so the detail is still created before the summary.
  runCall(callback, createDetail(), createSummary());
}
/**
 * Simulates a call for the given fixtures, lets {@code callback} act on the captured response
 * listener, and then verifies the loggers.
 *
 * @param callback receives the captured response listener and {@code detail}
 * @param detail fixture passed to {@code simulateCall} and {@code verifyLoggers}
 * @param summary fixture passed to {@code simulateCall} and {@code verifyLoggers}
 */
private void runCall(BiConsumer<Listener, Detail> callback, Detail detail, Summary summary) {
  Listener responseListener =
      simulateCall(requestLogger, detail, summary, methodDescriptor, nextChannel, nextCall);

  callback.accept(responseListener, detail);

  // Verification runs only after the callback has driven the listener.
  verifyLoggers(detail, summary, detailLogger, summaryLogger);
}
/**
 * Wraps {@code responseListener} in a {@code ClientHeadersCapture}, starts the underlying call
 * with it, and then sets message compression from {@code enableClientMessageCompression}.
 *
 * @param responseListener the caller's listener, wrapped before being passed on
 * @param headers request metadata passed through unchanged
 */
@Override
public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) {
  io.grpc.ClientCall.Listener<RespT> wrappedListener =
      new ClientHeadersCapture<>(responseListener);
  super.start(wrappedListener, headers);
  // Message compression is configured only after the call has been started.
  setMessageCompression(enableClientMessageCompression);
}
/**
 * Creates an instance around {@code delegate}, which is handed unchanged to the superclass
 * constructor.
 *
 * <p>NOTE(review): presumably the superclass is a forwarding listener and this class records
 * response headers; the rest of the class is not visible here, so confirm.
 *
 * @param delegate the listener to wrap
 */
private ClientHeadersCapture(Listener<RespT> delegate) {
super(delegate);
}