下面列出了 io.grpc.Context API 类的示例代码及用法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Reads the propagated header from the request metadata and continues the call with that header
 * stored in the gRPC {@link Context} under {@code contextKey}.
 *
 * <p>If reading the header throws, the call is closed with {@code INVALID_ARGUMENT} (using the
 * exception message as description) and an empty listener is returned; {@code next} is not
 * invoked in that case.
 */
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
  final Header parsedHeader;
  try {
    parsedHeader = headerReader.extract(headers);
  } catch (Exception e) {
    if (logger.isInfoEnabled()) {
      logger.info("Header extract fail cause={}, method={} headers={}, attr={}",
          e.getMessage(), call.getMethodDescriptor().getFullMethodName(), headers, call.getAttributes(), e);
    }
    // Reject the call; the listener below ignores every callback.
    final Status rejection = Status.INVALID_ARGUMENT.withDescription(e.getMessage());
    call.close(rejection, new Metadata());
    return new ServerCall.Listener<ReqT>() {
    };
  }

  final Context headerContext = Context.current().withValue(contextKey, parsedHeader);
  if (logger.isDebugEnabled()) {
    logger.debug("headerPropagation method={}, headers={}, attr={}", call.getMethodDescriptor().getFullMethodName(), headers, call.getAttributes());
  }
  return Contexts.interceptCall(headerContext, call, headers, next);
}
/**
 * A combined B3 header whose trace-id segment is 34 characters long must yield the invalid span
 * context.
 */
@Test
public void extract_InvalidTraceId_Size_SingleHeader() {
  String combinedHeaderValue =
      "abcdefghijklmnopabcdefghijklmnop"
          + "00"
          + "-"
          + SPAN_ID_BASE16
          + "-"
          + B3Propagator.TRUE_INT;
  Map<String, String> headersWithBadTraceId = new LinkedHashMap<>();
  headersWithBadTraceId.put(B3Propagator.COMBINED_HEADER, combinedHeaderValue);

  Context extracted =
      b3PropagatorSingleHeader.extract(Context.current(), headersWithBadTraceId, getter);

  assertThat(getSpanContext(extracted)).isSameInstanceAs(SpanContext.getInvalid());
}
/**
 * Writes the span carried by {@code context} into {@code carrier} as one combined header:
 * trace id, delimiter, span id, delimiter, sampled flag.
 *
 * <p>Nothing is written when the context holds no span or the span's context is not valid.
 *
 * @throws NullPointerException if {@code context} or {@code setter} is {@code null}
 */
@Override
public <C> void inject(Context context, C carrier, HttpTextFormat.Setter<C> setter) {
  Objects.requireNonNull(context, "context");
  Objects.requireNonNull(setter, "setter");

  Span span = TracingContextUtils.getSpanWithoutDefault(context);
  if (span == null) {
    return;
  }
  SpanContext spanContext = span.getContext();
  if (!spanContext.isValid()) {
    return;
  }

  // Fill a fixed-size buffer in place: ids are hex-encoded directly into their slots.
  char[] header = new char[COMBINED_HEADER_SIZE];
  spanContext.getTraceId().copyLowerBase16To(header, 0);
  header[SPAN_ID_OFFSET - 1] = B3Propagator.COMBINED_HEADER_DELIMITER_CHAR;
  spanContext.getSpanId().copyLowerBase16To(header, SPAN_ID_OFFSET);
  header[SAMPLED_FLAG_OFFSET - 1] = B3Propagator.COMBINED_HEADER_DELIMITER_CHAR;
  if (spanContext.getTraceFlags().isSampled()) {
    header[SAMPLED_FLAG_OFFSET] = B3Propagator.IS_SAMPLED;
  } else {
    header[SAMPLED_FLAG_OFFSET] = B3Propagator.NOT_SAMPLED;
  }

  setter.set(carrier, B3Propagator.COMBINED_HEADER, String.valueOf(header));
}
/**
* Construct a server.
*
* @param builder builder with configuration for server
* @param transportServer transport server that will create new incoming transports
* @param rootContext context that callbacks for new RPCs should be derived from
*/
ServerImpl(
AbstractServerImplBuilder<?> builder,
InternalServer transportServer,
Context rootContext) {
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
this.fallbackRegistry =
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
// Fork from the passed in context so that it does not propagate cancellation, it only
// inherits values.
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
this.decompressorRegistry = builder.decompressorRegistry;
this.compressorRegistry = builder.compressorRegistry;
this.transportFilters = Collections.unmodifiableList(
new ArrayList<>(builder.transportFilters));
this.interceptors =
builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
this.binlog = builder.binlog;
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
channelz.addServer(this);
}
/**
 * Reports the call start to the stats/trace context, wraps the method's handler with every
 * configured interceptor (and with binary logging when {@code binlog} is set), then starts the
 * wrapped call.
 *
 * <p>Never returns {@code null}.
 */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
    ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
    Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
  // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
  ServerCallInfoImpl<ReqT, RespT> callInfo =
      new ServerCallInfoImpl<ReqT, RespT>(
          methodDef.getMethodDescriptor(), // notify with original method descriptor
          stream.getAttributes(),
          stream.getAuthority());
  statsTraceCtx.serverCallStarted(callInfo);

  // Each interceptor wraps the handler produced by the previous iteration.
  ServerCallHandler<ReqT, RespT> wrappedHandler = methodDef.getServerCallHandler();
  for (ServerInterceptor serverInterceptor : interceptors) {
    wrappedHandler =
        InternalServerInterceptors.interceptCallHandler(serverInterceptor, wrappedHandler);
  }
  ServerMethodDefinition<ReqT, RespT> interceptedDef =
      methodDef.withServerCallHandler(wrappedHandler);

  ServerMethodDefinition<?, ?> definitionToStart;
  if (binlog == null) {
    definitionToStart = interceptedDef;
  } else {
    definitionToStart = binlog.wrapMethodDefinition(interceptedDef);
  }
  return startWrappedCall(fullMethodName, definitionToStart, stream, headers, context);
}
/**
 * A Jaeger header built from the short trace-id constants must be extracted as a sampled remote
 * parent span context with {@code SHORT_TRACE_ID}.
 */
@Test
public void extract_SampledContext_Short_TraceId() {
  JaegerSpanContext jaegerContext =
      new JaegerSpanContext(
          SHORT_TRACE_ID_HI,
          SHORT_TRACE_ID_LOW,
          SPAN_ID_LONG,
          DEPRECATED_PARENT_SPAN_LONG,
          (byte) 1);
  Map<String, String> headers = new LinkedHashMap<>();
  headers.put(PROPAGATION_HEADER, TextMapCodec.contextAsString(jaegerContext));

  Context extracted = jaegerPropagator.extract(Context.current(), headers, getter);

  assertThat(getSpanContext(extracted))
      .isEqualTo(
          SpanContext.createFromRemoteParent(
              SHORT_TRACE_ID, SPAN_ID, SAMPLED_TRACE_OPTIONS, TRACE_STATE_DEFAULT));
}
/**
 * Injecting with a {@code null} carrier must still call the setter; the setter here ignores the
 * carrier argument and records the header in a local map.
 */
@Test
public void inject_SampledContext_nullCarrierUsage() {
  final Map<String, String> recorded = new LinkedHashMap<>();
  Setter<Map<String, String>> recordingSetter =
      (ignored, key, value) -> recorded.put(key, value);
  Context sampledContext =
      withSpanContext(
          SpanContext.create(TRACE_ID, SPAN_ID, SAMPLED_TRACE_OPTIONS, TRACE_STATE_DEFAULT),
          Context.current());

  jaegerPropagator.inject(sampledContext, null, recordingSetter);

  String expectedHeaderValue =
      generateTraceIdHeaderValue(TRACE_ID_BASE16, SPAN_ID_BASE16, DEPRECATED_PARENT_SPAN, "1");
  assertThat(recorded).containsEntry(PROPAGATION_HEADER, expectedHeaderValue);
}
/**
 * When the delegate listener throws from {@code halfClosed()}, running the queued tasks must
 * rethrow that same exception, after which the server-state leak check is run.
 */
@Test
public void halfClosed_runtimeExceptionCancelsCall() {
  RuntimeException thrownByDelegate = new RuntimeException();
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  doThrow(thrownByDelegate).when(delegate).halfClosed();

  JumpToApplicationThreadServerStreamListener jumpListener
      = new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation());
  jumpListener.setListener(delegate);

  jumpListener.halfClosed();
  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (RuntimeException caught) {
    assertSame(thrownByDelegate, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * When the delegate listener throws from {@code onReady()}, running the queued tasks must
 * rethrow that same exception, after which the server-state leak check is run.
 */
@Test
public void onReady_runtimeExceptionCancelsCall() {
  RuntimeException thrownByDelegate = new RuntimeException();
  ServerStreamListener delegate = mock(ServerStreamListener.class);
  doThrow(thrownByDelegate).when(delegate).onReady();

  JumpToApplicationThreadServerStreamListener jumpListener
      = new JumpToApplicationThreadServerStreamListener(
          executor.getScheduledExecutorService(),
          executor.getScheduledExecutorService(),
          stream,
          Context.ROOT.withCancellation());
  jumpListener.setListener(delegate);

  jumpListener.onReady();
  try {
    executor.runDueTasks();
    fail("Expected exception");
  } catch (RuntimeException caught) {
    assertSame(thrownByDelegate, caught);
    ensureServerStateNotLeaked();
  }
}
/**
 * Round-trips the tracing header: a client stream tracer is created with fresh metadata, and a
 * server stream tracer is then created from that same metadata. The test verifies the
 * interactions with the propagation handler and tracer on both sides and that the server span
 * ends up in the filtered context.
 */
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
CensusTracingModule.ClientCallTracer callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
Metadata headers = new Metadata();
callTracer.newClientStreamTracer(STREAM_INFO, headers);
// Client side: the span context was serialized once and the handler saw no other calls.
verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
// The client span was built as a child of the explicit parent span, with events recorded.
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
// The tracing header must now be present in the outgoing metadata.
assertTrue(headers.containsKey(censusTracing.tracingHeader));
// Server side: build a server tracer from the metadata the client side populated.
ServerStreamTracer serverTracer =
censusTracing.getServerTracerFactory().newServerStreamTracer(
method.getFullMethodName(), headers);
verify(mockTracingPropagationHandler).fromByteArray(same(binarySpanContext));
// The server span must use the client span's context as its remote parent.
verify(tracer).spanBuilderWithRemoteParent(
eq("Recv.package1.service2.method3"), same(spyClientSpan.getContext()));
verify(spyServerSpanBuilder).setRecordEvents(eq(true));
// filterContext must place the server span into the returned context.
Context filteredContext = serverTracer.filterContext(Context.ROOT);
assertSame(spyServerSpan, ContextUtils.getValue(filteredContext));
}
/**
 * Copies the carrier's entries into a map, runs the configured HTTP text format extraction on
 * it, and wraps the resulting span context in a shim.
 *
 * @return the shim for the extracted span context, or {@code null} when that context is not
 *     valid
 */
@Nullable
public SpanContextShim extractTextFormat(TextMapExtract carrier) {
  Map<String, String> headers = new HashMap<>();
  for (Map.Entry<String, String> header : carrier) {
    headers.put(header.getKey(), header.getValue());
  }

  Context extracted =
      propagators()
          .getHttpTextFormat()
          .extract(Context.current(), headers, TextMapGetter.INSTANCE);

  io.opentelemetry.trace.Span extractedSpan = TracingContextUtils.getSpan(extracted);
  if (extractedSpan.getContext().isValid()) {
    return new SpanContextShim(
        telemetryInfo,
        extractedSpan.getContext(),
        CorrelationsContextUtils.getCorrelationContext(extracted));
  }
  return null;
}
/**
 * Issues a media read for the current object starting at {@code position} and stores the
 * response iterator in {@code resIterator}.
 *
 * <p>The call is made under a fresh cancellable context, kept in {@code requestContext}; the
 * previously attached context is restored before this method returns or throws.
 *
 * @param bytesToRead optional read limit; no limit is set on the request when empty
 * @throws IOException whatever {@code convertError} produces for a {@code StatusRuntimeException}
 */
private void requestObjectMedia(OptionalInt bytesToRead) throws IOException {
  GetObjectMediaRequest.Builder mediaRequestBuilder =
      GetObjectMediaRequest.newBuilder()
          .setBucket(bucketName)
          .setObject(objectName)
          .setGeneration(objectGeneration)
          .setReadOffset(position);
  if (bytesToRead.isPresent()) {
    mediaRequestBuilder.setReadLimit(bytesToRead.getAsInt());
  }
  final GetObjectMediaRequest mediaRequest = mediaRequestBuilder.build();

  try {
    requestContext = Context.current().withCancellation();
    final Context previouslyAttached = requestContext.attach();
    try {
      resIterator = stub.getObjectMedia(mediaRequest);
    } finally {
      // Restore the caller's context before any error conversion happens.
      requestContext.detach(previouslyAttached);
    }
  } catch (StatusRuntimeException statusError) {
    throw convertError(statusError, bucketName, objectName);
  }
}
/**
 * Handles a union request: computes the union over the requested sets and sends the entries
 * back, or reports a cancellation/IO error to the observer.
 */
@Override
public void union(final UnionRequest request, final StreamObserver<UnionResponse> responseObserver) {
  final Context callContext = Context.current();
  if (callContext.isCancelled()) {
    sendCancelledError(responseObserver, callContext.cancellationCause());
    return;
  }

  try {
    final Map<String, Long> unionEntries = getSets().union(
        request.getNamespace(),
        request.getSetsList(),
        request.getMin(),
        request.getMax(),
        request.getStart(),
        request.getCount(),
        request.getAscending()
    );
    final UnionResponse.Builder response = UnionResponse.newBuilder();
    if (!unionEntries.isEmpty()) {
      response.putAllEntries(unionEntries);
    }
    sendResponse(responseObserver, response.build());
  } catch (IOException e) {
    sendError(responseObserver, e);
  }
}
/**
 * Benchmark for measuring HttpTraceContext extract.
 *
 * <p>Runs one extraction per carrier index below {@code COUNT} and returns the last extracted
 * context ({@code null} if the loop body never runs).
 */
@Benchmark
@BenchmarkMode({Mode.AverageTime})
@Fork(1)
@Measurement(iterations = 15, time = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@OperationsPerInvocation(COUNT)
@Nullable
public Context measureExtract() {
  Context lastExtracted = null;
  int carrierIndex = 0;
  while (carrierIndex < COUNT) {
    lastExtracted = httpTraceContext.extract(Context.ROOT, carriers.get(carrierIndex), getter);
    carrierIndex++;
  }
  return lastExtracted;
}
/**
 * Extracts the propagated span context from {@code message}, starts a server-kind span named
 * "receive" with it as parent, records a "DoWork" event while the span is current, and always
 * ends the span.
 */
private void process(Message message) {
  // Adapter that lets the propagator read header values out of a Message.
  Getter<Message> messageGetter =
      new Getter<Message>() {
        @Nullable
        @Override
        public String get(Message carrier, String key) {
          return carrier.get(key);
        }
      };
  Context extracted =
      OpenTelemetry.getPropagators()
          .getHttpTextFormat()
          .extract(Context.current(), message, messageGetter);
  SpanContext parentContext = TracingContextUtils.getSpan(extracted).getContext();

  Span receiveSpan =
      tracer.spanBuilder("receive").setSpanKind(Kind.SERVER).setParent(parentContext).startSpan();
  receiveSpan.setAttribute("component", "example-server");
  try (Scope ignored = tracer.withSpan(receiveSpan)) {
    // Simulate work.
    tracer.getCurrentSpan().addEvent("DoWork");
  } finally {
    receiveSpan.end();
  }
}
/**
 * Injecting a span context with default trace flags must write the trace id, span id and a
 * sampled value of "0" as separate B3 headers.
 */
@Test
public void inject_NotSampledContext() {
  Map<String, String> headers = new LinkedHashMap<>();
  Context notSampledContext =
      withSpanContext(
          SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TRACE_STATE_DEFAULT),
          Context.current());

  b3Propagator.inject(notSampledContext, headers, setter);

  assertThat(headers).containsEntry(B3Propagator.TRACE_ID_HEADER, TRACE_ID_BASE16);
  assertThat(headers).containsEntry(B3Propagator.SPAN_ID_HEADER, SPAN_ID_BASE16);
  assertThat(headers).containsEntry(B3Propagator.SAMPLED_HEADER, "0");
}
/**
 * A trace header whose {@code Parent} value is "abcdefghijklmnop" must yield the invalid span
 * context.
 */
@Test
public void extract_InvalidSpanId() {
  String headerWithBadParent =
      "Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=abcdefghijklmnop;Sampled=0";
  Map<String, String> headers = new LinkedHashMap<>();
  headers.put(TRACE_HEADER_KEY, headerWithBadParent);

  Context extracted = xrayPropagator.extract(Context.current(), headers, getter);

  assertThat(getSpanContext(extracted)).isSameInstanceAs(SpanContext.getInvalid());
}
/**
 * A propagation header generated with the span id "abcdefghijklmnop" must yield the invalid
 * span context.
 */
@Test
public void extract_InvalidSpanId() {
  String headerWithBadSpanId =
      generateTraceIdHeaderValue(
          TRACE_ID_BASE16, "abcdefghijklmnop", DEPRECATED_PARENT_SPAN, "0");
  Map<String, String> headers = new LinkedHashMap<>();
  headers.put(PROPAGATION_HEADER, headerWithBadSpanId);

  Context extracted = jaegerPropagator.extract(Context.current(), headers, getter);

  assertThat(getSpanContext(extracted)).isSameInstanceAs(SpanContext.getInvalid());
}
/**
 * Handles a drop request for the request's namespace and replies with the default
 * {@code VoidResponse}, or reports a cancellation/IO error to the observer.
 */
@Override
public void drop(final DropRequest request, final StreamObserver<VoidResponse> responseObserver) {
  final Context callContext = Context.current();
  if (callContext.isCancelled()) {
    sendCancelledError(responseObserver, callContext.cancellationCause());
    return;
  }

  try {
    final String namespace = request.getNamespace();
    getObjects().drop(namespace);
    sendResponse(responseObserver, VoidResponse.getDefaultInstance());
  } catch (IOException e) {
    sendError(responseObserver, e);
  }
}
/**
 * Registers a watch call: the call is flagged as cancelled when the current context is
 * cancelled, and it is added to {@code calls}.
 */
@Override
public void watch(HealthCheckRequest request,
    StreamObserver<HealthCheckResponse> responseObserver) {
  final ServerSideCall call = new ServerSideCall(request, responseObserver);

  // Runs on the notifying thread because a direct executor is used.
  CancellationListener markCallCancelled =
      new CancellationListener() {
        @Override
        public void cancelled(Context ctx) {
          call.cancelled = true;
        }
      };
  Context.current().addListener(markCallCancelled, MoreExecutors.directExecutor());

  calls.add(call);
}
/**
 * Handles an increment request for one set entry and replies with the value returned by the
 * set store, or reports a cancellation/IO error to the observer.
 */
@Override
public void inc(final IncRequest request, final StreamObserver<IncResponse> responseObserver) {
  final Context callContext = Context.current();
  if (callContext.isCancelled()) {
    sendCancelledError(responseObserver, callContext.cancellationCause());
    return;
  }

  try {
    final long updatedValue =
        getSets().inc(
            request.getNamespace(),
            request.getSet(),
            request.getEntry(),
            request.getCount());
    final IncResponse response = IncResponse.newBuilder().setResult(updatedValue).build();
    sendResponse(responseObserver, response);
  } catch (IOException e) {
    sendError(responseObserver, e);
  }
}
/**
 * Runs every configured text propagator in array order, feeding each one the context returned
 * by the previous one, and returns the final context.
 *
 * <p>With no propagators configured, the given {@code context} is returned unchanged.
 */
@Override
public <C> Context extract(Context context, C carrier, Getter<C> getter) {
  Context accumulated = context;
  int index = 0;
  while (index < textPropagators.length) {
    accumulated = textPropagators[index].extract(accumulated, carrier, getter);
    index++;
  }
  return accumulated;
}
/**
 * A tracestate value using ';' between entries ("foo=bar;test=test") must still produce a
 * sampled remote parent span context, but with the default trace state.
 */
@Test
public void extract_InvalidTracestate_EntriesDelimiter() {
  String traceParent = "00-" + TRACE_ID_BASE16 + "-" + SPAN_ID_BASE16 + "-01";
  Map<String, String> headers = new HashMap<>();
  headers.put(TRACE_PARENT, traceParent);
  headers.put(TRACE_STATE, "foo=bar;test=test");

  Context extracted = httpTraceContext.extract(Context.current(), headers, getter);

  assertThat(getSpanContext(extracted))
      .isEqualTo(
          SpanContext.createFromRemoteParent(
              TRACE_ID, SPAN_ID, SAMPLED_TRACE_OPTIONS, TRACE_STATE_DEFAULT));
}
/**
 * Resolves the parent span for the given parent type.
 *
 * @return the span from the current context for {@code CURRENT_CONTEXT} (may be {@code null}),
 *     {@code explicitParent} for {@code EXPLICIT_PARENT}, and {@code null} for any other type
 */
@Nullable
private static Span parentSpan(ParentType parentType, Span explicitParent) {
  final Span resolvedParent;
  switch (parentType) {
    case CURRENT_CONTEXT:
      resolvedParent = TracingContextUtils.getSpanWithoutDefault(Context.current());
      break;
    case EXPLICIT_PARENT:
      resolvedParent = explicitParent;
      break;
    default:
      resolvedParent = null;
      break;
  }
  return resolvedParent;
}
/**
 * Returns what {@code tagger.getCurrentTagContext()} reports while a context carrying
 * {@code tagsToSet} is attached.
 *
 * <p>The previously attached context is restored afterwards, even if the tagger throws.
 *
 * @param tagsToSet tag context to place into the attached context
 * @return the tagger's view of the current tag context during the attachment
 */
private TagContext getResultOfGetCurrentTagContext(TagContext tagsToSet) {
  Context withTags = ContextUtils.withValue(Context.current(), tagsToSet);
  Context previous = withTags.attach();
  try {
    return tagger.getCurrentTagContext();
  } finally {
    // Detach the same instance that was attached. Calling detach on Context.current() would
    // hide a context leaked inside the try block, because the mismatch check could never fire.
    withTags.detach(previous);
  }
}
/**
 * Ties the lifetime of {@code future} to the server call: completes the observer when the
 * future succeeds, reports an error when it fails, and cancels the future (without
 * interruption) when the call is cancelled.
 *
 * <p>Nothing is sent to the observer once the call or the context is cancelled, and a
 * {@code CancellationException} failure is ignored. The callback runs in the context that is
 * current when this method is called.
 */
private void withCancellation(
    ServerCallStreamObserver<Operation> serverCallStreamObserver, ListenableFuture<Void> future) {
  FutureCallback<Void> completionCallback =
      new FutureCallback<Void>() {
        boolean isCancelled() {
          return serverCallStreamObserver.isCancelled() || Context.current().isCancelled();
        }

        @Override
        public void onSuccess(Void result) {
          if (isCancelled()) {
            return;
          }
          try {
            serverCallStreamObserver.onCompleted();
          } catch (Exception e) {
            // Completing the call failed; route it through the failure path.
            onFailure(e);
          }
        }

        @Override
        public void onFailure(Throwable t) {
          if (isCancelled() || t instanceof CancellationException) {
            return;
          }
          logger.log(Level.WARNING, "error occurred during execution", t);
          serverCallStreamObserver.onError(Status.fromThrowable(t).asException());
        }
      };

  addCallback(
      future,
      completionCallback,
      Context.current().fixedContextExecutor(directExecutor()));
  serverCallStreamObserver.setOnCancelHandler(() -> future.cancel(false));
}
/**
 * Injecting with a {@code null} carrier must still call the setter; the setter here ignores the
 * carrier argument and records the three B3 headers in a local map.
 */
@Test
public void inject_SampledContext_nullCarrierUsage() {
  final Map<String, String> recorded = new LinkedHashMap<>();
  Setter<Map<String, String>> recordingSetter =
      (ignored, key, value) -> recorded.put(key, value);
  Context sampledContext =
      withSpanContext(
          SpanContext.create(TRACE_ID, SPAN_ID, SAMPLED_TRACE_OPTIONS, TRACE_STATE_DEFAULT),
          Context.current());

  b3Propagator.inject(sampledContext, null, recordingSetter);

  assertThat(recorded).containsEntry(B3Propagator.TRACE_ID_HEADER, TRACE_ID_BASE16);
  assertThat(recorded).containsEntry(B3Propagator.SPAN_ID_HEADER, SPAN_ID_BASE16);
  assertThat(recorded).containsEntry(B3Propagator.SAMPLED_HEADER, "1");
}
/**
 * Submits a Runnable through {@code Context.currentContextExecutor(executor)} and checks, from
 * inside the Runnable, the shape of the call stack, that it runs on a different thread than the
 * caller, and that the caller's attached context (with its {@code KEY} value) is current there.
 *
 * <p>The stack-frame assertions depend on the exact nesting of this anonymous Runnable; do not
 * wrap or extract its body without updating the indices.
 */
@Test(timeout = 60000)
public void currentContextExecutor() throws Exception {
final Thread callerThread = Thread.currentThread();
final Context context = Context.current().withValue(KEY, "myvalue");
// The context returned by attach() is stored in a field; presumably restored in a teardown
// method that is not visible here - TODO confirm.
previousContext = context.attach();
// Released by the Runnable once all of its assertions have passed.
final Semaphore tested = new Semaphore(0);
Context.currentContextExecutor(executor)
.execute(
new Runnable() {
@Override
public void run() {
StackTraceElement[] ste = new Exception().fillInStackTrace().getStackTrace();
// Frame 0 is this run() method itself.
assertThat(ste[0].getClassName()).doesNotContain("Context");
assertThat(ste[1].getClassName()).startsWith("io.grpc.Context$");
// NB: Actually, we want the Runnable to be wrapped only once, but currently it is
// still wrapped twice. The two places where the Runnable is wrapped are: (1) the
// executor implementation itself, e.g. ThreadPoolExecutor, to which the Agent added
// automatic context propagation, (2) CurrentContextExecutor.
// ExecutorInstrumentation already avoids adding the automatic context propagation
// to CurrentContextExecutor, but does not make it a no-op yet. Also see
// ExecutorInstrumentation#createMatcher.
assertThat(ste[2].getClassName()).startsWith("io.grpc.Context$");
assertThat(ste[3].getClassName()).doesNotContain("Context");
assertThat(Thread.currentThread()).isNotSameInstanceAs(callerThread);
assertThat(Context.current()).isSameInstanceAs(context);
assertThat(KEY.get()).isEqualTo("myvalue");
tested.release();
}
});
// Blocks until the Runnable finishes; the @Test timeout bounds the wait if it never does.
tested.acquire();
}
/**
 * The sampled traceparent header constant must be extracted as a sampled remote parent span
 * context with the default trace state.
 */
@Test
public void extract_SampledContext() {
  Map<String, String> headers = new LinkedHashMap<>();
  headers.put(TRACE_PARENT, TRACEPARENT_HEADER_SAMPLED);

  Context extracted = httpTraceContext.extract(Context.current(), headers, getter);

  assertThat(getSpanContext(extracted))
      .isEqualTo(
          SpanContext.createFromRemoteParent(
              TRACE_ID, SPAN_ID, SAMPLED_TRACE_OPTIONS, TRACE_STATE_DEFAULT));
}
/**
 * A traceparent whose flags segment is four characters long ("0100") must yield the invalid
 * span context.
 */
@Test
public void extract_InvalidTraceFlags_Size() {
  String traceParentWithLongFlags = "00-" + TRACE_ID_BASE16 + "-" + SPAN_ID_BASE16 + "-0100";
  Map<String, String> headers = new HashMap<>();
  headers.put(TRACE_PARENT, traceParentWithLongFlags);

  Context extracted = httpTraceContext.extract(Context.current(), headers, getter);

  assertThat(getSpanContext(extracted)).isSameInstanceAs(SpanContext.getInvalid());
}