下面列出了 io.grpc.ClientInterceptors#intercept() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Sends a cacheable unary rpc, intended to go out as a GET. Requires that the server is behind a
 * caching proxy.
 */
public void cacheableUnary() {
  // THIS TEST IS BROKEN. Enabling safe just on the MethodDescriptor does nothing by itself. This
  // test would need to enable GET on the channel.

  // Derive a copy of the cacheable method descriptor with the safe flag turned on.
  MethodDescriptor<SimpleRequest, SimpleResponse> safeMethod =
      TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();

  // Some proxies (GFE) won't cache requests from localhost, so attach a fake user IP header.
  Metadata.Key<String> fakeIpKey =
      Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
  Metadata fakeIpHeaders = new Metadata();
  fakeIpHeaders.put(fakeIpKey, "1.2.3.4");
  Channel headerChannel =
      ClientInterceptors.intercept(
          channel, MetadataUtils.newAttachHeadersInterceptor(fakeIpHeaders));

  // Each payload body is the nanoTime reading taken at the moment the request is built.
  ByteString repeatedBody = ByteString.copyFromUtf8(String.valueOf(System.nanoTime()));
  SimpleRequest repeatedRequest =
      SimpleRequest.newBuilder().setPayload(Payload.newBuilder().setBody(repeatedBody)).build();
  ByteString otherBody = ByteString.copyFromUtf8(String.valueOf(System.nanoTime()));
  SimpleRequest otherRequest =
      SimpleRequest.newBuilder().setPayload(Payload.newBuilder().setBody(otherBody)).build();

  // The same request is sent twice, followed by the other one.
  SimpleResponse firstResponse =
      ClientCalls.blockingUnaryCall(
          headerChannel, safeMethod, CallOptions.DEFAULT, repeatedRequest);
  SimpleResponse secondResponse =
      ClientCalls.blockingUnaryCall(
          headerChannel, safeMethod, CallOptions.DEFAULT, repeatedRequest);
  SimpleResponse thirdResponse =
      ClientCalls.blockingUnaryCall(
          headerChannel, safeMethod, CallOptions.DEFAULT, otherRequest);

  // The repeated request must yield an equal response; the other request must not.
  assertEquals(firstResponse, secondResponse);
  assertNotEquals(firstResponse, thirdResponse);
  // THIS TEST IS BROKEN. See comment at start of method.
}
/** Registers both greeter services and prepares the base channel and its caching wrapper. */
@Before
public void setUp() throws Exception {
  // The first greeter is served through injectCacheControlInterceptor; the second is served
  // as-is.
  grpcServerRule
      .getServiceRegistry()
      .addService(
          ServerInterceptors.intercept(greeterServiceImpl, injectCacheControlInterceptor));
  grpcServerRule.getServiceRegistry().addService(anotherGreeterServiceImpl);

  baseChannel = grpcServerRule.getChannel();
  // By default the tests talk through a caching interceptor backed by the shared cache.
  channelToUse =
      ClientInterceptors.intercept(
          baseChannel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache));
}
/**
 * With the directive {@code max-age=-10} added to the cache-control directives, two identical
 * calls must return equal replies; after the wait, a further call must return a different reply
 * and exactly one key (method name plus request message) must have been removed from the cache.
 */
@Test
public void invalidResponseMaxAge_usesDefault() throws Exception {
  // Swap in an interceptor created with an explicit second argument of 1.
  channelToUse =
      ClientInterceptors.intercept(
          baseChannel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache, 1));
  cacheControlDirectives.add("max-age=-10");

  HelloReply firstReply =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  HelloReply secondReply =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  assertEquals(firstReply, secondReply);

  // Wait for cache entry to expire
  sleepAtLeast(1001);
  HelloReply replyAfterWait =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  assertNotEquals(firstReply, replyAfterWait);

  // Exactly one entry was removed, and it is the one keyed by this method and message.
  Truth.assertThat(cache.removedKeys).hasSize(1);
  SafeMethodCachingInterceptor.Key expectedKey =
      new SafeMethodCachingInterceptor.Key(
          GreeterGrpc.getSayHelloMethod().getFullMethodName(), message);
  assertEquals(expectedKey, cache.removedKeys.get(0));
}
/**
 * Two identical calls must return the very same reply instance; after the wait, a further call
 * must return a different reply and exactly one key (method name plus request message) must
 * have been removed from the cache.
 */
@Test
public void afterDefaultMaxAge_cacheEntryInvalidated() throws Exception {
  // Swap in an interceptor created with an explicit second argument of 1.
  channelToUse =
      ClientInterceptors.intercept(
          baseChannel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache, 1));

  HelloReply firstReply =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  HelloReply secondReply =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  // Identity, not just equality: the second reply has to be the same object as the first.
  assertSame(firstReply, secondReply);

  // Wait for cache entry to expire
  sleepAtLeast(1001);
  HelloReply replyAfterWait =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  assertNotEquals(firstReply, replyAfterWait);

  // Exactly one entry was removed, and it is the one keyed by this method and message.
  Truth.assertThat(cache.removedKeys).hasSize(1);
  SafeMethodCachingInterceptor.Key expectedKey =
      new SafeMethodCachingInterceptor.Key(
          GreeterGrpc.getSayHelloMethod().getFullMethodName(), message);
  assertEquals(expectedKey, cache.removedKeys.get(0));
}
/**
 * Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy.
 *
 * <p>The same request is issued twice and a second request once; the first two responses must
 * be equal and the third must differ from the first.
 */
public void cacheableUnary() {
  // Set safe to true on a copy of the cacheable method descriptor.
  MethodDescriptor<SimpleRequest, SimpleResponse> safeMethod =
      TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();

  // Set fake user IP since some proxies (GFE) won't cache requests from localhost.
  Metadata.Key<String> fakeIpKey =
      Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
  Metadata fakeIpHeaders = new Metadata();
  fakeIpHeaders.put(fakeIpKey, "1.2.3.4");
  Channel headerChannel =
      ClientInterceptors.intercept(
          channel, MetadataUtils.newAttachHeadersInterceptor(fakeIpHeaders));

  // Each payload body is the nanoTime reading taken at the moment the request is built.
  ByteString repeatedBody = ByteString.copyFromUtf8(String.valueOf(System.nanoTime()));
  SimpleRequest repeatedRequest =
      SimpleRequest.newBuilder().setPayload(Payload.newBuilder().setBody(repeatedBody)).build();
  ByteString otherBody = ByteString.copyFromUtf8(String.valueOf(System.nanoTime()));
  SimpleRequest otherRequest =
      SimpleRequest.newBuilder().setPayload(Payload.newBuilder().setBody(otherBody)).build();

  SimpleResponse firstResponse =
      ClientCalls.blockingUnaryCall(
          headerChannel, safeMethod, CallOptions.DEFAULT, repeatedRequest);
  SimpleResponse secondResponse =
      ClientCalls.blockingUnaryCall(
          headerChannel, safeMethod, CallOptions.DEFAULT, repeatedRequest);
  SimpleResponse thirdResponse =
      ClientCalls.blockingUnaryCall(
          headerChannel, safeMethod, CallOptions.DEFAULT, otherRequest);

  assertEquals(firstResponse, secondResponse);
  assertNotEquals(firstResponse, thirdResponse);
}
/**
 * Creates a driver from the builder's segments, runtime and latency accumulator, and opens a
 * plaintext channel to {@code localhost:<builder.port>} that attaches the builder's id under
 * {@code ID_HEADER} to outgoing calls.
 */
public Driver(Builder builder) {
  this.segments = builder.segments;
  this.runtime = builder.runtimeSeconds;
  this.latencyAccumulator = builder.latencyAccumulator;

  // Headers carrying this driver's id, attached by the interceptor below.
  Metadata idHeaders = new Metadata();
  idHeaders.put(ID_HEADER, builder.id);

  String target = "localhost:" + builder.port;
  this.channel =
      ClientInterceptors.intercept(
          NettyChannelBuilder.forTarget(target).usePlaintext(true).build(),
          MetadataUtils.newAttachHeadersInterceptor(idHeaders));
}
/**
 * Return {@link io.grpc.Channel} which is used by Cloud Pub/Sub gRPC API's.
 *
 * <p>The credentials are obtained before the underlying channel is built, so a failure to load
 * them does not leave behind a channel that nobody shuts down.
 *
 * @param credentialsProvider source of the credentials handed to the auth interceptor
 * @return a TLS channel to {@code ENDPOINT} on port 443, wrapped with a
 *     {@code ClientAuthInterceptor}
 * @throws IOException propagated from {@code credentialsProvider.getCredentials()}
 */
public static Channel getChannel(CredentialsProvider credentialsProvider) throws IOException {
  // Resolve the credentials first: getCredentials() may throw, and a ManagedChannel built
  // before that point would be abandoned without ever being shut down.
  final ClientAuthInterceptor interceptor =
      new ClientAuthInterceptor(
          credentialsProvider.getCredentials(),
          Executors.newCachedThreadPool());
  ManagedChannel channelImpl =
      NettyChannelBuilder.forAddress(ENDPOINT, 443)
          .negotiationType(NegotiationType.TLS)
          // Maximum Pub/Sub message size is 10MB.
          .maxInboundMessageSize(10 * 1024 * 1024)
          .build();
  return ClientInterceptors.intercept(channelImpl, interceptor);
}
/** Registers both greeter services and prepares the base channel and its caching wrapper. */
@Before
public void setUp() throws Exception {
  // The first greeter is served through injectCacheControlInterceptor; the second is served
  // as-is.
  grpcServerRule
      .getServiceRegistry()
      .addService(
          ServerInterceptors.intercept(greeterServiceImpl, injectCacheControlInterceptor));
  grpcServerRule.getServiceRegistry().addService(anotherGreeterServiceImpl);

  baseChannel = grpcServerRule.getChannel();
  // By default the tests talk through a caching interceptor backed by the shared cache.
  channelToUse =
      ClientInterceptors.intercept(
          baseChannel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache));
}
/**
 * Construct client connecting to HelloWorld server at the target {@code address}. The blocking
 * stub is built on the channel wrapped with an {@code Interceptor} created from {@code apiKey}.
 */
public HelloWorldClient(String address, String apiKey) {
  // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
  // needing certificates.
  channel = ManagedChannelBuilder.forTarget(address).usePlaintext(true).build();
  blockingStub =
      GreeterGrpc.newBlockingStub(
          ClientInterceptors.intercept(channel, new Interceptor(apiKey)));
}
/**
 * Builds a blocking Bookstore stub over a plaintext channel to {@code address}, with an
 * {@code Interceptor} created from {@code apiKey} and {@code authToken} installed on it.
 */
static BookstoreGrpc.BookstoreBlockingStub createBookstoreStub(
    String address, String apiKey, String authToken) {
  Channel plaintextChannel = ManagedChannelBuilder.forTarget(address).usePlaintext(true).build();
  Channel interceptedChannel =
      ClientInterceptors.intercept(plaintextChannel, new Interceptor(apiKey, authToken));
  return BookstoreGrpc.newBlockingStub(interceptedChannel);
}
/**
 * Installs a {@code BinaryLog} on the channel builder whose {@code wrapChannel} adds a recording
 * interceptor, then starts a call on the created channel and checks that the interceptor ran.
 */
@Test
public void binaryLogInstalled() throws Exception {
  final SettableFuture<Boolean> interceptorInvoked = SettableFuture.create();
  channelBuilder.binlog = new BinaryLog() {
    @Override
    public Channel wrapChannel(Channel delegate) {
      // Record that a call came through, then forward it unchanged.
      ClientInterceptor recordingInterceptor =
          new ClientInterceptor() {
            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                MethodDescriptor<ReqT, RespT> method,
                CallOptions callOptions,
                Channel next) {
              interceptorInvoked.set(true);
              return next.newCall(method, callOptions);
            }
          };
      return ClientInterceptors.intercept(delegate, recordingInterceptor);
    }

    @Override
    public <ReqT, RespT> ServerMethodDefinition<?, ?> wrapMethodDefinition(
        ServerMethodDefinition<ReqT, RespT> oMethodDef) {
      // Server-side definitions are passed through untouched.
      return oMethodDef;
    }

    @Override
    public void close() throws IOException {
      // noop
    }
  };

  createChannel();
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata());
  // get() yields the value set by the interceptor above.
  assertTrue(interceptorInvoked.get());
}
/**
 * A custom client: opens a plaintext channel to {@code host:port} and builds the blocking
 * Greeter stub on that channel wrapped with a {@code HeaderClientInterceptor}.
 */
private CustomHeaderClient(String host, int port) {
  originChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
  // The stub talks through the intercepted view; originChannel keeps the raw channel.
  Channel interceptedChannel =
      ClientInterceptors.intercept(originChannel, new HeaderClientInterceptor());
  blockingStub = GreeterGrpc.newBlockingStub(interceptedChannel);
}
/**
 * With the directive {@code max-age=-10} added to the cache-control directives, two identical
 * calls must return equal replies; after the wait, a further call must return a different reply
 * and exactly one key (method name plus request message) must have been removed from the cache.
 */
@Test
public void invalidResponseMaxAge_usesDefault() throws Exception {
  // Swap in an interceptor created with an explicit second argument of 1.
  channelToUse =
      ClientInterceptors.intercept(
          baseChannel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache, 1));
  cacheControlDirectives.add("max-age=-10");

  HelloReply firstReply =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  HelloReply secondReply =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  assertEquals(firstReply, secondReply);

  // Wait for cache entry to expire
  sleepAtLeast(1001);
  HelloReply replyAfterWait =
      ClientCalls.blockingUnaryCall(
          channelToUse, safeGreeterSayHelloMethod, CallOptions.DEFAULT, message);
  assertNotEquals(firstReply, replyAfterWait);

  // Exactly one entry was removed, and it is the one keyed by this method and message.
  Truth.assertThat(cache.removedKeys).hasSize(1);
  SafeMethodCachingInterceptor.Key expectedKey =
      new SafeMethodCachingInterceptor.Key(
          GreeterGrpc.getSayHelloMethod().getFullMethodName(), message);
  assertEquals(expectedKey, cache.removedKeys.get(0));
}
/**
 * Performs a single SayHello call through a caching-intercepted plaintext channel and returns
 * the text to display.
 *
 * @param params in order: host (String), message (String), port (String, may be empty), useGet
 *     (boolean), noCache (boolean), onlyIfCached (boolean)
 * @return the reply message, or a {@code "Failed... : "} text followed by the stack trace when
 *     anything inside the call path throws, including a port that is not a valid integer
 */
@Override
protected String doInBackground(Object... params) {
  String host = (String) params[0];
  String message = (String) params[1];
  String portStr = (String) params[2];
  boolean useGet = (boolean) params[3];
  boolean noCache = (boolean) params[4];
  boolean onlyIfCached = (boolean) params[5];
  try {
    // Parsed inside the try so that a malformed port is reported like any other failure
    // instead of escaping as an uncaught NumberFormatException. An empty port maps to 0.
    int port = TextUtils.isEmpty(portStr) ? 0 : Integer.parseInt(portStr);
    channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    Channel channelToUse =
        ClientInterceptors.intercept(
            channel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache));
    HelloRequest request = HelloRequest.newBuilder().setName(message).build();
    HelloReply reply;
    if (useGet) {
      // Issue the call with a "safe" copy of the method descriptor and the requested
      // cache-related call options.
      MethodDescriptor<HelloRequest, HelloReply> safeCacheableUnaryCallMethod =
          GreeterGrpc.getSayHelloMethod().toBuilder().setSafe(true).build();
      CallOptions callOptions = CallOptions.DEFAULT;
      if (noCache) {
        callOptions =
            callOptions.withOption(SafeMethodCachingInterceptor.NO_CACHE_CALL_OPTION, true);
      }
      if (onlyIfCached) {
        callOptions =
            callOptions.withOption(
                SafeMethodCachingInterceptor.ONLY_IF_CACHED_CALL_OPTION, true);
      }
      reply =
          ClientCalls.blockingUnaryCall(
              channelToUse, safeCacheableUnaryCallMethod, callOptions, request);
    } else {
      // Plain call through the generated blocking stub.
      GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channelToUse);
      reply = stub.sayHello(request);
    }
    return reply.getMessage();
  } catch (Exception e) {
    Log.e(TAG, "RPC failed", e);
    // Render the stack trace into the returned text.
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    e.printStackTrace(pw);
    pw.flush();
    return String.format("Failed... : %n%s", sw);
  }
}
/**
 * Streams two write requests through a channel wrapped by the logging interceptor and verifies
 * that the handler received both requests and the response, that its details were read once,
 * and that the expected log entry was written to the output.
 */
@Test
public void testCallOkMultipleRequests() {
  WriteRequest firstWrite =
      WriteRequest.newBuilder()
          .setResourceName("test")
          .setData(ByteString.copyFromUtf8("abc"))
          .build();
  WriteRequest secondWrite =
      WriteRequest.newBuilder()
          .setResourceName("test")
          .setData(ByteString.copyFromUtf8("def"))
          .build();
  WriteResponse cannedResponse = WriteResponse.newBuilder().setCommittedSize(6).build();

  // Fake service: ignores incoming requests and errors, and answers with the canned response
  // once the client completes the stream.
  serviceRegistry.addService(
      new ByteStreamImplBase() {
        @Override
        public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> responses) {
          return new StreamObserver<WriteRequest>() {
            @Override
            public void onNext(WriteRequest ignoredRequest) {}

            @Override
            public void onError(Throwable ignoredError) {}

            @Override
            public void onCompleted() {
              responses.onNext(cannedResponse);
              responses.onCompleted();
            }
          };
        }
      });

  @SuppressWarnings("unchecked")
  LoggingHandler<WriteRequest, WriteResponse> mockHandler = Mockito.mock(LoggingHandler.class);
  RpcCallDetails callDetails = RpcCallDetails.getDefaultInstance();
  Mockito.when(mockHandler.getDetails()).thenReturn(callDetails);
  AsynchronousFileOutputStream mockOutput = Mockito.mock(AsynchronousFileOutputStream.class);

  LoggingInterceptor loggingInterceptor =
      getInterceptorWithAlwaysThisHandler(mockHandler, mockOutput);
  ByteStreamStub stub =
      ByteStreamGrpc.newStub(
          ClientInterceptors.intercept(
              InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
              loggingInterceptor));

  // Advance the clock before the call starts ...
  clock.advanceMillis(1000);
  @SuppressWarnings("unchecked")
  StreamObserver<WriteResponse> mockResponseObserver = Mockito.mock(StreamObserver.class);

  // Write both requests.
  StreamObserver<WriteRequest> requestStream = stub.write(mockResponseObserver);
  requestStream.onNext(firstWrite);
  requestStream.onNext(secondWrite);
  // ... and again before it completes.
  clock.advanceMillis(1000);
  requestStream.onCompleted();

  ArgumentCaptor<WriteRequest> requestCaptor = ArgumentCaptor.forClass(WriteRequest.class);
  LogEntry expectedEntry =
      LogEntry.newBuilder()
          .setMethodName(ByteStreamGrpc.getWriteMethod().getFullMethodName())
          .setDetails(callDetails)
          .setStatus(com.google.rpc.Status.getDefaultInstance())
          .setStartTime(Timestamp.newBuilder().setSeconds(1))
          .setEndTime(Timestamp.newBuilder().setSeconds(2))
          .build();

  // Both requests reached the handler, in the order they were sent.
  verify(mockHandler, times(2)).handleReq(requestCaptor.capture());
  assertThat(requestCaptor.getAllValues().get(0)).isEqualTo(firstWrite);
  assertThat(requestCaptor.getAllValues().get(1)).isEqualTo(secondWrite);
  verify(mockHandler).handleResp(cannedResponse);
  verify(mockHandler).getDetails();
  verify(mockOutput).write(expectedEntry);
}
/**
 * Returns the given channel intercepted by {@code binaryLogShim}, which provides binary logging
 * on {@link ClientCall}s as needed.
 */
@Override
public final Channel wrapChannel(Channel channel) {
  Channel loggedChannel = ClientInterceptors.intercept(channel, binaryLogShim);
  return loggedChannel;
}
/**
 * Creates the channel from the builder's settings and the supplied collaborators.
 *
 * <p>In order, the constructor: obtains the name resolver for the target and hands it the
 * consumer service registry and this channel; sets up the channel tracer and logger, the load
 * balancer factory, executors and transports; layers the service-config interceptor, the
 * optional binary log and the given interceptors over a {@code RealChannel}; configures the idle
 * timer, compression registries and retry buffer limits; registers the channel with channelz;
 * and finally starts a thread running {@code registryRunnable}.
 *
 * @param builder source of the target and of most configuration values read below
 * @param clientTransportFactory transport factory, wrapped in a
 *     {@code CallCredentialsApplyingTransportFactory}
 * @param backoffPolicyProvider stored as-is in {@code backoffPolicyProvider}
 * @param balancerRpcExecutorPool must not be null; also wrapped in an {@code ExecutorHolder}
 * @param stopwatchSupplier must not be null; supplies the stopwatch given to the idle timer
 * @param interceptors interceptors applied on top of the (possibly binlog-wrapped) channel
 * @param timeProvider must not be null; used for the channel tracer, logger and call tracers
 */
ManagedChannelImpl(
AbstractManagedChannelImplBuilder<?> builder,
ClientTransportFactory clientTransportFactory,
BackoffPolicy.Provider backoffPolicyProvider,
ObjectPool<? extends Executor> balancerRpcExecutorPool,
Supplier<Stopwatch> stopwatchSupplier,
List<ClientInterceptor> interceptors,
final TimeProvider timeProvider) {
this.target = checkNotNull(builder.target, "target");
this.nameResolverFactory = builder.getNameResolverFactory();
this.nameResolverParams = checkNotNull(builder.getNameResolverParams(), "nameResolverParams");
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverParams);
// ----begin---- Register Consumer info: set the registry object ----
// NOTE(review): `this` is handed to the name resolver before the remaining fields are
// assigned — confirm the resolver does not use the channel until construction finishes.
this.nameResolver.setRegistry(consumerServiceRegistry);
this.nameResolver.setManagedChannel(this);
// ----end---- Register Consumer info: set the registry object ----
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
maxTraceEvents = builder.maxTraceEvents;
channelTracer = new ChannelTracer(
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
"Channel for '" + target + "'");
channelLogger = new ChannelLoggerImpl(channelTracer, timeProvider);
// Fall back to the auto-configured load balancer factory when the builder supplies none.
if (builder.loadBalancerFactory == null) {
this.loadBalancerFactory = new AutoConfiguredLoadBalancerFactory();
} else {
this.loadBalancerFactory = builder.loadBalancerFactory;
}
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
this.executor = checkNotNull(executorPool.getObject(), "executor");
// The delayed transport is created on the channel executor and started right away.
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
this.delayedTransport.start(delayedTransportListener);
this.backoffPolicyProvider = backoffPolicyProvider;
this.transportFactory =
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
this.scheduledExecutorForBalancer =
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());
// Retry is on only when enabled on the builder and not temporarily disabled.
this.retryEnabled = builder.retryEnabled && !builder.temporarilyDisableRetry;
serviceConfigInterceptor = new ServiceConfigInterceptor(
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
// Wrap the real channel step by step: service-config interceptor first, then the optional
// binary log wrapper, then the caller-supplied interceptors.
Channel channel = new RealChannel(nameResolver.getServiceAuthority());
channel = ClientInterceptors.intercept(channel, serviceConfigInterceptor);
if (builder.binlog != null) {
channel = builder.binlog.wrapChannel(channel);
}
this.interceptorChannel = ClientInterceptors.intercept(channel, interceptors);
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
// Both branches store the builder's value; unless idle timeout is disabled, it is first
// checked to be at least IDLE_MODE_MIN_TIMEOUT_MILLIS.
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
this.idleTimeoutMillis = builder.idleTimeoutMillis;
} else {
checkArgument(
builder.idleTimeoutMillis
>= AbstractManagedChannelImplBuilder.IDLE_MODE_MIN_TIMEOUT_MILLIS,
"invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
this.idleTimeoutMillis = builder.idleTimeoutMillis;
}
idleTimer = new Rescheduler(
new IdleModeTimer(),
syncContext,
transportFactory.getScheduledExecutorService(),
stopwatchSupplier.get());
this.fullStreamDecompression = builder.fullStreamDecompression;
this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
this.userAgent = builder.userAgent;
this.channelBufferLimit = builder.retryBufferSize;
this.perRpcBufferLimit = builder.perRpcBufferLimit;
// Local factory class so that every CallTracer it creates uses this constructor's
// timeProvider argument.
final class ChannelCallTracerFactory implements CallTracer.Factory {
@Override
public CallTracer create() {
return new CallTracer(timeProvider);
}
}
this.callTracerFactory = new ChannelCallTracerFactory();
channelCallTracer = callTracerFactory.create();
this.channelz = checkNotNull(builder.channelz);
channelz.addRootChannel(this);
// ----begin---- Register Consumer info: invoke the registration method ----
// Runs registryRunnable on a new thread named CLIENT_REGISTRY_THREAD_NAME.
new Thread(registryRunnable, CLIENT_REGISTRY_THREAD_NAME).start();
// ----end---- Register Consumer info: invoke the registration method ----
}
/**
 * Performs a single SayHello call through a caching-intercepted plaintext channel and returns
 * the text to display.
 *
 * @param params in order: host (String), message (String), port (String, may be empty), useGet
 *     (boolean), noCache (boolean), onlyIfCached (boolean)
 * @return the reply message, or a {@code "Failed... : "} text followed by the stack trace when
 *     anything inside the call path throws, including a port that is not a valid integer
 */
@Override
protected String doInBackground(Object... params) {
  String host = (String) params[0];
  String message = (String) params[1];
  String portStr = (String) params[2];
  boolean useGet = (boolean) params[3];
  boolean noCache = (boolean) params[4];
  boolean onlyIfCached = (boolean) params[5];
  try {
    // Parsed inside the try so that a malformed port is reported like any other failure
    // instead of escaping as an uncaught NumberFormatException. An empty port maps to 0.
    int port = TextUtils.isEmpty(portStr) ? 0 : Integer.parseInt(portStr);
    channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    Channel channelToUse =
        ClientInterceptors.intercept(
            channel, SafeMethodCachingInterceptor.newSafeMethodCachingInterceptor(cache));
    HelloRequest request = HelloRequest.newBuilder().setName(message).build();
    HelloReply reply;
    if (useGet) {
      // Issue the call with a "safe" copy of the method descriptor and the requested
      // cache-related call options.
      MethodDescriptor<HelloRequest, HelloReply> safeCacheableUnaryCallMethod =
          GreeterGrpc.getSayHelloMethod().toBuilder().setSafe(true).build();
      CallOptions callOptions = CallOptions.DEFAULT;
      if (noCache) {
        callOptions =
            callOptions.withOption(SafeMethodCachingInterceptor.NO_CACHE_CALL_OPTION, true);
      }
      if (onlyIfCached) {
        callOptions =
            callOptions.withOption(
                SafeMethodCachingInterceptor.ONLY_IF_CACHED_CALL_OPTION, true);
      }
      reply =
          ClientCalls.blockingUnaryCall(
              channelToUse, safeCacheableUnaryCallMethod, callOptions, request);
    } else {
      // Plain call through the generated blocking stub.
      GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channelToUse);
      reply = stub.sayHello(request);
    }
    return reply.getMessage();
  } catch (Exception e) {
    Log.e(TAG, "RPC failed", e);
    // Render the stack trace into the returned text.
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    e.printStackTrace(pw);
    pw.flush();
    return String.format("Failed... : %n%s", sw);
  }
}
/**
 * Wraps the given object, which is cast to {@link Channel}, with a newly built
 * {@code TracingClientInterceptor}.
 *
 * @param channel the channel to wrap, passed as {@code Object}
 * @return the intercepted channel
 * @throws ClassCastException if {@code channel} is not a {@link Channel}
 */
public static Object build(final Object channel) {
  final Channel target = (Channel) channel;
  return ClientInterceptors.intercept(target, TracingClientInterceptor.newBuilder().build());
}
/**
 * Installs this interceptor on the given channel so that requests made through the returned
 * channel are traced.
 *
 * @param channel to be traced
 * @return intercepted channel
 */
public Channel intercept(Channel channel) {
  final Channel tracedChannel = ClientInterceptors.intercept(channel, this);
  return tracedChannel;
}