下面列出了io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener#io.grpc.ForwardingClientCall.SimpleForwardingClientCall 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
newClientCallTracer(parentCtx, method.getFullMethodName(),
recordStartedRpcs, recordFinishedRpcs);
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
@Override
public <R, S> ClientCall<R, S> interceptCall(
MethodDescriptor<R, S> method,
CallOptions callOptions,
Channel next) {
return new SimpleForwardingClientCall<R, S>(
next.newCall(method, callOptions)) {
@Override
public void start(
Listener<S> responseListener,
Metadata headers) {
Metadata metadata = new Metadata();
metadata.put(REQUEST_ID_METADATA_KEY, requestId);
metadata.put(ACTOR_CRN_METADATA_KEY, actorCrn);
headers.merge(metadata);
super.start(responseListener, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
newClientCallTracer(parentCtx, method.getFullMethodName());
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
@Override
public <ReqT,RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override public void start(ClientCall.Listener<RespT> listener, Metadata headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(listener) {
@Override public void onClose(Status status, Metadata trailers) {
onCloseCalled = true;
super.onClose(status, trailers);
}
}, headers);
}
@Override public void halfClose() {
Thread.currentThread().interrupt();
super.halfClose();
}
};
}
@Test public void userInterceptor_throwsOnStart() {
closeClient(client);
client = newClient(new ClientInterceptor() {
@Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions,
Channel channel) {
ClientCall<ReqT, RespT> call = channel.newCall(methodDescriptor, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override public void start(Listener<RespT> responseListener, Metadata headers) {
throw new IllegalStateException("I'm a bad interceptor.");
}
};
}
}, grpcTracing.newClientInterceptor());
assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
.isInstanceOf(IllegalStateException.class);
testSpanHandler.takeRemoteSpanWithErrorMessage(CLIENT, "I'm a bad interceptor.");
}
@Test public void userInterceptor_throwsOnHalfClose() {
closeClient(client);
client = newClient(new ClientInterceptor() {
@Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions,
Channel channel) {
ClientCall<ReqT, RespT> call = channel.newCall(methodDescriptor, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override public void halfClose() {
throw new IllegalStateException("I'm a bad interceptor.");
}
};
}
}, grpcTracing.newClientInterceptor());
assertThatThrownBy(() -> GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST))
.isInstanceOf(IllegalStateException.class);
testSpanHandler.takeRemoteSpanWithErrorMessage(CLIENT, "I'm a bad interceptor.");
}
@Test
public void serverHeaderDeliveredToClient() {
class SpyingClientInterceptor implements ClientInterceptor {
ClientCall.Listener<?> spyListener;
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
spyListener = responseListener =
mock(ClientCall.Listener.class, delegatesTo(responseListener));
super.start(responseListener, headers);
}
};
}
}
SpyingClientInterceptor clientInterceptor = new SpyingClientInterceptor();
GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel)
.withInterceptors(clientInterceptor);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
blockingStub.sayHello(HelloRequest.getDefaultInstance());
assertNotNull(clientInterceptor.spyListener);
verify(clientInterceptor.spyListener).onHeaders(metadataCaptor.capture());
assertEquals(
"customRespondValue",
metadataCaptor.getValue().get(HeaderServerInterceptor.CUSTOM_HEADER_KEY));
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tracing context from the current Context.
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
final ClientCallTracer tracerFactory = newClientCallTracer(CONTEXT_SPAN_KEY.get(), method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(io.grpc.Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
@Test
public void addOutboundHeaders() {
final Metadata.Key<String> credKey = Metadata.Key.of("Cred", Metadata.ASCII_STRING_MARSHALLER);
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
headers.put(credKey, "abcd");
super.start(responseListener, headers);
}
};
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
@SuppressWarnings("unchecked")
ClientCall.Listener<Void> listener = mock(ClientCall.Listener.class);
ClientCall<Void, Void> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
// start() on the intercepted call will eventually reach the call created by the real channel
interceptedCall.start(listener, new Metadata());
// The headers passed to the real channel call will contain the information inserted by the
// interceptor.
assertSame(listener, call.listener);
assertEquals("abcd", call.headers.get(credKey));
}
@Test
public void normalCall() {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) { };
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
ClientCall<Void, Void> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
assertNotSame(call, interceptedCall);
@SuppressWarnings("unchecked")
ClientCall.Listener<Void> listener = mock(ClientCall.Listener.class);
Metadata headers = new Metadata();
interceptedCall.start(listener, headers);
assertSame(listener, call.listener);
assertSame(headers, call.headers);
interceptedCall.sendMessage(null /*request*/);
assertThat(call.messages).containsExactly((Void) null /*request*/);
interceptedCall.halfClose();
assertTrue(call.halfClosed);
interceptedCall.request(1);
assertThat(call.requests).containsExactly(1);
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
String token = getToken(next);
if (token != null) {
headers.put(TOKEN, token);
}
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
if (isInvalidTokenError(status)) {
try {
refreshToken(next);
} catch (Exception e) {
// don't throw any error here.
// rpc will retry on expired auth token.
}
}
super.onClose(status, trailers);
}
}, headers);
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final ClientCall<ReqT, RespT> clientCall = next.newCall(method, callOptions);
final ClientCall<ReqT, RespT> forwardingClientCall = new SimpleForwardingClientCall<ReqT, RespT>(clientCall) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
final String socketId = nextSocketId();
headers.put(Header.SOCKET_ID, socketId);
super.start(responseListener, headers);
}
};
return forwardingClientCall;
}
@Test
public void serverHeaderDeliveredToClient() {
class SpyingClientInterceptor implements ClientInterceptor {
ClientCall.Listener<?> spyListener;
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
spyListener = responseListener =
mock(ClientCall.Listener.class, delegatesTo(responseListener));
super.start(responseListener, headers);
}
};
}
}
SpyingClientInterceptor clientInterceptor = new SpyingClientInterceptor();
GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel)
.withInterceptors(clientInterceptor);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
blockingStub.sayHello(HelloRequest.getDefaultInstance());
assertNotNull(clientInterceptor.spyListener);
verify(clientInterceptor.spyListener).onHeaders(metadataCaptor.capture());
assertEquals(
"customRespondValue",
metadataCaptor.getValue().get(HeaderServerInterceptor.CUSTOM_HEADER_KEY));
}
@Test
public void addOutboundHeaders() {
final Metadata.Key<String> credKey = Metadata.Key.of("Cred", Metadata.ASCII_STRING_MARSHALLER);
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
headers.put(credKey, "abcd");
super.start(responseListener, headers);
}
};
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
@SuppressWarnings("unchecked")
ClientCall.Listener<Void> listener = mock(ClientCall.Listener.class);
ClientCall<Void, Void> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
// start() on the intercepted call will eventually reach the call created by the real channel
interceptedCall.start(listener, new Metadata());
// The headers passed to the real channel call will contain the information inserted by the
// interceptor.
assertSame(listener, call.listener);
assertEquals("abcd", call.headers.get(credKey));
}
@Test
public void normalCall() {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
return new SimpleForwardingClientCall<ReqT, RespT>(call) { };
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
ClientCall<Void, Void> interceptedCall = intercepted.newCall(method, CallOptions.DEFAULT);
assertNotSame(call, interceptedCall);
@SuppressWarnings("unchecked")
ClientCall.Listener<Void> listener = mock(ClientCall.Listener.class);
Metadata headers = new Metadata();
interceptedCall.start(listener, headers);
assertSame(listener, call.listener);
assertSame(headers, call.headers);
interceptedCall.sendMessage(null /*request*/);
assertThat(call.messages).containsExactly((Void) null /*request*/);
interceptedCall.halfClose();
assertTrue(call.halfClosed);
interceptedCall.request(1);
assertThat(call.requests).containsExactly(1);
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tracing context from the current Context.
// Safe usage of the unsafe trace API because CONTEXT_SPAN_KEY.get() returns the same value
// as Tracer.getCurrentSpan() except when no value available when the return value is null
// for the direct access and BlankSpan when Tracer API is used.
final ClientCallTracer tracerFactory =
newClientCallTracer(ContextUtils.getValue(Context.current()), method);
ClientCall<ReqT, RespT> call =
next.newCall(
method,
callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(io.grpc.Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
Channel clientWithB3SingleHeader(TraceContext parent) {
return ClientInterceptors.intercept(client, new ClientInterceptor() {
@Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(Key.of("b3", ASCII_STRING_MARSHALLER),
B3SingleFormat.writeB3SingleFormat(parent));
super.start(responseListener, headers);
}
};
}
});
}
/**
* NOTE: for this to work, the tracing interceptor must be last (so that it executes first)
*
* <p>Also notice that we are only making the current context available in the request side.
*/
@Test public void currentSpanVisibleToUserInterceptors() {
closeClient(client);
client = newClient(
new ClientInterceptor() {
@Override public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
tracing.tracer().currentSpanCustomizer().annotate("start");
super.start(responseListener, headers);
}
@Override public void sendMessage(ReqT message) {
tracing.tracer().currentSpanCustomizer().annotate("sendMessage");
super.sendMessage(message);
}
};
}
},
grpcTracing.newClientInterceptor()
);
GreeterGrpc.newBlockingStub(client).sayHello(HELLO_REQUEST);
assertThat(testSpanHandler.takeRemoteSpan(CLIENT).annotations())
.extracting(Entry::getValue)
.containsOnly("start", "sendMessage");
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
ClientCall<ReqT, RespT> newCall = channel.newCall(methodDescriptor, callOptions);
final AtomicBoolean countedCancel = new AtomicBoolean(false);
// A streaming call might be terminated in one of several possible ways:
// * The call completes normally -> onClose() will be invoked.
// * The context is cancelled -> CancellationListener.cancelled() will be called.
// * The call itself is cancelled (doesn't currently happen) -> ClientCall.cancel() called.
//
// It's possible more than one of these could happen, so we use countedCancel to make sure we
// don't double count a decrement.
Context.current()
.addListener(
context -> {
if (countedCancel.compareAndSet(false, true)) {
ongoingRequestCount.decrementAndGet();
}
},
backgroundTasksThreadPool);
return new SimpleForwardingClientCall(newCall) {
@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
if (countedCancel.compareAndSet(false, true)) {
ongoingRequestCount.decrementAndGet();
}
super.cancel(message, cause);
}
@Override
public void start(Listener responseListener, Metadata headers) {
ongoingRequestCount.incrementAndGet();
this.delegate()
.start(
new SimpleForwardingClientCallListener(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
if (countedCancel.compareAndSet(false, true)) {
ongoingRequestCount.decrementAndGet();
}
super.onClose(status, trailers);
}
},
headers);
}
};
}