下面列出了 io.grpc.Attributes#get() 的实例代码；您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Verifies that a client stream exposes the transport attributes and that the
 * addresses reported on the client side agree with what the server transport saw.
 */
@Test
public void clientStreamGetsAttributes() throws Exception {
    startServer();
    NettyClientTransport clientTransport = newTransport(newNegotiator());
    callMeMaybe(clientTransport.start(clientTransportListener));

    Rpc call = new Rpc(clientTransport).halfClose();
    call.waitForResponse();

    // Client-side view: an SSL session is present and the remote address is the server address.
    assertNotNull(call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
    assertEquals(address, call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));

    // Server-side view: wait up to one second for the transport attributes to be published.
    Attributes serverSideAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
    assertNotNull(serverSideAttrs);
    SocketAddress remoteSeenByServer = serverSideAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    assertNotNull(remoteSeenByServer);

    // The peer the server sees must be the client's local address.
    assertEquals(
            remoteSeenByServer,
            call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
}
/**
 * Admits or rejects a transport that has just become ready, based on the
 * remote peer's IP address.
 *
 * @param attributes the transport attributes; the remote address is read from
 *                   {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 * @return the same {@code attributes} instance when the address filter accepts the peer
 * @throws RuntimeException built from {@code Status.INTERNAL} when no remote address is
 *                          present, or from {@code Status.PERMISSION_DENIED} when the
 *                          address filter rejects the peer
 */
@Override
public Attributes transportReady(final Attributes attributes) {
    if (logger.isDebugEnabled()) {
        logger.debug("Ready attributes={}", attributes);
    }

    final InetSocketAddress peer =
            (InetSocketAddress) attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    if (peer == null) {
        // Without a remote address the peer cannot be checked at all.
        logger.warn("Unauthenticated transport. TRANSPORT_ATTR_REMOTE_ADDR must not be null");
        throw Status.INTERNAL.withDescription("RemoteAddress is null").asRuntimeException();
    }

    final InetAddress peerIp = peer.getAddress();
    if (!addressFilter.accept(peerIp)) {
        logger.debug("Permission denied transport.");
        throw Status.PERMISSION_DENIED.withDescription("invalid IP").asRuntimeException();
    }
    return attributes;
}
/**
 * Binds the transport metadata stored in the call attributes to the gRPC
 * {@code Context} before handing the call to the next handler.
 *
 * <p>If the attributes carry no transport metadata, the call is closed with
 * {@code Status.INTERNAL} and an empty listener is returned.
 *
 * @param serverCall        the call being intercepted
 * @param headers           the request headers
 * @param serverCallHandler the next handler in the chain
 * @return the listener produced by the next handler, or an empty listener when the call
 *         was closed
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
        final ServerCall<ReqT, RespT> serverCall,
        Metadata headers,
        ServerCallHandler<ReqT, RespT> serverCallHandler) {
    final Attributes callAttributes = serverCall.getAttributes();
    final TransportMetadata metadata =
            callAttributes.get(MetadataServerTransportFilter.TRANSPORT_METADATA_KEY);

    if (metadata != null) {
        final Context base = Context.current();
        final Context bound = base.withValue(ServerContext.getTransportMetadataKey(), metadata);
        if (logger.isDebugEnabled()) {
            logger.debug("bind metadata method={}, headers={}, attr={}", serverCall.getMethodDescriptor().getFullMethodName(), headers, serverCall.getAttributes());
        }
        return Contexts.interceptCall(bound, serverCall, headers, serverCallHandler);
    }

    // No transport metadata: refuse the call.
    if (logger.isInfoEnabled()) {
        logger.info("Close call. cause=transportMetadata is null, headers={}, attributes={}", headers, serverCall.getAttributes());
    }
    serverCall.close(Status.INTERNAL.withDescription("transportMetadata is null"), new Metadata());
    return new ServerCall.Listener<ReqT>() {
    };
}
/**
 * Copies the load-balancer token found in the address-group attributes into the
 * outgoing headers, then delegates tracer creation.
 *
 * <p>Any existing value under {@code GrpclbConstants.TOKEN_METADATA_KEY} is discarded
 * first, so the header is absent when the address group carries no token.
 *
 * @param info    stream info; its transport attributes must be non-null and must contain
 *                {@code GrpcAttributes.ATTR_CLIENT_EAG_ATTRS}
 * @param headers the outgoing headers, modified in place
 * @return the delegate's tracer, or {@code NOOP_TRACER} when there is no delegate
 */
@Override
public ClientStreamTracer newClientStreamTracer(
        ClientStreamTracer.StreamInfo info, Metadata headers) {
    Attributes streamTransportAttrs = checkNotNull(info.getTransportAttrs(), "transportAttrs");
    Attributes addressGroupAttrs = checkNotNull(
            streamTransportAttrs.get(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS), "eagAttrs");

    String lbToken = addressGroupAttrs.get(GrpclbConstants.TOKEN_ATTRIBUTE_KEY);
    headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
    if (lbToken != null) {
        headers.put(GrpclbConstants.TOKEN_METADATA_KEY, lbToken);
    }

    if (delegate == null) {
        return NOOP_TRACER;
    }
    return delegate.newClientStreamTracer(info, headers);
}
/**
 * Checks the attributes visible on a client stream: an SSL session, the server as
 * remote address, and a local address equal to the peer address the server recorded.
 */
@Test
public void clientStreamGetsAttributes() throws Exception {
    startServer();
    NettyClientTransport nettyTransport = newTransport(newNegotiator());
    callMeMaybe(nettyTransport.start(clientTransportListener));

    Rpc request = new Rpc(nettyTransport).halfClose();
    request.waitForResponse();

    assertNotNull(request.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION));
    assertEquals(address, request.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));

    // The server publishes its transport attributes asynchronously; allow one second.
    Attributes attrsFromServer = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
    assertNotNull(attrsFromServer);

    SocketAddress peerOfServer = attrsFromServer.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    assertNotNull(peerOfServer);
    assertEquals(
            peerOfServer,
            request.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR));
}
/**
 * Extracts the health-checked service name from the resolver's service config,
 * records it on the helper, and then forwards the update to the superclass.
 *
 * @param servers    the resolved address groups
 * @param attributes resolution attributes; the service config is read from
 *                   {@code GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG}
 */
@Override
public void handleResolvedAddressGroups(
        List<EquivalentAddressGroup> servers, Attributes attributes) {
    Map<String, Object> resolvedConfig =
            attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
    helper.setHealthCheckedService(
            ServiceConfigUtil.getHealthCheckedServiceName(resolvedConfig));
    super.handleResolvedAddressGroups(servers, attributes);
}
/**
 * Chooses the load-balancing policy for this update, swaps the delegate balancer
 * when the policy name changed, and forwards the update to the delegate.
 *
 * <p>When no policy can be found, the channel is put into {@code TRANSIENT_FAILURE},
 * the current delegate is shut down and replaced by a {@code NoopLoadBalancer}, and the
 * update is not forwarded.
 *
 * @param servers    the resolved address groups
 * @param attributes resolution attributes; the service config is read from
 *                   {@code GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG}
 */
@Override
public void handleResolvedAddressGroups(
        List<EquivalentAddressGroup> servers, Attributes attributes) {
    Map<String, Object> serviceConfig =
            attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);

    LoadBalancerProvider selected;
    try {
        selected = decideLoadBalancerProvider(servers, serviceConfig);
    } catch (PolicyNotFoundException e) {
        Status failure = Status.INTERNAL.withDescription(e.getMessage());
        helper.updateBalancingState(
                ConnectivityState.TRANSIENT_FAILURE, new FailingPicker(failure));
        delegate.shutdown();
        delegateProvider = null;
        delegate = new NoopLoadBalancer();
        return;
    }

    boolean policyChanged = delegateProvider == null
            || !selected.getPolicyName().equals(delegateProvider.getPolicyName());
    if (policyChanged) {
        helper.updateBalancingState(ConnectivityState.CONNECTING, new EmptyPicker());
        LoadBalancer previous = delegate;
        previous.shutdown();
        delegateProvider = selected;
        delegate = delegateProvider.newLoadBalancer(helper);
        helper.getChannelLogger().log(
                ChannelLogLevel.INFO, "Load balancer changed from {0} to {1}",
                previous.getClass().getSimpleName(), delegate.getClass().getSimpleName());
    }

    getDelegate().handleResolvedAddressGroups(servers, attributes);
}
/**
 * Creates a resolver for the given DNS name.
 *
 * @param nsAuthority      name-server authority; currently unused (see TODO below)
 * @param name             the target as {@code host[:port]}; must be non-null and contain a host
 * @param params           supplies {@code NameResolver.Factory.PARAMS_DEFAULT_PORT}, used when
 *                         {@code name} has no port
 * @param executorResource stored for later use by this resolver
 * @param proxyDetector    stored for later use by this resolver
 * @param stopwatch        passed to the {@code Resolve} runnable
 * @param isAndroid        passed to {@code getNetworkAddressCacheTtlNanos}
 * @throws IllegalArgumentException if {@code name} has no host, or has no port while no
 *                                  default port is set in {@code params}
 */
DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params,
        Resource<Executor> executorResource, ProxyDetector proxyDetector,
        Stopwatch stopwatch, boolean isAndroid) {
    // TODO: if a DNS server is provided as nsAuthority, use it.
    // https://www.captechconsulting.com/blogs/accessing-the-dusty-corners-of-dns-with-java
    this.executorResource = executorResource;

    // A leading "//" is required; without it the URI is opaque and both its authority
    // and its host come back null.
    URI nameUri = URI.create("//" + checkNotNull(name, "name"));
    Preconditions.checkArgument(nameUri.getHost() != null, "Invalid DNS name: %s", name);
    authority = Preconditions.checkNotNull(nameUri.getAuthority(),
            "nameUri (%s) doesn't have an authority", nameUri);
    host = nameUri.getHost();

    if (nameUri.getPort() != -1) {
        port = nameUri.getPort();
    } else {
        // No explicit port in the name: fall back to the default from params.
        Integer fallbackPort = params.get(NameResolver.Factory.PARAMS_DEFAULT_PORT);
        if (fallbackPort == null) {
            throw new IllegalArgumentException(
                    "name '" + name + "' doesn't contain a port, and default port is not set in params");
        }
        port = fallbackPort;
    }

    this.proxyDetector = proxyDetector;
    this.resolveRunnable = new Resolve(this, stopwatch, getNetworkAddressCacheTtlNanos(isAndroid));
}
/**
 * Tags the span with the remote peer address taken from the given attributes.
 *
 * <p>An in-process address is tagged by name, an inet address as {@code host:port}.
 * For any other address type, or when no address is present, the span is left untouched.
 *
 * @param span       the span to tag
 * @param attributes attributes holding {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 */
@Override
public void set(Span span, Attributes attributes) {
    SocketAddress peer = attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);

    if (peer instanceof InProcessSocketAddress) {
        InProcessSocketAddress inProcess = (InProcessSocketAddress) peer;
        span.setTag(super.key, inProcess.getName());
        return;
    }

    if (peer instanceof InetSocketAddress) {
        final InetSocketAddress inet = (InetSocketAddress) peer;
        span.setTag(super.key, inet.getHostString() + ':' + inet.getPort());
    }
}
/**
 * Refreshes this object's cached routing state from the given affinity map.
 *
 * <p>The current server address is read directly from the map; the registry servers and
 * the listener are read from the name-resolver attributes stored in the map, and those
 * attributes become the new {@code affinity}.
 *
 * @param affinity map holding the values under {@code GrpcCallOptions.GRPC_NAMERESOVER_ATTRIBUTES}
 *                 and {@code GrpcCallOptions.GRPC_CURRENT_ADDR_KEY}
 */
public void refreshAffinity(Map<String, Object> affinity) {
    Attributes resolverAttributes =
            (Attributes) affinity.get(GrpcCallOptions.GRPC_NAMERESOVER_ATTRIBUTES);

    this.current_server = (SocketAddress) affinity.get(GrpcCallOptions.GRPC_CURRENT_ADDR_KEY);
    this.registry_servers = resolverAttributes.get(GrpcNameResolverProvider.REMOTE_ADDR_KEYS);
    this.listener = resolverAttributes.get(GrpcNameResolverProvider.NAMERESOVER_LISTENER);
    this.affinity = resolverAttributes;
}
/**
 * Builds the metadata describing a newly connected transport.
 *
 * @param attributes transport attributes holding {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 * @return metadata carrying the remote address, a freshly generated transport id and the
 *         current wall-clock time in milliseconds
 * @throws RuntimeException built from {@code Status.INTERNAL} when the remote address is absent
 */
public TransportMetadata build(Attributes attributes) {
    final InetSocketAddress peer =
            (InetSocketAddress) attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    if (peer != null) {
        final long transportId = idGenerator.getAndIncrement();
        final long connectedTime = System.currentTimeMillis();
        return new DefaultTransportMetadata(debugString, peer, transportId, connectedTime);
    }
    // Unauthenticated
    throw Status.INTERNAL.withDescription("RemoteSocketAddress is null").asRuntimeException();
}
/**
 * Looks up the remote address of the server stream by scanning its attribute keys for
 * the one whose {@code toString()} is {@code "remote-addr"}.
 *
 * @return {@code null} when the stream has no attributes or no key set; the address as a
 *         string when a matching key holds a {@code SocketAddress} or a {@code String};
 *         otherwise {@code GrpcConstants.UNKNOWN_ADDRESS}
 */
String getRemoteAddress() {
    Attributes attributes = serverStream.getAttributes();
    if (attributes == null) {
        return null;
    }

    try {
        // keys method is being considered for removal,
        Set<Attributes.Key<?>> keys = attributes.keys();
        if (keys == null) {
            if (isDebug) {
                logger.debug("can't attributes keys");
            }
            return null;
        }

        for (Attributes.Key<?> key : keys) {
            if (key != null && key.toString().equals("remote-addr")) {
                Object remoteAddress = attributes.get(key);
                if (remoteAddress instanceof SocketAddress) {
                    return getSocketAddressAsString((SocketAddress) remoteAddress);
                } else if (remoteAddress instanceof String) {
                    return (String) remoteAddress;
                }
            }
        }
    } catch (Exception e) {
        if (isDebug) {
            logger.debug("can't find keys method");
        }
    } catch (NoSuchMethodError e) {
        // A removed keys() method surfaces as NoSuchMethodError, which is an Error and
        // would otherwise escape the Exception handler above.
        if (isDebug) {
            logger.debug("can't find keys method");
        }
    }
    return GrpcConstants.UNKNOWN_ADDRESS;
}
/**
 * Processes a resolution update for the grpclb policy.
 *
 * <p>Balancer addresses come from {@code GrpclbConstants.ATTR_LB_ADDRS}; backend addresses
 * come from the resolved address list. If both are empty the update is reported as a
 * name-resolution error. A changed policy config triggers {@code recreateStates()} before
 * the addresses are handed to {@code grpclbState}.
 *
 * @param resolvedAddresses the resolution result
 * @throws AssertionError if a balancer address lacks {@code GrpclbConstants.ATTR_LB_ADDR_AUTHORITY}
 */
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
    Attributes resolutionAttrs = resolvedAddresses.getAttributes();
    List<EquivalentAddressGroup> balancerAddrs =
            resolutionAttrs.get(GrpclbConstants.ATTR_LB_ADDRS);

    boolean noBalancers = balancerAddrs == null || balancerAddrs.isEmpty();
    if (noBalancers && resolvedAddresses.getAddresses().isEmpty()) {
        handleNameResolutionError(
                Status.UNAVAILABLE.withDescription("No backend or balancer addresses found"));
        return;
    }

    // Pair every balancer address with its authority.
    List<LbAddressGroup> balancerGroups = new ArrayList<>();
    if (balancerAddrs != null) {
        for (EquivalentAddressGroup balancerAddr : balancerAddrs) {
            String balancerAuthority =
                    balancerAddr.getAttributes().get(GrpclbConstants.ATTR_LB_ADDR_AUTHORITY);
            if (balancerAuthority == null) {
                throw new AssertionError(
                        "This is a bug: LB address " + balancerAddr + " does not have an authority.");
            }
            balancerGroups.add(new LbAddressGroup(balancerAddr, balancerAuthority));
        }
    }
    List<LbAddressGroup> readOnlyBalancerGroups = Collections.unmodifiableList(balancerGroups);
    List<EquivalentAddressGroup> readOnlyBackends =
            Collections.unmodifiableList(resolvedAddresses.getAddresses());

    GrpclbConfig updatedConfig = (GrpclbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
    if (updatedConfig == null) {
        updatedConfig = DEFAULT_CONFIG;
    }
    if (!config.equals(updatedConfig)) {
        config = updatedConfig;
        helper.getChannelLogger().log(ChannelLogLevel.INFO, "Config: " + updatedConfig);
        recreateStates();
    }

    grpclbState.handleAddresses(readOnlyBalancerGroups, readOnlyBackends);
}
/**
 * Checks that a load balancer which opts in to empty address lists receives the empty
 * resolution result together with the LB config, and that no name-resolver refresh task
 * is left pending.
 */
@Test
public void nameResolverReturnsEmptySubLists_optionallyAllowed() throws Exception {
    when(mockLoadBalancer.canHandleEmptyAddressListFromNameResolution()).thenReturn(true);

    // Resolver factory that yields no addresses but does yield an LB config.
    FakeNameResolverFactory emptyResolverFactory =
            new FakeNameResolverFactory.Builder(expectedUri).build();
    String lbConfigJson = "{ \"setting1\": \"high\" }";
    Map<String, Object> serviceConfigMap =
            parseConfig("{\"loadBalancingConfig\": [ {\"mock_lb\": " + lbConfigJson + " } ] }");
    PolicySelection selection =
            new PolicySelection(mockLoadBalancerProvider, parseConfig(lbConfigJson), new Object());
    ManagedChannelServiceConfig parsedConfig =
            createManagedChannelServiceConfig(serviceConfigMap, selection);
    emptyResolverFactory.nextConfigOrError.set(ConfigOrError.fromConfig(parsedConfig));
    channelBuilder.nameResolverFactory(emptyResolverFactory);

    createChannel();

    // The balancer is created and handed the empty list plus the LB config.
    verify(mockLoadBalancerProvider).newLoadBalancer(any(Helper.class));
    ArgumentCaptor<ResolvedAddresses> captor = ArgumentCaptor.forClass(ResolvedAddresses.class);
    verify(mockLoadBalancer).handleResolvedAddresses(captor.capture());
    ResolvedAddresses delivered = captor.getValue();
    assertThat(delivered.getAddresses()).isEmpty();
    Map<String, ?> deliveredLbConfig =
            delivered.getAttributes().get(LoadBalancer.ATTR_LOAD_BALANCING_CONFIG);
    assertEquals(ImmutableMap.of("setting1", "high"), deliveredLbConfig);

    // No resolution retry has been scheduled.
    assertEquals(0, timer.numPendingTasks(NAME_RESOLVER_REFRESH_TASK_FILTER));
}
/**
 * Reads the remote peer address from the given stream attributes.
 *
 * @param streamAttributes the attributes of a stream
 * @return the value stored under {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 */
static SocketAddress getPeerSocket(Attributes streamAttributes) {
    final SocketAddress peer = streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    return peer;
}
/**
 * Receives a resolution result from the name resolver.
 *
 * <p>An empty address list is turned into an {@code UNAVAILABLE} error. Otherwise the
 * result is logged and a task is submitted to {@code syncContext} that applies the service
 * config (if any) and passes the addresses to the load balancer, provided this listener's
 * helper is still the channel's current {@code lbHelper}.
 *
 * @param servers the resolved address groups
 * @param config  resolution attributes; the service config is read from
 *                {@code GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG}
 */
@Override
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
    if (servers.isEmpty()) {
        onError(Status.UNAVAILABLE.withDescription(
                "Name resolver " + helper.nr + " returned an empty list"));
        return;
    }

    channelLogger.log(
            ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, config);

    // Log at INFO only on the transition into "have backends".
    if (!Boolean.TRUE.equals(haveBackends)) {
        channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
        haveBackends = true;
    }

    final Map<String, Object> serviceConfig =
            config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
    boolean serviceConfigChanged =
            serviceConfig != null && !serviceConfig.equals(lastServiceConfig);
    if (serviceConfigChanged) {
        channelLogger.log(ChannelLogLevel.INFO, "Service config changed");
        lastServiceConfig = serviceConfig;
    }

    final class NamesResolved implements Runnable {
        @Override
        public void run() {
            // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
            if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
                return;
            }

            nameResolverBackoffPolicy = null;

            if (serviceConfig != null) {
                try {
                    serviceConfigInterceptor.handleUpdate(serviceConfig);
                    if (retryEnabled) {
                        throttle = getThrottle(config);
                    }
                } catch (RuntimeException re) {
                    // A bad service config is logged; the addresses are still delivered below.
                    logger.warn(
                            "[" + getLogId() + "] Unexpected exception from parsing service config",
                            re);
                }
            }

            helper.lb.handleResolvedAddressGroups(servers, config);
        }
    }

    Runnable deliverToBalancer = new NamesResolved();
    syncContext.execute(deliverToBalancer);
}
/**
 * Handles a resolution update for the EDS load-balancing policy.
 *
 * <p>The update must carry an LB policy config, which is cast to {@code EdsConfig};
 * without one the update is reported as a name-resolution error. On the first update
 * that reaches this point an xDS client pool is obtained, either from the resolution
 * attributes or by building one from the bootstrap file. The child balancer is switched
 * when this is the first update or when the EDS service name changed, and the update is
 * then forwarded to the switching balancer.
 *
 * @param resolvedAddresses the resolution result, including attributes and LB policy config
 */
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
Object lbConfig = resolvedAddresses.getLoadBalancingPolicyConfig();
if (lbConfig == null) {
handleNameResolutionError(Status.UNAVAILABLE.withDescription("Missing EDS lb config"));
return;
}
EdsConfig newEdsConfig = (EdsConfig) lbConfig;
if (logger.isLoggable(XdsLogLevel.INFO)) {
logger.log(
XdsLogLevel.INFO,
"Received EDS lb config: cluster={0}, child_policy={1}, "
+ "eds_service_name={2}, report_load={3}",
newEdsConfig.clusterName,
newEdsConfig.endpointPickingPolicy.getProvider().getPolicyName(),
newEdsConfig.edsServiceName,
newEdsConfig.lrsServerName != null);
}
// clusterName is still null only until the assignment below has run once, so a null
// value here marks the first update.
boolean firstUpdate = false;
if (clusterName == null) {
firstUpdate = true;
}
clusterName = newEdsConfig.clusterName;
// Obtain the xDS client pool once: prefer the one supplied through the resolution
// attributes, otherwise create one from the bootstrap configuration.
if (xdsClientPool == null) {
Attributes attributes = resolvedAddresses.getAttributes();
xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
if (xdsClientPool == null) {
final BootstrapInfo bootstrapInfo;
try {
bootstrapInfo = bootstrapper.readBootstrap();
} catch (Exception e) {
// Bootstrap could not be read: report TRANSIENT_FAILURE and drop this update.
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE.withDescription("Failed to bootstrap").withCause(e)));
return;
}
final List<ServerInfo> serverList = bootstrapInfo.getServers();
final Node node = bootstrapInfo.getNode();
if (serverList.isEmpty()) {
// Bootstrap lists no management server: report TRANSIENT_FAILURE and drop this update.
helper.updateBalancingState(
TRANSIENT_FAILURE,
new ErrorPicker(
Status.UNAVAILABLE
.withDescription("No management server provided by bootstrap")));
return;
}
// Factory handed to the ref-counted pool; it builds the client from the bootstrap data.
XdsClientFactory xdsClientFactory = new XdsClientFactory() {
@Override
XdsClient createXdsClient() {
return
new XdsClientImpl(
helper.getAuthority(),
serverList,
channelFactory,
node,
helper.getSynchronizationContext(),
helper.getScheduledExecutorService(),
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
}
};
xdsClientPool = new RefCountedXdsClientObjectPool(xdsClientFactory);
} else {
logger.log(XdsLogLevel.INFO, "Use xDS client from channel");
}
xdsClient = xdsClientPool.getObject();
}
// Note: childPolicy change will be handled in LocalityStore, to be implemented.
// If edsServiceName in XdsConfig is changed, do a graceful switch.
if (firstUpdate || !Objects.equals(newEdsConfig.edsServiceName, edsServiceName)) {
LoadBalancer.Factory clusterEndpointsLoadBalancerFactory =
new ClusterEndpointsBalancerFactory(newEdsConfig.edsServiceName);
switchingLoadBalancer.switchTo(clusterEndpointsLoadBalancerFactory);
}
switchingLoadBalancer.handleResolvedAddresses(resolvedAddresses);
// Record the service name last, so the comparison above is made against the previous value.
this.edsServiceName = newEdsConfig.edsServiceName;
}
/**
 * Returns the remote address recorded for a stream's transport.
 *
 * @param streamAttributes the attributes of a stream
 * @return the value stored under {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 */
static SocketAddress getPeerSocket(Attributes streamAttributes) {
    final SocketAddress remoteAddress =
            streamAttributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    return remoteAddress;
}
/**
 * Creates and starts a client transport for the address currently selected by
 * {@code addressIndex}, and records it as the pending transport.
 *
 * <p>Must be called from within {@code syncContext} and only while no reconnect task is
 * scheduled; both conditions are checked on entry.
 */
private void startNewTransport() {
syncContext.throwIfNotInThisSynchronizationContext();
Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
// Restart the connecting timer when beginning again from the first address.
if (addressIndex.isAtBeginning()) {
connectingTimer.reset().start();
}
SocketAddress address = addressIndex.getCurrentAddress();
// For a proxied address, connect to the target address and pass the proxy information
// through the transport options instead.
HttpConnectProxiedSocketAddress proxiedAddr = null;
if (address instanceof HttpConnectProxiedSocketAddress) {
proxiedAddr = (HttpConnectProxiedSocketAddress) address;
address = proxiedAddr.getTargetAddress();
}
Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
// An authority override on the address group takes precedence over this object's authority.
String eagChannelAuthority = currentEagAttributes
.get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
ClientTransportFactory.ClientTransportOptions options =
new ClientTransportFactory.ClientTransportOptions()
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
.setEagAttributes(currentEagAttributes)
.setUserAgent(userAgent)
.setHttpConnectProxiedSocketAddress(proxiedAddr);
TransportLogger transportLogger = new TransportLogger();
// In case the transport logs in the constructor, use the subchannel logId
transportLogger.logId = getLogId();
ConnectionClientTransport transport =
new CallTracingTransport(
transportFactory
.newClientTransport(address, options, transportLogger), callsTracer);
// Now that the transport exists, switch the logger over to the transport's own log id.
transportLogger.logId = transport.getLogId();
channelz.addClientSocket(transport);
pendingTransport = transport;
transports.add(transport);
// start() may hand back a Runnable; if it does, queue it on syncContext via executeLater.
Runnable runnable = transport.start(new TransportListener(transport, address));
if (runnable != null) {
syncContext.executeLater(runnable);
}
channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
}
/**
 * Intercepts an incoming call, optionally enforces that the client certificate's subject
 * DN matches the configured pattern, and exposes the client IP and DN to the RPC
 * implementation through the gRPC {@code Context}.
 *
 * <p>DN enforcement happens only when a pattern is configured and the transport has an
 * SSL session. If the peer presents no certificate chain, or the peer is unverified, the
 * DN check is skipped and the call proceeds with the default subject.
 *
 * @param call    the server call being intercepted
 * @param headers the request metadata
 * @param next    the next handler in the interceptor chain prior to the service implementation
 * @param <I>     the message request type (e.g. ReqT)
 * @param <O>     the message reply type (e.g. RespT)
 *
 * @return a listener for the incoming call, or {@code IDENTITY_LISTENER} when the call was
 *         closed with {@code PERMISSION_DENIED}
 */
@Override
public <I, O> ServerCall.Listener<I> interceptCall(
        final ServerCall<I, O> call,
        final Metadata headers,
        final ServerCallHandler<I, O> next) {

    final Attributes attributes = call.getAttributes();
    final SocketAddress socketAddress = attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    final String clientIp = clientIp(socketAddress);
    String foundSubject = DEFAULT_FOUND_SUBJECT;

    // enforce that the DN on the client cert matches the configured pattern
    final SSLSession sslSession = attributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
    if (this.authorizedDNpattern != null && sslSession != null) {
        try {
            // getPeerCertificates() is used instead of getPeerCertificateChain(): the latter
            // is deprecated for removal and its default implementation throws
            // UnsupportedOperationException on current JDKs.
            final java.security.cert.Certificate[] certs = sslSession.getPeerCertificates();
            if (certs != null && certs.length > 0
                    && certs[0] instanceof java.security.cert.X509Certificate) {
                // Only the peer's own certificate (first in the chain) is checked; issuer
                // certificates further down the chain are not matched against the pattern.
                final java.security.cert.X509Certificate peerCert =
                        (java.security.cert.X509Certificate) certs[0];
                // getSubjectDN() keeps the DN string format the configured pattern was written for.
                foundSubject = peerCert.getSubjectDN().getName();
                if (!authorizedDNpattern.matcher(foundSubject).matches()) {
                    logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + clientIp);
                    call.close(Status.PERMISSION_DENIED.withDescription(DN_UNAUTHORIZED + foundSubject), headers);
                    return IDENTITY_LISTENER;
                }
            }
        } catch (final SSLPeerUnverifiedException e) {
            // Pass clientIp directly so it is rendered as-is and the exception is logged as the cause.
            logger.debug("skipping DN authorization for request from {}.", clientIp, e);
        }
    }

    // contextualize the DN and IP for use in the RPC implementation
    final Context context = Context.current()
            .withValue(REMOTE_HOST_KEY, clientIp)
            .withValue(REMOTE_DN_KEY, foundSubject);

    // if we got to this point, there were no errors, call the next interceptor in the chain
    return Contexts.interceptCall(context, call, headers, next);
}