org.apache.kafka.clients.consumer.CommitFailedException#org.apache.kafka.common.errors.WakeupException源码实例Demo

下面列出了org.apache.kafka.clients.consumer.CommitFailedException#org.apache.kafka.common.errors.WakeupException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: beast   文件: MessageConsumer.java
public Status consume() throws WakeupException {
    if (isClosed()) {
        return new FailureStatus(new RuntimeException("Message consumer was closed"));
    }
    Instant startTime = Instant.now();
    ConsumerRecords<byte[], byte[]> messages = kafkaConsumer.poll(timeoutMillis);
    statsClient.count("kafka.consumer.poll.messages", messages.count());
    statsClient.timeIt("kafka.consumer.consumption.time", startTime);
    if (messages.isEmpty()) {
        return SUCCESS_STATUS;
    }
    Instant pollTime = Instant.now();
    log.info("Pulled {} messages", messages.count());
    Status status = pushToSink(messages, pollTime);
    return status;
}
 
源代码2 项目: SkaETL   文件: SimulateResultService.java
public List<SimulateData> readOutPut(String bootStrapServers, String maxRecords, String windowTime) {
    KafkaConsumer kafkaConsumer = kafkaUtils.kafkaConsumer("latest", bootStrapServers, "simulate");
    log.info("Subscribe Topic for {}", SIMULATE_OUTPUT);
    kafkaConsumer.subscribe(Arrays.asList(SIMULATE_OUTPUT), new Rebalancer());
    List<SimulateData> res = new ArrayList<>();
    long start = System.currentTimeMillis();
    try {
        while (checkWindow(start, Long.valueOf(windowTime), res.size(), Long.valueOf(maxRecords))) {
            ConsumerRecords<String, SimulateData> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, SimulateData> record : records) {
                res.add(record.value());
            }
            log.info("Number item for read OutPut {}", res.size());
            kafkaConsumer.commitSync();
        }
    } catch (WakeupException e) {
        // Ignore exception if closing
        throw e;
    } catch (RuntimeException re) {
        log.error("RuntimeException {}", re);
    } finally {
        kafkaConsumer.close();
    }
    return res;
}
 
源代码3 项目: SkaETL   文件: KafkaService.java
public ConsumerRecords<String, String> extractDataFromKafka(String topic, long duration, TimeUnit timeUnit) {
    long pollingTime = timeUnit.toMillis(duration);
    log.info("Capture data during {} ms on topic {}", pollingTime, topic);
    kafkaConsumer.subscribe(Arrays.asList(topic));
    try {
        return kafkaConsumer.poll(pollingTime);
    } catch (WakeupException e) {
        // Ignore exception if closing
        if (!closed.get()) throw e;
    } catch (RuntimeException re) {
        log.error("RuntimeException {}", re);
    } finally {
        if (closed.get()) {
            kafkaConsumer.close();
        }
        return null;
    }
}
 
源代码4 项目: mirus   文件: MirusSourceTask.java
@Override
public List<SourceRecord> poll() {

  try {
    logger.trace("Calling poll");
    ConsumerRecords<byte[], byte[]> result = consumer.poll(consumerPollTimeoutMillis);
    logger.trace("Got {} records", result.count());
    if (!result.isEmpty()) {
      return sourceRecords(result);
    } else {
      return Collections.emptyList();
    }
  } catch (WakeupException e) {
    // Ignore exception iff shutting down thread.
    if (!shutDown.get()) throw e;
  }

  shutDownTask();
  return Collections.emptyList();
}
 
源代码5 项目: pitchfork   文件: KafkaConsumerLoop.java
@Override
public void run() {
    try {
        kafkaConsumer.subscribe(sourceTopics);

        while (true) {
            var records = kafkaConsumer.poll(ofMillis(pollDurationMs));

            for (ConsumerRecord<String, byte[]> record : records) {
                List<Span> spans = decoder.decodeList(record.value());

                spans.stream()
                        .filter(validator::isSpanValid)
                        .peek(span -> spansCounter.increment())
                        .forEach(fork::processSpan);
            }
        }
    } catch (WakeupException exception) {
        // ignore for shutdown
    } finally {
        kafkaConsumer.close();
    }
}
 
@Override
public void run() {
    try {
        consumer.subscribe(topics);

        log.info("Consumer successfully subscribed to topics {}", topics);

        ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
        records.forEach(record -> {
            log.info("Received record with a key of {} and a value of {}", record.key(), record.value());
        });
    } catch (WakeupException e) {
        // Ignore
    } finally {
        consumer.commitSync();
        log.info("Consumer for topics {} temporarily closed", topics);
        this.run();
    }
}
 
源代码7 项目: SO   文件: AGenericConsumerHandler.java
/**
 * Runnable interface implement.<BR/>
 */
@Override
public void run() {
    log.debug("The handler:{} thread started.", id);
    try {
        subscribe(getTopicList());
        while (!closed.get()) {
            ConsumerRecords<String, String> records = getMessage();
            log.debug("records count: {}", records.count());
            if(records == null || records.isEmpty())
                continue;
            handle(records);
        }
    } catch (WakeupException e) {
        log.error(e.getMessage());
        //Ignore exception if closing
        if(!closed.get())
            throw e;
    } finally {
        this.close();
    }
    log.debug("The handler:{} thread ended.", id);
}
 
源代码8 项目: SO   文件: AGenericConsumerHandler2.java
/**
 * consumer handle process
 */
public void process() {
    try {
        subscribe(getTopicList());
        while (!closed.get()) {
            ConsumerRecords<K, V> records = getMessage();
            log.debug("records count: {}", records.count());
            if (records == null || records.isEmpty())
                continue;
            handle(records);
        }
    } catch (WakeupException e) {
        log.error(e.getMessage());
        //Ignore exception if closing
        if (!closed.get())
            throw e;
    } finally {
        this.close();
    }
}
 
源代码9 项目: OpenIoE   文件: ConsumerLoopGeneric.java
@Override
public void run() {
	try {
		this.consumer = new KafkaConsumer<>(props);

		consumer.subscribe(Arrays.asList(topic + "." + version));
		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
			for (ConsumerRecord<String, String> record : records) {
				String json = new String(record.value());
				process(json);
			}
		}
	} catch (WakeupException e) {
		System.out.println("Consumer " + props.getProperty("client.id") + " from group " + props.getProperty("group.id") + " on topic " + topic + "." + version + " was waken up.");
	} finally {
		System.out.println("Consumer " + props.getProperty("client.id") + " from group " + props.getProperty("group.id") + " on topic " + topic + "." + version + " is being terminated.");
		consumer.close();
	}
}
 
源代码10 项目: SO   文件: AGenericConsumerHandler.java
/**
 * Runnable interface implement.<BR/>
 */
@Override
public void run() {
    log.debug("The handler:{} thread started.", id);
    try {
        subscribe(getTopicList());
        while (!closed.get()) {
            ConsumerRecords<String, String> records = getMessage();
            log.debug("records count: {}", records.count());
            if(records == null || records.isEmpty())
                continue;
            handle(records);
        }
    } catch (WakeupException e) {
        log.error(e.getMessage());
        //Ignore exception if closing
        if(!closed.get())
            throw e;
    } finally {
        this.close();
    }
    log.debug("The handler:{} thread ended.", id);
}
 
源代码11 项目: SO   文件: AGenericConsumerHandler2.java
/**
 * consumer handle process
 */
public void process() {
    try {
        subscribe(getTopicList());
        while (!closed.get()) {
            ConsumerRecords<K, V> records = getMessage();
            log.debug("records count: {}", records.count());
            if (records == null || records.isEmpty())
                continue;
            handle(records);
        }
    } catch (WakeupException e) {
        log.error(e.getMessage());
        //Ignore exception if closing
        if (!closed.get())
            throw e;
    } finally {
        this.close();
    }
}
 
源代码12 项目: metron   文件: StormKafkaSpout.java
@Override
public void close() {
  try {
    if(!isShutdown.get()) {
      super.close();
      isShutdown.set(true);
    }
  }
  catch(WakeupException we) {
    //see https://issues.apache.org/jira/browse/STORM-2184
    LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- {}", we.getMessage(), we);
  }
  catch(IllegalStateException ise) {
    if(ise.getMessage().contains("This consumer has already been closed")) {
      LOG.warn(ise.getMessage());
    }
    else {
      throw ise;
    }
  }
}
 
源代码13 项目: rya   文件: PeriodicNotificationConsumer.java
@Override
public void run() {

    try {
        LOG.info("Configuring KafkaConsumer on thread: {} to subscribe to topic: {}", threadNumber, topic);
        consumer.subscribe(Arrays.asList(topic));
        while (!closed.get()) {
            final ConsumerRecords<String, CommandNotification> records = consumer.poll(10000);
            // Handle new records
            for(final ConsumerRecord<String, CommandNotification> record: records) {
                final CommandNotification notification = record.value();
                LOG.info("Thread {} is adding notification: {}", threadNumber, notification);
                coord.processNextCommandNotification(notification);
            }
        }
        LOG.info("Finished polling.");
    } catch (final WakeupException e) {
        // Ignore exception if closing
        if (!closed.get()) {
            throw e;
        }
    } finally {
        consumer.close();
    }
}
 
源代码14 项目: tutorials   文件: CountryPopulationConsumer.java
private void consume(Runnable beforePollingTask) {
    try {
        beforePollingTask.run();
        while (true) {
            ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
            StreamSupport.stream(records.spliterator(), false)
                .map(record -> new CountryPopulation(record.key(), record.value()))
                .forEach(countryPopulationConsumer);
            consumer.commitSync();
        }
    } catch (WakeupException e) {
        logger.info("Shutting down...");
    } catch (RuntimeException ex) {
        exceptionConsumer.accept(ex);
    } finally {
        consumer.close();
    }
}
 
源代码15 项目: beast   文件: ConsumerWorkerTest.java
@Test
public void shouldStopConsumptionWhenWakeupExceptionIsThrown() throws InterruptedException {
    Worker worker = new ConsumerWorker("consumer", consumer, new WorkerState());
    doThrow(new WakeupException()).when(consumer).consume();

    new Thread(worker).start();

    Thread.sleep(100);
    verify(consumer).consume();
    verify(consumer).close();
}
 
源代码16 项目: kafka-workers   文件: ConsumerThread.java
private void commitSync() {
    Map<TopicPartition, OffsetAndMetadata> offsets = offsetsState.getOffsetsToCommit();
    logger.debug("committing offsets sync: {}", offsets);
    if (!offsets.isEmpty()) {
        try {
            consumer.commitSync(offsets);
        } catch (WakeupException e) {
            // this has to be repeated if consumer.wakeup() during thread shutdown hasn't woken up any pending poll
            // operation
            consumer.commitSync(offsets);
        }
    }
}
 
源代码17 项目: adaptive-alerting   文件: AnomalyConsumerTest.java
@Test
public void testProcessWakeException() {
    when(kafkaConsumer.poll(anyLong())).thenThrow(WakeupException.class);
    assertFalse(anomalyConsumer.process(kafkaConsumer, true));
    verify(kafkaConsumer, times(1)).close();

}
 
/**
 * Begin a continuous poll-execute loop for the command topic, stopping only if either a
 * {@link WakeupException} is thrown or the {@link #close()} method is called.
 */
@Override
public void run() {
  try {
    while (!closed.get()) {
      log.debug("Polling for new writes to command topic");
      fetchAndRunCommands();
    }
  } catch (WakeupException wue) {
    if (!closed.get()) {
      throw wue;
    }
  }
}
 
private void executeStatement(Command command, CommandId commandId) {
  log.info("Executing statement: " + command.getStatement());
  try {
    statementExecutor.handleStatement(command, commandId);
  } catch (WakeupException wue) {
    throw wue;
  } catch (Exception exception) {
    StringWriter stringWriter = new StringWriter();
    PrintWriter printWriter = new PrintWriter(stringWriter);
    exception.printStackTrace(printWriter);
    log.error("Exception encountered during poll-parse-execute loop: " + stringWriter.toString());
  }
}
 
源代码20 项目: localization_nifi   文件: ConsumeKafka_0_10.java
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final ConsumerPool pool = getConsumerPool(context);
    if (pool == null) {
        context.yield();
        return;
    }

    try (final ConsumerLease lease = pool.obtainConsumer(session)) {
        if (lease == null) {
            context.yield();
            return;
        }

        activeLeases.add(lease);
        try {
            while (this.isScheduled() && lease.continuePolling()) {
                lease.poll();
            }
            if (this.isScheduled() && !lease.commit()) {
                context.yield();
            }
        } catch (final WakeupException we) {
            getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                + "Will roll back session and discard any partially received data.", new Object[] {lease});
        } catch (final KafkaException kex) {
            getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
                    new Object[]{lease, kex}, kex);
        } catch (final Throwable t) {
            getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
                    new Object[]{lease, t}, t);
        } finally {
            activeLeases.remove(lease);
        }
    }
}
 
源代码21 项目: localization_nifi   文件: ConsumeKafka.java
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    final ConsumerPool pool = getConsumerPool(context);
    if (pool == null) {
        context.yield();
        return;
    }

    try (final ConsumerLease lease = pool.obtainConsumer(session)) {
        if (lease == null) {
            context.yield();
            return;
        }

        activeLeases.add(lease);
        try {
            while (this.isScheduled() && lease.continuePolling()) {
                lease.poll();
            }
            if (this.isScheduled() && !lease.commit()) {
                context.yield();
            }
        } catch (final WakeupException we) {
            getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                + "Will roll back session and discard any partially received data.", new Object[] {lease});
        } catch (final KafkaException kex) {
            getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
                new Object[] {lease, kex}, kex);
        } catch (final Throwable t) {
            getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
                new Object[] {lease, t}, t);
        } finally {
            activeLeases.remove(lease);
        }
    }
}
 
源代码22 项目: kafka-message-tool   文件: DefaultKafkaListener.java
private void fetch() {
    if (!canUseTopicConfigForListener()) {
        Logger.error("Could not start consumer. Topic config is invalid.");
        return;
    }

    final KafkaTopicConfig topicConfig = listenerConfig.getRelatedConfig();
    try {
        receivedMessagesCount = 0;
        receivedMessageLimit = Integer.parseInt(listenerConfig.getReceivedMsgLimitCount());
        tryFetch(topicConfig);

    } catch (WakeupException ignored) {
        Logger.trace("Closing consumer due to wakeup()");
        closeConsumer();

    } catch (Throwable t) {
        Logger.error("Exception for fetch()", t);
    } finally {
        if (isRunning.get()) {
            Logger.info(String.format("Consumer stopped (topic:%s, consumer group:%s)", topicConfig.getTopicName(),
                    listenerConfig.getConsumerGroup()));
        }
        shouldBeRunning.set(false);
        isRunning.set(false);
    }
}
 
源代码23 项目: scalable-coffee-shop   文件: EventConsumer.java
@Override
public void run() {
    try {
        while (!closed.get()) {
            consume();
        }
    } catch (WakeupException e) {
        // will wakeup for closing
    } finally {
        consumer.close();
    }
}
 
源代码24 项目: scalable-coffee-shop   文件: EventConsumer.java
@Override
public void run() {
    try {
        while (!closed.get()) {
            consume();
        }
    } catch (WakeupException e) {
        // will wakeup for closing
    } finally {
        consumer.close();
    }
}
 
源代码25 项目: scalable-coffee-shop   文件: EventConsumer.java
@Override
public void run() {
    try {
        while (!closed.get()) {
            consume();
        }
    } catch (WakeupException e) {
        // will wakeup for closing
    } finally {
        consumer.close();
    }
}
 
@Override
public void run() {
    try {
        while (!closed.get()) {
            consume();
        }
    } catch (WakeupException e) {
        // will wakeup for closing
    } finally {
        consumer.close();
    }
}
 
源代码27 项目: vertx-kafka-client   文件: KafkaReadStreamImpl.java
private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
    if(this.polling.compareAndSet(false, true)){
        this.worker.submit(() -> {
           boolean submitted = false;
           try {
              if (!this.closed.get()) {
                try {
                  ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
                  if (records != null && records.count() > 0) {
                    submitted = true; // sets false only when the iterator is overwritten
                    this.context.runOnContext(v -> {
                        this.polling.set(false);
                        handler.handle(records);
                    });
                  }
                } catch (WakeupException ignore) {
                } catch (Exception e) {
                  if (exceptionHandler != null) {
                    exceptionHandler.handle(e);
                  }
                }
              }
           } finally {
               if(!submitted){
                   this.context.runOnContext(v -> {
                       this.polling.set(false);
                       schedule(0);
                   });
               }
           }
        });
    }
}
 
源代码28 项目: vertx-kafka-client   文件: KafkaReadStreamImpl.java
@Override
public void poll(final Duration timeout, final Handler<AsyncResult<ConsumerRecords<K, V>>> handler) {
  this.worker.submit(() -> {
    if (!this.closed.get()) {
      try {
        ConsumerRecords<K, V> records = this.consumer.poll(timeout);
        this.context.runOnContext(v -> handler.handle(Future.succeededFuture(records)));
      } catch (WakeupException ignore) {
        this.context.runOnContext(v -> handler.handle(Future.succeededFuture(ConsumerRecords.empty())));
      } catch (Exception e) {
        this.context.runOnContext(v -> handler.handle(Future.failedFuture(e)));
      }
    }
  });
}
 
@Override
public void start() throws Exception {
    new Thread(() -> {
        try {
            consumer.subscribe(Arrays.asList(APPLICATION_EVENT_STREAM));

            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        EventEnvelope envelope =
                                objectMapper.readValue(record.value(),
                                EventEnvelope.class);
                        envelope.eventId = Optional.of(record.key());

                        super.setChanged();
                        super.notifyObservers(envelope);

                        consumer.commitSync();
                    } catch (Exception ex) {
                        LOG.error("Can not process record", ex);
                    }
                }
            }
        } catch (WakeupException e) {
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }).start();
}
 
@Override
public void start() throws Exception {
    new Thread(() -> {
        try {
            List<String> actionTopics = resolver.getSupportedActions().stream()
                    .map(s -> Constants.COMMAND_TOPIC_PREFIX + s)
                    .collect(Collectors.toList());

            consumer.subscribe(actionTopics);
            LOG.info("Subscribed for [" + actionTopics + "]");

            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    LOG.debug("Received record [" + record + "] from [" + record.topic() + "]");
                    String action = record.topic().replace(Constants.COMMAND_TOPIC_PREFIX, "");
                    try {
                        LOG.debug("Handling action [" + action + "]");
                        handleAction(action, record.value());

                        consumer.commitSync();
                    } catch (Exception ex) {
                        handleException(action, ex);
                    }
                }
            }
        } catch (WakeupException e) {
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }).start();
}