下面列出了怎么用io.grpc.LoadBalancer.PickSubchannelArgs的API类实例代码及写法,或者点击链接到github查看源代码。
@VisibleForTesting
void panic(final Throwable t) {
if (panicMode) {
// Preserve the first panic information
return;
}
panicMode = true;
cancelIdleTimer(/* permanent= */ true);
shutdownNameResolverAndLoadBalancer(false);
final class PanicSubchannelPicker extends SubchannelPicker {
private final PickResult panicPickResult =
PickResult.withDrop(
Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return panicPickResult;
}
}
updateSubchannelPicker(new PanicSubchannelPicker());
channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
channelStateManager.gotoState(TRANSIENT_FAILURE);
}
@Test
public void reprocess_NoPendingStream() {
SubchannelPicker picker = mock(SubchannelPicker.class);
AbstractSubchannel subchannel = mock(AbstractSubchannel.class);
when(subchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel));
when(mockRealTransport.newStream(any(MethodDescriptor.class), any(Metadata.class),
any(CallOptions.class))).thenReturn(mockRealStream);
delayedTransport.reprocess(picker);
verifyNoMoreInteractions(picker);
verifyNoMoreInteractions(transportListener);
// Though picker was not originally used, it will be saved and serve future streams.
ClientStream stream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
verify(subchannel).obtainActiveTransport();
assertSame(mockRealStream, stream);
}
@Test
public void roundRobinPickerWithIdleEntry_noDrop() {
Subchannel subchannel = mock(Subchannel.class);
IdleSubchannelEntry entry = new IdleSubchannelEntry(subchannel, syncContext);
RoundRobinPicker picker =
new RoundRobinPicker(Collections.<DropEntry>emptyList(), Collections.singletonList(entry));
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
verify(subchannel, never()).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
verify(subchannel).requestConnection();
assertThat(picker.pickSubchannel(args)).isSameInstanceAs(PickResult.withNoResult());
// Only the first pick triggers requestConnection()
verify(subchannel).requestConnection();
}
@Test
public void newStream_racesWithReprocessIdleMode() throws Exception {
SubchannelPicker picker = new SubchannelPicker() {
@Override public PickResult pickSubchannel(PickSubchannelArgs args) {
// Assume entering idle mode raced with the pick
delayedTransport.reprocess(null);
// Act like IDLE LB
return PickResult.withNoResult();
}
};
// Because there is no pending stream yet, it will do nothing but save the picker.
delayedTransport.reprocess(picker);
ClientStream stream = delayedTransport.newStream(method, headers, callOptions);
stream.start(streamListener);
assertTrue(delayedTransport.hasPendingStreams());
verify(transportListener).transportInUse(true);
}
@Test
public void errorPropagation() {
loadBalancer.handleNameResolutionError(Status.UNKNOWN.withDescription("I failed"));
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
verify(helper)
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
Status status =
pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
assertThat(status.getDescription()).contains("I failed");
List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
deliverResolvedAddresses(backendAddrs, "round_robin");
// Error after child policy is created.
loadBalancer.handleNameResolutionError(Status.UNKNOWN.withDescription("I failed"));
verify(helper, times(2))
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
status = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
assertThat(status.getDescription()).contains("I failed");
assertThat(status.getDescription()).contains("handled by downstream balancer");
}
@Test
public void clusterWatcher_resourceNotExist() {
ResolvedAddresses resolvedAddresses = ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setAttributes(Attributes.newBuilder()
.set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
.build())
.setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com"))
.build();
cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);
ArgumentCaptor<ClusterWatcher> clusterWatcherCaptor = ArgumentCaptor.forClass(null);
verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), clusterWatcherCaptor.capture());
ClusterWatcher clusterWatcher = clusterWatcherCaptor.getValue();
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
clusterWatcher.onResourceDoesNotExist("foo.googleapis.com");
assertThat(edsLoadBalancers).isEmpty();
verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(result.getStatus().getDescription())
.isEqualTo("Resource foo.googleapis.com is unavailable");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
synchronized (pickList) {
// Two-level round-robin.
// First round-robin on dropList. If a drop entry is selected, request will be dropped. If
// a non-drop entry is selected, then round-robin on pickList. This makes sure requests are
// dropped at the same proportion as the drop entries appear on the round-robin list from
// the balancer, while only READY backends (that make up pickList) are selected for the
// non-drop cases.
if (!dropList.isEmpty()) {
DropEntry drop = dropList.get(dropIndex);
dropIndex++;
if (dropIndex == dropList.size()) {
dropIndex = 0;
}
if (drop != null) {
return drop.picked();
}
}
RoundRobinEntry pick = pickList.get(pickIndex);
pickIndex++;
if (pickIndex == pickList.size()) {
pickIndex = 0;
}
return pick.picked(args.getHeaders());
}
}
@Test
public void roundRobinPickerNoDrop() {
GrpclbClientLoadRecorder loadRecorder =
new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
Subchannel subchannel = mock(Subchannel.class);
BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
List<BackendEntry> pickList = Arrays.asList(b1, b2);
RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);
PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
Metadata headers1 = new Metadata();
// The existing token on the headers will be replaced
headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
when(args1.getHeaders()).thenReturn(headers1);
assertSame(b1.result, picker.pickSubchannel(args1));
verify(args1).getHeaders();
assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
Metadata headers2 = new Metadata();
when(args2.getHeaders()).thenReturn(headers2);
assertSame(b2.result, picker.pickSubchannel(args2));
verify(args2).getHeaders();
assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
Metadata headers3 = new Metadata();
when(args3.getHeaders()).thenReturn(headers3);
assertSame(b1.result, picker.pickSubchannel(args3));
verify(args3).getHeaders();
assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
verify(subchannel, never()).getAttributes();
}
@Test
public void abundantInitialResponse() {
Metadata headers = new Metadata();
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getHeaders()).thenReturn(headers);
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Simulate LB initial response
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
lbResponseObserver.onNext(buildInitialResponse(1983));
// Load reporting task is scheduled
assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
FakeClock.ScheduledTask scheduledTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
logs.clear();
// Simulate an abundant LB initial response, with a different report interval
lbResponseObserver.onNext(buildInitialResponse(9097));
// This incident is logged
assertThat(logs).containsExactly(
"DEBUG: Got an LB response: " + buildInitialResponse(9097),
"WARNING: Ignoring unexpected response type: INITIAL_RESPONSE").inOrder();
// It doesn't affect load-reporting at all
assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
.containsExactly(scheduledTask);
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
}
/** Creates a {@link SubchannelPicker} that returns the given {@link Subchannel} on every pick. */
public static SubchannelPicker pickerOf(final Subchannel subchannel) {
return new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel);
}
};
}
@Override
public ClientTransport get(PickSubchannelArgs args) {
ClientTransport transport = subchannel.getTransport();
if (transport == null) {
return notReadyTransport;
} else {
return transport;
}
}
/**
* Caller must call {@code syncContext.drain()} outside of lock because this method may
* schedule tasks on syncContext.
*/
@GuardedBy("lock")
private PendingStream createPendingStream(PickSubchannelArgs args) {
PendingStream pendingStream = new PendingStream(args);
pendingStreams.add(pendingStream);
if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse);
}
return pendingStream;
}
@Override
public ClientTransport get(PickSubchannelArgs args) {
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
// matter here because OOB communication should be sparse, and it's not on application RPC's
// critical path.
return delayedTransport;
}
@Test
public void panic_bufferedCallsWillFail() {
createChannel();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult());
updateBalancingStateSafely(helper, CONNECTING, mockPicker);
// Start RPCs that will be buffered in delayedTransport
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
call.start(mockCallListener, new Metadata());
ClientCall<String, Integer> call2 =
channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
call2.start(mockCallListener2, new Metadata());
executor.runDueTasks();
verifyZeroInteractions(mockCallListener, mockCallListener2);
// Enter panic
final Throwable panicReason = new Exception("Simulated uncaught exception");
channel.syncContext.execute(
new Runnable() {
@Override
public void run() {
channel.panic(panicReason);
}
});
// Buffered RPCs fail immediately
executor.runDueTasks();
verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
panicExpected = true;
}
@Override
public ClientTransport get(PickSubchannelArgs args) {
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
// matter here because OOB communication should be sparse, and it's not on application RPC's
// critical path.
return delayedTransport;
}
@Test
public void updateBalancingState_withWrappedSubchannel() {
ClientStream mockStream = mock(ClientStream.class);
createChannel();
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
call.start(mockCallListener, new Metadata());
final Subchannel subchannel1 =
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
requestConnectionSafely(helper, subchannel1);
MockClientTransportInfo transportInfo = transports.poll();
ConnectionClientTransport mockTransport = transportInfo.transport;
ManagedClientTransport.Listener transportListener = transportInfo.listener;
when(mockTransport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
transportListener.transportReady();
Subchannel wrappedSubchannel1 = new ForwardingSubchannel() {
@Override
protected Subchannel delegate() {
return subchannel1;
}
};
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(wrappedSubchannel1));
updateBalancingStateSafely(helper, READY, mockPicker);
executor.runDueTasks();
verify(mockTransport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
verify(mockStream).start(any(ClientStreamListener.class));
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Subchannel subchannel = null;
if (stickinessState != null) {
String stickinessValue = args.getHeaders().get(stickinessState.key);
if (stickinessValue != null) {
subchannel = stickinessState.getSubchannel(stickinessValue);
if (subchannel == null || !RoundRobinLoadBalancer.isReady(subchannel)) {
subchannel = stickinessState.maybeRegister(stickinessValue, nextSubchannel());
}
}
}
return PickResult.withSubchannel(subchannel != null ? subchannel : nextSubchannel());
}
@Test
public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
// Name resolution is started as soon as channel is created.
createChannel();
verify(mockLoadBalancer).handleNameResolutionError(same(error));
FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
assertNotNull(nameResolverBackoff);
assertFalse(nameResolverBackoff.isCancelled());
// Add a pending call to the delayed transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
call.start(mockCallListener, headers);
// The pending call on the delayed transport stops the name resolver backoff from cancelling
channel.shutdown();
assertFalse(nameResolverBackoff.isCancelled());
// Notify that a subchannel is ready, which drains the delayed transport
SubchannelPicker picker = mock(SubchannelPicker.class);
Status status = Status.UNAVAILABLE.withDescription("for test");
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withDrop(status));
helper.updateBalancingState(READY, picker);
executor.runDueTasks();
verify(mockCallListener).onClose(same(status), any(Metadata.class));
assertTrue(nameResolverBackoff.isCancelled());
}
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
createChannel();
// This call will be buffered by the channel, thus involve delayed transport
CallOptions callOptions = CallOptions.DEFAULT;
if (waitForReady) {
callOptions = callOptions.withWaitForReady();
} else {
callOptions = callOptions.withoutWaitForReady();
}
ClientCall<String, Integer> call1 = channel.newCall(method, callOptions);
call1.start(mockCallListener, new Metadata());
SubchannelPicker picker = mock(SubchannelPicker.class);
Status status = Status.UNAVAILABLE.withDescription("for test");
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(drop ? PickResult.withDrop(status) : PickResult.withError(status));
helper.updateBalancingState(READY, picker);
executor.runDueTasks();
if (shouldFail) {
verify(mockCallListener).onClose(same(status), any(Metadata.class));
} else {
verifyZeroInteractions(mockCallListener);
}
// This call doesn't involve delayed transport
ClientCall<String, Integer> call2 = channel.newCall(method, callOptions);
call2.start(mockCallListener2, new Metadata());
executor.runDueTasks();
if (shouldFail) {
verify(mockCallListener2).onClose(same(status), any(Metadata.class));
} else {
verifyZeroInteractions(mockCallListener2);
}
}
@Test
public void pickerReturnsStreamTracer_noDelay() {
ClientStream mockStream = mock(ClientStream.class);
ClientStreamTracer.Factory factory1 = mock(ClientStreamTracer.Factory.class);
ClientStreamTracer.Factory factory2 = mock(ClientStreamTracer.Factory.class);
createChannel();
Subchannel subchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
subchannel.requestConnection();
MockClientTransportInfo transportInfo = transports.poll();
transportInfo.listener.transportReady();
ClientTransport mockTransport = transportInfo.transport;
when(mockTransport.newStream(
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
.thenReturn(mockStream);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(
PickResult.withSubchannel(subchannel, factory2));
helper.updateBalancingState(READY, mockPicker);
CallOptions callOptions = CallOptions.DEFAULT.withStreamTracerFactory(factory1);
ClientCall<String, Integer> call = channel.newCall(method, callOptions);
call.start(mockCallListener, new Metadata());
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
verify(mockTransport).newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
assertEquals(
Arrays.asList(factory1, factory2),
callOptionsCaptor.getValue().getStreamTracerFactories());
// The factories are safely not stubbed because we do not expect any usage of them.
verifyZeroInteractions(factory1);
verifyZeroInteractions(factory2);
}
@Test
public void panic_bufferedCallsWillFail() {
createChannel();
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withNoResult());
helper.updateBalancingState(CONNECTING, mockPicker);
// Start RPCs that will be buffered in delayedTransport
ClientCall<String, Integer> call =
channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
call.start(mockCallListener, new Metadata());
ClientCall<String, Integer> call2 =
channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
call2.start(mockCallListener2, new Metadata());
executor.runDueTasks();
verifyZeroInteractions(mockCallListener, mockCallListener2);
// Enter panic
Throwable panicReason = new Exception("Simulated uncaught exception");
channel.panic(panicReason);
// Buffered RPCs fail immediately
executor.runDueTasks();
verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, panicReason);
verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, panicReason);
}
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withSubchannel(mockSubchannel));
when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
when(mockRealTransport.newStream(same(method), same(headers), same(callOptions)))
.thenReturn(mockRealStream);
when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2)))
.thenReturn(mockRealStream2);
delayedTransport.start(transportListener);
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
Map<String, Object> affinity =
args.getCallOptions().getOption(GrpcCallOptions.CALLOPTIONS_CUSTOME_KEY);
GrpcURL refUrl = (GrpcURL) affinity.get(GrpcCallOptions.GRPC_REF_URL);
if (size > 0) {
Subchannel subchannel = nextSubchannel(refUrl);
affinity.put(GrpcCallOptions.GRPC_NAMERESOVER_ATTRIBUTES, nameResovleCache);
return PickResult.withSubchannel(subchannel);
}
if (status != null) {
return PickResult.withError(status);
}
return PickResult.withNoResult();
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
synchronized (pickList) {
// Two-level round-robin.
// First round-robin on dropList. If a drop entry is selected, request will be dropped. If
// a non-drop entry is selected, then round-robin on pickList. This makes sure requests are
// dropped at the same proportion as the drop entries appear on the round-robin list from
// the balancer, while only backends from pickList are selected for the non-drop cases.
if (!dropList.isEmpty()) {
DropEntry drop = dropList.get(dropIndex);
dropIndex++;
if (dropIndex == dropList.size()) {
dropIndex = 0;
}
if (drop != null) {
return drop.picked();
}
}
RoundRobinEntry pick = pickList.get(pickIndex);
pickIndex++;
if (pickIndex == pickList.size()) {
pickIndex = 0;
}
return pick.picked(args.getHeaders());
}
}
@Test
public void roundRobinPickerNoDrop() {
GrpclbClientLoadRecorder loadRecorder =
new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
Subchannel subchannel = mock(Subchannel.class);
BackendEntry b1 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0001");
BackendEntry b2 = new BackendEntry(subchannel, loadRecorder, "LBTOKEN0002");
List<BackendEntry> pickList = Arrays.asList(b1, b2);
RoundRobinPicker picker = new RoundRobinPicker(Collections.<DropEntry>emptyList(), pickList);
PickSubchannelArgs args1 = mock(PickSubchannelArgs.class);
Metadata headers1 = new Metadata();
// The existing token on the headers will be replaced
headers1.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
when(args1.getHeaders()).thenReturn(headers1);
assertSame(b1.result, picker.pickSubchannel(args1));
verify(args1).getHeaders();
assertThat(headers1.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
PickSubchannelArgs args2 = mock(PickSubchannelArgs.class);
Metadata headers2 = new Metadata();
when(args2.getHeaders()).thenReturn(headers2);
assertSame(b2.result, picker.pickSubchannel(args2));
verify(args2).getHeaders();
assertThat(headers2.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0002");
PickSubchannelArgs args3 = mock(PickSubchannelArgs.class);
Metadata headers3 = new Metadata();
when(args3.getHeaders()).thenReturn(headers3);
assertSame(b1.result, picker.pickSubchannel(args3));
verify(args3).getHeaders();
assertThat(headers3.getAll(GrpclbConstants.TOKEN_METADATA_KEY)).containsExactly("LBTOKEN0001");
verify(subchannel, never()).getAttributes();
}
@Test
public void abundantInitialResponse() {
Metadata headers = new Metadata();
PickSubchannelArgs args = mock(PickSubchannelArgs.class);
when(args.getHeaders()).thenReturn(headers);
List<EquivalentAddressGroup> grpclbBalancerList = createResolvedBalancerAddresses(1);
deliverResolvedAddresses(Collections.<EquivalentAddressGroup>emptyList(), grpclbBalancerList);
assertEquals(1, fakeOobChannels.size());
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Simulate LB initial response
assertEquals(0, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
lbResponseObserver.onNext(buildInitialResponse(1983));
// Load reporting task is scheduled
assertEquals(1, fakeClock.numPendingTasks(LOAD_REPORTING_TASK_FILTER));
FakeClock.ScheduledTask scheduledTask =
Iterables.getOnlyElement(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER));
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
logs.clear();
// Simulate an abundant LB initial response, with a different report interval
lbResponseObserver.onNext(buildInitialResponse(9097));
// This incident is logged
assertThat(logs).containsExactly(
"DEBUG: Got an LB response: " + buildInitialResponse(9097),
"WARNING: Ignoring unexpected response type: INITIAL_RESPONSE").inOrder();
// It doesn't affect load-reporting at all
assertThat(fakeClock.getPendingTasks(LOAD_REPORTING_TASK_FILTER))
.containsExactly(scheduledTask);
assertEquals(1983, scheduledTask.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
PickResult result = delegate().pickSubchannel(args);
if (!result.getStatus().isOk()) {
return result;
}
if (result.getSubchannel() == null) {
return result;
}
ClientStreamTracer.Factory originFactory = result.getStreamTracerFactory();
if (originFactory == null) {
originFactory = NOOP_CLIENT_STREAM_TRACER_FACTORY;
}
return PickResult.withSubchannel(result.getSubchannel(), wrapTracerFactory(originFactory));
}
/**
* Caller must call {@code syncContext.drain()} outside of lock because this method may
* schedule tasks on syncContext.
*/
@GuardedBy("lock")
private PendingStream createPendingStream(PickSubchannelArgs args) {
PendingStream pendingStream = new PendingStream(args);
pendingStreams.add(pendingStream);
if (getPendingStreamsCount() == 1) {
syncContext.executeLater(reportTransportInUse);
}
return pendingStream;
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
for (DropOverload dropOverload : dropOverloads) {
int rand = random.nextInt(1000_000);
if (rand < dropOverload.getDropsPerMillion()) {
logger.log(
XdsLogLevel.INFO,
"Drop request with category: {0}", dropOverload.getCategory());
loadStatsStore.recordDroppedRequest(dropOverload.getCategory());
return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
"dropped by loadbalancer: " + dropOverload.toString()));
}
}
return delegate.pickSubchannel(args);
}
@Override
public void handleNameResolutionError(final Status error) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error.augmentDescription("handled by downstream balancer"));
}
};
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker);
}