下面列出了怎么用io.grpc.CallOptions的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void wrapChannel_methodDescriptor() throws Exception {
final AtomicReference<MethodDescriptor<?, ?>> methodRef =
new AtomicReference<>();
Channel channel = new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> method, CallOptions callOptions) {
methodRef.set(method);
return new NoopClientCall<>();
}
@Override
public String authority() {
throw new UnsupportedOperationException();
}
};
Channel wChannel = binlogProvider.wrapChannel(channel);
ClientCall<String, Integer> unusedClientCall = wChannel.newCall(method, CallOptions.DEFAULT);
validateWrappedMethod(methodRef.get());
}
@Test
public void reprocess_NoPendingStream() {
SubchannelPicker picker = mock(SubchannelPicker.class);
AbstractSubchannel subchannel = mock(AbstractSubchannel.class);
when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel));
when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class))).thenReturn(mockRealStream);
delayedTransport.reprocess(picker);
verifyNoMoreInteractions(picker);
verifyNoMoreInteractions(transportListener);
// Though picker was not originally used, it will be saved and serve future streams.
ClientStream stream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
verify(subchannel).obtainActiveTransport();
assertSame(mockRealStream, stream);
}
protected AbstractClientStream(
WritableBufferAllocator bufferAllocator,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
Metadata headers,
CallOptions callOptions,
boolean useGet) {
checkNotNull(headers, "headers");
this.transportTracer = checkNotNull(transportTracer, "transportTracer");
this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
this.useGet = useGet;
if (!useGet) {
framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
this.headers = headers;
} else {
framer = new GetFramer(headers, statsTraceCtx);
}
}
@Test
public void receiveLongEnoughDataWithoutHeaderAndTrailer() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
stream.request(1);
Buffer buffer = createMessageFrame(new byte[1000]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
// Once we receive enough detail, we cancel the stream. so we should have sent cancel.
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
assertEquals(0, listener.messages.size());
shutdownAndVerify();
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientInterceptor binlogInterceptor = getClientInterceptor(
method.getFullMethodName(), callOptions);
if (binlogInterceptor == null) {
return next.newCall(method, callOptions);
} else {
return InternalClientInterceptors
.wrapClientInterceptor(
binlogInterceptor,
BYTEARRAY_MARSHALLER,
BYTEARRAY_MARSHALLER)
.interceptCall(method, callOptions, next);
}
}
@Test
public void addDefaultUserAgent() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
Header userAgentHeader = new Header(GrpcUtil.USER_AGENT_KEY.name(),
GrpcUtil.getGrpcUserAgent("okhttp", null));
List<Header> expectedHeaders = Arrays.asList(HTTP_SCHEME_HEADER, METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "notarealauthority:80"),
new Header(Header.TARGET_PATH, "/" + method.getFullMethodName()),
userAgentHeader, CONTENT_TYPE_HEADER, TE_HEADER);
verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
getStream(3).cancel(Status.CANCELLED);
shutdownAndVerify();
}
@Test
public void cancelBeforeConnected() throws Exception {
initTransportAndDelayConnected();
final String message = "Hello Server";
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
InputStream input = new ByteArrayInputStream(message.getBytes(UTF_8));
stream.writeMessage(input);
stream.flush();
stream.cancel(Status.CANCELLED);
verifyNoMoreInteractions(frameWriter);
allowTransportConnected();
verifyNoMoreInteractions(frameWriter);
shutdownAndVerify();
}
@Test
public void blockingUnaryCall_HasBlockingStubType() {
NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
@Override
public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
listener.onMessage(1);
listener.onClose(Status.OK, new Metadata());
}
};
when(mockChannel.newCall(
ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
.thenReturn(call);
Integer unused =
ClientCalls.blockingUnaryCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
CallOptions capturedCallOption = callOptionsCaptor.getValue();
assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
.isEquivalentAccordingToCompareTo(StubType.BLOCKING);
}
@Test
public void alwaysUsePutTrue_cronetStreamIsIdempotent() throws Exception {
CronetChannelBuilder builder =
CronetChannelBuilder.forAddress("address", 1234, mockEngine).alwaysUsePut(true);
CronetTransportFactory transportFactory =
(CronetTransportFactory) builder.buildTransportFactory();
CronetClientTransport transport =
(CronetClientTransport)
transportFactory.newClientTransport(
new InetSocketAddress("localhost", 443),
new ClientTransportOptions(),
channelLogger);
CronetClientStream stream = transport.newStream(method, new Metadata(), CallOptions.DEFAULT);
assertTrue(stream.idempotent);
}
@Test
public void fullMethodMatched() {
// Put in service that matches, but has no deadline. It should be lower priority
JsonObj name1 = new JsonObj("service", "service");
JsonObj methodConfig1 = new JsonObj("name", new JsonList(name1));
JsonObj name2 = new JsonObj("service", "service", "method", "method");
JsonObj methodConfig2 = new JsonObj("name", new JsonList(name2), "timeout", "1s");
JsonObj serviceConfig = new JsonObj("methodConfig", new JsonList(methodConfig1, methodConfig2));
interceptor.handleUpdate(serviceConfig);
interceptor.interceptCall(methodDescriptor, CallOptions.DEFAULT, channel);
verify(channel).newCall(eq(methodDescriptor), callOptionsCap.capture());
assertThat(callOptionsCap.getValue().getDeadline()).isNotNull();
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
// New RPCs on client-side inherit the tag context from the current Context.
TagContext parentCtx = tagger.getCurrentTagContext();
final ClientCallTracer tracerFactory =
newClientCallTracer(parentCtx, method.getFullMethodName());
ClientCall<ReqT, RespT> call =
next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
tracerFactory.callEnded(status);
super.onClose(status, trailers);
}
},
headers);
}
};
}
/**
* Test that even if an Error is thrown from the reading loop of the transport,
* it can still clean up and call transportShutdown() and transportTerminated() as expected
* by the channel.
*/
@Test
public void nextFrameThrowsError() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
stream.request(1);
assertEquals(1, activeStreamCount());
assertContainStream(3);
frameReader.throwErrorForNextFrame();
listener.waitUntilStreamClosed();
assertEquals(0, activeStreamCount());
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertEquals(ERROR_MESSAGE, listener.status.getCause().getMessage());
verify(transportListener, timeout(TIME_OUT_MS)).transportShutdown(isA(Status.class));
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
shutdownAndVerify();
}
@Test
public void receiveDataWithoutHeaderAndTrailer() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
stream.request(1);
Buffer buffer = createMessageFrame(new byte[1]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
// Trigger the failure by a data frame.
buffer = createMessageFrame(new byte[1]);
frameHandler().data(true, 3, buffer, (int) buffer.size());
listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
assertEquals(0, listener.messages.size());
shutdownAndVerify();
}
/**
* Implements a bidirectional stream → stream call as {@link Flux} → {@link Flux}, where both the client
* and the server independently stream to each other.
*/
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flux<TResponse> manyToMany(
Flux<TRequest> fluxSource,
Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
CallOptions options) {
try {
final int prefetch = ReactorCallOptions.getPrefetch(options);
final int lowTide = ReactorCallOptions.getLowTide(options);
ReactorSubscriberAndClientProducer<TRequest> subscriberAndGRPCProducer =
fluxSource.subscribeWith(new ReactorSubscriberAndClientProducer<>());
ReactorClientStreamObserverAndPublisher<TResponse> observerAndPublisher =
new ReactorClientStreamObserverAndPublisher<>(
s -> subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) s),
subscriberAndGRPCProducer::cancel, prefetch, lowTide
);
delegate.apply(observerAndPublisher);
return Flux.from(observerAndPublisher);
} catch (Throwable throwable) {
return Flux.error(throwable);
}
}
@Test
public void startAddsMaxSize() {
CallOptions callOptions =
baseCallOptions.withMaxInboundMessageSize(1).withMaxOutboundMessageSize(2);
ClientCallImpl<Void, Void> call = new ClientCallImpl<Void, Void>(
method,
new SerializingExecutor(Executors.newSingleThreadExecutor()),
callOptions,
provider,
deadlineCancellationExecutor,
channelCallTracer,
false /* retryEnabled */)
.setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata());
verify(stream).setMaxInboundMessageSize(1);
verify(stream).setMaxOutboundMessageSize(2);
}
@Test
public void receiveDataWithoutHeader() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
stream.request(1);
Buffer buffer = createMessageFrame(new byte[1]);
frameHandler().data(false, 3, buffer, (int) buffer.size());
// Trigger the failure by a trailer.
frameHandler().headers(
true, true, 3, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
listener.waitUntilStreamClosed();
assertEquals(Status.INTERNAL.getCode(), listener.status.getCode());
assertTrue(listener.status.getDescription().startsWith("headers not received before payload"));
assertEquals(0, listener.messages.size());
shutdownAndVerify();
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
protected Listener<RespT> delegate() {
callTime = System.nanoTime();
return super.delegate();
}
}, headers);
}
};
}
private static ClientInterceptor metadataInterceptor(Map<String, Object> metaDataMap) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, final Channel next) {
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
protected void checkedStart(Listener<RespT> responseListener, Metadata headers) {
metaDataMap.forEach((k, v) -> {
Key<String> mKey = Key.of(k, ASCII_STRING_MARSHALLER);
headers.put(mKey, String.valueOf(v));
});
delegate().start(responseListener, headers);
}
};
}
};
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT,RespT> method, CallOptions callOptions, Channel next) {
LOGGER.info("Intercepted " + method.getFullMethodName());
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
call = new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
if (apiKey != null && !apiKey.isEmpty()) {
LOGGER.info("Attaching API Key: " + apiKey);
headers.put(API_KEY_HEADER, apiKey);
}
super.start(responseListener, headers);
}
};
return call;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
ClientInterceptor binlogInterceptor = getClientInterceptor(
method.getFullMethodName(), callOptions);
if (binlogInterceptor == null) {
return next.newCall(method, callOptions);
} else {
return InternalClientInterceptors
.wrapClientInterceptor(
binlogInterceptor,
BYTEARRAY_MARSHALLER,
BYTEARRAY_MARSHALLER)
.interceptCall(method, callOptions, next);
}
}
/**
* The {@link StackdriverSender} bean.
* @param cloudConfiguration The google cloud configuration
* @param credentials The credentials
* @param channel The channel to use
* @return The sender
*/
@RequiresGoogleProjectId
@Requires(classes = StackdriverSender.class)
@Singleton
protected @Nonnull Sender stackdriverSender(
@Nonnull GoogleCloudConfiguration cloudConfiguration,
@Nonnull GoogleCredentials credentials,
@Nonnull @Named("stackdriverTraceSenderChannel") ManagedChannel channel) {
GoogleCredentials traceCredentials = credentials.createScoped(Arrays.asList(TRACE_SCOPE.toString()));
return StackdriverSender.newBuilder(channel)
.projectId(cloudConfiguration.getProjectId())
.callOptions(CallOptions.DEFAULT
.withCallCredentials(MoreCallCredentials.from(traceCredentials)))
.build();
}
NettyClientStream(
TransportState state,
MethodDescriptor<?, ?> method,
Metadata headers,
Channel channel,
AsciiString authority,
AsciiString scheme,
AsciiString userAgent,
StatsTraceContext statsTraceCtx,
TransportTracer transportTracer,
CallOptions callOptions,
boolean useGetForSafeMethods) {
super(
new NettyWritableBufferAllocator(channel.alloc()),
statsTraceCtx,
transportTracer,
headers,
callOptions,
useGetForSafeMethods && method.isSafe());
this.state = checkNotNull(state, "transportState");
this.writeQueue = state.handler.getWriteQueue();
this.method = checkNotNull(method, "method");
this.authority = checkNotNull(authority, "authority");
this.scheme = checkNotNull(scheme, "scheme");
this.userAgent = userAgent;
}
@Test
public void withMaxRequestSize_pickSmallerNew() {
JsonObj name = new JsonObj("service", "service");
JsonObj methodConfig = new JsonObj("name", new JsonList(name), "maxRequestMessageBytes", 5d);
JsonObj serviceConfig = new JsonObj("methodConfig", new JsonList(methodConfig));
ManagedChannelServiceConfig parsedServiceConfig =
createManagedChannelServiceConfig(serviceConfig);
interceptor.handleUpdate(parsedServiceConfig);
interceptor.interceptCall(
methodDescriptor, CallOptions.DEFAULT.withMaxOutboundMessageSize(10), channel);
verify(channel).newCall(eq(methodDescriptor), callOptionsCap.capture());
assertThat(callOptionsCap.getValue().getMaxOutboundMessageSize()).isEqualTo(5);
}
@Test
public void shutdownDuringConnecting() throws Exception {
initTransportAndDelayConnected();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
clientTransport.shutdown(SHUTDOWN_REASON);
allowTransportConnected();
// The new stream should be failed, but not the pending stream.
assertNewStreamFail();
verify(frameWriter, timeout(TIME_OUT_MS))
.synStream(anyBoolean(), anyBoolean(), eq(3), anyInt(), anyListHeader());
assertEquals(1, activeStreamCount());
stream.cancel(Status.CANCELLED);
listener.waitUntilStreamClosed();
assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
shutdownAndVerify();
}
@Test
public void start_userAgentRemoved() {
Metadata metaData = new Metadata();
metaData.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
stream = new OkHttpClientStream(methodDescriptor, metaData, frameWriter, transport,
flowController, lock, MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "localhost",
"good-application", StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT);
stream.start(new BaseClientStreamListener());
stream.transportState().start(3);
verify(frameWriter).synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());
assertThat(headersCaptor.getValue())
.contains(new Header(GrpcUtil.USER_AGENT_KEY.name(), "good-application"));
}
@Test
public void cancelStreamForDeadlineExceeded() throws Exception {
initTransport();
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream =
clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
stream.start(listener);
getStream(3).cancel(Status.DEADLINE_EXCEEDED);
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed();
shutdownAndVerify();
}
protected ClientInterceptor getClientInterceptor() {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new TestClientCallImpl<ReqT, RespT>(next.newCall(method, callOptions), exeptionMethod);
}
};
}
@Test
public void newCall() {
NoopClientCall<Void, Void> clientCall = new NoopClientCall<Void, Void>();
CallOptions callOptions = CallOptions.DEFAULT.withoutWaitForReady();
MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod();
when(mock.newCall(same(method), same(callOptions))).thenReturn(clientCall);
assertSame(clientCall, forward.newCall(method, callOptions));
}
@Test
public void withAnnotation() {
Object annotation1 = new Object();
Object annotation2 = new Object();
CallOptions callOptions = CronetCallOptions.withAnnotation(CallOptions.DEFAULT, annotation1);
callOptions = CronetCallOptions.withAnnotation(callOptions, annotation2);
SetStreamFactoryRunnable callback = new SetStreamFactoryRunnable(factory);
CronetClientStream stream =
new CronetClientStream(
"https://www.google.com:443",
"cronet",
executor,
metadata,
transport,
callback,
lock,
100,
false /* alwaysUsePut */,
method,
StatsTraceContext.NOOP,
callOptions,
transportTracer);
callback.setStream(stream);
when(factory.newBidirectionalStreamBuilder(
any(String.class), any(BidirectionalStream.Callback.class), any(Executor.class)))
.thenReturn(builder);
stream.start(clientListener);
verify(builder).addRequestAnnotation(annotation1);
verify(builder).addRequestAnnotation(annotation2);
}
private ListenableFuture<AuthenticateResponse> authenticate() {
AuthenticateRequest request = AuthenticateRequest.newBuilder()
.setNameBytes(name).setPasswordBytes(password).build();
// no call creds for auth call
CallOptions callOpts = CallOptions.DEFAULT;
return Futures.catchingAsync(
grpc.fuCall(METHOD_AUTHENTICATE, request, callOpts, 0L),
Exception.class, ex -> !retryAuthRequest(ex)
? Futures.immediateFailedFuture(ex)
: grpc.fuCall(METHOD_AUTHENTICATE, request, callOpts, 0L),
directExecutor());
}