下面列出了如何使用 io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener 类的 API 示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the outgoing call with a per-call tracer attached and reports the final status to that
 * tracer when the call closes.
 *
 * <p>The tracer is built from the current tag context, the full method name, and the
 * {@code recordStartedRpcs} / {@code recordFinishedRpcs} settings, and is installed on the call
 * options as the stream tracer factory.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options for the call; the tracer is added to these
 * @param next channel used to create the underlying call
 * @return a forwarding call that invokes {@code callEnded(status)} on the tracer before passing
 *     {@code onClose} on to the application listener
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  // The tag context that is current right now becomes the parent of the new client-side RPC.
  final ClientCallTracer callTracer =
      newClientCallTracer(
          tagger.getCurrentTagContext(),
          method.getFullMethodName(),
          recordStartedRpcs,
          recordFinishedRpcs);
  CallOptions tracedOptions = callOptions.withStreamTracerFactory(callTracer);
  return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, tracedOptions)) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Listener<RespT> reportingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              // Tell the tracer first, then let the application see the close.
              callTracer.callEnded(status);
              super.onClose(status, trailers);
            }
          };
      delegate().start(reportingListener, headers);
    }
  };
}
/**
 * Creates the outgoing call with a per-call tracer attached and reports the final status to that
 * tracer when the call closes.
 *
 * <p>The tracer is built from the current tag context and the full method name, and is installed
 * on the call options as the stream tracer factory.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options for the call; the tracer is added to these
 * @param next channel used to create the underlying call
 * @return a forwarding call that invokes {@code callEnded(status)} on the tracer before passing
 *     {@code onClose} on to the application listener
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  // The tag context that is current right now becomes the parent of the new client-side RPC.
  final ClientCallTracer callTracer =
      newClientCallTracer(tagger.getCurrentTagContext(), method.getFullMethodName());
  CallOptions tracedOptions = callOptions.withStreamTracerFactory(callTracer);
  return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, tracedOptions)) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Listener<RespT> reportingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              // Tell the tracer first, then let the application see the close.
              callTracer.callEnded(status);
              super.onClose(status, trailers);
            }
          };
      delegate().start(reportingListener, headers);
    }
  };
}
/**
 * Wraps the outgoing call so that the current thread is interrupted right before the call is
 * half-closed, and so that {@code onCloseCalled} is set once the call closes.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options passed through unchanged to the underlying call
 * @param next channel used to create the underlying call
 * @return the forwarding call described above
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  ClientCall<ReqT, RespT> underlyingCall = next.newCall(method, callOptions);
  return new SimpleForwardingClientCall<ReqT, RespT>(underlyingCall) {
    @Override
    public void halfClose() {
      // Set the interrupt flag on the calling thread, then half-close as usual.
      Thread.currentThread().interrupt();
      super.halfClose();
    }

    @Override
    public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
      ClientCall.Listener<RespT> closeRecordingListener =
          new SimpleForwardingClientCallListener<RespT>(listener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              // Record that the close callback fired before forwarding it.
              onCloseCalled = true;
              super.onClose(status, trailers);
            }
          };
      super.start(closeRecordingListener, headers);
    }
  };
}
/**
 * Creates the outgoing call with a per-call tracer attached and reports the final status to that
 * tracer when the call closes.
 *
 * <p>The tracer is built from the value of {@code CONTEXT_SPAN_KEY} in the current context and
 * the method descriptor, and is installed on the call options as the stream tracer factory.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options for the call; the tracer is added to these
 * @param next channel used to create the underlying call
 * @return a forwarding call that invokes {@code callEnded(status)} on the tracer before passing
 *     {@code onClose} on to the application listener
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  // Client-side RPCs inherit their tracing context from the current Context.
  // Reading CONTEXT_SPAN_KEY directly is a safe use of the unsafe trace API: it yields the same
  // value as Tracer.getCurrentSpan(), except that when nothing is set the direct read returns
  // null whereas the Tracer API returns BlankSpan.
  final ClientCallTracer callTracer = newClientCallTracer(CONTEXT_SPAN_KEY.get(), method);
  CallOptions tracedOptions = callOptions.withStreamTracerFactory(callTracer);
  return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, tracedOptions)) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Listener<RespT> reportingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(io.grpc.Status status, Metadata trailers) {
              // Tell the tracer first, then let the application see the close.
              callTracer.callEnded(status);
              super.onClose(status, trailers);
            }
          };
      delegate().start(reportingListener, headers);
    }
  };
}
/**
 * Wraps the outgoing call so that the token from {@code getToken(next)} is sent in the
 * {@code TOKEN} header, and so that the token is refreshed when the call closes with a status
 * that {@code isInvalidTokenError} recognises.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options passed through unchanged to the underlying call
 * @param next channel used to create the underlying call and passed to the token helpers
 * @return the forwarding call described above
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
    CallOptions callOptions, Channel next) {
  ClientCall<ReqT, RespT> underlyingCall = next.newCall(method, callOptions);
  return new SimpleForwardingClientCall<ReqT, RespT>(underlyingCall) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      // Only attach the header when a token is actually available.
      String authToken = getToken(next);
      if (authToken != null) {
        headers.put(TOKEN, authToken);
      }

      Listener<RespT> refreshingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              if (isInvalidTokenError(status)) {
                try {
                  refreshToken(next);
                } catch (Exception ignored) {
                  // Deliberately not rethrown: the RPC is retried when the auth token
                  // has expired.
                }
              }
              super.onClose(status, trailers);
            }
          };
      super.start(refreshingListener, headers);
    }
  };
}
/**
 * Starts the call with a listener that calls {@code reset()} on the enclosing
 * {@code DiscardClientCall} each time {@code onReady} is signalled, before forwarding the
 * signal to {@code responseListener}.
 *
 * @param responseListener the application listener that receives all callbacks
 * @param headers request metadata, passed through unchanged
 */
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
  super.start(
      new SimpleForwardingClientCallListener<RespT>(responseListener) {
        @Override
        public void onReady() {
          // Reset the outer call first, then propagate readiness.
          DiscardClientCall.this.reset();
          super.onReady();
        }
      },
      headers);
}
/**
 * Creates the outgoing call with a per-call tracer attached and reports the final status to that
 * tracer when the call closes.
 *
 * <p>The tracer is built from {@code ContextUtils.getValue(Context.current())} and the method
 * descriptor, and is installed on the call options as the stream tracer factory.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options for the call; the tracer is added to these
 * @param next channel used to create the underlying call
 * @return a forwarding call that invokes {@code callEnded(status)} on the tracer before passing
 *     {@code onClose} on to the application listener
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  // Client-side RPCs inherit their tracing context from the current Context.
  // Reading the span straight from the context is a safe use of the unsafe trace API: it yields
  // the same value as Tracer.getCurrentSpan(), except that when nothing is set the direct read
  // returns null whereas the Tracer API returns BlankSpan.
  final ClientCallTracer callTracer =
      newClientCallTracer(ContextUtils.getValue(Context.current()), method);
  CallOptions tracedOptions = callOptions.withStreamTracerFactory(callTracer);
  return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, tracedOptions)) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Listener<RespT> reportingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(io.grpc.Status status, Metadata trailers) {
              // Tell the tracer first, then let the application see the close.
              callTracer.callEnded(status);
              super.onClose(status, trailers);
            }
          };
      delegate().start(reportingListener, headers);
    }
  };
}
/**
 * Wraps the outgoing call so that request headers are prepared by
 * {@code TripleTracerAdapter.beforeSend} before the call starts, and logs each stage of the
 * exchange when {@code RpcRunningState.isDebugMode()} is true.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options passed through unchanged to the underlying call
 * @param next channel used to create the underlying call
 * @return the forwarding call described above
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
    CallOptions callOptions, Channel next) {
  // This code and the callbacks below do not run on the same thread.
  if (RpcRunningState.isDebugMode()) {
    LOGGER.info("[1]header send from client:");
  }
  ClientCall<ReqT, RespT> underlyingCall = next.newCall(method, callOptions);
  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(underlyingCall) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata requestHeader) {
      // Fetch the request and consumer config stored in the invoke context and hand them
      // to the tracer adapter together with the outgoing headers.
      RpcInvokeContext invokeContext = RpcInvokeContext.getContext();
      SofaRequest request = (SofaRequest) invokeContext.get(TripleContants.SOFA_REQUEST_KEY);
      ConsumerConfig consumer =
          (ConsumerConfig) invokeContext.get(TripleContants.SOFA_CONSUMER_CONFIG_KEY);
      TripleTracerAdapter.beforeSend(request, consumer, requestHeader);
      if (RpcRunningState.isDebugMode()) {
        LOGGER.info("[2]prepare to send from client:{}", requestHeader);
      }

      Listener<RespT> loggingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onReady() {
              if (RpcRunningState.isDebugMode()) {
                LOGGER.info("[5]client is ready");
              }
              super.onReady();
            }

            @Override
            public void onHeaders(Metadata responseHeader) {
              // The client has received the response headers.
              if (RpcRunningState.isDebugMode()) {
                LOGGER.info("[3]response header received from server:{}", responseHeader);
              }
              super.onHeaders(responseHeader);
            }

            @Override
            public void onMessage(RespT message) {
              if (RpcRunningState.isDebugMode()) {
                LOGGER.info("[4]response message received from server:{}", message);
              }
              super.onMessage(message);
            }

            @Override
            public void onClose(Status status, Metadata trailers) {
              if (RpcRunningState.isDebugMode()) {
                LOGGER.info("[5]response close received from server:{},trailers:{}", status, trailers);
              }
              super.onClose(status, trailers);
            }
          };
      super.start(loggingListener, requestHeader);
    }
  };
}
/**
 * Wraps the outgoing call so that {@code ongoingRequestCount} tracks calls that are in flight.
 *
 * <p>The counter is incremented when the returned call is started and decremented at most once
 * per call, on whichever termination path fires first: the call closing, the current context
 * being cancelled, or the call itself being cancelled.
 *
 * @param methodDescriptor descriptor of the RPC being invoked
 * @param callOptions options passed through unchanged to the underlying call
 * @param channel channel used to create the underlying call
 * @return a forwarding call that maintains the counter as described above
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
  ClientCall<ReqT, RespT> newCall = channel.newCall(methodDescriptor, callOptions);
  final AtomicBoolean countedCancel = new AtomicBoolean(false);
  // A streaming call might be terminated in one of several possible ways:
  //  * The call completes normally -> onClose() will be invoked.
  //  * The context is cancelled -> CancellationListener.cancelled() will be called.
  //  * The call itself is cancelled (doesn't currently happen) -> ClientCall.cancel() called.
  //
  // It's possible more than one of these could happen, so we use countedCancel to make sure we
  // don't double count a decrement.
  final Runnable decrementOnce =
      () -> {
        if (countedCancel.compareAndSet(false, true)) {
          ongoingRequestCount.decrementAndGet();
        }
      };
  // Keep hold of the context and the listener so the listener can be deregistered once the call
  // has terminated; otherwise it stays attached to the context after the call is done.
  final Context callContext = Context.current();
  final Context.CancellationListener contextListener = cancelledContext -> decrementOnce.run();
  callContext.addListener(contextListener, backgroundTasksThreadPool);
  // NOTE(review): cancel() or a context cancellation that happens before start() decrements the
  // counter ahead of (or without) the matching increment -- confirm callers always start the call.
  return new SimpleForwardingClientCall<ReqT, RespT>(newCall) {
    @Override
    public void cancel(@Nullable String message, @Nullable Throwable cause) {
      decrementOnce.run();
      callContext.removeListener(contextListener);
      super.cancel(message, cause);
    }

    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      ongoingRequestCount.incrementAndGet();
      this.delegate()
          .start(
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
                @Override
                public void onClose(Status status, Metadata trailers) {
                  decrementOnce.run();
                  callContext.removeListener(contextListener);
                  super.onClose(status, trailers);
                }
              },
              headers);
    }
  };
}