下面列出了org.apache.kafka.clients.consumer.OffsetOutOfRangeException#org.apache.kafka.clients.consumer.NoOffsetForPartitionException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void seekToCommitted(Collection<TopicPartition> partitions) {
// current offsets are being moved so don't throw cached exceptions in poll.
clearRecordProcessingException();
for (TopicPartition tp : partitions) {
OffsetAndMetadata offsetAndMetadata = _kafkaConsumer.committed(tp);
if (offsetAndMetadata == null) {
throw new NoOffsetForPartitionException(tp);
}
_kafkaConsumer.seek(tp, offsetAndMetadata.offset());
_consumerRecordsProcessor.clear(tp);
Long hw = LiKafkaClientsUtils.offsetFromWrappedMetadata(offsetAndMetadata.metadata());
if (hw == null) {
hw = offsetAndMetadata.offset();
}
_consumerRecordsProcessor.setPartitionConsumerHighWaterMark(tp, hw);
}
}
protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
AbstractKafkaConsumer consumer)
{
// if initialOffset is set to EARLIST or LATEST
// and the application is run as first time
// then there is no existing committed offset and this error will be caught
// we need to seek to either beginning or end of the partition
// based on the initial offset setting
AbstractKafkaInputOperator.InitialOffset io =
AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
|| io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
} else {
consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
}
}
@Test
public void testPosition() throws Exception {
String topic = "testSeek";
createTopic(topic);
TopicPartition tp = new TopicPartition(topic, 0);
TopicPartition tp1 = new TopicPartition(topic, 1);
produceSyntheticMessages(topic);
// Reset to earliest
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition1");
try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
consumer.assign(Arrays.asList(tp, tp1));
assertEquals(0, consumer.position(tp));
}
// Reset to latest
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition2");
try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
consumer.assign(Arrays.asList(tp, tp1));
assertEquals(consumer.position(tp), 10);
}
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPosition3");
try (LiKafkaConsumer<String, String> consumer = createConsumer(props)) {
consumer.assign(Arrays.asList(tp, tp1));
consumer.position(tp);
fail("Should have thrown NoOffsetForPartitionException");
} catch (NoOffsetForPartitionException nofpe) {
// let it go.
}
}
private long positionMain(TopicPartition partition, Duration timeout) {
// Not handling large message here. The position will be actual position.
while (true) { // In kafka 0.10.x we can get an unbounded number of invalid offset exception
try {
if (timeout == null) {
return _kafkaConsumer.position(partition);
} else {
return _kafkaConsumer.position(partition, timeout);
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
handleInvalidOffsetException(oe);
}
}
}
/**
* Handle when Kafka consumer throws NoOffsetForPartitionException. The base behavior is to seek to start position
* for all partitions.
* @param e the Exception
*/
protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e) {
_logger.info("Poll threw NoOffsetForPartitionException for partitions {}.", e.partitions());
if (!_shutdown) {
// Seek to start position, by default we are starting from latest one as we just start consumption
seekToStartPosition(_consumer, e.partitions(),
_consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, CONSUMER_AUTO_OFFSET_RESET_CONFIG_LATEST));
}
}
@Test
@SuppressWarnings("unchecked")
public void whenReadEventsThenNakadiRuntimeBaseException() {
// ARRANGE //
final ImmutableList<RuntimeException> exceptions = ImmutableList.of(new NoOffsetForPartitionException(
new TopicPartition("", 0)), new KafkaException());
int numberOfNakadiRuntimeBaseExceptions = 0;
for (final Exception exception : exceptions) {
final KafkaConsumer<byte[], byte[]> kafkaConsumerMock = mock(KafkaConsumer.class);
when(kafkaConsumerMock.poll(POLL_TIMEOUT)).thenThrow(exception);
try {
// ACT //
final NakadiKafkaConsumer consumer = new NakadiKafkaConsumer(kafkaConsumerMock,
ImmutableList.of(), createTpTimelineMap(), POLL_TIMEOUT);
consumer.readEvents();
// ASSERT //
fail("An Exception was expected to be be thrown");
} catch (final Exception e) {
numberOfNakadiRuntimeBaseExceptions++;
}
}
assertThat("We should get a NakadiBaseException for every call",
numberOfNakadiRuntimeBaseExceptions,
equalTo(exceptions.size()));
}
private ConsumerRecords<K, V> poll(long timeout, boolean includeMetadataInTimeout) {
ConsumerRecords<K, V> processedRecords;
// We will keep polling until timeout.
long now = System.currentTimeMillis();
long deadline = now + timeout;
do {
ConsumerRecordsProcessingException crpe;
// throw exception to user if the current active (un-paused) topic-partitions has exceptions
Set<TopicPartition> unPausedTopicPartitions = new HashSet<>(_kafkaConsumer.assignment());
unPausedTopicPartitions.removeAll(_kafkaConsumer.paused());
crpe = handleRecordProcessingException(unPausedTopicPartitions);
if (crpe != null) {
throw crpe;
}
if (_autoCommitEnabled && now > _lastAutoCommitMs + _autoCommitInterval) {
commitAsync();
_lastAutoCommitMs = now;
}
ConsumerRecords<byte[], byte[]> rawRecords = ConsumerRecords.empty();
try {
if (includeMetadataInTimeout) {
rawRecords = _kafkaConsumer.poll(Duration.ofMillis(deadline - now));
} else {
rawRecords = _kafkaConsumer.poll(deadline - now);
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
handleInvalidOffsetException(oe);
}
_lastProcessedResult = _consumerRecordsProcessor.process(rawRecords);
processedRecords = _lastProcessedResult.consumerRecords();
// Clear the internal reference.
_lastProcessedResult.clearRecords();
// Rewind offset if there are processing exceptions.
seekToCurrentOffsetsOnRecordProcessingExceptions();
// this is an optimization
// if no records were processed try to throw exception in current poll()
if (processedRecords.isEmpty()) {
crpe = handleRecordProcessingException(null);
if (crpe != null) {
throw crpe;
}
}
now = System.currentTimeMillis();
} while (processedRecords.isEmpty() && now < deadline);
return processedRecords;
}
/**
* This method handles the OffsetResetStrategy="LICLOSEST" offset reset strategy.
*
* The semantics of this strategy is defined as follows:
* Consumer will {@link #seekToBeginning(Collection)} when InvalidOffsetException occurs due to:
* 1. New Consumer / Expired Commit Offset
* 2. Fall-off Start (fetch offset < LSO)
*
* Consumer will {@link #seekToEnd(Collection)} when InvalidOffsetException occurs due to:
* 3a. Fall-off End (fetch offset > LEO): Consumer will seek to the end
* 3b. Fall-off End (fetch offset <= LEO): Consumer will seek to the fetched offset
*
* Note: Offset to which we reset may not necessarily be a safe offset. This method invokes 2 blocking calls and does
* ignore large-message tracking metadata. If we are unable to calculate the bounds, it will throw an
* IllegalStateException.
*
* Design details can be found here - https://docs.google.com/document/d/1zKGXxZiyiRkLJ_d0FCoGALfAo0N7k3hh9NFYhJrbPsw/edit#
* @param oe InvalidOffsetException
*/
private void handleLiClosestResetStrategy(InvalidOffsetException oe) {
if (oe instanceof NoOffsetForPartitionException) { // Case 1
LOG.info("No valid offsets found. Rewinding to the earliest");
seekToBeginning(oe.partitions());
} else if (oe instanceof OffsetOutOfRangeException) {
Map<TopicPartition, Long> seekBeginningPartitions = new HashMap<>();
Map<TopicPartition, Long> seekEndPartitions = new HashMap<>();
Map<TopicPartition, Long> seekFetchedOffsetPartitions = new HashMap<>();
Set<TopicPartition> boundsUnknownPartitions = new HashSet<>();
Map<TopicPartition, Long> beginningOffsets = beginningOffsets(oe.partitions());
Map<TopicPartition, Long> endOffsets = endOffsets(oe.partitions());
((OffsetOutOfRangeException) oe).offsetOutOfRangePartitions().forEach((tp, fetchedOffset) -> {
long beginningOffset = beginningOffsets.getOrDefault(tp, -1L);
long endOffset = endOffsets.getOrDefault(tp, -1L);
if (beginningOffset != -1L && endOffset != -1L) {
if (beginningOffset > fetchedOffset) { // Case 2
seekBeginningPartitions.put(tp, beginningOffset);
return;
}
if (endOffset < fetchedOffset) { // Case 3a
LOG.debug("Closest offset computed for topic partition {} is the log end offset {}. ", tp, fetchedOffset);
seekEndPartitions.put(tp, endOffset);
} else { // Case 3b: endOffset >= fetchedOffset
LOG.debug("Closest offset computed for topic partition {} is the fetched offset {}. ", tp, fetchedOffset);
seekFetchedOffsetPartitions.put(tp, fetchedOffset);
}
} else {
// can't handle reset if the either bound values are not known
// ideally, this should never happen since the listoffsets protocol always returns all requested offset or none
boundsUnknownPartitions.add(tp);
}
});
if (!boundsUnknownPartitions.isEmpty()) {
throw new IllegalStateException("Couldn't figure out the closest offset for these topic partitions " +
boundsUnknownPartitions + "Aborting..");
}
if (!seekBeginningPartitions.isEmpty()) {
LOG.info("Offsets are out of range for partitions {}. Seeking to the beginning offsets returned", seekBeginningPartitions);
seekBeginningPartitions.forEach(this::seekAndClear);
}
if (!seekEndPartitions.isEmpty()) {
LOG.info("Offsets are out of range for partitions {}. Seeking to the end offsets returned", seekEndPartitions);
seekEndPartitions.forEach(this::seekAndClear);
}
if (!seekFetchedOffsetPartitions.isEmpty()) {
LOG.info("Seeking to fetched offsets for topic partitions {}. This may indicate a potential loss of data.",
seekFetchedOffsetPartitions.keySet());
seekFetchedOffsetPartitions.forEach(this::seekAndClear);
}
} else {
throw oe;
}
}