org.apache.kafka.common.TopicPartition Source Code Examples (Demo)

Listed below is example code using org.apache.kafka.common.TopicPartition, collected from open-source projects; you can also follow the links to view the full source on GitHub.
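
Before the project examples, here is a minimal, self-contained sketch (my own illustration, not taken from any of the projects below) showing how a TopicPartition is constructed and why it is commonly used as a map key:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class TopicPartitionBasics {
    public static void main(String[] args) {
        // A TopicPartition is an immutable (topic, partition) pair.
        TopicPartition tp = new TopicPartition("orders", 0);
        System.out.println(tp.topic() + "-" + tp.partition()); // orders-0

        // equals() and hashCode() are value-based, so TopicPartition works well
        // as a map key, e.g. for tracking per-partition offsets.
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(tp, 42L);
        System.out.println(offsets.get(new TopicPartition("orders", 0))); // 42
    }
}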

Example 1  Project: mirus   File: RangeTaskAssignorTest.java
@Test
public void testAssignBalanced() {
  List<TopicPartition> partitions =
      Arrays.asList(
          new TopicPartition("a", 0),
          new TopicPartition("a", 1),
          new TopicPartition("a", 2),
          new TopicPartition("b", 0),
          new TopicPartition("b", 1),
          new TopicPartition("b", 2));
  List<List<TopicPartition>> result = sourceTaskAssignor.assign(partitions, 2);
  assertThat(
      result,
      is(
          Arrays.asList(
              Arrays.asList(
                  new TopicPartition("a", 0),
                  new TopicPartition("a", 1),
                  new TopicPartition("a", 2)),
              Arrays.asList(
                  new TopicPartition("b", 0),
                  new TopicPartition("b", 1),
                  new TopicPartition("b", 2)))));
}
 
@Test
public void testTreatBadSegmentAsPayload() {
  Serializer<LargeMessageSegment> segmentSerializer = new DefaultSegmentSerializer();
  Deserializer<LargeMessageSegment> segmentDeserializer = new DefaultSegmentDeserializer();
  MessageAssembler messageAssembler = new MessageAssemblerImpl(100, 100, true, segmentDeserializer);
  TopicPartition tp = new TopicPartition("topic", 0);

  UUID uuid = UUID.randomUUID();
  byte[] realPayload = "message".getBytes();
  LargeMessageSegment badSegment = new LargeMessageSegment(uuid, -1, 100, -1, ByteBuffer.wrap(realPayload));
  byte[] messageWrappedBytes = segmentSerializer.serialize(tp.topic(), badSegment);
  Assert.assertTrue(messageWrappedBytes.length > realPayload.length); //wrapping has been done

  messageAssembler.assemble(tp, 0, messageWrappedBytes);

  MessageAssembler.AssembleResult assembleResult = messageAssembler.assemble(tp, 0, messageWrappedBytes);
  Assert.assertEquals(assembleResult.messageBytes(), messageWrappedBytes);
  Assert.assertEquals(assembleResult.messageStartingOffset(), 0);
  Assert.assertEquals(assembleResult.messageEndingOffset(), 0);
}
 
Example 3  Project: common-kafka   File: FairAssignorTest.java
@Test
public void testTwoConsumersOneTopicOnePartition() {
    String topic = "topic";
    String consumer1 = "consumer1";
    String consumer2 = "consumer2";

    Map<String, Integer> partitionsPerTopic = new HashMap<>();
    partitionsPerTopic.put(topic, 1);

    Map<String, Subscription> consumers = new HashMap<>();
    consumers.put(consumer1, new Subscription(Collections.singletonList(topic)));
    consumers.put(consumer2, new Subscription(Collections.singletonList(topic)));

    Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic, consumers);
    assertEquals(Arrays.asList(new TopicPartition(topic, 0)), assignment.get(consumer1));
    assertEquals(Collections.<TopicPartition>emptyList(), assignment.get(consumer2));
}
 
Example 4  Project: DBus   File: MaasEvent.java
public MaasEvent(String topic, String dataTopic) {
    super(01);
    this.topic = topic;
    this.dataTopic = dataTopic;
    dao = new DbusDataDaoImpl();
    Properties props = HeartBeatConfigContainer.getInstance().getmaasConf().getConsumerProp();
    Properties producerProps = HeartBeatConfigContainer.getInstance().getmaasConf().getProducerProp();
    try {
        LoggerFactory.getLogger().info("[topic]   ...." + topic);
        LoggerFactory.getLogger().info("[maas-event]  initial.........................");
        dataConsumer = new KafkaConsumer<>(props);
        partition0 = new TopicPartition(this.topic, 0);
        dataConsumer.assign(Arrays.asList(partition0));
        dataConsumer.seekToEnd(Arrays.asList(partition0));

        statProducer = new KafkaProducer<>(producerProps);

    } catch (Exception e) {
        e.printStackTrace();
        LoggerFactory.getLogger().error(e.getMessage(), e);
    }
}
 
Example 5  Project: uReplicator   File: WorkerInstance.java
/**
 * Adds topic partition to worker instance
 *
 * @param topic topic name
 * @param partition partition id
 * @param startingOffset starting offset for topic partition
 * @param endingOffset ending offset for topic partition
 * @param dstTopic topic name in destination cluster
 */
public void addTopicPartition(String topic, int partition, Long startingOffset,
    Long endingOffset, String dstTopic) {
  if (observer != null) {
    observer.addTopic(topic);
  }
  if (StringUtils.isNotBlank(dstTopic)) {
    topicMapping.put(topic, dstTopic);
  }
  TopicPartition topicPartition = new TopicPartition(topic, partition);
  long offset =
      startingOffset != null ? startingOffset
          : checkpointManager.fetchOffset(topicPartition);

  LOGGER.info("Adding topic: {}, partition {}, starting offset {}",
      topic, partition, offset);
  PartitionOffsetInfo offsetInfo = new PartitionOffsetInfo(topicPartition, offset, endingOffset);
  fetcherManager.addTopicPartition(topicPartition, offsetInfo);
}
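
For illustration only, a hypothetical caller (the workerInstance variable and topic names below are assumptions, not part of the uReplicator code) might add a partition like this; a null startingOffset falls back to the checkpointed offset, and dstTopic maps the topic to a different name in the destination cluster:

// Mirror partition 3 of "clickstream" from its checkpointed offset, with no
// ending offset, writing to "clickstream-mirrored" in the destination cluster.
workerInstance.addTopicPartition("clickstream", 3, null, null, "clickstream-mirrored");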
 
Example 6  Project: samza   File: TestKafkaSystemAdminWithMock.java
@Test
public void testGetSystemStreamMetaDataWithRetry() {
  final List<PartitionInfo> partitionInfosForTopic = ImmutableList.of(mockPartitionInfo0, mockPartitionInfo1);
  when(mockKafkaConsumer.partitionsFor(VALID_TOPIC)).thenThrow(new RuntimeException())
      .thenReturn(partitionInfosForTopic);

  Map<String, SystemStreamMetadata> metadataMap =
      kafkaSystemAdmin.getSystemStreamMetadata(ImmutableSet.of(VALID_TOPIC));
  assertEquals("metadata should return for 1 topic", metadataMap.size(), 1);

  // retried twice because the first fails and the second succeeds
  Mockito.verify(mockKafkaConsumer, Mockito.times(2)).partitionsFor(VALID_TOPIC);

  final List<TopicPartition> topicPartitions =
      Arrays.asList(new TopicPartition(mockPartitionInfo0.topic(), mockPartitionInfo0.partition()),
          new TopicPartition(mockPartitionInfo1.topic(), mockPartitionInfo1.partition()));
  // the following methods thereafter are only called once
  Mockito.verify(mockKafkaConsumer, Mockito.times(1)).beginningOffsets(topicPartitions);
  Mockito.verify(mockKafkaConsumer, Mockito.times(1)).endOffsets(topicPartitions);
}
 
Example 7  Project: kafka-graphs   File: TopicPartitionDeserializer.java
@Override
public TopicPartition deserialize(String topic, byte[] data) {
    if (data == null || data.length == 0) {
        return null;
    }
    try {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int topicLength = buf.getInt();
        byte[] topicBytes = new byte[topicLength];
        buf.get(topicBytes);
        String otherTopic = new String(topicBytes, ENCODING);
        int partition = buf.getInt();

        return new TopicPartition(otherTopic, partition);
    } catch (UnsupportedEncodingException e) {
        throw new SerializationException("Error when deserializing byte[] to string");
    }
}
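
For context, the byte layout read above (4-byte topic length, topic bytes, 4-byte partition) could be produced by a serializer along these lines. This is a sketch of my own, not necessarily the project's actual serializer, and it assumes the ENCODING constant used above is UTF-8:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical counterpart: writes [int topic length][topic bytes][int partition].
public class TopicPartitionSerializerSketch implements Serializer<TopicPartition> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, TopicPartition data) {
        if (data == null) {
            return null;
        }
        byte[] topicBytes = data.topic().getBytes(StandardCharsets.UTF_8); // assumes UTF-8 encoding
        ByteBuffer buf = ByteBuffer.allocate(4 + topicBytes.length + 4);
        buf.putInt(topicBytes.length);
        buf.put(topicBytes);
        buf.putInt(data.partition());
        return buf.array();
    }

    @Override
    public void close() {
        // nothing to close
    }
}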
 
Example 8  Project: synapse   File: KafkaRecordsConsumerTest.java
@Test
public void shouldNotChangeDurationBehindOnNoRecords() {
    // given
    final KafkaRecordsConsumer consumer = someKafkaRecordsConsumer(fromHorizon());
    durationBehindHandler.onPartitionsAssigned(asList(new TopicPartition("", 0), new TopicPartition("", 1)));
    ConsumerRecord<String,String> consumerRecord = new ConsumerRecord<>("", 0, 23, now().minusSeconds(100).toEpochMilli(), TimestampType.CREATE_TIME, 0, 0, 0, "key", "value");

    consumer.apply(new ConsumerRecords<>(ImmutableMap.of(new TopicPartition("", 0), singletonList(consumerRecord))));
    assertThat(getSecondsBehind("0"), is(100L));
    assertThat(getSecondsBehind("1"), is(9223372036854775L));

    // when
    consumer.apply(ConsumerRecords.empty());

    // then
    assertThat(getSecondsBehind("0"), is(100L));
    assertThat(getSecondsBehind("1"), is(9223372036854775L));
}
 
Example 9  Project: data-highway   File: OffsetManager.java
public Map<Integer, Long> getCommittedOffsets(String topicName) {
  synchronized (consumer) {
    List<TopicPartition> topicPartitions = topicPartitions(topicName);
    ImmutableMap.Builder<Integer, Long> builder = ImmutableMap.builder();
    topicPartitions.forEach(tp -> {
      OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
      Long offset;
      if (offsetAndMetadata == null) {
        offset = consumer.beginningOffsets(singleton(tp)).get(tp);
      } else {
        offset = offsetAndMetadata.offset();
      }
      builder.put(tp.partition(), offset);
    });
    return builder.build();
  }
}
 
private String generateRecordKey(final TopicPartition tp, final SinkRecord headRecord) {
    //FIXME move into commons lib
    final Function<Parameter, String> setKafkaOffset =
        usePaddingParameter -> usePaddingParameter.asBoolean()
            ? String.format("%020d", headRecord.kafkaOffset())
            : Long.toString(headRecord.kafkaOffset());

    return filenameTemplate.instance()
        .bindVariable(FilenameTemplateVariable.TOPIC.name, tp::topic)
        .bindVariable(
            FilenameTemplateVariable.PARTITION.name,
            () -> Integer.toString(tp.partition())
        ).bindVariable(
            FilenameTemplateVariable.START_OFFSET.name,
            setKafkaOffset
        ).bindVariable(
            FilenameTemplateVariable.TIMESTAMP.name,
            setTimestamp
        ).render();
}
 
Example 11  Project: beast   File: OffsetCommitWorkerIntegrationTest.java
@Test
public void shouldStopWhenNoAcknowledgements() throws InterruptedException {
    Map<TopicPartition, OffsetAndMetadata> offsetMap1 = recordsUtil.createRecords("driver-", 3).getPartitionsCommitOffset();
    Map<TopicPartition, OffsetAndMetadata> offsetMap2 = recordsUtil.createRecords("customer-", 3).getPartitionsCommitOffset();
    Map<TopicPartition, OffsetAndMetadata> offsetMap3 = recordsUtil.createRecords("merchant-", 3).getPartitionsCommitOffset();
    List<Map<TopicPartition, OffsetAndMetadata>> recordsList = Arrays.asList(offsetMap1, offsetMap2, offsetMap3);
    commitQueue.addAll(recordsList);
    committer.setDefaultSleepMs(10);
    Thread committerThread = new Thread(committer);
    committerThread.start();

    committerThread.join();

    InOrder inOrder = inOrder(kafkaConsumer);
    inOrder.verify(kafkaConsumer, never()).commitSync(anyMap());
    assertEquals(2, commitQueue.size());
    inOrder.verify(kafkaConsumer, atLeastOnce()).wakeup(anyString());
    assertTrue(acknowledgements.isEmpty());
}
 
Example 12  Project: rya   File: KafkaQueryChangeLogIT.java
@Test
public void readFromPosition_positionStartsNotBegining() throws Exception {
    final List<QueryChange> expected = write10ChangesToChangeLog().subList(5, 10);

    // set the position to some non-0 position
    final TopicPartition partition = new TopicPartition(topic, 0);
    consumer.assign(Lists.newArrayList(partition));
    consumer.seekToEnd(Lists.newArrayList(partition));
    final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> iter = changeLog.readFromPosition(5L);

    final List<QueryChange> actual = new ArrayList<>();
    while (iter.hasNext()) {
        final ChangeLogEntry<QueryChange> entry = iter.next();
        actual.add(entry.getEntry());
    }
    assertEquals(expected, actual);
}
 
public Map<TopicPartition, OffsetFetchResponse.PartitionData> getCommitedOffsets(final String groupName, final List<TopicPartition> topicPartitions,
                                                                                 final long responseWaitTime) throws OffsetFetchException {
    if (this.coordinator == null) {
        throw new OffsetFetchException("Missing Group Coordinator for group:" + groupName);
    }

    OffsetFetchRequest.Builder offsetRequestBuilder =
            new OffsetFetchRequest.Builder(groupName, topicPartitions);
    this.kafkaApiRequest.sendApiRequest(this.coordinator, offsetRequestBuilder);
    OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) this.kafkaApiRequest.getLastApiResponse(responseWaitTime);
    if (offsetFetchResponse.error() == Errors.NONE) {
        return offsetFetchResponse.responseData();
    } else {
        throw new OffsetFetchException(offsetFetchResponse.error().message());
    }
}
 
Example 14  Project: DBus   File: KafkaReader.java
/**
 * createConsumer - create a new consumer
 *
 * @return a consumer assigned to partition 0 of the data topic, positioned at the end of the log or at the configured offset
 * @throws Exception if the consumer cannot be created or positioned
 */
private Consumer<String, String> createConsumer() throws Exception {

    // Seek to end automatically
    TopicPartition dataTopicPartition = new TopicPartition(topicName, 0);
    List<TopicPartition> topics = Arrays.asList(dataTopicPartition);

    Properties props = ConfUtils.getProps(CONSUMER_PROPS);
    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.assign(topics);

    if (offset == -1) {
        consumer.seekToEnd(topics);
        logger.info("Consumer seek to end");
    } else {
        consumer.seek(dataTopicPartition, offset);
        logger.info(String.format("read changed as offset: %s", consumer.position(dataTopicPartition)));
    }
    return consumer;
}
 
Example 15  Project: mirus   File: TaskConfigBuilder.java
/** Generate a list of Task Configuration Maps from the current list of partitions. */
public List<Map<String, String>> fromPartitionList(
    int maxTasks, List<TopicPartition> topicPartitionList) {

  // Assign partitions to tasks.
  List<List<TopicPartition>> partitionsByTask =
      sourceTaskAssignor.assign(
          topicPartitionList, Math.min(topicPartitionList.size(), maxTasks));

  // Generate configuration for each task.
  AtomicInteger taskCounter = new AtomicInteger();
  return partitionsByTask
      .stream()
      .map(TopicPartitionSerDe::toJson)
      .map(partitionList -> mapOf(TaskConfigDefinition.PARTITION_LIST, partitionList))
      .peek(t -> t.putAll(filteredConfig))
      .map(m -> makeClientIdUnique(m, taskCounter.getAndIncrement()))
      .collect(Collectors.toList());
}
 
@Test
public void testPutFlush() {
    HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    // We do not call task.start() since it would override the output stream

    task.put(Arrays.asList(
            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line1", 1)
    ));
    offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(1L));
    task.flush(offsets);
    assertEquals("line1\n", os.toString());

    task.put(Arrays.asList(
            new SinkRecord("topic1", 0, null, null, Schema.STRING_SCHEMA, "line2", 2),
            new SinkRecord("topic2", 0, null, null, Schema.STRING_SCHEMA, "line3", 1)
    ));
    offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(2L));
    offsets.put(new TopicPartition("topic2", 0), new OffsetAndMetadata(1L));
    task.flush(offsets);
    assertEquals("line1\nline2\nline3\n", os.toString());
}
 
private int processLogDirsWithinBroker(
    Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> topicPartitionReplicaInfoMap, String topic,
    Node broker) {
  int totalPartitionsInBroker = 0;
  for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> topicPartitionReplicaInfoEntry : topicPartitionReplicaInfoMap
      .entrySet()) {

    TopicPartition topicPartition = topicPartitionReplicaInfoEntry.getKey();
    DescribeLogDirsResponse.ReplicaInfo replicaInfo = topicPartitionReplicaInfoEntry.getValue();

    if (topicPartition.topic().equals(topic)) {
      totalPartitionsInBroker++;
      LOGGER.trace("totalPartitions In The Broker = {}", totalPartitionsInBroker);
    }

    LOGGER.trace("broker information: {}", broker);
    LOGGER.trace("logDirInfo for kafka-logs: topicPartition = {}, replicaInfo = {}", topicPartition, replicaInfo);
  }

  return totalPartitionsInBroker;
}
 
Example 18  Project: kop   File: GroupMetadata.java
public void prepareTxnOffsetCommit(long producerId,
                                   Map<TopicPartition, OffsetAndMetadata> offsets) {
    if (log.isTraceEnabled()) {
        log.trace("TxnOffsetCommit for producer {} and group {} with offsets {} is pending",
            producerId, groupId, offsets);
    }
    receivedTransactionalOffsetCommits = true;
    Map<TopicPartition, CommitRecordMetadataAndOffset> producerOffsets =
        pendingTransactionalOffsetCommits.computeIfAbsent(producerId, pid -> new HashMap<>());
    offsets.forEach((tp, offsetsAndMetadata) -> producerOffsets.put(tp, new CommitRecordMetadataAndOffset(
        Optional.empty(),
        offsetsAndMetadata
    )));
}
 
Example 19  Project: doctorkafka   File: KafkaBroker.java
/**
 *  Record the stats, and update the topic partition list based on the stats
 *
 *  @param stats the broker stats
 */
public synchronized void update(BrokerStats stats) {
  if (stats == null
      || (latestStats != null && latestStats.getTimestamp() > stats.getTimestamp())
      || stats.getHasFailure()) {
    return;
  }

  brokerName = stats.getName();
  latestStats = stats;
  if (rackId == null) {
    rackId = stats.getRackId() != null ? stats.getRackId() : stats.getAvailabilityZone();
  }

  if (stats.getLeaderReplicas() != null) {
    setLeaderReplicas(stats.getLeaderReplicas()
        .stream()
        .map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition()))
        .collect(Collectors.toSet())
    );
  }

  if (stats.getFollowerReplicas() != null ) {
    setFollowerReplicas(stats.getFollowerReplicas()
        .stream()
        .map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition()))
        .collect(Collectors.toSet())
    );
  }
}
 
Example 20  Project: flink   File: KafkaConsumerThread.java
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
	commitInProgress = false;

	if (ex != null) {
		log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
		internalCommitCallback.onException(ex);
	} else {
		internalCommitCallback.onSuccess();
	}
}
 
Example 21  Project: secor   File: SecorConsumerRebalanceListener.java
@Override
public void onPartitionsRevoked(Collection<TopicPartition> assignedPartitions) {
    // Force an upload/commit here so that the consumers that later take over these partition(s) don't need to rewrite the same messages
    LOG.info("re-balance starting, forcing uploading current assigned partitions {}", assignedPartitions);

    List<com.pinterest.secor.common.TopicPartition> tps = assignedPartitions.stream().map(p -> new com.pinterest.secor.common.TopicPartition(p.topic(), p.partition())).collect(Collectors.toList());
    tps.stream().map(p -> p.getTopic()).collect(Collectors.toSet()).forEach(topic -> StatsUtil.incr("secor.consumer_rebalance_count." + topic));
    handler.uploadOnRevoke(tps);
}
 
Example 22  Project: doctorkafka   File: DoctorKafkaActionsServlet.java
private List<ConsumerRecord<byte[], byte[]>> retrieveActionReportMessages() {
  DoctorKafkaConfig doctorKafkaConfig = DoctorKafkaMain.doctorKafka.getDoctorKafkaConfig();
  String zkUrl = doctorKafkaConfig.getBrokerstatsZkurl();
  String actionReportTopic = doctorKafkaConfig.getActionReportTopic();
  Properties properties =
      OperatorUtil.createKafkaConsumerProperties(zkUrl, OPERATOR_ACTIONS_CONSUMER_GROUP,
          doctorKafkaConfig.getActionReportProducerSecurityProtocol(),
          doctorKafkaConfig.getActionReportProducerSslConfigs());
  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);

  TopicPartition operatorReportTopicPartition = new TopicPartition(actionReportTopic, 0);
  List<TopicPartition> tps = new ArrayList<>();
  tps.add(operatorReportTopicPartition);
  consumer.assign(tps);

  Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
  Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
  for (TopicPartition tp : endOffsets.keySet()) {
    long numMessages = endOffsets.get(tp) - beginOffsets.get(tp);
    LOG.info("{} : offsets [{}, {}], num messages : {}",
        tp, beginOffsets.get(tp), endOffsets.get(tp), numMessages);
    consumer.seek(tp, Math.max(beginOffsets.get(tp), endOffsets.get(tp) - NUM_MESSAGES));
  }

  ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
  List<ConsumerRecord<byte[], byte[]>> recordList = new ArrayList<>();

  while (!records.isEmpty()) {
    for (ConsumerRecord<byte[], byte[]> record : records) {
      recordList.add(record);
    }
    records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
  }
  LOG.info("Read {} messages", recordList.size());
  return recordList;
}
 
Example 23  Project: incubator-pinot   File: KafkaStreamLevelConsumer.java
private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
  Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
  for (Integer partition : consumerOffsets.keySet()) {
    offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(), partition),
        new OffsetAndMetadata(consumerOffsets.get(partition)));
  }
  return offsetsMap;
}
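
For context (a sketch of my own, not from the incubator-pinot code), a map shaped like the one built by getOffsetsMap() is exactly what KafkaConsumer's manual commit APIs expect:

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, for illustration only.
final class OffsetCommitSketch {
    static void commit(KafkaConsumer<?, ?> consumer,
                       Map<TopicPartition, OffsetAndMetadata> offsetsMap) {
        // Blocking commit; consumer.commitAsync(offsetsMap, callback) is the
        // non-blocking alternative.
        consumer.commitSync(offsetsMap);
    }
}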
 
Example 24  Project: atlas   File: AtlasKafkaConsumer.java
@Override
public void commit(TopicPartition partition, long offset) {
    if (!autoCommitEnabled) {
        if (LOG.isDebugEnabled()) {
            LOG.info(" commiting the offset ==>> " + offset);
        }
        kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
    }
}
 
Example 25  Project: mirus   File: TopicPartitionSerDe.java
static String toJson(List<TopicPartition> topicPartitionList) {
  try {
    return OBJECT_MAPPER.writeValueAsString(topicPartitionList);
  } catch (JsonProcessingException e) {
    throw new RuntimeException(e);
  }
}
 
/**
 * @return all topics with their associated consumerCount and consumerGroupCount
 */
private static Map<String, List<Integer>> getTopicConsumerAndGroupCounts(
    KafkaConsumerGroupClient consumerGroupClient
) {

  List<String> consumerGroups = consumerGroupClient.listGroups();

  Map<String, AtomicInteger> topicConsumerCount = new HashMap<>();
  Map<String, Set<String>> topicConsumerGroupCount = new HashMap<>();

  for (String group : consumerGroups) {
    Collection<KafkaConsumerGroupClientImpl.ConsumerSummary> consumerSummaryList =
        consumerGroupClient.describeConsumerGroup(group).consumers();

    for (KafkaConsumerGroupClientImpl.ConsumerSummary consumerSummary : consumerSummaryList) {

      for (TopicPartition topicPartition : consumerSummary.partitions()) {
        topicConsumerCount
            .computeIfAbsent(topicPartition.topic(), k -> new AtomicInteger())
            .incrementAndGet();
        topicConsumerGroupCount
            .computeIfAbsent(topicPartition.topic(), k -> new HashSet<>()).add(group);
      }
    }
  }
  HashMap<String, List<Integer>> results = new HashMap<>();
  topicConsumerCount.forEach(
      (k, v) -> {
        results.computeIfAbsent(k, v1 -> new ArrayList<>()).add(v.intValue());
        results.get(k).add(topicConsumerGroupCount.get(k).size());
      }
  );

  return results;
}
 
@Test
public void notInSync() throws Exception {
  TopicPartitionInfo tpi = new TopicPartitionInfo(0, node0, singletonList(node0), singletonList(node1));
  KafkaFuture<Map<String, TopicDescription>> kafkaFuture = topicDescriptionFuture(tpi);

  doReturn(describeTopicsResult).when(adminClient).describeTopics(topics);
  doReturn(kafkaFuture).when(describeTopicsResult).all();

  Map<TopicPartition, LeaderInSync> result = underTest.apply(0, topics);

  assertThat(result.size(), is(1));
  LeaderInSync leaderInSync = result.get(new TopicPartition(topic, 0));
  assertThat(leaderInSync.isLeader(), is(true));
  assertThat(leaderInSync.isInSync(), is(false));
}
 
Example 28  Project: kbear   File: ConsumerProxy.java
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
    Objects.requireNonNull(timeout, "timeout");
    if (timeout.toMillis() < 0)
        throw new IllegalArgumentException("timeout must not be negative");

    Map<String, Collection<TopicPartition>> map = toMap(partitions);
    return runWithoutConcurrency(() -> {
        Map<TopicPartition, Long> result = new HashMap<>();
        forEach(map::containsKey, (t, c) -> result.putAll(c.getConsumer().beginningOffsets(map.get(t), timeout)));
        return Collections.unmodifiableMap(result);
    });
}
 
Example 29  Project: kafka-workers   File: OffsetsStateTest.java
private void checkExpectedToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, Long expectedToCommit) {
    if (expectedToCommit != null) {
        assertThat(offsetsToCommit).containsOnly(
                entry(TOPIC_PARTITION_0, new OffsetAndMetadata(expectedToCommit + 1))
        );
    } else {
        assertThat(offsetsToCommit).isEmpty();
    }
}
 
Example 30  Project: df_data_service   File: AvroConsumerTest.java
public static void consumeFromTime(String topic) {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    boolean flag = true;

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);

        if (flag) {
            Set<TopicPartition> assignments = consumer.assignment();
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition topicPartition : assignments) {
                query.put(
                        topicPartition,
                        Instant.now().minus(5, MINUTES).toEpochMilli());
            }

            Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);

            result.entrySet()
                    .stream()
                    .forEach(entry ->
                            consumer.seek(
                                    entry.getKey(),
                                    Optional.ofNullable(entry.getValue())
                                            .map(OffsetAndTimestamp::offset)
                                            .orElse(new Long(0))));

            flag = false;
        }

        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        if (!records.isEmpty()) break;
    }


}