下面列出了如何使用 io.grpc.LoadBalancer.SubchannelPicker API 类的示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Puts the channel into panic mode after an unexpected internal error.
 *
 * <p>Only the first call has any effect; later calls return without doing anything so that the
 * first cause is the one reported. The idle timer is cancelled, the name resolver and load
 * balancer are shut down, and a picker that drops every pick with an INTERNAL status is installed
 * before the channel state is moved to TRANSIENT_FAILURE.
 *
 * @param t the throwable that triggered the panic; attached as the cause of the drop status
 */
@VisibleForTesting
void panic(final Throwable t) {
  if (!panicMode) {
    panicMode = true;
    cancelIdleTimer(/* permanent= */ true);
    shutdownNameResolverAndLoadBalancer(false);

    final Status panicStatus =
        Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t);

    // Every pick made after the panic receives the same pre-built drop result.
    final class PanicSubchannelPicker extends SubchannelPicker {
      private final PickResult dropResult = PickResult.withDrop(panicStatus);

      @Override
      public PickResult pickSubchannel(PickSubchannelArgs args) {
        return dropResult;
      }
    }

    updateSubchannelPicker(new PanicSubchannelPicker());
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
    channelStateManager.gotoState(TRANSIENT_FAILURE);
  }
  // else: already panicking; preserve the first panic information.
}
/**
 * Accepts a new connectivity state and picker from the load balancer.
 *
 * <p>Both arguments are null-checked immediately. The actual update is wrapped in a task handed
 * to {@code syncContext.execute}; that task does nothing when this helper is no longer the
 * channel's current {@code lbHelper}.
 *
 * @param newState the state reported by the balancer; SHUTDOWN is not propagated to the channel
 * @param newPicker the picker to install
 */
@Override
public void updateBalancingState(
    final ConnectivityState newState, final SubchannelPicker newPicker) {
  checkNotNull(newState, "newState");
  checkNotNull(newPicker, "newPicker");

  final class UpdateBalancingState implements Runnable {
    @Override
    public void run() {
      if (LbHelperImpl.this == lbHelper) {
        updateSubchannelPicker(newPicker);
        // It's not appropriate to report SHUTDOWN state from lb.
        // Ignore the case of newState == SHUTDOWN for now.
        if (newState == SHUTDOWN) {
          return;
        }
        channelLogger.log(ChannelLogLevel.INFO, "Entering {0} state", newState);
        channelStateManager.gotoState(newState);
      }
    }
  }

  Runnable update = new UpdateBalancingState();
  syncContext.execute(update);
}
/**
 * A pick that itself triggers {@code reprocess(null)} and returns no result must still leave the
 * new stream pending on the delayed transport.
 */
@Test
public void newStream_racesWithReprocessIdleMode() throws Exception {
  SubchannelPicker racingPicker =
      new SubchannelPicker() {
        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
          // Assume entering idle mode raced with the pick
          delayedTransport.reprocess(null);
          // Act like IDLE LB
          return PickResult.withNoResult();
        }
      };
  // Because there is no pending stream yet, it will do nothing but save the picker.
  delayedTransport.reprocess(racingPicker);

  ClientStream bufferedStream = delayedTransport.newStream(method, headers, callOptions);
  bufferedStream.start(streamListener);

  assertTrue(delayedTransport.hasPendingStreams());
  verify(transportListener).transportInUse(true);
}
/**
 * Reprocessing while no stream is pending must not consult the picker or notify the transport
 * listener, yet the picker is kept and used for the next stream that is created.
 */
@Test
public void reprocess_NoPendingStream() {
  SubchannelPicker savedPicker = mock(SubchannelPicker.class);
  AbstractSubchannel pickedSubchannel = mock(AbstractSubchannel.class);
  when(pickedSubchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
  PickResult pick = PickResult.withSubchannel(pickedSubchannel);
  when(savedPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pick);
  when(mockRealTransport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(mockRealStream);

  delayedTransport.reprocess(savedPicker);
  verifyNoMoreInteractions(savedPicker);
  verifyNoMoreInteractions(transportListener);

  // Though picker was not originally used, it will be saved and serve future streams.
  ClientStream createdStream = delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
  PickSubchannelArgs expectedArgs =
      new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT);
  verify(savedPicker).pickSubchannel(expectedArgs);
  verify(mockInternalSubchannel).obtainActiveTransport();
  assertSame(mockRealStream, createdStream);
}
/**
 * Delivering resolved addresses with empty attributes must leave the auto-configured balancer's
 * delegate instance unchanged.
 */
@Test
public void handleResolvedAddressGroups_keepOldBalancer() {
  SocketAddress fakeAddress = new SocketAddress() {};
  final List<EquivalentAddressGroup> servers =
      Collections.singletonList(new EquivalentAddressGroup(fakeAddress, Attributes.EMPTY));

  // Helper stub: checks the addresses it is asked to connect to and ignores state updates.
  Helper stubHelper =
      new TestHelper() {
        @Override
        public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
          assertThat(addrs).isEqualTo(servers);
          return new TestSubchannel(addrs, attrs);
        }

        @Override
        public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
          // noop
        }
      };

  AutoConfiguredLoadBalancer autoLb =
      (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(stubHelper);
  LoadBalancer delegateBefore = autoLb.getDelegate();

  autoLb.handleResolvedAddressGroups(servers, Attributes.EMPTY);

  assertThat(autoLb.getDelegate()).isSameAs(delegateBefore);
}
/**
 * Checks the picker published with the IDLE state: two picks return equal results, and the
 * subchannel sees one further {@code requestConnection()} call (two in total for the test).
 */
@Test
public void requestConnectionPicker() throws Exception {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
InOrder inOrder = inOrder(mockHelper, mockSubchannel);
// Capture the listener passed to start() so the test can report state changes through it.
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
// First requestConnection(): issued while handling the resolved addresses.
inOrder.verify(mockSubchannel).requestConnection();
// Report IDLE for the subchannel and capture the picker published with that state.
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
// Calling pickSubchannel() twice gave the same result
assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs));
// But the picker calls requestConnection() only once
inOrder.verify(mockSubchannel).requestConnection();
// Total across the whole test: the initial request plus the single one caused by the picks.
verify(mockSubchannel, times(2)).requestConnection();
}
/**
 * Builds the picker for the current set of child pickers and, when a state is given, reports it
 * to the helper.
 *
 * <p>With no child pickers the base picker is an error picker (for TRANSIENT_FAILURE) or the
 * buffering picker (for anything else); otherwise it is a weighted random picker over the
 * children. If {@code dropOverloads} is non-empty the base picker is wrapped in a
 * {@code DroppablePicker}.
 *
 * @param state the state to report, or {@code null} to skip reporting
 * @param childPickers the weighted child pickers to choose among; may be empty
 */
private void updatePicker(
    @Nullable ConnectivityState state, List<WeightedChildPicker> childPickers) {
  SubchannelPicker basePicker;
  if (!childPickers.isEmpty()) {
    basePicker = new WeightedRandomPicker(childPickers);
  } else if (state == TRANSIENT_FAILURE) {
    basePicker = new ErrorPicker(Status.UNAVAILABLE); // TODO: more details in status
  } else {
    basePicker = XdsSubchannelPickers.BUFFER_PICKER;
  }

  SubchannelPicker effectivePicker =
      dropOverloads.isEmpty()
          ? basePicker
          : new DroppablePicker(dropOverloads, basePicker, random, loadStatsStore);

  if (state == null) {
    return;
  }
  helper.updateBalancingState(state, effectivePicker);
}
/**
 * Once a child subchannel is READY, the picker reported upstream must be a
 * {@code LoadRecordingSubchannelPicker} whose stream tracer factory uses the test's counter, and
 * shutting the balancer down must stop load recording.
 */
@Test
public void subchannelPickerInterceptedWithLoadRecording() {
  List<EquivalentAddressGroup> backends = createResolvedBackendAddresses(2);
  deliverResolvedAddresses(backends, "round_robin");
  FakeLoadBalancer child = (FakeLoadBalancer) childBalancers.poll();
  NoopSubchannel firstSubchannel = child.subchannels.values().iterator().next();
  deliverSubchannelState(firstSubchannel, ConnectivityState.READY);
  assertThat(loadRecorder.recording).isTrue();

  ArgumentCaptor<SubchannelPicker> captor = ArgumentCaptor.forClass(null);
  verify(helper).updateBalancingState(eq(ConnectivityState.READY), captor.capture());
  SubchannelPicker reportedPicker = captor.getValue();
  assertThat(reportedPicker).isInstanceOf(LoadRecordingSubchannelPicker.class);

  PickResult pick = reportedPicker.pickSubchannel(mock(PickSubchannelArgs.class));
  ClientStreamTracer.Factory factory = pick.getStreamTracerFactory();
  LoadRecordingStreamTracerFactory recordingFactory = (LoadRecordingStreamTracerFactory) factory;
  assertThat(recordingFactory.getCounter()).isSameInstanceAs(counter);

  loadBalancer.shutdown();
  assertThat(child.shutdown).isTrue();
  assertThat(loadRecorder.recording).isFalse();
}
/**
 * After an endpoint update has been delivered, an error on the XdsClient stream must not cause
 * TRANSIENT_FAILURE to be reported to the helper.
 */
@Test
public void transientError_withPreviousEndpointUpdateReceived() {
deliverResolvedAddresses(null, null, fakeEndpointPickingPolicy);
// Endpoint update received.
ClusterLoadAssignment clusterLoadAssignment =
buildClusterLoadAssignment(CLUSTER_NAME,
ImmutableList.of(
buildLocalityLbEndpoints("region1", "zone1", "subzone1",
ImmutableList.of(
buildLbEndpoint("192.168.0.1", 8080, HEALTHY, 2)),
1, 0)),
ImmutableList.of(buildDropOverload("throttle", 1000)));
deliverClusterLoadAssignments(clusterLoadAssignment);
// Baseline: the update alone did not report TRANSIENT_FAILURE.
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
// XdsClient stream receives an error.
responseObserver.onError(new RuntimeException("fake error"));
// Still no TRANSIENT_FAILURE after the stream error.
verify(helper, never()).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));
}
/**
 * Accepts a new connectivity state and picker from the load balancer.
 *
 * <p>Arguments are null-checked, then {@code logWarningIfNotInSyncContext} is invoked, and
 * finally the update is wrapped in a task handed to {@code syncContext.execute}. The task does
 * nothing when this helper is no longer the channel's current {@code lbHelper}.
 *
 * @param newState the state reported by the balancer; SHUTDOWN is not propagated to the channel
 * @param newPicker the picker to install
 */
@Override
public void updateBalancingState(
    final ConnectivityState newState, final SubchannelPicker newPicker) {
  checkNotNull(newState, "newState");
  checkNotNull(newPicker, "newPicker");
  logWarningIfNotInSyncContext("updateBalancingState()");

  final class UpdateBalancingState implements Runnable {
    @Override
    public void run() {
      if (LbHelperImpl.this == lbHelper) {
        updateSubchannelPicker(newPicker);
        // It's not appropriate to report SHUTDOWN state from lb.
        // Ignore the case of newState == SHUTDOWN for now.
        if (newState == SHUTDOWN) {
          return;
        }
        channelLogger.log(
            ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
        channelStateManager.gotoState(newState);
      }
    }
  }

  Runnable update = new UpdateBalancingState();
  syncContext.execute(update);
}
/**
 * When the cluster watcher reports that the resource does not exist, no EDS balancer is created
 * and the helper receives TRANSIENT_FAILURE with a picker whose status is UNAVAILABLE and names
 * the missing resource.
 */
@Test
public void clusterWatcher_resourceNotExist() {
  Attributes xdsAttributes =
      Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build();
  ResolvedAddresses resolved =
      ResolvedAddresses.newBuilder()
          .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
          .setAttributes(xdsAttributes)
          .setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com"))
          .build();
  cdsLoadBalancer.handleResolvedAddresses(resolved);

  ArgumentCaptor<ClusterWatcher> watcherCaptor = ArgumentCaptor.forClass(null);
  verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), watcherCaptor.capture());
  ClusterWatcher watcher = watcherCaptor.getValue();

  watcher.onResourceDoesNotExist("foo.googleapis.com");
  assertThat(edsLoadBalancers).isEmpty();

  ArgumentCaptor<SubchannelPicker> failurePickerCaptor = ArgumentCaptor.forClass(null);
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), failurePickerCaptor.capture());
  PickResult failure =
      failurePickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
  assertThat(failure.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
  assertThat(failure.getStatus().getDescription())
      .isEqualTo("Resource foo.googleapis.com is unavailable");
}
/**
 * A name resolution error followed by a successful resolution: the balancer first reports some
 * state for the error, then creates a subchannel for the resolved servers, reports CONNECTING,
 * and publishes a picker that returns that subchannel consistently.
 */
@Test
public void nameResolutionSuccessAfterError() throws Exception {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
// The error alone produces one state update and no connection attempt.
inOrder.verify(mockHelper)
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verify(mockSubchannel, never()).requestConnection();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
// The subchannel is created with the resolved servers and empty attributes.
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
CreateSubchannelArgs args = createArgsCaptor.getValue();
assertThat(args.getAddresses()).isEqualTo(servers);
assertThat(args.getAttributes()).isEqualTo(Attributes.EMPTY);
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
// The published picker returns the created subchannel, and repeated picks are equal.
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs)
.getSubchannel());
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
verifyNoMoreInteractions(mockHelper);
}
/**
 * Switching back to the policy that is already current must reuse the existing balancer and shut
 * down the balancer that was created for the intermediate policy.
 */
@Test
public void newPolicyNameTheSameAsCurrentPolicy_shouldShutdownPendingLb() {
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[0]));
LoadBalancer lb0 = balancers.get(lbPolicies[0]);
// Switching to the same policy again must not replace the balancer instance.
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[0]));
assertThat(balancers.get(lbPolicies[0])).isSameInstanceAs(lb0);
Helper helper0 = helpers.get(lb0);
SubchannelPicker picker = mock(SubchannelPicker.class);
helper0.updateBalancingState(READY, picker);
// Start a switch to a second policy, then switch back to the first one.
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[1]));
LoadBalancer lb1 = balancers.get(lbPolicies[1]);
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[0]));
// The balancer for the second policy is shut down; the first is still the same instance.
verify(lb1).shutdown();
assertThat(balancers.get(lbPolicies[0])).isSameInstanceAs(lb0);
verifyNoMoreInteractions(lb0, lb1);
}
/**
 * A name resolution error delivered before any policy is set must be reported as
 * TRANSIENT_FAILURE with a picker whose status carries the same code.
 */
@Test
public void transientFailureOnInitialResolutionError() {
  gracefulSwitchLb.handleNameResolutionError(Status.DATA_LOSS);

  ArgumentCaptor<SubchannelPicker> captor = ArgumentCaptor.forClass(null);
  verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), captor.capture());

  SubchannelPicker failurePicker = captor.getValue();
  Status pickStatus = failurePicker.pickSubchannel(mock(PickSubchannelArgs.class)).getStatus();
  assertThat(pickStatus.getCode()).isEqualTo(Status.Code.DATA_LOSS);
}
/**
 * With a failed name resolution and a call pending on the delayed transport, shutting the channel
 * down must not cancel the resolver backoff task until the pending call has been closed.
 */
@Test
public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
FakeNameResolverFactory nameResolverFactory =
new FakeNameResolverFactory.Builder(expectedUri).setError(error).build();
channelBuilder.nameResolverFactory(nameResolverFactory);
// Name resolution is started as soon as channel is created.
createChannel();
verify(mockLoadBalancer).handleNameResolutionError(same(error));
// A refresh task must have been scheduled and must still be live.
FakeClock.ScheduledTask nameResolverBackoff = getNameResolverRefresh();
assertNotNull(nameResolverBackoff);
assertFalse(nameResolverBackoff.isCancelled());
// Add a pending call to the delayed transport
ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
Metadata headers = new Metadata();
call.start(mockCallListener, headers);
// The pending call on the delayed transport stops the name resolver backoff from cancelling
channel.shutdown();
assertFalse(nameResolverBackoff.isCancelled());
// Notify that a subchannel is ready, which drains the delayed transport
SubchannelPicker picker = mock(SubchannelPicker.class);
Status status = Status.UNAVAILABLE.withDescription("for test");
when(picker.pickSubchannel(any(PickSubchannelArgs.class)))
.thenReturn(PickResult.withDrop(status));
helper.updateBalancingState(READY, picker);
executor.runDueTasks();
// The pending call is closed with the drop status, and only now is the backoff cancelled.
verify(mockCallListener).onClose(same(status), any(Metadata.class));
assertTrue(nameResolverBackoff.isCancelled());
}
/**
 * Shared scenario: the balancer's picker returns a drop or an error, and the test checks whether
 * a buffered call and a subsequent non-buffered call are failed with that status.
 *
 * @param waitForReady whether the calls are started with wait-for-ready enabled
 * @param drop whether the picker returns a drop result ({@code true}) or an error result
 * @param shouldFail whether both calls are expected to be closed with the picker's status
 */
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
  createChannel();

  // This call will be buffered by the channel, thus involve delayed transport
  CallOptions callOptions =
      waitForReady
          ? CallOptions.DEFAULT.withWaitForReady()
          : CallOptions.DEFAULT.withoutWaitForReady();
  ClientCall<String, Integer> bufferedCall = channel.newCall(method, callOptions);
  bufferedCall.start(mockCallListener, new Metadata());

  SubchannelPicker failingPicker = mock(SubchannelPicker.class);
  Status status = Status.UNAVAILABLE.withDescription("for test");
  PickResult balancerResult;
  if (drop) {
    balancerResult = PickResult.withDrop(status);
  } else {
    balancerResult = PickResult.withError(status);
  }
  when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(balancerResult);
  helper.updateBalancingState(READY, failingPicker);

  executor.runDueTasks();
  if (!shouldFail) {
    verifyZeroInteractions(mockCallListener);
  } else {
    verify(mockCallListener).onClose(same(status), any(Metadata.class));
  }

  // This call doesn't involve delayed transport
  ClientCall<String, Integer> directCall = channel.newCall(method, callOptions);
  directCall.start(mockCallListener2, new Metadata());

  executor.runDueTasks();
  if (!shouldFail) {
    verifyZeroInteractions(mockCallListener2);
  } else {
    verify(mockCallListener2).onClose(same(status), any(Metadata.class));
  }
}
/**
 * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
 * picker will be consulted.
 *
 * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
 * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
 *
 * <p>The picker is read under {@code lock} but invoked outside of it. If the pick yields no
 * transport, the loop re-checks under the lock whether the picker version has changed: when it
 * has not, the stream is buffered; when it has, the newer picker is tried.
 *
 * @param method the method of the call the stream is created for
 * @param headers the headers passed to the picker and to the real transport
 * @param callOptions the call options; its wait-for-ready flag is used when mapping the pick
 *     result to a transport
 * @return a stream from the picked transport, a pending stream, or a failing stream
 */
@Override
public final ClientStream newStream(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
try {
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
// picker == null marks the first pass through the loop; pickerVersion records which picker
// version the previous pass used.
SubchannelPicker picker = null;
long pickerVersion = -1;
while (true) {
synchronized (lock) {
if (shutdownStatus != null) {
return new FailingClientStream(shutdownStatus);
}
if (lastPicker == null) {
return createPendingStream(args);
}
// Check for second time through the loop, and whether anything changed
if (picker != null && pickerVersion == lastPickerVersion) {
return createPendingStream(args);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
// The pick itself runs after the synchronized block has been exited.
PickResult pickResult = picker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady());
if (transport != null) {
return transport.newStream(
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions());
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
// race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
}
} finally {
// Executed on every exit path, including each early return above.
syncContext.drain();
}
}
/**
 * Calling {@code requestConnection()} on the picker published for IDLE must result in a second
 * {@code requestConnection()} on the subchannel.
 */
@Test
public void requestConnection() {
  loadBalancer.handleResolvedAddressGroups(servers, affinity);
  ConnectivityStateInfo idleInfo = ConnectivityStateInfo.forNonError(IDLE);
  loadBalancer.handleSubchannelState(mockSubchannel, idleInfo);

  verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
  SubchannelPicker idlePicker = pickerCaptor.getValue();

  // One request has been made so far; asking through the picker adds a second one.
  verify(mockSubchannel).requestConnection();
  idlePicker.requestConnection();
  verify(mockSubchannel, times(2)).requestConnection();
}
/**
 * Shared scenario: the balancer's picker returns a drop or an error, and the test checks whether
 * a buffered call and a subsequent non-buffered call are failed with that status. The picker is
 * installed through {@code updateBalancingStateSafely}.
 *
 * @param waitForReady whether the calls are started with wait-for-ready enabled
 * @param drop whether the picker returns a drop result ({@code true}) or an error result
 * @param shouldFail whether both calls are expected to be closed with the picker's status
 */
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
  createChannel();

  // This call will be buffered by the channel, thus involve delayed transport
  CallOptions callOptions =
      waitForReady
          ? CallOptions.DEFAULT.withWaitForReady()
          : CallOptions.DEFAULT.withoutWaitForReady();
  ClientCall<String, Integer> bufferedCall = channel.newCall(method, callOptions);
  bufferedCall.start(mockCallListener, new Metadata());

  SubchannelPicker failingPicker = mock(SubchannelPicker.class);
  Status status = Status.UNAVAILABLE.withDescription("for test");
  PickResult balancerResult;
  if (drop) {
    balancerResult = PickResult.withDrop(status);
  } else {
    balancerResult = PickResult.withError(status);
  }
  when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(balancerResult);
  updateBalancingStateSafely(helper, READY, failingPicker);

  executor.runDueTasks();
  if (!shouldFail) {
    verifyZeroInteractions(mockCallListener);
  } else {
    verify(mockCallListener).onClose(same(status), any(Metadata.class));
  }

  // This call doesn't involve delayed transport
  ClientCall<String, Integer> directCall = channel.newCall(method, callOptions);
  directCall.start(mockCallListener2, new Metadata());

  executor.runDueTasks();
  if (!shouldFail) {
    verifyZeroInteractions(mockCallListener2);
  } else {
    verify(mockCallListener2).onClose(same(status), any(Metadata.class));
  }
}
/**
 * {@code canHandleEmptyAddressListFromNameResolution()} on the graceful-switch balancer must
 * reflect the answer of the balancer for the most recently requested policy.
 */
@Test
public void canHandleEmptyAddressListFromNameResolutionForwardedToLatestPolicy() {
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[0]));
LoadBalancer lb0 = balancers.get(lbPolicies[0]);
Helper helper0 = helpers.get(lb0);
SubchannelPicker picker = mock(SubchannelPicker.class);
helper0.updateBalancingState(READY, picker);
// First policy: the result flips from false to true once lb0 is stubbed to return true.
assertThat(gracefulSwitchLb.canHandleEmptyAddressListFromNameResolution()).isFalse();
doReturn(true).when(lb0).canHandleEmptyAddressListFromNameResolution();
assertThat(gracefulSwitchLb.canHandleEmptyAddressListFromNameResolution()).isTrue();
// Second policy: the result is false again until lb1 is stubbed, although lb0 returns true.
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[1]));
LoadBalancer lb1 = balancers.get(lbPolicies[1]);
assertThat(gracefulSwitchLb.canHandleEmptyAddressListFromNameResolution()).isFalse();
doReturn(true).when(lb1).canHandleEmptyAddressListFromNameResolution();
assertThat(gracefulSwitchLb.canHandleEmptyAddressListFromNameResolution()).isTrue();
// Third policy: same pattern with lb2.
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[2]));
LoadBalancer lb2 = balancers.get(lbPolicies[2]);
assertThat(gracefulSwitchLb.canHandleEmptyAddressListFromNameResolution()).isFalse();
doReturn(true).when(lb2).canHandleEmptyAddressListFromNameResolution();
assertThat(gracefulSwitchLb.canHandleEmptyAddressListFromNameResolution()).isTrue();
}
/**
 * An {@code EmptyPicker} must return no subchannel and the status it was constructed with.
 */
@Test
public void pickerEmptyList() throws Exception {
  SubchannelPicker emptyPicker = new EmptyPicker(Status.UNKNOWN);

  // Two separate picks: one checked for the subchannel, one for the status.
  assertEquals(null, emptyPicker.pickSubchannel(mockArgs).getSubchannel());
  Status pickStatus = emptyPicker.pickSubchannel(mockArgs).getStatus();
  assertEquals(Status.UNKNOWN, pickStatus);
}
/**
 * Without stickiness configured, a header that would otherwise be a stickiness key has no effect:
 * picks cycle through the subchannels in order and no stickiness map exists.
 */
@Test
public void noStickinessEnabled_withStickyHeader() {
  loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY);
  for (Subchannel ready : subchannels.values()) {
    loadBalancer.handleSubchannelState(ready, ConnectivityStateInfo.forNonError(READY));
  }
  verify(mockHelper, times(4))
      .updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture());
  SubchannelPicker picker = pickerCaptor.getValue();

  Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
  Metadata stickyHeaders = new Metadata();
  stickyHeaders.put(stickinessKey, "my-sticky-value");
  doReturn(stickyHeaders).when(mockArgs).getHeaders();

  List<Subchannel> allSubchannels = getList(picker);
  Subchannel[] picks = new Subchannel[4];
  for (int i = 0; i < picks.length; i++) {
    picks[i] = picker.pickSubchannel(mockArgs).getSubchannel();
  }

  // Each pick is the successor of the previous one; the fourth wraps around to the first.
  assertEquals(nextSubchannel(picks[0], allSubchannels), picks[1]);
  assertEquals(nextSubchannel(picks[1], allSubchannels), picks[2]);
  assertEquals(nextSubchannel(picks[2], allSubchannels), picks[0]);
  assertEquals(picks[3], picks[0]);

  assertNull(loadBalancer.getStickinessMapForTest());
}
/**
 * With stickiness configured but no sticky header on the request, picks cycle through the
 * subchannels in order and the stickiness map exists but stays empty.
 */
@Test
public void stickinessEnabled_withoutStickyHeader() {
  Map<String, Object> serviceConfig = new HashMap<String, Object>();
  serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
  Attributes attributes =
      Attributes.newBuilder()
          .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
          .build();
  loadBalancer.handleResolvedAddressGroups(servers, attributes);
  for (Subchannel ready : subchannels.values()) {
    loadBalancer.handleSubchannelState(ready, ConnectivityStateInfo.forNonError(READY));
  }
  verify(mockHelper, times(4))
      .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
  SubchannelPicker picker = pickerCaptor.getValue();

  Metadata emptyHeaders = new Metadata();
  doReturn(emptyHeaders).when(mockArgs).getHeaders();

  List<Subchannel> allSubchannels = getList(picker);
  Subchannel[] picks = new Subchannel[4];
  for (int i = 0; i < picks.length; i++) {
    picks[i] = picker.pickSubchannel(mockArgs).getSubchannel();
  }

  // Each pick is the successor of the previous one; the fourth wraps around to the first.
  assertEquals(nextSubchannel(picks[0], allSubchannels), picks[1]);
  assertEquals(nextSubchannel(picks[1], allSubchannels), picks[2]);
  assertEquals(nextSubchannel(picks[2], allSubchannels), picks[0]);
  assertEquals(picks[3], picks[0]);

  verify(mockArgs, times(4)).getHeaders();
  assertNotNull(loadBalancer.getStickinessMapForTest());
  assertThat(loadBalancer.getStickinessMapForTest()).isEmpty();
}
/**
 * With stickiness configured and a sticky header present, every pick returns the same subchannel
 * and the stickiness map holds exactly one entry.
 */
@Test
public void stickinessEnabled_withStickyHeader() {
  Map<String, Object> serviceConfig = new HashMap<String, Object>();
  serviceConfig.put("stickinessMetadataKey", "my-sticky-key");
  Attributes attributes =
      Attributes.newBuilder()
          .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig)
          .build();
  loadBalancer.handleResolvedAddressGroups(servers, attributes);
  for (Subchannel ready : subchannels.values()) {
    loadBalancer.handleSubchannelState(ready, ConnectivityStateInfo.forNonError(READY));
  }
  verify(mockHelper, times(4))
      .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
  SubchannelPicker picker = pickerCaptor.getValue();

  Key<String> stickinessKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
  Metadata stickyHeaders = new Metadata();
  stickyHeaders.put(stickinessKey, "my-sticky-value");
  doReturn(stickyHeaders).when(mockArgs).getHeaders();

  // The first pick fixes the subchannel; the next four picks must all return it again.
  Subchannel stickySubchannel = picker.pickSubchannel(mockArgs).getSubchannel();
  for (int repeat = 0; repeat < 4; repeat++) {
    assertEquals(stickySubchannel, picker.pickSubchannel(mockArgs).getSubchannel());
  }

  verify(mockArgs, atLeast(4)).getHeaders();
  assertNotNull(loadBalancer.getStickinessMapForTest());
  assertThat(loadBalancer.getStickinessMapForTest()).hasSize(1);
}
/**
 * A state update from a child policy's helper must notify the status listener, be stored on the
 * wrapper, and be forwarded to the parent helper with the test's {@code picker}.
 */
@Test
public void updateBalancingState_triggersListener() {
  ChildPolicyWrapper wrapper = factory.createOrGet("foo.google.com");
  ChildPolicyReportingHelper reportingHelper = wrapper.getHelper();
  SubchannelPicker readyPicker = mock(SubchannelPicker.class);

  reportingHelper.updateBalancingState(ConnectivityState.READY, readyPicker);

  verify(childLbStatusListener).onStatusChanged(ConnectivityState.READY);
  assertThat(wrapper.getPicker()).isEqualTo(readyPicker);
  // picker governs childPickers will be reported to parent LB
  verify(helper).updateBalancingState(ConnectivityState.READY, picker);
}
/**
 * Resolved addresses and name resolution errors must be forwarded only to the balancer of the
 * most recently requested policy, never to balancers of earlier policies.
 */
@Test
public void handleResolvedAddressesAndNameResolutionErrorForwardedToLatestPolicy() {
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[0]));
LoadBalancer lb0 = balancers.get(lbPolicies[0]);
Helper helper0 = helpers.get(lb0);
SubchannelPicker picker = mock(SubchannelPicker.class);
helper0.updateBalancingState(READY, picker);
// Only policy 0 exists: lb0 receives both the addresses and the error.
ResolvedAddresses addresses = newFakeAddresses();
gracefulSwitchLb.handleResolvedAddresses(addresses);
verify(lb0).handleResolvedAddresses(addresses);
gracefulSwitchLb.handleNameResolutionError(Status.DATA_LOSS);
verify(lb0).handleNameResolutionError(Status.DATA_LOSS);
// After switching to policy 1: lb1 receives the new addresses and error, lb0 does not.
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[1]));
LoadBalancer lb1 = balancers.get(lbPolicies[1]);
addresses = newFakeAddresses();
gracefulSwitchLb.handleResolvedAddresses(addresses);
verify(lb0, never()).handleResolvedAddresses(addresses);
verify(lb1).handleResolvedAddresses(addresses);
gracefulSwitchLb.handleNameResolutionError(Status.ALREADY_EXISTS);
verify(lb0, never()).handleNameResolutionError(Status.ALREADY_EXISTS);
verify(lb1).handleNameResolutionError(Status.ALREADY_EXISTS);
// After switching to policy 2: lb1 is shut down and only lb2 receives further updates.
gracefulSwitchLb.switchTo(lbProviders.get(lbPolicies[2]));
verify(lb1).shutdown();
LoadBalancer lb2 = balancers.get(lbPolicies[2]);
addresses = newFakeAddresses();
gracefulSwitchLb.handleResolvedAddresses(addresses);
verify(lb0, never()).handleResolvedAddresses(addresses);
verify(lb1, never()).handleResolvedAddresses(addresses);
verify(lb2).handleResolvedAddresses(addresses);
gracefulSwitchLb.handleNameResolutionError(Status.CANCELLED);
verify(lb0, never()).handleNameResolutionError(Status.CANCELLED);
verify(lb1, never()).handleNameResolutionError(Status.CANCELLED);
verify(lb2).handleNameResolutionError(Status.CANCELLED);
verifyNoMoreInteractions(lb0, lb1, lb2);
}
/**
 * Records a channel-trace event for the subchannel's new state and updates the delayed
 * transport's picker to match.
 *
 * <p>READY and IDLE reprocess with {@code subchannelPicker}; TRANSIENT_FAILURE reprocesses with a
 * picker that answers every pick with an error built from the state's status; any other state
 * changes nothing.
 *
 * @param newState the connectivity state info reported for the subchannel
 */
void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
  ChannelTrace.Event stateEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Entering " + newState.getState() + " state")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timeProvider.currentTimeNanos())
          .build();
  channelTracer.reportEvent(stateEvent);

  // Only instantiated for TRANSIENT_FAILURE; returns the same error result for every pick.
  final class OobErrorPicker extends SubchannelPicker {
    final PickResult errorResult = PickResult.withError(newState.getStatus());

    @Override
    public PickResult pickSubchannel(PickSubchannelArgs args) {
      return errorResult;
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(OobErrorPicker.class)
          .add("errorResult", errorResult)
          .toString();
    }
  }

  switch (newState.getState()) {
    case READY:
    case IDLE:
      delayedTransport.reprocess(subchannelPicker);
      break;
    case TRANSIENT_FAILURE:
      delayedTransport.reprocess(new OobErrorPicker());
      break;
    default:
      // Do nothing
  }
}
/**
 * Builds the helper used by one locality's child balancer.
 *
 * <p>State updates from the child are intercepted: the new state is stored, the child's picker is
 * wrapped in metrics-observing and load-recording pickers backed by this locality's counter, and
 * the priority manager is asked to update the locality's priority. The intercepting helper is
 * then wrapped by {@code orcaOobUtil}, and the metrics report interval is applied when positive.
 */
ChildHelper() {
  final ClientLoadCounter counter = loadStatsStore.getLocalityCounter(locality);

  Helper loadTrackingHelper =
      new ForwardingLoadBalancerHelper() {
        @Override
        protected Helper delegate() {
          return helper;
        }

        @Override
        public void updateBalancingState(
            ConnectivityState newState, SubchannelPicker newPicker) {
          logger.log(
              XdsLogLevel.INFO,
              "Update load balancing state for locality {0} to {1}", locality, newState);
          currentChildState = newState;

          MetricsRecordingListener metricsListener = new MetricsRecordingListener(counter);
          MetricsObservingSubchannelPicker observingPicker =
              new MetricsObservingSubchannelPicker(
                  metricsListener, newPicker, orcaPerRequestUtil);
          currentChildPicker = new LoadRecordingSubchannelPicker(counter, observingPicker);

          priorityManager.updatePriorityState(priorityManager.getPriority(locality));
        }

        @Override
        public String getAuthority() {
          //FIXME: This should be a new proposed field of Locality, locality_name
          return locality.getSubZone();
        }
      };

  orcaReportingHelperWrapper =
      orcaOobUtil.newOrcaReportingHelperWrapper(
          loadTrackingHelper, new MetricsRecordingListener(counter));
  if (metricsReportIntervalNano > 0) {
    updateMetricsReportInterval(metricsReportIntervalNano);
  }
}
/**
 * Picking from an {@code EmptyPicker} yields a null subchannel together with the status the
 * picker was created with.
 */
@Test
public void pickerEmptyList() throws Exception {
  SubchannelPicker emptyPicker = new EmptyPicker(Status.UNKNOWN);

  // First pick: no subchannel is selected.
  assertEquals(null, emptyPicker.pickSubchannel(mockArgs).getSubchannel());

  // Second pick: the result carries the constructor's status.
  Status reported = emptyPicker.pickSubchannel(mockArgs).getStatus();
  assertEquals(Status.UNKNOWN, reported);
}
/**
 * Returns a {@link SubchannelPicker} whose every pick selects the given {@link Subchannel}.
 *
 * @param subchannel the subchannel handed back on each pick
 * @return a picker that builds a fresh subchannel pick result for {@code subchannel} per call
 */
public static SubchannelPicker pickerOf(final Subchannel subchannel) {
  SubchannelPicker fixedPicker =
      new SubchannelPicker() {
        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
          PickResult fixedResult = PickResult.withSubchannel(subchannel);
          return fixedResult;
        }
      };
  return fixedPicker;
}