org.apache.kafka.clients.producer.MockProducer#history ( )源码实例Demo

下面列出了 org.apache.kafka.clients.producer.MockProducer#history() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: baleen   文件: KafkaTransportsTest.java
/**
 * Verifies that processing one newly created JCas through {@code KafkaTransportSender} results
 * in exactly one record in the mock producer's history, and that the record's value equals
 * {@code JCasSerializationTester.EMPTY_JSON}.
 *
 * @throws UIMAException propagated from sender initialisation, JCas creation or processing
 * @throws IOException propagated from the calls made in this test
 */
@Test
public void testKafkaTransportCanSend() throws UIMAException, IOException {

  // Route the sender's shared Kafka resource lookup to a mock that exposes a MockProducer.
  MockKafkaResource mockKafkaResource = new MockKafkaResource();
  when(context.getResourceObject(SharedKafkaResource.RESOURCE_KEY)).thenReturn(locator);
  when(locator.getResource()).thenReturn(mockKafkaResource);

  KafkaTransportSender kafkaTransportSender = new KafkaTransportSender();
  kafkaTransportSender.initialize(context);

  try {
    JCas in = JCasFactory.createJCas();
    kafkaTransportSender.process(in);

    MockProducer<String, String> mockProducer = mockKafkaResource.getMockProducer();

    List<ProducerRecord<String, String>> history = mockProducer.history();
    // assertEquals reports the actual size on failure, unlike assertTrue(size == 1).
    assertEquals(1, history.size());
    assertEquals(JCasSerializationTester.EMPTY_JSON, history.get(0).value());
  } finally {
    // Always release the sender's queue, even when processing or an assertion fails.
    kafkaTransportSender.closeQueue();
  }
}
 
/**
 * The publish request blocks, so the MockProducer must be driven from a different thread.
 * Publish tests therefore use the PUBLISH_EXECUTOR to submit Runnables that implement the
 * desired producer behaviors.
 */
@Test
public void publish() {
  final int messageCount = 5;
  final String pubsubTopic = "projects/project-1/topics/topic-1";
  final String kafkaTopic = "kafka-topic-1";

  PublishRequest publishRequest =
      PublishRequest.newBuilder()
          .setTopic(pubsubTopic)
          .addAllMessages(generatePubsubMessages(messageCount))
          .build();

  MockProducer<String, ByteBuffer> producer = startPublishExecutor(messageCount);

  PublishResponse publishResponse = blockingStub.publish(publishRequest);

  // Collect the topic and decoded payload of every record the producer saw, in order.
  List<String> sentTopics = new ArrayList<>();
  List<String> sentPayloads = new ArrayList<>();
  producer
      .history()
      .forEach(
          sent -> {
            sentTopics.add(sent.topic());
            sentPayloads.add(UTF_8.decode(sent.value()).toString());
          });

  assertThat(
      publishResponse.getMessageIdsList(),
      Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
  assertThat(
      sentTopics,
      Matchers.contains(kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic));
  assertThat(
      sentPayloads,
      Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));

  // One publish statistic per message, and no publish errors recorded.
  verify(statisticsManager, times(5))
      .computePublish(
          eq(pubsubTopic),
          argThat(payload -> payload.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
 
源代码3 项目: beam   文件: KafkaIOTest.java
/**
 * Asserts that the first {@code numElements} records in the producer's history, once ordered
 * by value, carry the expected topic, key, value and (optionally) timestamp.
 *
 * @param mockProducer producer whose history is inspected
 * @param topic topic every checked record must have
 * @param numElements number of records to check; record {@code i} must have value {@code i}
 * @param keyIsAbsent when true each key must be null, otherwise key {@code i} must equal
 *     {@code i}
 * @param verifyTimestamp when true the timestamp of record {@code i} must equal {@code i}
 */
private static void verifyProducerRecords(
    MockProducer<Integer, Long> mockProducer,
    String topic,
    int numElements,
    boolean keyIsAbsent,
    boolean verifyTimestamp) {

  // Order the written records by value so that position i lines up with expected value i.
  Comparator<ProducerRecord<Integer, Long>> byValue =
      Comparator.comparingLong(ProducerRecord::value);
  List<ProducerRecord<Integer, Long>> written = mockProducer.history();
  written.sort(byValue);

  int expected = 0;
  while (expected < numElements) {
    ProducerRecord<Integer, Long> current = written.get(expected);

    assertEquals(topic, current.topic());

    if (!keyIsAbsent) {
      assertEquals(expected, current.key().intValue());
    } else {
      assertNull(current.key());
    }

    assertEquals(expected, current.value().longValue());

    if (verifyTimestamp) {
      assertEquals(expected, current.timestamp().intValue());
    }

    expected++;
  }
}
 
源代码4 项目: ache   文件: KafkaTargetRepositoryTest.java
/**
 * Inserts one page into a {@code KafkaTargetRepository} configured with the JSON format and
 * checks that the first record in the producer's history deserialises to a
 * {@code TargetModelJson} carrying the page's content, URL, headers, relevance and crawler id.
 */
@Test
public void shouldSendDataToKafka() throws IOException {
    // given
    String topicName = "ache-data-topic";
    Page crawledPage = new Page(new URL(url), html, responseHeaders);
    crawledPage.setCrawlerId("mycrawler");
    crawledPage.setTargetRelevance(TargetRelevance.RELEVANT);

    StringSerializer stringSerializer = new StringSerializer();
    MockProducer<String, String> producer =
            new MockProducer<>(true, stringSerializer, stringSerializer);

    KafkaTargetRepository repository =
            new KafkaTargetRepository(producer, topicName, KafkaConfig.Format.JSON);

    // when
    repository.insert(crawledPage);
    repository.close();

    // then
    List<ProducerRecord<String, String>> sentRecords = producer.history();
    String json = sentRecords.get(0).value();
    TargetModelJson decoded = mapper.readValue(json, TargetModelJson.class);

    assertThat(decoded.getContentAsString(), is(html));
    assertThat(decoded.getUrl(), is(url));
    assertThat(decoded.getResponseHeaders().get("content-type").get(0), is("text/html"));
    assertThat(
            decoded.getRelevance().isRelevant(),
            is(TargetRelevance.RELEVANT.isRelevant()));
    assertThat(
            decoded.getRelevance().getRelevance(),
            is(TargetRelevance.RELEVANT.getRelevance()));
    assertThat(decoded.getCrawlerId(), is("mycrawler"));
}
 
源代码5 项目: ache   文件: KafkaTargetRepositoryTest.java
/**
 * Inserts one page into a {@code KafkaTargetRepository} configured with the CDR31 format and
 * checks that the first record in the producer's history deserialises to a
 * {@code CDR31Document} carrying the page's raw content, URL, content type and crawler name.
 */
@Test
public void shouldSendDataToKafkaUsingCDR31() throws IOException {
    // given
    String topicName = "ache-data-topic";
    Page crawledPage = new Page(new URL(url), html, responseHeaders);
    crawledPage.setCrawlerId("mycrawler");
    crawledPage.setTargetRelevance(TargetRelevance.RELEVANT);

    StringSerializer stringSerializer = new StringSerializer();
    MockProducer<String, String> producer =
            new MockProducer<>(true, stringSerializer, stringSerializer);

    KafkaTargetRepository repository =
            new KafkaTargetRepository(producer, topicName, KafkaConfig.Format.CDR31);

    // when
    repository.insert(crawledPage);
    repository.close();

    // then
    List<ProducerRecord<String, String>> sentRecords = producer.history();
    String json = sentRecords.get(0).value();
    CDR31Document decoded = mapper.readValue(json, CDR31Document.class);

    assertThat(decoded.getRawContent(), is(html));
    assertThat(decoded.getUrl(), is(url));
    assertThat(decoded.getResponseHeaders().get("content-type"), is("text/html"));
    assertThat(decoded.getCrawler(), is("mycrawler"));
}
 
源代码6 项目: ache   文件: KafkaTargetRepositoryTest.java
/**
 * Inserts one page into a {@code KafkaTargetRepository} configured with the ELASTIC format and
 * checks that the first record in the producer's history deserialises to a
 * {@code TargetModelElasticSearch} carrying the page's HTML, URL, headers and crawler id.
 */
@Test
public void shouldSendDataToKafkaUsingElasticsearchJsonFormat() throws IOException {
    // given
    String topicName = "ache-data-topic";
    Page crawledPage = new Page(new URL(url), html, responseHeaders);
    crawledPage.setCrawlerId("mycrawler");
    crawledPage.setTargetRelevance(TargetRelevance.RELEVANT);

    StringSerializer stringSerializer = new StringSerializer();
    MockProducer<String, String> producer =
            new MockProducer<>(true, stringSerializer, stringSerializer);

    KafkaTargetRepository repository =
            new KafkaTargetRepository(producer, topicName, KafkaConfig.Format.ELASTIC);

    // when
    repository.insert(crawledPage);
    repository.close();

    // then
    List<ProducerRecord<String, String>> sentRecords = producer.history();
    String json = sentRecords.get(0).value();
    TargetModelElasticSearch decoded = mapper.readValue(json, TargetModelElasticSearch.class);

    assertThat(decoded.getHtml(), is(html));
    assertThat(decoded.getUrl(), is(url));
    assertThat(decoded.getResponseHeaders().get("content-type").get(0), is("text/html"));
    assertThat(decoded.getCrawlerId(), is("mycrawler"));
}
 
源代码7 项目: brave   文件: KafkaTest.java
/**
 * Returns the headers of the last record in the producer's history, keyed by header name with
 * each value decoded as UTF-8. Iteration order follows the order of the record's headers.
 *
 * @param mockProducer producer whose history is read; must contain at least one record
 * @return header names mapped to their decoded values
 */
static Map<String, String> lastHeaders(MockProducer<Object, String> mockProducer) {
  List<ProducerRecord<Object, String>> sent = mockProducer.history();
  int lastIndex = sent.size() - 1;

  Map<String, String> decoded = new LinkedHashMap<>();
  sent.get(lastIndex)
      .headers()
      .forEach(h -> decoded.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)));
  return decoded;
}
 
/**
 * Creates a topic through the stub, publishes five messages to it, and checks that every
 * record reaches the producer under the topic name
 * {@code "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic"}.
 */
@Test
public void publish_implicitKafkaTopic() {
  final int messageCount = 5;
  final String pubsubTopic = "projects/project-1/topics/implicit-kafka-topic";
  final String kafkaTopic = "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic";

  blockingStub.createTopic(Topic.newBuilder().setName(pubsubTopic).build());

  PublishRequest publishRequest =
      PublishRequest.newBuilder()
          .setTopic(pubsubTopic)
          .addAllMessages(generatePubsubMessages(messageCount))
          .build();

  MockProducer<String, ByteBuffer> producer = startPublishExecutor(messageCount);

  PublishResponse publishResponse = blockingStub.publish(publishRequest);

  // Collect the topic and decoded payload of every record the producer saw, in order.
  List<String> sentTopics = new ArrayList<>();
  List<String> sentPayloads = new ArrayList<>();
  producer
      .history()
      .forEach(
          sent -> {
            sentTopics.add(sent.topic());
            sentPayloads.add(UTF_8.decode(sent.value()).toString());
          });

  assertThat(
      publishResponse.getMessageIdsList(),
      Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
  assertThat(
      sentTopics,
      Matchers.contains(kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic));
  assertThat(
      sentPayloads,
      Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));

  // One publish statistic per message, and no publish errors recorded.
  verify(statisticsManager, times(5))
      .computePublish(
          eq(pubsubTopic),
          argThat(payload -> payload.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}