下面列出了io.grpc.Metadata#get ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override public TraceContextOrSamplingFlags extract(R request) {
if (!(request instanceof GrpcServerRequest)) return delegate.extract(request);
Metadata metadata = ((GrpcClientRequest) request).headers;
// First, check if we are propagating gRPC tags.
TagsBin tagsBin = metadata.get(GRPC_TAGS_BIN);
// Next, check to see if there is a gRPC formatted trace context: use it if parsable.
byte[] bytes = metadata.get(GRPC_TRACE_BIN);
if (bytes != null) {
TraceContext maybeContext = TraceContextBinaryFormat.parseBytes(bytes, tagsBin);
if (maybeContext != null) return TraceContextOrSamplingFlags.create(maybeContext);
}
// Finally, try to extract an incoming, non-gRPC trace context. If tags exist, propagate them.
TraceContextOrSamplingFlags result = delegate.extract(request);
if (tagsBin == null) return result;
return result.toBuilder().addExtra(tagsBin).build();
}
private Context.CancellableContext createContext(
Metadata headers, StatsTraceContext statsTraceCtx) {
Long timeoutNanos = headers.get(TIMEOUT_KEY);
Context baseContext =
statsTraceCtx
.serverFilterContext(rootContext)
.withValue(io.grpc.InternalServer.SERVER_CONTEXT_KEY, ServerImpl.this);
if (timeoutNanos == null) {
return baseContext.withCancellation();
}
Context.CancellableContext context =
baseContext.withDeadline(
Deadline.after(timeoutNanos, NANOSECONDS, ticker),
transport.getScheduledExecutorService());
return context;
}
/**
* Extract the response status from trailers.
*/
private Status statusFromTrailers(Metadata trailers) {
Status status = trailers.get(InternalStatus.CODE_KEY);
if (status != null) {
return status.withDescription(trailers.get(InternalStatus.MESSAGE_KEY));
}
// No status; something is broken. Try to provide a resonanable error.
if (headersReceived) {
return Status.UNKNOWN.withDescription("missing GRPC status in response");
}
Integer httpStatus = trailers.get(HTTP2_STATUS);
if (httpStatus != null) {
status = GrpcUtil.httpStatusToGrpcStatus(httpStatus);
} else {
status = Status.INTERNAL.withDescription("missing HTTP status code");
}
return status.augmentDescription(
"missing GRPC status, inferred error from HTTP status code");
}
@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
final ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
String contextValue = headers.get(Metadata.Key.of(next.getHeadKey(), Metadata.ASCII_STRING_MARSHALLER));
if (!StringUtil.isEmpty(contextValue)) {
next.setHeadValue(contextValue);
}
}
final AbstractSpan span = ContextManager.createEntrySpan(OperationNameFormatUtil.formatOperationName(call.getMethodDescriptor()), contextCarrier);
span.setComponent(ComponentsDefine.GRPC);
span.setLayer(SpanLayer.RPC_FRAMEWORK);
try {
return new TracingServerCallListener<>(handler.startCall(new TracingServerCall<>(call, ContextManager.capture()), headers), call
.getMethodDescriptor(), ContextManager.capture());
} finally {
ContextManager.stopSpan();
}
}
@Override
void logTrailer(
long seq,
Status status,
Metadata metadata,
GrpcLogEntry.Logger logger,
long callId,
// null on server, can be non null on client if this is a trailer-only response
@Nullable SocketAddress peerAddress) {
Preconditions.checkArgument(
peerAddress == null || logger == GrpcLogEntry.Logger.LOGGER_CLIENT,
"peerSocket can only be specified for client");
MaybeTruncated<io.grpc.binarylog.v1.Metadata.Builder> pair
= createMetadataProto(metadata, maxHeaderBytes);
io.grpc.binarylog.v1.Trailer.Builder trailerBuilder
= io.grpc.binarylog.v1.Trailer.newBuilder()
.setStatusCode(status.getCode().value())
.setMetadata(pair.proto);
String statusDescription = status.getDescription();
if (statusDescription != null) {
trailerBuilder.setStatusMessage(statusDescription);
}
byte[] statusDetailBytes = metadata.get(STATUS_DETAILS_KEY);
if (statusDetailBytes != null) {
trailerBuilder.setStatusDetails(ByteString.copyFrom(statusDetailBytes));
}
GrpcLogEntry.Builder entryBuilder = newTimestampedBuilder()
.setSequenceIdWithinCall(seq)
.setType(EventType.EVENT_TYPE_SERVER_TRAILER)
.setTrailer(trailerBuilder)
.setPayloadTruncated(pair.truncated)
.setLogger(logger)
.setCallId(callId);
if (peerAddress != null) {
entryBuilder.setPeer(socketToProto(peerAddress));
}
sink.write(entryBuilder.build());
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
RequestMetadata meta = headers.get(TracingMetadataUtils.METADATA_KEY);
assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
assertThat(meta.getToolInvocationId()).isEqualTo("command-id");
assertThat(meta.getActionId()).isNotEmpty();
assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel");
assertThat(meta.getToolDetails().getToolVersion())
.isEqualTo(BlazeVersionInfo.instance().getVersion());
return next.startCall(call, headers);
}
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
CallTracer serverCallTracer, Tag tag) {
this.stream = stream;
this.method = method;
this.context = context;
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.serverCallTracer = serverCallTracer;
this.serverCallTracer.reportCallStarted();
this.tag = tag;
}
/**
* intercept point of call.
*
* @param serverCall call of server.
* @param metadata of call.
* @param serverCallHandler handler of call.
* @param <REQUEST> of call.
* @param <RESPONSE> of call.
* @return lister of call.
*/
@Override
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> serverCall,
Metadata metadata, ServerCallHandler<REQUEST, RESPONSE> serverCallHandler) {
String token = metadata.get(AUTH_HEAD_HEADER_NAME);
if (expectedToken.equals(token)) {
return serverCallHandler.startCall(serverCall, metadata);
} else {
serverCall.close(Status.PERMISSION_DENIED, new Metadata());
return listener;
}
}
@Test
public void changeMetadataKeyType() {
Metadata.Key<String> stringKey = Metadata.Key.of("key", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<Long> longKey = Metadata.Key.of("key", MoreMetadata.LONG_MARSHALLER);
Metadata metadata = new Metadata();
metadata.put(stringKey, "12345");
Long bool = metadata.get(longKey);
assertThat(bool).isEqualTo(12345);
}
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
TagContext parentCtx = headers.get(statsHeader);
if (parentCtx == null) {
parentCtx = tagger.empty();
}
TagValue methodTag = TagValue.create(fullMethodName);
parentCtx =
tagger
.toBuilder(parentCtx)
.putLocal(RpcMeasureConstants.GRPC_SERVER_METHOD, methodTag)
.build();
return new ServerTracer(CensusStatsModule.this, parentCtx);
}
@SuppressWarnings("ReferenceEquality")
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
SpanContext remoteSpan = headers.get(tracingHeader);
if (remoteSpan == SpanContext.INVALID) {
remoteSpan = null;
}
return new ServerTracer(fullMethodName, remoteSpan);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
RequestMetadata meta = headers.get(TracingMetadataUtils.METADATA_KEY);
assertThat(meta.getCorrelatedInvocationsId()).isEqualTo("build-req-id");
assertThat(meta.getToolInvocationId()).isEqualTo("command-id");
assertThat(meta.getActionId()).isNotEmpty();
assertThat(meta.getToolDetails().getToolName()).isEqualTo("bazel");
assertThat(meta.getToolDetails().getToolVersion())
.isEqualTo(BlazeVersionInfo.instance().getVersion());
return next.startCall(call, headers);
}
@Test
public void responseTrailersContainAllReportedMetrics() {
applicationMetrics.put("cost1", 1231.4543);
applicationMetrics.put("cost2", 0.1367);
applicationMetrics.put("cost3", 7614.145);
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
Metadata receivedTrailers = trailersCapture.get();
OrcaLoadReport report =
receivedTrailers.get(OrcaMetricReportingServerInterceptor.ORCA_ENDPOINT_LOAD_METRICS_KEY);
assertThat(report.getRequestCostMap())
.containsExactly("cost1", 1231.4543, "cost2", 0.1367, "cost3", 7614.145);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String value = headers.get(CONTEXT_KEY);
SampleContext context;
if (value != null) {
context = new SampleContext(value);
} else {
context = CONTEXT_UNDEFINED;
}
return Contexts.interceptCall(Context.current().withValue(CALLER_ID_CONTEXT_KEY, context), call, headers, next);
}
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> serverCall,
Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
String token = metadata.get(Metadata.Key.of("token", ASCII_STRING_MARSHALLER));
System.out.println("Token: " + token);
boolean tokenIsValid = false;
try {
FirebaseToken firebaseToken = FirebaseAuth.getInstance().verifyIdToken(token);
System.err.println("Email for token: " + firebaseToken.getEmail());
// TODO: properly validate whether user has rights
//noinspection ConstantConditions
if (true) {
tokenIsValid = true;
}
} catch (Exception e) {
e.printStackTrace();
System.err.println("Was unable to parse token");
}
if (!tokenIsValid) {
serverCall.close(Status.UNAUTHENTICATED, metadata);
return new ServerCall.Listener<ReqT>() {};
} else {
return serverCallHandler.startCall(serverCall, metadata);
}
}
private static String getRequestId(Metadata responseHeaders, Metadata responseTrailers) {
if (responseHeaders != null && responseHeaders.containsKey(REQUEST_ID_HEADER_KEY)) {
return responseHeaders.get(REQUEST_ID_HEADER_KEY);
} else if (responseTrailers != null && responseTrailers.containsKey(REQUEST_ID_HEADER_KEY)) {
return responseTrailers.get(REQUEST_ID_HEADER_KEY);
} else {
return null;
}
}
@Override
public Authentication readAuthentication(final ServerCall<?, ?> call, final Metadata headers) {
final String header = headers.get(AUTHORIZATION_HEADER);
if (header == null || !header.toLowerCase().startsWith(PREFIX)) {
log.debug("No bearer auth header found");
return null;
}
// Cut away the "bearer " prefix
final String accessToken = header.substring(PREFIX_LENGTH);
// Not authenticated yet, token needs to be processed
return tokenWrapper.apply(accessToken);
}
/**
* Decides in current situation whether or not the RPC should retry and if it should retry how
* long the backoff should be. The decision does not take the commitment status into account, so
* caller should check it separately. It also updates the throttle. It does not change state.
*/
private RetryPlan makeRetryDecision(Status status, Metadata trailer) {
boolean shouldRetry = false;
long backoffNanos = 0L;
boolean isRetryableStatusCode = retryPolicy.retryableStatusCodes.contains(status.getCode());
boolean isNonFatalStatusCode = hedgingPolicy.nonFatalStatusCodes.contains(status.getCode());
if (isHedging && !isNonFatalStatusCode) {
// isFatal is true, no pushback
return new RetryPlan(/* shouldRetry = */ false, /* isFatal = */ true, 0, null);
}
String pushbackStr = trailer.get(GRPC_RETRY_PUSHBACK_MS);
Integer pushbackMillis = null;
if (pushbackStr != null) {
try {
pushbackMillis = Integer.valueOf(pushbackStr);
} catch (NumberFormatException e) {
pushbackMillis = -1;
}
}
boolean isThrottled = false;
if (throttle != null) {
if (isRetryableStatusCode || isNonFatalStatusCode
|| (pushbackMillis != null && pushbackMillis < 0)) {
isThrottled = !throttle.onQualifiedFailureThenCheckIsAboveThreshold();
}
}
if (retryPolicy.maxAttempts > substream.previousAttemptCount + 1 && !isThrottled) {
if (pushbackMillis == null) {
if (isRetryableStatusCode) {
shouldRetry = true;
backoffNanos = (long) (nextBackoffIntervalNanos * random.nextDouble());
nextBackoffIntervalNanos = Math.min(
(long) (nextBackoffIntervalNanos * retryPolicy.backoffMultiplier),
retryPolicy.maxBackoffNanos);
} // else no retry
} else if (pushbackMillis >= 0) {
shouldRetry = true;
backoffNanos = TimeUnit.MILLISECONDS.toNanos(pushbackMillis);
nextBackoffIntervalNanos = retryPolicy.initialBackoffNanos;
} // else no retry
} // else no retry
return new RetryPlan(
shouldRetry, /* isFatal = */ false, backoffNanos, isHedging ? pushbackMillis : null);
}
/**
* Extracts a {@link RequestMetadata} from a {@link Metadata} and returns it if it exists. If it
* does not exist, returns {@code null}.
*/
public static @Nullable RequestMetadata requestMetadataFromHeaders(Metadata headers) {
return headers.get(METADATA_KEY);
}
/**
* Extracts a {@link RequestMetadata} from a {@link Metadata} and returns it if it exists. If it
* does not exist, returns {@code null}.
*/
public static @Nullable RequestMetadata requestMetadataFromHeaders(Metadata headers) {
return headers.get(METADATA_KEY);
}