org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration#zipkin2.reporter.kafka.KafkaSender源码实例Demo

下面列出了 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration#zipkin2.reporter.kafka.KafkaSender 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: dropwizard-zipkin   文件: KafkaZipkinFactory.java
/**
 * Creates the {@link HttpTracing} used to report spans to Zipkin through Kafka.
 *
 * <p>When tracing is disabled, a warning is logged and an empty {@link Optional} is returned
 * without creating a sender.
 *
 * @param environment environment handed on to {@code buildTracing}
 * @return the result of {@code buildTracing}, or an empty {@link Optional} when disabled
 */
@Override
public Optional<HttpTracing> build(final Environment environment) {
  if (isEnabled()) {
    final KafkaSender kafkaSender =
        KafkaSender.newBuilder()
            .bootstrapServers(bootstrapServers)
            .topic(topic)
            .overrides(overrides)
            .build();

    LOGGER.info("Sending spans to Kafka topic \"{}\" at: {}", topic, bootstrapServers);
    return buildTracing(environment, kafkaSender);
  }

  LOGGER.warn("Zipkin tracing is disabled");
  return Optional.empty();
}
 
/**
 * Starts the broker, subscribes a byte consumer to the "zipkin" topic and returns a
 * {@link KafkaSender} pointed at the broker list.
 *
 * <p>A background thread polls the consumer in an endless loop and discards every record.
 * The thread is marked as a daemon so that this never-ending loop cannot keep the JVM alive
 * once all other work has finished.
 *
 * @return a sender created from the broker's broker list
 * @throws Exception declared by the overridden method; propagated from broker/consumer setup
 */
@Override protected Sender createSender() throws Exception {
  broker.start();
  kafka = new KafkaJunitRule(broker).waitForStartup();
  consumer = kafka.helper().createByteConsumer();
  consumer.subscribe(Collections.singletonList("zipkin"));

  Thread drainer = new Thread(() -> {
    while (true) {
      // Records are only iterated, never inspected: the loop exists purely to consume them.
      Iterator<ConsumerRecord<byte[], byte[]>> messages = consumer.poll(1000L).iterator();
      while (messages.hasNext()) {
        messages.next();
      }
    }
  }, "kafka-sender-drain");
  // The loop has no exit condition, so a non-daemon thread would block JVM shutdown.
  drainer.setDaemon(true);
  drainer.start();

  return KafkaSender.create(broker.getBrokerList().get());
}
 
/** A Kafka sender type plus the dedicated bootstrap-servers key must yield a {@link KafkaSender}. */
@Test
public void shouldBuildKafkaSenderWithConfig() {
  // Given: sender type KAFKA and bootstrap servers set through KAFKA_BOOTSTRAP_SERVERS_CONFIG
  final Map<String, String> settings = new HashMap<>();
  settings.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
  settings.put(KAFKA_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  final TracingConfiguration tracingConfig = new TracingConfiguration(settings);

  // When: the sender is built from that configuration
  final Sender built = new TracingBuilder.SenderBuilder(tracingConfig).build();

  // Then: the resulting sender is Kafka-backed
  assertTrue(built instanceof KafkaSender);
}
 
/** A Kafka sender type plus the common client bootstrap-servers key must yield a {@link KafkaSender}. */
@Test
public void shouldBuildKafkaSenderWithDefault() {
  // Given: sender type KAFKA and bootstrap servers set through the common client config key
  final Map<String, String> settings = new HashMap<>();
  settings.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
  settings.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  final TracingConfiguration tracingConfig = new TracingConfiguration(settings);

  // When: the sender is built from that configuration
  final Sender built = new TracingBuilder.SenderBuilder(tracingConfig).build();

  // Then: the resulting sender is Kafka-backed
  assertTrue(built instanceof KafkaSender);
}
 
/** Bootstrap servers supplied as a list (rather than a string) must still yield a {@link KafkaSender}. */
@Test
public void shouldBuildKafkaSenderWithList() {
  // Given: sender type KAFKA and two bootstrap servers passed as a List value
  final Map<String, Object> settings = new HashMap<>();
  settings.put(SENDER_TYPE_CONFIG, TracingBuilder.SenderBuilder.SenderType.KAFKA.name());
  settings.put(
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
    Arrays.asList("localhost:9092", "localhost:9094"));
  final TracingConfiguration tracingConfig = new TracingConfiguration(settings);

  // When: the sender is built from that configuration
  final Sender built = new TracingBuilder.SenderBuilder(tracingConfig).build();

  // Then: the resulting sender is Kafka-backed
  assertTrue(built instanceof KafkaSender);
}
 
源代码6 项目: pitchfork   文件: KafkaIngressTest.java
/**
 * Builds an {@link AsyncReporter} whose spans are sent through a {@link KafkaSender}
 * configured with PROTO3 encoding and the Kafka container's bootstrap servers.
 *
 * @return a reporter created from the Kafka sender
 */
private static AsyncReporter<zipkin2.Span> setupReporter() {
    var senderBuilder = KafkaSender.newBuilder().encoding(Encoding.PROTO3);
    var kafkaSender = senderBuilder
            .bootstrapServers(kafkaContainer.getBootstrapServers())
            .build();

    return AsyncReporter.create(kafkaSender);
}
 
源代码7 项目: zipkin-finagle   文件: KafkaZipkinTracer.java
/**
 * Creates a tracer backed by a {@link KafkaSender} built with JSON encoding and the
 * bootstrap servers and topic taken from {@code config}, then delegates to the constructor
 * that accepts the sender directly.
 *
 * @param config supplies the bootstrap servers and topic, and is passed on unchanged
 * @param stats passed on unchanged to the delegated constructor
 */
KafkaZipkinTracer(Config config, StatsReceiver stats) {
  this(KafkaSender.newBuilder()
      .encoding(Encoding.JSON)
      .bootstrapServers(config.bootstrapServers())
      .topic(config.topic())
      .build(), config, stats);
}
 
/** Records one span while the broker is stopped and checks the resulting counters. */
@Test public void whenKafkaIsDown() throws Exception {
  broker.stop();

  // Replace the tracer with one that fails faster than 60 seconds
  tracer.close();
  stats.clear();
  Map<String, String> producerOverrides = new LinkedHashMap<>();
  producerOverrides.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
  KafkaSender failFastSender = KafkaSender.newBuilder()
      .bootstrapServers(config.bootstrapServers())
      .topic(config.topic())
      .overrides(producerOverrides)
      .build();
  tracer = new KafkaZipkinTracer(failFastSender, config, stats);

  Time sentAt = Time.fromMilliseconds(FinagleTestObjects.TODAY);
  Time receivedAt = Time.fromMilliseconds(FinagleTestObjects.TODAY + 1);
  tracer.record(new Record(FinagleTestObjects.root, sentAt, new ServiceName("web"), none));
  tracer.record(new Record(FinagleTestObjects.root, sentAt, new Rpc("get"), none));
  tracer.record(new Record(FinagleTestObjects.root, sentAt, ClientSend$.MODULE$, none));
  tracer.record(new Record(FinagleTestObjects.root, receivedAt, ClientRecv$.MODULE$, none));

  // Give the kafka request attempt time to go through before reading the counters
  Thread.sleep(1500);

  assertThat(mapAsJavaMap(stats.counters())).containsOnly(
      entry(FinagleTestObjects.seq("spans"), 1L),
      entry(FinagleTestObjects.seq("span_bytes"), 185L),
      entry(FinagleTestObjects.seq("spans_dropped"), 1L),
      entry(FinagleTestObjects.seq("messages"), 1L),
      entry(FinagleTestObjects.seq("message_bytes"), 187L),
      entry(FinagleTestObjects.seq("messages_dropped"), 1L),
      entry(
          FinagleTestObjects.seq(
              "messages_dropped", "org.apache.kafka.common.errors.TimeoutException"),
          1L)
  );
}
 
/**
 * Builds a {@link KafkaSender}, applying only those settings that have been assigned
 * (non-null); unset settings are left at whatever the builder starts with.
 *
 * @return the sender produced by the builder
 */
@Override
protected KafkaSender createInstance() {
  final KafkaSender.Builder senderBuilder = KafkaSender.newBuilder();
  if (bootstrapServers != null) {
    senderBuilder.bootstrapServers(bootstrapServers);
  }
  if (encoding != null) {
    senderBuilder.encoding(encoding);
  }
  if (topic != null) {
    senderBuilder.topic(topic);
  }
  if (messageMaxBytes != null) {
    senderBuilder.messageMaxBytes(messageMaxBytes);
  }
  return senderBuilder.build();
}
 
/** The bean configured with only bootstrapServers must match a sender built the same way. */
@Test
public void bootstrapServers() {
  String beanXml = ""
      + "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
      + "  <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
      + "</bean>";
  context = new XmlBeans(beanXml);

  KafkaSender actual = context.getBean("sender", KafkaSender.class);
  KafkaSender expected = KafkaSender.newBuilder()
      .bootstrapServers("localhost:9092")
      .build();

  assertThat(actual).isEqualToComparingFieldByField(expected);
}
 
/** The bean configured with a topic must match a sender built with the same topic. */
@Test
public void topic() {
  String beanXml = ""
      + "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
      + "  <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
      + "  <property name=\"topic\" value=\"zipkin2\"/>\n"
      + "</bean>";
  context = new XmlBeans(beanXml);

  KafkaSender actual = context.getBean("sender", KafkaSender.class);
  KafkaSender expected = KafkaSender.newBuilder()
      .bootstrapServers("localhost:9092")
      .topic("zipkin2")
      .build();

  assertThat(actual).isEqualToComparingFieldByField(expected);
}
 
/** The messageMaxBytes property from the bean definition must end up on the sender. */
@Test
public void messageMaxBytes() {
  String beanXml = ""
      + "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
      + "  <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
      + "  <property name=\"messageMaxBytes\" value=\"1024\"/>\n"
      + "</bean>";
  context = new XmlBeans(beanXml);

  KafkaSender actual = context.getBean("sender", KafkaSender.class);

  assertThat(actual)
      .extracting("messageMaxBytes")
      .isEqualTo(1024);
}
 
/** The encoding property from the bean definition must end up on the sender. */
@Test
public void encoding() {
  String beanXml = ""
      + "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
      + "  <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
      + "  <property name=\"encoding\" value=\"PROTO3\"/>\n"
      + "</bean>";
  context = new XmlBeans(beanXml);

  KafkaSender actual = context.getBean("sender", KafkaSender.class);

  assertThat(actual)
      .extracting("encoding")
      .isEqualTo(Encoding.PROTO3);
}
 
/**
 * Closing the context is expected to close the sender bean too: the test passes only if an
 * {@link IllegalStateException} is thrown, and {@code sendSpans} is the call made after close.
 */
@Test(expected = IllegalStateException.class)
public void close_closesSender() {
  String beanXml = ""
      + "<bean id=\"sender\" class=\"zipkin2.reporter.beans.KafkaSenderFactoryBean\">\n"
      + "  <property name=\"bootstrapServers\" value=\"localhost:9092\"/>\n"
      + "</bean>";
  context = new XmlBeans(beanXml);

  KafkaSender closedSender = context.getBean("sender", KafkaSender.class);
  context.close();

  // A single encoded span consisting of the two bytes "{}"
  byte[] emptyJsonSpan = {'{', '}'};
  closedSender.sendSpans(Arrays.asList(emptyJsonSpan));
}
 
/**
 * Declares the Zipkin sender bean as a {@link KafkaSender}.
 *
 * <p>The producer properties come from {@code config}; both serializers are overwritten with
 * {@link ByteArraySerializer}, and a list-valued {@code bootstrap.servers} entry is replaced
 * by the result of {@code join}.
 *
 * @param config source of the Kafka producer properties
 * @return a sender built with this configuration's topic and the adjusted properties
 */
@Bean(ZipkinAutoConfiguration.SENDER_BEAN_NAME)
Sender kafkaSender(KafkaProperties config) {
	Map<String, Object> producerProps = config.buildProducerProperties();
	String byteArraySerializer = ByteArraySerializer.class.getName();
	producerProps.put("key.serializer", byteArraySerializer);
	producerProps.put("value.serializer", byteArraySerializer);

	// KafkaProperties returns a list here, whereas Kafka expects the value as a String
	Object servers = producerProps.get("bootstrap.servers");
	if (servers instanceof List) {
		String joinedServers = join((List) servers);
		producerProps.put("bootstrap.servers", joinedServers);
	}

	return KafkaSender.newBuilder()
			.topic(this.topic)
			.overrides(producerProps)
			.build();
}
 
/**
 * Registers the Kafka and Zipkin auto-configurations with the sender type set to "kafka"
 * and a custom topic, then checks that the {@link Sender} bean is a {@link KafkaSender}.
 *
 * <p>The context is closed in a {@code finally} block so that it is released even when the
 * refresh or the assertion fails.
 *
 * <p>NOTE(review): only the bean's type is asserted; the configured topic value itself is
 * not verified here.
 */
@Test
public void overrideKafkaTopic() throws Exception {
	this.context = new AnnotationConfigApplicationContext();
	try {
		environment().setProperty("spring.zipkin.kafka.topic", "zipkin2");
		environment().setProperty("spring.zipkin.sender.type", "kafka");
		this.context.register(PropertyPlaceholderAutoConfiguration.class,
				KafkaAutoConfiguration.class, ZipkinAutoConfiguration.class,
				TraceAutoConfiguration.class);
		this.context.refresh();

		then(this.context.getBean(Sender.class)).isInstanceOf(KafkaSender.class);
	}
	finally {
		this.context.close();
	}
}
 
源代码17 项目: brave-kafka-interceptor   文件: TracingBuilder.java
/**
 * Builds a {@link KafkaSender} from the configured bootstrap servers and the given encoding.
 *
 * @param encoding encoding applied to the sender
 * @return the built sender
 */
Sender build(Encoding encoding) {
  final KafkaSender kafkaSender =
      KafkaSender.newBuilder()
          .bootstrapServers(bootstrapServers)
          .encoding(encoding)
          .build();
  return kafkaSender;
}
 
源代码18 项目: zipkin-finagle   文件: KafkaZipkinTracer.java
/**
 * Creates a tracer around an already-built sender.
 *
 * @param kafka sender passed to the superclass and also kept in the {@code kafka} field
 * @param config passed on unchanged to the superclass constructor
 * @param stats passed on unchanged to the superclass constructor
 */
KafkaZipkinTracer(KafkaSender kafka, Config config, StatsReceiver stats) {
  super(kafka, config, stats);
  // Keep a typed reference to the sender in addition to handing it to the superclass
  this.kafka = kafka;
}
 
/**
 * Returns {@code KafkaSender.class}.
 *
 * @return the {@link KafkaSender} class object
 */
@Override
public Class<? extends KafkaSender> getObjectType() {
  return KafkaSender.class;
}
 
/**
 * Closes the given instance, which is cast to {@link KafkaSender}.
 *
 * @param instance the object to close; must be a {@link KafkaSender}
 */
@Override
protected void destroyInstance(Object instance) {
  final KafkaSender sender = (KafkaSender) instance;
  sender.close();
}