下面列出了 io.grpc.Grpc 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Helper for asserting TLS info in the SSLSession exposed through
 * {@link io.grpc.ServerCall#getAttributes()}.
 *
 * <p>Issues one unary call with a 5-second deadline, reads the
 * {@code Grpc.TRANSPORT_ATTR_SSL_SESSION} attribute from the call held by
 * {@code serverCallCapture}, and asserts that the peer presented exactly one certificate whose
 * subject DN string equals {@code tlsInfo}.
 *
 * @param tlsInfo expected value of {@code getSubjectDN().toString()} for the peer certificate
 */
protected void assertX500SubjectDn(String tlsInfo) {
  TestServiceGrpc.TestServiceBlockingStub stub =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);

  stub.unaryCall(SimpleRequest.getDefaultInstance());

  List<Certificate> certificates;
  SSLSession sslSession =
      serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  try {
    certificates = Arrays.asList(sslSession.getPeerCertificates());
  } catch (SSLPeerUnverifiedException e) {
    // Should never happen
    throw new AssertionError(e);
  }

  // Check the chain length before indexing into it, so an unexpected chain size is reported as
  // an assertion failure rather than an IndexOutOfBoundsException.
  assertEquals(1, certificates.size());
  X509Certificate x509cert = (X509Certificate) certificates.get(0);
  assertEquals(tlsInfo, x509cert.getSubjectDN().toString());
}
/**
 * Handles the outcome of the HTTP client upgrade.
 *
 * <p>On {@code UPGRADE_SUCCESSFUL} the buffered writes are flushed and this handler removed, then
 * the gRPC handler is told that negotiation completed with the channel's remote/local addresses
 * and {@code SecurityLevel.NONE}. On {@code UPGRADE_REJECTED} the negotiation is failed. The
 * event is always forwarded to the superclass afterwards.
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
    fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
  } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
    writeBufferedAndRemove(ctx);

    // Plaintext upgrade: only the addresses and the security level are published.
    Attributes.Builder negotiated = Attributes.newBuilder();
    negotiated.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress());
    negotiated.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress());
    negotiated.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE);
    grpcHandler.handleProtocolNegotiationCompleted(negotiated.build(), /*securityInfo=*/ null);
  }
  super.userEventTriggered(ctx, evt);
}
/**
 * Checks that a client stream exposes the SSL session plus remote and local addresses, and that
 * the client's local address equals the remote address recorded by the server transport.
 */
@Test
public void clientStreamGetsAttributes() throws Exception {
  startServer();
  NettyClientTransport clientTransport = newTransport(newNegotiator());
  callMeMaybe(clientTransport.start(clientTransportListener));

  Rpc call = new Rpc(clientTransport).halfClose();
  call.waitForResponse();

  // Client-side view of the connection.
  Object streamSslSession = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  assertNotNull(streamSslSession);
  SocketAddress streamRemote = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  assertEquals(address, streamRemote);

  // Server-side view: wait up to one second for the server transport's attributes.
  Attributes serverSideAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
  assertNotNull(serverSideAttrs);
  SocketAddress peerSeenByServer = serverSideAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  assertNotNull(peerSeenByServer);

  SocketAddress streamLocal = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
  assertEquals(peerSeenByServer, streamLocal);
}
/**
 * Builds an {@link Authentication} from the TLS client certificate of the given call.
 *
 * @param call the call whose transport attributes are inspected for an SSL session
 * @param metadata the request headers (not used by this implementation)
 * @return the result of {@code fromCertificate} for the peer's own certificate, or {@code null}
 *     if the call has no SSL session or the peer was not verified
 */
@Override
public Authentication readAuthentication(final ServerCall<?, ?> call, final Metadata metadata) {
  final SSLSession sslSession = call.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  if (sslSession == null) {
    log.trace("Peer not verified via SSL");
    return null;
  }
  Certificate[] certs;
  try {
    certs = sslSession.getPeerCertificates();
  } catch (final SSLPeerUnverifiedException e) {
    log.trace("Peer not verified via certificate", e);
    return null;
  }
  // SSLSession#getPeerCertificates() orders the chain with the peer's own certificate first,
  // followed by any certificate authorities. Authenticate the peer itself, not the CA at the
  // end of the chain (which is shared by every client signed by that CA).
  return fromCertificate(certs[0]);
}
/**
 * Builds an {@link Authentication} from the TLS client certificate of the given call.
 *
 * @param call the call whose transport attributes are inspected for an SSL session
 * @param metadata the request headers (not used by this implementation)
 * @return the result of {@code fromCertificate} for the peer's own certificate, or {@code null}
 *     if the call has no SSL session or the peer was not verified
 */
@Override
public Authentication readAuthentication(final ServerCall<?, ?> call, final Metadata metadata) {
  final SSLSession sslSession = call.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  if (sslSession == null) {
    log.trace("Peer not verified via SSL");
    return null;
  }
  Certificate[] certs;
  try {
    certs = sslSession.getPeerCertificates();
  } catch (final SSLPeerUnverifiedException e) {
    log.trace("Peer not verified via certificate", e);
    return null;
  }
  // SSLSession#getPeerCertificates() orders the chain with the peer's own certificate first,
  // followed by any certificate authorities. Authenticate the peer itself, not the CA at the
  // end of the chain (which is shared by every client signed by that CA).
  return fromCertificate(certs[0]);
}
/**
 * Creates a server interceptor that asserts the client presented the expected TLS certificate.
 *
 * <p>For every intercepted call the leaf certificate of the peer is compared byte-for-byte (in
 * its encoded form) with {@code expectedClientCert}. When they match, the flag is set to
 * {@code true} and the call is handed to the next handler.
 *
 * @param expectedClientCert encoded form of the certificate the client was created with
 * @param toggleHandshakeOccured set to {@code true} once a matching certificate has been seen
 * @return the asserting interceptor
 */
private ServerInterceptor mutualTLSInterceptor(byte[] expectedClientCert, AtomicBoolean toggleHandshakeOccured) {
  return new ServerInterceptor() {
    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
      SSLSession sslSession = serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
      try {
        // getPeerCertificates() replaces the deprecated-for-removal getPeerCertificateChain();
        // the peer's own certificate is still the first element.
        java.security.cert.Certificate[] certChain = sslSession.getPeerCertificates();
        Assert.assertFalse("Client didn't send TLS certificate", certChain == null || certChain.length == 0);

        byte[] clientRawCert = certChain[0].getEncoded();
        // Ensure the client TLS cert matches the expected one - the one it was created with
        boolean equalCerts = Arrays.equals(clientRawCert, expectedClientCert);
        Assert.assertTrue("Expected certificate doesn't match actual", equalCerts);

        toggleHandshakeOccured.set(true);
      } catch (Exception e) {
        // Fail with the same message as before, but keep the original exception as the cause
        // so its stack trace is reported with the failure.
        throw new AssertionError(String.format("Uncaught exception: %s", e.toString()), e);
      }

      return serverCallHandler.startCall(serverCall, metadata);
    }
  };
}
/**
 * Accepts or rejects a newly ready transport based on the peer's IP address.
 *
 * @param attributes the transport attributes; must contain {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 * @return the same {@code attributes} instance when {@code addressFilter} accepts the peer
 * @throws io.grpc.StatusRuntimeException with {@code INTERNAL} when the remote address is
 *     missing, or {@code PERMISSION_DENIED} when the filter rejects the peer address
 */
@Override
public Attributes transportReady(final Attributes attributes) {
  if (logger.isDebugEnabled()) {
    logger.debug("Ready attributes={}", attributes);
  }

  final InetSocketAddress peer = (InetSocketAddress) attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  if (peer == null) {
    // Without a remote address the peer cannot be checked against the filter.
    logger.warn("Unauthenticated transport. TRANSPORT_ATTR_REMOTE_ADDR must not be null");
    throw Status.INTERNAL.withDescription("RemoteAddress is null").asRuntimeException();
  }

  final InetAddress peerIp = peer.getAddress();
  if (!addressFilter.accept(peerIp)) {
    // The filter rejected this peer.
    logger.debug("Permission denied transport.");
    throw Status.PERMISSION_DENIED.withDescription("invalid IP").asRuntimeException();
  }
  return attributes;
}
/**
 * Helper for asserting TLS info in the SSLSession exposed through
 * {@link io.grpc.ServerCall#getAttributes()}.
 *
 * <p>Issues one unary call with a 5-second deadline, reads the
 * {@code Grpc.TRANSPORT_ATTR_SSL_SESSION} attribute from the call held by
 * {@code serverCallCapture}, and asserts that the peer presented exactly one certificate whose
 * subject DN string equals {@code tlsInfo}.
 *
 * @param tlsInfo expected value of {@code getSubjectDN().toString()} for the peer certificate
 */
protected void assertX500SubjectDn(String tlsInfo) {
  TestServiceGrpc.TestServiceBlockingStub stub =
      blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);

  stub.unaryCall(SimpleRequest.getDefaultInstance());

  List<Certificate> certificates;
  SSLSession sslSession =
      serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  try {
    certificates = Arrays.asList(sslSession.getPeerCertificates());
  } catch (SSLPeerUnverifiedException e) {
    // Should never happen
    throw new AssertionError(e);
  }

  // Check the chain length before indexing into it, so an unexpected chain size is reported as
  // an assertion failure rather than an IndexOutOfBoundsException.
  assertEquals(1, certificates.size());
  X509Certificate x509cert = (X509Certificate) certificates.get(0);
  assertEquals(tlsInfo, x509cert.getSubjectDN().toString());
}
/**
 * Checks that a client stream exposes the SSL session plus remote and local addresses, and that
 * the client's local address equals the remote address recorded by the server transport.
 */
@Test
public void clientStreamGetsAttributes() throws Exception {
  startServer();
  NettyClientTransport clientTransport = newTransport(newNegotiator());
  callMeMaybe(clientTransport.start(clientTransportListener));

  Rpc call = new Rpc(clientTransport).halfClose();
  call.waitForResponse();

  // Client-side view of the connection.
  Object streamSslSession = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
  assertNotNull(streamSslSession);
  SocketAddress streamRemote = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  assertEquals(address, streamRemote);

  // Server-side view: wait up to one second for the server transport's attributes.
  Attributes serverSideAttrs = serverTransportAttributesList.poll(1, TimeUnit.SECONDS);
  assertNotNull(serverSideAttrs);
  SocketAddress peerSeenByServer = serverSideAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  assertNotNull(peerSeenByServer);

  SocketAddress streamLocal = call.stream.getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
  assertEquals(peerSeenByServer, streamLocal);
}
private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
String userAgent, Attributes eagAttrs,
Optional<ServerListener> optionalServerListener, boolean includeCauseWithStatus) {
this.name = name;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
this.userAgent = GrpcUtil.getGrpcUserAgent("inprocess", userAgent);
checkNotNull(eagAttrs, "eagAttrs");
this.attributes = Attributes.newBuilder()
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
.set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
this.includeCauseWithStatus = includeCauseWithStatus;
}
/**
 * Helper for getting remote address from {@link io.grpc.ServerCall#getAttributes()}.
 *
 * <p>Performs one unary call with a 5-second deadline, then reads
 * {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR} from the call held by {@code serverCallCapture}.
 */
protected SocketAddress obtainRemoteClientAddr() {
  blockingStub
      .withDeadlineAfter(5, TimeUnit.SECONDS)
      .unaryCall(SimpleRequest.getDefaultInstance());

  SocketAddress remoteAddr =
      serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  return remoteAddr;
}
/**
 * Helper for getting local address from {@link io.grpc.ServerCall#getAttributes()}.
 *
 * <p>Performs one unary call with a 5-second deadline, then reads
 * {@code Grpc.TRANSPORT_ATTR_LOCAL_ADDR} from the call held by {@code serverCallCapture}.
 */
protected SocketAddress obtainLocalClientAddr() {
  blockingStub
      .withDeadlineAfter(5, TimeUnit.SECONDS)
      .unaryCall(SimpleRequest.getDefaultInstance());

  SocketAddress localAddr =
      serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
  return localAddr;
}
/**
 * Create a server plaintext handler for gRPC.
 *
 * <p>The returned negotiator produces a handler that, as soon as it is added to a pipeline,
 * reports negotiation as complete (with the channel's remote and local addresses and no
 * security info) and then replaces itself with the gRPC handler. Its scheme is
 * {@code Utils.HTTP}; closing the negotiator does nothing.
 */
public static ProtocolNegotiator serverPlaintext() {
  return new ProtocolNegotiator() {
    @Override
    public void close() {}

    @Override
    public Handler newHandler(final GrpcHttp2ConnectionHandler handler) {
      class PlaintextHandler extends ChannelHandlerAdapter implements Handler {
        @Override
        public AsciiString scheme() {
          return Utils.HTTP;
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
          // Set attributes before the replace to be sure they are passed on before any
          // requests are accepted.
          Attributes transportAttrs = Attributes.newBuilder()
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
              .build();
          handler.handleProtocolNegotiationCompleted(transportAttrs, /*securityInfo=*/ null);

          // Nothing to negotiate: swap this handler for the gRPC handler.
          ctx.pipeline().replace(this, null, handler);
        }
      }

      return new PlaintextHandler();
    }
  };
}
/**
 * Completes or fails protocol negotiation when the TLS handshake finishes.
 *
 * <p>A failed handshake fails the negotiation with the handshake's cause. A successful handshake
 * whose application protocol is not in {@code NEXT_PROTOCOL_VERSIONS} fails it with a generic
 * exception. Otherwise the gRPC handler is notified with the SSL session and channel addresses
 * and takes this handler's place in the pipeline. Every event is forwarded to the superclass.
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof SslHandshakeCompletionEvent) {
    SslHandshakeCompletionEvent handshake = (SslHandshakeCompletionEvent) evt;
    if (!handshake.isSuccess()) {
      fail(ctx, handshake.cause());
    } else if (!NEXT_PROTOCOL_VERSIONS.contains(
        sslHandler(ctx.pipeline()).applicationProtocol())) {
      fail(ctx, new Exception(
          "Failed protocol negotiation: Unable to find compatible protocol."));
    } else {
      // Protocol agreed: publish the SSLSession and addresses to the gRPC handler.
      SSLSession tlsSession = sslHandler(ctx.pipeline()).engine().getSession();
      Attributes negotiated = Attributes.newBuilder()
          .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, tlsSession)
          .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
          .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
          .build();
      grpcHandler.handleProtocolNegotiationCompleted(
          negotiated, new InternalChannelz.Security(new InternalChannelz.Tls(tlsSession)));
      // Hand the pipeline slot over to the gRPC handler.
      ctx.pipeline().replace(this, null, grpcHandler);
    }
  }
  super.userEventTriggered(ctx, evt);
}
/**
 * Completes or fails client-side TLS negotiation when the handshake finishes.
 *
 * <p>A failed handshake fails the negotiation with the handshake's cause. A successful handshake
 * whose application protocol is not in {@code NEXT_PROTOCOL_VERSIONS} is logged and failed.
 * Otherwise the gRPC handler is inserted before this handler, notified with the SSL session,
 * channel addresses and {@code SecurityLevel.PRIVACY_AND_INTEGRITY}, and the buffered writes are
 * flushed. Every event is forwarded to the superclass.
 */
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof SslHandshakeCompletionEvent) {
    SslHandshakeCompletionEvent handshake = (SslHandshakeCompletionEvent) evt;
    if (!handshake.isSuccess()) {
      fail(ctx, handshake.cause());
    } else {
      SslHandler tls = ctx.pipeline().get(SslHandler.class);
      if (!NEXT_PROTOCOL_VERSIONS.contains(tls.applicationProtocol())) {
        Exception alpnFailure = new Exception(
            "Failed ALPN negotiation: Unable to find compatible protocol.");
        logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed.", alpnFailure);
        fail(ctx, alpnFailure);
      } else {
        logSslEngineDetails(Level.FINER, ctx, "TLS negotiation succeeded.", null);

        // gRPC is only added once negotiation is done. Adding it earlier would let HTTP/2
        // writes fail before this event arrives, closing the channel prematurely.
        ctx.pipeline().addBefore(ctx.name(), null, grpcHandler);

        // Publish the SSLSession, addresses and security level to the gRPC handler.
        SSLSession tlsSession = tls.engine().getSession();
        Attributes negotiated = Attributes.newBuilder()
            .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, tlsSession)
            .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
            .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
            .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
            .build();
        grpcHandler.handleProtocolNegotiationCompleted(
            negotiated, new InternalChannelz.Security(new InternalChannelz.Tls(tlsSession)));
        writeBufferedAndRemove(ctx);
      }
    }
  }
  super.userEventTriggered(ctx, evt);
}
/**
 * Finishes plaintext negotiation as soon as the channel becomes active.
 *
 * <p>Flushes the buffered writes and removes this handler, then notifies the gRPC handler with
 * the channel's remote/local addresses and {@code SecurityLevel.NONE} before delegating to the
 * superclass.
 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
  writeBufferedAndRemove(ctx);

  Attributes.Builder negotiated = Attributes.newBuilder();
  negotiated.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress());
  negotiated.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress());
  negotiated.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE);
  handler.handleProtocolNegotiationCompleted(negotiated.build(), /*securityInfo=*/ null);

  super.channelActive(ctx);
}
/**
 * Verifies that {@code getPeerSocket} returns null for empty attributes and the very same
 * instance that was stored under {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR} otherwise.
 */
@Test
public void getPeerSocketTest() {
  assertNull(getPeerSocket(Attributes.EMPTY));

  Attributes withPeer =
      Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();
  assertSame(peer, getPeerSocket(withPeer));
}
/**
 * Verifies that, after the handshake, the gRPC handler's attributes contain the mocked TSI peer
 * and ALTS context, "embedded" remote and local addresses, and the
 * {@code PRIVACY_AND_INTEGRITY} security level.
 */
@Test
public void peerPropagated() throws Exception {
  doHandshake();

  // Peer identity.
  assertThat(grpcHandler.attrs.get(AltsProtocolNegotiator.TSI_PEER_KEY)).isEqualTo(mockedTsiPeer);
  assertThat(grpcHandler.attrs.get(AltsProtocolNegotiator.ALTS_CONTEXT_KEY))
      .isEqualTo(mockedAltsContext);

  // Addresses, compared through their string form.
  String remoteAddr = grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR).toString();
  assertThat(remoteAddr).isEqualTo("embedded");
  String localAddr = grpcHandler.attrs.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR).toString();
  assertThat(localAddr).isEqualTo("embedded");

  // Security level.
  assertThat(grpcHandler.attrs.get(CallCredentials.ATTR_SECURITY_LEVEL))
      .isEqualTo(SecurityLevel.PRIVACY_AND_INTEGRITY);
}
/**
 * Verifies that the socket stats reported by the client and server transports agree with the
 * remote addresses seen on the streams, and that basic socket options are populated.
 */
@Test
public void socketStats() throws Exception {
  server.start(serverListener);
  ManagedClientTransport clientTransport = newClientTransport(server);
  startTransport(clientTransport, mockClientTransportListener);

  // Open one stream so both ends have a stream to read attributes from.
  ClientStream outbound = clientTransport.newStream(methodDescriptor, new Metadata(), callOptions);
  ClientStreamListenerBase outboundListener = new ClientStreamListenerBase();
  outbound.start(outboundListener);

  MockServerTransportListener serverSideListener =
      serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
  StreamCreation inboundCreation =
      serverSideListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
  ServerStream inbound = inboundCreation.stream;

  // Each side's "remote" address is the other side's address.
  SocketAddress serverAddress = outbound.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  SocketAddress clientAddress = inbound.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);

  SocketStats statsFromClient = clientTransport.getStats().get();
  assertEquals(clientAddress, statsFromClient.local);
  assertEquals(serverAddress, statsFromClient.remote);
  // very basic sanity check that socket options are populated
  assertNotNull(statsFromClient.socketOptions.lingerSeconds);
  assertTrue(statsFromClient.socketOptions.others.containsKey("SO_SNDBUF"));

  SocketStats statsFromServer = serverSideListener.transport.getStats().get();
  assertEquals(serverAddress, statsFromServer.local);
  assertEquals(clientAddress, statsFromServer.remote);
  // very basic sanity check that socket options are populated
  assertNotNull(statsFromServer.socketOptions.lingerSeconds);
  assertTrue(statsFromServer.socketOptions.others.containsKey("SO_SNDBUF"));
}
/**
 * Runs the call in a context derived from the current one, with {@code REMOTE_ADDRESS} bound to
 * the call's {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR} attribute.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call,
    final Metadata headers,
    final ServerCallHandler<ReqT, RespT> next) {
  return Contexts.interceptCall(
      Context.current()
          .withValue(REMOTE_ADDRESS, call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR)),
      call,
      headers,
      next);
}
/**
 * Starts a server span for the incoming call using the context extracted from the request
 * headers, tags it with component/service information and, when available, the peer's host and
 * port, then continues the call.
 *
 * <p>The peer attributes are only recorded when the transport reports an
 * {@link InetSocketAddress}; other transports (or a missing address) no longer abort the call
 * with a {@code ClassCastException} or {@code NullPointerException}.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  // Extract the Span Context from the metadata of the gRPC request
  Context extractedContext = textFormat.extract(Context.current(), headers, getter);
  java.net.SocketAddress remoteAddr = call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  // Build a span based on the received context
  try (Scope scope = ContextUtils.withScopedContext(extractedContext)) {
    Span span =
        tracer
            .spanBuilder("helloworld.Greeter/SayHello")
            .setSpanKind(Span.Kind.SERVER)
            .startSpan();
    span.setAttribute("component", "grpc");
    span.setAttribute("rpc.service", "Greeter");
    // Host and port only exist for inet sockets; skip them for any other address type.
    if (remoteAddr instanceof InetSocketAddress) {
      InetSocketAddress clientInfo = (InetSocketAddress) remoteAddr;
      span.setAttribute("net.peer.ip", clientInfo.getHostString());
      span.setAttribute("net.peer.port", clientInfo.getPort());
    }
    // Process the gRPC call normally
    try {
      span.setStatus(Status.OK);
      return Contexts.interceptCall(Context.current(), call, headers, next);
    } finally {
      span.end();
    }
  }
}
/**
 * Writes an access-log line for the call (peer address and full method name), logs every
 * request message, and closes the call with {@code Status.INTERNAL} if the delegate's
 * {@code onHalfClose} throws an exception.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
    ServerCall<ReqT, RespT> serverCall, Metadata metadata,
    ServerCallHandler<ReqT, RespT> serverCallHandler) {
  String peer = String.valueOf(
      (Object) serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
  String method = serverCall.getMethodDescriptor().getFullMethodName();
  accessLogger.info("gRPC [" + peer + "]: " + method);

  ServerCall.Listener<ReqT> next = serverCallHandler.startCall(serverCall, metadata);
  return new SimpleForwardingServerCallListener<ReqT>(next) {
    @Override
    public void onMessage(ReqT request) {
      accessLogger.info("Request Data: " + request);
      super.onMessage(request);
    }

    @Override
    public void onHalfClose() {
      try {
        super.onHalfClose();
      } catch (Exception e) {
        logger.error("Caught an unexpected error.", e);
        // Report the failure to the client instead of letting the exception propagate.
        Status failure = Status.INTERNAL
            .withCause(e)
            .withDescription(e.toString() + "\n" + e.getMessage());
        serverCall.close(failure, new Metadata());
      }
    }
  };
}
/**
 * Tags the span with the peer address found under {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}.
 *
 * <p>An {@link InetSocketAddress} is written as {@code host:port}; an
 * {@link InProcessSocketAddress} is written as its name. Any other value (including a missing
 * address) leaves the span untouched.
 */
@Override
public void set(Span span, Attributes attributes) {
  SocketAddress peer = attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  if (peer instanceof InetSocketAddress) {
    final InetSocketAddress inetPeer = (InetSocketAddress) peer;
    span.setTag(super.key, inetPeer.getHostString() + ':' + inetPeer.getPort());
    return;
  }
  if (peer instanceof InProcessSocketAddress) {
    final InProcessSocketAddress inProcessPeer = (InProcessSocketAddress) peer;
    span.setTag(super.key, inProcessPeer.getName());
  }
}
/**
 * Verifies that {@code GrpcTags.PEER_ADDRESS} tags a span with {@code host:port} (and nothing
 * else) when the remote address is an {@link InetSocketAddress}.
 */
@Test
public void testPeerAddressSocket() {
  final int port = ThreadLocalRandom.current().nextInt(65535);
  final InetSocketAddress peer = new InetSocketAddress("127.0.0.1", port);
  final Attributes withPeer =
      Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();

  MockSpan taggedSpan = new MockTracer().buildSpan("").start();
  GrpcTags.PEER_ADDRESS.set(taggedSpan, withPeer);

  final String expectedTag = peer.getHostString() + ':' + peer.getPort();
  assertThat(taggedSpan.tags())
      .containsOnly(MapEntry.entry(GrpcTags.PEER_ADDRESS.getKey(), expectedTag));
}
/**
 * Verifies that {@code GrpcTags.PEER_ADDRESS} tags a span with the in-process name (and nothing
 * else) when the remote address is an {@link InProcessSocketAddress}.
 */
@Test
public void testPeerAddressInProcess() {
  final String inProcessName = UUID.randomUUID().toString();
  final InProcessSocketAddress peer = new InProcessSocketAddress(inProcessName);
  final Attributes withPeer =
      Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build();

  MockSpan taggedSpan = new MockTracer().buildSpan("").start();
  GrpcTags.PEER_ADDRESS.set(taggedSpan, withPeer);

  assertThat(taggedSpan.tags())
      .containsOnly(MapEntry.entry(GrpcTags.PEER_ADDRESS.getKey(), peer.getName()));
}
/**
 * Stores the call's remote server address in the custom call-options map and then dispatches to
 * {@code statusOk} or {@code statusError} depending on the final status.
 *
 * <p>The dispatch runs in a {@code finally} block, so it happens even if recording the address
 * throws.
 */
@Override
public void onClose(Status status, Metadata trailers) {
  try {
    SocketAddress currentServer =
        clientCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
    callOptions
        .getOption(GrpcCallOptions.CALLOPTIONS_CUSTOME_KEY)
        .put(GrpcCallOptions.GRPC_CURRENT_ADDR_KEY, currentServer);
  } finally {
    if (!status.isOk()) {
      statusError(status, trailers);
    } else {
      statusOk(trailers);
    }
  }
}
/**
 * Creates the metadata describing a newly connected transport.
 *
 * @param attributes transport attributes; must contain {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR}
 * @return metadata holding the debug string, the remote address, the next id from
 *     {@code idGenerator}, and the current time in milliseconds
 * @throws io.grpc.StatusRuntimeException with {@code INTERNAL} when the remote address is missing
 */
public TransportMetadata build(Attributes attributes) {
  final InetSocketAddress peer = (InetSocketAddress) attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  if (peer == null) {
    // No remote address means the transport cannot be identified.
    throw Status.INTERNAL.withDescription("RemoteSocketAddress is null").asRuntimeException();
  }

  // Arguments are evaluated left to right: the id is taken before the timestamp.
  return new DefaultTransportMetadata(
      debugString, peer, idGenerator.getAndIncrement(), System.currentTimeMillis());
}
/**
 * Helper for getting remote address from {@link io.grpc.ServerCall#getAttributes()}.
 *
 * <p>Performs one unary call with a 5-second deadline, then reads
 * {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR} from the call held by {@code serverCallCapture}.
 */
protected SocketAddress obtainRemoteClientAddr() {
  blockingStub
      .withDeadlineAfter(5, TimeUnit.SECONDS)
      .unaryCall(SimpleRequest.getDefaultInstance());

  SocketAddress remoteAddr =
      serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  return remoteAddr;
}
/**
 * Helper for getting remote address from {@link io.grpc.ClientCall#getAttributes()}.
 *
 * <p>Performs one unary call with a 5-second deadline through an interceptor that records the
 * client call into {@code clientCallCapture}, then reads
 * {@code Grpc.TRANSPORT_ATTR_REMOTE_ADDR} from that recorded call.
 */
protected SocketAddress obtainRemoteServerAddr() {
  TestServiceGrpc.TestServiceBlockingStub recordingStub =
      blockingStub.withInterceptors(recordClientCallInterceptor(clientCallCapture));
  recordingStub
      .withDeadlineAfter(5, TimeUnit.SECONDS)
      .unaryCall(SimpleRequest.getDefaultInstance());

  SocketAddress remoteAddr =
      clientCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
  return remoteAddr;
}
/**
 * Helper for getting local address from {@link io.grpc.ServerCall#getAttributes()}.
 *
 * <p>Performs one unary call with a 5-second deadline, then reads
 * {@code Grpc.TRANSPORT_ATTR_LOCAL_ADDR} from the call held by {@code serverCallCapture}.
 */
protected SocketAddress obtainLocalServerAddr() {
  blockingStub
      .withDeadlineAfter(5, TimeUnit.SECONDS)
      .unaryCall(SimpleRequest.getDefaultInstance());

  SocketAddress localAddr =
      serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
  return localAddr;
}