Source-code examples for the class org.apache.kafka.common.requests.AbstractResponse

The examples below show how the org.apache.kafka.common.requests.AbstractResponse API is used in real projects; you can also click through to GitHub to view the full source code.
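
Almost every snippet below follows the same asynchronous pattern: a handler receives a decoded request and completes a CompletableFuture<AbstractResponse>, which the network layer later serializes and writes back to the client. A minimal sketch of that pattern (buildResponseFor is a hypothetical helper; AbstractRequest, AbstractResponse, and CompletableFuture are the real Kafka/JDK types):

protected void handleSomeRequest(AbstractRequest request,
                                 CompletableFuture<AbstractResponse> resultFuture) {
    try {
        AbstractResponse response = buildResponseFor(request); // hypothetical helper
        resultFuture.complete(response);
    } catch (Exception e) {
        // every AbstractRequest can build a matching error response for itself
        resultFuture.complete(request.getErrorResponse(0 /* throttleTimeMs */, e));
    }
}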

Example 1  Project: kop   File: KafkaRequestHandler.java
protected void handleFetchRequest(KafkaHeaderAndRequest fetch,
                                  CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(fetch.getRequest() instanceof FetchRequest);
    FetchRequest request = (FetchRequest) fetch.getRequest();

    if (log.isDebugEnabled()) {
        log.debug("[{}] Request {} Fetch request. Size: {}. Each item: ",
            ctx.channel(), fetch.getHeader(), request.fetchData().size());

        request.fetchData().forEach((topic, data) -> {
            log.debug("  Fetch request topic:{} data:{}.",
                topic, data.toString());
        });
    }

    MessageFetchContext fetchContext = MessageFetchContext.get(this, fetch);
    fetchContext.handleFetch(resultFuture);
}
 
Example 2  Project: kop   File: KafkaRequestHandler.java
protected void handleSyncGroupRequest(KafkaHeaderAndRequest syncGroup,
                                      CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(syncGroup.getRequest() instanceof SyncGroupRequest);
    SyncGroupRequest request = (SyncGroupRequest) syncGroup.getRequest();

    groupCoordinator.handleSyncGroup(
        request.groupId(),
        request.generationId(),
        request.memberId(),
        CoreUtils.mapValue(
            request.groupAssignment(), Utils::toArray
        )
    ).thenAccept(syncGroupResult -> {
        SyncGroupResponse response = new SyncGroupResponse(
            syncGroupResult.getKey(),
            ByteBuffer.wrap(syncGroupResult.getValue())
        );

        resultFuture.complete(response);
    });
}
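
One caveat in the snippet above: the thenAccept callback never runs if handleSyncGroup completes exceptionally, leaving resultFuture incomplete. A defensive variant (a sketch, not kop's actual code) wires the failure path with whenComplete:

groupCoordinator.handleSyncGroup(
    request.groupId(),
    request.generationId(),
    request.memberId(),
    CoreUtils.mapValue(request.groupAssignment(), Utils::toArray)
).whenComplete((syncGroupResult, ex) -> {
    if (ex != null) {
        // complete with the request's own error response instead of hanging
        resultFuture.complete(request.getErrorResponse(0 /* throttleTimeMs */, ex));
        return;
    }
    resultFuture.complete(new SyncGroupResponse(
        syncGroupResult.getKey(),
        ByteBuffer.wrap(syncGroupResult.getValue())));
});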
 
Example 3  Project: kop   File: KafkaRequestHandler.java
protected void handleHeartbeatRequest(KafkaHeaderAndRequest heartbeat,
                                      CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(heartbeat.getRequest() instanceof HeartbeatRequest);
    HeartbeatRequest request = (HeartbeatRequest) heartbeat.getRequest();

    // let the coordinator handle the heartbeat
    groupCoordinator.handleHeartbeat(
        request.groupId(),
        request.memberId(),
        request.groupGenerationId()
    ).thenAccept(errors -> {
        HeartbeatResponse response = new HeartbeatResponse(errors);

        if (log.isTraceEnabled()) {
            log.trace("Sending heartbeat response {} for correlation id {} to client {}.",
                response, heartbeat.getHeader().correlationId(), heartbeat.getHeader().clientId());
        }

        resultFuture.complete(response);
    });
}
 
Example 4  Project: kop   File: KafkaRequestHandler.java
@Override
protected void handleLeaveGroupRequest(KafkaHeaderAndRequest leaveGroup,
                                       CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(leaveGroup.getRequest() instanceof LeaveGroupRequest);
    LeaveGroupRequest request = (LeaveGroupRequest) leaveGroup.getRequest();

    // let the coordinator handle the leave-group request
    groupCoordinator.handleLeaveGroup(
        request.groupId(),
        request.memberId()
    ).thenAccept(errors -> {
        LeaveGroupResponse response = new LeaveGroupResponse(errors);

        resultFuture.complete(response);
    });
}
 
Example 5  Project: kop   File: KafkaCommandDecoder.java
protected void writeAndFlushWhenInactiveChannel(Channel channel) {
    // drain pending requests from the head of the queue and answer each with an error
    while (requestsQueue != null && requestsQueue.peek() != null) {
        try {
            ResponseAndRequest pair = requestsQueue.remove();

            if (log.isDebugEnabled()) {
                log.debug("Channel Closing! Write kafka cmd responseFuture back to client. request: {}",
                    pair.getRequest().getHeader());
            }
            AbstractRequest request = pair.getRequest().getRequest();
            AbstractResponse apiResponse = request
                .getErrorResponse(new LeaderNotAvailableException("Channel is closing!"));
            pair.getResponseFuture().complete(apiResponse);

            ByteBuf result = responseToByteBuf(pair.getResponseFuture().get(), pair.getRequest());
            channel.writeAndFlush(result);
        } catch (Exception e) {
            // should never reach here.
            log.error("error getting response ByteBuf:", e);
        }
    }
}
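
Note that pair.getResponseFuture().get() in the loop above can never block: the future was completed one line earlier. A slightly simpler equivalent (a sketch) serializes the error response directly:

AbstractRequest request = pair.getRequest().getRequest();
AbstractResponse apiResponse = request
    .getErrorResponse(new LeaderNotAvailableException("Channel is closing!"));
pair.getResponseFuture().complete(apiResponse);

// serialize the response object directly instead of round-tripping through get()
ByteBuf result = responseToByteBuf(apiResponse, pair.getRequest());
channel.writeAndFlush(result);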
 
Example 6  Project: kop   File: KafkaApisTest.java
@Test(timeOut = 20000, enabled = false)
// https://github.com/streamnative/kop/issues/51
public void testOffsetCommitWithInvalidPartition() throws Exception {
    String topicName = "kopOffsetCommitWithInvalidPartition";

    CompletableFuture<AbstractResponse> invalidResponse1 = new CompletableFuture<>();
    // invalid partition id: -1.
    checkInvalidPartition(invalidResponse1, topicName, -1);
    AbstractResponse response1 = invalidResponse1.get();
    TopicPartition topicPartition1 = new TopicPartition(topicName, -1);
    assertEquals(((OffsetCommitResponse) response1).responseData().get(topicPartition1),
        Errors.UNKNOWN_TOPIC_OR_PARTITION);

    // invalid partition id 1.
    CompletableFuture<AbstractResponse> invalidResponse2 = new CompletableFuture<>();
    checkInvalidPartition(invalidResponse2, topicName, 1);
    TopicPartition topicPartition2 = new TopicPartition(topicName, 1);
    AbstractResponse response2 = invalidResponse2.get();
    assertEquals(((OffsetCommitResponse) response2).responseData().get(topicPartition2),
        Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
 
Example 7  Project: kop   File: KafkaApisTest.java
@Test(timeOut = 20000, enabled = false)
// https://github.com/streamnative/kop/issues/51
public void testGetOffsetsForUnknownTopic() throws Exception {
    String topicName = "kopTestGetOffsetsForUnknownTopic";

    TopicPartition tp = new TopicPartition(topicName, 0);
    Map<TopicPartition, Long> targetTimes = Maps.newHashMap();
    targetTimes.put(tp, ListOffsetRequest.LATEST_TIMESTAMP);

    ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
        .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
        .setTargetTimes(targetTimes);

    KafkaHeaderAndRequest request = buildRequest(builder);
    CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
    kafkaRequestHandler
        .handleListOffsetRequest(request, responseFuture);

    AbstractResponse response = responseFuture.get();
    ListOffsetResponse listOffsetResponse = (ListOffsetResponse) response;
    assertEquals(listOffsetResponse.responseData().get(tp).error,
        Errors.UNKNOWN_TOPIC_OR_PARTITION);
}
 
Example 8  Project: cruise-control   File: KafkaCruiseControlUtils.java
/**
 * Generate a {@link MetadataResponse} with the given information -- e.g. for creating bootstrap and test responses.
 *
 * @param brokers Brokers in the cluster.
 * @param clusterId Cluster Id.
 * @param controllerId Controller Id.
 * @param topicMetadataList Metadata list for the topics in the cluster.
 * @return A {@link MetadataResponse} with the given information.
 */
public static MetadataResponse prepareMetadataResponse(List<Node> brokers,
                                                       String clusterId,
                                                       int controllerId,
                                                       List<MetadataResponse.TopicMetadata> topicMetadataList) {
  MetadataResponseData responseData = new MetadataResponseData();
  responseData.setThrottleTimeMs(AbstractResponse.DEFAULT_THROTTLE_TIME);
  brokers.forEach(broker -> responseData.brokers().add(
      new MetadataResponseData.MetadataResponseBroker().setNodeId(broker.id())
                                                       .setHost(broker.host())
                                                       .setPort(broker.port())
                                                       .setRack(broker.rack())));

  responseData.setClusterId(clusterId);
  responseData.setControllerId(controllerId);
  responseData.setClusterAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
  topicMetadataList.forEach(topicMetadata -> responseData.topics().add(prepareMetadataResponseTopic(topicMetadata)));

  return new MetadataResponse(responseData);
}
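
A minimal usage sketch for the helper above, assuming a single local broker and no topics (Node comes from org.apache.kafka.common; the host, port, and cluster id are illustrative):

List<Node> brokers = Collections.singletonList(new Node(0, "localhost", 9092));
MetadataResponse response = prepareMetadataResponse(
    brokers,
    "test-cluster",            // clusterId
    0,                         // controllerId
    Collections.emptyList());  // no topic metadata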
 
Example 9  Project: kop   File: KafkaRequestHandler.java
protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                           CompletableFuture<AbstractResponse> resultFuture) {
    String err = String.format("Kafka API (%s) not supported by kop server.",
        kafkaHeaderAndRequest.getHeader().apiKey());
    log.error(err);

    AbstractResponse apiResponse = kafkaHeaderAndRequest.getRequest()
        .getErrorResponse(new UnsupportedOperationException(err));
    resultFuture.complete(apiResponse);
}
 
Example 10  Project: kop   File: KafkaRequestHandler.java
protected void handleInactive(KafkaHeaderAndRequest kafkaHeaderAndRequest,
                              CompletableFuture<AbstractResponse> resultFuture) {
    AbstractRequest request = kafkaHeaderAndRequest.getRequest();
    AbstractResponse apiResponse = request.getErrorResponse(new LeaderNotAvailableException("Channel is closing!"));

    log.error("Kafka API {} is send to a closing channel", kafkaHeaderAndRequest.getHeader().apiKey());

    resultFuture.complete(apiResponse);
}
 
Example 11  Project: kop   File: KafkaRequestHandler.java
protected void handleOffsetFetchRequest(KafkaHeaderAndRequest offsetFetch,
                                        CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(offsetFetch.getRequest() instanceof OffsetFetchRequest);
    OffsetFetchRequest request = (OffsetFetchRequest) offsetFetch.getRequest();
    checkState(groupCoordinator != null,
        "Group Coordinator not started");

    KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> keyValue =
        groupCoordinator.handleFetchOffsets(
            request.groupId(),
            Optional.of(request.partitions())
        );

    resultFuture.complete(new OffsetFetchResponse(keyValue.getKey(), keyValue.getValue()));
}
 
Example 12  Project: kop   File: KafkaRequestHandler.java
private void handleListOffsetRequestV1AndAbove(KafkaHeaderAndRequest listOffset,
                                               CompletableFuture<AbstractResponse> resultFuture) {
    ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest();

    Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> responseData = Maps.newHashMap();

    request.partitionTimestamps().entrySet().stream().forEach(tms -> {
        TopicPartition topic = tms.getKey();
        TopicName pulsarTopic = pulsarTopicName(topic, namespace);
        Long times = tms.getValue();
        CompletableFuture<ListOffsetResponse.PartitionData> partitionData;

        CompletableFuture<PersistentTopic> persistentTopic = topicManager.getTopic(pulsarTopic.toString());
        partitionData = fetchOffsetForTimestamp(persistentTopic, times, false);

        responseData.put(topic, partitionData);
    });

    CompletableFuture
            .allOf(responseData.values().stream().toArray(CompletableFuture<?>[]::new))
            .whenComplete((ignore, ex) -> {
                ListOffsetResponse response =
                        new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join()));

                resultFuture.complete(response);
            });
}
 
Example 13  Project: kop   File: KafkaRequestHandler.java
protected void handleListOffsetRequest(KafkaHeaderAndRequest listOffset,
                                       CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(listOffset.getRequest() instanceof ListOffsetRequest);
    // the only difference between v0 and v1 is the `max_num_offsets => INT32`
    // v0 is required because it is used by librdkafka
    if (listOffset.getHeader().apiVersion() == 0) {
        handleListOffsetRequestV0(listOffset, resultFuture);
    } else {
        handleListOffsetRequestV1AndAbove(listOffset, resultFuture);
    }
}
 
Example 14  Project: kop   File: KafkaRequestHandler.java
protected void handleOffsetCommitRequest(KafkaHeaderAndRequest offsetCommit,
                                         CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(offsetCommit.getRequest() instanceof OffsetCommitRequest);
    checkState(groupCoordinator != null,
        "Group Coordinator not started");

    OffsetCommitRequest request = (OffsetCommitRequest) offsetCommit.getRequest();

    Map<TopicPartition, Errors> nonExistingTopic = nonExistingTopicErrors(request);

    groupCoordinator.handleCommitOffsets(
        request.groupId(),
        request.memberId(),
        request.generationId(),
        CoreUtils.mapValue(
            request.offsetData().entrySet().stream()
                .filter(entry -> !nonExistingTopic.containsKey(entry.getKey()))
                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())),
            (partitionData) ->
                OffsetAndMetadata.apply(partitionData.offset, partitionData.metadata, partitionData.timestamp)
        )
    ).thenAccept(offsetCommitResult -> {
        if (nonExistingTopic != null) {
            offsetCommitResult.putAll(nonExistingTopic);
        }
        OffsetCommitResponse response = new OffsetCommitResponse(offsetCommitResult);
        resultFuture.complete(response);
    });
}
 
Example 15  Project: kop   File: KafkaRequestHandler.java
@Override
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
                                       CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
    KeyValue<Errors, List<GroupOverview>> listResult = groupCoordinator.handleListGroups();
    ListGroupsResponse response = new ListGroupsResponse(
        listResult.getKey(),
        listResult.getValue().stream()
            .map(groupOverview -> new Group(groupOverview.groupId(), groupOverview.protocolType()))
            .collect(Collectors.toList())
    );

    resultFuture.complete(response);
}
 
Example 16  Project: kop   File: KafkaRequestHandler.java
@Override
protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
                                         CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(deleteGroups.getRequest() instanceof DeleteGroupsRequest);
    DeleteGroupsRequest request = (DeleteGroupsRequest) deleteGroups.getRequest();

    Map<String, Errors> deleteResult = groupCoordinator.handleDeleteGroups(request.groups());
    DeleteGroupsResponse response = new DeleteGroupsResponse(
        deleteResult
    );
    resultFuture.complete(response);
}
 
Example 17  Project: kop   File: KafkaRequestHandler.java
@Override
protected void handleSaslHandshake(KafkaHeaderAndRequest saslHandshake,
                                   CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(saslHandshake.getRequest() instanceof SaslHandshakeRequest);
    SaslHandshakeRequest request = (SaslHandshakeRequest) saslHandshake.getRequest();

    SaslHandshakeResponse response = checkSaslMechanism(request.mechanism());
    resultFuture.complete(response);
}
 
Example 18  Project: kop   File: MessageFetchContext.java
private void readMessages(KafkaHeaderAndRequest fetch,
                          Map<TopicPartition, Pair<ManagedCursor, Long>> cursors,
                          CompletableFuture<AbstractResponse> resultFuture,
                          LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData) {
    AtomicInteger bytesRead = new AtomicInteger(0);
    Map<TopicPartition, List<Entry>> entryValues = new ConcurrentHashMap<>();

    readMessagesInternal(fetch, cursors, bytesRead, entryValues, resultFuture, responseData);
}
 
Example 19  Project: kop   File: KafkaCommandDecoder.java
protected ByteBuf responseToByteBuf(AbstractResponse response, KafkaHeaderAndRequest request) {
    try (KafkaHeaderAndResponse kafkaHeaderAndResponse =
             KafkaHeaderAndResponse.responseForRequest(request, response)) {

        return ResponseUtils.serializeResponse(
            kafkaHeaderAndResponse.getApiVersion(),
            kafkaHeaderAndResponse.getHeader(),
            kafkaHeaderAndResponse.getResponse()
        );
    } finally {
        // the request is not needed any more.
        request.close();
    }
}
 
Example 20  Project: kop   File: KafkaCommandDecoder.java
private KafkaHeaderAndResponse(short apiVersion,
                               ResponseHeader header,
                               AbstractResponse response) {
    this.apiVersion = apiVersion;
    this.header = header;
    this.response = response;
}
 
Example 21  Project: kop   File: KafkaApisTest.java
void checkInvalidPartition(CompletableFuture<AbstractResponse> future,
                                                          String topic,
                                                          int invalidPartitionId) {
    TopicPartition invalidTopicPartition = new TopicPartition(topic, invalidPartitionId);
    PartitionData partitionOffsetCommitData = new PartitionData(15L, "");
    Map<TopicPartition, PartitionData> offsetData = Maps.newHashMap();
    offsetData.put(invalidTopicPartition, partitionOffsetCommitData);
    KafkaHeaderAndRequest request = buildRequest(new OffsetCommitRequest.Builder("groupId", offsetData));
    kafkaRequestHandler.handleOffsetCommitRequest(request, future);
}
 
Example 22  Project: kop   File: KafkaApisTest.java
@Test(timeOut = 20000)
public void testBrokerHandleTopicMetadataRequest() throws Exception {
    String topicName = "kopBrokerHandleTopicMetadataRequest";
    int numberTopics = 5;
    int numberPartitions = 6;

    List<TopicPartition> topicPartitions = createTopics(topicName, numberTopics, numberPartitions);
    List<String> kafkaTopics = getCreatedTopics(topicName, numberTopics);
    KafkaHeaderAndRequest metadataRequest = createTopicMetadataRequest(kafkaTopics);
    CompletableFuture<AbstractResponse> responseFuture = new CompletableFuture<>();
    kafkaRequestHandler.handleTopicMetadataRequest(metadataRequest, responseFuture);

    MetadataResponse metadataResponse = (MetadataResponse) responseFuture.get();

    // verify all topics are served by the same broker: localhost:port
    assertEquals(metadataResponse.brokers().size(), 1);
    assertEquals(metadataResponse.brokers().iterator().next().host(), "localhost");

    // check metadata response
    Collection<TopicMetadata> topicMetadatas = metadataResponse.topicMetadata();

    log.debug("a. dumpTopicMetadata: ");
    topicMetadatas.forEach(topicMetadata -> {
        log.debug("      topicMetadata: {}", topicMetadata);
        log.debug("b.    dumpPartitionMetadata: ");
        topicMetadata.partitionMetadata().forEach(partition -> {
            log.debug("            PartitionMetadata: {}", partition);
        });
    });

    assertEquals(topicMetadatas.size(), numberTopics);

    topicMetadatas.forEach(topicMetadata -> {
        assertTrue(topicMetadata.topic().startsWith(topicName + "_"));
        assertEquals(topicMetadata.partitionMetadata().size(), numberPartitions);
    });
}
 
Example 23  Project: kafka-utilities   File: KafkaApiRequest.java
public AbstractResponse getLastApiResponse(final long waitTimeMsBetweenCheckingResponse) {
    while (!this.clientResponse.isDone()) {
        try {
            Thread.sleep(waitTimeMsBetweenCheckingResponse);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    return this.clientResponse.value().responseBody();
}
 
Example 24  Project: kop   File: KafkaRequestHandler.java
protected void handleApiVersionsRequest(KafkaHeaderAndRequest apiVersionRequest,
                                        CompletableFuture<AbstractResponse> resultFuture) {
    AbstractResponse apiResponse = overloadDefaultApiVersionsResponse();
    resultFuture.complete(apiResponse);
}
 
Example 25  Project: kop   File: KafkaRequestHandler.java
private void handleListOffsetRequestV0(KafkaHeaderAndRequest listOffset,
                                       CompletableFuture<AbstractResponse> resultFuture) {
    ListOffsetRequest request = (ListOffsetRequest) listOffset.getRequest();

    Map<TopicPartition, CompletableFuture<ListOffsetResponse.PartitionData>> responseData = Maps.newHashMap();

    // in v0 the request carries offsetData; in v1 and above it carries partitionTimestamps
    log.warn("received a v0 listOffset: {}", request.toString(true));
    request.offsetData().entrySet().stream().forEach(tms -> {
        TopicPartition topic = tms.getKey();
        TopicName pulsarTopic = pulsarTopicName(topic, namespace);
        Long times = tms.getValue().timestamp;
        CompletableFuture<ListOffsetResponse.PartitionData> partitionData;

        // max_num_offsets > 1 is not handled for now; return an error
        if (tms.getValue().maxNumOffsets > 1) {
            log.warn("request is asking for multiple offsets for {}, not supported for now",
                    pulsarTopic.toString());
            partitionData = new CompletableFuture<>();
            partitionData.complete(new ListOffsetResponse
                    .PartitionData(
                    Errors.UNKNOWN_SERVER_ERROR,
                    Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET)));
            responseData.put(topic, partitionData);
            // skip the timestamp lookup below, otherwise the error result is overwritten
            return;
        }

        CompletableFuture<PersistentTopic> persistentTopic = topicManager.getTopic(pulsarTopic.toString());
        partitionData = fetchOffsetForTimestamp(persistentTopic, times, true);

        responseData.put(topic, partitionData);
    });

    CompletableFuture
            .allOf(responseData.values().stream().toArray(CompletableFuture<?>[]::new))
            .whenComplete((ignore, ex) -> {
                ListOffsetResponse response =
                        new ListOffsetResponse(CoreUtils.mapValue(responseData, future -> future.join()));

                resultFuture.complete(response);
            });
}
 
Example 26  Project: kop   File: KafkaRequestHandler.java
protected void handleJoinGroupRequest(KafkaHeaderAndRequest joinGroup,
                                      CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(joinGroup.getRequest() instanceof JoinGroupRequest);
    checkState(groupCoordinator != null,
        "Group Coordinator not started");

    JoinGroupRequest request = (JoinGroupRequest) joinGroup.getRequest();

    Map<String, byte[]> protocols = new HashMap<>();
    request.groupProtocols()
        .stream()
        .forEach(protocol -> protocols.put(protocol.name(), Utils.toArray(protocol.metadata())));
    groupCoordinator.handleJoinGroup(
        request.groupId(),
        request.memberId(),
        joinGroup.getHeader().clientId(),
        joinGroup.getClientHost(),
        request.rebalanceTimeout(),
        request.sessionTimeout(),
        request.protocolType(),
        protocols
    ).thenAccept(joinGroupResult -> {

        Map<String, ByteBuffer> members = new HashMap<>();
        joinGroupResult.getMembers().forEach((memberId, protocol) ->
            members.put(memberId, ByteBuffer.wrap(protocol)));

        JoinGroupResponse response = new JoinGroupResponse(
            joinGroupResult.getError(),
            joinGroupResult.getGenerationId(),
            joinGroupResult.getSubProtocol(),
            joinGroupResult.getMemberId(),
            joinGroupResult.getLeaderId(),
            members
        );

        if (log.isTraceEnabled()) {
            log.trace("Sending join group response {} for correlation id {} to client {}.",
                response, joinGroup.getHeader().correlationId(), joinGroup.getHeader().clientId());
        }

        resultFuture.complete(response);
    });
}
 
Example 27  Project: kop   File: KafkaRequestHandler.java
@Override
protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
                                          CompletableFuture<AbstractResponse> resultFuture) {
    checkArgument(describeGroup.getRequest() instanceof DescribeGroupsRequest);
    DescribeGroupsRequest request = (DescribeGroupsRequest) describeGroup.getRequest();

    // let the coordinator handle the describe-group request
    Map<String, GroupMetadata> groups = request.groupIds().stream()
        .map(groupId -> {
            KeyValue<Errors, GroupSummary> describeResult = groupCoordinator
                .handleDescribeGroup(groupId);
            GroupSummary summary = describeResult.getValue();
            List<GroupMember> members = summary.members().stream()
                .map(member -> {
                    ByteBuffer metadata = ByteBuffer.wrap(member.metadata());
                    ByteBuffer assignment = ByteBuffer.wrap(member.assignment());
                    return new GroupMember(
                        member.memberId(),
                        member.clientId(),
                        member.clientHost(),
                        metadata,
                        assignment
                    );
                })
                .collect(Collectors.toList());
            return new KeyValue<>(
                groupId,
                new GroupMetadata(
                    describeResult.getKey(),
                    summary.state(),
                    summary.protocolType(),
                    summary.protocol(),
                    members
                )
            );
        })
        .collect(Collectors.toMap(
            kv -> kv.getKey(),
            kv -> kv.getValue()
        ));
    DescribeGroupsResponse response = new DescribeGroupsResponse(
        groups
    );
    resultFuture.complete(response);
}
 
Example 28  Project: kop   File: KafkaRequestHandler.java
static AbstractResponse failedResponse(KafkaHeaderAndRequest requestHar, Throwable e) {
    if (log.isDebugEnabled()) {
        log.debug("Request {} get failed response ", requestHar.getHeader().apiKey(), e);
    }
    return requestHar.getRequest().getErrorResponse(((Integer) THROTTLE_TIME_MS.defaultValue), e);
}
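
The helper above delegates to AbstractRequest.getErrorResponse with the protocol's default throttle time. A minimal hedged equivalent that hard-codes a throttle time of 0 ms instead of reading the THROTTLE_TIME_MS schema default:

static AbstractResponse failedResponse(KafkaHeaderAndRequest requestHar, Throwable e) {
    // 0 ms throttle time; the original reads the same default from the schema field
    return requestHar.getRequest().getErrorResponse(0, e);
}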
 
Example 29  Project: kop   File: MessageFetchContext.java
public CompletableFuture<AbstractResponse> handleFetch(CompletableFuture<AbstractResponse> fetchResponse) {
    LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();

    // Map of partition to its related KafkaTopicConsumerManager (tcm).
    Map<TopicPartition, CompletableFuture<KafkaTopicConsumerManager>> topicsAndCursor =
        ((FetchRequest) fetchRequest.getRequest())
            .fetchData().entrySet().stream()
            .map(entry -> {
                TopicName topicName = pulsarTopicName(entry.getKey(), requestHandler.getNamespace());

                CompletableFuture<KafkaTopicConsumerManager> consumerManager =
                    requestHandler.getTopicManager().getTopicConsumerManager(topicName.toString());

                return Pair.of(
                    entry.getKey(),
                    consumerManager);
            })
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

    // wait for all the consumer managers, then readMessages
    CompletableFuture
        .allOf(topicsAndCursor.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture<?>[]::new))
        .whenComplete((ignore, ex) -> {
            Map<TopicPartition, Pair<ManagedCursor, Long>> partitionCursor =
                topicsAndCursor.entrySet().stream()
                    .map(pair -> {
                        KafkaTopicConsumerManager tcm;
                        try {
                            // all futures have completed at this point.
                            tcm = pair.getValue().get();
                            if (tcm == null) {
                                throw new NullPointerException("topic not owned, and return null TCM in fetch.");
                            }
                        } catch (Exception e) {
                            log.warn("Error for get KafkaTopicConsumerManager.", e);

                            responseData.put(pair.getKey(),
                                new FetchResponse.PartitionData(
                                    Errors.NOT_LEADER_FOR_PARTITION,
                                    FetchResponse.INVALID_HIGHWATERMARK,
                                    FetchResponse.INVALID_LAST_STABLE_OFFSET,
                                    FetchResponse.INVALID_LOG_START_OFFSET,
                                    null,
                                    MemoryRecords.EMPTY));
                            // error result recorded; this entry is dropped by the filter below.
                            return null;
                        }

                        long offset = ((FetchRequest) fetchRequest.getRequest()).fetchData()
                            .get(pair.getKey()).fetchOffset;

                        if (log.isDebugEnabled()) {
                            log.debug("Fetch for {}: remove tcm to get cursor for fetch offset: {} - {}.",
                                pair.getKey(), offset, MessageIdUtils.getPosition(offset));
                        }

                        Pair<ManagedCursor, Long> cursorLongPair = tcm.remove(offset);
                        if (cursorLongPair == null) {
                            log.warn("KafkaTopicConsumerManager.remove({}) return null for topic {}. "
                                    + "Fetch for topic return error.",
                                offset, pair.getKey());

                            responseData.put(pair.getKey(),
                                new FetchResponse.PartitionData(
                                    Errors.NOT_LEADER_FOR_PARTITION,
                                    FetchResponse.INVALID_HIGHWATERMARK,
                                    FetchResponse.INVALID_LAST_STABLE_OFFSET,
                                    FetchResponse.INVALID_LOG_START_OFFSET,
                                    null,
                                    MemoryRecords.EMPTY));
                            // error result recorded; this entry is dropped by the filter below.
                            return null;
                        }

                        return Pair.of(pair.getKey(), cursorLongPair);
                    })
                    .filter(x -> x != null)
                    .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

            readMessages(fetchRequest, partitionCursor, fetchResponse, responseData);
        });

    return fetchResponse;
}
 
Example 30  Project: kop   File: KafkaCommandDecoder.java
protected abstract void
handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, CompletableFuture<AbstractResponse> response);
 