下面列出了org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration#zipkin2.reporter.Sender 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* The {@link StackdriverSender} bean.
* @param cloudConfiguration The google cloud configuration
* @param credentials The credentials
* @param channel The channel to use
* @return The sender
*/
@RequiresGoogleProjectId
@Requires(classes = StackdriverSender.class)
@Singleton
protected @Nonnull Sender stackdriverSender(
@Nonnull GoogleCloudConfiguration cloudConfiguration,
@Nonnull GoogleCredentials credentials,
@Nonnull @Named("stackdriverTraceSenderChannel") ManagedChannel channel) {
GoogleCredentials traceCredentials = credentials.createScoped(Arrays.asList(TRACE_SCOPE.toString()));
return StackdriverSender.newBuilder(channel)
.projectId(cloudConfiguration.getProjectId())
.callOptions(CallOptions.DEFAULT
.withCallCredentials(MoreCallCredentials.from(traceCredentials)))
.build();
}
@Override
public Tracer getTracer() {
Sender sender = OkHttpSender.create("http://192.168.20.131:9411/api/v2/spans");
Reporter spanReporter = AsyncReporter.create(sender);
// If you want to support baggage, indicate the fields you'd like to
// whitelist, in this case "country-code" and "user-id". On the wire,
// they will be prefixed like "baggage-country-code"
Propagation.Factory propagationFactory = ExtraFieldPropagation.newFactoryBuilder(B3Propagation.FACTORY)
.addPrefixedFields("baggage-", Arrays.asList("country-code", "user-id"))
.build();
Tracing braveTracing = Tracing.newBuilder()
.localServiceName("gateway")
.propagationFactory(propagationFactory)
.spanReporter(spanReporter)
.build();
Tracer tracer = BraveTracer.create(braveTracing);
return tracer;
}
@Bean
Sender sender(DynamicProperties dynamicProperties) {
apiVersion = dynamicProperties.getStringProperty(CONFIG_TRACING_COLLECTOR_API_VERSION,
CONFIG_TRACING_COLLECTOR_API_V2).toLowerCase();
// use default value if the user set value is invalid
if (apiVersion.compareTo(CONFIG_TRACING_COLLECTOR_API_V1) != 0) {
apiVersion = CONFIG_TRACING_COLLECTOR_API_V2;
}
String path = MessageFormat.format(CONFIG_TRACING_COLLECTOR_PATH, apiVersion);
return OkHttpSender.create(
dynamicProperties.getStringProperty(
CONFIG_TRACING_COLLECTOR_ADDRESS,
DEFAULT_TRACING_COLLECTOR_ADDRESS)
.trim()
.replaceAll("/+$", "")
.concat(path));
}
@Bean(REPORTER_BEAN_NAME)
@ConditionalOnMissingBean(name = REPORTER_BEAN_NAME)
public Reporter<Span> stackdriverReporter(ReporterMetrics reporterMetrics,
GcpTraceProperties trace, @Qualifier(SENDER_BEAN_NAME) Sender sender) {
AsyncReporter<Span> asyncReporter = AsyncReporter.builder(sender)
// historical constraint. Note: AsyncReporter supports memory bounds
.queuedMaxSpans(1000)
.messageTimeout(trace.getMessageTimeout(), TimeUnit.SECONDS)
.metrics(reporterMetrics).build(StackdriverEncoder.V2);
CheckResult checkResult = asyncReporter.check();
if (!checkResult.ok()) {
LOGGER.warn(
"Error when performing Stackdriver AsyncReporter health check.", checkResult.error());
}
return asyncReporter;
}
@Test
public void testAsyncReporterHealthCheck() {
Sender senderMock = mock(Sender.class);
when(senderMock.check()).thenReturn(CheckResult.failed(new RuntimeException()));
when(senderMock.encoding()).thenReturn(SpanBytesEncoder.PROTO3.encoding());
this.contextRunner
.withBean(
StackdriverTraceAutoConfiguration.SENDER_BEAN_NAME,
Sender.class,
() -> senderMock)
.run(context -> {
Reporter<Span> asyncReporter = context.getBean(Reporter.class);
assertThat(asyncReporter).isNotNull();
verify(senderMock, times(1)).check();
});
}
/**
* Creates and registers the Zipkin Trace exporter to the OpenCensus library. Only one Zipkin
* exporter can be registered at any point.
*
* @param configuration configuration for this exporter.
* @throws IllegalStateException if a Zipkin exporter is already registered.
* @since 0.22
*/
public static void createAndRegister(ZipkinExporterConfiguration configuration) {
synchronized (monitor) {
checkState(handler == null, "Zipkin exporter is already registered.");
Sender sender = configuration.getSender();
if (sender == null) {
sender = URLConnectionSender.create(configuration.getV2Url());
}
Handler newHandler =
new ZipkinExporterHandler(
configuration.getEncoder(),
sender,
configuration.getServiceName(),
configuration.getDeadline());
handler = newHandler;
register(Tracing.getExportComponent().getSpanExporter(), newHandler);
}
}
@Override protected Sender createSender() throws Exception {
RabbitMQSender result = RabbitMQSender.newBuilder()
.queue("zipkin-jmh")
.addresses("localhost:5672").build();
CheckResult check = result.check();
if (!check.ok()) {
throw new AssumptionViolatedException(check.error().getMessage(), check.error());
}
channel = result.localChannel();
channel.queueDelete(result.queue);
channel.queueDeclare(result.queue, false, true, true, null);
Thread.sleep(500L);
new Thread(() -> {
try {
channel.basicConsume(result.queue, true, new DefaultConsumer(channel));
} catch (IOException e) {
e.printStackTrace();
}
}).start();
return result;
}
@Test
public void overrideActiveMqQueue() throws Exception {
this.context = new AnnotationConfigApplicationContext();
environment().setProperty("spring.jms.cache.enabled", "false");
environment().setProperty("spring.zipkin.activemq.queue", "zipkin2");
environment().setProperty("spring.zipkin.activemq.message-max-bytes", "50");
environment().setProperty("spring.zipkin.sender.type", "activemq");
this.context.register(PropertyPlaceholderAutoConfiguration.class,
ActiveMQAutoConfiguration.class, ZipkinAutoConfiguration.class,
TraceAutoConfiguration.class);
this.context.refresh();
then(this.context.getBean(Sender.class)).isInstanceOf(ActiveMQSender.class);
this.context.close();
}
Tracing build() {
Tracing.Builder builder = Tracing.newBuilder();
Sender sender = new SenderBuilder(configuration).build();
if (sender != null) {
AsyncReporter<Span> reporter = AsyncReporter.builder(sender).build();
builder.spanReporter(reporter);
}
Sampler sampler = new SamplerBuilder(configuration).build();
return builder.sampler(sampler)
.localServiceName(localServiceName)
.traceId128Bit(traceId128Bit)
.build();
}
Sender build() {
Encoding encoding = new EncodingBuilder(configuration).build();
switch (senderType) {
case HTTP:
return new HttpSenderBuilder(configuration).build(encoding);
case KAFKA:
return new KafkaSenderBuilder(configuration).build(encoding);
case NONE:
return null;
default:
throw new IllegalArgumentException("Zipkin sender type unknown");
}
}
@Test
public void shouldBuildNullSender() {
// Given
Map<String, String> map = new HashMap<>();
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertNull(sender);
}
@Test
public void shouldBuildNoneSender() {
// Given
Map<String, String> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.NONE.name());
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertNull(sender);
}
@Test
public void shouldBuildHttpSender() {
// Given
Map<String, String> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.HTTP.name());
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof OkHttpSender);
}
@Test
public void shouldBuildKafkaSenderWithConfig() {
// Given
Map<String, String> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
map.put(KAFKA_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof KafkaSender);
}
@Test
public void shouldBuildKafkaSenderWithDefault() {
// Given
Map<String, String> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof KafkaSender);
}
@Test
public void shouldBuildKafkaSenderWithList() {
// Given
Map<String, Object> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Arrays.asList("localhost:9092", "localhost:9094"));
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof KafkaSender);
}
/** Configuration for how to buffer spans into messages for Zipkin */
@Bean
Reporter<Span> reporter(
Sender sender,
@Value("${zipkin.self-tracing.message-timeout:1}") int messageTimeout,
CollectorMetrics metrics) {
return AsyncReporter.builder(sender)
.messageTimeout(messageTimeout, TimeUnit.SECONDS)
.metrics(new ReporterMetricsAdapter(metrics.forTransport("local")))
.build();
}
public static void main(String[] args) {
byte[] arr = new byte[10];
Arrays.fill(arr, Byte.MAX_VALUE);
try (Sender sender = OkHttpSender.create("http://192.168.20.131:9411/api/v2/spans")) {
sender.sendSpans(Collections.singletonList(arr));
} catch (IOException e) {
e.printStackTrace();
}
}
@Bean
Reporter<Span> zipkinReporter(Sender sender) {
if (apiVersion.compareTo(CONFIG_TRACING_COLLECTOR_API_V1) == 0) {
return AsyncReporter.builder(sender).build(SpanBytesEncoder.JSON_V1);
}
return AsyncReporter.builder(sender).build();
}
@Override
public Sender getSender() {
return KafkaSender
.newBuilder()
.bootstrapServers(super.getZipkinUrl())
.topic(super.getTopic())
.encoding(Encoding.JSON)
.build();
}
@Test
public void test() {
this.contextRunner.run((context) -> {
SleuthProperties sleuthProperties = context.getBean(SleuthProperties.class);
assertThat(sleuthProperties.isTraceId128()).isTrue();
assertThat(sleuthProperties.isSupportsJoin()).isFalse();
assertThat(context.getBean(HttpClientParser.class)).isNotNull();
assertThat(context.getBean(HttpServerParser.class)).isNotNull();
assertThat(context.getBean(Sender.class)).isNotNull();
assertThat(context.getBean(ManagedChannel.class)).isNotNull();
});
}
protected Optional<HttpTracing> buildTracing(final Environment environment, Sender sender) {
final AsyncReporter<Span> reporter =
AsyncReporter.builder(sender)
.metrics(new DropwizardReporterMetrics(environment.metrics()))
.messageTimeout(reportTimeout.toNanoseconds(), TimeUnit.NANOSECONDS)
.build();
environment.lifecycle().manage(new ReporterManager(reporter, sender));
return buildTracing(environment, ZipkinSpanHandler.create(reporter));
}
ZipkinExporterHandler(
SpanBytesEncoder encoder, Sender sender, String serviceName, Duration deadline) {
super(deadline, EXPORT_SPAN_NAME);
this.encoder = encoder;
this.sender = sender;
this.localEndpoint = produceLocalEndpoint(serviceName);
}
@Bean(REPORTER_BEAN_NAME)
@ConditionalOnMissingBean(name = REPORTER_BEAN_NAME)
public Reporter<Span> reporter(ReporterMetrics reporterMetrics,
ZipkinProperties zipkin, @Qualifier(SENDER_BEAN_NAME) Sender sender) {
CheckResult checkResult = checkResult(sender, 1_000L);
logCheckResult(sender, checkResult);
// historical constraint. Note: AsyncReporter supports memory bounds
AsyncReporter<Span> asyncReporter = AsyncReporter.builder(sender)
.queuedMaxSpans(1000)
.messageTimeout(zipkin.getMessageTimeout(), TimeUnit.SECONDS)
.metrics(reporterMetrics).build(zipkin.getEncoder());
return asyncReporter;
}
private void logCheckResult(Sender sender, CheckResult checkResult) {
if (log.isDebugEnabled() && checkResult != null && checkResult.ok()) {
log.debug("Check result of the [" + sender.toString() + "] is [" + checkResult
+ "]");
}
else if (checkResult != null && !checkResult.ok()) {
log.warn("Check result of the [" + sender.toString() + "] contains an error ["
+ checkResult + "]");
}
}
@Bean(ZipkinAutoConfiguration.SENDER_BEAN_NAME)
Sender kafkaSender(KafkaProperties config) {
Map<String, Object> properties = config.buildProducerProperties();
properties.put("key.serializer", ByteArraySerializer.class.getName());
properties.put("value.serializer", ByteArraySerializer.class.getName());
// Kafka expects the input to be a String, but KafkaProperties returns a list
Object bootstrapServers = properties.get("bootstrap.servers");
if (bootstrapServers instanceof List) {
properties.put("bootstrap.servers", join((List) bootstrapServers));
}
return KafkaSender.newBuilder().topic(this.topic).overrides(properties)
.build();
}
@Bean(ZipkinAutoConfiguration.SENDER_BEAN_NAME)
public Sender restTemplateSender(ZipkinProperties zipkin,
ZipkinRestTemplateCustomizer zipkinRestTemplateCustomizer) {
RestTemplate restTemplate = new ZipkinRestTemplateWrapper(zipkin, this.extractor);
restTemplate = zipkinRestTemplateCustomizer.customizeTemplate(restTemplate);
return new RestTemplateSender(restTemplate, zipkin.getBaseUrl(),
zipkin.getEncoder());
}
@Bean(ZipkinAutoConfiguration.SENDER_BEAN_NAME)
Sender rabbitSender(CachingConnectionFactory connectionFactory,
RabbitProperties config) {
String addresses = StringUtils.hasText(this.addresses) ? this.addresses
: config.determineAddresses();
return RabbitMQSender.newBuilder()
.connectionFactory(connectionFactory.getRabbitConnectionFactory())
.queue(this.queue).addresses(addresses).build();
}
@Test
public void overrideRabbitMQQueue() throws Exception {
this.context = new AnnotationConfigApplicationContext();
environment().setProperty("spring.zipkin.rabbitmq.queue", "zipkin2");
environment().setProperty("spring.zipkin.sender.type", "rabbit");
this.context.register(PropertyPlaceholderAutoConfiguration.class,
RabbitAutoConfiguration.class, ZipkinAutoConfiguration.class,
TraceAutoConfiguration.class);
this.context.refresh();
then(this.context.getBean(Sender.class)).isInstanceOf(RabbitMQSender.class);
this.context.close();
}
@Test
public void overrideKafkaTopic() throws Exception {
this.context = new AnnotationConfigApplicationContext();
environment().setProperty("spring.zipkin.kafka.topic", "zipkin2");
environment().setProperty("spring.zipkin.sender.type", "kafka");
this.context.register(PropertyPlaceholderAutoConfiguration.class,
KafkaAutoConfiguration.class, ZipkinAutoConfiguration.class,
TraceAutoConfiguration.class);
this.context.refresh();
then(this.context.getBean(Sender.class)).isInstanceOf(KafkaSender.class);
this.context.close();
}