Examples of the org.apache.kafka.clients.producer.MockProducer class

The following examples show how to use the org.apache.kafka.clients.producer.MockProducer API in real projects; you can also follow the links to view the full source code on GitHub.
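
Most of the examples below share one pattern: construct a MockProducer, inject it into the code under test in place of a real KafkaProducer, then assert on what history() captured. As a minimal, self-contained sketch of that pattern (the topic, key, and value here are invented for illustration):

import static org.junit.Assert.assertEquals;

import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

public class MockProducerBasicsTest {

    @Test
    public void sentRecordsAppearInHistory() {
        // autoComplete = true: every send() completes successfully right away.
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());

        producer.send(new ProducerRecord<>("demo-topic", "key-1", "value-1"));

        // history() returns every record handed to send(), in order.
        List<ProducerRecord<String, String>> history = producer.history();
        assertEquals(1, history.size());
        assertEquals("value-1", history.get(0).value());
    }
}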

@Test
public void testSendMessage() throws Exception {

    TestWorkItemManager manager = new TestWorkItemManager();
    WorkItemImpl workItem = new WorkItemImpl();
    workItem.setParameter("Topic",
                          "myTopic");
    workItem.setParameter("Key",
                          "1");
    workItem.setParameter("Value",
                          "Sample");

    Producer<Long, String> mockProducer = new MockProducer<>();
    KafkaWorkItemHandler handler = new KafkaWorkItemHandler(mockProducer);
    handler.executeWorkItem(workItem,
                            manager);
    assertNotNull(manager.getResults());
}
 
Example 2 (project: baleen, file: KafkaTransportsTest.java)
@Test
public void testKafkaTransportCanSend() throws UIMAException, IOException {

  MockKafkaResource mockKafkaResource = new MockKafkaResource();
  when(context.getResourceObject(SharedKafkaResource.RESOURCE_KEY)).thenReturn(locator);
  when(locator.getResource()).thenReturn(mockKafkaResource);

  KafkaTransportSender kafkaTransportSender = new KafkaTransportSender();
  kafkaTransportSender.initialize(context);

  JCas in = JCasFactory.createJCas();
  kafkaTransportSender.process(in);

  MockProducer<String, String> mockProducer = mockKafkaResource.getMockProducer();

  List<ProducerRecord<String, String>> history = mockProducer.history();
  assertEquals(1, history.size());
  assertEquals(JCasSerializationTester.EMPTY_JSON, history.get(0).value());
  kafkaTransportSender.closeQueue();
}
 
Example 3 (project: tutorials, file: KafkaProducerUnitTest.java)
@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException {

    PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
    PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
    List<PartitionInfo> list = new ArrayList<>();
    list.add(partitionInfo0);
    list.add(partitionInfo1);
    Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
    this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer());
    //when
    kafkaProducer = new KafkaProducer(mockProducer);
    Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}");

    //then
    assertEquals(1, recordMetadataFuture.get().partition());

}
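
The EvenOddPartitioner used above is a custom class from the same tutorial project and is not shown here. A plausible sketch that is consistent with the assertion (the key "partition" has an odd number of characters, so the record lands on partition 1) could look like the following; the even/odd-length routing rule is an assumption:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical reconstruction: routes even-length string keys to partition 0
// and odd-length keys to partition 1, matching the assertion above.
public class EvenOddPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return ((String) key).length() % 2 == 0 ? 0 : 1;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}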
 
/**
 * Publish tests need to manipulate the MockProducer in a separate thread from the blocking
 * publish request so we'll use the PUBLISH_EXECUTOR to submit Runnables that implement the
 * desired producer behaviors.
 */
@Test
public void publish() {
  int messages = 5;
  PublishRequest request =
      PublishRequest.newBuilder()
          .setTopic("projects/project-1/topics/topic-1")
          .addAllMessages(generatePubsubMessages(messages))
          .build();

  MockProducer<String, ByteBuffer> mockProducer = startPublishExecutor(messages);

  PublishResponse response = blockingStub.publish(request);
  List<String> topics = new ArrayList<>();
  List<String> data = new ArrayList<>();
  for (ProducerRecord<String, ByteBuffer> producerRecord : mockProducer.history()) {
    topics.add(producerRecord.topic());
    data.add(UTF_8.decode(producerRecord.value()).toString());
  }

  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
  assertThat(
      topics,
      Matchers.contains(
          "kafka-topic-1", "kafka-topic-1", "kafka-topic-1", "kafka-topic-1", "kafka-topic-1"));
  assertThat(
      data, Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));

  verify(statisticsManager, times(5))
      .computePublish(
          eq("projects/project-1/topics/topic-1"),
          argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
 
@Test
public void publish_withAttributes() {
  int messages = 3;
  PublishRequest request =
      PublishRequest.newBuilder()
          .setTopic("projects/project-1/topics/topic-2")
          .addAllMessages(generatePubsubMessagesWithHeader(messages))
          .build();

  MockProducer<String, ByteBuffer> producer = startPublishExecutor(messages);

  PublishResponse response = blockingStub.publish(request);
  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2"));

  List<Headers> headers =
      producer.history().stream().map(ProducerRecord::headers).collect(Collectors.toList());
  assertThat(
      headers,
      Matchers.contains(
          new RecordHeaders(
              Collections.singletonList(
                  new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
          new RecordHeaders(
              Collections.singletonList(
                  new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
          new RecordHeaders(
              Collections.singletonList(
                  new RecordHeader("some-key", "some-value".getBytes(UTF_8))))));

  verify(statisticsManager, times(3))
      .computePublish(
          eq("projects/project-1/topics/topic-2"),
          argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
 
@Test
public void publish_producerFails() {
  int messages = 5;
  PublishRequest request =
      PublishRequest.newBuilder()
          .setTopic("projects/project-1/topics/topic-1")
          .addAllMessages(generatePubsubMessages(messages))
          .build();

  PUBLISH_EXECUTOR.submit(
      () -> {
        MockProducer<String, ByteBuffer> producer =
            kafkaClientFactory.getCreatedProducers().get(0);
        while (producer.history().size() < messages) {
          Thread.yield();
        }
        for (int i = 0; i < messages; i++) {
          producer.errorNext(new RuntimeException("Send Operation Failed"));
        }
      });

  expectedException.expect(StatusRuntimeException.class);
  expectedException.expectMessage(Status.INTERNAL.getCode().toString());

  blockingStub.publish(request);
  verify(statisticsManager).computePublishError(eq("projects/project-1/topics/topic-1"));
  verify(statisticsManager, never())
      .computePublish(anyString(), any(ByteString.class), anyLong());
}
 
@Test
public void publish_producerTimesOut() {
  int messages = 5;
  PublishRequest request =
      PublishRequest.newBuilder()
          .setTopic("projects/project-1/topics/topic-1")
          .addAllMessages(generatePubsubMessages(messages))
          .build();
  PUBLISH_EXECUTOR.submit(
      () -> {
        MockProducer<String, ByteBuffer> producer =
            kafkaClientFactory.getCreatedProducers().get(0);
        while (producer.history().size() < messages) {
          Thread.yield();
        }
        for (int i = 0; i < messages - 1; i++) {
          producer.completeNext();
        }
      });

  PublishResponse response = blockingStub.publish(request);
  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3"));

  verify(statisticsManager, times(4))
      .computePublish(
          eq("projects/project-1/topics/topic-1"),
          argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
 
private MockProducer<String, ByteBuffer> startPublishExecutor(int numberOfMessages) {
  MockProducer<String, ByteBuffer> producer = kafkaClientFactory.getCreatedProducers().get(0);
  PUBLISH_EXECUTOR.submit(
      () -> {
        while (producer.history().size() < numberOfMessages) {
          Thread.yield();
        }
        for (int i = 0; i < numberOfMessages; i++) {
          producer.completeNext();
        }
      });
  return producer;
}
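
The publish tests above rely on MockProducer's manual-completion mode: constructed with autoComplete = false, the producer leaves every send() future pending until completeNext() or errorNext(...) resolves the oldest outstanding send. A minimal, self-contained sketch of just that mechanism (topic and values invented for illustration):

import static org.junit.Assert.assertTrue;

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

public class ManualCompletionTest {

    @Test
    public void completeNextAndErrorNextResolvePendingSendsInOrder() {
        // autoComplete = false: send() futures stay pending until
        // completeNext() or errorNext() is called for each of them.
        MockProducer<String, String> producer =
                new MockProducer<>(false, new StringSerializer(), new StringSerializer());

        Future<RecordMetadata> first = producer.send(new ProducerRecord<>("demo-topic", "a"));
        Future<RecordMetadata> second = producer.send(new ProducerRecord<>("demo-topic", "b"));

        producer.completeNext();                                 // resolves `first` successfully
        producer.errorNext(new RuntimeException("send failed")); // fails `second`

        assertTrue(first.isDone());
        assertTrue(second.isDone());
    }
}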
 
Example 9 (project: beam, file: KafkaIOTest.java)
private static void verifyProducerRecords(
    MockProducer<Integer, Long> mockProducer,
    String topic,
    int numElements,
    boolean keyIsAbsent,
    boolean verifyTimestamp) {

  // verify that appropriate messages are written to kafka
  List<ProducerRecord<Integer, Long>> sent = mockProducer.history();

  // sort by values
  sent.sort(Comparator.comparingLong(ProducerRecord::value));

  for (int i = 0; i < numElements; i++) {
    ProducerRecord<Integer, Long> record = sent.get(i);
    assertEquals(topic, record.topic());
    if (keyIsAbsent) {
      assertNull(record.key());
    } else {
      assertEquals(i, record.key().intValue());
    }
    assertEquals(i, record.value().longValue());
    if (verifyTimestamp) {
      assertEquals(i, record.timestamp().intValue());
    }
  }
}
 
Example 10 (project: beam, file: KafkaIOTest.java)
MockProducerWrapper() {
  producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
  mockProducer =
      new MockProducer<Integer, Long>(
          false, // disable synchronous completion of send; see ProducerSendCompletionThread below
          new IntegerSerializer(),
          new LongSerializer()) {

        // Override flush() so that it does not complete all the waiting sends,
        // giving ProducerSendCompletionThread a chance to inject errors.

        @Override
        public synchronized void flush() {
          while (completeNext()) {
            // there are some uncompleted records. let the completion thread handle them.
            try {
              Thread.sleep(10);
            } catch (InterruptedException e) {
              // ok to retry.
            }
          }
        }
      };

  // Add the producer to the global map so that producer factory function can access it.
  assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
 
Example 11 (project: beam, file: KafkaIOTest.java)
ProducerSendCompletionThread(
    MockProducer<Integer, Long> mockProducer, int maxErrors, int errorFrequency) {
  this.mockProducer = mockProducer;
  this.maxErrors = maxErrors;
  this.errorFrequency = errorFrequency;
  injectorThread = Executors.newSingleThreadExecutor();
}
 
Example 12 (project: ache, file: KafkaTargetRepositoryTest.java)
@Test
public void shouldSendDataToKafka() throws IOException {
    // given
    Page target = new Page(new URL(url), html, responseHeaders);
    target.setCrawlerId("mycrawler");
    target.setTargetRelevance(TargetRelevance.RELEVANT);
    String topicName = "ache-data-topic";

    StringSerializer ss = new StringSerializer();
    MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);

    KafkaConfig.Format format = KafkaConfig.Format.JSON;

    KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);

    // when
    repository.insert(target);
    repository.close();

    // then
    List<ProducerRecord<String, String>> history = producer.history();

    TargetModelJson page = mapper.readValue(history.get(0).value(), TargetModelJson.class);
    assertThat(page.getContentAsString(), is(html));
    assertThat(page.getUrl(), is(url));
    assertThat(page.getResponseHeaders().get("content-type").get(0), is("text/html"));
    assertThat(page.getRelevance().isRelevant(), is(TargetRelevance.RELEVANT.isRelevant()));
    assertThat(page.getRelevance().getRelevance(), is(TargetRelevance.RELEVANT.getRelevance()));
    assertThat(page.getCrawlerId(), is("mycrawler"));
}
 
Example 13 (project: ache, file: KafkaTargetRepositoryTest.java)
@Test
public void shouldSendDataToKafkaUsingCDR31() throws IOException {
    // given
    Page target = new Page(new URL(url), html, responseHeaders);
    target.setCrawlerId("mycrawler");
    target.setTargetRelevance(TargetRelevance.RELEVANT);
    String topicName = "ache-data-topic";

    StringSerializer ss = new StringSerializer();
    MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);

    KafkaConfig.Format format = KafkaConfig.Format.CDR31;

    KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);

    // when
    repository.insert(target);
    repository.close();

    // then
    List<ProducerRecord<String, String>> history = producer.history();

    CDR31Document page = mapper.readValue(history.get(0).value(), CDR31Document.class);
    assertThat(page.getRawContent(), is(html));
    assertThat(page.getUrl(), is(url));
    assertThat(page.getResponseHeaders().get("content-type"), is("text/html"));
    assertThat(page.getCrawler(), is("mycrawler"));
}
 
Example 14 (project: ache, file: KafkaTargetRepositoryTest.java)
@Test
public void shouldSendDataToKafkaUsingElasticsearchJsonFormat() throws IOException {
    // given
    Page target = new Page(new URL(url), html, responseHeaders);
    target.setCrawlerId("mycrawler");
    target.setTargetRelevance(TargetRelevance.RELEVANT);
    String topicName = "ache-data-topic";

    StringSerializer ss = new StringSerializer();
    MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);

    KafkaConfig.Format format = KafkaConfig.Format.ELASTIC;

    KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);

    // when
    repository.insert(target);
    repository.close();

    // then
    List<ProducerRecord<String, String>> history = producer.history();

    TargetModelElasticSearch page =
            mapper.readValue(history.get(0).value(), TargetModelElasticSearch.class);
    assertThat(page.getHtml(), is(html));
    assertThat(page.getUrl(), is(url));
    assertThat(page.getResponseHeaders().get("content-type").get(0), is("text/html"));
    assertThat(page.getCrawlerId(), is("mycrawler"));
}
 
Example 15 (project: brave, file: KafkaTest.java)
static Map<String, String> lastHeaders(MockProducer<Object, String> mockProducer) {
  Map<String, String> headers = new LinkedHashMap<>();
  List<ProducerRecord<Object, String>> history = mockProducer.history();
  ProducerRecord<Object, String> lastRecord = history.get(history.size() - 1);
  for (Header header : lastRecord.headers()) {
    headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
  }
  return headers;
}
 
@Test
public void publish_implicitKafkaTopic() {
  blockingStub.createTopic(
      Topic.newBuilder().setName("projects/project-1/topics/implicit-kafka-topic").build());

  int messages = 5;
  PublishRequest request =
      PublishRequest.newBuilder()
          .setTopic("projects/project-1/topics/implicit-kafka-topic")
          .addAllMessages(generatePubsubMessages(messages))
          .build();

  MockProducer<String, ByteBuffer> mockProducer = startPublishExecutor(messages);

  PublishResponse response = blockingStub.publish(request);
  List<String> topics = new ArrayList<>();
  List<String> data = new ArrayList<>();
  for (ProducerRecord<String, ByteBuffer> producerRecord : mockProducer.history()) {
    topics.add(producerRecord.topic());
    data.add(UTF_8.decode(producerRecord.value()).toString());
  }

  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
  assertThat(
      topics,
      Matchers.contains(
          "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
          "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
          "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
          "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
          "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic"));
  assertThat(
      data, Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));

  verify(statisticsManager, times(5))
      .computePublish(
          eq("projects/project-1/topics/implicit-kafka-topic"),
          argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
 
List<MockProducer<String, ByteBuffer>> getCreatedProducers() {
  return createdProducers;
}
 
@Override
public Producer<String, ByteBuffer> createProducer() {
  MockProducer<String, ByteBuffer> producer = new MockProducer<>();
  createdProducers.add(producer);
  return producer;
}
 
public MockLiKafkaProducer() {
  _delegate = new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());
}
 
public MockProducer<byte[], byte[]> getDelegate() {
  return _delegate;
}
 
@BeforeClass
public static void setUp() {
    eventuateKafkaProducer = new EventuateKafkaProducer();
    eventuateKafkaProducer.setProducer(new MockProducer(true, null, null, null));
    event = new PublishedEvent("eventId", "entityId", "entityType", "eventJson", "eventType", null, Optional.of("metadata"));
}
 
Example 22 (project: beam, file: KafkaIOTest.java)
ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
  // complete everything successfully
  this(mockProducer, 0, 0);
}
 
Example 23 (project: baleen, file: MockKafkaResource.java)
/** @return the mock producer for testing */
public MockProducer<String, String> getMockProducer() {
  return mockProducer;
}
 
Example 24 (project: tutorials, file: KafkaProducerUnitTest.java)
private void buildMockProducer(boolean autoComplete) {
    this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer());
}