下面列出了 io.netty.handler.codec.http2.Http2FrameReader #io.grpc.ServerStreamTracer 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates the {@link NettyServer} from this builder's current configuration.
 *
 * <p>An explicitly configured protocol negotiator always wins. When none is set, TLS negotiation
 * is used if an SSL context is present and plaintext negotiation otherwise.
 *
 * @param streamTracerFactories stream tracer factories handed to the new server
 * @return a new, not yet started server
 */
@Override
@CheckReturnValue
protected NettyServer buildTransportServer(
    List<ServerStreamTracer.Factory> streamTracerFactories) {
  ProtocolNegotiator chosenNegotiator = protocolNegotiator;
  if (chosenNegotiator == null) {
    if (sslContext != null) {
      chosenNegotiator = ProtocolNegotiators.serverTls(sslContext);
    } else {
      chosenNegotiator = ProtocolNegotiators.serverPlaintext();
    }
  }
  return new NettyServer(
      address,
      channelType,
      channelOptions,
      bossEventLoopGroup,
      workerEventLoopGroup,
      chosenNegotiator,
      streamTracerFactories,
      transportTracerFactory,
      maxConcurrentCallsPerConnection,
      flowControlWindow,
      maxMessageSize,
      maxHeaderListSize,
      keepAliveTimeInNanos,
      keepAliveTimeoutInNanos,
      maxConnectionIdleInNanos,
      maxConnectionAgeInNanos,
      maxConnectionAgeGraceInNanos,
      permitKeepAliveWithoutCalls,
      permitKeepAliveTimeInNanos,
      channelz);
}
/**
 * A {@link NettyServer} that has been constructed but never started must report {@code -1} as
 * its port.
 */
@Test
public void getPort_notStarted() throws Exception {
  // The numeric tuning arguments do not matter for this test; the server is never started.
  final int ignored = 1;
  InetSocketAddress requestedAddress = new InetSocketAddress(0);
  HashMap<ChannelOption<?>, Object> noChannelOptions = new HashMap<ChannelOption<?>, Object>();
  ProtocolNegotiators.PlaintextNegotiator plaintextNegotiator =
      new ProtocolNegotiators.PlaintextNegotiator();

  NettyServer server = new NettyServer(
      requestedAddress,
      NioServerSocketChannel.class,
      noChannelOptions,
      null, // no boss group
      null, // no event group
      plaintextNegotiator,
      Collections.<ServerStreamTracer.Factory>emptyList(),
      TransportTracer.getDefaultFactory(),
      ignored,
      ignored,
      ignored,
      ignored,
      ignored,
      ignored,
      ignored,
      ignored,
      ignored,
      true, // ignore
      0, // ignore
      channelz);

  assertThat(server.getPort()).isEqualTo(-1);
}
/**
 * Assembles the stream tracer factories in a fixed order: the Census stats factory (only when
 * stats are enabled), then the Census tracing factory (only when tracing is enabled), then every
 * factory already held in {@code streamTracerFactories}.
 *
 * @return a freshly allocated list; the builder's own list is not returned
 */
@VisibleForTesting
final List<ServerStreamTracer.Factory> getTracerFactories() {
  List<ServerStreamTracer.Factory> assembled = new ArrayList<ServerStreamTracer.Factory>();

  if (statsEnabled) {
    // Use the override when one is set; otherwise build a stats module with the default
    // stopwatch supplier.
    CensusStatsModule override = this.censusStatsOverride;
    CensusStatsModule statsModule =
        override != null ? override : new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true);
    assembled.add(statsModule.getServerTracerFactory(recordStartedRpcs, recordFinishedRpcs));
  }

  if (tracingEnabled) {
    assembled.add(
        new CensusTracingModule(
                Tracing.getTracer(), Tracing.getPropagationComponent().getBinaryFormat())
            .getServerTracerFactory());
  }

  assembled.addAll(streamTracerFactories);
  return assembled;
}
/**
 * Exercises span-context propagation through the tracing header in two phases: first a client
 * stream tracer is created with fresh {@link Metadata}, then that same metadata is handed to the
 * server tracer factory.
 *
 * <p>The Mockito verifications below depend on the order of the surrounding calls; do not reorder
 * statements in this test.
 */
@Test
public void traceHeadersPropagateSpanContext() throws Exception {
// Client phase: create the call tracer and let it create a stream tracer for empty headers.
CensusTracingModule.ClientCallTracer callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method);
Metadata headers = new Metadata();
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
// At this point the propagation handler must have been used exactly once, to serialize the
// client span context.
verify(mockTracingPropagationHandler).toByteArray(same(fakeClientSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);
// The client span must have been built under the explicit parent with the "Sent." name, and
// nothing else may have happened on the tracer so far.
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), same(fakeClientParentSpan));
verify(spyClientSpanBuilder).setRecordEvents(eq(true));
verifyNoMoreInteractions(tracer);
// The tracing header must now be present in the metadata.
assertTrue(headers.containsKey(censusTracing.tracingHeader));
// Server phase: feed the same headers to the server tracer factory.
ServerStreamTracer serverTracer =
censusTracing.getServerTracerFactory().newServerStreamTracer(
method.getFullMethodName(), headers);
// The header must have been deserialized and the server span built with the client span's
// context as its remote parent, using the "Recv." name.
verify(mockTracingPropagationHandler).fromByteArray(same(binarySpanContext));
verify(tracer).spanBuilderWithRemoteParent(
eq("Recv.package1.service2.method3"), same(spyClientSpan.getContext()));
verify(spyServerSpanBuilder).setRecordEvents(eq(true));
// Filtering the root context must yield a context that carries the server span.
Context filteredContext = serverTracer.filterContext(Context.ROOT);
assertSame(spyServerSpan, ContextUtils.CONTEXT_SPAN_KEY.get(filteredContext));
}
/**
 * Closing a server stream for {@code sampledMethod} with {@code CANCELLED} must end the server
 * span with the CANCELLED status and with sampling to the local span store switched on.
 */
@Test
public void serverTracingSampledToLocalSpanStore() {
  ServerStreamTracer tracerUnderTest =
      censusTracing
          .getServerTracerFactory()
          .newServerStreamTracer(sampledMethod.getFullMethodName(), new Metadata());

  tracerUnderTest.filterContext(Context.ROOT);
  ServerCallInfoImpl<String, String> callInfo =
      new ServerCallInfoImpl<String, String>(sampledMethod, Attributes.EMPTY, null);
  tracerUnderTest.serverCallStarted(callInfo);
  tracerUnderTest.streamClosed(Status.CANCELLED);

  EndSpanOptions expectedEndOptions =
      EndSpanOptions.builder()
          .setStatus(io.opencensus.trace.Status.CANCELLED)
          .setSampleToLocalSpanStore(true)
          .build();
  verify(spyServerSpan).end(expectedEndOptions);
}
/**
 * Closing a server stream for {@code sampledMethod} with {@code CANCELLED} must end the server
 * span with the CANCELLED status and with sampling to the local span store switched on.
 */
@Test
public void serverTracingSampledToLocalSpanStore() {
  ServerStreamTracer tracerUnderTest =
      censusTracing
          .getServerTracerFactory()
          .newServerStreamTracer(sampledMethod.getFullMethodName(), new Metadata());

  // Drive the tracer through the calls this test needs before the stream is closed.
  tracerUnderTest.filterContext(Context.ROOT);
  tracerUnderTest.serverCallStarted(new CallInfo<>(sampledMethod, Attributes.EMPTY, null));
  tracerUnderTest.streamClosed(Status.CANCELLED);

  EndSpanOptions expectedEndOptions =
      EndSpanOptions.builder()
          .setStatus(io.opencensus.trace.Status.CANCELLED)
          .setSampleToLocalSpanStore(true)
          .build();
  verify(spyServerSpan).end(expectedEndOptions);
}
/**
 * Builds a new {@code InProcessTransport} from the supplied arguments, which are forwarded to
 * its constructor unchanged and in the same order.
 *
 * <p>Once the transport is started it is registered with the supplied {@link ServerListener}.
 */
@Internal
public static ConnectionClientTransport createInProcessTransport(
    String name,
    int maxInboundMetadataSize,
    String authority,
    String userAgent,
    Attributes eagAttrs,
    ObjectPool<ScheduledExecutorService> serverSchedulerPool,
    List<ServerStreamTracer.Factory> serverStreamTracerFactories,
    ServerListener serverListener) {
  ConnectionClientTransport transport =
      new InProcessTransport(
          name,
          maxInboundMetadataSize,
          authority,
          userAgent,
          eagAttrs,
          serverSchedulerPool,
          serverStreamTracerFactories,
          serverListener);
  return transport;
}
/**
 * Creates a Netty transport server on port {@code 0} with a 65 KiB flow-control window, using
 * the fake-clock transport tracer.
 */
@Override
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
  final int flowControlWindowBytes = 65 * 1024;
  InternalServer created =
      AccessProtectedHack.serverBuilderBuildTransportServer(
          NettyServerBuilder.forPort(0).flowControlWindow(flowControlWindowBytes),
          streamTracerFactories,
          fakeClockTransportTracer);
  return created;
}
/**
 * Creates a Netty transport server on the port reported by {@code server}, with a 65 KiB
 * flow-control window and the fake-clock transport tracer.
 */
@Override
protected InternalServer newServer(
    InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
  final int flowControlWindowBytes = 65 * 1024;
  InternalServer created =
      AccessProtectedHack.serverBuilderBuildTransportServer(
          NettyServerBuilder.forPort(server.getPort()).flowControlWindow(flowControlWindowBytes),
          streamTracerFactories,
          fakeClockTransportTracer);
  return created;
}
/**
 * Installs {@code transportTracerFactory} on {@code builder} by writing its field directly, then
 * asks the builder for its transport server.
 *
 * <p>Note that the builder is mutated as a side effect.
 */
public static InternalServer serverBuilderBuildTransportServer(
    AbstractServerImplBuilder<?> builder,
    List<ServerStreamTracer.Factory> streamTracerFactories,
    TransportTracer.Factory transportTracerFactory) {
  builder.transportTracerFactory = transportTracerFactory;
  InternalServer transportServer = builder.buildTransportServer(streamTracerFactories);
  return transportServer;
}
/**
 * Creates a fresh interop stream tracer and records it, paired with the method name, in
 * {@code serverStreamTracers} before returning it.
 */
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
  ServerStreamTracerInfo.InteropServerStreamTracer created =
      new ServerStreamTracerInfo.InteropServerStreamTracer();
  ServerStreamTracerInfo record = new ServerStreamTracerInfo(fullMethodName, created);
  serverStreamTracers.add(record);
  return created;
}
/**
 * Stores the transport's configuration.
 *
 * <p>{@code channel}, {@code protocolNegotiator}, {@code streamTracerFactories} and
 * {@code transportTracer} must be non-null; all other arguments are stored as given.
 *
 * @throws NullPointerException if one of the required references is null
 */
NettyServerTransport(
    Channel channel,
    ChannelPromise channelUnused,
    ProtocolNegotiator protocolNegotiator,
    List<ServerStreamTracer.Factory> streamTracerFactories,
    TransportTracer transportTracer,
    int maxStreams,
    int flowControlWindow,
    int maxMessageSize,
    int maxHeaderListSize,
    long keepAliveTimeInNanos,
    long keepAliveTimeoutInNanos,
    long maxConnectionIdleInNanos,
    long maxConnectionAgeInNanos,
    long maxConnectionAgeGraceInNanos,
    boolean permitKeepAliveWithoutCalls,
    long permitKeepAliveTimeInNanos) {
  // Required collaborators, validated in declaration order.
  this.channel = Preconditions.checkNotNull(channel, "channel");
  this.protocolNegotiator =
      Preconditions.checkNotNull(protocolNegotiator, "protocolNegotiator");
  this.streamTracerFactories =
      Preconditions.checkNotNull(streamTracerFactories, "streamTracerFactories");
  this.transportTracer = Preconditions.checkNotNull(transportTracer, "transportTracer");

  // Stored without validation.
  this.channelUnused = channelUnused;

  // Per-connection limits.
  this.maxStreams = maxStreams;
  this.flowControlWindow = flowControlWindow;
  this.maxMessageSize = maxMessageSize;
  this.maxHeaderListSize = maxHeaderListSize;

  // Keep-alive settings.
  this.keepAliveTimeInNanos = keepAliveTimeInNanos;
  this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
  this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
  this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;

  // Connection idle/age settings.
  this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
  this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
  this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
}
/**
 * Stores the server's configuration; nothing is bound or started here.
 *
 * <p>{@code channelType}, {@code channelOptions}, {@code protocolNegotiator},
 * {@code streamTracerFactories} and {@code channelz} must be non-null. The channel options are
 * copied into a new map, so later changes to the caller's map are not seen by this server.
 * A null boss or worker group is recorded as "using the shared group" in the corresponding
 * {@code usingShared*} flag.
 *
 * @param bossGroup boss event loop group, or null
 * @param workerGroup worker event loop group, or null
 * @throws NullPointerException if one of the required references is null; the message names the
 *     offending parameter
 */
NettyServer(
    SocketAddress address, Class<? extends ServerChannel> channelType,
    Map<ChannelOption<?>, ?> channelOptions,
    @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
    ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
    TransportTracer.Factory transportTracerFactory,
    int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
    long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
    long maxConnectionIdleInNanos,
    long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
    boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
    InternalChannelz channelz) {
  this.address = address;
  this.channelType = checkNotNull(channelType, "channelType");
  checkNotNull(channelOptions, "channelOptions");
  // Defensive copy: the server keeps its own snapshot of the options.
  this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
  this.bossGroup = bossGroup;
  this.workerGroup = workerGroup;
  this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
  this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
  // A missing group means the shared one is used instead.
  this.usingSharedBossGroup = bossGroup == null;
  this.usingSharedWorkerGroup = workerGroup == null;
  this.transportTracerFactory = transportTracerFactory;
  this.maxStreamsPerConnection = maxStreamsPerConnection;
  this.flowControlWindow = flowControlWindow;
  this.maxMessageSize = maxMessageSize;
  this.maxHeaderListSize = maxHeaderListSize;
  this.keepAliveTimeInNanos = keepAliveTimeInNanos;
  this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
  this.maxConnectionIdleInNanos = maxConnectionIdleInNanos;
  this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
  this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
  this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
  this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
  // Same helper and message style as the other checks, so a failure names the parameter.
  this.channelz = checkNotNull(channelz, "channelz");
}
/**
 * Creates a Netty transport server on port {@code 0} with a 65 KiB flow-control window, using
 * the fake-clock transport tracer.
 */
@Override
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
  final int flowControlWindowBytes = 65 * 1024;
  InternalServer created =
      NettyServerBuilder.forPort(0)
          .flowControlWindow(flowControlWindowBytes)
          .setTransportTracerFactory(fakeClockTransportTracer)
          .buildTransportServer(streamTracerFactories);
  return created;
}
/**
 * Creates a Netty transport server on the port reported by {@code server}, with a 65 KiB
 * flow-control window and the fake-clock transport tracer.
 */
@Override
protected InternalServer newServer(
    InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
  final int flowControlWindowBytes = 65 * 1024;
  InternalServer created =
      NettyServerBuilder.forPort(server.getPort())
          .flowControlWindow(flowControlWindowBytes)
          .setTransportTracerFactory(fakeClockTransportTracer)
          .buildTransportServer(streamTracerFactories);
  return created;
}
InProcessServer(
InProcessServerBuilder builder,
List<ServerStreamTracer.Factory> streamTracerFactories) {
this.name = builder.name;
this.schedulerPool = builder.schedulerPool;
this.maxInboundMetadataSize = builder.maxInboundMetadataSize;
this.streamTracerFactories =
Collections.unmodifiableList(checkNotNull(streamTracerFactories, "streamTracerFactories"));
}
/**
 * Server-side factory method.
 *
 * <p>Returns the shared {@code NOOP} context when no factories are given; otherwise asks each
 * factory, in list order, for one tracer and wraps them all in a new context.
 */
public static StatsTraceContext newServerContext(
    List<ServerStreamTracer.Factory> factories, String fullMethodName, Metadata headers) {
  if (factories.isEmpty()) {
    return NOOP;
  }
  final int count = factories.size();
  StreamTracer[] created = new StreamTracer[count];
  int index = 0;
  while (index < count) {
    ServerStreamTracer.Factory factory = factories.get(index);
    created[index] = factory.newServerStreamTracer(fullMethodName, headers);
    index++;
  }
  return new StatsTraceContext(created);
}
/**
 * Threads {@code context} through {@link ServerStreamTracer#filterContext} of every tracer, in
 * order, feeding each tracer the context returned by the previous one. Server-side only.
 *
 * <p>Called from {@link io.grpc.internal.ServerImpl}.
 *
 * @throws NullPointerException if {@code context} is null or any tracer returns a null context
 */
public <ReqT, RespT> Context serverFilterContext(Context context) {
  Context current = checkNotNull(context, "context");
  for (StreamTracer tracer : tracers) {
    ServerStreamTracer serverTracer = (ServerStreamTracer) tracer;
    // checkNotNull hands back its argument, so validate and advance in one step.
    current = checkNotNull(serverTracer.filterContext(current), "%s returns null context", tracer);
  }
  return current;
}
/**
 * Creates a {@code ServerTracer} for the method, passing along the span context read from the
 * tracing header.
 *
 * <p>A header value that is the {@code SpanContext.INVALID} instance is replaced by
 * {@code null}. The comparison is by identity on purpose, hence the suppressed warning.
 */
@SuppressWarnings("ReferenceEquality")
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
  SpanContext fromHeader = headers.get(tracingHeader);
  SpanContext remoteParent = (fromHeader == SpanContext.INVALID) ? null : fromHeader;
  return new ServerTracer(fullMethodName, remoteParent);
}
/**
 * With tracing disabled, only the Census stats factory and the user-supplied factory remain, in
 * that order.
 */
@Test
public void getTracerFactories_disableTracing() {
  builder.addStreamTracerFactory(DUMMY_USER_TRACER);
  builder.setTracingEnabled(false);

  List<? extends ServerStreamTracer.Factory> actual = builder.getTracerFactories();

  assertEquals(2, actual.size());
  // Compared by class name rather than by class literal.
  String statsFactoryClassName = actual.get(0).getClass().getName();
  assertThat(statsFactoryClassName)
      .isEqualTo("io.grpc.census.CensusStatsModule$ServerTracerFactory");
  ServerStreamTracer.Factory userFactory = actual.get(1);
  assertThat(userFactory).isSameInstanceAs(DUMMY_USER_TRACER);
}
/** An {@code InProcessServer} that was never started must report {@code -1} as its port. */
@Test
public void getPort_notStarted() throws Exception {
  InProcessServer server =
      new InProcessServer(builder, Collections.<ServerStreamTracer.Factory>emptyList());
  int reportedPort = server.getPort();
  Truth.assertThat(reportedPort).isEqualTo(-1);
}
/**
 * Server-side factory method.
 *
 * <p>Returns the shared {@code NOOP} context when no factories are given; otherwise asks each
 * factory, in list order, for one tracer and wraps them all in a new context.
 */
public static StatsTraceContext newServerContext(
    List<? extends ServerStreamTracer.Factory> factories,
    String fullMethodName,
    Metadata headers) {
  if (factories.isEmpty()) {
    return NOOP;
  }
  final int count = factories.size();
  StreamTracer[] created = new StreamTracer[count];
  for (int slot = 0; slot < count; slot++) {
    ServerStreamTracer.Factory factory = factories.get(slot);
    created[slot] = factory.newServerStreamTracer(fullMethodName, headers);
  }
  return new StatsTraceContext(created);
}
/**
 * With default settings the builder yields three factories in order: Census stats, Census
 * tracing, then the user-supplied factory.
 */
@Test
public void getTracerFactories_default() {
  builder.addStreamTracerFactory(DUMMY_USER_TRACER);

  List<ServerStreamTracer.Factory> actual = builder.getTracerFactories();

  assertEquals(3, actual.size());
  ServerStreamTracer.Factory statsFactory = actual.get(0);
  assertThat(statsFactory).isInstanceOf(CensusStatsModule.ServerTracerFactory.class);
  ServerStreamTracer.Factory tracingFactory = actual.get(1);
  assertThat(tracingFactory).isInstanceOf(CensusTracingModule.ServerTracerFactory.class);
  ServerStreamTracer.Factory userFactory = actual.get(2);
  assertThat(userFactory).isSameAs(DUMMY_USER_TRACER);
}
/**
 * With stats disabled, only the Census tracing factory and the user-supplied factory remain, in
 * that order.
 */
@Test
public void getTracerFactories_disableStats() {
  builder.addStreamTracerFactory(DUMMY_USER_TRACER);
  builder.setStatsEnabled(false);

  List<ServerStreamTracer.Factory> actual = builder.getTracerFactories();

  assertEquals(2, actual.size());
  ServerStreamTracer.Factory tracingFactory = actual.get(0);
  assertThat(tracingFactory).isInstanceOf(CensusTracingModule.ServerTracerFactory.class);
  ServerStreamTracer.Factory userFactory = actual.get(1);
  assertThat(userFactory).isSameAs(DUMMY_USER_TRACER);
}
/**
 * With tracing disabled, only the Census stats factory and the user-supplied factory remain, in
 * that order.
 */
@Test
public void getTracerFactories_disableTracing() {
  builder.addStreamTracerFactory(DUMMY_USER_TRACER);
  builder.setTracingEnabled(false);

  List<ServerStreamTracer.Factory> actual = builder.getTracerFactories();

  assertEquals(2, actual.size());
  ServerStreamTracer.Factory statsFactory = actual.get(0);
  assertThat(statsFactory).isInstanceOf(CensusStatsModule.ServerTracerFactory.class);
  ServerStreamTracer.Factory userFactory = actual.get(1);
  assertThat(userFactory).isSameAs(DUMMY_USER_TRACER);
}
/**
 * With stats disabled, only the Census tracing factory and the user-supplied factory remain, in
 * that order.
 */
@Test
public void getTracerFactories_disableStats() {
  builder.addStreamTracerFactory(DUMMY_USER_TRACER);
  builder.setStatsEnabled(false);

  List<? extends ServerStreamTracer.Factory> actual = builder.getTracerFactories();

  assertEquals(2, actual.size());
  // Compared by class name rather than by class literal.
  String tracingFactoryClassName = actual.get(0).getClass().getName();
  assertThat(tracingFactoryClassName)
      .isEqualTo("io.grpc.census.CensusTracingModule$ServerTracerFactory");
  ServerStreamTracer.Factory userFactory = actual.get(1);
  assertThat(userFactory).isSameInstanceAs(DUMMY_USER_TRACER);
}
/**
 * Threads {@code context} through {@link ServerStreamTracer#filterContext} of every tracer, in
 * order, feeding each tracer the context returned by the previous one. Server-side only.
 *
 * <p>Called from {@link io.grpc.internal.ServerImpl}.
 *
 * @throws NullPointerException if {@code context} is null or any tracer returns a null context
 */
public <ReqT, RespT> Context serverFilterContext(Context context) {
  Context current = checkNotNull(context, "context");
  for (StreamTracer tracer : tracers) {
    ServerStreamTracer serverTracer = (ServerStreamTracer) tracer;
    // checkNotNull hands back its argument, so validate and advance in one step.
    current = checkNotNull(serverTracer.filterContext(current), "%s returns null context", tracer);
  }
  return current;
}
/**
 * Creates the Netty transport servers for port {@code 0} with a 65 KiB flow-control window,
 * using the fake-clock transport tracer.
 */
@Override
protected List<? extends InternalServer> newServer(
    List<ServerStreamTracer.Factory> streamTracerFactories) {
  final int flowControlWindowBytes = 65 * 1024;
  List<? extends InternalServer> created =
      AccessProtectedHack.serverBuilderBuildTransportServer(
          NettyServerBuilder.forPort(0).flowControlWindow(flowControlWindowBytes),
          streamTracerFactories,
          fakeClockTransportTracer);
  return created;
}
/**
 * Installs {@code transportTracerFactory} on {@code builder} by writing its field directly, then
 * asks the builder for its transport servers.
 *
 * <p>Note that the builder is mutated as a side effect.
 */
public static List<? extends InternalServer> serverBuilderBuildTransportServer(
    AbstractServerImplBuilder<?> builder,
    List<ServerStreamTracer.Factory> streamTracerFactories,
    TransportTracer.Factory transportTracerFactory) {
  builder.transportTracerFactory = transportTracerFactory;
  List<? extends InternalServer> transportServers =
      builder.buildTransportServers(streamTracerFactories);
  return transportServers;
}
/**
 * Creates a fresh interop stream tracer and records it, paired with the method name, in
 * {@code serverStreamTracers} before returning it.
 */
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
  ServerStreamTracerInfo.InteropServerStreamTracer created =
      new ServerStreamTracerInfo.InteropServerStreamTracer();
  ServerStreamTracerInfo record = new ServerStreamTracerInfo(fullMethodName, created);
  serverStreamTracers.add(record);
  return created;
}