下面列出了 io.grpc.internal.GrpcUtil 类的 API 用法示例代码,您也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a {@link DiscoveryClientNameResolver} for target URIs that use {@code DISCOVERY_SCHEME}.
 *
 * <p>The service name is the URI path without its leading slash. Every resolver created here is
 * added to {@code discoveryClientNameResolvers}; the callback handed to the resolver removes it
 * from that collection again.
 *
 * @param targetUri the target URI to create a resolver for
 * @param args the resolver arguments, passed through to the created resolver
 * @return the new resolver, or {@code null} if the URI does not use {@code DISCOVERY_SCHEME}
 * @throws IllegalArgumentException if the URI path is missing, has at most one character, or does
 *     not start with {@code "/"}
 */
@Nullable
@Override
public NameResolver newNameResolver(final URI targetUri, final NameResolver.Args args) {
  if (!DISCOVERY_SCHEME.equals(targetUri.getScheme())) {
    // Not our scheme.
    return null;
  }

  final String path = targetUri.getPath();
  final boolean wellFormed = path != null && path.length() > 1 && path.startsWith("/");
  if (!wellFormed) {
    throw new IllegalArgumentException(
        "Incorrectly formatted target uri; expected: '" + DISCOVERY_SCHEME
            + ":[//]/<service-name>'; but was '" + targetUri + "'");
  }

  // The removal callback has to refer to the resolver that is still being constructed, so the
  // resolver is published through this holder right after construction.
  final AtomicReference<DiscoveryClientNameResolver> self = new AtomicReference<>();
  final DiscoveryClientNameResolver resolver = new DiscoveryClientNameResolver(
      path.substring(1),
      this.client,
      args,
      GrpcUtil.SHARED_CHANNEL_EXECUTOR,
      () -> this.discoveryClientNameResolvers.remove(self.get()));
  self.set(resolver);
  this.discoveryClientNameResolvers.add(resolver);
  return resolver;
}
/**
 * Enables TXT lookups, builds the {@link GrpclbNameResolver} under test and checks that the host
 * it reports equals {@code NAME}.
 */
@Before
public void setUp() {
  GrpclbNameResolver.setEnableTxt(true);

  ChannelLogger channelLogger = mock(ChannelLogger.class);
  NameResolver.Args args = NameResolver.Args.newBuilder()
      .setDefaultPort(DEFAULT_PORT)
      .setProxyDetector(GrpcUtil.NOOP_PROXY_DETECTOR)
      .setSynchronizationContext(syncContext)
      .setServiceConfigParser(serviceConfigParser)
      .setChannelLogger(channelLogger)
      .build();

  resolver = new GrpclbNameResolver(
      null,
      NAME,
      args,
      fakeExecutorResource,
      fakeClock.getStopwatchSupplier().get(),
      /* isAndroid */ false);

  hostName = resolver.getHost();
  assertThat(hostName).isEqualTo(NAME);
}
/**
 * Creates a client transport for the given remote address.
 *
 * @param address the remote address; must not be null
 * @param authority the default authority, stored as-is
 * @param userAgent optional application user agent, combined into the gRPC "okhttp" user agent
 * @param executor executor used directly and wrapped in a {@link SerializingExecutor}; must not
 *     be null
 * @param sslSocketFactory optional socket factory, stored as-is
 * @param hostnameVerifier optional hostname verifier, stored as-is
 * @param connectionSpec the connection spec; must not be null
 * @param maxMessageSize maximum message size, stored as-is
 * @param initialWindowSize initial window size, stored as-is
 * @param proxy optional proxy parameters, stored as-is
 * @param tooManyPingsRunnable callback stored for later use; must not be null
 * @param maxInboundMetadataSize maximum inbound metadata size, stored as-is
 * @param transportTracer the transport tracer; must not be null
 * @throws NullPointerException if any of the arguments documented as "must not be null" is null
 */
OkHttpClientTransport(InetSocketAddress address, String authority, @Nullable String userAgent,
    Executor executor, @Nullable SSLSocketFactory sslSocketFactory,
    @Nullable HostnameVerifier hostnameVerifier, ConnectionSpec connectionSpec,
    int maxMessageSize, int initialWindowSize, @Nullable ProxyParameters proxy,
    Runnable tooManyPingsRunnable, int maxInboundMetadataSize, TransportTracer transportTracer) {
  this.address = Preconditions.checkNotNull(address, "address");
  this.defaultAuthority = authority;
  this.maxMessageSize = maxMessageSize;
  this.initialWindowSize = initialWindowSize;
  this.executor = Preconditions.checkNotNull(executor, "executor");
  serializingExecutor = new SerializingExecutor(executor);
  // Client initiated streams are odd, server initiated ones are even. Server should not need to
  // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
  nextStreamId = 3;
  this.sslSocketFactory = sslSocketFactory;
  this.hostnameVerifier = hostnameVerifier;
  this.connectionSpec = Preconditions.checkNotNull(connectionSpec, "connectionSpec");
  this.stopwatchFactory = GrpcUtil.STOPWATCH_SUPPLIER;
  this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
  this.proxy = proxy;
  this.tooManyPingsRunnable =
      Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
  this.maxInboundMetadataSize = maxInboundMetadataSize;
  // Name the argument in the failure message, like every other null check in this constructor.
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
  initTransportTracer();
}
/**
 * Handles an inbound GOAWAY frame.
 *
 * <p>For {@code ENHANCE_YOUR_CALM} a warning is logged and, when the debug data is exactly
 * {@code "too_many_pings"}, {@code tooManyPingsRunnable} is run. In all cases the error code is
 * mapped to a {@link Status} (augmented with the debug data if there is any) and go-away
 * processing is started with it.
 *
 * @param lastGoodStreamId the last stream id reported by the peer
 * @param errorCode the error code carried by the frame
 * @param debugData the debug payload carried by the frame, possibly empty
 */
@Override
public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
  if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
    String debugText = debugData.utf8();
    String warning = String.format(
        "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, debugText);
    log.log(Level.WARNING, warning);
    if ("too_many_pings".equals(debugText)) {
      tooManyPingsRunnable.run();
    }
  }

  Status goAwayStatus = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
      .augmentDescription("Received Goaway");
  boolean hasDebugData = debugData.size() > 0;
  if (hasDebugData) {
    // A debug message was provided, so append it to the description.
    goAwayStatus = goAwayStatus.augmentDescription(debugData.utf8());
  }
  startGoAway(lastGoodStreamId, null, goAwayStatus);
}
/**
 * Builds the name resolver attributes, containing the default port that matches the configured
 * negotiation type.
 *
 * @return attributes with {@code NameResolver.Factory.PARAMS_DEFAULT_PORT} set
 * @throws AssertionError if the negotiation type is neither {@code TLS} nor {@code PLAINTEXT}
 */
@Override
protected Attributes getNameResolverParams() {
  final int portForNegotiation;
  switch (negotiationType) {
    case TLS:
      portForNegotiation = GrpcUtil.DEFAULT_PORT_SSL;
      break;
    case PLAINTEXT:
      portForNegotiation = GrpcUtil.DEFAULT_PORT_PLAINTEXT;
      break;
    default:
      throw new AssertionError(negotiationType + " not handled");
  }
  return Attributes.newBuilder()
      .set(NameResolver.Factory.PARAMS_DEFAULT_PORT, portForNegotiation)
      .build();
}
/**
 * Starts a transport with the user agent "fakeUserAgent" and verifies that the stream with id 3
 * is opened with the expected header list, including the derived gRPC user agent.
 */
@Test
public void overrideDefaultUserAgent() throws Exception {
  startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "fakeUserAgent");
  MockStreamListener streamListener = new MockStreamListener();
  OkHttpClientStream stream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  stream.start(streamListener);

  Header authorityHeader = new Header(Header.TARGET_AUTHORITY, "notarealauthority:80");
  Header pathHeader = new Header(Header.TARGET_PATH, "/" + method.getFullMethodName());
  Header userAgentHeader = new Header(
      GrpcUtil.USER_AGENT_KEY.name(), GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent"));
  List<Header> expectedHeaders = Arrays.asList(
      SCHEME_HEADER,
      METHOD_HEADER,
      authorityHeader,
      pathHeader,
      userAgentHeader,
      CONTENT_TYPE_HEADER,
      TE_HEADER);

  verify(frameWriter, timeout(TIME_OUT_MS))
      .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
  getStream(3).cancel(Status.CANCELLED);
  shutdownAndVerify();
}
/**
 * Returns a resolver for {@code DISCOVERY_SCHEME} target URIs and {@code null} for every other
 * scheme.
 *
 * <p>The URI path, minus its leading slash, is used as the service name. The created resolver is
 * tracked in {@code discoveryClientNameResolvers} and receives a callback that removes it from
 * that collection.
 *
 * @param targetUri the target URI to create a resolver for
 * @param args the resolver arguments, passed through to the created resolver
 * @return the new resolver, or {@code null} if the scheme does not match
 * @throws IllegalArgumentException if the URI path is null, has at most one character, or does
 *     not start with {@code "/"}
 */
@Nullable
@Override
public NameResolver newNameResolver(final URI targetUri, final NameResolver.Args args) {
  final boolean isDiscoveryTarget = DISCOVERY_SCHEME.equals(targetUri.getScheme());
  if (!isDiscoveryTarget) {
    return null;
  }

  final String uriPath = targetUri.getPath();
  if (uriPath == null || uriPath.length() <= 1 || !uriPath.startsWith("/")) {
    final String problem = "Incorrectly formatted target uri; "
        + "expected: '" + DISCOVERY_SCHEME + ":[//]/<service-name>'; "
        + "but was '" + targetUri + "'";
    throw new IllegalArgumentException(problem);
  }

  // Holder that lets the removal callback reach the resolver created just below.
  final AtomicReference<DiscoveryClientNameResolver> holder = new AtomicReference<>();
  final DiscoveryClientNameResolver created =
      new DiscoveryClientNameResolver(
          uriPath.substring(1),
          this.client,
          args,
          GrpcUtil.SHARED_CHANNEL_EXECUTOR,
          () -> this.discoveryClientNameResolvers.remove(holder.get()));
  holder.set(created);
  this.discoveryClientNameResolvers.add(created);
  return created;
}
/**
 * Overrides the channel authority with {@code BAD_HOSTNAME} and expects the RPC to fail with an
 * {@link SSLPeerUnverifiedException} as its root cause.
 *
 * <p>The channel is shut down in a {@code finally} block so that it is released even when one of
 * the assertions fails.
 */
@Test
public void wrongHostNameFailHostnameVerification() throws Exception {
  ManagedChannel channel = createChannelBuilder()
      .overrideAuthority(GrpcUtil.authorityFromHostAndPort(
          BAD_HOSTNAME, getPort()))
      .build();
  try {
    TestServiceGrpc.TestServiceBlockingStub blockingStub =
        TestServiceGrpc.newBlockingStub(channel);

    Throwable actualThrown = null;
    try {
      blockingStub.emptyCall(Empty.getDefaultInstance());
    } catch (Throwable t) {
      actualThrown = t;
    }
    assertNotNull("The rpc should have been failed due to hostname verification", actualThrown);
    Throwable cause = Throwables.getRootCause(actualThrown);
    assertTrue(
        "Failed by unexpected exception: " + cause, cause instanceof SSLPeerUnverifiedException);
  } finally {
    // Previously skipped whenever an assertion above failed, leaking the channel.
    channel.shutdown();
  }
}
/**
 * With keep-alive enabled on an epoll event loop, the channel's TCP user timeout option is
 * expected to be within 5 of the keep-alive timeout in milliseconds. The test is skipped when
 * epoll is not available.
 */
@Test
public void keepAliveEnabled_shouldSetTcpUserTimeout() throws Exception {
  assume().that(Utils.isEpollAvailable()).isTrue();

  startServer();
  EventLoopGroup epollGroup = Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP.create();
  int keepAliveTimeMillis = 12345670;
  int keepAliveTimeoutMillis = 1234567;
  long keepAliveTimeNanos = TimeUnit.MILLISECONDS.toNanos(keepAliveTimeMillis);
  long keepAliveTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(keepAliveTimeoutMillis);
  try {
    NettyClientTransport transport = newTransport(
        newNegotiator(),
        DEFAULT_MAX_MESSAGE_SIZE,
        GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
        null /* user agent */,
        true /* keep alive */,
        keepAliveTimeNanos,
        keepAliveTimeoutNanos,
        new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE),
        epollGroup);
    callMeMaybe(transport.start(clientTransportListener));

    ChannelOption<Integer> tcpUserTimeoutOption = Utils.maybeGetTcpUserTimeoutOption();
    assertThat(tcpUserTimeoutOption).isNotNull();
    // On some Linux based systems the integer value may be slightly off (usually +-1).
    assertThat((double) transport.channel().config().getOption(tcpUserTimeoutOption))
        .isWithin(5.0).of((double) keepAliveTimeoutMillis);
  } finally {
    epollGroup.shutdownGracefully();
  }
}
/**
 * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
 *
 * <p>Nothing happens when no client stream is found for {@code streamId}. Otherwise the status
 * derived from {@code errorCode} is reported to the stream, with {@code RpcProgress.REFUSED} for
 * {@code REFUSED_STREAM} and {@code RpcProgress.PROCESSED} for every other code, and the
 * keep-alive manager (if any) is told that data was received.
 *
 * @param streamId id of the stream being reset
 * @param errorCode HTTP/2 error code carried by the frame
 */
private void onRstStreamRead(int streamId, long errorCode) {
  NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
  if (stream == null) {
    return;
  }

  Status resetStatus = GrpcUtil.Http2Error.statusForCode((int) errorCode)
      .augmentDescription("Received Rst Stream");
  RpcProgress progress = errorCode == Http2Error.REFUSED_STREAM.code()
      ? RpcProgress.REFUSED
      : RpcProgress.PROCESSED;
  stream.transportReportStatus(resetStatus, progress, false /*stop delivery*/, new Metadata());

  if (keepAliveManager != null) {
    keepAliveManager.onDataReceived();
  }
}
/**
 * Starts a plaintext stream whose application metadata carries the user agent
 * "misbehaving-application" while the stream itself is created with "good-application", and
 * expects exactly the listed headers, in order, with "good-application" as the user agent.
 */
@Test
@SuppressWarnings("GuardedBy")
public void start_headerPlaintext() throws IOException {
  Metadata applicationHeaders = new Metadata();
  applicationHeaders.put(GrpcUtil.USER_AGENT_KEY, "misbehaving-application");
  when(transport.isUsingPlaintext()).thenReturn(true);

  stream = new OkHttpClientStream(
      methodDescriptor, applicationHeaders, frameWriter, transport, flowController, lock,
      MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "localhost", "good-application",
      StatsTraceContext.NOOP, transportTracer, CallOptions.DEFAULT, false);
  stream.start(new BaseClientStreamListener());
  stream.transportState().start(3);

  verify(mockedFrameWriter)
      .synStream(eq(false), eq(false), eq(3), eq(0), headersCaptor.capture());

  Header expectedAuthority = new Header(Header.TARGET_AUTHORITY, "localhost");
  Header expectedPath =
      new Header(Header.TARGET_PATH, "/" + methodDescriptor.getFullMethodName());
  Header expectedUserAgent = new Header(GrpcUtil.USER_AGENT_KEY.name(), "good-application");
  assertThat(headersCaptor.getValue())
      .containsExactly(
          Headers.HTTP_SCHEME_HEADER,
          Headers.METHOD_HEADER,
          expectedAuthority,
          expectedPath,
          expectedUserAgent,
          Headers.CONTENT_TYPE_HEADER,
          Headers.TE_HEADER)
      .inOrder();
}
/**
 * Converts application metadata plus the reserved request values into outbound client headers.
 *
 * <p>Any content-type, TE and user-agent entries present in {@code headers} are discarded from
 * it before conversion, so the passed-in {@code headers} object is modified.
 *
 * @param headers application metadata; reserved entries are removed from it
 * @param scheme the request scheme
 * @param defaultPath the request path; must not be null
 * @param authority the request authority; must not be null
 * @param method the request method; must not be null
 * @param userAgent the user agent to send
 * @return the headers built by {@code GrpcHttp2OutboundHeaders.clientRequestHeaders}
 * @throws NullPointerException if {@code defaultPath}, {@code authority} or {@code method} is null
 */
public static Http2Headers convertClientHeaders(
    Metadata headers,
    AsciiString scheme,
    AsciiString defaultPath,
    AsciiString authority,
    AsciiString method,
    AsciiString userAgent) {
  Preconditions.checkNotNull(defaultPath, "defaultPath");
  Preconditions.checkNotNull(authority, "authority");
  Preconditions.checkNotNull(method, "method");

  // Application supplied duplicates of the reserved headers are dropped.
  headers.discardAll(CONTENT_TYPE_KEY);
  headers.discardAll(GrpcUtil.TE_HEADER);
  headers.discardAll(GrpcUtil.USER_AGENT_KEY);

  return GrpcHttp2OutboundHeaders.clientRequestHeaders(
      toHttp2Headers(headers), authority, defaultPath, method, scheme, userAgent);
}
/**
 * Decodes a response message into a {@link ResponsePayload}.
 *
 * <p>The content starts with one compression flag byte followed by four length bytes, which are
 * skipped; the remaining bytes are optionally decompressed and then deserialized.
 *
 * @param message the response message
 * @param wrapper describes the return type
 * @return the decoded response payload
 * @throws IOException if fewer than four length bytes could be skipped after the flag byte
 */
protected ResponsePayload decodePayload(final Http2ResponseMessage message, final ClassWrapper wrapper) throws IOException {
    Http2Headers headers = message.headers();
    InputStream in = new UnsafeByteArrayInputStream(message.content());
    // Read the compression flag byte.
    int compressedFlag = in.read();
    // The length field occupies the next 4 bytes.
    long skipped = in.skip(4);
    if (skipped < 4) {
        throw new IOException(String.format("request data is not full. id=%d", message.getMsgId()));
    }
    // Decompress when the flag is set and a codec can be resolved.
    if (compressedFlag > 0) {
        // NOTE(review): the codec is looked up from the MESSAGE_ACCEPT_ENCODING header; confirm
        // this is intended rather than the header naming the encoding applied to this message.
        String encoding = (String) headers.get(GrpcUtil.MESSAGE_ACCEPT_ENCODING);
        Pair<String, Compression> codec = getEncoding(encoding);
        if (codec != null) {
            in = codec.getValue().decompress(in);
        }
    }
    // Deserialize.
    Object response = serialization.getSerializer().deserialize(in, wrapper.getClazz());
    if (!wrapper.isWrapper()) {
        return new ResponsePayload(response);
    }
    // Performance optimization: unwrap the first parameter of the wrapper object.
    Object[] parameters = wrapper.getConversion().getToParameter().apply(response);
    return new ResponsePayload(parameters[0]);
}
/**
 * Creates and starts a {@link NettyServer} on an ephemeral test address, then records the bound
 * address and the authority derived from it.
 *
 * @param maxStreamsPerConnection maximum number of streams per connection passed to the server
 * @param maxHeaderListSize maximum header list size passed to the server
 * @throws IOException if starting the server fails
 */
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
  HashMap<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
  server = new NettyServer(
      TestUtils.testServerAddress(0),
      NioServerSocketChannel.class,
      channelOptions,
      group,
      group,
      negotiator,
      Collections.<ServerStreamTracer.Factory>emptyList(),
      TransportTracer.getDefaultFactory(),
      maxStreamsPerConnection,
      DEFAULT_WINDOW_SIZE,
      DEFAULT_MAX_MESSAGE_SIZE,
      maxHeaderListSize,
      DEFAULT_SERVER_KEEPALIVE_TIME_NANOS,
      DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
      MAX_CONNECTION_IDLE_NANOS_DISABLED,
      MAX_CONNECTION_AGE_NANOS_DISABLED,
      MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE,
      true,
      0,
      channelz);
  server.start(serverListener);

  int boundPort = server.getPort();
  address = TestUtils.testServerAddress(boundPort);
  authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
}
/**
 * Puts values for the content-type, TE and user-agent keys plus one user key into the metadata
 * and expects the converted server headers to hold three entries, with the gRPC content type.
 */
@Test
@SuppressWarnings("UndefinedEquals") // AsciiString.equals
public void convertServerHeaders_sanitizes() {
  // The keys are spelled out here rather than taken from a pre-defined list of headers, since
  // the goal of this test is to validate the correctness of such lists in the first place.
  Metadata input = new Metadata();
  input.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed");
  input.put(GrpcUtil.TE_HEADER, "to-be-removed");
  input.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed");
  input.put(userKey, userValue);

  Http2Headers converted = Utils.convertServerHeaders(input);
  DefaultHttp2Headers copy = new DefaultHttp2Headers();
  for (Map.Entry<CharSequence, CharSequence> header : converted) {
    copy.add(header.getKey(), header.getValue());
  }

  // 2 reserved headers, 1 user header
  assertEquals(2 + 1, copy.size());
  assertEquals(Utils.CONTENT_TYPE_GRPC, copy.get(GrpcUtil.CONTENT_TYPE_KEY.name()));
}
/**
 * Starts a transport with the user agent "fakeUserAgent" and verifies that stream 3 is opened
 * with the expected headers, the user-agent header holding the derived gRPC user agent.
 */
@Test
public void overrideDefaultUserAgent() throws Exception {
  startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, INITIAL_WINDOW_SIZE, "fakeUserAgent");
  MockStreamListener mockListener = new MockStreamListener();
  OkHttpClientStream stream =
      clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
  stream.start(mockListener);

  Header expectedAuthority = new Header(Header.TARGET_AUTHORITY, "notarealauthority:80");
  Header expectedPath = new Header(Header.TARGET_PATH, "/" + method.getFullMethodName());
  Header expectedUserAgent = new Header(
      GrpcUtil.USER_AGENT_KEY.name(), GrpcUtil.getGrpcUserAgent("okhttp", "fakeUserAgent"));
  List<Header> expectedHeaders = Arrays.asList(
      HTTP_SCHEME_HEADER,
      METHOD_HEADER,
      expectedAuthority,
      expectedPath,
      expectedUserAgent,
      CONTENT_TYPE_HEADER,
      TE_HEADER);

  verify(frameWriter, timeout(TIME_OUT_MS))
      .synStream(eq(false), eq(false), eq(3), eq(0), eq(expectedHeaders));
  getStream(3).cancel(Status.CANCELLED);
  shutdownAndVerify();
}
/**
 * Converts metadata holding content-type, TE, user-agent and one user entry, and expects three
 * resulting headers with the content type equal to {@code Utils.CONTENT_TYPE_GRPC}.
 */
@Test
@SuppressWarnings("UndefinedEquals") // AsciiString.equals
public void convertServerHeaders_sanitizes() {
  Metadata source = new Metadata();
  // Explicit keys on purpose: relying on a pre-defined list of headers would defeat the goal of
  // this test, which is to validate the correctness of such lists in the first place.
  source.put(GrpcUtil.CONTENT_TYPE_KEY, "to-be-removed");
  source.put(GrpcUtil.TE_HEADER, "to-be-removed");
  source.put(GrpcUtil.USER_AGENT_KEY, "to-be-removed");
  source.put(userKey, userValue);

  Http2Headers output = Utils.convertServerHeaders(source);

  DefaultHttp2Headers collected = new DefaultHttp2Headers();
  for (Map.Entry<CharSequence, CharSequence> pair : output) {
    collected.add(pair.getKey(), pair.getValue());
  }

  // 2 reserved headers, 1 user header
  assertEquals(2 + 1, collected.size());
  assertEquals(Utils.CONTENT_TYPE_GRPC, collected.get(GrpcUtil.CONTENT_TYPE_KEY.name()));
}
/**
 * Splits an authority into host and port.
 *
 * @param authority the authority to parse; must not be null
 * @return the parsed host and port, or the whole authority with port {@code -1} when the parsed
 *     URI has no host
 * @throws NullPointerException if {@code authority} is null
 */
@VisibleForTesting
static HostPort parseAuthority(String authority) {
  URI uri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority"));
  String parsedHost = uri.getHost();
  if (parsedHost != null) {
    return new HostPort(parsedHost, uri.getPort());
  }
  /*
   * Implementation note: We pick -1 as the port here rather than deriving it from the
   * original socket address. The SSL engine doesn't use this port number when contacting the
   * remote server, but rather it is used for other things like SSL Session caching. When an
   * invalid authority is provided (like "bad_cert"), picking the original port and passing it
   * in would mean that the port might be used under the assumption that it was correct. By
   * using -1 here, it forces the SSL implementation to treat it as invalid.
   */
  return new HostPort(authority, -1);
}
/**
 * Starts a keep-alive enabled transport over a {@link LocalChannel} factory and expects the
 * channel's {@code SO_KEEPALIVE} option to be null.
 */
@Test
public void channelFactoryShouldNNotSetSocketOptionKeepAlive() throws Exception {
  startServer();

  long keepAliveTimeNanos = TimeUnit.SECONDS.toNanos(10L);
  long keepAliveTimeoutNanos = TimeUnit.SECONDS.toNanos(1L);
  NettyClientTransport transport = newTransport(
      newNegotiator(),
      DEFAULT_MAX_MESSAGE_SIZE,
      GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
      "testUserAgent",
      true,
      keepAliveTimeNanos,
      keepAliveTimeoutNanos,
      new ReflectiveChannelFactory<>(LocalChannel.class),
      group);
  callMeMaybe(transport.start(clientTransportListener));

  assertThat(transport.channel().config().getOption(ChannelOption.SO_KEEPALIVE)).isNull();
}
/**
 * Benchmarks serializing metadata that holds a single entry: the raw advertised message
 * encodings of {@code reg}, stored under the accept-encoding key.
 *
 * @return the result of {@code TransportFrameUtil.toHttp2Headers} for that metadata
 */
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public byte[][] marshalOld() {
  Metadata metadata = new Metadata();
  byte[] advertisedEncodings =
      InternalDecompressorRegistry.getRawAdvertisedMessageEncodings(reg);
  metadata.put(GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY, advertisedEncodings);
  return TransportFrameUtil.toHttp2Headers(metadata);
}
/**
 * With keep-alive turned off, an RPC still completes and the transport has no keep-alive
 * manager.
 */
@Test
public void keepAliveDisabled() throws Exception {
  startServer();
  NettyClientTransport transport = newTransport(
      newNegotiator(),
      DEFAULT_MAX_MESSAGE_SIZE,
      GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
      null /* user agent */,
      false /* keep alive */);
  callMeMaybe(transport.start(clientTransportListener));

  new Rpc(transport).halfClose().waitForResponse();

  assertNull(transport.keepAliveManager());
}
/**
 * Starts a stream whose application metadata carries the user agent "bad agent" while the stream
 * is created with "good agent", and checks the headers of the enqueued create-stream command:
 * "good agent" must be present and "bad agent" must not.
 */
@Test
public void removeUserAgentFromApplicationHeaders() {
  Metadata metadata = new Metadata();
  metadata.put(GrpcUtil.USER_AGENT_KEY, "bad agent");
  listener = mock(ClientStreamListener.class);
  Mockito.reset(writeQueue);
  ChannelPromise completedPromise = new DefaultChannelPromise(channel)
      .setSuccess();
  when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(completedPromise);
  stream = new NettyClientStream(
      new TransportStateImpl(handler, DEFAULT_MAX_MESSAGE_SIZE),
      methodDescriptor,
      // Pass the metadata that actually contains the application supplied user agent; a fresh
      // empty Metadata here would leave nothing for the stream to remove.
      metadata,
      channel,
      AsciiString.of("localhost"),
      AsciiString.of("http"),
      AsciiString.of("good agent"),
      StatsTraceContext.NOOP,
      transportTracer,
      CallOptions.DEFAULT,
      false);
  stream.start(listener);
  ArgumentCaptor<CreateStreamCommand> cmdCap = ArgumentCaptor.forClass(CreateStreamCommand.class);
  verify(writeQueue).enqueue(cmdCap.capture(), eq(false));
  assertThat(ImmutableListMultimap.copyOf(cmdCap.getValue().headers()))
      .containsEntry(Utils.USER_AGENT, AsciiString.of("good agent"));
  assertThat(ImmutableListMultimap.copyOf(cmdCap.getValue().headers()))
      .doesNotContainEntry(Utils.USER_AGENT, AsciiString.of("bad agent"));
}
/**
 * Create a transport connected to a fake peer for test.
 *
 * <p>The address, connection spec and proxy are set to null, the default authority is fixed to
 * "notarealauthority:80" and the maximum inbound metadata size is {@code Integer.MAX_VALUE}.
 *
 * @throws NullPointerException if {@code executor}, {@code frameReader},
 *     {@code testFrameWriter}, {@code socket}, {@code connectedFuture},
 *     {@code tooManyPingsRunnable} or {@code transportTracer} is null
 */
@VisibleForTesting
OkHttpClientTransport(
    String userAgent,
    Executor executor,
    FrameReader frameReader,
    FrameWriter testFrameWriter,
    int nextStreamId,
    Socket socket,
    Supplier<Stopwatch> stopwatchFactory,
    @Nullable Runnable connectingCallback,
    SettableFuture<Void> connectedFuture,
    int maxMessageSize,
    int initialWindowSize,
    Runnable tooManyPingsRunnable,
    TransportTracer transportTracer) {
  // Fixed values used for the fake peer.
  this.address = null;
  this.maxMessageSize = maxMessageSize;
  this.initialWindowSize = initialWindowSize;
  this.defaultAuthority = "notarealauthority:80";
  this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);

  this.executor = Preconditions.checkNotNull(executor, "executor");
  this.serializingExecutor = new SerializingExecutor(executor);

  // Test doubles supplied by the caller.
  this.testFrameReader = Preconditions.checkNotNull(frameReader, "frameReader");
  this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter, "testFrameWriter");
  this.socket = Preconditions.checkNotNull(socket, "socket");

  this.nextStreamId = nextStreamId;
  this.stopwatchFactory = stopwatchFactory;
  this.connectionSpec = null;
  this.connectingCallback = connectingCallback;
  this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
  this.proxy = null;
  this.tooManyPingsRunnable =
      Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
  this.maxInboundMetadataSize = Integer.MAX_VALUE;
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");
  initTransportTracer();
}
/**
 * Sends one RPC through a transport created with the user agent "testUserAgent" and expects the
 * server to have received the derived "netty" gRPC user agent in the request headers.
 */
@Test
public void overrideDefaultUserAgent() throws Exception {
  startServer();
  NettyClientTransport transport = newTransport(
      newNegotiator(),
      DEFAULT_MAX_MESSAGE_SIZE,
      GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
      "testUserAgent",
      true);
  callMeMaybe(transport.start(clientTransportListener));

  Rpc rpc = new Rpc(transport, new Metadata());
  rpc.halfClose().waitForResponse();

  // Verify that the received headers contained the User-Agent.
  assertEquals(1, serverListener.streamListeners.size());
  Metadata receivedHeaders = serverListener.streamListeners.get(0).headers;
  String expectedUserAgent = GrpcUtil.getGrpcUserAgent("netty", "testUserAgent");
  assertEquals(expectedUserAgent, receivedHeaders.get(USER_AGENT_KEY));
}
/**
 * Returns the port given by the default authority, falling back to the port of
 * {@code address} when the authority does not specify one.
 */
@VisibleForTesting
int getOverridenPort() {
  int authorityPort = GrpcUtil.authorityToUri(defaultAuthority).getPort();
  // -1 means the authority carries no explicit port.
  return authorityPort != -1 ? authorityPort : address.getPort();
}
/**
 * Reads frames until the frame reader reports no more frames or an error is thrown, then starts
 * go-away processing, closes the frame reader and notifies the listener that the transport has
 * terminated.
 *
 * <p>Unless running on restricted App Engine, the current thread is renamed to
 * "OkHttpClientTransport" for the duration of the call and its original name is restored at the
 * end.
 */
@Override
public void run() {
  final Thread currentThread = Thread.currentThread();
  final String originalName = currentThread.getName();
  final boolean renameThread = !GrpcUtil.IS_RESTRICTED_APPENGINE;
  if (renameThread) {
    currentThread.setName("OkHttpClientTransport");
  }
  try {
    // Read until the underlying socket closes.
    while (frameReader.nextFrame(this)) {
      if (keepAliveManager != null) {
        keepAliveManager.onDataReceived();
      }
    }
    // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
    // it may be triggered by the socket closing, in such case, the startGoAway() will do
    // nothing, otherwise, we finish all streams since it's a real IO issue.
    Status endOfStream = Status.UNAVAILABLE.withDescription("End of stream or IOException");
    startGoAway(0, ErrorCode.INTERNAL_ERROR, endOfStream);
  } catch (Throwable t) {
    // TODO(madongfly): Send the exception message to the server.
    Status handlerFailure =
        Status.INTERNAL.withDescription("error in frame handler").withCause(t);
    startGoAway(0, ErrorCode.PROTOCOL_ERROR, handlerFailure);
  } finally {
    try {
      frameReader.close();
    } catch (IOException ex) {
      log.log(Level.INFO, "Exception closing frame reader", ex);
    }
    listener.transportTerminated();
    if (renameThread) {
      // Restore the original thread name.
      currentThread.setName(originalName);
    }
  }
}
/**
 * Returns the socket factory matching the negotiation type.
 *
 * <p>For {@code TLS} the already configured {@code sslSocketFactory} is returned if there is
 * one; otherwise one is created from an {@link SSLContext}, stored in {@code sslSocketFactory}
 * and returned. For {@code PLAINTEXT} the result is {@code null}.
 *
 * @return the socket factory, or {@code null} for plaintext
 * @throws RuntimeException if the TLS provider fails or the negotiation type is unknown
 */
@VisibleForTesting
@Nullable
SSLSocketFactory createSocketFactory() {
  switch (negotiationType) {
    case PLAINTEXT:
      return null;
    case TLS:
      if (sslSocketFactory != null) {
        return sslSocketFactory;
      }
      try {
        SSLContext sslContext;
        if (!GrpcUtil.IS_RESTRICTED_APPENGINE) {
          sslContext = SSLContext.getInstance("Default", Platform.get().getProvider());
        } else {
          // The following auth code circumvents the following AccessControlException:
          // access denied ("java.util.PropertyPermission" "javax.net.ssl.keyStore" "read")
          // Conscrypt will attempt to load the default KeyStore if a trust manager is not
          // provided, which is forbidden on AppEngine
          sslContext = SSLContext.getInstance("TLS", Platform.get().getProvider());
          TrustManagerFactory trustManagerFactory =
              TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
          trustManagerFactory.init((KeyStore) null);
          // Use an algorithm that doesn't need /dev/urandom
          SecureRandom random =
              SecureRandom.getInstance("SHA1PRNG", Platform.get().getProvider());
          sslContext.init(null, trustManagerFactory.getTrustManagers(), random);
        }
        sslSocketFactory = sslContext.getSocketFactory();
        return sslSocketFactory;
      } catch (GeneralSecurityException gse) {
        throw new RuntimeException("TLS Provider failure", gse);
      }
    default:
      throw new RuntimeException("Unknown negotiation type: " + negotiationType);
  }
}
/**
 * Creates an XdsClient and starts a watch.
 *
 * <p>The management servers and the node are read from the bootstrap. When the bootstrap cannot
 * be read or lists no server, the failure is logged and the method returns without creating a
 * client.
 *
 * @throws IllegalStateException if an XdsClient has already been set
 */
public void createXdsClientAndStart() {
  checkState(xdsClient == null, "start() called more than once");
  Bootstrapper.BootstrapInfo bootstrapInfo;
  List<Bootstrapper.ServerInfo> serverList;
  try {
    bootstrapInfo = Bootstrapper.getInstance().readBootstrap();
    serverList = bootstrapInfo.getServers();
    if (serverList.isEmpty()) {
      throw new ManagementServerNotFoundException("No management server provided by bootstrap");
    }
  } catch (IOException | ManagementServerNotFoundException e) {
    logger.log(Level.FINE, "Exception reading bootstrap", e);
    // Pass the port as text: a numeric argument for a {0} placeholder is rendered with
    // locale-specific digit grouping (e.g. "8,080") by the message formatting.
    logger.log(
        Level.INFO, "Fallback to plaintext for server at port {0}", String.valueOf(port));
    return;
  }
  Node node = bootstrapInfo.getNode();
  timeService = SharedResourceHolder.get(timeServiceResource);
  XdsClientImpl xdsClientImpl =
      new XdsClientImpl(
          "",
          serverList,
          XdsClient.XdsChannelFactory.getInstance(),
          node,
          createSynchronizationContext(),
          timeService,
          new ExponentialBackoffPolicy.Provider(),
          GrpcUtil.STOPWATCH_SUPPLIER);
  start(xdsClientImpl);
}
/**
 * Marks this instance as closed and releases the shared scheduler and/or shared executor back
 * to {@link SharedResourceHolder} when this instance uses them. Calls after the first one do
 * nothing.
 */
@Override
public void close() {
  if (!closed) {
    closed = true;

    if (usingSharedScheduler) {
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
    }

    if (usingSharedExecutor) {
      SharedResourceHolder.release(SHARED_EXECUTOR, executor);
    }
  }
}
/**
 * Creates the factory.
 *
 * @param name the name, stored as-is
 * @param scheduledExecutorService the timer service to use; when null, the shared
 *     {@code GrpcUtil.TIMER_SERVICE} is obtained from {@link SharedResourceHolder} instead
 * @param maxInboundMetadataSize maximum inbound metadata size, stored as-is
 * @param includeCauseWithStatus flag stored as-is
 */
private InProcessClientTransportFactory(
    String name,
    @Nullable ScheduledExecutorService scheduledExecutorService,
    int maxInboundMetadataSize, boolean includeCauseWithStatus) {
  this.name = name;
  this.maxInboundMetadataSize = maxInboundMetadataSize;
  this.includeCauseWithStatus = includeCauseWithStatus;

  // Without a caller supplied scheduler, fall back to the shared timer service.
  this.useSharedTimer = scheduledExecutorService == null;
  if (this.useSharedTimer) {
    this.timerService = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
  } else {
    this.timerService = scheduledExecutorService;
  }
}