下面列出了怎么用io.grpc.ConnectivityState的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void handleNameResolutionError(final Status error) {
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("error", error)
.toString();
}
}
if (routeLookupClient != null) {
routeLookupClient.close();
routeLookupClient = null;
lbPolicyConfiguration = null;
}
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}
@Override
public void updateBalancingState(
final ConnectivityState newState, final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
final class UpdateBalancingState implements Runnable {
@Override
public void run() {
if (LbHelperImpl.this != lbHelper) {
return;
}
updateSubchannelPicker(newPicker);
// It's not appropriate to report SHUTDOWN state from lb.
// Ignore the case of newState == SHUTDOWN for now.
if (newState != SHUTDOWN) {
channelLogger.log(ChannelLogLevel.INFO, "Entering {0} state", newState);
channelStateManager.gotoState(newState);
}
}
}
syncContext.execute(new UpdateBalancingState());
}
@Override
public void updateBalancingState(
final ConnectivityState newState, final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
logWarningIfNotInSyncContext("updateBalancingState()");
final class UpdateBalancingState implements Runnable {
@Override
public void run() {
if (LbHelperImpl.this != lbHelper) {
return;
}
updateSubchannelPicker(newPicker);
// It's not appropriate to report SHUTDOWN state from lb.
// Ignore the case of newState == SHUTDOWN for now.
if (newState != SHUTDOWN) {
channelLogger.log(
ChannelLogLevel.INFO, "Entering {0} state with picker: {1}", newState, newPicker);
channelStateManager.gotoState(newState);
}
}
}
syncContext.execute(new UpdateBalancingState());
}
/**
* Connectivity state is changed to the specified value. Will trigger some notifications that have
* been registered earlier by {@link ManagedChannel#notifyWhenStateChanged}.
*/
void gotoState(@Nonnull ConnectivityState newState) {
checkNotNull(newState, "newState");
if (state != newState && state != ConnectivityState.SHUTDOWN) {
state = newState;
if (listeners.isEmpty()) {
return;
}
// Swap out callback list before calling them, because a callback may register new callbacks,
// if run in direct executor, can cause ConcurrentModificationException.
ArrayList<Listener> savedListeners = listeners;
listeners = new ArrayList<>();
for (Listener listener : savedListeners) {
listener.runInExecutor();
}
}
}
@Test
public void subchannelPickerInterceptedWithLoadRecording() {
List<EquivalentAddressGroup> backendAddrs = createResolvedBackendAddresses(2);
deliverResolvedAddresses(backendAddrs, "round_robin");
FakeLoadBalancer childBalancer = (FakeLoadBalancer) childBalancers.poll();
NoopSubchannel subchannel = childBalancer.subchannels.values().iterator().next();
deliverSubchannelState(subchannel, ConnectivityState.READY);
assertThat(loadRecorder.recording).isTrue();
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(null);
verify(helper).updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
assertThat(picker).isInstanceOf(LoadRecordingSubchannelPicker.class);
PickResult result = picker.pickSubchannel(mock(PickSubchannelArgs.class));
ClientStreamTracer.Factory tracerFactory = result.getStreamTracerFactory();
assertThat(((LoadRecordingStreamTracerFactory) tracerFactory).getCounter())
.isSameInstanceAs(counter);
loadBalancer.shutdown();
assertThat(childBalancer.shutdown).isTrue();
assertThat(loadRecorder.recording).isFalse();
}
@Test
public void nameResolutionErrorWithActiveChannels() throws Exception {
final Subchannel readySubchannel = subchannels.values().iterator().next();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
deliverSubchannelState(readySubchannel, ConnectivityStateInfo.forNonError(READY));
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
verify(mockHelper, times(3))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
Iterator<ConnectivityState> stateIterator = stateCaptor.getAllValues().iterator();
assertEquals(CONNECTING, stateIterator.next());
assertEquals(READY, stateIterator.next());
assertEquals(TRANSIENT_FAILURE, stateIterator.next());
LoadBalancer.PickResult pickResult = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(readySubchannel, pickResult.getSubchannel());
assertEquals(Status.OK.getCode(), pickResult.getStatus().getCode());
LoadBalancer.PickResult pickResult2 = pickerCaptor.getValue().pickSubchannel(mockArgs);
assertEquals(readySubchannel, pickResult2.getSubchannel());
verifyNoMoreInteractions(mockHelper);
}
@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddressGroups(servers, affinity);
verifyNoMoreInteractions(mockSubchannel);
verify(mockHelper).createSubchannel(anyListOf(EquivalentAddressGroup.class),
any(Attributes.class));
verify(mockHelper)
.updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class));
// Updating the subchannel addresses is unnecessary, but doesn't hurt anything
verify(mockHelper).updateSubchannelAddresses(
eq(mockSubchannel), anyListOf(EquivalentAddressGroup.class));
verifyNoMoreInteractions(mockHelper);
}
@Test
public void nameResolutionSuccessAfterError() throws Exception {
InOrder inOrder = inOrder(mockHelper);
loadBalancer.handleNameResolutionError(Status.NOT_FOUND.withDescription("nameResolutionError"));
inOrder.verify(mockHelper)
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
verify(mockSubchannel, never()).requestConnection();
loadBalancer.handleResolvedAddressGroups(servers, affinity);
inOrder.verify(mockHelper).createSubchannel(eq(servers), eq(Attributes.EMPTY));
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockSubchannel).requestConnection();
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs)
.getSubchannel());
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
pickerCaptor.getValue().pickSubchannel(mockArgs));
verifyNoMoreInteractions(mockHelper);
}
private Channel getChannel(final Endpoint endpoint) {
return this.managedChannelPool.computeIfAbsent(endpoint, ep -> {
final ManagedChannel ch = ManagedChannelBuilder.forAddress(ep.getIp(), ep.getPort()) //
.usePlaintext() //
.directExecutor() //
.build();
// channel connection event
ch.notifyWhenStateChanged(ConnectivityState.READY, () -> {
final ReplicatorGroup rpGroup = replicatorGroup;
if (rpGroup != null) {
Utils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(ep.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", ep);
}
});
}
});
return ch;
});
}
@Test
public void subchannelStateChange_updateChildPolicyWrapper() {
ChildPolicyWrapper childPolicyWrapper = factory.createOrGet("foo.google.com");
ChildPolicyReportingHelper childPolicyReportingHelper = childPolicyWrapper.getHelper();
FakeSubchannel fakeSubchannel = new FakeSubchannel();
when(helper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(fakeSubchannel);
Subchannel subchannel =
childPolicyReportingHelper
.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(new EquivalentAddressGroup(mock(SocketAddress.class)))
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
// no-op
}
});
fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
assertThat(childPolicyWrapper.getConnectivityStateInfo())
.isEqualTo(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
}
private ConnectivityState getAggregatedState() {
Set<ConnectivityState> states = EnumSet.noneOf(ConnectivityState.class);
for (Subchannel subchannel : getSubchannels()) {
states.add(getSubchannelStateInfoRef(subchannel).get().getState());
}
if (states.contains(READY)) {
return READY;
}
if (states.contains(CONNECTING)) {
return CONNECTING;
}
if (states.contains(IDLE)) {
return CONNECTING;
}
return TRANSIENT_FAILURE;
}
/**
* Creates a HealthIndicator based on the channels' {@link ConnectivityState}s from the underlying
* {@link GrpcChannelFactory}.
*
* @param factory The factory to derive the connectivity states from.
* @return A health indicator bean, that uses the following assumption
* <code>DOWN == states.contains(TRANSIENT_FAILURE)</code>.
*/
@Bean
@Lazy
public HealthIndicator grpcChannelHealthIndicator(final GrpcChannelFactory factory) {
return () -> {
final ImmutableMap<String, ConnectivityState> states = ImmutableMap.copyOf(factory.getConnectivityState());
final Health.Builder health;
if (states.containsValue(ConnectivityState.TRANSIENT_FAILURE)) {
health = Health.down();
} else {
health = Health.up();
}
return health.withDetails(states)
.build();
};
}
@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).start(any(SubchannelStateListener.class));
verify(mockSubchannel).requestConnection();
loadBalancer.handleResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
verify(mockSubchannel).updateAddresses(eq(servers));
verifyNoMoreInteractions(mockSubchannel);
verify(mockHelper).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue()).isNotNull();
verify(mockHelper)
.updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class));
// Updating the subchannel addresses is unnecessary, but doesn't hurt anything
verify(mockSubchannel).updateAddresses(ArgumentMatchers.<EquivalentAddressGroup>anyList());
verifyNoMoreInteractions(mockHelper);
}
@Override
public boolean isServerReachable() {
final ConnectivityState state = channel.getState(false);
switch (state) {
case READY:
case IDLE:
return true;
case CONNECTING:
case TRANSIENT_FAILURE:
case SHUTDOWN:
return false;
default:
log.error("Unrecognized channel connectivity state {}", state);
return false;
}
}
@Override
public void handleNameResolutionError(final Status error) {
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(ErrorPicker.class).add("error", error).toString();
}
}
helper.updateBalancingState(
ConnectivityState.TRANSIENT_FAILURE,
new ErrorPicker());
}
@Nullable
private static ConnectivityState aggregateState(
@Nullable ConnectivityState overallState, ConnectivityState childState) {
if (overallState == null) {
return childState;
}
if (overallState == READY || childState == READY) {
return READY;
}
if (overallState == CONNECTING || childState == CONNECTING) {
return CONNECTING;
}
if (overallState == IDLE || childState == IDLE) {
return IDLE;
}
return overallState;
}
/**
* Make and use a picker out of the current lists and the states of subchannels if they have
* changed since the last picker created.
*/
private void maybeUpdatePicker() {
List<RoundRobinEntry> pickList = new ArrayList<>(backendList.size());
Status error = null;
boolean hasIdle = false;
for (BackendEntry entry : backendList) {
Subchannel subchannel = entry.result.getSubchannel();
Attributes attrs = subchannel.getAttributes();
ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
if (stateInfo.getState() == READY) {
pickList.add(entry);
} else if (stateInfo.getState() == TRANSIENT_FAILURE) {
error = stateInfo.getStatus();
} else if (stateInfo.getState() == IDLE) {
hasIdle = true;
}
}
ConnectivityState state;
if (pickList.isEmpty()) {
if (error != null && !hasIdle) {
pickList.add(new ErrorEntry(error));
state = TRANSIENT_FAILURE;
} else {
pickList.add(BUFFER_ENTRY);
state = CONNECTING;
}
} else {
state = READY;
}
maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
}
static State toState(ConnectivityState state) {
if (state == null) {
return State.UNKNOWN;
}
try {
return Enum.valueOf(State.class, state.name());
} catch (IllegalArgumentException e) {
return State.UNKNOWN;
}
}
/**
* Updates balancing state from one or more subchannels tracked in the {@link
* SubchannelStateManager}. The passed picker will be ignored, instead the picker which governs
* many subchannels/pickers will be reported to the parent load-balancer.
*/
@Override
public void updateBalancingState(
@Nonnull ConnectivityState newState,
@Nonnull SubchannelPicker unused) {
subchannelStateManager.updateState(target, newState);
super.updateBalancingState(subchannelStateManager.getAggregatedState(), picker);
}
/**
* Adds a listener for state change event.
*
* <p>The {@code executor} must be one that can run RPC call listeners.
*/
void notifyWhenStateChanged(Runnable callback, Executor executor, ConnectivityState source) {
checkNotNull(callback, "callback");
checkNotNull(executor, "executor");
checkNotNull(source, "source");
Listener stateChangeListener = new Listener(callback, executor);
if (state != source) {
stateChangeListener.runInExecutor();
} else {
listeners.add(stateChangeListener);
}
}
@Override
public LoadBalancer newLoadBalancer(final Helper helper) {
return new LoadBalancer() {
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// TODO: make the picker accessible
helper.updateBalancingState(ConnectivityState.READY, mock(SubchannelPicker.class));
}
@Override
public void handleNameResolutionError(final Status error) {
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
}
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}
@Override
public void shutdown() {
}
};
}
@Override
public void handleNameResolutionError(final Status error) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error.augmentDescription("handled by downstream balancer"));
}
};
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker);
}
@Test
public void childLoadBalancerHelper_shouldReportsSubchannelState() {
InOrder inOrder = Mockito.inOrder(helper);
String target1 = "foo.com";
ChildLoadBalancerHelper childLbHelper1 = provider.forTarget(target1);
SubchannelPicker picker1 = mock(SubchannelPicker.class);
String target2 = "bar.com";
ChildLoadBalancerHelper childLbHelper2 = provider.forTarget(target2);
SubchannelPicker picker2 = mock(SubchannelPicker.class);
assertThat(subchannelStateManager.getState(target1)).isNull();
assertThat(subchannelStateManager.getState(target2)).isNull();
childLbHelper1.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker1);
inOrder.verify(helper).updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, picker);
assertThat(subchannelStateManager.getState(target1))
.isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
childLbHelper2.updateBalancingState(ConnectivityState.CONNECTING, picker2);
inOrder.verify(helper).updateBalancingState(ConnectivityState.CONNECTING, picker);
assertThat(subchannelStateManager.getState(target2)).isEqualTo(ConnectivityState.CONNECTING);
childLbHelper1.updateBalancingState(ConnectivityState.READY, picker1);
inOrder.verify(helper).updateBalancingState(ConnectivityState.READY, picker);
assertThat(subchannelStateManager.getState(target1)).isEqualTo(ConnectivityState.READY);
childLbHelper1.updateBalancingState(ConnectivityState.SHUTDOWN, picker1);
inOrder.verify(helper).updateBalancingState(ConnectivityState.CONNECTING, picker);
assertThat(subchannelStateManager.getState(target1)).isNull();
}
private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
ConnectivityState currentState = stateInfo.getState();
if (currentState == SHUTDOWN) {
return;
}
SubchannelPicker picker;
switch (currentState) {
case IDLE:
picker = new RequestConnectionPicker(subchannel);
break;
case CONNECTING:
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
// the current picker in-place. But ignoring the potential optimization is simpler.
picker = new Picker(PickResult.withNoResult());
break;
case READY:
picker = new Picker(PickResult.withSubchannel(subchannel));
break;
case TRANSIENT_FAILURE:
picker = new Picker(PickResult.withError(stateInfo.getStatus()));
break;
default:
throw new IllegalArgumentException("Unsupported state:" + currentState);
}
helper.updateBalancingState(currentState, picker);
}
@Test
public void toState() {
for (ConnectivityState connectivityState : ConnectivityState.values()) {
assertEquals(
connectivityState.name(),
ChannelzProtoUtil.toState(connectivityState).getValueDescriptor().getName());
}
assertEquals(State.UNKNOWN, ChannelzProtoUtil.toState(null));
}
@Override
public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
final class NotifyStateChanged implements Runnable {
@Override
public void run() {
channelStateManager.notifyWhenStateChanged(callback, executor, source);
}
}
syncContext.execute(new NotifyStateChanged());
}
private boolean checkChannel(final Endpoint endpoint) {
final ManagedChannel ch = this.managedChannelPool.get(endpoint);
if (ch == null) {
return false;
}
final ConnectivityState st = ch.getState(true);
return st == ConnectivityState.CONNECTING || st == ConnectivityState.READY || st == ConnectivityState.IDLE;
}
@Override
public boolean isConnected() {
if (channel == null) {
return false;
}
ConnectivityState connectivityState = channel.getState(false);
return connectivityState == ConnectivityState.READY;
}
private boolean channelIsReadyOrIdle() {
if (channel == null) {
return false;
}
ConnectivityState connectivityState = channel.getState(false);
return connectivityState == ConnectivityState.READY
|| connectivityState
== ConnectivityState.IDLE; // Since a new RPC would take the channel out of idle mode
}
private static void deliverSubchannelState(
final NoopSubchannel subchannel, ConnectivityState state) {
SubchannelPicker picker = new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withSubchannel(subchannel);
}
};
subchannel.helper.updateBalancingState(state, picker);
}