下面列出了io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener#io.grpc.ClientCall 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Cancels {@code call} with {@code t} as the cause and then rethrows {@code t}.
 *
 * <p>Anything thrown by the cancellation itself is logged at SEVERE and does not replace
 * {@code t}. This method never returns normally; the declared return type presumably exists so
 * that callers can write {@code throw cancelThrow(call, t)}.
 *
 * @param call the call to cancel
 * @param t must be a RuntimeException or Error
 * @return never returns; always throws
 */
private static RuntimeException cancelThrow(ClientCall<?, ?> call, Throwable t) {
  try {
    call.cancel(null, t);
  } catch (Throwable cancelFailure) {
    assert cancelFailure instanceof RuntimeException || cancelFailure instanceof Error;
    logger.log(Level.SEVERE, "RuntimeException encountered while closing call", cancelFailure);
  }
  if (t instanceof Error) {
    throw (Error) t;
  }
  if (t instanceof RuntimeException) {
    throw (RuntimeException) t;
  }
  // should be impossible
  throw new AssertionError(t);
}
/**
 * Intercepts an outgoing call and, when a non-empty API key is configured, adds it to the request
 * headers under {@code API_KEY_HEADER} before the call is started.
 *
 * <p>The key is a credential, so only the fact that it is attached is logged, never its value.
 *
 * @param method the method being invoked; its full name is logged
 * @param callOptions options forwarded unchanged to {@code next}
 * @param next the channel used to create the underlying call
 * @return a forwarding call that decorates the headers on {@code start}
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT,RespT> method, CallOptions callOptions, Channel next) {
  LOGGER.info("Intercepted " + method.getFullMethodName());
  ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
  call = new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      if (apiKey != null && !apiKey.isEmpty()) {
        // Do not include the key in the log message: log output is not a safe place for secrets.
        LOGGER.info("Attaching API Key");
        headers.put(API_KEY_HEADER, apiKey);
      }
      super.start(responseListener, headers);
    }
  };
  return call;
}
/**
 * Checks that all metadata values returned by the mocked credentials end up in the call headers,
 * with multiple values per key preserved in order.
 */
@Test
public void testCopyCredentialToHeaders() throws IOException {
  ListMultimap<String, String> credentialMetadata = LinkedListMultimap.create();
  credentialMetadata.put("Authorization", "token1");
  credentialMetadata.put("Authorization", "token2");
  credentialMetadata.put("Extra-Authorization", "token3");
  credentialMetadata.put("Extra-Authorization", "token4");
  when(credentials.getRequestMetadata(any(URI.class)))
      .thenReturn(Multimaps.asMap(credentialMetadata));

  Metadata headers = new Metadata();
  ClientCall<String, Integer> interceptedCall =
      interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
  interceptedCall.start(listener, headers);

  // The interceptor must pass the listener and the very same headers on to the real call.
  assertEquals(listener, call.responseListener);
  assertEquals(headers, call.headers);

  String[] actualAuthorization =
      Iterables.toArray(headers.getAll(AUTHORIZATION), String.class);
  Assert.assertArrayEquals(new String[]{"token1", "token2"}, actualAuthorization);

  String[] actualExtraAuthorization =
      Iterables.toArray(headers.getAll(EXTRA_AUTHORIZATION), String.class);
  Assert.assertArrayEquals(new String[]{"token3", "token4"}, actualExtraAuthorization);
}
/**
 * Checks that an OAuth2 access token is sent as a single {@code "Bearer <token>"} value under
 * the {@code AUTHORIZATION} header key.
 */
@Test
public void testWithOAuth2Credential() {
  final AccessToken accessToken = new AccessToken("allyourbase", new Date(Long.MAX_VALUE));
  // Credentials stub whose refresh always yields the fixed token above.
  final OAuth2Credentials fixedTokenCredentials = new OAuth2Credentials() {
    @Override
    public AccessToken refreshAccessToken() throws IOException {
      return accessToken;
    }
  };
  interceptor = new ClientAuthInterceptor(fixedTokenCredentials, executor);

  Metadata headers = new Metadata();
  ClientCall<String, Integer> interceptedCall =
      interceptor.interceptCall(descriptor, CallOptions.DEFAULT, channel);
  interceptedCall.start(listener, headers);

  assertEquals(listener, call.responseListener);
  assertEquals(headers, call.headers);

  String[] actualAuthorization =
      Iterables.toArray(headers.getAll(AUTHORIZATION), String.class);
  Assert.assertArrayEquals(new String[]{"Bearer allyourbase"}, actualAuthorization);
}
/**
 * Creates a call tracer for the RPC, installs it as the stream tracer factory of the call
 * options, and wraps the response listener so the tracer is told when the call closes.
 *
 * @param method the method being invoked; its full name is handed to the tracer
 * @param callOptions the caller's options, extended with the tracer factory
 * @param next the channel used to create the underlying call
 * @return a forwarding call that reports call completion to the tracer
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  // New RPCs on client-side inherit the tag context from the current Context.
  TagContext parentContext = tagger.getCurrentTagContext();
  final ClientCallTracer callTracer =
      newClientCallTracer(
          parentContext, method.getFullMethodName(), recordStartedRpcs, recordFinishedRpcs);
  CallOptions tracedOptions = callOptions.withStreamTracerFactory(callTracer);
  ClientCall<ReqT, RespT> tracedCall = next.newCall(method, tracedOptions);
  return new SimpleForwardingClientCall<ReqT, RespT>(tracedCall) {
    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      Listener<RespT> closeReportingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              // Notify the tracer first, then let the application listener see the close.
              callTracer.callEnded(status);
              super.onClose(status, trailers);
            }
          };
      delegate().start(closeReportingListener, headers);
    }
  };
}
/**
 * Verifies that updating a subchannel's address list to one that still contains the currently
 * connected address does not cause a new transport to be created.
 */
@Test
public void updateSubchannelAddresses_existingAddressDoesNotConnect() {
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata()); // Create LB
  // Capture with the concrete class rather than null so the captured type is explicit.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();
  Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY);

  // Connect to the first server and mark its transport ready.
  subchannel.requestConnection();
  MockClientTransportInfo t0 = newTransports.poll();
  t0.listener.transportReady();

  // Extend the address list with an extra server; the original addresses stay in the list.
  List<SocketAddress> changedList = new ArrayList<>(servers.get(0).getAddresses());
  changedList.add(new FakeSocketAddress("aDifferentServer"));
  helper.updateSubchannelAddresses(subchannel, new EquivalentAddressGroup(changedList));

  // Requesting a connection again must not produce another transport.
  subchannel.requestConnection();
  assertNull(newTransports.poll());
}
/**
 * Verifies that a unary future call closed with {@code Status.INTERNAL} fails its future with an
 * {@code ExecutionException} from which both the status and the original trailers instance can
 * be recovered.
 */
@Test
public void unaryFutureCallFailed() throws Exception {
  final AtomicReference<ClientCall.Listener<String>> capturedListener =
      new AtomicReference<ClientCall.Listener<String>>();
  // Fake call that only remembers the listener it was started with.
  NoopClientCall<Integer, String> fakeCall = new NoopClientCall<Integer, String>() {
    @Override
    public void start(io.grpc.ClientCall.Listener<String> responseListener, Metadata headers) {
      capturedListener.set(responseListener);
    }
  };
  Integer request = 2;
  ListenableFuture<String> responseFuture = ClientCalls.futureUnaryCall(fakeCall, request);

  Metadata expectedTrailers = new Metadata();
  capturedListener.get().onClose(Status.INTERNAL, expectedTrailers);

  try {
    responseFuture.get();
    fail("Should fail");
  } catch (ExecutionException e) {
    assertEquals(Status.INTERNAL, Status.fromThrowable(e));
    assertSame(expectedTrailers, Status.trailersFromThrowable(e));
  }
}
/**
 * Verifies that the channel's {@code callsStarted} stat stays at 0 when a call is merely created,
 * becomes 1 once it is started, and that {@code lastCallStartedNanos} matches the fake ticker.
 */
@Test
public void channelStat_callStarted() throws Exception {
  createChannel();
  ClientCall<String, Integer> rpc = channel.newCall(method, CallOptions.DEFAULT);
  // Creating the call alone must not count as a started call.
  assertEquals(0, getStats(channel).callsStarted);

  rpc.start(mockCallListener, new Metadata());

  assertEquals(1, getStats(channel).callsStarted);
  long expectedStartNanos = executor.getTicker().read();
  assertEquals(expectedStartNanos, getStats(channel).lastCallStartedNanos);
}
/**
 * Wraps the outgoing call in a {@code MetricCollectingClientCall} that is configured with the
 * registry and the counters/timer function looked up for the invoked method.
 *
 * @param methodDescriptor the method being invoked; used to look up its metric set
 * @param callOptions options forwarded unchanged to {@code channel}
 * @param channel the channel used to create the underlying call
 * @return the metric-collecting wrapper around the underlying call
 */
@Override
public <Q, A> ClientCall<Q, A> interceptCall(final MethodDescriptor<Q, A> methodDescriptor,
    final CallOptions callOptions, final Channel channel) {
  final MetricSet methodMetrics = metricsFor(methodDescriptor);
  final ClientCall<Q, A> underlyingCall = channel.newCall(methodDescriptor, callOptions);
  return new MetricCollectingClientCall<>(
      underlyingCall,
      this.registry,
      methodMetrics.getRequestCounter(),
      methodMetrics.getResponseCounter(),
      methodMetrics.getTimerFunction());
}
/**
 * Issues the call against a copy of {@code method} that has its "safe" flag set to true, and
 * forwards {@code start} to that call unchanged.
 *
 * @param method the original method descriptor; it is not modified
 * @param callOptions options forwarded unchanged to {@code next}
 * @param next the channel used to create the underlying call
 * @return a forwarding call bound to the safe-flagged method descriptor
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  MethodDescriptor<ReqT, RespT> safeMethod = method.toBuilder().setSafe(true).build();
  ClientCall<ReqT, RespT> underlyingCall = next.newCall(safeMethod, callOptions);
  return new CheckedForwardingClientCall<ReqT, RespT>(underlyingCall) {
    @Override
    public void checkedStart(Listener<RespT> responseListener, Metadata headers) {
      delegate().start(responseListener, headers);
    }
  };
}
/**
 * Wraps client-streaming calls in a {@code DiscardClientCall}; calls of every other method type
 * are returned from {@code next} untouched.
 *
 * @param method the method being invoked; its type decides whether the call is wrapped
 * @param callOptions options forwarded unchanged to {@code next}
 * @param next the channel used to create the underlying call
 * @return the wrapped call for client-streaming methods, otherwise the plain call
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  if (MethodDescriptor.MethodType.CLIENT_STREAMING != method.getType()) {
    return next.newCall(method, callOptions);
  }
  if (logger.isDebugEnabled()) {
    logger.debug("interceptCall {}", method.getFullMethodName());
  }
  final ClientCall<ReqT, RespT> streamingCall = next.newCall(method, callOptions);
  return new DiscardClientCall<ReqT, RespT>(streamingCall, this.listener, maxPendingThreshold);
}
/**
 * Verifies that headers configured on the client builder are present in the metadata seen by a
 * user-supplied interceptor when a call is started.
 *
 * <p>The header assertions live inside the interceptor, so the test also asserts that the latch
 * was actually counted down; otherwise a run in which the interceptor never completed its checks
 * would pass silently after the timeout.
 */
@Test
public void test() throws InterruptedException, ExecutionException {
  final CountDownLatch latch = new CountDownLatch(1);
  final ClientBuilder builder = Client.builder().endpoints(cluster.getClientEndpoints())
      .header("MyHeader1", "MyHeaderVal1").header("MyHeader2", "MyHeaderVal2").interceptor(new ClientInterceptor() {
        @Override
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
          return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
              next.newCall(method, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
              super.start(responseListener, headers);
              assertThat(headers.get(Metadata.Key.of("MyHeader1", Metadata.ASCII_STRING_MARSHALLER)))
                  .isEqualTo("MyHeaderVal1");
              assertThat(headers.get(Metadata.Key.of("MyHeader2", Metadata.ASCII_STRING_MARSHALLER)))
                  .isEqualTo("MyHeaderVal2");
              // Only reached when both header assertions above passed.
              latch.countDown();
            }
          };
        }
      });
  try (Client client = builder.build()) {
    CompletableFuture<PutResponse> future = client.getKVClient().put(bytesOf("sample_key"), bytesOf("sample_key"));
    // Fail explicitly if the interceptor did not finish its checks within the timeout.
    assertThat(latch.await(1, TimeUnit.MINUTES)).isTrue();
    future.get();
  }
}
/**
 * Creates the call, requesting "fzip" compression when both {@code clientEncoding} and
 * {@code serverAcceptEncoding} are set, and wraps it in a {@code ClientCompressor}.
 *
 * @param method the method being invoked
 * @param callOptions the caller's options; compression is added only when both flags are set
 * @param next the channel used to create the underlying call
 * @return the {@code ClientCompressor} wrapping the underlying call
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  CallOptions effectiveOptions = callOptions;
  if (clientEncoding && serverAcceptEncoding) {
    effectiveOptions = callOptions.withCompression("fzip");
  }
  return new ClientCompressor<ReqT, RespT>(next.newCall(method, effectiveOptions));
}
/**
 * Keeps starting ping-pong calls on {@code LoadServer.GENERIC_STREAMING_PING_PONG_METHOD} until
 * {@code shutdown} is observed.
 *
 * <p>Each iteration first acquires a permit from {@code maxOutstanding} (presumably a semaphore
 * bounding the number of concurrent calls — confirm against its declaration); the permit is
 * released again when the call closes, or immediately if shutdown was requested.
 */
@Override
public void run() {
while (true) {
maxOutstanding.acquireUninterruptibly();
if (shutdown) {
// Give back the permit taken above before leaving the loop.
maxOutstanding.release();
return;
}
final ClientCall<ByteBuf, ByteBuf> call =
channel.newCall(LoadServer.GENERIC_STREAMING_PING_PONG_METHOD, CallOptions.DEFAULT);
call.start(new ClientCall.Listener<ByteBuf>() {
// Timestamp of the most recent send; initialised when the listener is created.
long now = System.nanoTime();
@Override
public void onMessage(ByteBuf message) {
// Report the time elapsed since the last send to delay(...).
delay(System.nanoTime() - now);
if (shutdown) {
call.cancel("Shutting down", null);
return;
}
// Ask for the next response and send the next request, then restart the clock.
call.request(1);
call.sendMessage(genericRequest.slice());
now = System.nanoTime();
}
@Override
public void onClose(Status status, Metadata trailers) {
// The call is finished: release its permit so the loop can start another one.
maxOutstanding.release();
// Failures are logged at FINE once shutdown is set, otherwise at INFO.
Level level = shutdown ? Level.FINE : Level.INFO;
// OK and CANCELLED closes are not logged.
if (!status.isOk() && status.getCode() != Status.Code.CANCELLED) {
log.log(level, "Error in Generic Async Ping-Pong call", status.getCause());
}
}
}, new Metadata());
// Kick off the exchange: request the first response and send the first message.
call.request(1);
call.sendMessage(genericRequest.slice());
}
}
/**
 * Verifies that a wait-for-ready RPC started on the channel view of a subchannel never creates a
 * stream on the transport, and that its listener is closed with
 * {@code SubchannelChannel.WAIT_FOR_READY_ERROR} once the balancer RPC executor runs its task.
 */
@Test
public void subchannelChannel_failWaitForReady() {
createChannel();
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
Channel sChannel = subchannel.asChannel();
Metadata headers = new Metadata();
// Subchannel must be READY when creating the RPC.
requestConnectionSafely(helper, subchannel);
verify(mockTransportFactory)
.newClientTransport(
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
transportListener.transportReady();
// Nothing has been scheduled on the balancer RPC executor yet.
assertEquals(0, balancerRpcExecutor.numPendingTasks());
// Wait-for-ready RPC is not allowed
ClientCall<String, Integer> call =
sChannel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
call.start(mockCallListener, headers);
// No stream is created, and the listener is not touched until the executor runs.
verify(mockTransport, never()).newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verifyZeroInteractions(mockCallListener);
// Exactly one task is pending; running it delivers the failure to the listener.
assertEquals(1, balancerRpcExecutor.runDueTasks());
verify(mockCallListener).onClose(
same(SubchannelChannel.WAIT_FOR_READY_ERROR), any(Metadata.class));
}
/**
 * Creates a tracing wrapper around {@code delegate}.
 *
 * @param delegate the call that all operations are forwarded to
 * @param method descriptor of the invoked method; used to derive the service name and the
 *     operation-name prefix
 * @param channel the channel whose authority is recorded as the remote peer
 */
TracingClientCall(ClientCall<REQUEST, RESPONSE> delegate, MethodDescriptor<REQUEST, RESPONSE> method,
    Channel channel) {
  super(delegate);
  methodDescriptor = method;
  serviceName = formatOperationName(method);
  remotePeer = channel.authority();
  // Prefix is the formatted operation name followed by the CLIENT suffix constant.
  operationPrefix = OperationNameFormatUtil.formatOperationName(method) + CLIENT;
}
/**
 * {@inheritDoc}
 *
 * <p>Logs the method type, the full method name and the hash code of {@code next} at INFO level,
 * then delegates to {@code next} without modifying the call.
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    final MethodDescriptor<ReqT, RespT> method,
    final CallOptions callOptions,
    final Channel next
) {
  log.info(
      "gRPC {} call: {} (channel: {})",
      method.getType().toString(),
      method.getFullMethodName(),
      next.hashCode());
  return next.newCall(method, callOptions);
}
/**
 * Verifies that a call created with a zero-nanosecond deadline is closed with a
 * {@code DEADLINE_EXCEEDED} status code after the executor runs its single due task.
 */
@Test
public void immediateDeadlineExceeded() {
  createChannel();
  CallOptions expiredOptions = CallOptions.DEFAULT.withDeadlineAfter(0, TimeUnit.NANOSECONDS);
  ClientCall<String, Integer> expiredCall = channel.newCall(method, expiredOptions);
  expiredCall.start(mockCallListener, new Metadata());

  assertEquals(1, executor.runDueTasks());

  verify(mockCallListener).onClose(statusCaptor.capture(), any(Metadata.class));
  Status closeStatus = statusCaptor.getValue();
  assertSame(Status.DEADLINE_EXCEEDED.getCode(), closeStatus.getCode());
}
/**
 * Cleans up after {@code ClientCall#start} exited with an exception: the span registered for
 * {@code listener}, if any, is removed from the listener-to-span map and ended. When the start
 * completed normally ({@code thrown == null}) nothing is done.
 *
 * @param listener the listener that was passed to the start call
 * @param thrown the exception raised by the start call, or {@code null} if there was none
 */
@Override
public void clientCallStartExit(ClientCall.Listener<?> listener, @Nullable Throwable thrown) {
  if (thrown != null) {
    // when there is an exception, we have to end span and perform some cleanup
    Span registeredSpan = clientCallListenerSpans.remove(listener);
    if (registeredSpan != null) {
      registeredSpan.end();
    }
  }
}
/**
 * Exit advice: when a span was stored in the advice-local {@code span} slot, forwards the
 * listener and any thrown exception to the {@code GrpcHelper} resolved for the class loader of
 * {@code ClientCall}. Does nothing when there is no span or no helper.
 */
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
private static void onExit(@Advice.Argument(0) ClientCall.Listener<?> listener,
                           @Advice.Thrown @Nullable Throwable thrown,
                           @Advice.Local("span") @Nullable Span span) {
  if (span != null) {
    GrpcHelper helper = grpcHelperManager.getForClassLoaderOfClass(ClientCall.class);
    if (helper != null) {
      helper.clientCallStartExit(listener, thrown);
    }
  }
}
/**
 * Creates a {@code ClientCallImpl} for {@code method} and configures it with this channel's
 * full-stream-decompression flag and its decompressor and compressor registries.
 *
 * <p>When {@code terminated} is set, {@code null} is passed in place of the transport factory's
 * scheduled executor service.
 *
 * @param method the method to invoke
 * @param callOptions per-call options; also used to select the call executor
 * @return the configured call
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
    CallOptions callOptions) {
  ClientCallImpl<ReqT, RespT> newCall =
      new ClientCallImpl<ReqT, RespT>(
          method,
          getCallExecutor(callOptions),
          callOptions,
          transportProvider,
          terminated ? null : transportFactory.getScheduledExecutorService(),
          channelCallTracer,
          retryEnabled);
  return newCall
      .setFullStreamDecompression(fullStreamDecompression)
      .setDecompressorRegistry(decompressorRegistry)
      .setCompressorRegistry(compressorRegistry);
}
/**
 * Adds {@code tokenValue} to the outgoing metadata under the ASCII key {@code "token"} each time
 * a call is started.
 *
 * @param methodDescriptor the method being invoked
 * @param callOptions options forwarded unchanged to {@code channel}
 * @param channel the channel used to create the underlying call
 * @return a forwarding call that adds the token header on {@code start}
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
  ClientCall<ReqT, RespT> underlyingCall = channel.newCall(methodDescriptor, callOptions);
  return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(underlyingCall) {
    @Override
    public void start(Listener<RespT> listener, Metadata metadata) {
      Metadata.Key<String> tokenKey = Metadata.Key.of("token", ASCII_STRING_MARSHALLER);
      metadata.put(tokenKey, tokenValue);
      super.start(listener, metadata);
    }
  };
}
/**
 * Verifies that the custom response header reaches the client: a spying interceptor wraps the
 * response listener in a delegating mock, and the metadata passed to {@code onHeaders} is checked
 * for {@code HeaderServerInterceptor.CUSTOM_HEADER_KEY}.
 */
@Test
public void serverHeaderDeliveredToClient() {
  // Interceptor that swaps the response listener for a Mockito spy and remembers it.
  class SpyingClientInterceptor implements ClientInterceptor {
    ClientCall.Listener<?> spyListener;

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
      return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
        @Override
        public void start(Listener<RespT> responseListener, Metadata headers) {
          responseListener = mock(ClientCall.Listener.class, delegatesTo(responseListener));
          spyListener = responseListener;
          super.start(responseListener, headers);
        }
      };
    }
  }

  SpyingClientInterceptor spyingInterceptor = new SpyingClientInterceptor();
  GreeterBlockingStub stub =
      GreeterGrpc.newBlockingStub(channel).withInterceptors(spyingInterceptor);

  stub.sayHello(HelloRequest.getDefaultInstance());

  assertNotNull(spyingInterceptor.spyListener);
  ArgumentCaptor<Metadata> responseHeaders = ArgumentCaptor.forClass(Metadata.class);
  verify(spyingInterceptor.spyListener).onHeaders(responseHeaders.capture());
  assertEquals(
      "customRespondValue",
      responseHeaders.getValue().get(HeaderServerInterceptor.CUSTOM_HEADER_KEY));
}
/**
 * Starts a server-streaming topic subscription on the mirror client's channel, using the query
 * built from {@code builder}.
 *
 * @param mirrorClient the client whose channel is used to create the call
 * @param onNext invoked with each response, wrapped in a {@code MirrorConsensusTopicResponse}
 * @param onError invoked with the throwable when the stream reports an error
 * @return a handle whose callback cancels the underlying call with the message "unsubscribed"
 */
public MirrorSubscriptionHandle subscribe(
    MirrorClient mirrorClient,
    Consumer<MirrorConsensusTopicResponse> onNext,
    Consumer<Throwable> onError)
{
  final ClientCall<ConsensusTopicQuery, ConsensusTopicResponse> call =
      mirrorClient.channel.newCall(ConsensusServiceGrpc.getSubscribeTopicMethod(), CallOptions.DEFAULT);

  final MirrorSubscriptionHandle handle =
      new MirrorSubscriptionHandle(() -> call.cancel("unsubscribed", null));

  final ConsensusTopicQuery query = builder.build();
  final StreamObserver<ConsensusTopicResponse> responseObserver =
      new StreamObserver<ConsensusTopicResponse>() {
        @Override
        public void onNext(ConsensusTopicResponse consensusTopicResponse) {
          onNext.accept(new MirrorConsensusTopicResponse(consensusTopicResponse));
        }

        @Override
        public void onError(Throwable throwable) {
          onError.accept(throwable);
        }

        @Override
        public void onCompleted() {
          // Do nothing
        }
      };
  ClientCalls.asyncServerStreamingCall(call, query, responseObserver);

  return handle;
}
/**
 * Verifies that installing a new picker changes where an already-started call is routed: while
 * the picker returns {@code subchannel1} (for which no connection is requested in this test) no
 * stream is created, and once the picker returns {@code subchannel2} (whose transport was marked
 * ready) the stream is created and started.
 */
@Test
public void updateBalancingStateDoesUpdatePicker() {
ClientStream mockStream = mock(ClientStream.class);
createChannel();
// Start the call before any picker has been provided.
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
// Make the transport available with subchannel2
Subchannel subchannel1 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
Subchannel subchannel2 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel2);
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
transportListener.transportReady();
// First picker: always selects subchannel1, so the ready transport must not be used.
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel1));
updateBalancingStateSafely(helper, READY, mockPicker);
executor.runDueTasks();
verify(mockTransport, never())
.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));
verify(mockStream, never()).start(any(ClientStreamListener.class));
// Second update: the picker now selects subchannel2, so the stream is created and started.
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(subchannel2));
updateBalancingStateSafely(helper, READY, mockPicker);
executor.runDueTasks();
verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
verify(mockStream).start(any(ClientStreamListener.class));
}
/**
 * Associates {@code span} with {@code clientCall} in the call-to-span map when a call is
 * supplied, and then deactivates the span in either case.
 *
 * @param clientCall the call to register the span for; may be {@code null}
 * @param span the span to register and deactivate
 */
@Override
public void registerSpan(@Nullable ClientCall<?, ?> clientCall, Span span) {
  final boolean hasCall = clientCall != null;
  if (hasCall) {
    clientCallSpans.put(clientCall, span);
  }
  span.deactivate();
}
/**
 * Wraps the call in a {@code LoggingForwardingCall} when a logging handler is selected for the
 * method; otherwise returns the plain call created by {@code next}.
 *
 * @param method the method being invoked; used to select the handler
 * @param callOptions options forwarded unchanged to {@code next}
 * @param next the channel used to create the underlying call
 * @return the logging wrapper, or the underlying call when no handler applies
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
  ClientCall<ReqT, RespT> underlyingCall = next.newCall(method, callOptions);
  @SuppressWarnings("unchecked") // handler matches method, but that type is inexpressible
  LoggingHandler<ReqT, RespT> selectedHandler = selectHandler(method);
  if (selectedHandler == null) {
    return underlyingCall;
  }
  return new LoggingForwardingCall<>(underlyingCall, selectedHandler, method);
}
/**
 * Asserts that the call options carry the expected credentials, then creates the call with the
 * call credentials cleared.
 *
 * @param method the method being invoked
 * @param callOptions the options whose credentials are checked
 * @param next the channel used to create the underlying call
 * @return the call created by {@code next} with call credentials set to {@code null}
 */
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
    MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions, Channel next) {
  assertThat(callOptions.getCredentials()).isEqualTo(credentials);
  // Remove the call credentials to allow testing with dummy ones.
  CallOptions optionsWithoutCredentials = callOptions.withCallCredentials(null);
  return next.newCall(method, optionsWithoutCredentials);
}
/**
 * Registers a cancellation callback on {@code emitter} that cancels every supplied call with
 * {@code CANCELLING_MESSAGE} and no cause.
 *
 * <p>The calls are declared as {@code ClientCall<?, ?>} rather than the raw type; the erasure is
 * unchanged, so existing callers are unaffected.
 *
 * @param emitter the emitter on which the cancellation callback is set
 * @param clientCalls the calls to cancel when the callback runs
 */
public static void attachCancellingCallback(Emitter emitter, ClientCall<?, ?>... clientCalls) {
  emitter.setCancellation(() -> {
    for (ClientCall<?, ?> call : clientCalls) {
      call.cancel(CANCELLING_MESSAGE, null);
    }
  });
}
/**
 * Verifies that calling {@code cancel} from inside the listener's {@code onMessage} cancels the
 * underlying stream with a {@code CANCELLED} status that carries the description and cause given
 * to {@code cancel}.
 */
@Test
public void cancelInOnMessageShouldInvokeStreamCancel() throws Exception {
final ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method,
MoreExecutors.directExecutor(),
baseCallOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
/* retryEnabled= */ false);
final Exception cause = new Exception();
// Listener that cancels the call as soon as a message is delivered.
ClientCall.Listener<Void> callListener =
new ClientCall.Listener<Void>() {
@Override
public void onMessage(Void message) {
call.cancel("foo", cause);
}
};
call.start(callListener, new Metadata());
call.halfClose();
call.request(1);
// Grab the stream listener the call registered so the test can drive stream events.
verify(stream).start(listenerArgumentCaptor.capture());
ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
streamListener.onReady();
streamListener.headersRead(new Metadata());
// Deliver one (empty) message, which triggers onMessage and therefore the cancel.
streamListener
.messagesAvailable(new SingleMessageProducer(new ByteArrayInputStream(new byte[0])));
verify(stream).cancel(statusCaptor.capture());
Status status = statusCaptor.getValue();
assertEquals(Status.CANCELLED.getCode(), status.getCode());
assertEquals("foo", status.getDescription());
assertSame(cause, status.getCause());
}