下面列出了怎么用io.grpc.LoadBalancer.Subchannel的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Hands a subchannel back to the pool.
 *
 * <p>When no entry is cached for the subchannel's addresses, the subchannel is cached and a
 * shutdown task is scheduled to run after {@code SHUTDOWN_TIMEOUT_MS}. When an entry already
 * exists, returning the very same subchannel again is a no-op, while a different subchannel
 * for the same addresses is shut down right away.
 */
@Override
public void returnSubchannel(Subchannel subchannel) {
  CacheEntry cached = cache.get(subchannel.getAddresses());
  if (cached == null) {
    // First return for these addresses: keep it around until the timer fires.
    final ShutdownSubchannelTask expiryTask = new ShutdownSubchannelTask(subchannel);
    ScheduledHandle expiryTimer = helper.getSynchronizationContext().schedule(
        expiryTask,
        SHUTDOWN_TIMEOUT_MS,
        TimeUnit.MILLISECONDS,
        helper.getScheduledExecutorService());
    CacheEntry fresh = new CacheEntry(subchannel, expiryTimer);
    cache.put(subchannel.getAddresses(), fresh);
    return;
  }
  // Something is already cached for these addresses; only a *different* instance is discarded.
  if (cached.subchannel != subchannel) {
    subchannel.shutdown();
  }
}
/**
 * Verifies that {@code pool.clear()} shuts down the subchannels that were returned to the pool,
 * leaves alone one that was never returned, and leaves no pending timer tasks.
 */
@Test
public void clear() {
  Subchannel returnedForEag1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
  Subchannel returnedForEag2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
  // Taken from the pool but never handed back in this test.
  Subchannel stillInUse = pool.takeOrCreateSubchannel(EAG2, ATTRS2);

  pool.returnSubchannel(returnedForEag1);
  pool.returnSubchannel(returnedForEag2);

  // Merely returning must not shut anything down.
  verify(returnedForEag1, never()).shutdown();
  verify(returnedForEag2, never()).shutdown();

  pool.clear();

  verify(returnedForEag1).shutdown();
  verify(returnedForEag2).shutdown();
  verify(stillInUse, never()).shutdown();
  assertThat(clock.numPendingTasks()).isEqualTo(0);
}
/**
 * Creates a subchannel through the helper handed to the load balancer factory, marks its first
 * transport ready, replaces the subchannel's addresses via
 * {@code helper.updateSubchannelAddresses}, requests a connection again and marks the next
 * polled transport ready as well.
 */
@Test
public void updateSubchannelAddresses_newAddressConnects() {
  ClientCall<String, Integer> call = channel.newCall(method, CallOptions.DEFAULT);
  call.start(mockCallListener, new Metadata()); // Create LB

  // Use the real class token instead of null: ArgumentCaptor performs no type checks either
  // way, but a null token is unidiomatic and obscures what is being captured.
  ArgumentCaptor<Helper> helperCaptor = ArgumentCaptor.forClass(Helper.class);
  verify(mockLoadBalancerFactory).newLoadBalancer(helperCaptor.capture());
  Helper helper = helperCaptor.getValue();

  // Connect to the first server.
  Subchannel subchannel = createSubchannelSafely(helper, servers.get(0), Attributes.EMPTY);
  subchannel.requestConnection();
  MockClientTransportInfo t0 = newTransports.poll();
  t0.listener.transportReady();

  // Switch the subchannel to the second server and connect again.
  helper.updateSubchannelAddresses(subchannel, servers.get(1));
  subchannel.requestConnection();
  MockClientTransportInfo t1 = newTransports.poll();
  t1.listener.transportReady();
}
/**
 * A picker whose only entry is an idle subchannel returns "no result" for every pick, and asks
 * the subchannel to connect on the first pick only.
 */
@Test
public void roundRobinPickerWithIdleEntry_noDrop() {
  Subchannel idleSubchannel = mock(Subchannel.class);
  IdleSubchannelEntry idleEntry = new IdleSubchannelEntry(idleSubchannel, syncContext);
  RoundRobinPicker roundRobin =
      new RoundRobinPicker(
          Collections.<DropEntry>emptyList(), Collections.singletonList(idleEntry));
  PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);

  // Nothing has been picked yet, so no connection was requested.
  verify(idleSubchannel, never()).requestConnection();

  assertThat(roundRobin.pickSubchannel(pickArgs)).isSameInstanceAs(PickResult.withNoResult());
  verify(idleSubchannel).requestConnection();

  assertThat(roundRobin.pickSubchannel(pickArgs)).isSameInstanceAs(PickResult.withNoResult());
  // Still exactly one requestConnection(): the second pick did not trigger another one.
  verify(idleSubchannel).requestConnection();
}
/**
 * Creates a subchannel whose attributes carry a freshly created {@code HealthCheckState} under
 * {@code KEY_HEALTH_CHECK_STATE}, and applies the current health-checked service name to that
 * state when one is set.
 *
 * <p>Must be called from the {@code SynchronizationContext}, because HealthCheckState is not
 * thread-safe.
 */
@Override
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
  syncContext.throwIfNotInThisSynchronizationContext();

  HealthCheckState state =
      new HealthCheckState(
          this, delegateBalancer, syncContext, delegate.getScheduledExecutorService());
  hcStates.add(state);

  // Attach the state to the subchannel's attributes before delegating the creation.
  Attributes taggedAttrs = attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, state).build();
  Subchannel created = super.createSubchannel(addrs, taggedAttrs);
  state.init(created);

  if (healthCheckedService != null) {
    state.setServiceName(healthCheckedService);
  }
  return created;
}
/**
 * Stubs the helper so that each {@code createSubchannel(CreateSubchannelArgs)} call yields a new
 * mock subchannel reporting the requested addresses and attributes, wires the helper's
 * synchronization context and scheduled executor, and registers the listener with the pool.
 */
@Before
public void setUp() {
  Answer<Subchannel> mockSubchannelFactory = new Answer<Subchannel>() {
    @Override
    public Subchannel answer(InvocationOnMock invocation) throws Throwable {
      CreateSubchannelArgs requested = (CreateSubchannelArgs) invocation.getArguments()[0];
      Subchannel created = mock(Subchannel.class);
      when(created.getAllAddresses()).thenReturn(requested.getAddresses());
      when(created.getAttributes()).thenReturn(requested.getAttributes());
      // Remember every mock that was handed out.
      mockSubchannels.add(created);
      return created;
    }
  };
  doAnswer(mockSubchannelFactory).when(helper).createSubchannel(any(CreateSubchannelArgs.class));

  when(helper.getSynchronizationContext()).thenReturn(syncContext);
  when(helper.getScheduledExecutorService()).thenReturn(clock.getScheduledExecutorService());
  pool.registerListener(listener);
}
/**
 * After the channel is shut down it terminates only once both subchannels have been shut down,
 * and no client transport is ever created along the way.
 */
@Test
public void subchannelsNoConnectionShutdown() {
  createChannel();
  Subchannel first =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  Subchannel second =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);

  channel.shutdown();
  verify(mockLoadBalancer).shutdown();

  // One subchannel is still alive, so the channel cannot be terminated yet.
  shutdownSafely(helper, first);
  assertFalse(channel.isTerminated());

  shutdownSafely(helper, second);
  assertTrue(channel.isTerminated());

  verify(mockTransportFactory, never())
      .newClientTransport(
          any(SocketAddress.class),
          any(ClientTransportOptions.class),
          any(ChannelLogger.class));
}
/**
 * A state update on a subchannel created through the child policy's reporting helper is
 * reflected in the owning {@code ChildPolicyWrapper}'s connectivity state info.
 */
@Test
public void subchannelStateChange_updateChildPolicyWrapper() {
  ChildPolicyWrapper wrapper = factory.createOrGet("foo.google.com");
  ChildPolicyReportingHelper reportingHelper = wrapper.getHelper();
  FakeSubchannel backingSubchannel = new FakeSubchannel();
  when(helper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(backingSubchannel);

  CreateSubchannelArgs createArgs =
      CreateSubchannelArgs.newBuilder()
          .setAddresses(new EquivalentAddressGroup(mock(SocketAddress.class)))
          .build();
  Subchannel created = reportingHelper.createSubchannel(createArgs);

  SubchannelStateListener ignoringListener = new SubchannelStateListener() {
    @Override
    public void onSubchannelState(ConnectivityStateInfo newState) {
      // no-op
    }
  };
  created.start(ignoringListener);

  backingSubchannel.updateState(
      ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));

  assertThat(wrapper.getConnectivityStateInfo())
      .isEqualTo(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
}
/**
 * Resolves the subchannel to use for a stickiness value.
 *
 * <p>If the registry already maps the value to a subchannel that is still ready, that subchannel
 * is returned. Otherwise the given subchannel is bound to the value (as a new entry, or by
 * replacing a stale one) and returned. The loop retries when the entry changes between the
 * lookup and the replacement.
 */
@Nonnull
Subchannel maybeRegister(
    String stickinessValue, @Nonnull Subchannel subchannel) {
  final Ref<Subchannel> candidateRef = subchannel.getAttributes().get(STICKY_REF);
  for (;;) {
    Ref<Subchannel> currentRef = stickinessMap.putIfAbsent(stickinessValue, candidateRef);
    if (currentRef == null) {
      // Nothing was registered for this value; the candidate is now bound to it.
      addToEvictionQueue(stickinessValue);
      return subchannel;
    }

    // An entry exists; keep using it as long as its subchannel is still ready.
    Subchannel bound = currentRef.value;
    if (bound != null && isReady(bound)) {
      return bound;
    }

    // The existing entry is stale. Swap it for the candidate, unless it changed meanwhile.
    if (stickinessMap.replace(stickinessValue, currentRef, candidateRef)) {
      return subchannel;
    }
    // Lost the race against a concurrent removal or update; start over.
  }
}
/**
 * The single-EAG {@code createSubchannel(eag, attrs)} overload forwards to the list-based
 * overload with a one-element list and the same attributes, and returns its result.
 */
@Deprecated
@Test
public void helper_createSubchannel_old_delegates() {
  class RecordingHelper extends NoopHelper {
    boolean invoked;

    @Override
    public Subchannel createSubchannel(List<EquivalentAddressGroup> addrsIn, Attributes attrsIn) {
      assertThat(addrsIn).hasSize(1);
      assertThat(addrsIn.get(0)).isSameInstanceAs(eag);
      assertThat(attrsIn).isSameInstanceAs(attrs);
      invoked = true;
      return subchannel;
    }
  }

  RecordingHelper recorder = new RecordingHelper();
  assertThat(recorder.createSubchannel(eag, attrs)).isSameInstanceAs(subchannel);
  assertThat(recorder.invoked).isTrue();
}
/**
 * Creates a subchannel for {@code addressGroup} with {@code attrs} by running
 * {@code helper.createSubchannel} inside the helper's synchronization context, and returns it
 * without calling {@code start} on it.
 */
private static Subchannel createUnstartedSubchannel(
    final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) {
  final AtomicReference<Subchannel> created = new AtomicReference<>();
  Runnable createTask = new Runnable() {
    @Override
    public void run() {
      CreateSubchannelArgs createArgs =
          CreateSubchannelArgs.newBuilder()
              .setAddresses(addressGroup)
              .setAttributes(attrs)
              .build();
      created.set(helper.createSubchannel(createArgs));
    }
  };
  helper.getSynchronizationContext().execute(createTask);
  return created.get();
}
/**
 * The single-EAG {@code updateSubchannelAddresses(subchannel, eag)} overload forwards to the
 * list-based overload with the same subchannel and a one-element list.
 */
@Test
public void helper_updateSubchannelAddresses_delegates() {
  class RecordingHelper extends NoopHelper {
    boolean invoked;

    @Override
    public void updateSubchannelAddresses(
        Subchannel subchannelIn, List<EquivalentAddressGroup> addrsIn) {
      assertThat(subchannelIn).isSameAs(emptySubchannel);
      assertThat(addrsIn).hasSize(1);
      assertThat(addrsIn.get(0)).isSameAs(eag);
      invoked = true;
    }
  }

  RecordingHelper recorder = new RecordingHelper();
  recorder.updateSubchannelAddresses(emptySubchannel, eag);
  assertThat(recorder.invoked).isTrue();
}
/**
 * A call started on {@code subchannel.asChannel()} while the transport has not become ready
 * never creates a stream and is closed with {@code SubchannelChannel.NOT_READY_ERROR} once the
 * balancer RPC executor runs its pending task.
 */
@Test
public void subchannelChannel_failWhenNotReady() {
  createChannel();
  Subchannel target = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
  Channel viaSubchannel = target.asChannel();
  Metadata requestHeaders = new Metadata();

  target.requestConnection();
  verify(mockTransportFactory)
      .newClientTransport(any(SocketAddress.class), any(ClientTransportOptions.class));
  MockClientTransportInfo pendingTransport = transports.poll();
  ConnectionClientTransport transport = pendingTransport.transport;

  assertEquals(0, balancerRpcExecutor.numPendingTasks());

  // Subchannel is still CONNECTING, but not READY yet
  ClientCall<String, Integer> earlyCall = viaSubchannel.newCall(method, CallOptions.DEFAULT);
  earlyCall.start(mockCallListener, requestHeaders);
  verify(transport, never()).newStream(
      any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class));

  // The failure is delivered through the executor, not inline.
  verifyZeroInteractions(mockCallListener);
  assertEquals(1, balancerRpcExecutor.runDueTasks());
  verify(mockCallListener).onClose(
      same(SubchannelChannel.NOT_READY_ERROR), any(Metadata.class));
}
/**
 * Creates a subchannel that shares one {@code OrcaReportingState} among all helpers requesting
 * ORCA reports for it.
 *
 * <p>If the args carry no state yet, this helper creates one, stores it in the args under
 * {@code ORCA_REPORTING_STATE_KEY}, and wraps the resulting subchannel in {@code SubchannelImpl}.
 * In every case this helper registers itself as a listener on the state and, when it has a
 * reporting config, pushes that config to the state.
 */
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
  syncContext.throwIfNotInThisSynchronizationContext();

  OrcaReportingState state = args.getOption(ORCA_REPORTING_STATE_KEY);
  // Only the first load balancing policy requesting ORCA reports instantiates the state.
  final boolean createdHere = (state == null);
  CreateSubchannelArgs effectiveArgs = args;
  if (createdHere) {
    state = new OrcaReportingState(this, syncContext, delegate().getScheduledExecutorService());
    effectiveArgs = args.toBuilder().addOption(ORCA_REPORTING_STATE_KEY, state).build();
  }

  orcaStates.add(state);
  state.listeners.add(this);

  Subchannel created = super.createSubchannel(effectiveArgs);
  Subchannel result = createdHere ? new SubchannelImpl(created, state) : created;

  if (orcaConfig != null) {
    state.setReportingConfig(this, orcaConfig);
  }
  return result;
}
/**
 * A {@code ReadyPicker} over three subchannels starting at index 0 hands them out in list order
 * and wraps around to the first one on the fourth pick.
 */
@Test
public void pickerRoundRobin() throws Exception {
  Subchannel first = mock(Subchannel.class);
  Subchannel second = mock(Subchannel.class);
  Subchannel third = mock(Subchannel.class);

  ReadyPicker roundRobin =
      new ReadyPicker(
          Collections.unmodifiableList(Lists.<Subchannel>newArrayList(first, second, third)),
          0 /* startIndex */,
          null /* stickinessState */);

  assertThat(roundRobin.getList()).containsExactly(first, second, third);

  assertEquals(first, roundRobin.pickSubchannel(mockArgs).getSubchannel());
  assertEquals(second, roundRobin.pickSubchannel(mockArgs).getSubchannel());
  assertEquals(third, roundRobin.pickSubchannel(mockArgs).getSubchannel());
  // Wrap-around.
  assertEquals(first, roundRobin.pickSubchannel(mockArgs).getSubchannel());
}
/**
 * Handling resolved addresses with a null load-balancing policy config returns an OK status and
 * keeps the balancer's existing delegate instance.
 */
@Test
public void handleResolvedAddressGroups_keepOldBalancer() {
  final List<EquivalentAddressGroup> resolved =
      Collections.singletonList(new EquivalentAddressGroup(new SocketAddress(){}));
  Helper checkingHelper = new TestHelper() {
    @Override
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
      assertThat(args.getAddresses()).isEqualTo(resolved);
      return new TestSubchannel(args);
    }
  };
  AutoConfiguredLoadBalancer balancer = lbf.newLoadBalancer(checkingHelper);
  LoadBalancer delegateBefore = balancer.getDelegate();

  ResolvedAddresses update =
      ResolvedAddresses.newBuilder()
          .setAddresses(resolved)
          .setAttributes(Attributes.EMPTY)
          .setLoadBalancingPolicyConfig(null)
          .build();
  Status outcome = balancer.tryHandleResolvedAddresses(update);

  assertThat(outcome.getCode()).isEqualTo(Status.Code.OK);
  assertThat(balancer.getDelegate()).isSameInstanceAs(delegateBefore);
}
/**
 * Converts balancer-provided server entries into parallel lists of address groups (tagged with
 * {@code LB_BACKEND_ATTRS}) and tokens, then delegates to
 * {@code fallbackTestVerifyUseOfBackendLists}.
 */
private List<Subchannel> fallbackTestVerifyUseOfBalancerBackendLists(
    InOrder inOrder, List<ServerEntry> servers) {
  ArrayList<EquivalentAddressGroup> backendAddrs = new ArrayList<>();
  ArrayList<String> backendTokens = new ArrayList<>();
  for (ServerEntry backend : servers) {
    EquivalentAddressGroup group = new EquivalentAddressGroup(backend.addr, LB_BACKEND_ATTRS);
    backendAddrs.add(group);
    backendTokens.add(backend.token);
  }
  return fallbackTestVerifyUseOfBackendLists(inOrder, backendAddrs, backendTokens);
}
/**
 * When the picker returns a {@code ForwardingSubchannel} wrapping a real, ready subchannel, the
 * pending call still gets a stream created and started on the underlying transport.
 */
@Test
public void updateBalancingState_withWrappedSubchannel() {
  ClientStream stream = mock(ClientStream.class);
  createChannel();
  ClientCall<String, Integer> pendingCall = channel.newCall(method, CallOptions.DEFAULT);
  pendingCall.start(mockCallListener, new Metadata());

  final Subchannel realSubchannel =
      createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
  requestConnectionSafely(helper, realSubchannel);

  MockClientTransportInfo connecting = transports.poll();
  ConnectionClientTransport transport = connecting.transport;
  ManagedClientTransport.Listener listenerForTransport = connecting.listener;
  when(transport.newStream(same(method), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(stream);
  listenerForTransport.transportReady();

  // The picker hands out a wrapper, not the subchannel instance the channel created.
  Subchannel wrapper = new ForwardingSubchannel() {
    @Override
    protected Subchannel delegate() {
      return realSubchannel;
    }
  };
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(wrapper));

  updateBalancingStateSafely(helper, READY, mockPicker);
  executor.runDueTasks();

  verify(transport).newStream(same(method), any(Metadata.class), any(CallOptions.class));
  verify(stream).start(any(ClientStreamListener.class));
}
/**
 * Builds a round-robin pick list from the current backend list and the connectivity state of
 * each backend's subchannel, then passes the aggregate state and a new picker to
 * {@code maybeUpdatePicker(state, picker)}.
 *
 * <p>READY backends go into the pick list. With no READY backend, the list holds a single error
 * entry (state TRANSIENT_FAILURE) when some backend reported a failure and none is IDLE;
 * otherwise it holds the buffer entry (state CONNECTING).
 */
private void maybeUpdatePicker() {
  List<RoundRobinEntry> pickList = new ArrayList<>(backendList.size());
  Status lastFailure = null;
  boolean sawIdle = false;

  for (BackendEntry backend : backendList) {
    ConnectivityStateInfo info =
        backend.result.getSubchannel().getAttributes().get(STATE_INFO).get();
    ConnectivityState current = info.getState();
    if (current == READY) {
      pickList.add(backend);
    } else if (current == TRANSIENT_FAILURE) {
      lastFailure = info.getStatus();
    } else if (current == IDLE) {
      sawIdle = true;
    }
  }

  final ConnectivityState aggregate;
  if (!pickList.isEmpty()) {
    aggregate = READY;
  } else if (sawIdle || lastFailure == null) {
    // No failure to report, or an idle backend may still connect: buffer picks.
    pickList.add(BUFFER_ENTRY);
    aggregate = CONNECTING;
  } else {
    pickList.add(new ErrorEntry(lastFailure));
    aggregate = TRANSIENT_FAILURE;
  }

  maybeUpdatePicker(aggregate, new RoundRobinPicker(dropList, pickList));
}
/**
 * With an empty drop list, the picker alternates between the two backend entries, writes each
 * entry's token into the request headers (replacing any token already there), and never reads
 * the subchannel's attributes.
 */
@Test
public void roundRobinPickerNoDrop() {
  GrpclbClientLoadRecorder recorder =
      new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
  Subchannel sharedSubchannel = mock(Subchannel.class);
  BackendEntry backendOne = new BackendEntry(sharedSubchannel, recorder, "LBTOKEN0001");
  BackendEntry backendTwo = new BackendEntry(sharedSubchannel, recorder, "LBTOKEN0002");
  List<BackendEntry> backends = Arrays.asList(backendOne, backendTwo);
  RoundRobinPicker roundRobin =
      new RoundRobinPicker(Collections.<DropEntry>emptyList(), backends);

  // First pick: headers already carry a token.
  PickSubchannelArgs firstArgs = mock(PickSubchannelArgs.class);
  Metadata firstHeaders = new Metadata();
  // The existing token on the headers will be replaced
  firstHeaders.put(GrpclbConstants.TOKEN_METADATA_KEY, "LBTOKEN__OLD");
  when(firstArgs.getHeaders()).thenReturn(firstHeaders);
  assertSame(backendOne.result, roundRobin.pickSubchannel(firstArgs));
  verify(firstArgs).getHeaders();
  assertThat(firstHeaders.getAll(GrpclbConstants.TOKEN_METADATA_KEY))
      .containsExactly("LBTOKEN0001");

  // Second pick: moves on to the second backend.
  PickSubchannelArgs secondArgs = mock(PickSubchannelArgs.class);
  Metadata secondHeaders = new Metadata();
  when(secondArgs.getHeaders()).thenReturn(secondHeaders);
  assertSame(backendTwo.result, roundRobin.pickSubchannel(secondArgs));
  verify(secondArgs).getHeaders();
  assertThat(secondHeaders.getAll(GrpclbConstants.TOKEN_METADATA_KEY))
      .containsExactly("LBTOKEN0002");

  // Third pick: wraps around to the first backend.
  PickSubchannelArgs thirdArgs = mock(PickSubchannelArgs.class);
  Metadata thirdHeaders = new Metadata();
  when(thirdArgs.getHeaders()).thenReturn(thirdHeaders);
  assertSame(backendOne.result, roundRobin.pickSubchannel(thirdArgs));
  verify(thirdArgs).getHeaders();
  assertThat(thirdHeaders.getAll(GrpclbConstants.TOKEN_METADATA_KEY))
      .containsExactly("LBTOKEN0001");

  verify(sharedSubchannel, never()).getAttributes();
}
/**
 * Removes and returns the cached subchannel for {@code eag}, cancelling its pending shutdown
 * timer; when nothing is cached, asks the helper to create a new subchannel with
 * {@code defaultAttributes}.
 */
@Override
public Subchannel takeOrCreateSubchannel(
    EquivalentAddressGroup eag, Attributes defaultAttributes) {
  CacheEntry cached = cache.remove(eag);
  if (cached == null) {
    return helper.createSubchannel(eag, defaultAttributes);
  }
  Subchannel pooled = cached.subchannel;
  // The subchannel is in use again, so it must not be shut down by the timer.
  cached.shutdownTimer.cancel();
  return pooled;
}
/**
 * Creates the health-check state for a single subchannel.
 *
 * <p>Every argument is passed through {@code checkNotNull} before being stored, and the
 * subchannel's channel logger is looked up here and kept in {@code subchannelLogger}.
 *
 * @param helperImpl the helper this state belongs to; must not be null
 * @param subchannel the subchannel this state is attached to; must not be null and must
 *     return a non-null channel logger
 * @param syncContext stored for later use; must not be null
 * @param timerService stored for later use; must not be null
 */
HealthCheckState(
HelperImpl helperImpl,
Subchannel subchannel, SynchronizationContext syncContext,
ScheduledExecutorService timerService) {
this.helperImpl = checkNotNull(helperImpl, "helperImpl");
this.subchannel = checkNotNull(subchannel, "subchannel");
// Safe to dereference: subchannel was null-checked on the previous line.
this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.timerService = checkNotNull(timerService, "timerService");
}
/**
 * Each returned subchannel is shut down exactly {@code SHUTDOWN_TIMEOUT_MS} after its own
 * return, independently of the other, and no timer task is left pending afterwards.
 */
@Test
public void subchannelExpireAfterReturned() {
  InOrder helperOrder;

  Subchannel forEag1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
  assertThat(forEag1).isNotNull();
  helperOrder = Mockito.inOrder(helper);
  helperOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
  CreateSubchannelArgs capturedArgs = createSubchannelArgsCaptor.getValue();
  assertThat(capturedArgs.getAddresses()).containsExactly(EAG1);
  assertThat(capturedArgs.getAttributes()).isEqualTo(ATTRS1);

  Subchannel forEag2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
  assertThat(forEag2).isNotNull();
  assertThat(forEag2).isNotSameInstanceAs(forEag1);
  helperOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture());
  capturedArgs = createSubchannelArgsCaptor.getValue();
  assertThat(capturedArgs.getAddresses()).containsExactly(EAG2);
  assertThat(capturedArgs.getAttributes()).isEqualTo(ATTRS2);

  pool.returnSubchannel(forEag1, READY_STATE);

  // The first subchannel is 1ms away from expiration.
  clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
  verify(forEag1, never()).shutdown();

  pool.returnSubchannel(forEag2, READY_STATE);

  // The first subchannel expires; the second is (SHUTDOWN_TIMEOUT_MS - 1) away from expiration.
  clock.forwardTime(1, MILLISECONDS);
  verify(forEag1).shutdown();

  // The second subchannel expires.
  clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
  verify(forEag2).shutdown();

  assertThat(clock.numPendingTasks()).isEqualTo(0);
}
/**
 * Each returned subchannel is shut down exactly {@code SHUTDOWN_TIMEOUT_MS} after its own
 * return, independently of the other, and no timer task is left pending afterwards.
 */
@Test
public void subchannelExpireAfterReturned() {
  Subchannel forEag1 = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
  assertThat(forEag1).isNotNull();
  verify(helper).createSubchannel(eq(Arrays.asList(EAG1)), same(ATTRS1));

  Subchannel forEag2 = pool.takeOrCreateSubchannel(EAG2, ATTRS2);
  assertThat(forEag2).isNotNull();
  assertThat(forEag2).isNotSameAs(forEag1);
  verify(helper).createSubchannel(eq(Arrays.asList(EAG2)), same(ATTRS2));

  pool.returnSubchannel(forEag1);

  // The first subchannel is 1ms away from expiration.
  clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
  verify(forEag1, never()).shutdown();

  pool.returnSubchannel(forEag2);

  // The first subchannel expires; the second is (SHUTDOWN_TIMEOUT_MS - 1) away from expiration.
  clock.forwardTime(1, MILLISECONDS);
  verify(forEag1).shutdown();

  // The second subchannel expires.
  clock.forwardTime(SHUTDOWN_TIMEOUT_MS - 1, MILLISECONDS);
  verify(forEag2).shutdown();

  assertThat(clock.numPendingTasks()).isEqualTo(0);
}
/**
 * Returning a second, different subchannel for an address that is already pooled shuts that
 * subchannel down immediately without scheduling a timer; returning the same subchannel twice
 * changes nothing.
 */
@Test
public void returnDuplicateAddressSubchannel() {
  Subchannel eag1First = pool.takeOrCreateSubchannel(EAG1, ATTRS1);
  Subchannel eag1Second = pool.takeOrCreateSubchannel(EAG1, ATTRS2);
  Subchannel eag2Only = pool.takeOrCreateSubchannel(EAG2, ATTRS1);
  assertThat(eag1First).isNotSameAs(eag1Second);

  assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).isEmpty();
  pool.returnSubchannel(eag1Second);
  assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1);

  // If the subchannel being returned has an address that is the same as a subchannel in the pool,
  // the returned subchannel will be shut down.
  verify(eag1First, never()).shutdown();
  pool.returnSubchannel(eag1First);
  assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(1);
  verify(eag1First).shutdown();

  pool.returnSubchannel(eag2Only);
  assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2);

  // Returning the same subchannel twice has no effect.
  pool.returnSubchannel(eag2Only);
  assertThat(clock.getPendingTasks(SHUTDOWN_TASK_FILTER)).hasSize(2);

  verify(eag1Second, never()).shutdown();
  verify(eag2Only, never()).shutdown();
}
/**
 * Walks one subchannel through READY, TRANSIENT_FAILURE and IDLE and checks, after each step,
 * the state stored in its {@code STATE_INFO} attribute and the picker published to the helper.
 * The test expects the stored state to remain TRANSIENT_FAILURE after the IDLE update.
 */
@Test
public void pickAfterStateChange() throws Exception {
  InOrder helperOrder = inOrder(mockHelper);
  loadBalancer.handleResolvedAddresses(
      ResolvedAddresses.newBuilder()
          .setAddresses(servers)
          .setAttributes(Attributes.EMPTY)
          .build());
  Subchannel observed = loadBalancer.getSubchannels().iterator().next();
  Ref<ConnectivityStateInfo> stateRef = observed.getAttributes().get(STATE_INFO);

  // Initial publication: CONNECTING with an empty picker, subchannel recorded as IDLE.
  helperOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), isA(EmptyPicker.class));
  assertThat(stateRef.value).isEqualTo(ConnectivityStateInfo.forNonError(IDLE));

  // READY.
  deliverSubchannelState(observed, ConnectivityStateInfo.forNonError(READY));
  helperOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
  assertThat(pickerCaptor.getValue()).isInstanceOf(ReadyPicker.class);
  assertThat(stateRef.value).isEqualTo(ConnectivityStateInfo.forNonError(READY));

  // TRANSIENT_FAILURE.
  Status failure = Status.UNKNOWN.withDescription("¯\\_(ツ)_//¯");
  deliverSubchannelState(observed, ConnectivityStateInfo.forTransientFailure(failure));
  assertThat(stateRef.value.getState()).isEqualTo(TRANSIENT_FAILURE);
  assertThat(stateRef.value.getStatus()).isEqualTo(failure);
  helperOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  assertThat(pickerCaptor.getValue()).isInstanceOf(EmptyPicker.class);

  // IDLE: the recorded state is expected to stay at the earlier failure.
  deliverSubchannelState(observed, ConnectivityStateInfo.forNonError(IDLE));
  assertThat(stateRef.value.getState()).isEqualTo(TRANSIENT_FAILURE);
  assertThat(stateRef.value.getStatus()).isEqualTo(failure);

  verify(observed, times(2)).requestConnection();
  verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verifyNoMoreInteractions(mockHelper);
}
/**
 * Creates and starts a subchannel for {@code eagLists[index]} with the given attributes through
 * {@code wrappedHelper}, running inside the synchronization context, and returns it. The
 * subchannel is started with {@code mockStateListeners[index]}.
 */
private Subchannel createSubchannel(final int index, final Attributes attrs) {
  final AtomicReference<Subchannel> created = new AtomicReference<>();
  Runnable createAndStart = new Runnable() {
    @Override
    public void run() {
      CreateSubchannelArgs createArgs =
          CreateSubchannelArgs.newBuilder()
              .setAddresses(eagLists[index])
              .setAttributes(attrs)
              .build();
      Subchannel started = wrappedHelper.createSubchannel(createArgs);
      started.start(mockStateListeners[index]);
      created.set(started);
    }
  };
  syncContext.execute(createAndStart);
  return created.get();
}
/**
 * Switches to the fallback backends unless the balancer is working, fallback is already in use,
 * or at least one current subchannel is READY.
 */
private void maybeUseFallbackBackends() {
  if (balancerWorking || usingFallbackBackends) {
    return;
  }
  for (Subchannel candidate : subchannels.values()) {
    boolean ready = candidate.getAttributes().get(STATE_INFO).get().getState() == READY;
    if (ready) {
      // A usable backend exists, so there is no need to fall back.
      return;
    }
  }
  // Fallback conditions met
  useFallbackBackends();
}
/**
 * Without stickiness configured, a request carrying a sticky header is still picked in plain
 * round-robin order, and the balancer has no stickiness map.
 */
@Test
public void noStickinessEnabled_withStickyHeader() {
  loadBalancer.handleResolvedAddressGroups(servers, Attributes.EMPTY);
  for (Subchannel each : subchannels.values()) {
    loadBalancer.handleSubchannelState(each, ConnectivityStateInfo.forNonError(READY));
  }
  verify(mockHelper, times(4))
      .updateBalancingState(any(ConnectivityState.class), pickerCaptor.capture());
  SubchannelPicker latestPicker = pickerCaptor.getValue();

  // Attach a header that would matter only if stickiness were enabled.
  Key<String> stickyHeaderKey = Key.of("my-sticky-key", Metadata.ASCII_STRING_MARSHALLER);
  Metadata stickyHeaders = new Metadata();
  stickyHeaders.put(stickyHeaderKey, "my-sticky-value");
  doReturn(stickyHeaders).when(mockArgs).getHeaders();

  List<Subchannel> rotation = getList(latestPicker);
  Subchannel pick1 = latestPicker.pickSubchannel(mockArgs).getSubchannel();
  Subchannel pick2 = latestPicker.pickSubchannel(mockArgs).getSubchannel();
  Subchannel pick3 = latestPicker.pickSubchannel(mockArgs).getSubchannel();
  Subchannel pick4 = latestPicker.pickSubchannel(mockArgs).getSubchannel();

  assertEquals(nextSubchannel(pick1, rotation), pick2);
  assertEquals(nextSubchannel(pick2, rotation), pick3);
  assertEquals(nextSubchannel(pick3, rotation), pick1);
  assertEquals(pick4, pick1);

  assertNull(loadBalancer.getStickinessMapForTest());
}
/**
 * While a health-check RPC is active, a resolver update without a health-check service name
 * cancels that RPC and lets the original balancer see the subchannel's raw READY state.
 */
@Test
public void serviceConfigDisablesHealthCheckWhenRpcActive() {
  Attributes attrsWithService = attrsWithHealthCheckService("TeeService");
  hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, attrsWithService);

  verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(attrsWithService));
  verifyNoMoreInteractions(origLb);

  Subchannel checked = createSubchannel(0, Attributes.EMPTY);
  assertThat(checked).isSameAs(subchannels[0]);
  InOrder origLbOrder = inOrder(origLb);

  // With health checking on, a raw READY is surfaced to the original balancer as CONNECTING.
  hcLbEventDelivery.handleSubchannelState(checked, ConnectivityStateInfo.forNonError(READY));
  origLbOrder.verify(origLb).handleSubchannelState(
      same(checked), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
  origLbOrder.verifyNoMoreInteractions();

  HealthImpl health = healthImpls[0];
  assertThat(health.calls).hasSize(1);
  ServerSideCall activeCall = health.calls.poll();
  assertThat(activeCall.cancelled).isFalse();

  // NameResolver gives an update without service config, thus health check will be disabled
  hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);

  // Health check RPC cancelled.
  assertThat(activeCall.cancelled).isTrue();

  // Subchannel uses original state
  origLbOrder.verify(origLb).handleSubchannelState(
      same(checked), eq(ConnectivityStateInfo.forNonError(READY)));
  origLbOrder.verify(origLb).handleResolvedAddressGroups(
      same(resolvedAddressList), same(Attributes.EMPTY));

  verifyNoMoreInteractions(origLb);
  assertThat(health.calls).isEmpty();
}