下面列出了 io.grpc.ClientStreamTracer.Factory 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Builds the {@link Substream} for one attempt and attaches a newly created stream to it.
 *
 * @param previousAttemptCount value passed both to the {@code Substream} constructor and to
 *     {@code updateHeaders} when deriving the headers for this attempt
 * @return the substream, with its {@code stream} field already assigned
 */
private Substream createSubstream(int previousAttemptCount) {
  final Substream substream = new Substream(previousAttemptCount);
  // Exactly one tracer exists per substream; the factory below always returns that instance.
  final ClientStreamTracer perSubstreamTracer = new BufferSizeTracer(substream);
  ClientStreamTracer.Factory singleTracerFactory =
      new ClientStreamTracer.Factory() {
        @Override
        public ClientStreamTracer newClientStreamTracer(
            ClientStreamTracer.StreamInfo streamInfo, Metadata ignoredHeaders) {
          return perSubstreamTracer;
        }
      };
  Metadata attemptHeaders = updateHeaders(headers, previousAttemptCount);
  // NOTICE: the stream _must_ be assigned before stream.start() is called, and it is.
  substream.stream = newSubstream(singleTracerFactory, attemptHeaders);
  return substream;
}
/**
 * Builds the {@link Substream} for one attempt and attaches a newly created stream to it.
 *
 * @param previousAttempts value passed both to the {@code Substream} constructor and to
 *     {@code updateHeaders} when deriving the headers for this attempt
 * @return the substream, with its {@code stream} field already assigned
 */
private Substream createSubstream(int previousAttempts) {
  final Substream substream = new Substream(previousAttempts);
  // Exactly one tracer exists per substream; the factory below always returns that instance.
  final ClientStreamTracer perSubstreamTracer = new BufferSizeTracer(substream);
  ClientStreamTracer.Factory singleTracerFactory =
      new ClientStreamTracer.Factory() {
        @Override
        public ClientStreamTracer newClientStreamTracer(
            CallOptions ignoredOptions, Metadata ignoredHeaders) {
          return perSubstreamTracer;
        }
      };
  Metadata attemptHeaders = updateHeaders(headers, previousAttempts);
  // NOTICE: the stream _must_ be assigned before stream.start() is called, and it is.
  substream.stream = newSubstream(singleTracerFactory, attemptHeaders);
  return substream;
}
/**
 * When the parent policy builds its own {@link ClientStreamTracer.Factory}, an ORCA report
 * carried in the trailers is delivered to the parent's listener only; the child factory and
 * the child's listener see no interactions.
 */
@Test
public void onlyParentPolicyReceivesReportsIfCreatesOwnTracer() {
  final OrcaLoadReport emptyReport = OrcaLoadReport.getDefaultInstance();

  ClientStreamTracer.Factory factoryOfParent =
      OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(orcaListener1);
  // The child factory wraps the parent's factory, but it is never asked for a tracer here.
  ClientStreamTracer.Factory factoryOfChild =
      mock(
          ClientStreamTracer.Factory.class,
          delegatesTo(
              OrcaPerRequestUtil.getInstance()
                  .newOrcaClientStreamTracerFactory(factoryOfParent, orcaListener2)));

  ClientStreamTracer tracerOfParent =
      factoryOfParent.newClientStreamTracer(STREAM_INFO, new Metadata());
  Metadata trailers = new Metadata();
  trailers.put(OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY, emptyReport);
  tracerOfParent.inboundTrailers(trailers);

  verify(orcaListener1).onLoadReport(eq(emptyReport));
  verifyZeroInteractions(factoryOfChild);
  verifyZeroInteractions(orcaListener2);
}
/**
 * Factory method for the client-side.
 *
 * <p>Asks every stream tracer factory registered on {@code callOptions} for a tracer and
 * bundles the results into one context. Returns {@code NOOP} when no factory is registered.
 *
 * @param callOptions supplies the tracer factories and is recorded in the {@code StreamInfo}
 * @param transportAttrs transport attributes recorded in the {@code StreamInfo}
 * @param headers passed to every factory's {@code newClientStreamTracer} call
 */
public static StatsTraceContext newClientContext(
    final CallOptions callOptions, final Attributes transportAttrs, Metadata headers) {
  List<ClientStreamTracer.Factory> tracerFactories = callOptions.getStreamTracerFactories();
  if (tracerFactories.isEmpty()) {
    return NOOP;
  }
  ClientStreamTracer.StreamInfo streamInfo =
      ClientStreamTracer.StreamInfo.newBuilder()
          .setTransportAttrs(transportAttrs)
          .setCallOptions(callOptions)
          .build();
  // The tracers are iterated multiple times per RPC, so they are kept in a plain array rather
  // than a Collection: a for-each over an array does not allocate an Iterator every time.
  StreamTracer[] streamTracers = new StreamTracer[tracerFactories.size()];
  int slot = 0;
  for (ClientStreamTracer.Factory tracerFactory : tracerFactories) {
    streamTracers[slot++] = tracerFactory.newClientStreamTracer(streamInfo, headers);
  }
  return new StatsTraceContext(streamTracers);
}
/**
 * Checks that the picker handed to the helper is a {@link LoadRecordingSubchannelPicker}
 * whose pick results carry a tracer factory bound to {@code counter}, and that shutting the
 * load balancer down shuts the child balancer down and stops load recording.
 */
@Test
public void subchannelPickerInterceptedWithLoadRecording() {
  List<EquivalentAddressGroup> resolvedBackends = createResolvedBackendAddresses(2);
  deliverResolvedAddresses(resolvedBackends, "round_robin");
  FakeLoadBalancer fakeChild = (FakeLoadBalancer) childBalancers.poll();
  NoopSubchannel firstSubchannel = fakeChild.subchannels.values().iterator().next();
  deliverSubchannelState(firstSubchannel, ConnectivityState.READY);
  assertThat(loadRecorder.recording).isTrue();

  ArgumentCaptor<SubchannelPicker> capturedPicker = ArgumentCaptor.forClass(null);
  verify(helper).updateBalancingState(eq(ConnectivityState.READY), capturedPicker.capture());
  SubchannelPicker interceptedPicker = capturedPicker.getValue();
  assertThat(interceptedPicker).isInstanceOf(LoadRecordingSubchannelPicker.class);

  PickResult pick = interceptedPicker.pickSubchannel(mock(PickSubchannelArgs.class));
  LoadRecordingStreamTracerFactory recordingFactory =
      (LoadRecordingStreamTracerFactory) pick.getStreamTracerFactory();
  assertThat(recordingFactory.getCounter()).isSameInstanceAs(counter);

  loadBalancer.shutdown();
  assertThat(fakeChild.shutdown).isTrue();
  assertThat(loadRecorder.recording).isFalse();
}
/**
 * Verifies that when a call is started before {@code mockPicker} is handed to the helper, the
 * tracer factory from the call's {@link CallOptions} and the one returned in the
 * {@link PickResult} both reach the transport's {@code newStream} call, in that order, and
 * that neither factory is invoked.
 */
@Test
public void pickerReturnsStreamTracer_delayed() {
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel();
// factory1 travels with the call through its CallOptions.
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
// The call is started before mockPicker is installed via updateBalancingStateSafely below.
call.start(mockCallListener, new Metadata());
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
// factory2 is supplied by the picker as part of its pick result.
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory2));
updateBalancingStateSafely(helper, READY, mockPicker);
// runDueTasks() is expected to report exactly one task once the picker is available.
assertEquals(1, executor.runDueTasks());
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
// The CallOptions seen by the transport must list the call's factory first, then the picker's.
assertEquals(
Arrays.asList(factory1, factory2),
callOptionsCaptor.getValue().getStreamTracerFactories());
// The factories are safely not stubbed because we do not expect any usage of them.
verifyZeroInteractions(factory1);
verifyZeroInteractions(factory2);
}
/**
 * Records the tracer produced by {@code tracerFactory} in {@code bufferSizeTracer} and
 * forwards the previous-attempts count read from {@code metadata} to
 * {@code retriableStreamRecorder}.
 *
 * @param tracerFactory factory asked for a tracer with {@code CallOptions.DEFAULT} and empty
 *     headers
 * @param metadata headers inspected for {@code GRPC_PREVIOUS_RPC_ATTEMPTS}
 * @return whatever {@code retriableStreamRecorder.newSubstream} returns for the parsed count
 */
@Override
ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) {
  bufferSizeTracer =
      tracerFactory.newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
  // Read the header once; a missing header is treated as zero previous attempts.
  String previousAttemptsHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS);
  // parseInt yields a primitive directly, avoiding the box/unbox round trip of valueOf.
  int actualPreviousRpcAttemptsInHeader =
      previousAttemptsHeader == null ? 0 : Integer.parseInt(previousAttemptsHeader);
  return retriableStreamRecorder.newSubstream(actualPreviousRpcAttemptsInHeader);
}
/**
 * Delegates the pick and, for a successful pick that carries a subchannel, returns a new
 * result whose tracer factory is the wrapped version of the delegate's factory. Results with
 * a non-OK status or without a subchannel are returned unchanged.
 */
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
  PickResult delegatePick = delegate().pickSubchannel(args);
  // Nothing to wrap when the pick failed or did not select a subchannel.
  if (!delegatePick.getStatus().isOk() || delegatePick.getSubchannel() == null) {
    return delegatePick;
  }
  ClientStreamTracer.Factory pickedFactory = delegatePick.getStreamTracerFactory();
  // Fall back to the no-op factory so wrapTracerFactory never receives null.
  ClientStreamTracer.Factory factoryToWrap =
      pickedFactory != null ? pickedFactory : NOOP_CLIENT_STREAM_TRACER_FACTORY;
  return PickResult.withSubchannel(
      delegatePick.getSubchannel(), wrapTracerFactory(factoryToWrap));
}
/**
 * Verifies that when {@code mockPicker} is already installed before the call is started, the
 * tracer factory from the call's {@link CallOptions} and the one returned in the
 * {@link PickResult} both reach the transport's {@code newStream} call, in that order, and
 * that neither factory is invoked.
 */
@Test
public void pickerReturnsStreamTracer_noDelay() {
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel();
Subchannel subchannel =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
// factory2 is supplied by the picker as part of its pick result.
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory2));
// The picker is installed first, so the call below does not have to wait for one.
updateBalancingStateSafely(helper, READY, mockPicker);
// factory1 travels with the call through its CallOptions.
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
call.start(mockCallListener, new Metadata());
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
// The CallOptions seen by the transport must list the call's factory first, then the picker's.
assertEquals(
Arrays.asList(factory1, factory2),
callOptionsCaptor.getValue().getStreamTracerFactories());
// The factories are safely not stubbed because we do not expect any usage of them.
verifyZeroInteractions(factory1);
verifyZeroInteractions(factory2);
}
/**
 * Tests that a single load balancing policy's listener receives the per-request ORCA report
 * when call trailers containing one arrive, and receives nothing when the trailers carry no
 * report.
 */
@Test
public void singlePolicyTypicalWorkflow() {
  final OrcaLoadReport emptyReport = OrcaLoadReport.getDefaultInstance();

  // A mocked no-op stream tracer factory stands in for the original stream tracer factory.
  ClientStreamTracer.Factory delegateFactoryMock = mock(ClientStreamTracer.Factory.class);
  ClientStreamTracer delegateTracerMock = mock(ClientStreamTracer.class);
  doNothing().when(delegateTracerMock).inboundTrailers(any(Metadata.class));
  when(delegateFactoryMock.newClientStreamTracer(
          any(ClientStreamTracer.StreamInfo.class), any(Metadata.class)))
      .thenReturn(delegateTracerMock);

  // The OrcaReportingTracerFactory will augment the StreamInfo passed to its
  // newClientStreamTracer method. The augmented StreamInfo's CallOptions will contain
  // an OrcaReportBroker, which holds the registered listener.
  ClientStreamTracer.Factory orcaFactory =
      OrcaPerRequestUtil.getInstance()
          .newOrcaClientStreamTracerFactory(delegateFactoryMock, orcaListener1);
  ClientStreamTracer orcaTracer = orcaFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  ArgumentCaptor<ClientStreamTracer.StreamInfo> infoCaptor = ArgumentCaptor.forClass(null);
  verify(delegateFactoryMock).newClientStreamTracer(infoCaptor.capture(), any(Metadata.class));
  ClientStreamTracer.StreamInfo infoSeenByDelegate = infoCaptor.getValue();
  assertThat(infoSeenByDelegate).isNotEqualTo(STREAM_INFO);

  // Trailers without an ORCA report must not trigger the listener callback.
  Metadata trailers = new Metadata();
  orcaTracer.inboundTrailers(trailers);
  verifyNoMoreInteractions(orcaListener1);

  // Trailers carrying an ORCA report must trigger the listener callback with that report.
  trailers.put(OrcaReportingTracerFactory.ORCA_ENDPOINT_LOAD_METRICS_KEY, emptyReport);
  orcaTracer.inboundTrailers(trailers);
  ArgumentCaptor<OrcaLoadReport> deliveredReport = ArgumentCaptor.forClass(null);
  verify(orcaListener1).onLoadReport(deliveredReport.capture());
  assertThat(deliveredReport.getValue()).isEqualTo(emptyReport);
}
/**
 * Records the tracer produced by {@code tracerFactory} in {@code bufferSizeTracer} and
 * forwards the previous-attempts count read from {@code metadata} to
 * {@code retriableStreamRecorder}.
 *
 * @param tracerFactory factory asked for a tracer with {@code STREAM_INFO} and empty headers
 * @param metadata headers inspected for {@code GRPC_PREVIOUS_RPC_ATTEMPTS}
 * @return whatever {@code retriableStreamRecorder.newSubstream} returns for the parsed count
 */
@Override
ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata metadata) {
  bufferSizeTracer =
      tracerFactory.newClientStreamTracer(STREAM_INFO, new Metadata());
  // Read the header once; a missing header is treated as zero previous attempts.
  String previousAttemptsHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS);
  // parseInt yields a primitive directly, avoiding the box/unbox round trip of valueOf.
  int actualPreviousRpcAttemptsInHeader =
      previousAttemptsHeader == null ? 0 : Integer.parseInt(previousAttemptsHeader);
  return retriableStreamRecorder.newSubstream(actualPreviousRpcAttemptsInHeader);
}
/**
 * Creates a {@link RetriableStream} whose individual attempts are created on a transport
 * obtained through {@code get(...)}.
 *
 * <p>Fails the {@code checkState} precondition when {@code retryEnabled} is false.
 *
 * @param method the method being called
 * @param callOptions options for the call; also the source of the retry and hedging policies
 * @param headers headers handed to the {@code RetriableStream} constructor
 * @param context context attached around each {@code transport.newStream} invocation
 */
@Override
public <ReqT> RetriableStream<ReqT> newRetriableStream(
    final MethodDescriptor<ReqT, ?> method,
    final CallOptions callOptions,
    final Metadata headers,
    final Context context) {
  checkState(retryEnabled, "retry should be enabled");

  final class RetryStream extends RetriableStream<ReqT> {
    RetryStream() {
      super(
          method,
          headers,
          channelBufferUsed,
          perRpcBufferLimit,
          channelBufferLimit,
          getCallExecutor(callOptions),
          transportFactory.getScheduledExecutorService(),
          callOptions.getOption(RETRY_POLICY_KEY),
          callOptions.getOption(HEDGING_POLICY_KEY),
          throttle);
    }

    // Registers this stream as uncommitted; the registry's answer is returned as-is.
    @Override
    Status prestart() {
      return uncommittedRetriableStreamsRegistry.add(this);
    }

    // Once committed, the stream no longer belongs in the uncommitted registry.
    @Override
    void postCommit() {
      uncommittedRetriableStreamsRegistry.remove(this);
    }

    @Override
    ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) {
      // Every attempt gets the per-attempt tracer factory added to the call's options.
      CallOptions attemptOptions = callOptions.withStreamTracerFactory(tracerFactory);
      PickSubchannelArgsImpl pickArgs =
          new PickSubchannelArgsImpl(method, newHeaders, attemptOptions);
      ClientTransport attemptTransport = get(pickArgs);
      // Create the stream under the caller's context, then restore whatever was attached.
      Context previouslyAttached = context.attach();
      try {
        return attemptTransport.newStream(method, newHeaders, attemptOptions);
      } finally {
        context.detach(previouslyAttached);
      }
    }
  }

  return new RetryStream();
}
/**
 * Creates a factory around an optional delegate.
 *
 * @param delegate the factory to store as this instance's delegate; may be {@code null}
 */
TokenAttachingTracerFactory(@Nullable ClientStreamTracer.Factory delegate) {
this.delegate = delegate;
}
/**
 * Creates a factory bound to a load counter and a delegate factory. Both arguments are
 * validated with {@code checkNotNull}, the counter first.
 *
 * @param counter the load counter to store; must not be {@code null}
 * @param delegate the delegate tracer factory to store; must not be {@code null}
 */
LoadRecordingStreamTracerFactory(ClientLoadCounter counter,
ClientStreamTracer.Factory delegate) {
this.counter = checkNotNull(counter, "counter");
this.delegate = checkNotNull(delegate, "delegate");
}
/**
 * Drives one call through the channel and checks the {@code callsStarted},
 * {@code callsSucceeded} and {@code callsFailed} counters of both the channel and the
 * subchannel at each step.
 *
 * @param success when true the stream listener is closed with {@code Status.OK} and the
 *     succeeded counters are expected to increase; otherwise it is closed with
 *     {@code Status.UNKNOWN} and the failed counters are expected to increase
 */
private void channelsAndSubchannels_instrumented0(boolean success) throws Exception {
createChannel();
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
// Channel stat bumped when ClientCall.start() called
assertEquals(0, getStats(channel).callsStarted);
call.start(mockCallListener, new Metadata());
assertEquals(1, getStats(channel).callsStarted);
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory = mock(ClientStreamTracer.Factory.class);
AbstractSubchannel subchannel =
(AbstractSubchannel) createSubchannelSafely(
helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel);
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory));
// subchannel stat bumped when call gets assigned to it
assertEquals(0, getStats(subchannel).callsStarted);
updateBalancingStateSafely(helper, READY, mockPicker);
// runDueTasks() is expected to report exactly one task after the picker is installed.
assertEquals(1, executor.runDueTasks());
verify(mockStream).start(streamListenerCaptor.capture());
assertEquals(1, getStats(subchannel).callsStarted);
ClientStreamListener streamListener = streamListenerCaptor.getValue();
call.halfClose();
// closing stream listener affects subchannel stats immediately
assertEquals(0, getStats(subchannel).callsSucceeded);
assertEquals(0, getStats(subchannel).callsFailed);
streamListener.closed(success ? Status.OK : Status.UNKNOWN, new Metadata());
if (success) {
assertEquals(1, getStats(subchannel).callsSucceeded);
assertEquals(0, getStats(subchannel).callsFailed);
} else {
assertEquals(0, getStats(subchannel).callsSucceeded);
assertEquals(1, getStats(subchannel).callsFailed);
}
// channel stats bumped when the ClientCall.Listener is notified
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
// Channel counters are still zero above; they change only after the executor runs its tasks.
executor.runDueTasks();
if (success) {
assertEquals(1, getStats(channel).callsSucceeded);
assertEquals(0, getStats(channel).callsFailed);
} else {
assertEquals(0, getStats(channel).callsSucceeded);
assertEquals(1, getStats(channel).callsFailed);
}
}
/**
 * Wraps the tracer factory of a pick result.
 *
 * <p>As invoked from {@code pickSubchannel}, the argument is the factory from the delegate's
 * pick result, or {@code NOOP_CLIENT_STREAM_TRACER_FACTORY} when that result carries none;
 * the returned factory is attached to the {@code PickResult} handed back to the caller.
 *
 * @param originFactory the factory to wrap
 * @return the factory to use in place of {@code originFactory}
 */
protected abstract ClientStreamTracer.Factory wrapTracerFactory(
ClientStreamTracer.Factory originFactory);
/**
 * Resolves a {@link PickResult} to the transport an RPC should use.
 *
 * <p>If the picked subchannel has an active transport, that transport is returned, wrapped so
 * that new streams also carry the result's stream tracer factory when one is present. With no
 * active transport, a non-OK result yields a failing transport when it is a drop or when the
 * RPC is not wait-for-ready. Every other case is "buffer".
 *
 * @param result the pick result to resolve
 * @param isWaitForReady whether the RPC is wait-for-ready
 * @return the transport to use, or {@code null} if the result is "buffer"
 */
@Nullable
static ClientTransport getTransportFromPickResult(PickResult result, boolean isWaitForReady) {
  Subchannel pickedSubchannel = result.getSubchannel();
  final ClientTransport activeTransport =
      pickedSubchannel == null
          ? null
          : ((TransportProvider) pickedSubchannel.getInternalSubchannel())
              .obtainActiveTransport();

  if (activeTransport == null) {
    if (result.getStatus().isOk()) {
      return null;
    }
    if (result.isDrop()) {
      return new FailingClientTransport(result.getStatus(), RpcProgress.DROPPED);
    }
    if (isWaitForReady) {
      return null;
    }
    return new FailingClientTransport(result.getStatus(), RpcProgress.PROCESSED);
  }

  final ClientStreamTracer.Factory pickTracerFactory = result.getStreamTracerFactory();
  if (pickTracerFactory == null) {
    return activeTransport;
  }
  // Decorate the transport so each new stream also gets the picker-provided tracer factory;
  // every other operation is forwarded untouched.
  return new ClientTransport() {
    @Override
    public ClientStream newStream(
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
      CallOptions optionsWithTracer = callOptions.withStreamTracerFactory(pickTracerFactory);
      return activeTransport.newStream(method, headers, optionsWithTracer);
    }

    @Override
    public void ping(PingCallback callback, Executor executor) {
      activeTransport.ping(callback, executor);
    }

    @Override
    public InternalLogId getLogId() {
      return activeTransport.getLogId();
    }

    @Override
    public ListenableFuture<SocketStats> getStats() {
      return activeTransport.getStats();
    }
  };
}
/**
 * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
 * client stream is not yet started.
 *
 * @param tracerFactory the stream tracer factory for the new attempt
 * @param headers the headers for the new attempt
 * @return the newly created stream for this attempt, not yet started
 */
abstract ClientStream newSubstream(
ClientStreamTracer.Factory tracerFactory, Metadata headers);
/**
 * Creates a new physical ClientStream that represents a retry/hedging attempt. The returned
 * client stream is not yet started.
 *
 * @param tracerFactory the stream tracer factory for the new attempt
 * @param headers the headers for the new attempt
 * @return the newly created stream for this attempt, not yet started
 */
abstract ClientStream newSubstream(
ClientStreamTracer.Factory tracerFactory, Metadata headers);
/**
 * Creates a new {@link ClientStreamTracer.Factory} with the provided {@link
 * OrcaPerRequestReportListener} installed to receive a callback when a per-request ORCA
 * report is received.
 *
 * <p>Example usages:
 *
 * <ul>
 * <li> Delegating policy (e.g., xDS)
 * <pre>
 * {@code
 * class XdsPicker extends SubchannelPicker {
 *
 * public PickResult pickSubchannel(PickSubchannelArgs args) {
 * SubchannelPicker perLocalityPicker = ... // locality picking logic
 * PickResult result = perLocalityPicker.pickSubchannel(args);
 * return PickResult.withSubchannel(
 * result.getSubchannel(),
 * OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
 * result.getStreamTracerFactory(), listener));
 *
 * }
 * }
 * }
 * </pre>
 * </li>
 * <li> Delegating policy with additional tracing logic
 * <pre>
 * {@code
 * class WrappingPicker extends SubchannelPicker {
 *
 * public PickResult pickSubchannel(PickSubchannelArgs args) {
 * final PickResult result = delegate.pickSubchannel(args);
 * return PickResult.withSubchannel(
 * result.getSubchannel(),
 * new ClientStreamTracer.Factory() {
 * public ClientStreamTracer newClientStreamTracer(
 * StreamInfo info, Metadata metadata) {
 * ClientStreamTracer.Factory orcaTracerFactory =
 * OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
 * result.getStreamTracerFactory(), listener);
 *
 * // Wrap the tracer from the delegate factory if you need to trace the
 * // stream on your own.
 * final ClientStreamTracer orcaTracer =
 * orcaTracerFactory.newClientStreamTracer(info, metadata);
 *
 * return new ForwardingClientStreamTracer() {
 * protected ClientStreamTracer delegate() {
 * return orcaTracer;
 * }
 *
 * public void inboundMessage(int seqNo) {
 * // Handle this event.
 * ...
 * }
 * };
 * }
 * });
 * }
 * }
 * }
 * </pre>
 * </li>
 * </ul>
 *
 * @param delegate the delegate factory to produce other client stream tracing.
 * @param listener contains the callback to be invoked when a per-request ORCA report is received.
 * @return a tracer factory that has {@code listener} installed
 */
public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory(
ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener);