下面列出了 io.grpc.EquivalentAddressGroup 类的 API 使用示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Delivers one balancer address and no backends, shuts the balancer down, and checks that the
 * LB request observer receives an error whose status code is CANCELLED and is never completed
 * normally before the shutdown.
 *
 * @param grpclbConfig the grpclb config delivered together with the resolved addresses
 */
private void subtestShutdownWithoutSubchannel(GrpclbConfig grpclbConfig) {
  List<EquivalentAddressGroup> balancerAddrs = createResolvedBalancerAddresses(1);
  List<EquivalentAddressGroup> noBackends = Collections.<EquivalentAddressGroup>emptyList();
  deliverResolvedAddresses(noBackends, balancerAddrs, grpclbConfig);

  // Exactly one LB stream must have been opened.
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
  assertEquals(1, lbRequestObservers.size());
  StreamObserver<LoadBalanceRequest> lbRequestObserver = lbRequestObservers.poll();
  verify(lbRequestObserver, never()).onCompleted();

  balancer.shutdown();

  // The stream is terminated with an error rather than a clean completion.
  ArgumentCaptor<Throwable> errorCaptor = ArgumentCaptor.forClass(Throwable.class);
  verify(lbRequestObserver).onError(errorCaptor.capture());
  Status shutdownStatus = Status.fromThrowable(errorCaptor.getValue());
  assertThat(shutdownStatus.getCode()).isEqualTo(Code.CANCELLED);
}
/**
 * Stubs the helper so that each {@code createSubchannel} call returns a fresh mock subchannel
 * reporting the requested addresses and attributes, wires the helper's synchronization context
 * and scheduled executor, and initializes the pool with the helper.
 */
@Before
@SuppressWarnings("unchecked")
public void setUp() {
  Answer<Subchannel> mockSubchannelFactory = new Answer<Subchannel>() {
    @Override
    public Subchannel answer(InvocationOnMock invocation) throws Throwable {
      Subchannel created = mock(Subchannel.class);
      Object[] callArgs = invocation.getArguments();
      List<EquivalentAddressGroup> requestedAddrs =
          (List<EquivalentAddressGroup>) callArgs[0];
      Attributes requestedAttrs = (Attributes) callArgs[1];
      when(created.getAllAddresses()).thenReturn(requestedAddrs);
      when(created.getAttributes()).thenReturn(requestedAttrs);
      // Track every mock subchannel that has been handed out.
      mockSubchannels.add(created);
      return created;
    }
  };
  doAnswer(mockSubchannelFactory)
      .when(helper)
      .createSubchannel(any(List.class), any(Attributes.class));

  when(helper.getSynchronizationContext()).thenReturn(syncContext);
  when(helper.getScheduledExecutorService()).thenReturn(clock.getScheduledExecutorService());
  pool.init(helper);
}
/**
 * Checks that a service config naming the policy {@code MAGIC_BALANCER} makes
 * {@code decideLoadBalancerProvider} throw {@code PolicyNotFoundException} carrying the
 * lower-cased policy name and a reason mentioning the service config.
 */
@Test
public void decideLoadBalancerProvider_serviceConfigFailsOnUnknown() {
  Map<String, Object> rawConfig = new HashMap<String, Object>();
  rawConfig.put("loadBalancingPolicy", "MAGIC_BALANCER");

  SocketAddress anonymousAddress = new SocketAddress(){};
  EquivalentAddressGroup onlyGroup =
      new EquivalentAddressGroup(anonymousAddress, Attributes.EMPTY);
  List<EquivalentAddressGroup> addressGroups = Collections.singletonList(onlyGroup);

  try {
    AutoConfiguredLoadBalancer.decideLoadBalancerProvider(addressGroups, rawConfig);
    // Reaching this line means no exception was thrown.
    fail();
  } catch (PolicyNotFoundException notFound) {
    assertThat(notFound.policy).isEqualTo("magic_balancer");
    assertThat(notFound.choiceReason)
        .contains("service-config specifies load-balancing policy");
  }
}
/**
 * Builds an {@code EdsConfig} for {@code CLUSTER_NAME} from the arguments and hands it to the
 * EDS load balancer with an empty address list. When {@code isFullFlow} is set, the XDS client
 * pool is attached as a resolution attribute.
 */
private void deliverResolvedAddresses(
    @Nullable String edsServiceName,
    @Nullable String lrsServerName,
    PolicySelection endpointPickingPolicy) {
  EdsConfig edsConfig =
      new EdsConfig(CLUSTER_NAME, edsServiceName, lrsServerName, endpointPickingPolicy);
  ResolvedAddresses.Builder resolved =
      ResolvedAddresses.newBuilder()
          .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
          .setLoadBalancingPolicyConfig(edsConfig);

  if (isFullFlow) {
    Attributes poolAttrs =
        Attributes.newBuilder()
            .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPoolFromResolveAddresses)
            .build();
    resolved.setAttributes(poolAttrs);
  }

  edsLb.handleResolvedAddresses(resolved.build());
}
/**
 * Accumulates the addresses reported by one name resolver. The first call records the
 * attributes; later calls must supply equal attributes. Once the number of calls equals the
 * number of name resolvers, the accumulated list is shuffled, forwarded to the listener, and
 * this object is closed. Calls made after closing are ignored.
 *
 * @throws IllegalStateException if {@code attributes} differs from the recorded attributes
 */
@Override
public void onAddresses(List<EquivalentAddressGroup> servers, Attributes attributes) {
  synchronized (lock) {
    if (closed) {
      return;
    }
    this.servers.addAll(servers);

    boolean firstAttributes = this.attributes == null;
    if (firstAttributes) {
      this.attributes = attributes;
    } else if (!attributes.equals(this.attributes)) {
      String message = "New attributes \"" + attributes
          + "\" are not the same as existing attributes: " + this.attributes;
      throw new IllegalStateException(message);
    }

    onAddressesCount++;
    boolean allResolversReported = onAddressesCount == nameResolvers.size();
    if (!allResolversReported) {
      return;
    }

    Collections.shuffle(this.servers);
    listener.onAddresses(this.servers, attributes);
    close();
  }
}
/**
 * Creates a subchannel through the superclass with a new {@code HealthCheckState} attached under
 * {@code KEY_HEALTH_CHECK_STATE}, then initializes that state with the subchannel. Must be
 * called from the synchronization context.
 */
@Override
public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
  // HealthCheckState is not thread-safe, we are requiring the original LoadBalancer calls
  // createSubchannel() from the SynchronizationContext.
  syncContext.throwIfNotInThisSynchronizationContext();

  HealthCheckState state = new HealthCheckState(
      this, delegateBalancer, syncContext, delegate.getScheduledExecutorService());
  hcStates.add(state);

  Attributes attrsWithState =
      attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, state).build();
  Subchannel created = super.createSubchannel(addrs, attrsWithState);
  state.init(created);

  // Apply the service name only if one is currently set.
  if (healthCheckedService != null) {
    state.setServiceName(healthCheckedService);
  }
  return created;
}
/**
 * Returns a copy of {@code addresses} in which every group carries {@code upstreamTlsContext}
 * under {@code XdsAttributes.ATTR_UPSTREAM_TLS_CONTEXT}. If either argument is {@code null},
 * {@code addresses} is returned as-is.
 */
private static List<EquivalentAddressGroup> addUpstreamTlsContext(
    List<EquivalentAddressGroup> addresses,
    UpstreamTlsContext upstreamTlsContext) {
  if (addresses == null || upstreamTlsContext == null) {
    return addresses;
  }

  List<EquivalentAddressGroup> tagged = new ArrayList<>(addresses.size());
  for (EquivalentAddressGroup original : addresses) {
    // Same socket addresses, attributes extended with the TLS context.
    tagged.add(
        new EquivalentAddressGroup(
            original.getAddresses(),
            original.getAttributes()
                .toBuilder()
                .set(XdsAttributes.ATTR_UPSTREAM_TLS_CONTEXT, upstreamTlsContext)
                .build()));
  }
  return tagged;
}
/**
 * Delivers the same addresses twice and checks that only one subchannel is created, that the
 * second delivery results in an address update on the existing subchannel, and that the helper
 * sees a single balancing-state update.
 */
@Test
public void pickAfterResolvedAndUnchanged() throws Exception {
  ResolvedAddresses firstUpdate =
      ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build();
  loadBalancer.handleResolvedAddresses(firstUpdate);
  verify(mockSubchannel).start(any(SubchannelStateListener.class));
  verify(mockSubchannel).requestConnection();

  ResolvedAddresses identicalUpdate =
      ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build();
  loadBalancer.handleResolvedAddresses(identicalUpdate);
  verify(mockSubchannel).updateAddresses(eq(servers));
  verifyNoMoreInteractions(mockSubchannel);

  verify(mockHelper).createSubchannel(createArgsCaptor.capture());
  assertThat(createArgsCaptor.getValue()).isNotNull();
  verify(mockHelper)
      .updateBalancingState(isA(ConnectivityState.class), isA(SubchannelPicker.class));
  // Updating the subchannel addresses is unnecessary, but doesn't hurt anything
  verify(mockSubchannel).updateAddresses(ArgumentMatchers.<EquivalentAddressGroup>anyList());

  verifyNoMoreInteractions(mockHelper);
}
/**
 * Returns a resolver that, when started, immediately reports the single configured
 * {@code address} with empty attributes. Both arguments are ignored.
 */
@Override
public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
  NameResolver fixedAddressResolver = new NameResolver() {
    @Override
    public void start(final Listener listener) {
      EquivalentAddressGroup onlyGroup = new EquivalentAddressGroup(address);
      listener.onAddresses(Collections.singletonList(onlyGroup), Attributes.EMPTY);
    }

    @Override
    public String getServiceAuthority() {
      return authority;
    }

    @Override
    public void shutdown() {}
  };
  return fixedAddressResolver;
}
/**
 * Returns an already-completed future holding the channel stats. The address groups and the
 * transports are snapshotted under {@code lock}; the rest of the stats are assembled outside it.
 */
@Override
public ListenableFuture<ChannelStats> getStats() {
  List<EquivalentAddressGroup> groups;
  List<InternalWithLogId> activeTransports;
  synchronized (lock) {
    groups = addressIndex.getGroups();
    // Copy so the snapshot is unaffected by later changes to the transports collection.
    activeTransports = new ArrayList<InternalWithLogId>(transports);
  }

  ChannelStats.Builder statsBuilder = new ChannelStats.Builder();
  String target = groups.toString();
  statsBuilder.setTarget(target).setState(getState());
  statsBuilder.setSockets(activeTransports);
  callsTracer.updateBuilder(statsBuilder);
  channelTracer.updateBuilder(statsBuilder);

  SettableFuture<ChannelStats> result = SettableFuture.create();
  result.set(statsBuilder.build());
  return result;
}
/**
 * Checks that resolution still yields the backend address, with empty attributes and no service
 * config, when the resource resolver is set to {@code null}.
 */
@Test
public void resolve_nullResourceResolver() throws Exception {
  InetAddress backendAddress = InetAddress.getByAddress(new byte[] {127, 0, 0, 0});
  AddressResolver addressResolverMock = mock(AddressResolver.class);
  when(addressResolverMock.resolveAddress(anyString()))
      .thenReturn(Collections.singletonList(backendAddress));
  ResourceResolver absentResourceResolver = null;

  resolver.setAddressResolver(addressResolverMock);
  resolver.setResourceResolver(absentResourceResolver);
  resolver.start(mockListener);
  assertThat(fakeClock.runDueTasks()).isEqualTo(1);

  verify(mockListener).onResult(resultCaptor.capture());
  ResolutionResult resolved = resultCaptor.getValue();
  EquivalentAddressGroup expectedGroup =
      new EquivalentAddressGroup(new InetSocketAddress(backendAddress, DEFAULT_PORT));
  assertThat(resolved.getAddresses()).containsExactly(expectedGroup);
  assertThat(resolved.getAttributes()).isEqualTo(Attributes.EMPTY);
  assertThat(resolved.getServiceConfig()).isNull();
}
/**
 * Delivers a resolution result to the balancer inside the synchronization context. The balancer
 * addresses travel as the {@code GrpclbConstants.ATTR_LB_ADDRS} attribute; the backend addresses
 * are the result's address list.
 */
private void deliverResolvedAddresses(
    final List<EquivalentAddressGroup> backendAddrs,
    List<EquivalentAddressGroup> balancerAddrs,
    final GrpclbConfig grpclbConfig) {
  final Attributes resolutionAttrs =
      Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_ADDRS, balancerAddrs).build();

  Runnable deliverTask = new Runnable() {
    @Override
    public void run() {
      ResolvedAddresses resolved =
          ResolvedAddresses.newBuilder()
              .setAddresses(backendAddrs)
              .setAttributes(resolutionAttrs)
              .setLoadBalancingPolicyConfig(grpclbConfig)
              .build();
      balancer.handleResolvedAddresses(resolved);
    }
  };
  syncContext.execute(deliverTask);
}
/**
 * Removes the invalid subchannels between the client and offline servers.
 *
 * <p>For each entry, the matching address group is looked up; if a subchannel is registered for
 * it, the subchannel is removed from {@code subchannels} and shut down. A {@code null} or empty
 * set is a no-op.
 *
 * @param removeHostPorts host-and-port strings identifying the servers to drop
 * @author sxp
 * @since 2019/12/02
 */
@Override
public void removeInvalidCacheSubchannels(Set<String> removeHostPorts) {
  if (removeHostPorts == null || removeHostPorts.isEmpty()) {
    return;
  }

  for (String hostAndPort : removeHostPorts) {
    EquivalentAddressGroup addressGroup = getAddressGroupByHostAndPort(hostAndPort);
    if (addressGroup != null) {
      Subchannel removed = subchannels.remove(addressGroup);
      if (removed != null) {
        logger.info("关闭" + addressGroup + "subchannel");
        removed.shutdown();
      }
    }
  }
}
/**
 * Checks that handling resolved address groups leaves the auto-configured balancer's delegate
 * instance unchanged.
 */
@Test
public void handleResolvedAddressGroups_keepOldBalancer() {
  SocketAddress anonymousAddress = new SocketAddress(){};
  final List<EquivalentAddressGroup> servers =
      Collections.singletonList(
          new EquivalentAddressGroup(anonymousAddress, Attributes.EMPTY));

  Helper assertingHelper = new TestHelper() {
    @Override
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
      // noop
    }

    @Override
    public Subchannel createSubchannel(List<EquivalentAddressGroup> addrs, Attributes attrs) {
      // Subchannels must be requested for exactly the resolved servers.
      assertThat(addrs).isEqualTo(servers);
      return new TestSubchannel(addrs, attrs);
    }
  };

  AutoConfiguredLoadBalancer autoLb =
      (AutoConfiguredLoadBalancer) lbf.newLoadBalancer(assertingHelper);
  LoadBalancer delegateBefore = autoLb.getDelegate();

  autoLb.handleResolvedAddressGroups(servers, Attributes.EMPTY);

  assertThat(autoLb.getDelegate()).isSameAs(delegateBefore);
}
/**
 * Checks that an empty address list combined with a round_robin service config puts the channel
 * into TRANSIENT_FAILURE and leaves a name-resolver refresh scheduled.
 */
@Test
public void emptyAddresses_validConfig_firstResolution_lbNeedsAddress() throws Exception {
  FakeNameResolverFactory emptyResolverFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.<EquivalentAddressGroup>emptyList())
          .build();
  channelBuilder.nameResolverFactory(emptyResolverFactory);

  String roundRobinJson = "{\"loadBalancingConfig\": [{\"round_robin\": {}}]}";
  Map<String, Object> roundRobinConfig = parseJson(roundRobinJson);
  emptyResolverFactory.nextRawServiceConfig.set(roundRobinConfig);

  createChannel();

  ConnectivityState stateAfterConnect = channel.getState(true);
  assertThat(stateAfterConnect).isEqualTo(ConnectivityState.TRANSIENT_FAILURE);
  assertWithMessage("Empty address should schedule NameResolver retry")
      .that(getNameResolverRefresh())
      .isNotNull();
}
/**
 * Checks that when the cluster watcher is told the resource does not exist, no EDS balancer is
 * created and the helper receives a TRANSIENT_FAILURE picker whose pick result is UNAVAILABLE
 * with a description naming the resource.
 */
@Test
public void clusterWatcher_resourceNotExist() {
  final String clusterName = "foo.googleapis.com";
  Attributes resolutionAttrs =
      Attributes.newBuilder()
          .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
          .build();
  ResolvedAddresses resolved =
      ResolvedAddresses.newBuilder()
          .setAddresses(ImmutableList.<EquivalentAddressGroup>of())
          .setAttributes(resolutionAttrs)
          .setLoadBalancingPolicyConfig(new CdsConfig(clusterName))
          .build();
  cdsLoadBalancer.handleResolvedAddresses(resolved);

  // Grab the watcher that the balancer registered for the cluster.
  ArgumentCaptor<ClusterWatcher> watcherCaptor = ArgumentCaptor.forClass(null);
  verify(xdsClient).watchClusterData(eq(clusterName), watcherCaptor.capture());
  ClusterWatcher registeredWatcher = watcherCaptor.getValue();

  registeredWatcher.onResourceDoesNotExist(clusterName);

  assertThat(edsLoadBalancers).isEmpty();
  ArgumentCaptor<SubchannelPicker> failurePickerCaptor = ArgumentCaptor.forClass(null);
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), failurePickerCaptor.capture());
  PickResult pick =
      failurePickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class));
  assertThat(pick.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
  assertThat(pick.getStatus().getDescription())
      .isEqualTo("Resource foo.googleapis.com is unavailable");
}
/**
 * Checks that creating the channel records an "Address resolved" INFO trace event stamped with
 * the current ticker reading.
 */
@Test
public void channelTracing_nameResolvedEvent() throws Exception {
  timer.forwardNanos(1234);
  channelBuilder.maxTraceEvents(10);
  FakeNameResolverFactory singleServerFactory =
      new FakeNameResolverFactory.Builder(expectedUri)
          .setServers(Collections.singletonList(new EquivalentAddressGroup(socketAddress)))
          .build();
  channelBuilder.nameResolverFactory(singleServerFactory);

  createChannel();

  // The description embeds the string form of the resolved address list.
  String expectedDescription =
      "Address resolved: "
          + Collections.singletonList(new EquivalentAddressGroup(socketAddress));
  assertThat(getStats(channel).channelTrace.events)
      .contains(
          new ChannelTrace.Event.Builder()
              .setDescription(expectedDescription)
              .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
              .setTimestampNanos(timer.getTicker().read())
              .build());
}
/**
 * Checks that a name-resolution error produces a TRANSIENT_FAILURE picker holding only the
 * error, and that a later successful resolution of a balancer address opens an OOB channel and
 * starts the balanceLoad RPC.
 */
@Test
public void nameResolutionFailsThenRecover() {
  Status resolutionError = Status.NOT_FOUND.withDescription("www.google.com not found");
  deliverNameResolutionError(resolutionError);
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());

  String expectedDebugLog = "DEBUG: Error: " + resolutionError;
  String expectedInfoLog =
      "INFO: TRANSIENT_FAILURE: picks="
          + "[Status{code=NOT_FOUND, description=www.google.com not found, cause=null}],"
          + " drops=[]";
  assertThat(logs).containsExactly(expectedDebugLog, expectedInfoLog).inOrder();
  logs.clear();

  RoundRobinPicker failurePicker = (RoundRobinPicker) pickerCaptor.getValue();
  assertThat(failurePicker.dropList).isEmpty();
  assertThat(failurePicker.pickList).containsExactly(new ErrorEntry(resolutionError));

  // Recover with a subsequent success
  List<EquivalentAddressGroup> balancerAddrs = createResolvedBalancerAddresses(1);
  EquivalentAddressGroup firstBalancer = balancerAddrs.get(0);
  List<EquivalentAddressGroup> noBackends = Collections.<EquivalentAddressGroup>emptyList();
  deliverResolvedAddresses(noBackends, balancerAddrs);

  verify(helper).createOobChannel(eq(firstBalancer), eq(lbAuthority(0)));
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
}
/**
 * Checks that a changed server list does not create a new subchannel: the helper is asked to
 * update the addresses of the existing subchannel instead.
 */
@Test
public void pickAfterResolvedAndChanged() throws Exception {
  SocketAddress replacementAddr = new FakeSocketAddress("newserver");
  List<EquivalentAddressGroup> updatedServers =
      Lists.newArrayList(new EquivalentAddressGroup(replacementAddr));
  InOrder helperOrder = inOrder(mockHelper);

  // Initial resolution: one subchannel is created and asked to connect.
  loadBalancer.handleResolvedAddressGroups(servers, affinity);
  helperOrder.verify(mockHelper).createSubchannel(eq(servers), any(Attributes.class));
  helperOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
  verify(mockSubchannel).requestConnection();
  assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

  // Second resolution with a different server list.
  loadBalancer.handleResolvedAddressGroups(updatedServers, affinity);
  helperOrder.verify(mockHelper).updateSubchannelAddresses(eq(mockSubchannel), eq(updatedServers));

  verifyNoMoreInteractions(mockSubchannel);
  verifyNoMoreInteractions(mockHelper);
}
/**
 * Checks that after an OOB channel's addresses are updated to the original addresses plus one
 * more, starting another call does not queue a new transport in {@code newTransports}.
 */
@Test
public void updateOobChannelAddresses_existingAddressDoesNotConnect() {
  ClientCall<String, Integer> lbCreatingCall = channel.newCall(method, CallOptions.DEFAULT);
  lbCreatingCall.start(mockCallListener, new Metadata()); // Create LB
  ArgumentCaptor<Helper> capturedHelper = ArgumentCaptor.forClass(null);
  verify(mockLoadBalancerFactory).newLoadBalancer(capturedHelper.capture());
  Helper lbHelper = capturedHelper.getValue();

  ManagedChannel oob = lbHelper.createOobChannel(servers.get(0), "localhost");
  oob.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  MockClientTransportInfo connectedTransport = newTransports.poll();
  connectedTransport.listener.transportReady();

  // Keep every original address and append a new one.
  List<SocketAddress> extendedAddresses = new ArrayList<>(servers.get(0).getAddresses());
  extendedAddresses.add(new FakeSocketAddress("aDifferentServer"));
  lbHelper.updateOobChannelAddresses(oob, new EquivalentAddressGroup(extendedAddresses));

  oob.newCall(method, CallOptions.DEFAULT).start(mockCallListener, new Metadata());
  assertNull(newTransports.poll());
}
/**
 * Checks that {@code updateGroups} moves the index back to the first address of the first
 * group, even when the new groups equal the old ones.
 */
@Test
public void index_updateGroups_resets() {
  SocketAddress first = new FakeSocketAddress();
  SocketAddress second = new FakeSocketAddress();
  SocketAddress third = new FakeSocketAddress();
  Index cursor =
      new Index(
          Arrays.asList(
              new EquivalentAddressGroup(Arrays.asList(first)),
              new EquivalentAddressGroup(Arrays.asList(second, third))));

  // Advance twice from the starting position.
  cursor.increment();
  cursor.increment();

  // We want to make sure both groupIndex and addressIndex are reset
  cursor.updateGroups(
      Arrays.asList(
          new EquivalentAddressGroup(Arrays.asList(first)),
          new EquivalentAddressGroup(Arrays.asList(second, third))));

  assertThat(cursor.getCurrentAddress()).isSameInstanceAs(first);
}
/**
 * Checks that when the proxy detector returns a proxy, the address resolver is never consulted
 * and the result is a single proxied socket address whose target is left unresolved.
 */
@Test
public void doNotResolveWhenProxyDetected() throws Exception {
  final String targetHost = "foo.googleapis.com";
  final int targetPort = 81;
  final InetSocketAddress proxyAddress =
      new InetSocketAddress(InetAddress.getByName("10.0.0.1"), 1000);
  ProxyDetector proxyEverything = new ProxyDetector() {
    @Override
    public HttpConnectProxiedSocketAddress proxyFor(SocketAddress targetAddress) {
      return HttpConnectProxiedSocketAddress.newBuilder()
          .setTargetAddress((InetSocketAddress) targetAddress)
          .setProxyAddress(proxyAddress)
          .setUsername("username")
          .setPassword("password").build();
    }
  };
  DnsNameResolver proxiedResolver =
      newResolver(targetHost, targetPort, proxyEverything, Stopwatch.createUnstarted());

  // Any attempt to resolve an address fails the test.
  AddressResolver failingAddressResolver = mock(AddressResolver.class);
  when(failingAddressResolver.resolveAddress(anyString())).thenThrow(new AssertionError());
  proxiedResolver.setAddressResolver(failingAddressResolver);

  proxiedResolver.start(mockListener);
  assertEquals(1, fakeExecutor.runDueTasks());

  verify(mockListener).onResult(resultCaptor.capture());
  List<EquivalentAddressGroup> resolvedGroups = resultCaptor.getValue().getAddresses();
  assertThat(resolvedGroups).hasSize(1);
  EquivalentAddressGroup onlyGroup = resolvedGroups.get(0);
  assertThat(onlyGroup.getAddresses()).hasSize(1);

  HttpConnectProxiedSocketAddress proxied =
      (HttpConnectProxiedSocketAddress) onlyGroup.getAddresses().get(0);
  assertSame(proxyAddress, proxied.getProxyAddress());
  assertEquals("username", proxied.getUsername());
  assertEquals("password", proxied.getPassword());
  assertTrue(proxied.getTargetAddress().isUnresolved());
}
/**
 * Handle new addresses of the balancer and backends from the resolver, and create connection if
 * not yet connected.
 *
 * <p>If {@code newLbAddressGroups} is empty, an {@code UNAVAILABLE} error is propagated and the
 * method returns without touching any other state.
 *
 * @param newLbAddressGroups balancer address groups; flattened into one group before being
 *     passed to {@code startLbComm}
 * @param newBackendServers backend addresses, stored as the fallback backend list
 */
void handleAddresses(
List<LbAddressGroup> newLbAddressGroups, List<EquivalentAddressGroup> newBackendServers) {
// Without any balancer address there is nothing to talk to: report the error and stop.
if (newLbAddressGroups.isEmpty()) {
propagateError(Status.UNAVAILABLE.withDescription(
"NameResolver returned no LB address while asking for GRPCLB"));
return;
}
LbAddressGroup newLbAddressGroup = flattenLbAddressGroups(newLbAddressGroups);
startLbComm(newLbAddressGroup);
// Avoid creating a new RPC just because the addresses were updated, as it can cause a
// stampeding herd. The current RPC may be on a connection to an address not present in
// newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
// outdated backend, we could choose to re-create the RPC.
if (lbStream == null) {
startLbRpc();
}
// Remember the latest backend addresses for use by useFallbackBackends().
fallbackBackendList = newBackendServers;
// Start the fallback timer if it's never started
if (fallbackTimer == null) {
fallbackTimer = syncContext.schedule(
new FallbackModeTask(), FALLBACK_TIMEOUT_MS, TimeUnit.MILLISECONDS, timerService);
}
if (usingFallbackBackends) {
// Populate the new fallback backends to round-robin list.
useFallbackBackends();
}
maybeUpdatePicker();
}
/**
 * Switches to fallback mode: marks fallback as in use, logs it, and rebuilds the round-robin
 * lists from {@code fallbackBackendList}, with a {@code null} drop entry and a {@code null}
 * second constructor argument for every backend.
 */
private void useFallbackBackends() {
  usingFallbackBackends = true;
  logger.log(ChannelLogLevel.INFO, "Using fallback backends");

  List<DropEntry> noDrops = new ArrayList<>();
  List<BackendAddressGroup> fallbackGroups = new ArrayList<>();
  for (EquivalentAddressGroup fallbackEag : fallbackBackendList) {
    // One null drop entry per backend keeps both lists the same length.
    noDrops.add(null);
    fallbackGroups.add(new BackendAddressGroup(fallbackEag, null));
  }

  useRoundRobinLists(noDrops, fallbackGroups, null);
}
/**
 * Checks that a name-resolution error produces a TRANSIENT_FAILURE picker holding only the
 * error, and that a later successful resolution opens an OOB channel to the first resolved
 * address and starts the balanceLoad RPC.
 */
@Test
public void nameResolutionFailsThenRecover() {
  Status resolutionError = Status.NOT_FOUND.withDescription("www.google.com not found");
  deliverNameResolutionError(resolutionError);
  verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());

  String expectedDebugLog = "DEBUG: Error: " + resolutionError;
  String expectedInfoLog =
      "INFO: TRANSIENT_FAILURE: picks="
          + "[Status{code=NOT_FOUND, description=www.google.com not found, cause=null}],"
          + " drops=[]";
  assertThat(logs).containsExactly(expectedDebugLog, expectedInfoLog).inOrder();
  logs.clear();

  RoundRobinPicker failurePicker = (RoundRobinPicker) pickerCaptor.getValue();
  assertThat(failurePicker.dropList).isEmpty();
  assertThat(failurePicker.pickList).containsExactly(new ErrorEntry(resolutionError));

  // Recover with a subsequent success
  List<EquivalentAddressGroup> recoveredServers = createResolvedServerAddresses(true);
  EquivalentAddressGroup firstServer = recoveredServers.get(0);
  deliverResolvedAddresses(recoveredServers, Attributes.EMPTY);

  verify(helper).createOobChannel(eq(firstServer), eq(lbAuthority(0)));
  verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
}
/**
 * Checks that a name-resolution error arriving after the balancer connection is set up, but
 * before any server list has been received, yields a TRANSIENT_FAILURE picker holding only the
 * error while the OOB channel stays open, and that subsequent LB responses still lead to
 * subchannels being taken from the pool for each backend, in order.
 */
@Test
public void grpclbThenNameResolutionFails() {
InOrder inOrder = inOrder(helper, subchannelPool);
// Go to GRPCLB first
List<EquivalentAddressGroup> grpclbResolutionList = createResolvedServerAddresses(true);
Attributes grpclbResolutionAttrs = Attributes.EMPTY;
deliverResolvedAddresses(grpclbResolutionList, grpclbResolutionAttrs);
verify(helper).createOobChannel(eq(grpclbResolutionList.get(0)), eq(lbAuthority(0)));
assertEquals(1, fakeOobChannels.size());
ManagedChannel oobChannel = fakeOobChannels.poll();
verify(mockLbService).balanceLoad(lbResponseObserverCaptor.capture());
StreamObserver<LoadBalanceResponse> lbResponseObserver = lbResponseObserverCaptor.getValue();
// Let name resolution fail before round-robin list is ready
Status error = Status.NOT_FOUND.withDescription("www.google.com not found");
deliverNameResolutionError(error);
inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue();
assertThat(picker.dropList).isEmpty();
assertThat(picker.pickList).containsExactly(new ErrorEntry(error));
// The resolution error must not tear down the channel to the balancer.
assertFalse(oobChannel.isShutdown());
// Simulate receiving LB response
List<ServerEntry> backends = Arrays.asList(
new ServerEntry("127.0.0.1", 2000, "TOKEN1"),
new ServerEntry("127.0.0.1", 2010, "TOKEN2"));
lbResponseObserver.onNext(buildInitialResponse());
lbResponseObserver.onNext(buildLbResponse(backends));
// One subchannel is requested from the pool per backend, in list order.
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends.get(0).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
inOrder.verify(subchannelPool).takeOrCreateSubchannel(
eq(new EquivalentAddressGroup(backends.get(1).addr, LB_BACKEND_ATTRS)),
any(Attributes.class));
}
/**
 * Merges the socket addresses of every group in {@code groupList}, in iteration order, into a
 * single {@code EquivalentAddressGroup} carrying {@code attrs}. The attributes of the input
 * groups are not carried over.
 */
private static EquivalentAddressGroup flattenEquivalentAddressGroup(
    List<EquivalentAddressGroup> groupList, Attributes attrs) {
  List<SocketAddress> merged = new ArrayList<>();
  for (EquivalentAddressGroup eag : groupList) {
    merged.addAll(eag.getAddresses());
  }
  EquivalentAddressGroup flattened = new EquivalentAddressGroup(merged, attrs);
  return flattened;
}
/**
 * Hands the resolved address groups and attributes to the balancer from inside the
 * synchronization context.
 */
private void deliverResolvedAddresses(
    final List<EquivalentAddressGroup> addrs, final Attributes attrs) {
  Runnable deliverTask = new Runnable() {
    @Override
    public void run() {
      balancer.handleResolvedAddressGroups(addrs, attrs);
    }
  };
  syncContext.execute(deliverTask);
}
/**
 * Checks that updating the internal subchannel with an empty address-group list throws
 * {@link IllegalArgumentException}.
 */
@Test
public void updateAddresses_emptyEagList_throws() {
  SocketAddress initialAddress = new FakeSocketAddress();
  createInternalSubchannel(initialAddress);

  // Register the expectation before the call that should throw.
  thrown.expect(IllegalArgumentException.class);
  internalSubchannel.updateAddresses(
      Arrays.<EquivalentAddressGroup>asList());
}
/**
 * Checks that when resolving the host's own address fails, the resolver still looks up the SRV
 * and TXT records: the result has no backend addresses, carries one balancer address built from
 * the SRV record, and has no service config.
 */
@Test
public void resolve_addressFailure_stillLookUpBalancersAndServiceConfig() throws Exception {
  InetAddress lbAddr = InetAddress.getByAddress(new byte[] {10, 1, 0, 0});
  int lbPort = 8080;
  String lbName = "foo.example.com."; // original name in SRV record
  // Build the record from lbPort so it cannot drift from the port asserted below.
  SrvRecord srvRecord = new SrvRecord(lbName, lbPort);

  // Host lookup fails, while the balancer name from the SRV record resolves normally.
  AddressResolver mockAddressResolver = mock(AddressResolver.class);
  when(mockAddressResolver.resolveAddress(hostName))
      .thenThrow(new UnknownHostException("I really tried"));
  when(mockAddressResolver.resolveAddress(lbName))
      .thenReturn(Collections.singletonList(lbAddr));
  ResourceResolver mockResourceResolver = mock(ResourceResolver.class);
  when(mockResourceResolver.resolveTxt(anyString())).thenReturn(Collections.<String>emptyList());
  when(mockResourceResolver.resolveSrv(anyString()))
      .thenReturn(Collections.singletonList(srvRecord));

  resolver.setAddressResolver(mockAddressResolver);
  resolver.setResourceResolver(mockResourceResolver);
  resolver.start(mockListener);
  assertThat(fakeClock.runDueTasks()).isEqualTo(1);

  verify(mockListener).onResult(resultCaptor.capture());
  ResolutionResult result = resultCaptor.getValue();
  assertThat(result.getAddresses()).isEmpty();
  EquivalentAddressGroup resolvedBalancerAddr =
      Iterables.getOnlyElement(result.getAttributes().get(GrpclbConstants.ATTR_LB_ADDRS));
  // The expected authority is the SRV name without its trailing dot.
  assertThat(resolvedBalancerAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY))
      .isEqualTo("foo.example.com");
  InetSocketAddress resolvedBalancerSockAddr =
      (InetSocketAddress) Iterables.getOnlyElement(resolvedBalancerAddr.getAddresses());
  assertThat(resolvedBalancerSockAddr.getAddress()).isEqualTo(lbAddr);
  assertThat(resolvedBalancerSockAddr.getPort()).isEqualTo(lbPort);
  assertThat(result.getServiceConfig()).isNull();

  verify(mockAddressResolver).resolveAddress(hostName);
  verify(mockResourceResolver).resolveTxt("_grpc_config." + hostName);
  verify(mockResourceResolver).resolveSrv("_grpclb._tcp." + hostName);
}