下面列出了 io.grpc.ConnectivityStateInfo 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Processes one response received on the health-check stream.
 *
 * <p>Marks the call as having responded and drops the current backoff policy. A SERVING status
 * moves the state to READY; any other status moves it to TRANSIENT_FAILURE with an UNAVAILABLE
 * status whose description names the reported status and the service. Finally issues
 * {@code call.request(1)}, presumably to pull the next response from the stream.
 *
 * @param response the health-check response whose serving status is inspected
 */
void handleResponse(HealthCheckResponse response) {
  callHasResponded = true;
  backoffPolicy = null;
  ServingStatus servingStatus = response.getStatus();
  // Original note: running == true means the Subchannel's state (rawState) is READY
  if (!Objects.equal(servingStatus, ServingStatus.SERVING)) {
    subchannelLogger.log(
        ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", servingStatus);
    String description =
        "Health-check service responded "
            + servingStatus + " for '" + callServiceName + "'";
    Status failure = Status.UNAVAILABLE.withDescription(description);
    gotoState(ConnectivityStateInfo.forTransientFailure(failure));
  } else {
    subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
    gotoState(ConnectivityStateInfo.forNonError(READY));
  }
  call.request(1);
}
/**
 * Creates and starts a new health-check stream.
 *
 * <p>Requires a non-null service name, no active RPC, and a non-null subchannel; each
 * precondition is enforced with {@code checkState}. Unless the concluded state is already
 * READY, the state is first moved to CONNECTING.
 */
private void startRpc() {
  checkState(serviceName != null, "serviceName is null");
  checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
  checkState(subchannel != null, "init() not called");
  // Optimization credited to @markroth in the original comment: when we are already READY
  // (health checking was just enabled, or the service name changed), skip CONNECTING so RPCs
  // are not artificially delayed while waiting for the health check to respond.
  boolean alreadyReady = Objects.equal(concludedState.getState(), READY);
  if (!alreadyReady) {
    subchannelLogger.log(
        ChannelLogLevel.INFO, "CONNECTING: Starting health-check for \"{0}\"", serviceName);
    gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
  }
  activeRpc = new HcStream();
  activeRpc.start();
}
/**
 * Checks that a name resolution error arriving after one Subchannel became READY is reported
 * as TRANSIENT_FAILURE while the latest picker still returns that READY Subchannel.
 */
@Test
public void nameResolutionErrorWithActiveChannels() throws Exception {
// Take one of the prepared Subchannels; it is the one driven to READY below.
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
// The resolution error is delivered after readySubchannel has reported READY.
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
// Exactly three balancing-state updates are expected; their states are checked in order.
verify(mockHelper, times(3))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Iterator<ConnectivityState> stateIterator = stateCaptor.getAllValues().iterator();
assertEquals(CONNECTING, stateIterator.next());
assertEquals(READY, stateIterator.next());
assertEquals(TRANSIENT_FAILURE, stateIterator.next());
// The most recently captured picker must keep returning readySubchannel with an OK status.
LoadBalancer.PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(readySubchannel, pickResult.getSubchannel());
assertEquals(Status.OK.getCode(), pickResult.getStatus().getCode());
// A second pick on the same picker yields the same Subchannel.
LoadBalancer.PickResult pickResult2 = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(readySubchannel, pickResult2.getSubchannel());
verifyNoMoreInteractions(mockHelper);
}
/**
 * Checks the picker published for the IDLE state: picking twice gives equal results and
 * triggers only one additional {@code requestConnection()} on the Subchannel.
 */
@Test
public void requestConnectionPicker() throws Exception {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
InOrder inOrder = inOrder(mockHelper, mockSubchannel);
// Capture the listener passed to start() so the test can drive Subchannel states itself.
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
// First requestConnection() comes from handling the resolved addresses.
inOrder.verify(mockSubchannel).requestConnection();
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(IDLE));
inOrder.verify(mockHelper).updateBalancingState(eq(IDLE), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
// Calling pickSubchannel() twice gave the same result
assertEquals(picker.pickSubchannel(mockArgs), picker.pickSubchannel(mockArgs));
// But the picker calls requestConnection() only once
inOrder.verify(mockSubchannel).requestConnection();
// Two calls in total: the initial one plus the single one made by the picker.
verify(mockSubchannel, times(2)).requestConnection();
}
/**
 * Checks that no ORCA reporting call is made for a READY Subchannel until a report config is
 * set, and that setting the config starts exactly one call carrying the configured request.
 */
@Test
public void reportingNotStartedUntilConfigured() {
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
// The READY state reaches the state listener.
verify(mockStateListeners[0])
.onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
// Without a report config there is no ORCA call and nothing is logged.
assertThat(orcaServiceImps[0].calls).isEmpty();
assertThat(subchannels[0].logs).isEmpty();
// Setting the config starts reporting.
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
// The request sent to the server matches the one built from the config.
assertThat(orcaServiceImps[0].calls.peek().request)
.isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
}
/**
 * Checks that each name resolution error produces a TRANSIENT_FAILURE update whose picker
 * returns no Subchannel and carries that error, even when Subchannel state updates are
 * delivered in between.
 */
@Test
public void nameResolutionErrorWithStateChanges() throws Exception {
InOrder inOrder = inOrder(mockHelper);
// A Subchannel state update is delivered before the first resolution error.
loadBalancer.handleSubchannelState(mockSubchannel,
ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
Status error = Status.NOT_FOUND.withDescription("nameResolutionError");
loadBalancer.handleNameResolutionError(error);
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
// The picker fails picks with the resolution error and offers no Subchannel.
PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(null, pickResult.getSubchannel());
assertEquals(error, pickResult.getStatus());
// Same sequence again, this time with a READY update followed by a different error.
loadBalancer.handleSubchannelState(mockSubchannel, ConnectivityStateInfo.forNonError(READY));
Status error2 = Status.NOT_FOUND.withDescription("nameResolutionError2");
loadBalancer.handleNameResolutionError(error2);
inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(null, pickResult.getSubchannel());
assertEquals(error2, pickResult.getStatus());
// Only the two TRANSIENT_FAILURE updates verified above touched the helper.
verifyNoMoreInteractions(mockHelper);
}
/**
 * Creates a Subchannel through the superclass and returns a forwarding wrapper around it.
 *
 * <p>The wrapper intercepts {@code start()}: every state update is first passed to
 * {@code setConnectivityStateInfo} and then forwarded to the listener supplied by the caller.
 *
 * @param args creation arguments, handed unchanged to the superclass
 * @return a {@code ForwardingSubchannel} delegating to the Subchannel created by the superclass
 */
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
  final Subchannel realSubchannel = super.createSubchannel(args);
  return new ForwardingSubchannel() {
    @Override
    protected Subchannel delegate() {
      return realSubchannel;
    }

    @Override
    public void start(final SubchannelStateListener listener) {
      // Record the state before the caller's listener sees it.
      SubchannelStateListener recordingListener = new SubchannelStateListener() {
        @Override
        public void onSubchannelState(ConnectivityStateInfo newState) {
          setConnectivityStateInfo(newState);
          listener.onSubchannelState(newState);
        }
      };
      super.start(recordingListener);
    }
  };
}
/**
 * Translates a state change of the tracked Subchannel into a balancing-state update.
 *
 * <p>Updates for any other Subchannel, and SHUTDOWN updates, are ignored. CONNECTING yields a
 * picker with no result, READY and IDLE yield a picker for the Subchannel, and
 * TRANSIENT_FAILURE yields a picker failing with the reported status.
 *
 * @param subchannel the Subchannel whose state changed
 * @param stateInfo the new state
 * @throws IllegalArgumentException if the state is none of the handled ones
 */
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
  ConnectivityState newState = stateInfo.getState();
  if (subchannel != this.subchannel) {
    return;
  }
  if (newState == SHUTDOWN) {
    return;
  }
  PickResult result;
  switch (newState) {
    case TRANSIENT_FAILURE:
      result = PickResult.withError(stateInfo.getStatus());
      break;
    case CONNECTING:
      result = PickResult.withNoResult();
      break;
    case IDLE:
    case READY:
      result = PickResult.withSubchannel(subchannel);
      break;
    default:
      throw new IllegalArgumentException("Unsupported state:" + newState);
  }
  helper.updateBalancingState(newState, new Picker(result));
}
/**
 * Hands a Subchannel back to the cache, keyed by its addresses.
 *
 * <p>If nothing is cached for the addresses, a shutdown task is scheduled after
 * {@code SHUTDOWN_TIMEOUT_MS} milliseconds and a new entry is stored. If an entry already
 * exists, returning the same Subchannel again is a no-op, while a different Subchannel for
 * the same addresses is shut down immediately.
 *
 * @param subchannel the Subchannel being returned
 * @param lastKnownState the state stored alongside the Subchannel in the new cache entry
 */
@Override
public void returnSubchannel(Subchannel subchannel, ConnectivityStateInfo lastKnownState) {
  CacheEntry cached = cache.get(subchannel.getAddresses());
  if (cached == null) {
    // First return for these addresses: arm the delayed shutdown and remember the entry.
    final ShutdownSubchannelTask task = new ShutdownSubchannelTask(subchannel);
    ScheduledHandle timer =
        helper.getSynchronizationContext().schedule(
            task, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS,
            helper.getScheduledExecutorService());
    CacheEntry fresh = new CacheEntry(subchannel, timer, lastKnownState);
    cache.put(subchannel.getAddresses(), fresh);
    return;
  }
  // An entry already exists; only a Subchannel different from the cached one is shut down.
  if (cached.subchannel != subchannel) {
    subchannel.shutdown();
  }
}
/**
 * Records a Subchannel's new state and refreshes fallback and picker decisions.
 *
 * <p>SHUTDOWN updates and updates for Subchannels not present in {@code subchannels} are
 * ignored. An IDLE Subchannel is asked to connect again before its state is stored in the
 * {@code STATE_INFO} attribute.
 *
 * @param subchannel the Subchannel whose state changed
 * @param newState the new state
 */
void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
  if (newState.getState() == SHUTDOWN) {
    return;
  }
  if (!subchannels.values().contains(subchannel)) {
    return;
  }
  if (newState.getState() == IDLE) {
    subchannel.requestConnection();
  }
  subchannel.getAttributes().get(STATE_INFO).set(newState);
  maybeUseFallbackBackends();
  maybeUpdatePicker();
}
/**
 * Builds a round-robin picker from the backend list and the stored Subchannel states, and
 * passes it with the aggregate state to {@code maybeUpdatePicker(state, picker)}.
 *
 * <p>READY backends form the pick list. With no READY backend, the result is
 * TRANSIENT_FAILURE with an error entry when some backend failed and none is IDLE; otherwise
 * it is CONNECTING with the buffer entry.
 */
private void maybeUpdatePicker() {
  List<RoundRobinEntry> picks = new ArrayList<>(backendList.size());
  Status lastFailure = null;
  boolean sawIdle = false;
  for (BackendEntry backend : backendList) {
    ConnectivityStateInfo info =
        backend.result.getSubchannel().getAttributes().get(STATE_INFO).get();
    ConnectivityState current = info.getState();
    if (current == READY) {
      picks.add(backend);
    } else if (current == TRANSIENT_FAILURE) {
      // The status of the last failing backend in iteration order wins.
      lastFailure = info.getStatus();
    } else if (current == IDLE) {
      sawIdle = true;
    }
  }
  ConnectivityState overall;
  if (!picks.isEmpty()) {
    overall = READY;
  } else if (lastFailure == null || sawIdle) {
    picks.add(BUFFER_ENTRY);
    overall = CONNECTING;
  } else {
    picks.add(new ErrorEntry(lastFailure));
    overall = TRANSIENT_FAILURE;
  }
  maybeUpdatePicker(overall, new RoundRobinPicker(dropList, picks));
}
/**
 * Submits a task to {@code syncContext} that passes the given Subchannel state to
 * {@code balancer.handleSubchannelState}.
 *
 * @param subchannel the Subchannel whose state is reported
 * @param newState the state to report
 */
private void deliverSubchannelState(
    final Subchannel subchannel, final ConnectivityStateInfo newState) {
  Runnable delivery = new Runnable() {
    @Override
    public void run() {
      balancer.handleSubchannelState(subchannel, newState);
    }
  };
  syncContext.execute(delivery);
}
/**
 * Routes a raw Subchannel state to the health-check state stored in the Subchannel's
 * attributes under {@code KEY_HEALTH_CHECK_STATE}.
 *
 * <p>After the update, a SHUTDOWN state also removes the health-check state from
 * {@code helper.hcStates}.
 *
 * @param subchannel the Subchannel whose raw state changed
 * @param stateInfo the new raw state
 */
@Override
public void handleSubchannelState(
    Subchannel subchannel, ConnectivityStateInfo stateInfo) {
  HealthCheckState healthState = subchannel.getAttributes().get(KEY_HEALTH_CHECK_STATE);
  // Fails with the message "hcState" when the attribute is missing.
  checkNotNull(healthState, "hcState");
  healthState.updateRawState(stateInfo);
  boolean shutDown = Objects.equal(stateInfo.getState(), SHUTDOWN);
  if (shutDown) {
    helper.hcStates.remove(healthState);
  }
}
/**
 * Shuts down the balancer: calls the superclass shutdown, sets
 * {@code helper.balancerShutdown}, delivers a SHUTDOWN raw state to every entry in
 * {@code helper.hcStates}, and then clears that collection.
 */
@Override
public void shutdown() {
super.shutdown();
// The flag is set before the SHUTDOWN states are delivered below.
helper.balancerShutdown = true;
for (HealthCheckState hcState : helper.hcStates) {
// ManagedChannel will stop calling handleSubchannelState() after shutdown() is called,
// which is required by LoadBalancer API semantics. We need to deliver the final SHUTDOWN
// signal to health checkers so that they can cancel the streams.
hcState.updateRawState(ConnectivityStateInfo.forNonError(SHUTDOWN));
}
helper.hcStates.clear();
}
/**
 * Unsupported operation: always throws.
 *
 * @param subchannel ignored
 * @param stateInfo ignored
 * @throws UnsupportedOperationException always, with a message naming the runtime class
 */
@Override
@Deprecated
public void handleSubchannelState(
    Subchannel subchannel, ConnectivityStateInfo stateInfo) {
  String message =
      "handleSubchannelState() is not supported by " + getClass().getName();
  throw new UnsupportedOperationException(message);
}
/**
 * Moves the concluded state to {@code newState} and reports it to the delegate.
 *
 * <p>Nothing happens when the balancer has been shut down or when the new state equals the
 * current concluded state.
 *
 * @param newState the state to conclude
 * @throws IllegalStateException (via {@code checkState}) if {@code subchannel} is null
 */
private void gotoState(ConnectivityStateInfo newState) {
  checkState(subchannel != null, "init() not called");
  if (helperImpl.balancerShutdown) {
    return;
  }
  if (Objects.equal(concludedState, newState)) {
    return;
  }
  concludedState = newState;
  delegate.handleSubchannelState(subchannel, newState);
}
/**
 * Reacts to the health-check stream closing with the given status.
 *
 * <p>UNIMPLEMENTED disables health checking and concludes the raw Subchannel state. Any other
 * status concludes TRANSIENT_FAILURE and then either restarts the RPC immediately or
 * schedules a retry. Backoff applies only when the closed call never produced a response.
 *
 * @param status the status the stream closed with
 */
void handleStreamClosed(Status status) {
  if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
    disabled = true;
    subchannelLogger.log(ChannelLogLevel.ERROR, "Health-check disabled: {0}", status);
    subchannelLogger.log(ChannelLogLevel.INFO, "{0} (no health-check)", rawState);
    gotoState(rawState);
    return;
  }
  subchannelLogger.log(
      ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check stream closed with {0}", status);
  String description =
      "Health-check stream unexpectedly closed with "
          + status + " for '" + callServiceName + "'";
  gotoState(
      ConnectivityStateInfo.forTransientFailure(
          Status.UNAVAILABLE.withDescription(description)));
  long delayNanos;
  if (callHasResponded) {
    // The server answered on the previous call, so retry without backoff.
    delayNanos = 0;
  } else {
    if (backoffPolicy == null) {
      backoffPolicy = backoffPolicyProvider.get();
    }
    // Backoff is measured from call creation, so time already spent counts toward it.
    delayNanos =
        callCreationNanos + backoffPolicy.nextBackoffNanos() - time.currentTimeNanos();
  }
  if (delayNanos > 0) {
    checkState(!isRetryTimerPending(), "Retry double scheduled");
    subchannelLogger.log(
        ChannelLogLevel.DEBUG, "Will retry health-check after {0} ns", delayNanos);
    retryTimer = syncContext.schedule(
        retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
  } else {
    startRpc();
  }
}
/**
 * Asserts that a health-check retry happens exactly {@code nanos} nanoseconds from now on the
 * fake clock: nothing happens up to {@code nanos - 1}, and one more nanosecond produces a
 * CONNECTING update on {@code origLb} and one call on {@code impl}.
 *
 * @param inOrder ordered verifier used for the {@code origLb} interactions
 * @param subchannel the Subchannel expected in the CONNECTING update
 * @param impl the health service whose received calls are inspected
 * @param nanos the expected retry delay in nanoseconds
 */
private void verifyRetryAfterNanos(
InOrder inOrder, Subchannel subchannel, HealthImpl impl, long nanos) {
assertThat(impl.calls).isEmpty();
// One nanosecond short of the delay: still no call and no balancer interaction.
clock.forwardNanos(nanos - 1);
assertThat(impl.calls).isEmpty();
inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(origLb);
// The final nanosecond triggers the retry.
clock.forwardNanos(1);
inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
assertThat(impl.calls).hasSize(1);
}
/**
 * Checks that health checking starts on an existing READY Subchannel once a later resolution
 * enables it, without an extra state update to the wrapped balancer, and that a Subchannel
 * created afterwards gets a health-check call when it becomes READY.
 */
@Test
public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() {
// No service config, thus no health check.
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(Attributes.EMPTY));
verifyNoMoreInteractions(origLb);
// First, create Subchannels 0
createSubchannel(0, Attributes.EMPTY);
// No health check activity. Underlying Subchannel states are directly propagated
hcLbEventDelivery.handleSubchannelState(
subchannels[0], ConnectivityStateInfo.forNonError(READY));
assertThat(healthImpls[0].calls).isEmpty();
verify(origLb).handleSubchannelState(
same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY)));
verifyNoMoreInteractions(origLb);
// Service config enables health check
Attributes resolutionAttrs = attrsWithHealthCheckService("FooService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
verify(origLb).handleResolvedAddressGroups(
same(resolvedAddressList), same(resolutionAttrs));
// Health check started on existing Subchannel
assertThat(healthImpls[0].calls).hasSize(1);
// State stays in READY, instead of switching to CONNECTING.
verifyNoMoreInteractions(origLb);
// Start Subchannel 1, which will have health check
createSubchannel(1, Attributes.EMPTY);
assertThat(healthImpls[1].calls).isEmpty();
// The health-check call for Subchannel 1 is made only once it reports READY.
hcLbEventDelivery.handleSubchannelState(
subchannels[1], ConnectivityStateInfo.forNonError(READY));
assertThat(healthImpls[1].calls).hasSize(1);
}
/**
 * Checks that ORCA reports go only to listeners whose helper has configured reporting, and
 * that once both helpers are configured they receive the same report instance.
 */
@Test
public void policiesReceiveSameReportIndependently() {
createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
// No helper sets ORCA reporting interval, so load reporting is not started.
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(orcaServiceImps[0].calls).isEmpty();
assertThat(subchannels[0].logs).isEmpty();
// Parent helper requests ORCA reports with a certain interval, load reporting starts.
setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
OrcaLoadReport report = OrcaLoadReport.getDefaultInstance();
assertThat(orcaServiceImps[0].calls).hasSize(1);
// The server side pushes one report on the open call.
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Only parent helper's listener receives the report.
ArgumentCaptor<OrcaLoadReport> parentReportCaptor = ArgumentCaptor.forClass(null);
verify(mockOrcaListener1).onLoadReport(parentReportCaptor.capture());
assertThat(parentReportCaptor.getValue()).isEqualTo(report);
verifyNoMoreInteractions(mockOrcaListener2);
// Now child helper also wants to receive reports.
setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG);
orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Both helpers receive the same report instance.
ArgumentCaptor<OrcaLoadReport> childReportCaptor = ArgumentCaptor.forClass(null);
// Listener 1 has now been called twice in total; listener 2 once.
verify(mockOrcaListener1, times(2))
.onLoadReport(parentReportCaptor.capture());
verify(mockOrcaListener2)
.onLoadReport(childReportCaptor.capture());
assertThat(childReportCaptor.getValue()).isSameInstanceAs(parentReportCaptor.getValue());
}
/**
 * Checks that when health checking is disabled by a later resolution while no health-check
 * RPC is active, a subsequent READY state is passed straight to the wrapped balancer and no
 * health-check call is made.
 */
@Test
public void serviceConfigDisablesHealthCheckWhenRpcInactive() {
// Initial resolution enables health checking for "TeeService".
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
assertThat(subchannel).isSameAs(subchannels[0]);
InOrder inOrder = inOrder(origLb);
// Underlying subchannel is not READY initially
ConnectivityStateInfo underlyingErrorState =
ConnectivityStateInfo.forTransientFailure(
Status.UNAVAILABLE.withDescription("connection refused"));
hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState);
// The error state is forwarded as the very same instance.
inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState));
inOrder.verifyNoMoreInteractions();
// NameResolver gives an update without service config, thus health check will be disabled
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY);
inOrder.verify(origLb).handleResolvedAddressGroups(
same(resolvedAddressList), same(Attributes.EMPTY));
// Underlying subchannel is now ready
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
// Since health check is disabled, READY state is propagated directly.
inOrder.verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(READY)));
// and there is no health check activity.
assertThat(healthImpls[0].calls).isEmpty();
verifyNoMoreInteractions(origLb);
}
/**
 * Records a channel-trace event for the new Subchannel state and reprocesses the delayed
 * transport where the state calls for it.
 *
 * <p>READY and IDLE reprocess with the regular {@code subchannelPicker}. TRANSIENT_FAILURE
 * reprocesses with a picker that always returns an error built from the state's status. Other
 * states only produce the trace event.
 *
 * @param newState the state the Subchannel has entered
 */
void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
  ChannelTrace.Event stateEvent =
      new ChannelTrace.Event.Builder()
          .setDescription("Entering " + newState.getState() + " state")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timeProvider.currentTimeNanos())
          .build();
  channelTracer.reportEvent(stateEvent);
  switch (newState.getState()) {
    case TRANSIENT_FAILURE:
      // Picker that fails every pick with the status carried by the new state.
      final class OobErrorPicker extends SubchannelPicker {
        final PickResult errorResult = PickResult.withError(newState.getStatus());

        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
          return errorResult;
        }

        @Override
        public String toString() {
          return MoreObjects.toStringHelper(OobErrorPicker.class)
              .add("errorResult", errorResult)
              .toString();
        }
      }

      delayedTransport.reprocess(new OobErrorPicker());
      break;
    case READY:
    case IDLE:
      delayedTransport.reprocess(subchannelPicker);
      break;
    default:
      // Nothing to reprocess for the remaining states.
      break;
  }
}
/**
 * Checks that shutting down the balancer cancels the active health-check stream, produces no
 * further callbacks on the wrapped balancer, and leaves no health-check call or timer pending.
 */
@Test
public void balancerShutdown() {
Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService");
hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs);
verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs));
verifyNoMoreInteractions(origLb);
Subchannel subchannel = createSubchannel(0, Attributes.EMPTY);
assertThat(subchannel).isSameAs(subchannels[0]);
// Trigger the health check
hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY));
HealthImpl healthImpl = healthImpls[0];
assertThat(healthImpl.calls).hasSize(1);
// Take the server-side call off the queue so its cancellation can be checked after shutdown.
ServerSideCall serverCall = healthImpl.calls.poll();
assertThat(serverCall.cancelled).isFalse();
// The wrapped balancer sees CONNECTING while the health check is pending.
verify(origLb).handleSubchannelState(
same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING)));
// Shut down the balancer
hcLbEventDelivery.shutdown();
verify(origLb).shutdown();
// Health check stream should be cancelled
assertThat(serverCall.cancelled).isTrue();
// LoadBalancer API requires no more callbacks on LoadBalancer after shutdown() is called.
verifyNoMoreInteractions(origLb);
// No more health check call is made or scheduled
assertThat(healthImpl.calls).isEmpty();
assertThat(clock.getPendingTasks()).isEmpty();
}
/**
 * Checks that the ORCA report interval actually requested is the shortest one configured by
 * any helper, and that the reporting RPC is restarted only when that effective interval
 * changes.
 */
@Test
public void reportWithMostFrequentIntervalRequested() {
setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG);
setOrcaReportConfig(childHelperWrapper, LONG_INTERVAL_CONFIG);
createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
// Both helpers were configured up front, so reporting starts once the Subchannel is READY.
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
// The real report interval to be requested is the minimum of intervals requested by helpers.
assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval()))
.isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos());
// Child helper wants reporting to be more frequent than its current setting while it is still
// less frequent than parent helper. Nothing should happen on existing RPC.
setOrcaReportConfig(childHelperWrapper, MEDIUM_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
assertThat(subchannels[0].logs).isEmpty();
// Parent helper wants reporting to be less frequent.
setOrcaReportConfig(parentHelperWrapper, MEDIUM_INTERVAL_CONFIG);
// The old call is cancelled and a new one takes its place.
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
// ORCA reporting RPC restarts and the real report interval is adjusted.
assertThat(Durations.toNanos(orcaServiceImps[0].calls.poll().request.getReportInterval()))
.isEqualTo(MEDIUM_INTERVAL_CONFIG.getReportIntervalNanos());
}
/**
 * Accepts a new raw Subchannel state and re-evaluates health checking.
 *
 * <p>Leaving READY clears the {@code disabled} flag, and a SHUTDOWN state removes this
 * instance from {@code helperImpl.hcStates}. The new state is then stored and
 * {@code adjustHealthCheck()} is called.
 *
 * @param rawState the new raw state of the Subchannel
 */
@Override
public void onSubchannelState(ConnectivityStateInfo rawState) {
  boolean wasReady = Objects.equal(this.rawState.getState(), READY);
  boolean isReady = Objects.equal(rawState.getState(), READY);
  if (wasReady && !isReady) {
    // The connection was lost; health checking may be available on the next connection,
    // so the disabled flag is reset.
    disabled = false;
  }
  boolean isShutdown = Objects.equal(rawState.getState(), SHUTDOWN);
  if (isShutdown) {
    helperImpl.hcStates.remove(this);
  }
  this.rawState = rawState;
  adjustHealthCheck();
}
/**
 * Checks that after address resolution Subchannels are created and asked to connect, and that
 * once one of them is READY the published picker contains exactly that Subchannel.
 */
@Test
public void pickAfterResolved() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
verify(mockHelper, times(3)).createSubchannel(createArgsCaptor.capture());
// Collect the address lists used for each createSubchannel() call.
List<List<EquivalentAddressGroup>> capturedAddrs = new ArrayList<>();
for (CreateSubchannelArgs arg : createArgsCaptor.getAllValues()) {
capturedAddrs.add(arg.getAddresses());
}
assertThat(capturedAddrs).containsAtLeastElementsIn(subchannels.keySet());
// Every Subchannel was asked to connect and none was shut down.
for (Subchannel subchannel : subchannels.values()) {
verify(subchannel).requestConnection();
verify(subchannel, never()).shutdown();
}
// Two balancing-state updates: CONNECTING first, then READY.
verify(mockHelper, times(2))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
assertEquals(CONNECTING, stateCaptor.getAllValues().get(0));
assertEquals(READY, stateCaptor.getAllValues().get(1));
// The last picker's list holds only the READY Subchannel.
assertThat(getList(pickerCaptor.getValue())).containsExactly(readySubchannel);
verifyNoMoreInteractions(mockHelper);
}
/**
 * Switches to {@code newState} and notifies the callback, but only when the connectivity
 * state actually differs from the current one.
 *
 * <p>Calls {@code syncContext.throwIfNotInThisSynchronizationContext()} first.
 *
 * @param newState the state to enter
 * @throws IllegalStateException if the current state is SHUTDOWN and the new one differs
 */
private void gotoState(final ConnectivityStateInfo newState) {
  syncContext.throwIfNotInThisSynchronizationContext();
  if (state.getState() == newState.getState()) {
    return;
  }
  Preconditions.checkState(state.getState() != SHUTDOWN,
      "Cannot transition out of SHUTDOWN to " + newState);
  state = newState;
  callback.onStateChange(InternalSubchannel.this, newState);
}
/**
 * Turns a state change of the current Subchannel into a balancing-state update.
 *
 * <p>The update is dropped when the state is SHUTDOWN, when no Subchannel is registered for
 * the reporting Subchannel's addresses, or when the registered one is not the current
 * Subchannel. Otherwise a picker matching the state is published through the helper.
 *
 * @param subchannel the Subchannel whose state changed
 * @param stateInfo the new state
 * @throws IllegalArgumentException if the state is none of the handled ones
 */
@Override
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
  ConnectivityState newState = stateInfo.getState();
  if (newState == SHUTDOWN) {
    return;
  }
  Subchannel registered = subchannels.get(subchannel.getAddresses());
  if (registered == null || registered != currentSubchannel) {
    return;
  }
  SubchannelPicker nextPicker;
  switch (newState) {
    case READY:
      nextPicker = new Picker(PickResult.withSubchannel(subchannel));
      break;
    case TRANSIENT_FAILURE:
      nextPicker = new Picker(PickResult.withError(stateInfo.getStatus()));
      break;
    case CONNECTING:
      // RequestConnectionPicker would also be safe here, which would let an IDLE picker stay
      // in place; a plain no-result picker is used because it is simpler.
      nextPicker = new Picker(PickResult.withNoResult());
      break;
    case IDLE:
      nextPicker = new RequestConnectionPicker(subchannel);
      break;
    default:
      throw new IllegalArgumentException("Unsupported state:" + newState);
  }
  helper.updateBalancingState(newState, nextPicker);
}
/**
 * Publishes a picker matching the Subchannel's new state through the helper.
 *
 * <p>SHUTDOWN is ignored. IDLE gets a {@code RequestConnectionPicker}, CONNECTING a no-result
 * picker, READY a picker for the Subchannel, and TRANSIENT_FAILURE a picker failing with the
 * reported status.
 *
 * @param subchannel the Subchannel whose state changed
 * @param stateInfo the new state
 * @throws IllegalArgumentException if the state is none of the handled ones
 */
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
  ConnectivityState newState = stateInfo.getState();
  if (newState == SHUTDOWN) {
    return;
  }
  SubchannelPicker nextPicker;
  switch (newState) {
    case READY:
      nextPicker = new Picker(PickResult.withSubchannel(subchannel));
      break;
    case TRANSIENT_FAILURE:
      nextPicker = new Picker(PickResult.withError(stateInfo.getStatus()));
      break;
    case CONNECTING:
      // RequestConnectionPicker would also be safe here, which would let an IDLE picker stay
      // in place; a plain no-result picker is used because it is simpler.
      nextPicker = new Picker(PickResult.withNoResult());
      break;
    case IDLE:
      nextPicker = new RequestConnectionPicker(subchannel);
      break;
    default:
      throw new IllegalArgumentException("Unsupported state:" + newState);
  }
  helper.updateBalancingState(newState, nextPicker);
}
/**
 * Checks that changing the ORCA report config while a retry is pending cancels the retry
 * task and immediately starts a new RPC carrying the new config.
 */
@Test
public void updateReportingIntervalWhenRpcPendingRetry() {
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
// Reporting starts as soon as the configured Subchannel is READY.
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
assertThat(orcaServiceImps[0].calls.peek().request)
.isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
// Server closes the RPC without response, client will retry with backoff.
assertThat(fakeClock.getPendingTasks()).isEmpty();
orcaServiceImps[0].calls.poll().responseObserver.onCompleted();
assertLog(subchannels[0].logs,
"DEBUG: ORCA reporting stream closed with " + Status.OK + ", backoff in 11"
+ " ns");
// Exactly one task (the retry) is now pending, and no call is active.
assertThat(fakeClock.getPendingTasks()).hasSize(1);
assertThat(orcaServiceImps[0].calls).isEmpty();
// Make reporting less frequent.
setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG);
// Retry task will be canceled and restarts new RPC immediately.
assertThat(fakeClock.getPendingTasks()).isEmpty();
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
assertThat(orcaServiceImps[0].calls.peek().request)
.isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG));
}