下面列出了 org.apache.kafka.clients.producer.MockProducer#history() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Checks that processing a freshly created JCas through {@code KafkaTransportSender} results in
 * exactly one record in the mock producer's history, and that the record's value equals
 * {@code JCasSerializationTester.EMPTY_JSON}.
 *
 * @throws UIMAException propagated from the sender or JCas creation
 * @throws IOException propagated from the sender
 */
@Test
public void testKafkaTransportCanSend() throws UIMAException, IOException {
  MockKafkaResource mockKafkaResource = new MockKafkaResource();
  when(context.getResourceObject(SharedKafkaResource.RESOURCE_KEY)).thenReturn(locator);
  when(locator.getResource()).thenReturn(mockKafkaResource);

  KafkaTransportSender kafkaTransportSender = new KafkaTransportSender();
  kafkaTransportSender.initialize(context);
  try {
    JCas in = JCasFactory.createJCas();
    kafkaTransportSender.process(in);

    MockProducer<String, String> mockProducer = mockKafkaResource.getMockProducer();
    List<ProducerRecord<String, String>> history = mockProducer.history();
    // assertEquals reports the actual record count on failure, unlike assertTrue(size == 1).
    assertEquals(1, history.size());
    assertEquals(JCasSerializationTester.EMPTY_JSON, history.get(0).value());
  } finally {
    // Close the sender even when an assertion above fails, so it is not left open.
    kafkaTransportSender.closeQueue();
  }
}
/**
 * Publish tests have to drive the MockProducer from a different thread than the one blocked in
 * the publish request, so Runnables implementing the desired producer behavior are submitted to
 * the PUBLISH_EXECUTOR.
 */
@Test
public void publish() {
  final int messages = 5;
  final String pubsubTopic = "projects/project-1/topics/topic-1";
  PublishRequest.Builder requestBuilder = PublishRequest.newBuilder();
  requestBuilder.setTopic(pubsubTopic);
  requestBuilder.addAllMessages(generatePubsubMessages(messages));
  PublishRequest request = requestBuilder.build();

  // The executor must be started before the blocking publish call is issued.
  MockProducer<String, ByteBuffer> mockProducer = startPublishExecutor(messages);
  PublishResponse response = blockingStub.publish(request);

  // Split every record the producer saw into its topic and its decoded payload.
  List<String> topics = new ArrayList<>();
  List<String> data = new ArrayList<>();
  mockProducer
      .history()
      .forEach(
          sent -> {
            topics.add(sent.topic());
            data.add(UTF_8.decode(sent.value()).toString());
          });

  final String kafkaTopic = "kafka-topic-1";
  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
  assertThat(
      topics, Matchers.contains(kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic));
  assertThat(
      data, Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));

  // One successful publish per message, and no publish errors recorded.
  verify(statisticsManager, times(messages))
      .computePublish(
          eq(pubsubTopic),
          argThat(payload -> payload.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}
/**
 * Asserts that the first {@code numElements} records captured by the mock producer, once ordered
 * by value, carry the expected topic and the values {@code 0..numElements-1}.
 *
 * @param mockProducer producer whose history is inspected
 * @param topic topic every inspected record must have
 * @param numElements number of records to inspect
 * @param keyIsAbsent when true each key must be null; otherwise the key must equal the index
 * @param verifyTimestamp when true the record timestamp must equal the index
 */
private static void verifyProducerRecords(
    MockProducer<Integer, Long> mockProducer,
    String topic,
    int numElements,
    boolean keyIsAbsent,
    boolean verifyTimestamp) {
  // Order the captured records by value so that position and value line up.
  List<ProducerRecord<Integer, Long>> records = mockProducer.history();
  records.sort(Comparator.comparingLong(ProducerRecord::value));

  int index = 0;
  while (index < numElements) {
    ProducerRecord<Integer, Long> current = records.get(index);
    assertEquals(topic, current.topic());
    if (!keyIsAbsent) {
      assertEquals(index, current.key().intValue());
    } else {
      assertNull(current.key());
    }
    assertEquals(index, current.value().longValue());
    if (verifyTimestamp) {
      assertEquals(index, current.timestamp().intValue());
    }
    index++;
  }
}
/**
 * Inserts a relevant page into a {@code KafkaTargetRepository} configured with the JSON format
 * and checks the fields of the first record captured by the mock producer.
 */
@Test
public void shouldSendDataToKafka() throws IOException {
  // given: a relevant page and a repository backed by an auto-completing mock producer
  Page crawledPage = new Page(new URL(url), html, responseHeaders);
  crawledPage.setCrawlerId("mycrawler");
  crawledPage.setTargetRelevance(TargetRelevance.RELEVANT);

  StringSerializer serializer = new StringSerializer();
  MockProducer<String, String> producer = new MockProducer<>(true, serializer, serializer);
  KafkaTargetRepository repository =
      new KafkaTargetRepository(producer, "ache-data-topic", KafkaConfig.Format.JSON);

  // when: the page is stored and the repository is closed
  repository.insert(crawledPage);
  repository.close();

  // then: the first record deserializes back to the page data
  String payload = producer.history().get(0).value();
  TargetModelJson stored = mapper.readValue(payload, TargetModelJson.class);
  assertThat(stored.getContentAsString(), is(html));
  assertThat(stored.getUrl(), is(url));
  assertThat(stored.getResponseHeaders().get("content-type").get(0), is("text/html"));
  assertThat(stored.getRelevance().isRelevant(), is(TargetRelevance.RELEVANT.isRelevant()));
  assertThat(stored.getRelevance().getRelevance(), is(TargetRelevance.RELEVANT.getRelevance()));
  assertThat(stored.getCrawlerId(), is("mycrawler"));
}
/**
 * Inserts a relevant page into a {@code KafkaTargetRepository} configured with the CDR31 format
 * and checks the fields of the first record captured by the mock producer.
 */
@Test
public void shouldSendDataToKafkaUsingCDR31() throws IOException {
  // given: a relevant page and a repository backed by an auto-completing mock producer
  Page crawledPage = new Page(new URL(url), html, responseHeaders);
  crawledPage.setCrawlerId("mycrawler");
  crawledPage.setTargetRelevance(TargetRelevance.RELEVANT);

  StringSerializer serializer = new StringSerializer();
  MockProducer<String, String> producer = new MockProducer<>(true, serializer, serializer);
  KafkaTargetRepository repository =
      new KafkaTargetRepository(producer, "ache-data-topic", KafkaConfig.Format.CDR31);

  // when: the page is stored and the repository is closed
  repository.insert(crawledPage);
  repository.close();

  // then: the first record deserializes to a CDR31 document with the page data
  String payload = producer.history().get(0).value();
  CDR31Document document = mapper.readValue(payload, CDR31Document.class);
  assertThat(document.getRawContent(), is(html));
  assertThat(document.getUrl(), is(url));
  assertThat(document.getResponseHeaders().get("content-type"), is("text/html"));
  assertThat(document.getCrawler(), is("mycrawler"));
}
/**
 * Inserts a relevant page into a {@code KafkaTargetRepository} configured with the ELASTIC
 * format and checks the fields of the first record captured by the mock producer.
 */
@Test
public void shouldSendDataToKafkaUsingElasticsearchJsonFormat() throws IOException {
  // given: a relevant page and a repository backed by an auto-completing mock producer
  Page crawledPage = new Page(new URL(url), html, responseHeaders);
  crawledPage.setCrawlerId("mycrawler");
  crawledPage.setTargetRelevance(TargetRelevance.RELEVANT);

  StringSerializer serializer = new StringSerializer();
  MockProducer<String, String> producer = new MockProducer<>(true, serializer, serializer);
  KafkaTargetRepository repository =
      new KafkaTargetRepository(producer, "ache-data-topic", KafkaConfig.Format.ELASTIC);

  // when: the page is stored and the repository is closed
  repository.insert(crawledPage);
  repository.close();

  // then: the first record deserializes to the Elasticsearch model with the page data
  String payload = producer.history().get(0).value();
  TargetModelElasticSearch stored = mapper.readValue(payload, TargetModelElasticSearch.class);
  assertThat(stored.getHtml(), is(html));
  assertThat(stored.getUrl(), is(url));
  assertThat(stored.getResponseHeaders().get("content-type").get(0), is("text/html"));
  assertThat(stored.getCrawlerId(), is("mycrawler"));
}
/**
 * Returns the headers of the most recent record in the mock producer's history, with each header
 * value decoded as UTF-8. Entries follow the iteration order of the record's headers; a repeated
 * key keeps the value seen last.
 *
 * @param mockProducer producer whose history is read
 * @return header keys mapped to their decoded values
 */
static Map<String, String> lastHeaders(MockProducer<Object, String> mockProducer) {
  List<ProducerRecord<Object, String>> sent = mockProducer.history();
  int lastIndex = sent.size() - 1;
  Map<String, String> decoded = new LinkedHashMap<>();
  sent.get(lastIndex)
      .headers()
      .forEach(h -> decoded.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)));
  return decoded;
}
/**
 * Creates a topic through the stub, publishes five messages to it, and checks that the records
 * seen by the mock producer use the topic name built from the project and topic ids joined by
 * {@code KAFKA_TOPIC_SEPARATOR}.
 */
@Test
public void publish_implicitKafkaTopic() {
  final String pubsubTopic = "projects/project-1/topics/implicit-kafka-topic";
  blockingStub.createTopic(Topic.newBuilder().setName(pubsubTopic).build());

  final int messages = 5;
  PublishRequest.Builder requestBuilder = PublishRequest.newBuilder();
  requestBuilder.setTopic(pubsubTopic);
  requestBuilder.addAllMessages(generatePubsubMessages(messages));
  PublishRequest request = requestBuilder.build();

  // The executor must be started before the blocking publish call is issued.
  MockProducer<String, ByteBuffer> mockProducer = startPublishExecutor(messages);
  PublishResponse response = blockingStub.publish(request);

  // Split every record the producer saw into its topic and its decoded payload.
  List<String> topics = new ArrayList<>();
  List<String> data = new ArrayList<>();
  mockProducer
      .history()
      .forEach(
          sent -> {
            topics.add(sent.topic());
            data.add(UTF_8.decode(sent.value()).toString());
          });

  final String kafkaTopic = "project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic";
  assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
  assertThat(
      topics, Matchers.contains(kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic, kafkaTopic));
  assertThat(
      data, Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));

  // One successful publish per message, and no publish errors recorded.
  verify(statisticsManager, times(messages))
      .computePublish(
          eq(pubsubTopic),
          argThat(payload -> payload.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
          anyLong());
  verify(statisticsManager, never()).computePublishError(anyString());
}