下面列出了怎么用org.apache.kafka.clients.producer.MockProducer的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testSendMessage() throws Exception {
TestWorkItemManager manager = new TestWorkItemManager();
WorkItemImpl workItem = new WorkItemImpl();
workItem.setParameter("Topic",
"myTopic");
workItem.setParameter("Key",
"1");
workItem.setParameter("Value",
"Sample");
Producer<Long, String> mockProducer = new MockProducer();
KafkaWorkItemHandler handler = new KafkaWorkItemHandler(mockProducer);
handler.executeWorkItem(workItem,
manager);
assertNotNull(manager.getResults());
}
@Test
public void testKafkaTransportCanSend() throws UIMAException, IOException {
MockKafkaResource mockKafkaResource = new MockKafkaResource();
when(context.getResourceObject(SharedKafkaResource.RESOURCE_KEY)).thenReturn(locator);
when(locator.getResource()).thenReturn(mockKafkaResource);
KafkaTransportSender kafkaTransportSender = new KafkaTransportSender();
kafkaTransportSender.initialize(context);
JCas in = JCasFactory.createJCas();
kafkaTransportSender.process(in);
MockProducer<String, String> mockProducer = mockKafkaResource.getMockProducer();
List<ProducerRecord<String, String>> history = mockProducer.history();
assertTrue(history.size() == 1);
assertEquals(JCasSerializationTester.EMPTY_JSON, history.get(0).value());
kafkaTransportSender.closeQueue();
}
@Test
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException {
PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
List<PartitionInfo> list = new ArrayList<>();
list.add(partitionInfo0);
list.add(partitionInfo1);
Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer());
//when
kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}");
//then
assertTrue(recordMetadataFuture.get().partition() == 1);
}
/**
* Publish tests need to manipulate the MockProducer in a separate thread from the blocking
* publish request so we'll use the PUBLISH_EXECUTOR to submit Runnables that implement the
* desired producer behaviors.
*/
@Test
public void publish() {
int messages = 5;
PublishRequest request =
PublishRequest.newBuilder()
.setTopic("projects/project-1/topics/topic-1")
.addAllMessages(generatePubsubMessages(messages))
.build();
MockProducer<String, ByteBuffer> mockProducer = startPublishExecutor(messages);
PublishResponse response = blockingStub.publish(request);
List<String> topics = new ArrayList<>();
List<String> data = new ArrayList<>();
for (ProducerRecord<String, ByteBuffer> producerRecord : mockProducer.history()) {
topics.add(producerRecord.topic());
data.add(UTF_8.decode(producerRecord.value()).toString());
}
assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
assertThat(
topics,
Matchers.contains(
"kafka-topic-1", "kafka-topic-1", "kafka-topic-1", "kafka-topic-1", "kafka-topic-1"));
assertThat(
data, Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));
verify(statisticsManager, times(5))
.computePublish(
eq("projects/project-1/topics/topic-1"),
argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
anyLong());
verify(statisticsManager, never()).computePublishError(anyString());
}
@Test
public void publish_withAttributes() {
int messages = 3;
PublishRequest request =
PublishRequest.newBuilder()
.setTopic("projects/project-1/topics/topic-2")
.addAllMessages(generatePubsubMessagesWithHeader(messages))
.build();
MockProducer<String, ByteBuffer> producer = startPublishExecutor(messages);
PublishResponse response = blockingStub.publish(request);
assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2"));
List<Headers> headers =
producer.history().stream().map(ProducerRecord::headers).collect(Collectors.toList());
assertThat(
headers,
Matchers.contains(
new RecordHeaders(
Collections.singletonList(
new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
new RecordHeaders(
Collections.singletonList(
new RecordHeader("some-key", "some-value".getBytes(UTF_8)))),
new RecordHeaders(
Collections.singletonList(
new RecordHeader("some-key", "some-value".getBytes(UTF_8))))));
verify(statisticsManager, times(3))
.computePublish(
eq("projects/project-1/topics/topic-2"),
argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
anyLong());
verify(statisticsManager, never()).computePublishError(anyString());
}
@Test
public void publish_producerFails() {
int messages = 5;
PublishRequest request =
PublishRequest.newBuilder()
.setTopic("projects/project-1/topics/topic-1")
.addAllMessages(generatePubsubMessages(messages))
.build();
PUBLISH_EXECUTOR.submit(
() -> {
MockProducer<String, ByteBuffer> producer =
kafkaClientFactory.getCreatedProducers().get(0);
while (producer.history().size() < messages) {
Thread.yield();
}
for (int i = 0; i < messages; i++) {
producer.errorNext(new RuntimeException("Send Operation Failed"));
}
});
expectedException.expect(StatusRuntimeException.class);
expectedException.expectMessage(Status.INTERNAL.getCode().toString());
blockingStub.publish(request);
verify(statisticsManager).computePublishError(eq("projects/project-1/topics/topic-1"));
verify(statisticsManager, never())
.computePublish(anyString(), any(ByteString.class), anyLong());
}
@Test
public void publish_producerTimesOut() {
int messages = 5;
PublishRequest request =
PublishRequest.newBuilder()
.setTopic("projects/project-1/topics/topic-1")
.addAllMessages(generatePubsubMessages(messages))
.build();
PUBLISH_EXECUTOR.submit(
() -> {
MockProducer<String, ByteBuffer> producer =
kafkaClientFactory.getCreatedProducers().get(0);
while (producer.history().size() < messages) {
Thread.yield();
}
for (int i = 0; i < messages - 1; i++) {
producer.completeNext();
}
});
PublishResponse response = blockingStub.publish(request);
assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3"));
verify(statisticsManager, times(4))
.computePublish(
eq("projects/project-1/topics/topic-1"),
argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
anyLong());
verify(statisticsManager, never()).computePublishError(anyString());
}
private MockProducer<String, ByteBuffer> startPublishExecutor(int numberOfMessages) {
MockProducer<String, ByteBuffer> producer = kafkaClientFactory.getCreatedProducers().get(0);
PUBLISH_EXECUTOR.submit(
() -> {
while (producer.history().size() < numberOfMessages) {
Thread.yield();
}
for (int i = 0; i < numberOfMessages; i++) {
producer.completeNext();
}
});
return producer;
}
private static void verifyProducerRecords(
MockProducer<Integer, Long> mockProducer,
String topic,
int numElements,
boolean keyIsAbsent,
boolean verifyTimestamp) {
// verify that appropriate messages are written to kafka
List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
// sort by values
sent.sort(Comparator.comparingLong(ProducerRecord::value));
for (int i = 0; i < numElements; i++) {
ProducerRecord<Integer, Long> record = sent.get(i);
assertEquals(topic, record.topic());
if (keyIsAbsent) {
assertNull(record.key());
} else {
assertEquals(i, record.key().intValue());
}
assertEquals(i, record.value().longValue());
if (verifyTimestamp) {
assertEquals(i, record.timestamp().intValue());
}
}
}
MockProducerWrapper() {
producerKey = String.valueOf(ThreadLocalRandom.current().nextLong());
mockProducer =
new MockProducer<Integer, Long>(
false, // disable synchronous completion of send. see ProducerSendCompletionThread
// below.
new IntegerSerializer(),
new LongSerializer()) {
// override flush() so that it does not complete all the waiting sends, giving a chance
// to
// ProducerCompletionThread to inject errors.
@Override
public synchronized void flush() {
while (completeNext()) {
// there are some uncompleted records. let the completion thread handle them.
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ok to retry.
}
}
}
};
// Add the producer to the global map so that producer factory function can access it.
assertNull(MOCK_PRODUCER_MAP.putIfAbsent(producerKey, mockProducer));
}
ProducerSendCompletionThread(
MockProducer<Integer, Long> mockProducer, int maxErrors, int errorFrequency) {
this.mockProducer = mockProducer;
this.maxErrors = maxErrors;
this.errorFrequency = errorFrequency;
injectorThread = Executors.newSingleThreadExecutor();
}
@Test
public void shouldSendDataToKafka() throws IOException {
// given
Page target = new Page(new URL(url), html, responseHeaders);
target.setCrawlerId("mycrawler");
target.setTargetRelevance(TargetRelevance.RELEVANT);
String topicName = "ache-data-topic";
StringSerializer ss = new StringSerializer();
MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);
KafkaConfig.Format format = KafkaConfig.Format.JSON;
KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);
// when
repository.insert(target);
repository.close();
// then
List<ProducerRecord<String, String>> history = producer.history();
TargetModelJson page = mapper.readValue(history.get(0).value(), TargetModelJson.class);
assertThat(page.getContentAsString(), is(html));
assertThat(page.getUrl(), is(url));
assertThat(page.getResponseHeaders().get("content-type").get(0), is("text/html"));
assertThat(page.getRelevance().isRelevant(), is(TargetRelevance.RELEVANT.isRelevant()));
assertThat(page.getRelevance().getRelevance(), is(TargetRelevance.RELEVANT.getRelevance()));
assertThat(page.getCrawlerId(), is("mycrawler"));
}
@Test
public void shouldSendDataToKafkaUsingCDR31() throws IOException {
// given
Page target = new Page(new URL(url), html, responseHeaders);
target.setCrawlerId("mycrawler");
target.setTargetRelevance(TargetRelevance.RELEVANT);
String topicName = "ache-data-topic";
StringSerializer ss = new StringSerializer();
MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);
KafkaConfig.Format format = KafkaConfig.Format.CDR31;
KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);
// when
repository.insert(target);
repository.close();
// then
List<ProducerRecord<String, String>> history = producer.history();
CDR31Document page = mapper.readValue(history.get(0).value(), CDR31Document.class);
assertThat(page.getRawContent(), is(html));
assertThat(page.getUrl(), is(url));
assertThat(page.getResponseHeaders().get("content-type"), is("text/html"));
assertThat(page.getCrawler(), is("mycrawler"));
}
@Test
public void shouldSendDataToKafkaUsingElasticsearchJsonFormat() throws IOException {
// given
Page target = new Page(new URL(url), html, responseHeaders);
target.setCrawlerId("mycrawler");
target.setTargetRelevance(TargetRelevance.RELEVANT);
String topicName = "ache-data-topic";
StringSerializer ss = new StringSerializer();
MockProducer<String, String> producer = new MockProducer<>(true, ss, ss);
KafkaConfig.Format format = KafkaConfig.Format.ELASTIC;
KafkaTargetRepository repository = new KafkaTargetRepository(producer, topicName, format);
// when
repository.insert(target);
repository.close();
// then
List<ProducerRecord<String, String>> history = producer.history();
TargetModelElasticSearch page =
mapper.readValue(history.get(0).value(), TargetModelElasticSearch.class);
assertThat(page.getHtml(), is(html));
assertThat(page.getUrl(), is(url));
assertThat(page.getResponseHeaders().get("content-type").get(0), is("text/html"));
assertThat(page.getCrawlerId(), is("mycrawler"));
}
static Map<String, String> lastHeaders(MockProducer<Object, String> mockProducer) {
Map<String, String> headers = new LinkedHashMap<>();
List<ProducerRecord<Object, String>> history = mockProducer.history();
ProducerRecord<Object, String> lastRecord = history.get(history.size() - 1);
for (Header header : lastRecord.headers()) {
headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
}
return headers;
}
@Test
public void publish_implicitKafkaTopic() {
blockingStub.createTopic(
Topic.newBuilder().setName("projects/project-1/topics/implicit-kafka-topic").build());
int messages = 5;
PublishRequest request =
PublishRequest.newBuilder()
.setTopic("projects/project-1/topics/implicit-kafka-topic")
.addAllMessages(generatePubsubMessages(messages))
.build();
MockProducer<String, ByteBuffer> mockProducer = startPublishExecutor(messages);
PublishResponse response = blockingStub.publish(request);
List<String> topics = new ArrayList<>();
List<String> data = new ArrayList<>();
for (ProducerRecord<String, ByteBuffer> producerRecord : mockProducer.history()) {
topics.add(producerRecord.topic());
data.add(UTF_8.decode(producerRecord.value()).toString());
}
assertThat(response.getMessageIdsList(), Matchers.contains("0-0", "0-1", "0-2", "0-3", "0-4"));
assertThat(
topics,
Matchers.contains(
"project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
"project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
"project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
"project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic",
"project-1" + KAFKA_TOPIC_SEPARATOR + "implicit-kafka-topic"));
assertThat(
data, Matchers.contains("message-0", "message-1", "message-2", "message-3", "message-4"));
verify(statisticsManager, times(5))
.computePublish(
eq("projects/project-1/topics/implicit-kafka-topic"),
argThat(message -> message.toStringUtf8().matches(MESSAGE_CONTENT_REGEX)),
anyLong());
verify(statisticsManager, never()).computePublishError(anyString());
}
List<MockProducer<String, ByteBuffer>> getCreatedProducers() {
return createdProducers;
}
@Override
public Producer<String, ByteBuffer> createProducer() {
MockProducer<String, ByteBuffer> producer = new MockProducer<>();
createdProducers.add(producer);
return producer;
}
public MockLiKafkaProducer() {
_delegate = new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());
}
public MockProducer<byte[], byte[]> getDelegate() {
return _delegate;
}
@BeforeClass
public static void setUp() {
eventuateKafkaProducer = new EventuateKafkaProducer();
eventuateKafkaProducer.setProducer( new MockProducer(true, null , null, null) );
event = new PublishedEvent("eventId", "entityId", "entityType", "eventJson", "eventType", null, Optional.of("metadata"));
}
ProducerSendCompletionThread(MockProducer<Integer, Long> mockProducer) {
// complete everything successfully
this(mockProducer, 0, 0);
}
/** @return the mock producer for testing */
public MockProducer<String, String> getMockProducer() {
return mockProducer;
}
private void buildMockProducer(boolean autoComplete) {
this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer());
}