org.apache.kafka.clients.consumer.OffsetOutOfRangeException#org.apache.kafka.clients.consumer.NoOffsetForPartitionException Source Code Examples

Listed below are example usages of org.apache.kafka.clients.consumer.OffsetOutOfRangeException#org.apache.kafka.clients.consumer.NoOffsetForPartitionException. You can also follow the links to view the full source code on GitHub.

@Override
public void seekToCommitted(Collection<TopicPartition> partitions) {
  // current offsets are being moved so don't throw cached exceptions in poll.
  clearRecordProcessingException();

  for (TopicPartition tp : partitions) {
    OffsetAndMetadata offsetAndMetadata = _kafkaConsumer.committed(tp);
    if (offsetAndMetadata == null) {
      throw new NoOffsetForPartitionException(tp);
    }
    _kafkaConsumer.seek(tp, offsetAndMetadata.offset());
    _consumerRecordsProcessor.clear(tp);
    Long hw = LiKafkaClientsUtils.offsetFromWrappedMetadata(offsetAndMetadata.metadata());
    if (hw == null) {
      hw = offsetAndMetadata.offset();
    }
    _consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
  }
}
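
A minimal usage sketch for seekToCommitted above, assuming a LiKafkaConsumer named consumer that already has its partitions assigned; the fall-back for partitions without a commit is a hypothetical policy, not part of the method itself:

// Restore every assigned partition to its last committed offset; fall back
// to the beginning when no commit exists yet (hypothetical policy).
try {
  consumer.seekToCommitted(consumer.assignment());
} catch (NoOffsetForPartitionException e) {
  consumer.seekToBeginning(e.partitions());
}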
 
Source code 2  Project: attic-apex-malhar   File: KafkaConsumerWrapper.java
protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
    AbstractKafkaConsumer consumer)
{
  // if initialOffset is set to EARLIEST or LATEST
  // and the application is running for the first time
  // then there is no existing committed offset and this error will be caught
  // we need to seek to either beginning or end of the partition
  // based on the initial offset setting
  AbstractKafkaInputOperator.InitialOffset io =
      AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
  if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
      || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
    consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
  } else {
    consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
  }

}
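
For context, a handler like this is driven from the poll loop; a minimal sketch with consumer and processRecords as assumed names, and a vanilla KafkaConsumer standing in for the project's AbstractKafkaConsumer wrapper:

try {
  ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
  processRecords(records);
} catch (NoOffsetForPartitionException e) {
  // first run for this group: no committed offsets exist yet,
  // so apply the configured initial-offset policy before the next poll
  handleNoOffsetForPartitionException(e, consumer);
}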
 
@Test
public void testPosition() throws Exception {
  String topic = "testSeek";
  createTopic(topic);
  TopicPartition tp = new TopicPartition(topic, 0);
  TopicPartition tp1 = new TopicPartition(topic, 1);
  produceSyntheticMessages(topic);

  // Reset to earliest
  Properties props = new Properties();
  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition1");
  try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
    consumer.assign(Arrays.asList(tp, tp1));
    assertEquals(consumer.position(tp), 0);
  }

  // Reset to latest
  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition2");
  try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
    consumer.assign(Arrays.asList(tp, tp1));
    assertEquals(consumer.position(tp), 10);
  }

  props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
  props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition3");
  try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
    consumer.assign(Arrays.asList(tp, tp1));
    consumer.position(tp);
    fail("Should have thrown NoOffsetForPartitionException");
  } catch (NoOffsetForPartitionException nofpe) {
    // let it go.
  }
}
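
The third block relies on auto.offset.reset=none: with no committed offset and no reset policy, position() has nothing to resolve to and throws. The same behavior can be reproduced on a vanilla KafkaConsumer; a minimal sketch, assuming a broker on localhost:9092 and the topic from the test:

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "brand-new-group");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
TopicPartition tp = new TopicPartition("testSeek", 0);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  consumer.assign(Collections.singletonList(tp));
  consumer.position(tp); // no committed offset and no reset policy -> throws
} catch (NoOffsetForPartitionException expected) {
  // expected for a group that has never committed
}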
 
private long positionMain(TopicPartition partition, Duration timeout) {
  // Large messages are not handled here; the returned position is the actual consumer position.
  while (true) { // In Kafka 0.10.x we can get an unbounded number of invalid offset exceptions
    try {
      if (timeout == null) {
        return _kafkaConsumer.position(partition);
      } else {
        return _kafkaConsumer.position(partition, timeout);
      }
    } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
      handleInvalidOffsetException(oe);
    }
  }
}
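
The retry loop exists because position() itself can throw when the group has no valid offset. A simplified sketch of the same pattern on a vanilla consumer, with reset-to-beginning as an assumed stand-in for handleInvalidOffsetException:

long positionWithRetry(KafkaConsumer<?, ?> consumer, TopicPartition partition) {
  while (true) {
    try {
      return consumer.position(partition);
    } catch (InvalidOffsetException e) {
      // covers both OffsetOutOfRangeException and NoOffsetForPartitionException
      consumer.seekToBeginning(e.partitions());
    }
  }
}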
 
/**
 * Handles the case where the Kafka consumer throws NoOffsetForPartitionException. The base behavior is to seek to
 * the start position for all affected partitions.
 * @param e the exception that was thrown
 */
protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e) {
  _logger.info("Poll threw NoOffsetForPartitionException for partitions {}.", e.partitions());
  if (!_shutdown) {
    // Seek to the start position; by default we start from the latest offset, since we are just starting consumption
    seekToStartPosition(_consumer, e.partitions(),
        _consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, CONSUMER_AUTO_OFFSET_RESET_CONFIG_LATEST));
  }
}
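
seekToStartPosition is not shown here; a plausible sketch that keys off the configured reset strategy (a hypothetical helper, not necessarily the project's actual implementation):

private void seekToStartPosition(Consumer<?, ?> consumer, Set<TopicPartition> partitions, String strategy) {
  if ("earliest".equalsIgnoreCase(strategy)) {
    consumer.seekToBeginning(partitions);  // consume from the log start offset
  } else {
    consumer.seekToEnd(partitions);        // default: consume only newly arriving records
  }
}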
 
Source code 6  Project: nakadi   File: NakadiKafkaConsumerTest.java
@Test
@SuppressWarnings("unchecked")
public void whenReadEventsThenNakadiRuntimeBaseException() {

    // ARRANGE //
    final ImmutableList<RuntimeException> exceptions = ImmutableList.of(new NoOffsetForPartitionException(
            new TopicPartition("", 0)), new KafkaException());

    int numberOfNakadiRuntimeBaseExceptions = 0;
    for (final Exception exception : exceptions) {
        final KafkaConsumer<byte[], byte[]> kafkaConsumerMock = mock(KafkaConsumer.class);
        when(kafkaConsumerMock.poll(POLL_TIMEOUT)).thenThrow(exception);

        try {

            // ACT //
            final NakadiKafkaConsumer consumer = new NakadiKafkaConsumer(kafkaConsumerMock,
                    ImmutableList.of(), createTpTimelineMap(), POLL_TIMEOUT);
            consumer.readEvents();

            // ASSERT //
            fail("An Exception was expected to be be thrown");
        } catch (final Exception e) {
            numberOfNakadiRuntimeBaseExceptions++;
        }
    }

    assertThat("We should get a NakadiBaseException for every call",
            numberOfNakadiRuntimeBaseExceptions,
            equalTo(exceptions.size()));
}
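
The behavior under test is a wrap-and-rethrow of low-level consumer failures. A minimal sketch of that pattern, with NakadiBaseException standing in for the project's exception type:

public ConsumerRecords<byte[], byte[]> readEvents() {
  try {
    return kafkaConsumer.poll(pollTimeout);
  } catch (final KafkaException e) {
    // NoOffsetForPartitionException is itself a KafkaException,
    // so a single catch clause covers both cases exercised by the test
    throw new NakadiBaseException("Error occurred while polling Kafka", e);
  }
}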
 
private ConsumerRecords<K, V> poll(long timeout, boolean includeMetadataInTimeout) {
  ConsumerRecords<K, V> processedRecords;
  // We will keep polling until timeout.
  long now = System.currentTimeMillis();
  long deadline = now + timeout;
  do {
    ConsumerRecordsProcessingException crpe;

    // throw an exception to the user if any of the currently active (un-paused) topic-partitions have exceptions
    Set<TopicPartition> unPausedTopicPartitions = new HashSet<>(_kafkaConsumer.assignment());
    unPausedTopicPartitions.removeAll(_kafkaConsumer.paused());
    crpe = handleRecordProcessingException(unPausedTopicPartitions);
    if (crpe != null) {
      throw crpe;
    }

    if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
      commitAsync();
      _lastAutoCommitMs = now;
    }
    ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
    try {
      if (includeMetadataInTimeout) {
        rawRecords = _kafkaConsumer.poll(Duration.ofMillis(deadline - now));
      } else {
        rawRecords = _kafkaConsumer.poll(deadline - now);
      }
    } catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
      handleInvalidOffsetException(oe);
    }

    _lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
    processedRecords = _lastProcessedResult.consumerRecords();
    // Clear the internal reference.
    _lastProcessedResult.clearRecords();
    // Rewind offset if there are processing exceptions.
    seekToCurrentOffsetsOnRecordProcessingExceptions();

    // This is an optimization: if no records were processed,
    // try to throw a pending exception in the current poll().
    if (processedRecords.isEmpty()) {
      crpe = handleRecordProcessingException(null);
      if (crpe != null) {
        throw crpe;
      }
    }
    now = System.currentTimeMillis();
  } while (processedRecords.isEmpty() && now < deadline);
  return processedRecords;
}
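
A hedged usage sketch for a wrapper poll() like the one above: processing failures surface as ConsumerRecordsProcessingException out of poll(), so the caller decides whether to skip the failed records or stop; the skip-and-log policy below is hypothetical:

while (running) {
  try {
    ConsumerRecords<String, String> records = consumer.poll(1000L);
    for (ConsumerRecord<String, String> record : records) {
      process(record);
    }
  } catch (ConsumerRecordsProcessingException e) {
    // some records failed to process; log and keep polling
    log.warn("Record processing failed for some partitions, continuing", e);
  }
}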
 
/**
 * This method handles the OffsetResetStrategy="LICLOSEST" offset reset strategy.
 *
 * The semantics of this strategy are defined as follows:
 * Consumer will {@link #seekToBeginning(Collection)} when InvalidOffsetException occurs due to:
 * 1. New Consumer / Expired Commit Offset
 * 2. Fall-off Start (fetch offset < LSO)
 *
 * Consumer will seek towards the log end when InvalidOffsetException occurs otherwise:
 * 3a. Fall-off End (fetch offset > LEO): Consumer will {@link #seekToEnd(Collection)}
 * 3b. Fall-off End (fetch offset <= LEO): Consumer will seek to the fetched offset
 *
 * Note: The offset to which we reset may not necessarily be a safe offset. This method invokes two blocking calls
 * and ignores large-message tracking metadata. If we are unable to determine the bounds, it throws an
 * IllegalStateException.
 *
 * Design details can be found here - https://docs.google.com/document/d/1zKGXxZiyiRkLJ_d0FCoGALfAo0N7k3hh9NFYhJrbPsw/edit#
 * @param oe InvalidOffsetException
 */
private void handleLiClosestResetStrategy(InvalidOffsetException oe) {
  if (oe instanceof NoOffsetForPartitionException) {  // Case 1
    LOG.info("No valid offsets found. Rewinding to the earliest");
    seekToBeginning(oe.partitions());
  } else if (oe instanceof OffsetOutOfRangeException) {
    Map<TopicPartition, Long> seekBeginningPartitions = new HashMap<>();
    Map<TopicPartition, Long> seekEndPartitions = new HashMap<>();
    Map<TopicPartition, Long> seekFetchedOffsetPartitions = new HashMap<>();
    Set<TopicPartition> boundsUnknownPartitions = new HashSet<>();

    Map<TopicPartition, Long> beginningOffsets = beginningOffsets(oe.partitions());
    Map<TopicPartition, Long> endOffsets = endOffsets(oe.partitions());

    ((OffsetOutOfRangeException) oe).offsetOutOfRangePartitions().forEach((tp, fetchedOffset) -> {
      long beginningOffset = beginningOffsets.getOrDefault(tp, -1L);
      long endOffset = endOffsets.getOrDefault(tp, -1L);
      if (beginningOffset != -1L && endOffset != -1L) {
        if (beginningOffset > fetchedOffset) {  // Case 2
          seekBeginningPartitions.put(tp, beginningOffset);
          return;
        }
        if (endOffset < fetchedOffset) {  // Case 3a
          LOG.debug("Closest offset computed for topic partition {} is the log end offset {}. ", tp, fetchedOffset);
          seekEndPartitions.put(tp, endOffset);
        } else {  // Case 3b: endOffset >= fetchedOffset
          LOG.debug("Closest offset computed for topic partition {} is the fetched offset {}. ", tp, fetchedOffset);
          seekFetchedOffsetPartitions.put(tp, fetchedOffset);
        }
      } else {
        // can't handle the reset if either bound value is unknown
        // ideally this should never happen, since the ListOffsets protocol returns either all requested offsets or none
        boundsUnknownPartitions.add(tp);
      }
    });

    if (!boundsUnknownPartitions.isEmpty()) {
      throw new IllegalStateException("Couldn't figure out the closest offset for these topic partitions: " +
          boundsUnknownPartitions + ". Aborting...");
    }

    if (!seekBeginningPartitions.isEmpty()) {
      LOG.info("Offsets are out of range for partitions {}. Seeking to the beginning offsets returned", seekBeginningPartitions);
      seekBeginningPartitions.forEach(this::seekAndClear);
    }
    if (!seekEndPartitions.isEmpty()) {
      LOG.info("Offsets are out of range for partitions {}. Seeking to the end offsets returned", seekEndPartitions);
      seekEndPartitions.forEach(this::seekAndClear);
    }
    if (!seekFetchedOffsetPartitions.isEmpty()) {
      LOG.info("Seeking to fetched offsets for topic partitions {}. This may indicate a potential loss of data.",
          seekFetchedOffsetPartitions.keySet());
      seekFetchedOffsetPartitions.forEach(this::seekAndClear);
    }
  } else {
    throw oe;
  }
}
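
seekAndClear is referenced but not shown above; a plausible sketch, assuming the record processor must drop any cached state for a partition once its position moves (mirroring what seekToCommitted does in the first example):

private void seekAndClear(TopicPartition tp, long offset) {
  _kafkaConsumer.seek(tp, offset);
  _consumerRecordsProcessor.clear(tp);
}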