下面列出了 io.grpc.Attributes 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a subchannel for {@code eagLists[index]} carrying {@code attrs} and starts it with
 * {@code mockStateListeners[index]}; both steps are submitted to {@code syncContext}.
 *
 * @return the subchannel stored by the submitted task, or {@code null} if the task has not run
 *     by the time {@code execute} returns
 */
private Subchannel createSubchannel(final Helper helper, final int index,
    final Attributes attrs) {
  final AtomicReference<Subchannel> created = new AtomicReference<>();
  syncContext.execute(
      new Runnable() {
        @Override
        public void run() {
          CreateSubchannelArgs args =
              CreateSubchannelArgs.newBuilder()
                  .setAddresses(eagLists[index])
                  .setAttributes(attrs)
                  .build();
          Subchannel subchannel = helper.createSubchannel(args);
          subchannel.start(mockStateListeners[index]);
          created.set(subchannel);
        }
      });
  return created.get();
}
/**
 * Verifies that the authority and executor set on the call options are the ones handed to the
 * credentials, alongside the transport attributes, method and security level.
 */
@Test
public void parameterPropagation_overrideByCallOptions() {
  Attributes attrsFromTransport =
      Attributes.newBuilder()
          .set(ATTR_KEY, ATTR_VALUE)
          .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.INTEGRITY)
          .build();
  when(mockTransport.getAttributes()).thenReturn(attrsFromTransport);
  Executor overridingExecutor = mock(Executor.class);

  transport.newStream(
      method,
      origHeaders,
      callOptions.withAuthority("calloptions-authority").withExecutor(overridingExecutor));

  ArgumentCaptor<RequestInfo> requestInfoCaptor = ArgumentCaptor.forClass(null);
  verify(mockCreds).applyRequestMetadata(
      requestInfoCaptor.capture(),
      same(overridingExecutor),
      any(CallCredentials.MetadataApplier.class));
  RequestInfo captured = requestInfoCaptor.getValue();
  assertSame(attrsFromTransport, captured.getTransportAttrs());
  assertSame(method, captured.getMethodDescriptor());
  assertEquals("calloptions-authority", captured.getAuthority());
  assertSame(SecurityLevel.INTEGRITY, captured.getSecurityLevel());
}
/**
 * Splits the resolved servers into load-balancer addresses (those carrying
 * {@code ATTR_LB_ADDR_AUTHORITY}) and backend addresses, then passes both lists, wrapped as
 * unmodifiable, to {@code grpclbState}. The {@code attributes} argument is not used here.
 */
@Override
public void handleResolvedAddressGroups(
    List<EquivalentAddressGroup> updatedServers, Attributes attributes) {
  List<LbAddressGroup> lbGroups = new ArrayList<>();
  List<EquivalentAddressGroup> backends = new ArrayList<>();
  for (EquivalentAddressGroup candidate : updatedServers) {
    String authority = candidate.getAttributes().get(GrpcAttributes.ATTR_LB_ADDR_AUTHORITY);
    if (authority == null) {
      // No LB authority attached: this is an ordinary backend address.
      backends.add(candidate);
      continue;
    }
    lbGroups.add(new LbAddressGroup(candidate, authority));
  }
  grpclbState.handleAddresses(
      Collections.unmodifiableList(lbGroups), Collections.unmodifiableList(backends));
}
/**
 * Accepts or rejects a newly ready transport based on the peer's IP address.
 *
 * <p>The remote address is read from {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR} and its
 * {@link InetAddress} is passed to {@code addressFilter}.
 *
 * @param attributes the transport attributes
 * @return {@code attributes} unchanged when the address filter accepts the peer
 * @throws io.grpc.StatusRuntimeException with {@code INTERNAL} when the remote address is missing
 *     or is not an {@link InetSocketAddress}, and with {@code PERMISSION_DENIED} when the address
 *     filter rejects the peer
 */
@Override
public Attributes transportReady(final Attributes attributes) {
  if (logger.isDebugEnabled()) {
    logger.debug("Ready attributes={}", attributes);
  }
  final Object remoteAddress = attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  if (remoteAddress == null) {
    // Unauthenticated
    logger.warn("Unauthenticated transport. TRANSPORT_ATTR_REMOTE_ADDR must not be null");
    throw Status.INTERNAL.withDescription("RemoteAddress is null").asRuntimeException();
  }
  if (!(remoteAddress instanceof InetSocketAddress)) {
    // A blind cast would surface as a raw ClassCastException for non-inet address types;
    // fail with an explicit status instead.
    logger.warn("Unsupported remote address type. remoteAddress={}", remoteAddress);
    throw Status.INTERNAL
        .withDescription("RemoteAddress is not an InetSocketAddress")
        .asRuntimeException();
  }
  final InetAddress inetAddress = ((InetSocketAddress) remoteAddress).getAddress();
  if (addressFilter.accept(inetAddress)) {
    return attributes;
  }
  // Permission denied
  logger.debug("Permission denied transport.");
  throw Status.PERMISSION_DENIED.withDescription("invalid IP").asRuntimeException();
}
/**
 * A stream whose message-encoding header names a decompressor that cannot be found is closed
 * with {@code UNIMPLEMENTED}.
 */
@Test
public void decompressorNotFound() throws Exception {
  String missingCodec = "NON_EXISTENT_DECOMPRESSOR";
  createAndStartServer();
  ServerTransportListener listener =
      transportServer.registerNewServerTransport(new SimpleServerTransport());
  listener.transportReady(Attributes.EMPTY);
  Metadata headers = new Metadata();
  headers.put(MESSAGE_ENCODING_KEY, missingCodec);
  StatsTraceContext traceCtx =
      StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/nonexist", headers);
  when(stream.statsTraceContext()).thenReturn(traceCtx);

  listener.streamCreated(stream, "Waiter/nonexist", headers);

  verify(stream).setListener(isA(ServerStreamListener.class));
  verify(stream).streamId();
  verify(stream).close(statusCaptor.capture(), any(Metadata.class));
  Status closeStatus = statusCaptor.getValue();
  assertEquals(Status.Code.UNIMPLEMENTED, closeStatus.getCode());
  assertEquals("Can't find decompressor for " + missingCodec, closeStatus.getDescription());
  verifyNoMoreInteractions(stream);
}
/**
 * Creates a subchannel for {@code addressGroup} with {@code attrs} inside the helper's
 * synchronization context, without calling {@code start} on it.
 *
 * @return the subchannel stored by the submitted task, or {@code null} if the task has not run
 *     by the time {@code execute} returns
 */
private static Subchannel createUnstartedSubchannel(
    final Helper helper, final EquivalentAddressGroup addressGroup, final Attributes attrs) {
  final AtomicReference<Subchannel> created = new AtomicReference<>();
  helper.getSynchronizationContext().execute(
      new Runnable() {
        @Override
        public void run() {
          CreateSubchannelArgs args =
              CreateSubchannelArgs.newBuilder()
                  .setAddresses(addressGroup)
                  .setAttributes(attrs)
                  .build();
          created.set(helper.createSubchannel(args));
        }
      });
  return created.get();
}
/**
 * Creating a subchannel adds a "Child Subchannel started" event to the channel trace and a
 * "created" event to the subchannel's own trace.
 */
@Test
public void channelTracing_subchannelCreationEvents() throws Exception {
  channelBuilder.maxTraceEvents(10);
  createChannel();
  timer.forwardNanos(1234);
  AbstractSubchannel created =
      (AbstractSubchannel) createSubchannelSafely(
          helper, addressGroup, Attributes.EMPTY, subchannelStateListener);

  // Event expected on the parent channel's trace.
  assertThat(getStats(channel).channelTrace.events)
      .contains(
          new ChannelTrace.Event.Builder()
              .setDescription("Child Subchannel started")
              .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
              .setTimestampNanos(timer.getTicker().read())
              .setSubchannelRef(created.getInstrumentedInternalSubchannel())
              .build());

  // Event expected on the subchannel's own trace.
  assertThat(getStats(created).channelTrace.events)
      .contains(
          new ChannelTrace.Event.Builder()
              .setDescription("Subchannel for [[[test-addr]/{}]] created")
              .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
              .setTimestampNanos(timer.getTicker().read())
              .build());
}
/**
 * Verifies that a security level set in the transport attributes is reported through the
 * {@code RequestInfo} given to the credentials.
 */
@Test
public void parameterPropagation_transportSetSecurityLevel() {
  Attributes attrsFromTransport =
      Attributes.newBuilder()
          .set(ATTR_KEY, ATTR_VALUE)
          .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.INTEGRITY)
          .build();
  when(mockTransport.getAttributes()).thenReturn(attrsFromTransport);

  transport.newStream(method, origHeaders, callOptions);

  ArgumentCaptor<RequestInfo> requestInfoCaptor = ArgumentCaptor.forClass(null);
  verify(mockCreds).applyRequestMetadata(
      requestInfoCaptor.capture(),
      same(mockExecutor),
      any(io.grpc.CallCredentials2.MetadataApplier.class));
  RequestInfo captured = requestInfoCaptor.getValue();
  assertSame(method, captured.getMethodDescriptor());
  assertSame(ATTR_VALUE, captured.getTransportAttrs().get(ATTR_KEY));
  assertSame(AUTHORITY, captured.getAuthority());
  assertSame(SecurityLevel.INTEGRITY, captured.getSecurityLevel());
}
/**
 * Delivers LB addresses with two different authorities plus one non-LB address, and expects a
 * single OOB channel built from the addresses of "fake-authority-1".
 */
@Test
public void grpclbMultipleAuthorities() throws Exception {
  List<EquivalentAddressGroup> resolved =
      Arrays.asList(
          new EquivalentAddressGroup(
              new FakeSocketAddress("fake-address-1"), lbAttributes("fake-authority-1")),
          new EquivalentAddressGroup(
              new FakeSocketAddress("fake-address-2"), lbAttributes("fake-authority-2")),
          new EquivalentAddressGroup(new FakeSocketAddress("not-a-lb-address")),
          new EquivalentAddressGroup(
              new FakeSocketAddress("fake-address-3"), lbAttributes("fake-authority-1")));
  // Supporting multiple authorities would be good, one day
  final EquivalentAddressGroup expectedOobEag =
      new EquivalentAddressGroup(
          Arrays.<SocketAddress>asList(
              new FakeSocketAddress("fake-address-1"),
              new FakeSocketAddress("fake-address-3")),
          lbAttributes("fake-authority-1"));

  deliverResolvedAddresses(resolved, Attributes.EMPTY);

  verify(helper).createOobChannel(expectedOobEag, "fake-authority-1");
}
/**
 * A stream created for a method name "Waiter/nonexist" is closed with {@code UNIMPLEMENTED}
 * and a "Method not found" description.
 */
@Test
public void methodNotFound() throws Exception {
  createAndStartServer();
  ServerTransportListener listener =
      transportServer.registerNewServerTransport(new SimpleServerTransport());
  listener.transportReady(Attributes.EMPTY);
  Metadata headers = new Metadata();
  StatsTraceContext traceCtx =
      StatsTraceContext.newServerContext(streamTracerFactories, "Waiter/nonexist", headers);
  when(stream.statsTraceContext()).thenReturn(traceCtx);

  listener.streamCreated(stream, "Waiter/nonexist", headers);

  verify(stream).setListener(isA(ServerStreamListener.class));
  verify(stream, atLeast(1)).statsTraceContext();
  // Exactly one task is expected to be due on the executor at this point.
  assertEquals(1, executor.runDueTasks());
  verify(stream).close(statusCaptor.capture(), any(Metadata.class));
  Status closeStatus = statusCaptor.getValue();
  assertEquals(Status.Code.UNIMPLEMENTED, closeStatus.getCode());
  assertEquals("Method not found: Waiter/nonexist", closeStatus.getDescription());
  verify(streamTracerFactory).newServerStreamTracer(eq("Waiter/nonexist"), same(headers));
  assertNull(streamTracer.getServerCallInfo());
  assertEquals(Status.Code.UNIMPLEMENTED, statusCaptor.getValue().getCode());
}
/**
 * With plain call options, the credentials receive the transport attributes, the method, the
 * {@code AUTHORITY} constant and {@code SecurityLevel.NONE}.
 */
@Test
public void parameterPropagation_base() {
  Attributes attrsFromTransport = Attributes.newBuilder().set(ATTR_KEY, ATTR_VALUE).build();
  when(mockTransport.getAttributes()).thenReturn(attrsFromTransport);

  transport.newStream(method, origHeaders, callOptions);

  ArgumentCaptor<RequestInfo> requestInfoCaptor = ArgumentCaptor.forClass(null);
  verify(mockCreds).applyRequestMetadata(
      requestInfoCaptor.capture(),
      same(mockExecutor),
      any(CallCredentials.MetadataApplier.class));
  RequestInfo captured = requestInfoCaptor.getValue();
  assertSame(attrsFromTransport, captured.getTransportAttrs());
  assertSame(method, captured.getMethodDescriptor());
  assertSame(AUTHORITY, captured.getAuthority());
  assertSame(SecurityLevel.NONE, captured.getSecurityLevel());
}
/**
 * When the credentials apply their metadata only after {@code newStream} has returned, the real
 * stream is created at that later point and the headers contain both the original and the
 * credential entries.
 */
@Test
public void applyMetadata_delayed() {
  when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);

  // Will call applyRequestMetadata(), which is no-op.
  DelayedStream delayed = (DelayedStream) transport.newStream(method, origHeaders, callOptions);

  ArgumentCaptor<CallCredentials.MetadataApplier> applierCaptor = ArgumentCaptor.forClass(null);
  verify(mockCreds).applyRequestMetadata(
      any(RequestInfo.class), same(mockExecutor), applierCaptor.capture());
  // Nothing has reached the underlying transport yet.
  verify(mockTransport, never()).newStream(method, origHeaders, callOptions);

  Metadata credsHeaders = new Metadata();
  credsHeaders.put(CREDS_KEY, CREDS_VALUE);
  applierCaptor.getValue().apply(credsHeaders);

  verify(mockTransport).newStream(method, origHeaders, callOptions);
  assertSame(mockStream, delayed.getRealStream());
  assertEquals(CREDS_VALUE, origHeaders.get(CREDS_KEY));
  assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY));
}
/**
 * When the credentials call {@code fail} from within {@code applyRequestMetadata}, the returned
 * stream is a {@code FailingClientStream} carrying that status and the underlying transport is
 * never asked for a stream.
 */
@Test
public void fail_inline() {
  final Status credsFailure =
      Status.FAILED_PRECONDITION.withDescription("channel not secure for creds");
  when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
  Answer<Void> failImmediately =
      new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          // The applier is the third argument of applyRequestMetadata.
          MetadataApplier applier = (MetadataApplier) invocation.getArguments()[2];
          applier.fail(credsFailure);
          return null;
        }
      };
  doAnswer(failImmediately)
      .when(mockCreds)
      .applyRequestMetadata(
          any(RequestInfo.class), same(mockExecutor), any(MetadataApplier.class));

  FailingClientStream failed =
      (FailingClientStream) transport.newStream(method, origHeaders, callOptions);

  verify(mockTransport, never()).newStream(method, origHeaders, callOptions);
  assertSame(credsFailure, failed.getError());
}
/**
 * Selects the hierarchical addresses whose current path element equals {@code name} and returns
 * copies of them whose path chain is advanced to the next element. Addresses that carry no path
 * chain are skipped. New address groups are built for the result; the input is not modified.
 *
 * @param addresses the addresses to select from; must not be null
 * @param name the path element to match; must not be null
 * @return an unmodifiable list of the matching addresses, one level further down the hierarchy
 */
static List<EquivalentAddressGroup> filter(List<EquivalentAddressGroup> addresses, String name) {
  checkNotNull(addresses, "addresses");
  checkNotNull(name, "name");
  List<EquivalentAddressGroup> matched = new ArrayList<>();
  for (EquivalentAddressGroup candidate : addresses) {
    PathChain chain = candidate.getAttributes().get(PATH_CHAIN_KEY);
    if (chain == null || !chain.name.equals(name)) {
      continue;
    }
    Attributes advancedAttrs =
        candidate.getAttributes().toBuilder().set(PATH_CHAIN_KEY, chain.next).build();
    matched.add(new EquivalentAddressGroup(candidate.getAddresses(), advancedAttrs));
  }
  return Collections.unmodifiableList(matched);
}
/**
 * One-time set-up: initializes the channel, swaps in a spying keep-alive manager, and feeds the
 * handler the negotiation-completed event, the connection preface and an initial settings frame.
 */
@Override
protected void manualSetUp() throws Exception {
  assertNull("manualSetUp should not run more than once", handler());
  initChannel(new GrpcHttp2ServerHeadersDecoder(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE));

  // replace the keepAliveManager with spyKeepAliveManager
  spyKeepAliveManager =
      mock(KeepAliveManager.class, delegatesTo(handler().getKeepAliveManagerForTest()));
  handler().setKeepAliveManagerForTest(spyKeepAliveManager);

  // Simulate receipt of the connection preface
  handler().handleProtocolNegotiationCompleted(Attributes.EMPTY, /*securityInfo=*/ null);
  channelRead(Http2CodecUtil.connectionPrefaceBuf());

  // Simulate receipt of initial remote settings.
  ByteBuf initialSettings = serializeSettings(new Http2Settings());
  channelRead(initialSettings);
}
/**
 * A {@code SO_LINGER} value passed through the channel options map shows up on the started
 * transport's socket channel config.
 */
@Test
public void setSoLingerChannelOption() throws IOException {
  startServer();
  // set SO_LINGER option
  final int lingerSeconds = 123;
  Map<ChannelOption<?>, Object> options = new HashMap<>();
  options.put(ChannelOption.SO_LINGER, lingerSeconds);
  NettyClientTransport transport = new NettyClientTransport(
      address, NioSocketChannel.class, options, group, newNegotiator(),
      DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
      KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
      tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker());
  transports.add(transport);
  callMeMaybe(transport.start(clientTransportListener));

  // verify SO_LINGER has been set
  ChannelConfig actualConfig = transport.channel().config();
  assertTrue(actualConfig instanceof SocketChannelConfig);
  assertEquals(lingerSeconds, ((SocketChannelConfig) actualConfig).getSoLinger());
}
/**
 * Publishes the addresses of the given endpoints to {@code listener}.
 *
 * <p>Only subsets that expose a port equal to {@code port} contribute; each address of such a
 * subset becomes one {@code EquivalentAddressGroup} on that port. If the endpoints have no
 * subsets at all ({@code getSubsets()} is null) the listener is not notified.
 *
 * @param endpoints the endpoints to translate into resolved addresses
 */
private void update(Endpoints endpoints) {
  if (endpoints.getSubsets() == null) {
    return;
  }
  // One flat pipeline instead of nested forEach calls that fill a list by side effect;
  // anyMatch stops at the first matching port instead of counting all of them.
  List<EquivalentAddressGroup> servers =
      endpoints.getSubsets().stream()
          .filter(subset ->
              subset.getPorts().stream().anyMatch(p -> p != null && p.getPort() == port))
          .flatMap(subset -> subset.getAddresses().stream())
          .map(address ->
              new EquivalentAddressGroup(new InetSocketAddress(address.getIp(), port)))
          .collect(java.util.stream.Collectors.toList());
  listener.onAddresses(servers, Attributes.EMPTY);
}
/**
 * After an RPC completes, the client stream's attributes carry an SSL session plus remote and
 * local addresses, and the local address equals the remote address recorded by the server side.
 */
@Test
public void clientStreamGetsAttributes() throws Exception {
  startServer();
  NettyClientTransport transport = newTransport(newNegotiator());
  callMeMaybe(transport.start(clientTransportListener));
  Rpc call = new Rpc(transport).halfClose();
  call.waitForResponse();

  // Client-side view.
  assertNotNull(call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
  assertEquals(address, call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));

  // Server-side view: wait up to one second for the attributes to be recorded.
  Attributes serverSideAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
  assertNotNull(serverSideAttrs);
  SocketAddress clientAsSeenByServer = serverSideAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  assertNotNull(clientAsSeenByServer);
  assertEquals(
      clientAsSeenByServer, call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
}
/**
 * With {@code ATTR_LB_PROVIDED_BACKEND} set on the EAG attributes, the failure raised during
 * negotiation mentions "TsiHandshakeHandler".
 */
@Test
public void altsHandler() {
  Attributes lbProvidedAttrs =
      Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build();
  GrpcHttp2ConnectionHandler grpcHandler = mock(GrpcHttp2ConnectionHandler.class);
  when(grpcHandler.getEagAttributes()).thenReturn(lbProvidedAttrs);

  // Records whatever exception travels down the pipeline.
  final AtomicReference<Throwable> caught = new AtomicReference<>();
  ChannelHandler recorder =
      new ChannelInboundHandlerAdapter() {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
          caught.set(cause);
          super.exceptionCaught(ctx, cause);
        }
      };

  ChannelHandler negotiationHandler = googleProtocolNegotiator.newHandler(grpcHandler);
  EmbeddedChannel channel = new EmbeddedChannel(recorder);
  // Add the negotiator handler last, but to the front.  Putting this in ctor above would make it
  // throw early.
  channel.pipeline().addFirst(negotiationHandler);
  channel.pipeline().fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());

  // Check that the message complained about the ALTS code, rather than SSL.  ALTS throws on
  // being added, so it's hard to catch it at the right time to make this assertion.
  assertThat(caught.get()).hasMessageThat().contains("TsiHandshakeHandler");
}
/**
 * Hands {@code addresses} to {@code loadBalancer} together with an {@code LrsConfig} whose child
 * policy is a fake provider named {@code childPolicy}, and with {@code loadRecorder} attached
 * as the load stats store attribute.
 */
private void deliverResolvedAddresses(
    List<EquivalentAddressGroup> addresses, String childPolicy) {
  PolicySelection childSelection =
      new PolicySelection(new FakeLoadBalancerProvider(childPolicy), null, null);
  LrsConfig lrsConfig =
      new LrsConfig(
          CLUSTER_NAME, EDS_SERVICE_NAME, LRS_SERVER_NAME, TEST_LOCALITY, childSelection);
  Attributes statsStoreAttrs =
      Attributes.newBuilder()
          .set(XdsAttributes.ATTR_CLUSTER_SERVICE_LOAD_STATS_STORE, loadRecorder)
          .build();
  loadBalancer.handleResolvedAddresses(
      ResolvedAddresses.newBuilder()
          .setAddresses(addresses)
          .setAttributes(statsStoreAttrs)
          .setLoadBalancingPolicyConfig(lrsConfig)
          .build());
}
/**
 * When the credentials apply their metadata from within {@code applyRequestMetadata}, the
 * stream returned by {@code newStream} is the underlying transport's stream and the headers
 * contain both the original and the credential entries.
 */
@Test
public void applyMetadata_inline() {
  when(mockTransport.getAttributes()).thenReturn(Attributes.EMPTY);
  Answer<Void> applyImmediately =
      new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocation) throws Throwable {
          // The applier is the third argument of applyRequestMetadata.
          MetadataApplier applier = (MetadataApplier) invocation.getArguments()[2];
          Metadata credsHeaders = new Metadata();
          credsHeaders.put(CREDS_KEY, CREDS_VALUE);
          applier.apply(credsHeaders);
          return null;
        }
      };
  doAnswer(applyImmediately)
      .when(mockCreds)
      .applyRequestMetadata(
          any(RequestInfo.class),
          same(mockExecutor),
          any(io.grpc.CallCredentials2.MetadataApplier.class));

  ClientStream returned = transport.newStream(method, origHeaders, callOptions);

  verify(mockTransport).newStream(method, origHeaders, callOptions);
  assertSame(mockStream, returned);
  assertEquals(CREDS_VALUE, origHeaders.get(CREDS_KEY));
  assertEquals(ORIG_HEADER_VALUE, origHeaders.get(ORIG_HEADER_KEY));
}
/**
 * Builds a {@code DnsNameResolver} for targets whose scheme equals {@code SCHEME}.
 *
 * <p>The name to resolve is the target's path with its leading '/' removed.
 *
 * @return a new resolver, or {@code null} when the target uses a different scheme
 * @throws NullPointerException if the target has no path
 * @throws IllegalArgumentException if the path does not start with '/'
 */
@Override
public DnsNameResolver newNameResolver(URI targetUri, Attributes params) {
  if (!SCHEME.equals(targetUri.getScheme())) {
    return null;
  }
  String path = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
  Preconditions.checkArgument(
      path.startsWith("/"),
      "the path component (%s) of the target (%s) must start with '/'",
      path,
      targetUri);
  String nameToResolve = path.substring(1);
  return new DnsNameResolver(
      targetUri.getAuthority(),
      nameToResolve,
      params,
      GrpcUtil.SHARED_CHANNEL_EXECUTOR,
      GrpcUtil.getDefaultProxyDetector(),
      Stopwatch.createUnstarted(),
      InternalServiceProviders.isAndroid(getClass().getClassLoader()));
}
/**
 * Looks up the peer address of {@code serverStream} by scanning its attributes for a key whose
 * {@code toString()} is {@code "remote-addr"}.
 *
 * @return the address as a string when the attribute holds a {@code SocketAddress} or a
 *     {@code String}; {@code null} when the stream has no attributes or no key set;
 *     {@code GrpcConstants.UNKNOWN_ADDRESS} when no usable entry is found or the lookup fails
 */
String getRemoteAddress() {
  Attributes attributes = serverStream.getAttributes();
  if (attributes == null) {
    return null;
  }

  try {
    // keys method is being considered for removal,
    Set<Attributes.Key<?>> keys = attributes.keys();
    if (keys == null) {
      if (isDebug) {
        logger.debug("can't attributes keys");
      }
      return null;
    }

    for (Attributes.Key<?> key : keys) {
      if (key != null && key.toString().equals("remote-addr")) {
        Object remoteAddress = attributes.get(key);
        if (remoteAddress instanceof SocketAddress) {
          return getSocketAddressAsString((SocketAddress) remoteAddress);
        } else if (remoteAddress instanceof String) {
          return (String) remoteAddress;
        }
      }
    }
  } catch (NoSuchMethodError e) {
    // A keys() method missing at runtime is reported as NoSuchMethodError, which is an Error
    // and would slip past the Exception handler below.
    if (isDebug) {
      logger.debug("can't find keys method");
    }
  } catch (Exception e) {
    if (isDebug) {
      logger.debug("can't find keys method");
    }
  }

  return GrpcConstants.UNKNOWN_ADDRESS;
}
/**
 * Parses {@code name} as {@code host[:port]} and records authority, host and port.
 *
 * <p>When {@code name} carries no port, the port is taken from
 * {@code NameResolver.Factory.PARAMS_DEFAULT_PORT} in {@code params}.
 *
 * @param nsAuthority currently unused (see the TODO below)
 * @param name the DNS name to resolve, optionally followed by {@code :port}
 * @throws IllegalArgumentException if {@code name} has no host, or has no port and
 *     {@code params} supplies no default port
 */
DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params,
    Resource<Executor> executorResource, ProxyDetector proxyDetector,
    Stopwatch stopwatch, boolean isAndroid) {
  // TODO: if a DNS server is provided as nsAuthority, use it.
  // https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
  this.executorResource = executorResource;

  // Must prepend a "//" to the name when constructing a URI, otherwise it will be treated as an
  // opaque URI, thus the authority and host of the resulted URI would be null.
  URI parsedName = URI.create("//" + checkNotNull(name, "name"));
  Preconditions.checkArgument(parsedName.getHost() != null, "Invalid DNS name: %s", name);
  authority = Preconditions.checkNotNull(parsedName.getAuthority(),
      "nameUri (%s) doesn't have an authority", parsedName);
  host = parsedName.getHost();

  if (parsedName.getPort() != -1) {
    port = parsedName.getPort();
  } else {
    // No explicit port in the name: fall back to the default from params, if any.
    Integer fallbackPort = params.get(NameResolver.Factory.PARAMS_DEFAULT_PORT);
    if (fallbackPort == null) {
      throw new IllegalArgumentException(
          "name '" + name + "' doesn't contain a port, and default port is not set in params");
    }
    port = fallbackPort;
  }

  this.proxyDetector = proxyDetector;
  this.resolveRunnable = new Resolve(this, stopwatch, getNetworkAddressCacheTtlNanos(isAndroid));
}
/**
 * Returns the attributes of the winning substream's stream, or {@code Attributes.EMPTY} while
 * {@code state.winningSubstream} is still null.
 */
@Override
public final Attributes getAttributes() {
  if (state.winningSubstream == null) {
    return Attributes.EMPTY;
  }
  return state.winningSubstream.stream.getAttributes();
}
/**
 * With empty EAG attributes, the first handler in the pipeline after the negotiation event is
 * one whose simple class name is "SslHandler".
 */
@Test
public void tlsHandler() {
  GrpcHttp2ConnectionHandler grpcHandler = mock(GrpcHttp2ConnectionHandler.class);
  when(grpcHandler.getEagAttributes()).thenReturn(Attributes.EMPTY);
  when(grpcHandler.getAuthority()).thenReturn("authority");

  ChannelHandler negotiationHandler = googleProtocolNegotiator.newHandler(grpcHandler);
  EmbeddedChannel channel = new EmbeddedChannel(negotiationHandler);
  channel.pipeline().fireUserEventTriggered(InternalProtocolNegotiationEvent.getDefault());

  String firstHandlerName = channel.pipeline().first().getClass().getSimpleName();
  assertThat(firstHandlerName).isEqualTo("SslHandler");
}
/**
 * A factory that starts with a provider constructed with a null resolver and then checks a
 * provider constructed with a real resolver returns that real resolver.
 */
@Test
public void secondShouldFind() throws Exception {
  NameResolver expected = new FakeResolver();
  FakeResolverProvider resolvingProvider = new FakeResolverProvider("aaa://", expected);
  FakeResolverProvider nonResolvingProvider = new FakeResolverProvider("bbb://", null);

  NameResolver.Factory chained =
      FallbackResolver.startWith(nonResolvingProvider).thenCheck(resolvingProvider);

  NameResolver actual = chained.newNameResolver(new URI("bbb://foo"), Attributes.EMPTY);
  assertEquals(expected, actual);
}
/**
 * Builds one address group per flag, named "fake-address-i". Groups whose flag is {@code true}
 * get LB attributes for {@code lbAuthority(i)}; the others get empty attributes.
 */
private static List<EquivalentAddressGroup> createResolvedServerAddresses(boolean ... isLb) {
  List<EquivalentAddressGroup> groups = new ArrayList<>();
  int index = 0;
  for (boolean lb : isLb) {
    SocketAddress address = new FakeSocketAddress("fake-address-" + index);
    Attributes groupAttrs;
    if (lb) {
      groupAttrs = lbAttributes(lbAuthority(index));
    } else {
      groupAttrs = Attributes.EMPTY;
    }
    groups.add(new EquivalentAddressGroup(address, groupAttrs));
    index++;
  }
  return groups;
}
/**
 * Merges the socket addresses of every group in {@code groupList}, in list order, into a single
 * {@code EquivalentAddressGroup} that carries {@code attrs}.
 */
private static EquivalentAddressGroup flattenEquivalentAddressGroup(
    List<EquivalentAddressGroup> groupList, Attributes attrs) {
  List<SocketAddress> merged = new ArrayList<>();
  for (EquivalentAddressGroup each : groupList) {
    List<SocketAddress> members = each.getAddresses();
    merged.addAll(members);
  }
  return new EquivalentAddressGroup(merged, attrs);
}
/**
 * Setting the ORCA report config before the subchannel turns READY results in exactly one
 * reporting call, made with the request built from that config.
 */
@Test
public void updateReportingIntervalBeforeSubchannelReady() {
  createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY);
  setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);

  deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));

  verify(mockStateListeners[0])
      .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
  assertThat(orcaServiceImps[0].calls).hasSize(1);
  assertLog(
      subchannels[0].logs,
      "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
  assertThat(orcaServiceImps[0].calls.poll().request)
      .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
}