下面列出了 io.grpc.LoadBalancer.PickResult 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Switches the channel into panic mode because of {@code t}.
 *
 * <p>Only the first call has any effect. It permanently cancels the idle timer, shuts down the
 * name resolver and load balancer, installs a picker that drops every pick with an INTERNAL
 * status whose cause is {@code t}, logs the event, and moves the channel state to
 * TRANSIENT_FAILURE.
 *
 * @param t the throwable attached as the cause of the drop status
 */
@VisibleForTesting
void panic(final Throwable t) {
  // A later panic must not overwrite the information recorded by the first one.
  if (!panicMode) {
    panicMode = true;
    cancelIdleTimer(/* permanent= */ true);
    shutdownNameResolverAndLoadBalancer(false);

    final Status panicStatus =
        Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t);
    final class PanicSubchannelPicker extends SubchannelPicker {
      // Built once and handed out for every pick.
      private final PickResult dropResult = PickResult.withDrop(panicStatus);

      @Override
      public PickResult pickSubchannel(PickSubchannelArgs args) {
        return dropResult;
      }
    }

    updateSubchannelPicker(new PanicSubchannelPicker());
    channelLogger.log(ChannelLogLevel.ERROR, "PANIC! Entering TRANSIENT_FAILURE");
    channelStateManager.gotoState(TRANSIENT_FAILURE);
  }
}
/**
 * Reports a channel-trace event for the new connectivity state, then reprocesses the delayed
 * transport according to that state.
 *
 * <p>READY and IDLE reprocess with the current {@code subchannelPicker}. TRANSIENT_FAILURE
 * reprocesses with a picker that always returns an error built from the new state's status.
 * Any other state leaves the delayed transport untouched.
 *
 * @param newState the connectivity state info being handled
 */
void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
  channelTracer.reportEvent(
      new ChannelTrace.Event.Builder()
          .setDescription("Entering " + newState.getState() + " state")
          .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
          .setTimestampNanos(timeProvider.currentTimeNanos())
          .build());

  switch (newState.getState()) {
    case READY:
    case IDLE:
      delayedTransport.reprocess(subchannelPicker);
      break;
    case TRANSIENT_FAILURE: {
      SubchannelPicker failingPicker = new SubchannelPicker() {
        // Created once; every pick returns the same error result.
        final PickResult failure = PickResult.withError(newState.getStatus());

        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
          return failure;
        }
      };
      delayedTransport.reprocess(failingPicker);
      break;
    }
    default:
      // Do nothing
  }
}
/**
 * Checks that a stream stays pending, and the transport is reported in use, when the saved
 * picker calls {@code reprocess(null)} during the pick and then returns no result.
 */
@Test
public void newStream_racesWithReprocessIdleMode() throws Exception {
  SubchannelPicker idleRacingPicker = new SubchannelPicker() {
    @Override
    public PickResult pickSubchannel(PickSubchannelArgs args) {
      // Simulate entering idle mode in the middle of this pick.
      delayedTransport.reprocess(null);
      // Answer the way an IDLE load balancer would: no decision yet.
      return PickResult.withNoResult();
    }
  };

  // There is no pending stream yet, so this does nothing except save the picker.
  delayedTransport.reprocess(idleRacingPicker);

  ClientStream bufferedStream = delayedTransport.newStream(method, headers, callOptions);
  bufferedStream.start(streamListener);

  assertTrue(delayedTransport.hasPendingStreams());
  verify(transportListener).transportInUse(true);
}
/**
 * Checks that when the cluster watcher reports the resource as missing, no EDS balancers
 * exist and the helper is moved to TRANSIENT_FAILURE with a picker whose result carries an
 * UNAVAILABLE status and the expected description.
 */
@Test
public void clusterWatcher_resourceNotExist() {
  Attributes attributes =
      Attributes.newBuilder().set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool).build();
  ResolvedAddresses resolvedAddresses =
      ResolvedAddresses.newBuilder()
          .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
          .setAttributes(attributes)
          .setLoadBalancingPolicyConfig(new CdsConfig("foo.googleapis.com"))
          .build();
  cdsLoadBalancer.handleResolvedAddresses(resolvedAddresses);

  // Grab the watcher that the balancer registered for the cluster.
  ArgumentCaptor<ClusterWatcher> watcherCaptor = ArgumentCaptor.forClass(null);
  verify(xdsClient).watchClusterData(eq("foo.googleapis.com"), watcherCaptor.capture());
  ClusterWatcher watcher = watcherCaptor.getValue();

  ArgumentCaptor<SubchannelPicker> failurePickerCaptor = ArgumentCaptor.forClass(null);
  watcher.onResourceDoesNotExist("foo.googleapis.com");

  assertThat(edsLoadBalancers).isEmpty();
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), failurePickerCaptor.capture());
  PickResult failedPick =
      failurePickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
  assertThat(failedPick.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
  assertThat(failedPick.getStatus().getDescription())
      .isEqualTo("Resource foo.googleapis.com is unavailable");
}
/**
 * Checks that reprocess() with no pending stream does not consult the picker or the transport
 * listener, yet the picker is used by a stream created afterwards.
 */
@Test
public void reprocess_NoPendingStream() {
  SubchannelPicker savedPicker = mock(SubchannelPicker.class);
  AbstractSubchannel pickedSubchannel = mock(AbstractSubchannel.class);
  when(pickedSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
  when(savedPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(pickedSubchannel));
  when(mockRealTransport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(mockRealStream);

  delayedTransport.reprocess(savedPicker);
  verifyNoMoreInteractions(savedPicker);
  verifyNoMoreInteractions(transportListener);

  // The picker was not needed at reprocess time, but it is kept for streams created later.
  ClientStream createdStream =
      delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
  verify(savedPicker)
      .pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
  verify(pickedSubchannel).obtainActiveTransport();
  assertSame(mockRealStream, createdStream);
}
/**
 * Checks that the picker published for READY is a {@code LoadRecordingSubchannelPicker} whose
 * stream tracer factory exposes the expected counter, and that shutting the balancer down
 * shuts down the child balancer and stops load recording.
 */
@Test
public void subchannelPickerInterceptedWithLoadRecording() {
  List<EquivalentAddressGroup> backends = createResolvedBackendAddresses(2);
  deliverResolvedAddresses(backends, "round_robin");

  FakeLoadBalancer child = (FakeLoadBalancer) childBalancers.poll();
  NoopSubchannel firstSubchannel = child.subchannels.values().iterator().next();
  deliverSubchannelState(firstSubchannel, ConnectivityState.READY);
  assertThat(loadRecorder.recording).isTrue();

  ArgumentCaptor<SubchannelPicker> readyPickerCaptor = ArgumentCaptor.forClass(null);
  verify(helper).updateBalancingState(eq(ConnectivityState.READY), readyPickerCaptor.capture());
  SubchannelPicker readyPicker = readyPickerCaptor.getValue();
  assertThat(readyPicker).isInstanceOf(LoadRecordingSubchannelPicker.class);

  // The pick result should carry a tracer factory wired to the test's counter.
  PickResult pick = readyPicker.pickSubchannel(mock(PickSubchannelArgs.class));
  ClientStreamTracer.Factory factory = pick.getStreamTracerFactory();
  LoadRecordingStreamTracerFactory recordingFactory = (LoadRecordingStreamTracerFactory) factory;
  assertThat(recordingFactory.getCounter()).isSameInstanceAs(counter);

  loadBalancer.shutdown();
  assertThat(child.shutdown).isTrue();
  assertThat(loadRecorder.recording).isFalse();
}
/**
 * Checks the sequence of balancing states reported around a name resolution error when a
 * subchannel is READY, and that the last captured picker still returns that subchannel with
 * an OK status.
 */
@SuppressWarnings("unchecked")
@Test
public void nameResolutionErrorWithActiveChannels() throws Exception {
  final Subchannel activeSubchannel = subchannels.values().iterator().next();
  loadBalancer.handleResolvedAddressGroups(servers, affinity);
  loadBalancer.handleSubchannelState(
      activeSubchannel, ConnectivityStateInfo.forNonError(READY));
  loadBalancer.handleNameResolutionError(
      Status.NOT_FOUND.withDescription("nameResolutionError"));

  verify(mockHelper, times(3)).createSubchannel(any(List.class), any(Attributes.class));
  verify(mockHelper, times(3))
      .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());

  // The captured states are expected in this order.
  Iterator<ConnectivityState> reportedStates = stateCaptor.getAllValues().iterator();
  assertEquals(CONNECTING, reportedStates.next());
  assertEquals(READY, reportedStates.next());
  assertEquals(TRANSIENT_FAILURE, reportedStates.next());

  LoadBalancer.PickResult firstPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(activeSubchannel, firstPick.getSubchannel());
  assertEquals(Status.OK.getCode(), firstPick.getStatus().getCode());

  LoadBalancer.PickResult secondPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(activeSubchannel, secondPick.getSubchannel());
  verifyNoMoreInteractions(mockHelper);
}
/**
 * Checks that reprocess() with no pending stream does not consult the picker or the transport
 * listener, yet the picker is used by a stream created afterwards; the transport is obtained
 * through the subchannel's internal subchannel.
 */
@Test
public void reprocess_NoPendingStream() {
  SubchannelPicker savedPicker = mock(SubchannelPicker.class);
  AbstractSubchannel pickedSubchannel = mock(AbstractSubchannel.class);
  when(pickedSubchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
  when(savedPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(pickedSubchannel));
  when(mockRealTransport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(mockRealStream);

  delayedTransport.reprocess(savedPicker);
  verifyNoMoreInteractions(savedPicker);
  verifyNoMoreInteractions(transportListener);

  // The picker was not needed at reprocess time, but it is kept for streams created later.
  ClientStream createdStream =
      delayedTransport.newStream(method, headers, CallOptions.DEFAULT);
  verify(savedPicker)
      .pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
  verify(mockInternalSubchannel).obtainActiveTransport();
  assertSame(mockRealStream, createdStream);
}
/**
 * Checks that, with an empty drop list and a single idle entry, each pick returns the
 * no-result instance and {@code requestConnection()} is verified exactly once after both picks.
 */
@Test
public void roundRobinPickerWithIdleEntry_noDrop() {
  Subchannel idleSubchannel = mock(Subchannel.class);
  IdleSubchannelEntry idleEntry = new IdleSubchannelEntry(idleSubchannel, syncContext);
  RoundRobinPicker rrPicker =
      new RoundRobinPicker(
          Collections.<DropEntry>emptyList(), Collections.singletonList(idleEntry));
  PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);

  // Nothing has been picked yet, so no connection may have been requested.
  verify(idleSubchannel, never()).requestConnection();

  assertThat(rrPicker.pickSubchannel(pickArgs)).isSameInstanceAs(PickResult.withNoResult());
  verify(idleSubchannel).requestConnection();

  assertThat(rrPicker.pickSubchannel(pickArgs)).isSameInstanceAs(PickResult.withNoResult());
  // The second pick must not request a connection again.
  verify(idleSubchannel).requestConnection();
}
/**
 * Checks that, with a drop list of {@code [null, drop]} and a single idle entry, picks
 * alternate between the no-result instance and {@code DROP_PICK_RESULT}, while
 * {@code requestConnection()} is verified exactly once across all three picks.
 */
@Test
public void roundRobinPickerWithIdleEntry_andDrop() {
  GrpclbClientLoadRecorder recorder =
      new GrpclbClientLoadRecorder(fakeClock.getTimeProvider());
  // Every second request is to be dropped.
  DropEntry dropEntry = new DropEntry(recorder, "LBTOKEN0003");
  List<DropEntry> drops = Arrays.asList(null, dropEntry);

  Subchannel idleSubchannel = mock(Subchannel.class);
  IdleSubchannelEntry idleEntry = new IdleSubchannelEntry(idleSubchannel, syncContext);
  RoundRobinPicker rrPicker = new RoundRobinPicker(drops, Collections.singletonList(idleEntry));
  PickSubchannelArgs pickArgs = mock(PickSubchannelArgs.class);

  // Nothing has been picked yet, so no connection may have been requested.
  verify(idleSubchannel, never()).requestConnection();

  assertThat(rrPicker.pickSubchannel(pickArgs)).isSameInstanceAs(PickResult.withNoResult());
  verify(idleSubchannel).requestConnection();

  assertThat(rrPicker.pickSubchannel(pickArgs)).isSameInstanceAs(DROP_PICK_RESULT);
  verify(idleSubchannel).requestConnection();

  assertThat(rrPicker.pickSubchannel(pickArgs)).isSameInstanceAs(PickResult.withNoResult());
  // Still a single request: only the first pick triggers requestConnection().
  verify(idleSubchannel).requestConnection();
}
/**
 * Checks the sequence of balancing states reported around a name resolution error when a
 * subchannel is READY, and that the last captured picker still returns that subchannel with
 * an OK status.
 */
@Test
public void nameResolutionErrorWithActiveChannels() throws Exception {
  final Subchannel activeSubchannel = subchannels.values().iterator().next();
  ResolvedAddresses resolved =
      ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build();
  loadBalancer.handleResolvedAddresses(resolved);
  deliverSubchannelState(activeSubchannel, ConnectivityStateInfo.forNonError(READY));
  loadBalancer.handleNameResolutionError(
      Status.NOT_FOUND.withDescription("nameResolutionError"));

  verify(mockHelper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
  verify(mockHelper, times(3))
      .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());

  // The captured states are expected in this order.
  Iterator<ConnectivityState> reportedStates = stateCaptor.getAllValues().iterator();
  assertEquals(CONNECTING, reportedStates.next());
  assertEquals(READY, reportedStates.next());
  assertEquals(TRANSIENT_FAILURE, reportedStates.next());

  LoadBalancer.PickResult firstPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(activeSubchannel, firstPick.getSubchannel());
  assertEquals(Status.OK.getCode(), firstPick.getStatus().getCode());

  LoadBalancer.PickResult secondPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(activeSubchannel, secondPick.getSubchannel());
  verifyNoMoreInteractions(mockHelper);
}
/**
 * Picks using the fallback child policy for the configured default target.
 *
 * <p>The fallback child policy is (re)started when it is missing or its target differs from
 * the default target. The result then depends on the fallback's connectivity state: IDLE and
 * CONNECTING give no result, TRANSIENT_FAILURE and SHUTDOWN give an error with the state's
 * status, and READY delegates to the fallback's picker (no result when that picker is null).
 *
 * @param args the pick arguments forwarded to the fallback picker
 * @return the pick result derived from the fallback child policy
 */
private PickResult useFallback(PickSubchannelArgs args) {
  String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget();
  boolean fallbackMatchesDefault =
      fallbackChildPolicyWrapper != null
          && fallbackChildPolicyWrapper.getTarget().equals(defaultTarget);
  if (!fallbackMatchesDefault) {
    // TODO(creamsoup) wait until lb is ready
    startFallbackChildPolicy();
  }

  switch (fallbackChildPolicyWrapper.getConnectivityStateInfo().getState()) {
    case IDLE:
    case CONNECTING:
      return PickResult.withNoResult();
    case TRANSIENT_FAILURE:
    case SHUTDOWN:
      return PickResult.withError(
          fallbackChildPolicyWrapper.getConnectivityStateInfo().getStatus());
    case READY:
      SubchannelPicker fallbackPicker = fallbackChildPolicyWrapper.getPicker();
      return fallbackPicker == null
          ? PickResult.withNoResult()
          : fallbackPicker.pickSubchannel(args);
    default:
      throw new AssertionError();
  }
}
/**
 * Replaces any existing token entries in {@code headers} with this entry's token, if it has
 * one, and returns this entry's pick result.
 *
 * @param headers the request metadata to update in place
 * @return the stored pick result
 */
@Override
public PickResult picked(Metadata headers) {
  // Always clear old tokens first, even when there is no new one to add.
  headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
  if (token == null) {
    return result;
  }
  headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
  return result;
}
/**
 * Performs a two-level round-robin pick while holding the lock on {@code pickList}.
 *
 * <p>When {@code dropList} is non-empty, its next slot is consulted first: a non-null drop
 * entry decides the pick. Otherwise the next entry of {@code pickList} is used. Both indices
 * wrap around to zero at the end of their lists.
 *
 * @param args the pick arguments; their headers are passed to the chosen entry
 * @return the result produced by the selected drop entry or round-robin entry
 */
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
  synchronized (pickList) {
    // Level one: walk dropList so that requests are dropped in the same proportion as drop
    // entries appear on the round-robin list from the balancer.
    if (!dropList.isEmpty()) {
      DropEntry dropCandidate = dropList.get(dropIndex);
      dropIndex = (dropIndex + 1) % dropList.size();
      if (dropCandidate != null) {
        return dropCandidate.picked();
      }
    }

    // Level two: a non-drop slot was selected, so round-robin over pickList, which holds the
    // backends eligible for the non-drop case.
    RoundRobinEntry chosenEntry = pickList.get(pickIndex);
    pickIndex = (pickIndex + 1) % pickList.size();
    return chosenEntry.picked(args.getHeaders());
  }
}
/**
 * Chooses the transport for a new stream.
 *
 * <p>Returns the delayed transport when the channel is shut down, when no picker is available
 * (after scheduling an exit from idle mode), or when the pick yields no transport. Otherwise
 * returns the transport obtained from the pick result.
 *
 * @param args the pick arguments for the stream being created
 * @return the transport to create the stream on
 */
@Override
public ClientTransport get(PickSubchannelArgs args) {
  SubchannelPicker pickerCopy = subchannelPicker;
  if (shutdown.get()) {
    // A shut-down channel also has a shut-down delayedTransport, which fails the stream
    // properly.
    return delayedTransport;
  }

  if (pickerCopy != null) {
    // The idle timer does not need to be rescheduled here.
    //
    // A non-null pickerCopy means the idle timer had not expired when this method started.
    // Even if it expires right after pickerCopy is read and shuts down the LoadBalancer,
    // which calls Subchannel.shutdown(), the InternalSubchannel is only really shut down
    // after SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start the RPC.
    //
    // In most cases the idle timer is scheduled to fire after the transport has created the
    // stream, which would have reported in-use state to the channel and cancelled the timer.
    PickResult pickResult = pickerCopy.pickSubchannel(args);
    ClientTransport pickedTransport =
        GrpcUtil.getTransportFromPickResult(
            pickResult, args.getCallOptions().isWaitForReady());
    return pickedTransport != null ? pickedTransport : delayedTransport;
  }

  // No picker yet: ask the channel to leave idle mode and buffer the stream meanwhile.
  final class ExitIdleModeForTransport implements Runnable {
    @Override
    public void run() {
      exitIdleMode();
    }
  }

  syncContext.execute(new ExitIdleModeForTransport());
  return delayedTransport;
}
/**
 * Checks that the name resolver backoff task stays scheduled after channel shutdown while a
 * call is still pending, and is cancelled once that call is closed via a dropping picker.
 */
@Test
public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
  Status resolutionError =
      Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
  FakeNameResolverFactory failingResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setError(resolutionError).build();
  channelBuilder.nameResolverFactory(failingResolverFactory);

  // Creating the channel starts name resolution right away.
  createChannel();
  verify(mockLoadBalancer).handleNameResolutionError(same(resolutionError));

  FakeClock.ScheduledTask backoffTask = getNameResolverRefresh();
  assertNotNull(backoffTask);
  assertFalse(backoffTask.isCancelled());

  // Start a call so that the delayed transport has something pending.
  ClientCall<String, Integer> pendingCall = channel.newCall(method, CallOptions.DEFAULT);
  pendingCall.start(mockCallListener, new Metadata());

  // With a call still pending, shutdown must not cancel the backoff yet.
  channel.shutdown();
  assertFalse(backoffTask.isCancelled());

  // Publish a READY picker that drops the call, which drains the delayed transport.
  SubchannelPicker droppingPicker = mock(SubchannelPicker.class);
  Status dropStatus = Status.UNAVAILABLE.withDescription("for test");
  when(droppingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withDrop(dropStatus));
  updateBalancingStateSafely(helper, READY, droppingPicker);
  executor.runDueTasks();

  verify(mockCallListener).onClose(same(dropStatus), any(Metadata.class));
  assertTrue(backoffTask.isCancelled());
}
/**
 * Picks a subchannel, honouring stickiness when it is configured.
 *
 * <p>If {@code stickinessState} is set and the headers contain a value for its key, the
 * subchannel registered for that value is used; when none is registered or it is not ready,
 * the next subchannel is registered for the value instead. In every other case, or when that
 * lookup yields null, the next subchannel is used.
 *
 * @param args the pick arguments whose headers may carry the stickiness value
 * @return a pick result wrapping the chosen subchannel
 */
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
  Subchannel chosen = null;
  if (stickinessState != null) {
    String stickyValue = args.getHeaders().get(stickinessState.key);
    if (stickyValue != null) {
      chosen = stickinessState.getSubchannel(stickyValue);
      boolean usable = chosen != null && RoundRobinLoadBalancer.isReady(chosen);
      if (!usable) {
        chosen = stickinessState.maybeRegister(stickyValue, nextSubchannel());
      }
    }
  }

  if (chosen == null) {
    chosen = nextSubchannel();
  }
  return PickResult.withSubchannel(chosen);
}
/**
 * Checks that the name resolver backoff task stays scheduled after channel shutdown while a
 * call is still pending, and is cancelled once that call is closed via a dropping picker.
 */
@Test
public void nameResolutionFailed_delayedTransportShutdownCancelsBackoff() {
  Status resolutionError =
      Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
  FakeNameResolverFactory failingResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri).setError(resolutionError).build();
  channelBuilder.nameResolverFactory(failingResolverFactory);

  // Creating the channel starts name resolution right away.
  createChannel();
  verify(mockLoadBalancer).handleNameResolutionError(same(resolutionError));

  FakeClock.ScheduledTask backoffTask = getNameResolverRefresh();
  assertNotNull(backoffTask);
  assertFalse(backoffTask.isCancelled());

  // Start a call so that the delayed transport has something pending.
  ClientCall<String, Integer> pendingCall = channel.newCall(method, CallOptions.DEFAULT);
  pendingCall.start(mockCallListener, new Metadata());

  // With a call still pending, shutdown must not cancel the backoff yet.
  channel.shutdown();
  assertFalse(backoffTask.isCancelled());

  // Publish a READY picker that drops the call, which drains the delayed transport.
  SubchannelPicker droppingPicker = mock(SubchannelPicker.class);
  Status dropStatus = Status.UNAVAILABLE.withDescription("for test");
  when(droppingPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withDrop(dropStatus));
  helper.updateBalancingState(READY, droppingPicker);
  executor.runDueTasks();

  verify(mockCallListener).onClose(same(dropStatus), any(Metadata.class));
  assertTrue(backoffTask.isCancelled());
}
/**
 * Checks name resolution error handling in two phases: before any child balancer exists the
 * helper gets a TRANSIENT_FAILURE picker carrying the error code, and after child configs
 * are delivered the error is forwarded to each child balancer.
 */
@Test
public void handleNameResolutionError() {
  ArgumentCaptor<SubchannelPicker> failurePickerCaptor = ArgumentCaptor.forClass(null);
  ArgumentCaptor<Status> forwardedStatusCaptor = ArgumentCaptor.forClass(null);

  // Phase 1: the error arrives before any child balancer has been created.
  weightedTargetLb.handleNameResolutionError(Status.DATA_LOSS);
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), failurePickerCaptor.capture());
  PickResult failedPick =
      failurePickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
  assertThat(failedPick.getStatus().getCode()).isEqualTo(Status.Code.DATA_LOSS);

  // Deliver the child configs.
  Map<String, WeightedPolicySelection> targets = ImmutableMap.of(
      // {foo, 10, config0}
      "target0", weightedLbConfig0,
      // {bar, 20, config1}
      "target1", weightedLbConfig1,
      // {bar, 30, config2}
      "target2", weightedLbConfig2,
      // {foo, 40, config3}
      "target3", weightedLbConfig3);
  ResolvedAddresses resolved =
      ResolvedAddresses.newBuilder()
          .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
          .setLoadBalancingPolicyConfig(new WeightedTargetConfig(targets))
          .build();
  weightedTargetLb.handleResolvedAddresses(resolved);

  // Phase 2: the error arrives once child balancers exist.
  weightedTargetLb.handleNameResolutionError(Status.ABORTED);
  for (LoadBalancer child : childBalancers) {
    verify(child).handleNameResolutionError(forwardedStatusCaptor.capture());
    assertThat(forwardedStatusCaptor.getValue().getCode()).isEqualTo(Status.Code.ABORTED);
  }
}
/**
 * Checks that when the picker is already in place before the call starts, the stream is
 * created with both the call's tracer factory and the picker's tracer factory, in that order.
 */
@Test
public void pickerReturnsStreamTracer_noDelay() {
  ClientStream createdStream = mock(ClientStream.class);
  ClientStreamTracer.Factory callFactory = mock(ClientStreamTracer.Factory.class);
  ClientStreamTracer.Factory pickerFactory = mock(ClientStreamTracer.Factory.class);
  createChannel();

  // Bring up a subchannel with a ready transport.
  Subchannel readySubchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
  readySubchannel.requestConnection();
  MockClientTransportInfo transportInfo = transports.poll();
  transportInfo.listener.transportReady();
  ClientTransport realTransport = transportInfo.transport;
  when(realTransport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(createdStream);

  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(readySubchannel, pickerFactory));
  helper.updateBalancingState(READY, mockPicker);

  CallOptions optionsWithTracer = CallOptions.DEFAULT.withStreamTracerFactory(callFactory);
  ClientCall<String, Integer> rpc = channel.newCall(method, optionsWithTracer);
  rpc.start(mockCallListener, new Metadata());

  verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
  verify(realTransport)
      .newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
  assertEquals(
      Arrays.asList(callFactory, pickerFactory),
      callOptionsCaptor.getValue().getStreamTracerFactories());
  // The factories are safely not stubbed because we do not expect any usage of them.
  verifyZeroInteractions(callFactory);
  verifyZeroInteractions(pickerFactory);
}
/**
 * Checks that when the call starts before the picker is published, the stream is later
 * created with both the call's tracer factory and the picker's tracer factory, in that order.
 */
@Test
public void pickerReturnsStreamTracer_delayed() {
  ClientStream createdStream = mock(ClientStream.class);
  ClientStreamTracer.Factory callFactory = mock(ClientStreamTracer.Factory.class);
  ClientStreamTracer.Factory pickerFactory = mock(ClientStreamTracer.Factory.class);
  createChannel();

  // Start the call first, before any picker has been published.
  CallOptions optionsWithTracer = CallOptions.DEFAULT.withStreamTracerFactory(callFactory);
  ClientCall<String, Integer> rpc = channel.newCall(method, optionsWithTracer);
  rpc.start(mockCallListener, new Metadata());

  // Bring up a subchannel with a ready transport.
  Subchannel readySubchannel = createSubchannelSafely(helper, addressGroup, Attributes.EMPTY);
  readySubchannel.requestConnection();
  MockClientTransportInfo transportInfo = transports.poll();
  transportInfo.listener.transportReady();
  ClientTransport realTransport = transportInfo.transport;
  when(realTransport.newStream(
          any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)))
      .thenReturn(createdStream);

  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(readySubchannel, pickerFactory));
  helper.updateBalancingState(READY, mockPicker);

  assertEquals(1, executor.runDueTasks());
  verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
  verify(realTransport)
      .newStream(same(method), any(Metadata.class), callOptionsCaptor.capture());
  assertEquals(
      Arrays.asList(callFactory, pickerFactory),
      callOptionsCaptor.getValue().getStreamTracerFactories());
  // The factories are safely not stubbed because we do not expect any usage of them.
  verifyZeroInteractions(callFactory);
  verifyZeroInteractions(pickerFactory);
}
/**
 * Checks that calls started while the picker returns no result see no listener activity
 * until the channel panics, after which both the fail-fast and the wait-for-ready call are
 * closed with INTERNAL and the panic cause.
 */
@Test
public void panic_bufferedCallsWillFail() {
  createChannel();
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withNoResult());
  helper.updateBalancingState(CONNECTING, mockPicker);

  // Start one fail-fast and one wait-for-ready RPC; both get buffered in delayedTransport.
  ClientCall<String, Integer> failFastCall =
      channel.newCall(method, CallOptions.DEFAULT.withoutWaitForReady());
  failFastCall.start(mockCallListener, new Metadata());
  ClientCall<String, Integer> waitForReadyCall =
      channel.newCall(method, CallOptions.DEFAULT.withWaitForReady());
  waitForReadyCall.start(mockCallListener2, new Metadata());
  executor.runDueTasks();
  verifyZeroInteractions(mockCallListener, mockCallListener2);

  // Put the channel into panic mode.
  Throwable simulatedFailure = new Exception("Simulated uncaught exception");
  channel.panic(simulatedFailure);

  // The buffered RPCs must now fail immediately.
  executor.runDueTasks();
  verifyCallListenerClosed(mockCallListener, Status.Code.INTERNAL, simulatedFailure);
  verifyCallListenerClosed(mockCallListener2, Status.Code.INTERNAL, simulatedFailure);
}
/**
 * Reports TRANSIENT_FAILURE to the helper with a picker that fails every pick using
 * {@code error}, augmented with a note that it was handled by the downstream balancer.
 *
 * @param error the name resolution error to surface through picks
 */
@Override
public void handleNameResolutionError(final Status error) {
  helper.updateBalancingState(
      ConnectivityState.TRANSIENT_FAILURE,
      new SubchannelPicker() {
        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
          // The description is augmented on every pick, not once up front.
          Status augmented = error.augmentDescription("handled by downstream balancer");
          return PickResult.withError(augmented);
        }
      });
}
/**
 * Checks that a name resolution error yields a single TRANSIENT_FAILURE update whose picker
 * returns no subchannel and the original error status, without requesting any connection.
 */
@Test
public void nameResolutionError() throws Exception {
  Status resolutionError = Status.NOT_FOUND.withDescription("nameResolutionError");
  loadBalancer.handleNameResolutionError(resolutionError);

  verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  PickResult failedPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(null, failedPick.getSubchannel());
  assertEquals(resolutionError, failedPick.getStatus());

  // Picking through the failure picker must not have touched the subchannel.
  verify(mockSubchannel, never()).requestConnection();
  verifyNoMoreInteractions(mockHelper);
}
/**
 * Checks that a name resolution error yields a single TRANSIENT_FAILURE update whose picker
 * returns no subchannel and the original error status, without requesting any connection.
 */
@Test
public void nameResolutionError() throws Exception {
  Status resolutionError = Status.NOT_FOUND.withDescription("nameResolutionError");
  loadBalancer.handleNameResolutionError(resolutionError);

  verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  PickResult failedPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(null, failedPick.getSubchannel());
  assertEquals(resolutionError, failedPick.getStatus());

  // Picking through the failure picker must not have touched the subchannel.
  verify(mockSubchannel, never()).requestConnection();
  verifyNoMoreInteractions(mockHelper);
}
/**
 * Checks, in order, the helper interactions around name resolution errors that follow a
 * subchannel entering TRANSIENT_FAILURE: each error produces a TRANSIENT_FAILURE update whose
 * picker returns no subchannel and that error's status.
 */
@Test
public void nameResolutionErrorWithStateChanges() throws Exception {
  InOrder ordered = inOrder(mockHelper);
  loadBalancer.handleResolvedAddresses(
      ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());

  // Resolving addresses creates a subchannel for them and starts it with a state listener.
  ordered.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
  verify(mockSubchannel).start(stateListenerCaptor.capture());
  CreateSubchannelArgs createArgs = createArgsCaptor.getValue();
  assertThat(createArgs.getAddresses()).isEqualTo(servers);
  ordered.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));

  // Push the subchannel into TRANSIENT_FAILURE through its listener.
  SubchannelStateListener listener = stateListenerCaptor.getValue();
  listener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE));
  ordered.verify(mockHelper).updateBalancingState(
      eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

  // First name resolution error.
  Status firstError = Status.NOT_FOUND.withDescription("nameResolutionError");
  loadBalancer.handleNameResolutionError(firstError);
  ordered.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  PickResult firstPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(null, firstPick.getSubchannel());
  assertEquals(firstError, firstPick.getStatus());

  // Second name resolution error replaces the first one.
  Status secondError = Status.NOT_FOUND.withDescription("nameResolutionError2");
  loadBalancer.handleNameResolutionError(secondError);
  ordered.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
  PickResult secondPick = pickerCaptor.getValue().pickSubchannel(mockArgs);
  assertEquals(null, secondPick.getSubchannel());
  assertEquals(secondError, secondPick.getStatus());

  verifyNoMoreInteractions(mockHelper);
}
/**
 * Checks that an error pick result yields no transport when the call is wait-for-ready.
 */
@Test
public void getTransportFromPickResult_errorPickResult_waitForReady() {
  PickResult errorPick = PickResult.withError(Status.UNAVAILABLE);

  // The second argument selects wait-for-ready.
  ClientTransport resolved = GrpcUtil.getTransportFromPickResult(errorPick, true);

  assertNull(resolved);
}
/**
 * Shared body for tests that fail RPCs from the balancer with either a drop or an error.
 *
 * <p>Starts one call before the picker is published (buffered by the channel) and one after,
 * then checks each listener: closed with the test status when {@code shouldFail}, otherwise
 * untouched.
 *
 * @param waitForReady whether the calls use wait-for-ready call options
 * @param drop whether the picker returns a drop result instead of an error result
 * @param shouldFail whether both calls are expected to be closed with the status
 */
private void subtestFailRpcFromBalancer(boolean waitForReady, boolean drop, boolean shouldFail) {
  createChannel();
  CallOptions callOptions =
      waitForReady
          ? CallOptions.DEFAULT.withWaitForReady()
          : CallOptions.DEFAULT.withoutWaitForReady();

  // The channel buffers this call, so it goes through the delayed transport.
  ClientCall<String, Integer> bufferedCall = channel.newCall(method, callOptions);
  bufferedCall.start(mockCallListener, new Metadata());

  SubchannelPicker failingPicker = mock(SubchannelPicker.class);
  Status status = Status.UNAVAILABLE.withDescription("for test");
  PickResult failure;
  if (drop) {
    failure = PickResult.withDrop(status);
  } else {
    failure = PickResult.withError(status);
  }
  when(failingPicker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(failure);
  updateBalancingStateSafely(helper, READY, failingPicker);
  executor.runDueTasks();
  if (!shouldFail) {
    verifyZeroInteractions(mockCallListener);
  } else {
    verify(mockCallListener).onClose(same(status), any(Metadata.class));
  }

  // The picker is already published, so this call bypasses the delayed transport.
  ClientCall<String, Integer> directCall = channel.newCall(method, callOptions);
  directCall.start(mockCallListener2, new Metadata());
  executor.runDueTasks();
  if (!shouldFail) {
    verifyZeroInteractions(mockCallListener2);
  } else {
    verify(mockCallListener2).onClose(same(status), any(Metadata.class));
  }
}
/**
 * Checks that a drop pick result for a fail-fast call yields a transport whose streams are
 * closed with the drop status and {@code RpcProgress.DROPPED}.
 */
@Test
public void getTransportFromPickResult_dropPickResult_failFast() {
  Status dropStatus = Status.UNAVAILABLE;
  ClientTransport failingTransport =
      GrpcUtil.getTransportFromPickResult(PickResult.withDrop(dropStatus), false);
  assertNotNull(failingTransport);

  ClientStreamListener closeListener = mock(ClientStreamListener.class);
  ClientStream failedStream =
      failingTransport.newStream(
          TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT);
  failedStream.start(closeListener);

  verify(closeListener).closed(eq(dropStatus), eq(RpcProgress.DROPPED), any(Metadata.class));
}
/**
 * Initializes the Mockito mocks, wires the picker, subchannel, and real transports to return
 * their matching fixtures, and starts the delayed transport with the transport listener.
 */
@Before
public void setUp() {
  MockitoAnnotations.initMocks(this);

  // Picker -> subchannel -> real transport.
  when(mockSubchannel.obtainActiveTransport()).thenReturn(mockRealTransport);
  when(mockPicker.pickSubchannel(any(PickSubchannelArgs.class)))
      .thenReturn(PickResult.withSubchannel(mockSubchannel));

  // Each real transport returns its stream only for its own exact argument instances.
  when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2)))
      .thenReturn(mockRealStream2);
  when(mockRealTransport.newStream(same(method), same(headers), same(callOptions)))
      .thenReturn(mockRealStream);

  delayedTransport.start(transportListener);
}