下面列出了 com.codahale.metrics.SlidingWindowReservoir#org.apache.kafka.common.TopicPartition 的实例代码，您也可以点击链接到 GitHub 查看源代码，或者在右侧发表评论。
/**
 * Assigning three partitions of topic "a" and three of topic "b" to two tasks is expected to
 * yield one list per topic, each keeping the input order.
 */
@Test
public void testAssignBalanced() {
  TopicPartition a0 = new TopicPartition("a", 0);
  TopicPartition a1 = new TopicPartition("a", 1);
  TopicPartition a2 = new TopicPartition("a", 2);
  TopicPartition b0 = new TopicPartition("b", 0);
  TopicPartition b1 = new TopicPartition("b", 1);
  TopicPartition b2 = new TopicPartition("b", 2);

  List<TopicPartition> input = Arrays.asList(a0, a1, a2, b0, b1, b2);
  List<List<TopicPartition>> expected =
      Arrays.asList(Arrays.asList(a0, a1, a2), Arrays.asList(b0, b1, b2));

  List<List<TopicPartition>> actual = sourceTaskAssignor.assign(input, 2);

  assertThat(actual, is(expected));
}
/**
 * Feeds the assembler a serialized segment built with sequence/size arguments of -1/100/-1 and
 * checks that the bytes come back unchanged, with starting and ending offsets both 0.
 */
@Test
public void testTreatBadSegmentAsPayload() {
  Serializer<LargeMessageSegment> serializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> deserializer = new DefaultSegmentDeserializer();
  MessageAssembler assembler = new MessageAssemblerImpl(100, 100, true, deserializer);
  TopicPartition partition = new TopicPartition("topic", 0);
  UUID messageId = UUID.randomUUID();
  byte[] payload = "message".getBytes();

  // NOTE(review): the (-1, 100, -1) arguments presumably make the segment invalid -- confirm
  // against the LargeMessageSegment constructor.
  LargeMessageSegment malformed =
      new LargeMessageSegment(messageId, -1, 100, -1, ByteBuffer.wrap(payload));
  byte[] wrapped = serializer.serialize(partition.topic(), malformed);
  // Serialization must have added framing around the raw payload.
  Assert.assertTrue(wrapped.length > payload.length); //wrapping has been done

  // The same bytes are assembled twice; only the second result is inspected.
  assembler.assemble(partition, 0, wrapped);
  MessageAssembler.AssembleResult outcome = assembler.assemble(partition, 0, wrapped);

  Assert.assertEquals(outcome.messageBytes(), wrapped);
  Assert.assertEquals(outcome.messageStartingOffset(), 0);
  Assert.assertEquals(outcome.messageEndingOffset(), 0);
}
/**
 * With two consumers subscribed to a topic that has a single partition, consumer1 is expected
 * to receive that partition and consumer2 an empty assignment.
 */
@Test
public void testTwoConsumersOneTopicOnePartition() {
  String topic = "topic";
  String first = "consumer1";
  String second = "consumer2";

  Map<String, Integer> partitionCounts = new HashMap<>();
  partitionCounts.put(topic, 1);

  Map<String, Subscription> subscriptions = new HashMap<>();
  subscriptions.put(first, new Subscription(Collections.singletonList(topic)));
  subscriptions.put(second, new Subscription(Collections.singletonList(topic)));

  Map<String, List<TopicPartition>> assigned = assignor.assign(partitionCounts, subscriptions);

  List<TopicPartition> firstAssignment = assigned.get(first);
  List<TopicPartition> secondAssignment = assigned.get(second);
  assertEquals(Arrays.asList(new TopicPartition(topic, 0)), firstAssignment);
  assertEquals(Collections.<TopicPartition>emptyList(), secondAssignment);
}
/**
 * Creates the event and sets up its Kafka clients.
 *
 * <p>A consumer is assigned partition 0 of {@code topic} and positioned at the end of that
 * partition, then a producer is created from the configured producer properties. Any exception
 * raised during this setup is logged and swallowed, so the client fields may be left unassigned.
 *
 * @param topic topic whose partition 0 the consumer is assigned to
 * @param dataTopic value stored in the {@code dataTopic} field
 */
public MaasEvent(String topic, String dataTopic) {
    // NOTE(review): 01 is an octal literal with the value 1 -- confirm that is the intended argument.
    super(01);
    this.topic = topic;
    this.dataTopic = dataTopic;
    dao = new DbusDataDaoImpl();
    Properties props = HeartBeatConfigContainer.getInstance().getmaasConf().getConsumerProp();
    Properties producerProps = HeartBeatConfigContainer.getInstance().getmaasConf().getProducerProp();
    try {
        LoggerFactory.getLogger().info("[topic] ...." + topic);
        LoggerFactory.getLogger().info("[maas-event] initial.........................");
        dataConsumer = new KafkaConsumer<>(props);
        partition0 = new TopicPartition(this.topic, 0);
        dataConsumer.assign(Arrays.asList(partition0));
        dataConsumer.seekToEnd(Arrays.asList(partition0));
        statProducer = new KafkaProducer<>(producerProps);
    } catch (Exception e) {
        // The logger already records the message and stack trace; printing to stderr as well
        // only duplicated the output outside the logging configuration.
        LoggerFactory.getLogger().error(e.getMessage(), e);
    }
}
/**
 * Registers a topic partition with this worker's fetcher manager.
 *
 * <p>The observer (if any) is told about the topic, a non-blank destination topic is recorded in
 * the topic mapping, and the partition is handed to the fetcher manager together with its
 * offset range.
 *
 * @param topic topic name
 * @param partition partition id
 * @param startingOffset starting offset for the topic partition; when {@code null} the offset
 *     is fetched from the checkpoint manager
 * @param endingOffset ending offset for the topic partition
 * @param dstTopic topic name in the destination cluster; ignored when blank
 */
public void addTopicPartition(String topic, int partition, Long startingOffset,
    Long endingOffset, String dstTopic) {
  if (observer != null) {
    observer.addTopic(topic);
  }
  if (StringUtils.isNotBlank(dstTopic)) {
    topicMapping.put(topic, dstTopic);
  }

  TopicPartition tp = new TopicPartition(topic, partition);

  // Prefer the caller-supplied offset; otherwise resume from the checkpoint.
  long resolvedOffset;
  if (startingOffset != null) {
    resolvedOffset = startingOffset;
  } else {
    resolvedOffset = checkpointManager.fetchOffset(tp);
  }

  LOGGER.info("Adding topic: {}, partition {}, starting offset {}",
      topic, partition, resolvedOffset);
  fetcherManager.addTopicPartition(
      tp, new PartitionOffsetInfo(tp, resolvedOffset, endingOffset));
}
/**
 * Verifies that metadata lookup retries {@code partitionsFor} after a failure: the first call
 * throws, the second returns two partitions, and the offset lookups that follow run exactly once.
 */
@Test
public void testGetSystemStreamMetaDataWithRetry() {
  final List<PartitionInfo> partitionInfosForTopic = ImmutableList.of(mockPartitionInfo0, mockPartitionInfo1);
  when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
      .thenReturn(partitionInfosForTopic);

  Map<String, SystemStreamMetadata> metadataMap =
      kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
  // Expected value goes first so that a failure reports "expected 1 but was N".
  assertEquals("metadata should return for 1 topic", 1, metadataMap.size());

  // retried twice because the first fails and the second succeeds
  Mockito.verify(mockKafkaConsumer, Mockito.times(2)).partitionsFor(VALID_TOPIC);

  final List<TopicPartition> topicPartitions =
      Arrays.asList(new TopicPartition(mockPartitionInfo0.topic(), mockPartitionInfo0.partition()),
          new TopicPartition(mockPartitionInfo1.topic(), mockPartitionInfo1.partition()));

  // the following methods thereafter are only called once
  Mockito.verify(mockKafkaConsumer, Mockito.times(1)).beginningOffsets(topicPartitions);
  Mockito.verify(mockKafkaConsumer, Mockito.times(1)).endOffsets(topicPartitions);
}
/**
 * Decodes a {@link TopicPartition} from its binary form.
 *
 * <p>The bytes are read in this order: a 4-byte topic-name length, the topic-name bytes
 * (decoded with {@code ENCODING}), then a 4-byte partition number.
 *
 * @param topic topic the record came from; not used for decoding
 * @param data serialized bytes, may be {@code null} or empty
 * @return the decoded topic partition, or {@code null} when {@code data} is {@code null} or empty
 * @throws SerializationException if {@code ENCODING} is not a supported charset
 */
@Override
public TopicPartition deserialize(String topic, byte[] data) {
  if (data == null || data.length == 0) {
    return null;
  }
  try {
    ByteBuffer buf = ByteBuffer.wrap(data);
    int topicLength = buf.getInt();
    byte[] topicBytes = new byte[topicLength];
    buf.get(topicBytes);
    String otherTopic = new String(topicBytes, ENCODING);
    int partition = buf.getInt();
    return new TopicPartition(otherTopic, partition);
  } catch (UnsupportedEncodingException e) {
    // Keep the original exception as the cause so the failing charset is not lost.
    throw new SerializationException("Error when deserializing byte[] to string", e);
  }
}
/**
 * After one record has been consumed for partition 0, applying an empty batch of records must
 * leave the reported seconds-behind value of both partitions unchanged.
 */
@Test
public void shouldNotChangeDurationBehindOnNoRecords() {
    // given
    final KafkaRecordsConsumer consumer = someKafkaRecordsConsumer(fromHorizon());
    durationBehindHandler.onPartitionsAssigned(asList(new TopicPartition("", 0), new TopicPartition("", 1)));

    // A single record on partition 0 whose timestamp lies 100 seconds in the past.
    final long recordTimestamp = now().minusSeconds(100).toEpochMilli();
    ConsumerRecord<String,String> staleRecord = new ConsumerRecord<>("", 0, 23, recordTimestamp, TimestampType.CREATE_TIME, 0, 0, 0, "key", "value");
    consumer.apply(new ConsumerRecords<>(ImmutableMap.of(new TopicPartition("", 0), singletonList(staleRecord))));

    // This literal equals Long.MAX_VALUE / 1000.
    // NOTE(review): presumably the value reported for a partition with no records yet -- confirm.
    final long noRecordsYet = 9223372036854775L;
    assertThat(getSecondsBehind("0"), is(100L));
    assertThat(getSecondsBehind("1"), is(noRecordsYet));

    // when
    consumer.apply(ConsumerRecords.empty());

    // then
    assertThat(getSecondsBehind("0"), is(100L));
    assertThat(getSecondsBehind("1"), is(noRecordsYet));
}
/**
 * Looks up the committed offset of every partition of a topic.
 *
 * <p>For a partition without a committed offset the partition's beginning offset is used
 * instead. The whole lookup runs while holding the lock on {@code consumer}.
 *
 * @param topicName topic whose partitions are inspected
 * @return immutable map from partition number to offset
 */
public Map<Integer, Long> getCommittedOffsets(String topicName) {
    synchronized (consumer) {
        ImmutableMap.Builder<Integer, Long> offsetsByPartition = ImmutableMap.builder();
        for (TopicPartition tp : topicPartitions(topicName)) {
            OffsetAndMetadata committed = consumer.committed(tp);
            Long offset;
            if (committed != null) {
                offset = committed.offset();
            } else {
                // Nothing committed yet for this partition: fall back to its beginning offset.
                offset = consumer.beginningOffsets(singleton(tp)).get(tp);
            }
            offsetsByPartition.put(tp.partition(), offset);
        }
        return offsetsByPartition.build();
    }
}
/**
 * Renders the file name template for a batch of records.
 *
 * <p>Binds the topic, partition, start offset and timestamp template variables and renders the
 * result. The start offset is taken from {@code headRecord}; when the template parameter
 * evaluates to {@code true} it is zero-padded to 20 digits.
 *
 * @param tp topic partition the records belong to
 * @param headRecord record whose Kafka offset is used as the start offset
 * @return the rendered key
 */
private String generateRecordKey(final TopicPartition tp, final SinkRecord headRecord) {
    //FIXME move into commons lib
    final Function<Parameter, String> startOffsetFormatter = padding -> {
        if (padding.asBoolean()) {
            return String.format("%020d", headRecord.kafkaOffset());
        }
        return Long.toString(headRecord.kafkaOffset());
    };

    // setTimestamp is not declared in this method; it is resolved from the enclosing scope.
    return filenameTemplate.instance()
        .bindVariable(FilenameTemplateVariable.TOPIC.name, tp::topic)
        .bindVariable(FilenameTemplateVariable.PARTITION.name, () -> Integer.toString(tp.partition()))
        .bindVariable(FilenameTemplateVariable.START_OFFSET.name, startOffsetFormatter)
        .bindVariable(FilenameTemplateVariable.TIMESTAMP.name, setTimestamp)
        .render();
}
/**
 * Queues three offset maps, runs the committer to completion on its own thread, and checks
 * that nothing was committed, two entries remain queued, the consumer was woken up, and no
 * acknowledgements are pending.
 */
@Test
public void shouldStopWhenNoAcknowledgements() throws InterruptedException {
    Map<TopicPartition, OffsetAndMetadata> driverOffsets = recordsUtil.createRecords("driver-", 3).getPartitionsCommitOffset();
    Map<TopicPartition, OffsetAndMetadata> customerOffsets = recordsUtil.createRecords("customer-", 3).getPartitionsCommitOffset();
    Map<TopicPartition, OffsetAndMetadata> merchantOffsets = recordsUtil.createRecords("merchant-", 3).getPartitionsCommitOffset();
    List<Map<TopicPartition, OffsetAndMetadata>> pendingCommits =
        Arrays.asList(driverOffsets, customerOffsets, merchantOffsets);
    commitQueue.addAll(pendingCommits);
    committer.setDefaultSleepMs(10);

    // Run the committer on a separate thread and wait for it to finish.
    Thread worker = new Thread(committer);
    worker.start();
    worker.join();

    InOrder ordered = inOrder(kafkaConsumer);
    ordered.verify(kafkaConsumer, never()).commitSync(anyMap());
    assertEquals(2, commitQueue.size());
    ordered.verify(kafkaConsumer, atLeastOnce()).wakeup(anyString());
    assertTrue(acknowledgements.isEmpty());
}
/**
 * Writes ten changes, moves the consumer to the end of partition 0, and checks that reading
 * the change log from position 5 returns the last five changes.
 */
@Test
public void readFromPosition_positionStartsNotBegining() throws Exception {
    final List<QueryChange> expected = write10ChangesToChangeLog().subList(5, 10);

    // set the position to some non-0 position
    final TopicPartition partition = new TopicPartition(topic, 0);
    consumer.assign(Lists.newArrayList(partition));
    consumer.seekToEnd(Lists.newArrayList(partition));

    final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> entries = changeLog.readFromPosition(5L);

    // Drain the iteration, keeping only the change carried by each entry.
    final List<QueryChange> actual = new ArrayList<>();
    while (entries.hasNext()) {
        actual.add(entries.next().getEntry());
    }

    assertEquals(expected, actual);
}
/**
 * Fetches the committed offsets of a consumer group from the group coordinator.
 *
 * @param groupName consumer group to query
 * @param topicPartitions partitions whose offsets are requested
 * @param responseWaitTime value passed to {@code getLastApiResponse} when waiting for the reply
 * @return the per-partition data of the offset fetch response
 * @throws OffsetFetchException if no coordinator is set or the response carries an error
 */
public Map<TopicPartition, OffsetFetchResponse.PartitionData> getCommitedOffsets(final String groupName, final List<TopicPartition> topicPartitions,
                                                                                 final long responseWaitTime) throws OffsetFetchException {
    if (this.coordinator == null) {
        throw new OffsetFetchException("Missing Group Coordinator for group:" + groupName);
    }

    OffsetFetchRequest.Builder request = new OffsetFetchRequest.Builder(groupName, topicPartitions);
    this.kafkaApiRequest.sendApiRequest(this.coordinator, request);
    OffsetFetchResponse response =
        (OffsetFetchResponse) this.kafkaApiRequest.getLastApiResponse(responseWaitTime);

    // Fail fast on any error code; only an error-free response exposes its data.
    if (response.error() != Errors.NONE) {
        throw new OffsetFetchException(response.error().message());
    }
    return response.responseData();
}
/**
 * Creates a consumer assigned to partition 0 of {@code topicName} and positions it.
 *
 * <p>When the {@code offset} field is -1 the consumer is moved to the end of the partition;
 * otherwise it is moved to {@code offset}. If positioning fails, the consumer is closed before
 * the exception is propagated so that no client is leaked.
 *
 * @return a positioned consumer; the caller is responsible for closing it
 * @throws Exception if loading the properties, creating the consumer, or positioning it fails
 */
private Consumer<String, String> createConsumer() throws Exception {
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.assign(topics);

        if (offset == -1) {
            consumer.seekToEnd(topics);
            logger.info("Consumer seek to end");
        } else {
            consumer.seek(dataTopicPartition, offset);
            logger.info(String.format("read changed as offset: %s", consumer.position(dataTopicPartition)));
        }
        return consumer;
    } catch (Exception e) {
        // Do not leak the half-initialised consumer when assignment or seeking fails.
        try {
            consumer.close();
        } catch (Exception closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
}
/**
 * Builds one task configuration map per task from the given partitions.
 *
 * <p>The partitions are split into at most {@code maxTasks} groups (never more groups than
 * partitions). Each group is serialized to JSON, stored under the partition-list key, merged
 * with {@code filteredConfig}, and passed through {@code makeClientIdUnique} together with a
 * zero-based counter.
 *
 * @param maxTasks upper bound on the number of tasks
 * @param topicPartitionList partitions to distribute
 * @return one configuration map per task
 */
public List<Map<String, String>> fromPartitionList(
    int maxTasks, List<TopicPartition> topicPartitionList) {

  // Assign partitions to tasks.
  int taskCount = Math.min(topicPartitionList.size(), maxTasks);
  List<List<TopicPartition>> partitionsByTask =
      sourceTaskAssignor.assign(topicPartitionList, taskCount);

  // Generate configuration for each task.
  AtomicInteger nextTaskIndex = new AtomicInteger();
  return partitionsByTask
      .stream()
      .map(TopicPartitionSerDe::toJson)
      .map(json -> mapOf(TaskConfigDefinition.PARTITION_LIST, json))
      .peek(taskConfig -> taskConfig.putAll(filteredConfig))
      .map(taskConfig -> makeClientIdUnique(taskConfig, nextTaskIndex.getAndIncrement()))
      .collect(Collectors.toList());
}
/**
 * Puts records in two batches, flushing after each, and checks that the output stream holds
 * the record values one per line in the order they were put.
 */
@Test
public void testPutFlush() {
    HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    TopicPartition topic1Partition = new TopicPartition("topic1", 0);
    TopicPartition topic2Partition = new TopicPartition("topic2", 0);

    // We do not call task.start() since it would override the output stream

    // First batch: a single record on topic1.
    SinkRecord line1 = new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1);
    task.put(Arrays.asList(line1));
    offsets.put(topic1Partition, new OffsetAndMetadata(1L));
    task.flush(offsets);
    assertEquals("line1\n", os.toString());

    // Second batch: one more record on topic1 and the first record on topic2.
    SinkRecord line2 = new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2);
    SinkRecord line3 = new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1);
    task.put(Arrays.asList(line2, line3));
    offsets.put(topic1Partition, new OffsetAndMetadata(2L));
    offsets.put(topic2Partition, new OffsetAndMetadata(1L));
    task.flush(offsets);
    assertEquals("line1\nline2\nline3\n", os.toString());
}
/**
 * Counts the replicas in the given map that belong to {@code topic}, tracing every entry.
 *
 * @param topicPartitionReplicaInfoMap replica info keyed by topic partition
 * @param topic topic whose partitions are counted
 * @param broker broker the map was obtained from; only used in trace output
 * @return number of entries whose topic equals {@code topic}
 */
private int processLogDirsWithinBroker(
    Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> topicPartitionReplicaInfoMap, String topic,
    Node broker) {
  int matching = 0;
  for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> entry
      : topicPartitionReplicaInfoMap.entrySet()) {
    TopicPartition tp = entry.getKey();
    DescribeLogDirsResponse.ReplicaInfo info = entry.getValue();

    boolean sameTopic = tp.topic().equals(topic);
    if (sameTopic) {
      matching++;
      LOGGER.trace("totalPartitions In The Broker = {}", matching);
    }

    LOGGER.trace("broker information: {}", broker);
    LOGGER.trace("logDirInfo for kafka-logs: topicPartition = {}, replicaInfo = {}", tp, info);
  }
  return matching;
}
/**
 * Records offsets of a transactional commit as pending for the given producer.
 *
 * <p>Marks that a transactional offset commit has been received and stores each offset, paired
 * with an empty optional, in the producer's pending map (created on first use).
 *
 * @param producerId id of the transactional producer
 * @param offsets offsets to register as pending, keyed by topic partition
 */
public void prepareTxnOffsetCommit(long producerId,
                                   Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (log.isTraceEnabled()) {
        log.trace("TxnOffsetCommit for producer {} and group {} with offsets {} is pending",
            producerId, groupId, offsets);
    }
    receivedTransactionalOffsetCommits = true;

    Map<TopicPartition, CommitRecordMetadataAndOffset> pendingForProducer =
        pendingTransactionalOffsetCommits.computeIfAbsent(producerId, pid -> new HashMap<>());

    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        CommitRecordMetadataAndOffset pending =
            new CommitRecordMetadataAndOffset(Optional.empty(), entry.getValue());
        pendingForProducer.put(entry.getKey(), pending);
    }
}
/**
 * Stores the given stats as the latest ones and refreshes the leader/follower replica sets.
 *
 * <p>The call is ignored when {@code stats} is {@code null}, older than the stats already held,
 * or flagged as failed. The rack id is set only while it is still {@code null}, using the
 * stats' rack id or, when that is {@code null}, their availability zone.
 *
 * @param stats the broker stats
 */
public synchronized void update(BrokerStats stats) {
  if (stats == null) {
    return;
  }
  // Never let an older report overwrite a newer one.
  if (latestStats != null && latestStats.getTimestamp() > stats.getTimestamp()) {
    return;
  }
  if (stats.getHasFailure()) {
    return;
  }

  brokerName = stats.getName();
  latestStats = stats;

  if (rackId == null) {
    if (stats.getRackId() != null) {
      rackId = stats.getRackId();
    } else {
      rackId = stats.getAvailabilityZone();
    }
  }

  if (stats.getLeaderReplicas() != null) {
    setLeaderReplicas(
        stats.getLeaderReplicas().stream()
            .map(replica -> new TopicPartition(replica.getTopic(), replica.getPartition()))
            .collect(Collectors.toSet()));
  }

  if (stats.getFollowerReplicas() != null) {
    setFollowerReplicas(
        stats.getFollowerReplicas().stream()
            .map(replica -> new TopicPartition(replica.getTopic(), replica.getPartition()))
            .collect(Collectors.toSet()));
  }
}
/**
 * Clears the in-progress flag and forwards the commit outcome to the internal callback.
 *
 * @param offsets offsets the commit was issued for; not used here
 * @param ex failure of the commit, or {@code null} when it succeeded
 */
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
    commitInProgress = false;

    if (ex == null) {
        internalCommitCallback.onSuccess();
        return;
    }

    log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
    internalCommitCallback.onException(ex);
}
/**
 * Handles a rebalance by converting the revoked partitions to Secor topic partitions, bumping
 * the rebalance counter once per distinct topic, and asking the handler to upload them.
 *
 * @param assignedPartitions partitions being revoked from this consumer
 */
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignedPartitions) {
    // Here we do a force upload/commit so the other consumer take over partition(s) later don't need to rewrite the same messages
    LOG.info("re-balance starting, forcing uploading current assigned partitions {}", assignedPartitions);

    List<com.pinterest.secor.common.TopicPartition> revoked =
        assignedPartitions.stream()
            .map(kafkaTp -> new com.pinterest.secor.common.TopicPartition(kafkaTp.topic(), kafkaTp.partition()))
            .collect(Collectors.toList());

    revoked.stream()
        .map(secorTp -> secorTp.getTopic())
        .collect(Collectors.toSet())
        .forEach(topicName -> StatsUtil.incr("secor.consumer_rebalance_count." + topicName));

    handler.uploadOnRevoke(revoked);
}
/**
 * Reads the most recent messages from partition 0 of the action report topic.
 *
 * <p>The consumer is positioned at most {@code NUM_MESSAGES} before the end offset (but never
 * before the beginning offset) and polled until a poll returns no records. The consumer is
 * closed before returning, including when an exception is thrown.
 *
 * @return the records read, in the order they were polled
 */
private List<ConsumerRecord<byte[], byte[]>> retrieveActionReportMessages() {
  DoctorKafkaConfig doctorKafkaConfig = DoctorKafkaMain.doctorKafka.getDoctorKafkaConfig();
  String zkUrl = doctorKafkaConfig.getBrokerstatsZkurl();
  String actionReportTopic = doctorKafkaConfig.getActionReportTopic();
  Properties properties =
      OperatorUtil.createKafkaConsumerProperties(zkUrl, OPERATOR_ACTIONS_CONSUMER_GROUP,
          doctorKafkaConfig.getActionReportProducerSecurityProtocol(),
          doctorKafkaConfig.getActionReportProducerSslConfigs());

  // try-with-resources: the original never closed the consumer, leaking its resources on
  // every call.
  try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties)) {
    TopicPartition operatorReportTopicPartition = new TopicPartition(actionReportTopic, 0);
    List<TopicPartition> tps = new ArrayList<>();
    tps.add(operatorReportTopicPartition);
    consumer.assign(tps);

    Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
    for (TopicPartition tp : endOffsets.keySet()) {
      long numMessages = endOffsets.get(tp) - beginOffsets.get(tp);
      LOG.info("{} : offsets [{}, {}], num messages : {}",
          tp, beginOffsets.get(tp), endOffsets.get(tp), numMessages);
      // Start NUM_MESSAGES before the end, clamped to the first available offset.
      consumer.seek(tp, Math.max(beginOffsets.get(tp), endOffsets.get(tp) - NUM_MESSAGES));
    }

    ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
    List<ConsumerRecord<byte[], byte[]>> recordList = new ArrayList<>();

    // Keep polling until a poll comes back empty.
    while (!records.isEmpty()) {
      for (ConsumerRecord<byte[], byte[]> record : records) {
        recordList.add(record);
      }
      records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
    }
    LOG.info("Read {} messages", recordList.size());
    return recordList;
  }
}
/**
 * Converts the tracked per-partition consumer offsets into a map keyed by topic partition of
 * the configured stream topic.
 *
 * @return a new map holding one {@link OffsetAndMetadata} per tracked partition
 */
private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
  Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
  consumerOffsets.forEach((partitionId, committedOffset) ->
      result.put(
          new TopicPartition(_streamConfig.getTopicName(), partitionId),
          new OffsetAndMetadata(committedOffset)));
  return result;
}
/**
 * Synchronously commits the given offset for a partition when auto-commit is disabled.
 * Does nothing when auto-commit is enabled.
 *
 * @param partition partition to commit for
 * @param offset offset to commit
 */
@Override
public void commit(TopicPartition partition, long offset) {
    if (!autoCommitEnabled) {
        if (LOG.isDebugEnabled()) {
            // Log at the level the guard checks; the original emitted an INFO message from
            // inside a DEBUG guard.
            LOG.debug(" commiting the offset ==>> " + offset);
        }
        kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
    }
}
/**
 * Serializes a list of topic partitions to a JSON string using the shared object mapper.
 *
 * @param topicPartitionList partitions to serialize
 * @return the JSON representation
 * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
 */
static String toJson(List<TopicPartition> topicPartitionList) {
  final String json;
  try {
    json = OBJECT_MAPPER.writeValueAsString(topicPartitionList);
  } catch (JsonProcessingException serializationFailure) {
    throw new RuntimeException(serializationFailure);
  }
  return json;
}
/**
 * Counts, per topic, the partition assignments held by consumers and the distinct consumer
 * groups that have at least one such assignment.
 *
 * @param consumerGroupClient client used to list and describe consumer groups
 * @return map from topic name to a two-element list: the assignment count at index 0 and the
 *     number of distinct consumer groups at index 1
 */
private static Map<String, List<Integer>> getTopicConsumerAndGroupCounts(
    KafkaConsumerGroupClient consumerGroupClient
) {
  Map<String, AtomicInteger> assignmentsPerTopic = new HashMap<>();
  Map<String, Set<String>> groupsPerTopic = new HashMap<>();

  for (String group : consumerGroupClient.listGroups()) {
    Collection<KafkaConsumerGroupClientImpl.ConsumerSummary> members =
        consumerGroupClient.describeConsumerGroup(group).consumers();
    for (KafkaConsumerGroupClientImpl.ConsumerSummary member : members) {
      for (TopicPartition assigned : member.partitions()) {
        String topicName = assigned.topic();
        assignmentsPerTopic
            .computeIfAbsent(topicName, key -> new AtomicInteger())
            .incrementAndGet();
        groupsPerTopic
            .computeIfAbsent(topicName, key -> new HashSet<>())
            .add(group);
      }
    }
  }

  // Both maps are filled together above, so every topic counted here has a group set.
  HashMap<String, List<Integer>> results = new HashMap<>();
  for (Map.Entry<String, AtomicInteger> entry : assignmentsPerTopic.entrySet()) {
    String topicName = entry.getKey();
    List<Integer> counts = new ArrayList<>();
    counts.add(entry.getValue().intValue());
    counts.add(groupsPerTopic.get(topicName).size());
    results.put(topicName, counts);
  }
  return results;
}
/**
 * For a partition led by node0 whose in-sync replica list contains only node1, the result for
 * broker 0 is expected to report "leader" but "not in sync".
 */
@Test
public void notInSync() throws Exception {
  // Partition 0: leader node0, replicas [node0], in-sync replicas [node1].
  TopicPartitionInfo partitionInfo =
      new TopicPartitionInfo(0, node0, singletonList(node0), singletonList(node1));
  KafkaFuture<Map<String, TopicDescription>> descriptions = topicDescriptionFuture(partitionInfo);

  doReturn(describeTopicsResult).when(adminClient).describeTopics(topics);
  doReturn(descriptions).when(describeTopicsResult).all();

  Map<TopicPartition, LeaderInSync> outcome = underTest.apply(0, topics);

  assertThat(outcome.size(), is(1));
  LeaderInSync status = outcome.get(new TopicPartition(topic, 0));
  assertThat(status.isLeader(), is(true));
  assertThat(status.isInSync(), is(false));
}
/**
 * Collects the beginning offsets of the given partitions from the underlying consumers.
 *
 * @param partitions partitions to look up
 * @param timeout maximum time handed to each underlying lookup; must be non-null and
 *     non-negative
 * @return unmodifiable map from partition to beginning offset
 * @throws NullPointerException if {@code timeout} is {@code null}
 * @throws IllegalArgumentException if {@code timeout} is negative
 */
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    if (timeout.toMillis() < 0) {
        throw new IllegalArgumentException("timeout must not be negative");
    }

    Map<String, Collection<TopicPartition>> partitionsByKey = toMap(partitions);

    return runWithoutConcurrency(() -> {
        Map<TopicPartition, Long> collected = new HashMap<>();
        forEach(
            partitionsByKey::containsKey,
            (key, holder) ->
                collected.putAll(holder.getConsumer().beginningOffsets(partitionsByKey.get(key), timeout)));
        return Collections.unmodifiableMap(collected);
    });
}
/**
 * Asserts the content of the offsets about to be committed.
 *
 * @param offsetsToCommit offsets under test
 * @param expectedToCommit when {@code null} the map must be empty; otherwise it must contain
 *     exactly one entry for {@code TOPIC_PARTITION_0} with offset {@code expectedToCommit + 1}
 */
private void checkExpectedToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, Long expectedToCommit) {
    if (expectedToCommit == null) {
        assertThat(offsetsToCommit).isEmpty();
        return;
    }

    OffsetAndMetadata expectedOffset = new OffsetAndMetadata(expectedToCommit + 1);
    assertThat(offsetsToCommit).containsOnly(entry(TOPIC_PARTITION_0, expectedOffset));
}
/**
 * Subscribes to a topic, rewinds every assigned partition to the offset matching "five minutes
 * ago", and prints records until the first non-empty batch has been printed.
 *
 * <p>Partitions for which no offset is found for that timestamp are rewound to offset 0. The
 * consumer is closed when the method returns or throws.
 *
 * @param topic topic to consume from
 */
public static void consumeFromTime(String topic) {
    // try-with-resources: the original never closed the consumer.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topic));

        boolean rewound = false;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);

            if (!rewound) {
                Set<TopicPartition> assignments = consumer.assignment();
                // Partitions may not be assigned yet after the first poll. Only mark the rewind
                // as done once there was something to rewind, otherwise it would be skipped
                // for good.
                if (!assignments.isEmpty()) {
                    long fiveMinutesAgo = Instant.now().minus(5, MINUTES).toEpochMilli();
                    Map<TopicPartition, Long> query = new HashMap<>();
                    for (TopicPartition topicPartition : assignments) {
                        query.put(topicPartition, fiveMinutesAgo);
                    }

                    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
                    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
                        // A null value means no offset was found for the timestamp; start at 0.
                        long target = Optional.ofNullable(entry.getValue())
                            .map(OffsetAndTimestamp::offset)
                            .orElse(0L);
                        consumer.seek(entry.getKey(), target);
                    }
                    rewound = true;
                }
            }

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }

            if (!records.isEmpty()) {
                break;
            }
        }
    }
}