下面列出了org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration#zipkin2.reporter.kafka.KafkaSender 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Build a new {@link HttpTracing} instance for interfacing with Zipkin
*
* @param environment Environment
* @return Brave instance
*/
@Override
public Optional<HttpTracing> build(final Environment environment) {
if (!isEnabled()) {
LOGGER.warn("Zipkin tracing is disabled");
return Optional.empty();
}
final KafkaSender sender =
KafkaSender.newBuilder()
.bootstrapServers(bootstrapServers)
.topic(topic)
.overrides(overrides)
.build();
LOGGER.info("Sending spans to Kafka topic \"{}\" at: {}", topic, bootstrapServers);
return buildTracing(environment, sender);
}
@Override protected Sender createSender() throws Exception {
broker.start();
kafka = new KafkaJunitRule(broker).waitForStartup();
consumer = kafka.helper().createByteConsumer();
consumer.subscribe(Collections.singletonList("zipkin"));
new Thread(() -> {
while (true) {
Iterator<ConsumerRecord<byte[], byte[]>> messages = consumer.poll(1000L).iterator();
while (messages.hasNext()) {
messages.next();
}
}
}).start();
return KafkaSender.create(broker.getBrokerList().get());
}
@Test
public void shouldBuildKafkaSenderWithConfig() {
// Given
Map<String, String> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
map.put(KAFKA_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof KafkaSender);
}
@Test
public void shouldBuildKafkaSenderWithDefault() {
// Given
Map<String, String> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof KafkaSender);
}
@Test
public void shouldBuildKafkaSenderWithList() {
// Given
Map<String, Object> map = new HashMap<>();
map.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
map.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
Arrays.asList("localhost:9092", "localhost:9094"));
TracingConfiguration config = new TracingConfiguration(map);
// When
Sender sender = new TracingBuilder.SenderBuilder(config).build();
// Then
assertTrue(sender instanceof KafkaSender);
}
/**
* Create reporter.
*/
private static AsyncReporter<zipkin2.Span> setupReporter() {
var sender = KafkaSender.newBuilder()
.encoding(Encoding.PROTO3)
.bootstrapServers(kafkaContainer.getBootstrapServers())
.build();
return AsyncReporter.create(sender);
}
KafkaZipkinTracer(Config config, StatsReceiver stats) {
this(KafkaSender.newBuilder()
.encoding(Encoding.JSON)
.bootstrapServers(config.bootstrapServers())
.topic(config.topic())
.build(), config, stats);
}
@Test public void whenKafkaIsDown() throws Exception {
broker.stop();
// Make a new tracer that fails faster than 60 seconds
tracer.close();
stats.clear();
Map<String, String> overrides = new LinkedHashMap<>();
overrides.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
tracer = new KafkaZipkinTracer(KafkaSender.newBuilder()
.bootstrapServers(config.bootstrapServers())
.topic(config.topic())
.overrides(overrides)
.build(), config, stats);
tracer.record(new Record(FinagleTestObjects.root, Time.fromMilliseconds(FinagleTestObjects.TODAY), new ServiceName("web"), none));
tracer.record(new Record(FinagleTestObjects.root, Time.fromMilliseconds(FinagleTestObjects.TODAY), new Rpc("get"), none));
tracer.record(new Record(FinagleTestObjects.root, Time.fromMilliseconds(FinagleTestObjects.TODAY), ClientSend$.MODULE$, none));
tracer.record(new Record(
FinagleTestObjects.root, Time.fromMilliseconds(FinagleTestObjects.TODAY + 1), ClientRecv$.MODULE$, none));
Thread.sleep(1500); // wait for kafka request attempt to go through
assertThat(mapAsJavaMap(stats.counters())).containsOnly(
entry(FinagleTestObjects.seq("spans"), 1L),
entry(FinagleTestObjects.seq("span_bytes"), 185L),
entry(FinagleTestObjects.seq("spans_dropped"), 1L),
entry(FinagleTestObjects.seq("messages"), 1L),
entry(FinagleTestObjects.seq("message_bytes"), 187L),
entry(FinagleTestObjects.seq("messages_dropped"), 1L),
entry(FinagleTestObjects.seq("messages_dropped", "org.apache.kafka.common.errors.TimeoutException"), 1L)
);
}
@Override protected KafkaSender createInstance() {
KafkaSender.Builder builder = KafkaSender.newBuilder();
if (bootstrapServers != null) builder.bootstrapServers(bootstrapServers);
if (encoding != null) builder.encoding(encoding);
if (topic != null) builder.topic(topic);
if (messageMaxBytes != null) builder.messageMaxBytes(messageMaxBytes);
return builder.build();
}
@Test public void bootstrapServers() {
context = new XmlBeans(""
+ "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
+ " <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
+ "</bean>"
);
assertThat(context.getBean("sender", KafkaSender.class))
.isEqualToComparingFieldByField(KafkaSender.newBuilder()
.bootstrapServers("localhost:9092")
.build());
}
@Test public void topic() {
context = new XmlBeans(""
+ "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
+ " <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
+ " <property name=\"topic\" value=\"zipkin2\"/>\n"
+ "</bean>"
);
assertThat(context.getBean("sender", KafkaSender.class))
.isEqualToComparingFieldByField(KafkaSender.newBuilder()
.bootstrapServers("localhost:9092")
.topic("zipkin2").build());
}
@Test public void messageMaxBytes() {
context = new XmlBeans(""
+ "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
+ " <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
+ " <property name=\"messageMaxBytes\" value=\"1024\"/>\n"
+ "</bean>"
);
assertThat(context.getBean("sender", KafkaSender.class))
.extracting("messageMaxBytes")
.isEqualTo(1024);
}
@Test public void encoding() {
context = new XmlBeans(""
+ "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
+ " <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
+ " <property name=\"encoding\" value=\"PROTO3\"/>\n"
+ "</bean>"
);
assertThat(context.getBean("sender", KafkaSender.class))
.extracting("encoding")
.isEqualTo(Encoding.PROTO3);
}
@Test(expected = IllegalStateException.class) public void close_closesSender() {
context = new XmlBeans(""
+ "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
+ " <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
+ "</bean>"
);
KafkaSender sender = context.getBean("sender", KafkaSender.class);
context.close();
sender.sendSpans(Arrays.asList(new byte[] {'{', '}'}));
}
@Bean(ZipkinAutoConfiguration.SENDER_BEAN_NAME)
Sender kafkaSender(KafkaProperties config) {
Map<String, Object> properties = config.buildProducerProperties();
properties.put("key.serializer", ByteArraySerializer.class.getName());
properties.put("value.serializer", ByteArraySerializer.class.getName());
// Kafka expects the input to be a String, but KafkaProperties returns a list
Object bootstrapServers = properties.get("bootstrap.servers");
if (bootstrapServers instanceof List) {
properties.put("bootstrap.servers", join((List) bootstrapServers));
}
return KafkaSender.newBuilder().topic(this.topic).overrides(properties)
.build();
}
@Test
public void overrideKafkaTopic() throws Exception {
this.context = new AnnotationConfigApplicationContext();
environment().setProperty("spring.zipkin.kafka.topic", "zipkin2");
environment().setProperty("spring.zipkin.sender.type", "kafka");
this.context.register(PropertyPlaceholderAutoConfiguration.class,
KafkaAutoConfiguration.class, ZipkinAutoConfiguration.class,
TraceAutoConfiguration.class);
this.context.refresh();
then(this.context.getBean(Sender.class)).isInstanceOf(KafkaSender.class);
this.context.close();
}
Sender build(Encoding encoding) {
return KafkaSender.newBuilder().bootstrapServers(bootstrapServers).encoding(encoding).build();
}
KafkaZipkinTracer(KafkaSender kafka, Config config, StatsReceiver stats) {
super(kafka, config, stats);
this.kafka = kafka;
}
@Override public Class<? extends KafkaSender> getObjectType() {
return KafkaSender.class;
}
@Override protected void destroyInstance(Object instance) {
((KafkaSender) instance).close();
}