下面列出了如何使用 io.grpc.ClientStreamTracer API 类的示例代码及写法，或者点击链接到 GitHub 查看源代码。
/**
 * Creates the {@link ClientTracer} for this call, records it as the call's only stream tracer,
 * and refreshes the stats header on the outgoing metadata when tag propagation is enabled.
 *
 * @param callOptions options of the call; not read by this method
 * @param headers outgoing headers; any existing stats header entries are discarded and, unless
 *     the parent context equals the empty tag context, replaced with the parent context
 * @return the tracer that was just created and registered
 */
@Override
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
  final ClientTracer newTracer = new ClientTracer();
  // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
  // one streams. We will need to update this file to support them.
  if (streamTracerUpdater == null) {
    // No field updater available: fall back to a plain check-then-assign on the field.
    checkState(
        streamTracer == null,
        "Are you creating multiple streams per call? This class doesn't yet support this case");
    streamTracer = newTracer;
  } else {
    checkState(
        streamTracerUpdater.compareAndSet(this, null, newTracer),
        "Are you creating multiple streams per call? This class doesn't yet support this case");
  }
  if (!module.propagateTags) {
    return newTracer;
  }
  // Always drop stale entries first so at most one stats header is ever sent.
  headers.discardAll(module.statsHeader);
  final boolean parentCtxIsEmpty = module.tagger.empty().equals(parentCtx);
  if (!parentCtxIsEmpty) {
    headers.put(module.statsHeader, parentCtx);
  }
  return newTracer;
}
/**
 * Builds a {@link Substream} for one attempt, wiring in a dedicated {@code BufferSizeTracer} and
 * headers produced by {@code updateHeaders} for the given attempt count.
 *
 * @param previousAttempts attempt count passed to both the {@code Substream} constructor and
 *     {@code updateHeaders}
 * @return the substream with its {@code stream} field already assigned
 */
private Substream createSubstream(int previousAttempts) {
  final Substream substream = new Substream(previousAttempts);
  // one tracer per substream
  final ClientStreamTracer perSubstreamTracer = new BufferSizeTracer(substream);
  // The factory ignores its arguments and always hands back the tracer created above.
  final ClientStreamTracer.Factory fixedTracerFactory =
      new ClientStreamTracer.Factory() {
        @Override
        public ClientStreamTracer newClientStreamTracer(
            CallOptions callOptions, Metadata headers) {
          return perSubstreamTracer;
        }
      };
  final Metadata attemptHeaders = updateHeaders(headers, previousAttempts);
  // NOTICE: This set _must_ be done before stream.start() and it actually is.
  substream.stream = newSubstream(fixedTracerFactory, attemptHeaders);
  return substream;
}
/**
 * Replaces the token header on {@code headers} with the token found in the EAG attributes of the
 * transport (removing it when no token is present), then defers tracer creation to the delegate.
 *
 * @param info stream info; its transport attributes and the EAG attributes inside them must be
 *     non-null
 * @param headers outgoing headers, modified in place
 * @return the delegate's tracer, or {@code NOOP_TRACER} when there is no delegate
 */
@Override
public ClientStreamTracer newClientStreamTracer(
    ClientStreamTracer.StreamInfo info, Metadata headers) {
  final Attributes transportAttributes =
      checkNotNull(info.getTransportAttrs(), "transportAttrs");
  final Attributes eagAttributes =
      checkNotNull(transportAttributes.get(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS), "eagAttrs");
  final String lbToken = eagAttributes.get(GrpclbConstants.TOKEN_ATTRIBUTE_KEY);
  // Discard unconditionally so a pre-existing token never survives.
  headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
  if (lbToken != null) {
    headers.put(GrpclbConstants.TOKEN_METADATA_KEY, lbToken);
  }
  if (delegate == null) {
    return NOOP_TRACER;
  }
  return delegate.newClientStreamTracer(info, headers);
}
/**
 * When the EAG attributes carry a token, the factory must overwrite any token already in the
 * headers and return the tracer produced by the delegate.
 */
@Test
public void hasToken() {
  TokenAttachingTracerFactory tracerFactory = new TokenAttachingTracerFactory(delegate);
  Attributes eagAttributes =
      Attributes.newBuilder().set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, "token0001").build();
  Attributes transportAttributes =
      Attributes.newBuilder()
          .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes)
          .build();
  ClientStreamTracer.StreamInfo streamInfo =
      ClientStreamTracer.StreamInfo.newBuilder().setTransportAttrs(transportAttributes).build();
  Metadata outgoingHeaders = new Metadata();
  // Preexisting token should be replaced
  outgoingHeaders.put(GrpclbConstants.TOKEN_METADATA_KEY, "preexisting-token");

  ClientStreamTracer returnedTracer =
      tracerFactory.newClientStreamTracer(streamInfo, outgoingHeaders);

  verify(delegate).newClientStreamTracer(same(streamInfo), same(outgoingHeaders));
  assertThat(returnedTracer).isSameInstanceAs(fakeTracer);
  assertThat(outgoingHeaders.getAll(GrpclbConstants.TOKEN_METADATA_KEY))
      .containsExactly("token0001");
}
/**
 * When the EAG attributes carry no token, the factory must strip any token already in the
 * headers and still return the tracer produced by the delegate.
 */
@Test
public void noToken() {
  TokenAttachingTracerFactory tracerFactory = new TokenAttachingTracerFactory(delegate);
  Attributes transportAttributes =
      Attributes.newBuilder()
          .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, Attributes.EMPTY)
          .build();
  ClientStreamTracer.StreamInfo streamInfo =
      ClientStreamTracer.StreamInfo.newBuilder().setTransportAttrs(transportAttributes).build();
  Metadata outgoingHeaders = new Metadata();
  // Preexisting token should be removed
  outgoingHeaders.put(GrpclbConstants.TOKEN_METADATA_KEY, "preexisting-token");

  ClientStreamTracer returnedTracer =
      tracerFactory.newClientStreamTracer(streamInfo, outgoingHeaders);

  verify(delegate).newClientStreamTracer(same(streamInfo), same(outgoingHeaders));
  assertThat(returnedTracer).isSameInstanceAs(fakeTracer);
  assertThat(outgoingHeaders.get(GrpclbConstants.TOKEN_METADATA_KEY)).isNull();
}
/**
 * Records a call start on {@code counter} and returns a tracer that forwards everything to the
 * delegate factory's tracer while also recording the call's final status on {@code counter}.
 *
 * @param info stream info passed through to the delegate factory
 * @param headers outgoing headers passed through to the delegate factory
 * @return a forwarding tracer wrapping the delegate's tracer
 */
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
  counter.recordCallStarted();
  final ClientStreamTracer wrappedTracer = delegate.newClientStreamTracer(info, headers);
  return new ForwardingClientStreamTracer() {
    @Override
    protected ClientStreamTracer delegate() {
      return wrappedTracer;
    }

    @Override
    public void streamClosed(Status status) {
      // Record on the counter first, then let the wrapped tracer observe the close.
      counter.recordCallFinished(status);
      delegate().streamClosed(status);
    }
  };
}
/**
 * Checks that once a subchannel becomes READY the picker published through {@code helper} is a
 * {@code LoadRecordingSubchannelPicker} whose pick result carries a
 * {@code LoadRecordingStreamTracerFactory} bound to {@code counter}, and that
 * {@code loadRecorder.recording} is true before and false after the balancer shuts down.
 */
@Test
public void subchannelPickerInterceptedWithLoadRecording() {
// Presumably creates two backend addresses (argument 2) - confirm against the helper.
List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
deliverResolvedAddresses(backendAddrs, "round_robin");
FakeLoadBalancer childBalancer = (FakeLoadBalancer) childBalancers.poll();
// Take whichever subchannel the child balancer's map iterates first and mark it READY.
NoopSubchannel subchannel = childBalancer.subchannels.values().iterator().next();
deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(loadRecorder.recording).isTrue();
// Capture the picker handed to the helper together with the READY state.
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
assertThat(picker).isInstanceOf(LoadRecordingSubchannelPicker.class);
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
ClientStreamTracer.Factory tracerFactory = result.getStreamTracerFactory();
// The cast doubles as a type check on the factory carried by the pick result.
assertThat(((LoadRecordingStreamTracerFactory) tracerFactory).getCounter())
.isSameInstanceAs(counter);
// Shutting down must propagate to the child balancer and stop load recording.
loadBalancer.shutdown();
assertThat(childBalancer.shutdown).isTrue();
assertThat(loadRecorder.recording).isFalse();
}
/**
 * Tests the case where the parent policy creates its own {@link ClientStreamTracer.Factory}:
 * ORCA reports are forwarded only to the parent's listener, and neither the child factory nor
 * the child's listener is touched.
 */
@Test
public void onlyParentPolicyReceivesReportsIfCreatesOwnTracer() {
ClientStreamTracer.Factory parentFactory =
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(orcaListener1);
// The child factory is a mock delegating to a real factory so its interactions can be verified.
ClientStreamTracer.Factory childFactory =
mock(ClientStreamTracer.Factory.class,
delegatesTo(OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(parentFactory, orcaListener2)));
// The tracer is created from the parent factory directly, bypassing the child factory.
ClientStreamTracer parentTracer =
parentFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
// Trailer carrying a default-instance ORCA load report.
Metadata trailer = new Metadata();
trailer.put(
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
OrcaLoadReport.getDefaultInstance());
parentTracer.inboundTrailers(trailer);
verify(orcaListener1).onLoadReport(eq(OrcaLoadReport.getDefaultInstance()));
verifyZeroInteractions(childFactory);
verifyZeroInteractions(orcaListener2);
}
/**
 * Exercises {@code LoadRecordingStreamTracerFactory} against a shared {@code counter}: takes a
 * snapshot after a call starts, after it closes with OK, and after two more calls from two
 * factories close with non-OK statuses, checking the query counts each time.
 */
@Test
public void loadRecordingStreamTracerFactory_clientSideQueryCountsAggregation() {
  LoadRecordingStreamTracerFactory firstFactory =
      new LoadRecordingStreamTracerFactory(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY);
  ClientStreamTracer firstTracer =
      firstFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  ClientLoadSnapshot afterStart = counter.snapshot();
  assertQueryCounts(afterStart, 0, 1, 0, 1);

  firstTracer.streamClosed(Status.OK);
  ClientLoadSnapshot afterOkClose = counter.snapshot();
  assertQueryCounts(afterOkClose, 1, 0, 0, 0);

  // Create a second LoadRecordingStreamTracerFactory with the same counter, stats are aggregated
  // together.
  LoadRecordingStreamTracerFactory secondFactory =
      new LoadRecordingStreamTracerFactory(counter, NOOP_CLIENT_STREAM_TRACER_FACTORY);
  ClientStreamTracer abortedTracer =
      firstFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  abortedTracer.streamClosed(Status.ABORTED);
  ClientStreamTracer cancelledTracer =
      secondFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  cancelledTracer.streamClosed(Status.CANCELLED);
  ClientLoadSnapshot afterFailures = counter.snapshot();
  assertQueryCounts(afterFailures, 0, 0, 2, 2);
}
/**
 * Creates the {@link ClientTracer} for this call, records it as the call's only stream tracer,
 * and refreshes the stats header on the outgoing metadata when tag propagation is enabled.
 *
 * @param info stream info; not read by this method
 * @param headers outgoing headers; any existing stats header entries are discarded and, unless
 *     the parent context equals the empty tag context, replaced with the parent context
 * @return the tracer that was just created and registered
 */
@Override
public ClientStreamTracer newClientStreamTracer(
    ClientStreamTracer.StreamInfo info, Metadata headers) {
  final ClientTracer newTracer = new ClientTracer(module, startCtx);
  // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
  // one streams. We will need to update this file to support them.
  if (streamTracerUpdater == null) {
    // No field updater available: fall back to a plain check-then-assign on the field.
    checkState(
        streamTracer == null,
        "Are you creating multiple streams per call? This class doesn't yet support this case");
    streamTracer = newTracer;
  } else {
    checkState(
        streamTracerUpdater.compareAndSet(this, null, newTracer),
        "Are you creating multiple streams per call? This class doesn't yet support this case");
  }
  if (!module.propagateTags) {
    return newTracer;
  }
  // Always drop stale entries first so at most one stats header is ever sent.
  headers.discardAll(module.statsHeader);
  final boolean parentCtxIsEmpty = module.tagger.empty().equals(parentCtx);
  if (!parentCtxIsEmpty) {
    headers.put(module.statsHeader, parentCtx);
  }
  return newTracer;
}
/**
 * Client-side factory method: asks every stream tracer factory registered on the call options
 * for a tracer and bundles the results into a {@link StatsTraceContext}.
 *
 * @param callOptions source of the tracer factories; also embedded in the stream info
 * @param transportAttrs transport attributes embedded in the stream info
 * @param headers outgoing headers handed to each factory
 * @return {@code NOOP} when no factories are registered, otherwise a context holding one tracer
 *     per factory, in factory order
 */
public static StatsTraceContext newClientContext(
    final CallOptions callOptions, final Attributes transportAttrs, Metadata headers) {
  final List<ClientStreamTracer.Factory> tracerFactories =
      callOptions.getStreamTracerFactories();
  if (tracerFactories.isEmpty()) {
    return NOOP;
  }
  final ClientStreamTracer.StreamInfo streamInfo =
      ClientStreamTracer.StreamInfo.newBuilder()
          .setTransportAttrs(transportAttrs)
          .setCallOptions(callOptions)
          .build();
  // The tracers are walked several times per RPC; a plain array keeps for-each from allocating
  // an Iterator on every pass, which a Collection would.
  final int factoryCount = tracerFactories.size();
  final StreamTracer[] createdTracers = new StreamTracer[factoryCount];
  for (int idx = 0; idx < factoryCount; idx++) {
    ClientStreamTracer.Factory tracerFactory = tracerFactories.get(idx);
    createdTracers[idx] = tracerFactory.newClientStreamTracer(streamInfo, headers);
  }
  return new StatsTraceContext(createdTracers);
}
/**
 * Builds a {@link Substream} for one attempt, wiring in a dedicated {@code BufferSizeTracer} and
 * headers produced by {@code updateHeaders} for the given attempt count.
 *
 * @param previousAttemptCount attempt count passed to both the {@code Substream} constructor
 *     and {@code updateHeaders}
 * @return the substream with its {@code stream} field already assigned
 */
private Substream createSubstream(int previousAttemptCount) {
  final Substream substream = new Substream(previousAttemptCount);
  // one tracer per substream
  final ClientStreamTracer perSubstreamTracer = new BufferSizeTracer(substream);
  // The factory ignores its arguments and always hands back the tracer created above.
  final ClientStreamTracer.Factory fixedTracerFactory =
      new ClientStreamTracer.Factory() {
        @Override
        public ClientStreamTracer newClientStreamTracer(
            ClientStreamTracer.StreamInfo info, Metadata headers) {
          return perSubstreamTracer;
        }
      };
  final Metadata attemptHeaders = updateHeaders(headers, previousAttemptCount);
  // NOTICE: This set _must_ be done before stream.start() and it actually is.
  substream.stream = newSubstream(fixedTracerFactory, attemptHeaders);
  return substream;
}
/**
 * Creates and starts a stream on a client transport that has already reported termination, and
 * checks that the stream closes with the shutdown status, that trailers are delivered to the
 * listener, that the transport never reports being in use, and that the client stream tracer
 * sees the shutdown status but no inbound trailers.
 *
 * @throws Exception declared for the blocking waits and the sleep used below
 */
@Test
public void newStream_afterTermination() throws Exception {
// We expect the same general behavior as duringShutdown, but for some transports (e.g., Netty)
// dealing with afterTermination is harder than duringShutdown.
server.start(serverListener);
client = newClientTransport(server);
startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
// NOTE(review): fixed 100 ms pause after termination is reported; the reason is not visible
// here - presumably it lets transport-internal cleanup settle. Confirm before changing.
Thread.sleep(100);
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
stream.start(clientStreamListener);
// The stream must close with exactly the status passed to shutdown().
assertEquals(
shutdownReason, clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
// A client tracer is still requested from the factory even though the transport is terminated.
verify(clientStreamTracerFactory).newClientStreamTracer(
any(ClientStreamTracer.StreamInfo.class), any(Metadata.class));
assertNull(clientStreamTracer1.getInboundTrailers());
assertSame(shutdownReason, clientStreamTracer1.getStatus());
// Assert no interactions
assertNull(serverStreamTracer1.getServerCallInfo());
}
/**
 * Client-side factory method: asks every stream tracer factory registered on the call options
 * for a tracer and bundles the results into a {@link StatsTraceContext}.
 *
 * @param callOptions source of the tracer factories; also handed to each factory
 * @param headers outgoing headers handed to each factory
 * @return {@code NOOP} when no factories are registered, otherwise a context holding one tracer
 *     per factory, in factory order
 */
public static StatsTraceContext newClientContext(CallOptions callOptions, Metadata headers) {
  final List<ClientStreamTracer.Factory> tracerFactories =
      callOptions.getStreamTracerFactories();
  if (tracerFactories.isEmpty()) {
    return NOOP;
  }
  // The tracers are walked several times per RPC; a plain array keeps for-each from allocating
  // an Iterator on every pass, which a Collection would.
  final int factoryCount = tracerFactories.size();
  final StreamTracer[] createdTracers = new StreamTracer[factoryCount];
  for (int idx = 0; idx < factoryCount; idx++) {
    ClientStreamTracer.Factory tracerFactory = tracerFactories.get(idx);
    createdTracers[idx] = tracerFactory.newClientStreamTracer(callOptions, headers);
  }
  return new StatsTraceContext(createdTracers);
}
/**
 * Writes the span context into the tracing header (replacing any existing entries) unless the
 * span is {@code BlankSpan.INSTANCE}, and returns a new {@link ClientTracer} for the span.
 *
 * @param callOptions options of the call; not read by this method
 * @param headers outgoing headers, modified in place when the span is not the blank span
 * @return a new tracer bound to {@code span}
 */
@Override
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
  // Identity comparison against the blank-span singleton, as in the original.
  final boolean spanIsBlank = span == BlankSpan.INSTANCE;
  if (!spanIsBlank) {
    headers.discardAll(tracingHeader);
    headers.put(tracingHeader, span.getContext());
  }
  return new ClientTracer(span);
}
/**
 * Test override: captures the tracer produced by {@code tracerFactory} into
 * {@code bufferSizeTracer}, reads the previous-attempts header from {@code metadata} (treating
 * an absent header as 0), and asks {@code retriableStreamRecorder} for the substream.
 *
 * @param tracerFactory factory invoked once with default call options and fresh metadata
 * @param metadata headers from which the previous-attempts value is read
 * @return the stream returned by {@code retriableStreamRecorder.newSubstream}
 */
@Override
ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) {
  bufferSizeTracer = tracerFactory.newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
  int attemptsFromHeader = 0;
  if (metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) != null) {
    attemptsFromHeader = Integer.valueOf(metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
  }
  return retriableStreamRecorder.newSubstream(attemptsFromHeader);
}
/**
 * Patching call options through the interceptor must keep the base options' tracer factory and
 * add the interceptor's default factory, yielding exactly those two factories.
 */
@Test
public void clientStreamTracerTransfers() {
  ClientStreamTracer.Factory baseFactory = new ClientStreamTracer.Factory() {};
  ClientStreamTracer.Factory defaultFactory = new ClientStreamTracer.Factory() {};
  CallOptions baseOptions = CallOptions.DEFAULT.withStreamTracerFactory(baseFactory);
  CallOptions defaultOptions = CallOptions.DEFAULT.withStreamTracerFactory(defaultFactory);
  DefaultCallOptionsClientInterceptor interceptor =
      new DefaultCallOptionsClientInterceptor(defaultOptions);

  CallOptions patchedOptions = interceptor.patchOptions(baseOptions);

  assertThat(patchedOptions.getStreamTracerFactories())
      .containsExactly(baseFactory, defaultFactory);
}
/**
 * A factory built with a null delegate must still return a non-null tracer and must not add a
 * token header when the EAG attributes are empty.
 */
@Test
public void nullDelegate() {
  TokenAttachingTracerFactory tracerFactory = new TokenAttachingTracerFactory(null);
  Attributes transportAttributes =
      Attributes.newBuilder()
          .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, Attributes.EMPTY)
          .build();
  ClientStreamTracer.StreamInfo streamInfo =
      ClientStreamTracer.StreamInfo.newBuilder().setTransportAttrs(transportAttributes).build();
  Metadata outgoingHeaders = new Metadata();

  ClientStreamTracer returnedTracer =
      tracerFactory.newClientStreamTracer(streamInfo, outgoingHeaders);

  assertThat(returnedTracer).isNotNull();
  assertThat(outgoingHeaders.get(GrpclbConstants.TOKEN_METADATA_KEY)).isNull();
}
/**
 * Delegates the pick and, for successful picks that selected a subchannel, returns a new result
 * for the same subchannel whose tracer factory is the delegate's factory (or the no-op factory
 * when none was supplied) passed through {@code wrapTracerFactory}.
 *
 * @param args pick arguments forwarded to the delegate picker
 * @return the delegate's result unchanged when its status is not OK or it has no subchannel;
 *     otherwise a result carrying the wrapped tracer factory
 */
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
  final PickResult picked = delegate().pickSubchannel(args);
  // Failed picks and picks without a subchannel pass through untouched.
  if (!picked.getStatus().isOk() || picked.getSubchannel() == null) {
    return picked;
  }
  final ClientStreamTracer.Factory pickedFactory = picked.getStreamTracerFactory();
  final ClientStreamTracer.Factory baseFactory =
      pickedFactory == null ? NOOP_CLIENT_STREAM_TRACER_FACTORY : pickedFactory;
  return PickResult.withSubchannel(picked.getSubchannel(), wrapTracerFactory(baseFactory));
}
/**
 * Registers {@code listener} on the {@code OrcaReportBroker} stored in the call options,
 * creating the broker and attaching it to a rebuilt {@code StreamInfo} when none exists yet.
 * Only the factory that created the broker wraps the delegate's tracer with one that reads the
 * ORCA report from the trailers and hands it to the broker.
 *
 * @param info stream info; replaced by an augmented copy when this factory creates the broker
 * @param headers outgoing headers passed through to the delegate factory
 * @return the delegate's tracer, wrapped with report extraction only when this call created the
 *     broker
 */
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
  final OrcaReportBroker existingBroker =
      info.getCallOptions().getOption(ORCA_REPORT_BROKER_KEY);
  final boolean augmented = existingBroker == null;
  final OrcaReportBroker activeBroker = augmented ? new OrcaReportBroker() : existingBroker;
  if (augmented) {
    // Publish the fresh broker through the call options so the delegate factory can see it.
    info =
        info.toBuilder()
            .setCallOptions(
                info.getCallOptions().withOption(ORCA_REPORT_BROKER_KEY, activeBroker))
            .build();
  }
  activeBroker.addListener(listener);
  final ClientStreamTracer downstreamTracer = delegate.newClientStreamTracer(info, headers);
  if (!augmented) {
    return downstreamTracer;
  }
  // The actual tracer that performs ORCA report deserialization.
  return new ForwardingClientStreamTracer() {
    @Override
    protected ClientStreamTracer delegate() {
      return downstreamTracer;
    }

    @Override
    public void inboundTrailers(Metadata trailers) {
      OrcaLoadReport report = trailers.get(ORCA_ENDPOINT_LOAD_METRICS_KEY);
      if (report != null) {
        activeBroker.onReport(report);
      }
      delegate().inboundTrailers(trailers);
    }
  };
}
/**
 * Tests that a single load balancing policy's listener receives a per-request ORCA report when
 * the call trailers arrive with a report, and receives nothing when the trailers carry none.
 */
@Test
public void singlePolicyTypicalWorkflow() {
// Use a mocked noop stream tracer factory as the original stream tracer factory.
ClientStreamTracer.Factory fakeDelegateFactory = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer fakeTracer = mock(ClientStreamTracer.class);
doNothing().when(fakeTracer).inboundTrailers(any(Metadata.class));
when(fakeDelegateFactory.newClientStreamTracer(
any(ClientStreamTracer.StreamInfo.class), any(Metadata.class)))
.thenReturn(fakeTracer);
// The OrcaReportingTracerFactory will augment the StreamInfo passed to its
// newClientStreamTracer method. The augmented StreamInfo's CallOptions will contain
// an OrcaReportBroker, which holds the registered listener.
ClientStreamTracer.Factory factory =
OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(fakeDelegateFactory, orcaListener1);
ClientStreamTracer tracer = factory.newClientStreamTracer(STREAM_INFO, new Metadata());
// Capture the StreamInfo the delegate factory actually received; it must differ from the
// original because it was augmented.
ArgumentCaptor<ClientStreamTracer.StreamInfo> streamInfoCaptor = ArgumentCaptor.forClass(null);
verify(fakeDelegateFactory)
.newClientStreamTracer(streamInfoCaptor.capture(), any(Metadata.class));
ClientStreamTracer.StreamInfo capturedInfo = streamInfoCaptor.getValue();
assertThat(capturedInfo).isNotEqualTo(STREAM_INFO);
// When the trailer does not contain ORCA report, listener callback will not be invoked.
Metadata trailer = new Metadata();
tracer.inboundTrailers(trailer);
verifyNoMoreInteractions(orcaListener1);
// When the trailer contains an ORCA report, listener callback will be invoked.
trailer.put(
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
OrcaLoadReport.getDefaultInstance());
tracer.inboundTrailers(trailer);
ArgumentCaptor<OrcaLoadReport> reportCaptor = ArgumentCaptor.forClass(null);
verify(orcaListener1).onLoadReport(reportCaptor.capture());
assertThat(reportCaptor.getValue()).isEqualTo(OrcaLoadReport.getDefaultInstance());
}
/**
 * Tests that the listeners of both a parent and a child load balancing policy receive the
 * per-request ORCA report when the call trailers arrive, and that both receive the same report
 * instance (i.e. ORCA report deserialization happens only once).
 */
@Test
public void twoLevelPoliciesTypicalWorkflow() {
// The parent factory is a mock delegating to a real factory so the StreamInfo it receives can
// be captured.
ClientStreamTracer.Factory parentFactory =
mock(ClientStreamTracer.Factory.class,
delegatesTo(
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(orcaListener1)));
ClientStreamTracer.Factory childFactory =
OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(parentFactory, orcaListener2);
// Child factory will augment the StreamInfo and pass it to the parent factory.
ClientStreamTracer childTracer =
childFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
ArgumentCaptor<ClientStreamTracer.StreamInfo> streamInfoCaptor = ArgumentCaptor.forClass(null);
verify(parentFactory).newClientStreamTracer(streamInfoCaptor.capture(), any(Metadata.class));
ClientStreamTracer.StreamInfo parentStreamInfo = streamInfoCaptor.getValue();
assertThat(parentStreamInfo).isNotEqualTo(STREAM_INFO);
// When the trailer does not contain ORCA report, no listener callback will be invoked.
Metadata trailer = new Metadata();
childTracer.inboundTrailers(trailer);
verifyNoMoreInteractions(orcaListener1);
verifyNoMoreInteractions(orcaListener2);
// When the trailer contains an ORCA report, callbacks for both listeners will be invoked.
// Both listeners will receive the same ORCA report instance, which means deserialization
// happens only once.
trailer.put(
OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY,
OrcaLoadReport.getDefaultInstance());
childTracer.inboundTrailers(trailer);
ArgumentCaptor<OrcaLoadReport> parentReportCap = ArgumentCaptor.forClass(null);
ArgumentCaptor<OrcaLoadReport> childReportCap = ArgumentCaptor.forClass(null);
verify(orcaListener1).onLoadReport(parentReportCap.capture());
verify(orcaListener2).onLoadReport(childReportCap.capture());
assertThat(parentReportCap.getValue()).isEqualTo(OrcaLoadReport.getDefaultInstance());
assertThat(childReportCap.getValue()).isSameInstanceAs(parentReportCap.getValue());
}
/**
 * Creates a fresh {@code TestClientStreamTracer}, remembers it in {@code clientStreamTracers},
 * and returns it.
 *
 * @param info stream info; not read by this method
 * @param headers outgoing headers; not read by this method
 * @return the newly created and recorded tracer
 */
@Override
public ClientStreamTracer newClientStreamTracer(
    ClientStreamTracer.StreamInfo info, Metadata headers) {
  final TestClientStreamTracer createdTracer = new TestClientStreamTracer();
  clientStreamTracers.add(createdTracer);
  return createdTracer;
}
/**
 * Writes the span context into the tracing header (replacing any existing entries) unless the
 * span is {@code BlankSpan.INSTANCE}, and returns a new {@link ClientTracer} for the span.
 *
 * @param info stream info; not read by this method
 * @param headers outgoing headers, modified in place when the span is not the blank span
 * @return a new tracer bound to {@code span}
 */
@Override
public ClientStreamTracer newClientStreamTracer(
    ClientStreamTracer.StreamInfo info, Metadata headers) {
  // Identity comparison against the blank-span singleton, as in the original.
  final boolean spanIsBlank = span == BlankSpan.INSTANCE;
  if (!spanIsBlank) {
    headers.discardAll(tracingHeader);
    headers.put(tracingHeader, span.getContext());
  }
  return new ClientTracer(span);
}
/**
 * Starts a call after the transport is ready and the picker is installed, and verifies that the
 * stream is created with CallOptions listing the call's own tracer factory followed by the
 * factory supplied in the pick result, without either factory being invoked.
 */
@Test
public void pickerReturnsStreamTracer_noDelay() {
ClientStream mockStream = mock(ClientStream.class);
// factory1 is attached through CallOptions; factory2 is supplied by the picker's PickResult.
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel();
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory2));
updateBalancingStateSafely(helper, READY, mockPicker);
// The picker is already in place when the call starts.
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
call.start(mockCallListener, new Metadata());
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
// Order matters: the call's factory comes first, the picker's factory second.
assertEquals(
Arrays.asList(factory1, factory2),
callOptionsCaptor.getValue().getStreamTracerFactories());
// The factories are safely not stubbed because we do not expect any usage of them.
verifyZeroInteractions(factory1);
verifyZeroInteractions(factory2);
}
/**
 * Starts a call before any picker is installed, then makes a subchannel ready and installs the
 * picker, and verifies that after one due executor task runs the stream is created with
 * CallOptions listing the call's own tracer factory followed by the factory supplied in the pick
 * result, without either factory being invoked.
 */
@Test
public void pickerReturnsStreamTracer_delayed() {
ClientStream mockStream = mock(ClientStream.class);
// factory1 is attached through CallOptions; factory2 is supplied by the picker's PickResult.
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel();
// The call is started first, before the subchannel and picker exist.
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
call.start(mockCallListener, new Metadata());
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory2));
updateBalancingStateSafely(helper, READY, mockPicker);
// Exactly one task is expected to be due on the executor after the picker is installed.
assertEquals(1, executor.runDueTasks());
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
// Order matters: the call's factory comes first, the picker's factory second.
assertEquals(
Arrays.asList(factory1, factory2),
callOptionsCaptor.getValue().getStreamTracerFactories());
// The factories are safely not stubbed because we do not expect any usage of them.
verifyZeroInteractions(factory1);
verifyZeroInteractions(factory2);
}
/**
 * Test override: captures the tracer produced by {@code tracerFactory} into
 * {@code bufferSizeTracer}, reads the previous-attempts header from {@code metadata} (treating
 * an absent header as 0), and asks {@code retriableStreamRecorder} for the substream.
 *
 * @param tracerFactory factory invoked once with {@code STREAM_INFO} and fresh metadata
 * @param metadata headers from which the previous-attempts value is read
 * @return the stream returned by {@code retriableStreamRecorder.newSubstream}
 */
@Override
ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) {
  bufferSizeTracer = tracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  int attemptsFromHeader = 0;
  if (metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) != null) {
    attemptsFromHeader = Integer.valueOf(metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS));
  }
  return retriableStreamRecorder.newSubstream(attemptsFromHeader);
}
/**
 * Starts two hedged substreams, reports {@code PER_RPC_BUFFER_LIMIT - 1} outbound bytes on each
 * substream's tracer, then reports 2 more bytes on the second. Verifies that nothing is
 * committed before the last report, and that afterwards the first stream is cancelled with
 * {@code CANCELLED_BECAUSE_COMMITTED} and {@code postCommit()} has been called.
 */
@Test
public void hedging_perRpcBufferLimitExceeded() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
// The recorder hands out mockStream1 for newSubstream(0) and mockStream2 for newSubstream(1).
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
hedgingStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
// Snapshot the shared bufferSizeTracer field now; it is read again after the second substream
// starts.
ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer;
bufferSizeTracer1.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
// Advance the fake clock by the hedging delay; the second substream is expected to start.
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;
bufferSizeTracer2.outboundWireSize(PER_RPC_BUFFER_LIMIT - 1);
verify(retriableStreamRecorder, never()).postCommit();
// bufferLimitExceeded
bufferSizeTracer2.outboundWireSize(2);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockStream1).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1);
verifyNoMoreInteractions(mockStream2);
}
/**
 * Starts two hedged substreams, reports 100 outbound bytes on each substream's tracer, adds
 * {@code CHANNEL_BUFFER_LIMIT - 200} to {@code channelBufferUsed}, then reports 101 more bytes
 * on the second tracer. Verifies that nothing is committed before the last report, that
 * afterwards the first stream is cancelled with {@code CANCELLED_BECAUSE_COMMITTED} and
 * {@code postCommit()} has been called, and that {@code channelBufferUsed} ends at
 * {@code CHANNEL_BUFFER_LIMIT - 200}.
 */
@Test
public void hedging_channelBufferLimitExceeded() {
ClientStream mockStream1 = mock(ClientStream.class);
ClientStream mockStream2 = mock(ClientStream.class);
// The recorder hands out mockStream1 for newSubstream(0) and mockStream2 for newSubstream(1).
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
doReturn(mockStream2).when(retriableStreamRecorder).newSubstream(1);
hedgingStream.start(masterListener);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream1).start(sublistenerCaptor1.capture());
// Snapshot the shared bufferSizeTracer field now; it is read again after the second substream
// starts.
ClientStreamTracer bufferSizeTracer1 = bufferSizeTracer;
bufferSizeTracer1.outboundWireSize(100);
// Advance the fake clock by the hedging delay; the second substream is expected to start.
fakeClock.forwardTime(HEDGING_DELAY_IN_SECONDS, TimeUnit.SECONDS);
ArgumentCaptor<ClientStreamListener> sublistenerCaptor2 =
ArgumentCaptor.forClass(ClientStreamListener.class);
verify(mockStream2).start(sublistenerCaptor2.capture());
ClientStreamTracer bufferSizeTracer2 = bufferSizeTracer;
bufferSizeTracer2.outboundWireSize(100);
verify(retriableStreamRecorder, never()).postCommit();
// channel bufferLimitExceeded
channelBufferUsed.addAndGet(CHANNEL_BUFFER_LIMIT - 200);
bufferSizeTracer2.outboundWireSize(101);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(mockStream1).cancel(statusCaptor.capture());
assertEquals(Status.CANCELLED.getCode(), statusCaptor.getValue().getCode());
assertEquals(CANCELLED_BECAUSE_COMMITTED, statusCaptor.getValue().getDescription());
verify(retriableStreamRecorder).postCommit();
verifyNoMoreInteractions(mockStream1);
verifyNoMoreInteractions(mockStream2);
// verify channel buffer is adjusted
// addAndGet(0) reads the current value without changing it.
assertEquals(CHANNEL_BUFFER_LIMIT - 200, channelBufferUsed.addAndGet(0));
}
/**
 * Adds the tracer header entry to {@code metadata} and returns the next tracer queued in
 * {@code tracers}, or a fresh {@code TestClientStreamTracer} when {@code poll()} yields null.
 *
 * @param info stream info; not read by this method
 * @param metadata outgoing headers; {@code tracerHeaderKey} is added with {@code tracerKeyValue}
 * @return a queued tracer when available, otherwise a new one
 */
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
  metadata.put(tracerHeaderKey, tracerKeyValue);
  final TestClientStreamTracer queuedTracer = tracers.poll();
  return queuedTracer != null ? queuedTracer : new TestClientStreamTracer();
}