下面列出了怎么用org.apache.kafka.common.requests.AbstractResponse的API类实例代码及写法,或者点击链接到github查看源代码。
protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(fetch.getRequest() instanceof FetchRequest);
FetchRequest request = (FetchRequest) fetch.getRequest();
if (log.isDebugEnabled()) {
log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ",
ctx.channel(), fetch.getHeader(), request.fetchData().size());
request.fetchData().forEach((topic, data) -> {
log.debug(" Fetch request topic:{} data:{}.",
topic, data.toString());
});
}
MessageFetchContext fetchContext = MessageFetchContext.get(this, fetch);
fetchContext.handleFetch(resultFuture);
}
protected void handleSyncGroupRequest(KafkaHeaderAndRequest syncGroup,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(syncGroup.getRequest() instanceof SyncGroupRequest);
SyncGroupRequest request = (SyncGroupRequest) syncGroup.getRequest();
groupCoordinator.handleSyncGroup(
request.groupId(),
request.generationId(),
request.memberId(),
CoreUtils.mapValue(
request.groupAssignment(), Utils::toArray
)
).thenAccept(syncGroupResult -> {
SyncGroupResponse response = new SyncGroupResponse(
syncGroupResult.getKey(),
ByteBuffer.wrap(syncGroupResult.getValue())
);
resultFuture.complete(response);
});
}
protected void handleHeartbeatRequest(KafkaHeaderAndRequest heartbeat,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(heartbeat.getRequest() instanceof HeartbeatRequest);
HeartbeatRequest request = (HeartbeatRequest) heartbeat.getRequest();
// let the coordinator to handle heartbeat
groupCoordinator.handleHeartbeat(
request.groupId(),
request.memberId(),
request.groupGenerationId()
).thenAccept(errors -> {
HeartbeatResponse response = new HeartbeatResponse(errors);
if (log.isTraceEnabled()) {
log.trace("Sending heartbeat response {} for correlation id {} to client {}.",
response, heartbeat.getHeader().correlationId(), heartbeat.getHeader().clientId());
}
resultFuture.complete(response);
});
}
@Override
protected void handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(leaveGroup.getRequest() instanceof LeaveGroupRequest);
LeaveGroupRequest request = (LeaveGroupRequest) leaveGroup.getRequest();
// let the coordinator to handle heartbeat
groupCoordinator.handleLeaveGroup(
request.groupId(),
request.memberId()
).thenAccept(errors -> {
LeaveGroupResponse response = new LeaveGroupResponse(errors);
resultFuture.complete(response);
});
}
protected void writeAndFlushWhenInactiveChannel(Channel channel) {
// loop from first responseFuture, and return them all
while (requestsQueue != null && requestsQueue.peek() != null) {
try {
ResponseAndRequest pair = requestsQueue.remove();
if (log.isDebugEnabled()) {
log.debug("Channel Closing! Write kafka cmd responseFuture back to client. request: {}",
pair.getRequest().getHeader());
}
AbstractRequest request = pair.getRequest().getRequest();
AbstractResponse apiResponse = request
.getErrorResponse(new LeaderNotAvailableException("Channel is closing!"));
pair.getResponseFuture().complete(apiResponse);
ByteBuf result = responseToByteBuf(pair.getResponseFuture().get(), pair.getRequest());
channel.writeAndFlush(result);
} catch (Exception e) {
// should not comes here.
log.error("error to get Response ByteBuf:", e);
}
}
}
@Test(timeOut = 20000, enabled = false)
// https://github.com/streamnative/kop/issues/51
public void testOffsetCommitWithInvalidPartition() throws Exception {
String topicName = "kopOffsetCommitWithInvalidPartition";
CompletableFuture<AbstractResponse> invalidResponse1 = new CompletableFuture<>();
// invalid partition id -1;
checkInvalidPartition(invalidResponse1, topicName, -1);
AbstractResponse response1 = invalidResponse1.get();
TopicPartition topicPartition1 = new TopicPartition(topicName, -1);
assertEquals(((OffsetCommitResponse) response1).responseData().get(topicPartition1),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
// invalid partition id 1.
CompletableFuture<AbstractResponse> invalidResponse2 = new CompletableFuture<>();
checkInvalidPartition(invalidResponse2, topicName, 1);
TopicPartition topicPartition2 = new TopicPartition(topicName, 1);
AbstractResponse response2 = invalidResponse2.get();
assertEquals(((OffsetCommitResponse) response2).responseData().get(topicPartition2),
Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
@Test(timeOut = 20000, enabled = false)
// https://github.com/streamnative/kop/issues/51
public void testGetOffsetsForUnknownTopic() throws Exception {
String topicName = "kopTestGetOffsetsForUnknownTopic";
TopicPartition tp = new TopicPartition(topicName, 0);
Map<TopicPartition, Long> targetTimes = Maps.newHashMap();
targetTimes.put(tp, ListOffsetRequest.LATEST_TIMESTAMP);
ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
.forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
.setTargetTimes(targetTimes);
KafkaHeaderAndRequest request = buildRequest(builder);
CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
kafkaRequestHandler
.handleListOffsetRequest(request, responseFuture);
AbstractResponse response = responseFuture.get();
ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response;
assertEquals(listOffsetResponse.responseData().get(tp).error,
Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
/**
* Generate a {@link MetadataResponseData} with the given information -- e.g. for creating bootstrap and test response.
*
* @param brokers Brokers in the cluster.
* @param clusterId Cluster Id.
* @param controllerId Controller Id.
* @param topicMetadataList Metadata list for the topics in the cluster.
* @return A {@link MetadataResponseData} with the given information.
*/
public static MetadataResponse prepareMetadataResponse(List<Node> brokers,
String clusterId,
int controllerId,
List<MetadataResponse.TopicMetadata> topicMetadataList) {
MetadataResponseData responseData = new MetadataResponseData();
responseData.setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME);
brokers.forEach(broker -> responseData.brokers().add(
new MetadataResponseData.MetadataResponseBroker().setNodeId(broker.id())
.setHost(broker.host())
.setPort(broker.port())
.setRack(broker.rack())));
responseData.setClusterId(clusterId);
responseData.setControllerId(controllerId);
responseData.setClusterAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
topicMetadataList.forEach(topicMetadata -> responseData.topics().add(prepareMetadataResponseTopic(topicMetadata)));
return new MetadataResponse(responseData);
}
protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> resultFuture) {
String err = String.format("Kafka API (%s) Not supported by kop server.",
kafkaHeaderAndRequest.getHeader().apiKey());
log.error(err);
AbstractResponse apiResponse = kafkaHeaderAndRequest.getRequest()
.getErrorResponse(new UnsupportedOperationException(err));
resultFuture.complete(apiResponse);
}
protected void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest,
CompletableFuture<AbstractResponse> resultFuture) {
AbstractRequest request = kafkaHeaderAndRequest.getRequest();
AbstractResponse apiResponse = request.getErrorResponse(new LeaderNotAvailableException("Channel is closing!"));
log.error("Kafka API {} is send to a closing channel", kafkaHeaderAndRequest.getHeader().apiKey());
resultFuture.complete(apiResponse);
}
protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(offsetFetch.getRequest() instanceof OffsetFetchRequest);
OffsetFetchRequest request = (OffsetFetchRequest) offsetFetch.getRequest();
checkState(groupCoordinator != null,
"Group Coordinator not started");
KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> keyValue =
groupCoordinator.handleFetchOffsets(
request.groupId(),
Optional.of(request.partitions())
);
resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue()));
}
private void handleListOffsetRequestV1AndAbove(KafkaHeaderAndRequest listOffset,
CompletableFuture<AbstractResponse> resultFuture) {
ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest();
Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> responseData = Maps.newHashMap();
request.partitionTimestamps().entrySet().stream().forEach(tms -> {
TopicPartition topic = tms.getKey();
TopicName pulsarTopic = pulsarTopicName(topic, namespace);
Long times = tms.getValue();
CompletableFuture<ListOffsetResponse.PartitionData> partitionData;
CompletableFuture<PersistentTopic> persistentTopic = topicManager.getTopic(pulsarTopic.toString());
partitionData = fetchOffsetForTimestamp(persistentTopic, times, false);
responseData.put(topic, partitionData);
});
CompletableFuture
.allOf(responseData.values().stream().toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {
ListOffsetResponse response =
new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join()));
resultFuture.complete(response);
});
}
protected void handleListOffsetRequest(KafkaHeaderAndRequest listOffset,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listOffset.getRequest() instanceof ListOffsetRequest);
// the only difference between v0 and v1 is the `max_num_offsets => INT32`
// v0 is required because it is used by librdkafka
if (listOffset.getHeader().apiVersion() == 0) {
handleListOffsetRequestV0(listOffset, resultFuture);
} else {
handleListOffsetRequestV1AndAbove(listOffset, resultFuture);
}
}
protected void handleOffsetCommitRequest(KafkaHeaderAndRequest offsetCommit,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(offsetCommit.getRequest() instanceof OffsetCommitRequest);
checkState(groupCoordinator != null,
"Group Coordinator not started");
OffsetCommitRequest request = (OffsetCommitRequest) offsetCommit.getRequest();
Map<TopicPartition, Errors> nonExistingTopic = nonExistingTopicErrors(request);
groupCoordinator.handleCommitOffsets(
request.groupId(),
request.memberId(),
request.generationId(),
CoreUtils.mapValue(
request.offsetData().entrySet().stream()
.filter(entry -> !nonExistingTopic.containsKey(entry.getKey()))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())),
(partitionData) ->
OffsetAndMetadata.apply(partitionData.offset, partitionData.metadata, partitionData.timestamp)
)
).thenAccept(offsetCommitResult -> {
if (nonExistingTopic != null) {
offsetCommitResult.putAll(nonExistingTopic);
}
OffsetCommitResponse response = new OffsetCommitResponse(offsetCommitResult);
resultFuture.complete(response);
});
}
@Override
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
KeyValue<Errors, List<GroupOverview>> listResult = groupCoordinator.handleListGroups();
ListGroupsResponse response = new ListGroupsResponse(
listResult.getKey(),
listResult.getValue().stream()
.map(groupOverview -> new Group(groupOverview.groupId(), groupOverview.protocolType()))
.collect(Collectors.toList())
);
resultFuture.complete(response);
}
@Override
protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(deleteGroups.getRequest() instanceof DeleteGroupsRequest);
DeleteGroupsRequest request = (DeleteGroupsRequest) deleteGroups.getRequest();
Map<String, Errors> deleteResult = groupCoordinator.handleDeleteGroups(request.groups());
DeleteGroupsResponse response = new DeleteGroupsResponse(
deleteResult
);
resultFuture.complete(response);
}
@Override
protected void handleSaslHandshake(KafkaHeaderAndRequest saslHandshake,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(saslHandshake.getRequest() instanceof SaslHandshakeRequest);
SaslHandshakeRequest request = (SaslHandshakeRequest) saslHandshake.getRequest();
SaslHandshakeResponse response = checkSaslMechanism(request.mechanism());
resultFuture.complete(response);
}
private void readMessages(KafkaHeaderAndRequest fetch,
Map<TopicPartition, Pair<ManagedCursor, Long>> cursors,
CompletableFuture<AbstractResponse> resultFuture,
LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData) {
AtomicInteger bytesRead = new AtomicInteger(0);
Map<TopicPartition, List<Entry>> entryValues = new ConcurrentHashMap<>();
readMessagesInternal(fetch, cursors, bytesRead, entryValues, resultFuture, responseData);
}
protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
KafkaHeaderAndResponse.responseForRequest(request, response)) {
return ResponseUtils.serializeResponse(
kafkaHeaderAndResponse.getApiVersion(),
kafkaHeaderAndResponse.getHeader(),
kafkaHeaderAndResponse.getResponse()
);
} finally {
// the request is not needed any more.
request.close();
}
}
private KafkaHeaderAndResponse(short apiVersion,
ResponseHeader header,
AbstractResponse response) {
this.apiVersion = apiVersion;
this.header = header;
this.response = response;
}
void checkInvalidPartition(CompletableFuture<AbstractResponse> future,
String topic,
int invalidPartitionId) {
TopicPartition invalidTopicPartition = new TopicPartition(topic, invalidPartitionId);
PartitionData partitionOffsetCommitData = new PartitionData(15L, "");
Map<TopicPartition, PartitionData> offsetData = Maps.newHashMap();
offsetData.put(invalidTopicPartition, partitionOffsetCommitData);
KafkaHeaderAndRequest request = buildRequest(new OffsetCommitRequest.Builder("groupId", offsetData));
kafkaRequestHandler.handleOffsetCommitRequest(request, future);
}
@Test(timeOut = 20000)
public void testBrokerHandleTopicMetadataRequest() throws Exception {
String topicName = "kopBrokerHandleTopicMetadataRequest";
int numberTopics = 5;
int numberPartitions = 6;
List<TopicPartition> topicPartitions = createTopics(topicName, numberTopics, numberPartitions);
List<String> kafkaTopics = getCreatedTopics(topicName, numberTopics);
KafkaHeaderAndRequest metadataRequest = createTopicMetadataRequest(kafkaTopics);
CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
kafkaRequestHandler.handleTopicMetadataRequest(metadataRequest, responseFuture);
MetadataResponse metadataResponse = (MetadataResponse) responseFuture.get();
// verify all served by same broker : localhost:port
assertEquals(metadataResponse.brokers().size(), 1);
assertEquals(metadataResponse.brokers().iterator().next().host(), "localhost");
// check metadata response
Collection<TopicMetadata> topicMetadatas = metadataResponse.topicMetadata();
log.debug("a. dumpTopicMetadata: ");
topicMetadatas.forEach(topicMetadata -> {
log.debug(" topicMetadata: {}", topicMetadata);
log.debug("b. dumpPartitionMetadata: ");
topicMetadata.partitionMetadata().forEach(partition -> {
log.debug(" PartitionMetadata: {}", partition);
});
});
assertEquals(topicMetadatas.size(), numberTopics);
topicMetadatas.forEach(topicMetadata -> {
assertTrue(topicMetadata.topic().startsWith(topicName + "_"));
assertEquals(topicMetadata.partitionMetadata().size(), numberPartitions);
});
}
public AbstractResponse getLastApiResponse(final long waitTimeMsBetweenCheckingResponse){
while(!this.clientResponse.isDone()){
try {
Thread.sleep(waitTimeMsBetweenCheckingResponse);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return this.clientResponse.value().responseBody();
}
protected void handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest,
CompletableFuture<AbstractResponse> resultFuture) {
AbstractResponse apiResponse = overloadDefaultApiVersionsResponse();
resultFuture.complete(apiResponse);
}
private void handleListOffsetRequestV0(KafkaHeaderAndRequest listOffset,
CompletableFuture<AbstractResponse> resultFuture) {
ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest();
Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> responseData = Maps.newHashMap();
// in v0, the iterator is offsetData,
// in v1, the iterator is partitionTimestamps,
log.warn("received a v0 listOffset: {}", request.toString(true));
request.offsetData().entrySet().stream().forEach(tms -> {
TopicPartition topic = tms.getKey();
TopicName pulsarTopic = pulsarTopicName(topic, namespace);
Long times = tms.getValue().timestamp;
CompletableFuture<ListOffsetResponse.PartitionData> partitionData;
// num_num_offsets > 1 is not handled for now, returning an error
if (tms.getValue().maxNumOffsets > 1) {
log.warn("request is asking for multiples offsets for {}, not supported for now",
pulsarTopic.toString());
partitionData = new CompletableFuture<>();
partitionData.complete(new ListOffsetResponse
.PartitionData(
Errors.UNKNOWN_SERVER_ERROR,
Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET)));
}
CompletableFuture<PersistentTopic> persistentTopic = topicManager.getTopic(pulsarTopic.toString());
partitionData = fetchOffsetForTimestamp(persistentTopic, times, true);
responseData.put(topic, partitionData);
});
CompletableFuture
.allOf(responseData.values().stream().toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {
ListOffsetResponse response =
new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join()));
resultFuture.complete(response);
});
}
protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(joinGroup.getRequest() instanceof JoinGroupRequest);
checkState(groupCoordinator != null,
"Group Coordinator not started");
JoinGroupRequest request = (JoinGroupRequest) joinGroup.getRequest();
Map<String, byte[]> protocols = new HashMap<>();
request.groupProtocols()
.stream()
.forEach(protocol -> protocols.put(protocol.name(), Utils.toArray(protocol.metadata())));
groupCoordinator.handleJoinGroup(
request.groupId(),
request.memberId(),
joinGroup.getHeader().clientId(),
joinGroup.getClientHost(),
request.rebalanceTimeout(),
request.sessionTimeout(),
request.protocolType(),
protocols
).thenAccept(joinGroupResult -> {
Map<String, ByteBuffer> members = new HashMap<>();
joinGroupResult.getMembers().forEach((memberId, protocol) ->
members.put(memberId, ByteBuffer.wrap(protocol)));
JoinGroupResponse response = new JoinGroupResponse(
joinGroupResult.getError(),
joinGroupResult.getGenerationId(),
joinGroupResult.getSubProtocol(),
joinGroupResult.getMemberId(),
joinGroupResult.getLeaderId(),
members
);
if (log.isTraceEnabled()) {
log.trace("Sending join group response {} for correlation id {} to client {}.",
response, joinGroup.getHeader().correlationId(), joinGroup.getHeader().clientId());
}
resultFuture.complete(response);
});
}
@Override
protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(describeGroup.getRequest() instanceof DescribeGroupsRequest);
DescribeGroupsRequest request = (DescribeGroupsRequest) describeGroup.getRequest();
// let the coordinator to handle heartbeat
Map<String, GroupMetadata> groups = request.groupIds().stream()
.map(groupId -> {
KeyValue<Errors, GroupSummary> describeResult = groupCoordinator
.handleDescribeGroup(groupId);
GroupSummary summary = describeResult.getValue();
List<GroupMember> members = summary.members().stream()
.map(member -> {
ByteBuffer metadata = ByteBuffer.wrap(member.metadata());
ByteBuffer assignment = ByteBuffer.wrap(member.assignment());
return new GroupMember(
member.memberId(),
member.clientId(),
member.clientHost(),
metadata,
assignment
);
})
.collect(Collectors.toList());
return new KeyValue<>(
groupId,
new GroupMetadata(
describeResult.getKey(),
summary.state(),
summary.protocolType(),
summary.protocol(),
members
)
);
})
.collect(Collectors.toMap(
kv -> kv.getKey(),
kv -> kv.getValue()
));
DescribeGroupsResponse response = new DescribeGroupsResponse(
groups
);
resultFuture.complete(response);
}
static AbstractResponse failedResponse(KafkaHeaderAndRequest requestHar, Throwable e) {
if (log.isDebugEnabled()) {
log.debug("Request {} get failed response ", requestHar.getHeader().apiKey(), e);
}
return requestHar.getRequest().getErrorResponse(((Integer) THROTTLE_TIME_MS.defaultValue), e);
}
public CompletableFuture<AbstractResponse> handleFetch(CompletableFuture<AbstractResponse> fetchResponse) {
LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
// Map of partition and related tcm.
Map<TopicPartition, CompletableFuture<KafkaTopicConsumerManager>> topicsAndCursor =
((FetchRequest) fetchRequest.getRequest())
.fetchData().entrySet().stream()
.map(entry -> {
TopicName topicName = pulsarTopicName(entry.getKey(), requestHandler.getNamespace());
CompletableFuture<KafkaTopicConsumerManager> consumerManager =
requestHandler.getTopicManager().getTopicConsumerManager(topicName.toString());
return Pair.of(
entry.getKey(),
consumerManager);
})
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// wait to get all the cursor, then readMessages
CompletableFuture
.allOf(topicsAndCursor.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {
Map<TopicPartition, Pair<ManagedCursor, Long>> partitionCursor =
topicsAndCursor.entrySet().stream()
.map(pair -> {
KafkaTopicConsumerManager tcm;
try {
// all future completed now.
tcm = pair.getValue().get();
if (tcm == null) {
throw new NullPointerException("topic not owned, and return null TCM in fetch.");
}
} catch (Exception e) {
log.warn("Error for get KafkaTopicConsumerManager.", e);
responseData.put(pair.getKey(),
new FetchResponse.PartitionData(
Errors.NOT_LEADER_FOR_PARTITION,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY));
// result got. this will be filtered in following filter method.
return null;
}
long offset = ((FetchRequest) fetchRequest.getRequest()).fetchData()
.get(pair.getKey()).fetchOffset;
if (log.isDebugEnabled()) {
log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} - {}.",
pair.getKey(), offset, MessageIdUtils.getPosition(offset));
}
Pair<ManagedCursor, Long> cursorLongPair = tcm.remove(offset);
if (cursorLongPair == null) {
log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. "
+ "Fetch for topic return error.",
offset, pair.getKey());
responseData.put(pair.getKey(),
new FetchResponse.PartitionData(
Errors.NOT_LEADER_FOR_PARTITION,
FetchResponse.INVALID_HIGHWATERMARK,
FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET,
null,
MemoryRecords.EMPTY));
// result got. this will be filtered in following filter method.
return null;
}
return Pair.of(pair.getKey(), cursorLongPair);
})
.filter(x -> x != null)
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
readMessages(fetchRequest, partitionCursor, fetchResponse, responseData);
});
return fetchResponse;
}
protected abstract void
handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);