下面列出了org.apache.kafka.clients.consumer.CommitFailedException#org.apache.kafka.common.errors.WakeupException 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Status consume() throws WakeupException {
if (isClosed()) {
return new FailureStatus(new RuntimeException("Message consumer was closed"));
}
Instant startTime = Instant.now();
ConsumerRecords<byte[], byte[]> messages = kafkaConsumer.poll(timeoutMillis);
statsClient.count("kafka.consumer.poll.messages", messages.count());
statsClient.timeIt("kafka.consumer.consumption.time", startTime);
if (messages.isEmpty()) {
return SUCCESS_STATUS;
}
Instant pollTime = Instant.now();
log.info("Pulled {} messages", messages.count());
Status status = pushToSink(messages, pollTime);
return status;
}
public List<SimulateData> readOutPut(String bootStrapServers, String maxRecords, String windowTime) {
KafkaConsumer kafkaConsumer = kafkaUtils.kafkaConsumer("latest", bootStrapServers, "simulate");
log.info("Subscribe Topic for {}", SIMULATE_OUTPUT);
kafkaConsumer.subscribe(Arrays.asList(SIMULATE_OUTPUT), new Rebalancer());
List<SimulateData> res = new ArrayList<>();
long start = System.currentTimeMillis();
try {
while (checkWindow(start, Long.valueOf(windowTime), res.size(), Long.valueOf(maxRecords))) {
ConsumerRecords<String, SimulateData> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, SimulateData> record : records) {
res.add(record.value());
}
log.info("Number item for read OutPut {}", res.size());
kafkaConsumer.commitSync();
}
} catch (WakeupException e) {
// Ignore exception if closing
throw e;
} catch (RuntimeException re) {
log.error("RuntimeException {}", re);
} finally {
kafkaConsumer.close();
}
return res;
}
public ConsumerRecords<String, String> extractDataFromKafka(String topic, long duration, TimeUnit timeUnit) {
long pollingTime = timeUnit.toMillis(duration);
log.info("Capture data during {} ms on topic {}", pollingTime, topic);
kafkaConsumer.subscribe(Arrays.asList(topic));
try {
return kafkaConsumer.poll(pollingTime);
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} catch (RuntimeException re) {
log.error("RuntimeException {}", re);
} finally {
if (closed.get()) {
kafkaConsumer.close();
}
return null;
}
}
@Override
public List<SourceRecord> poll() {
try {
logger.trace("Calling poll");
ConsumerRecords<byte[], byte[]> result = consumer.poll(consumerPollTimeoutMillis);
logger.trace("Got {} records", result.count());
if (!result.isEmpty()) {
return sourceRecords(result);
} else {
return Collections.emptyList();
}
} catch (WakeupException e) {
// Ignore exception iff shutting down thread.
if (!shutDown.get()) throw e;
}
shutDownTask();
return Collections.emptyList();
}
@Override
public void run() {
try {
kafkaConsumer.subscribe(sourceTopics);
while (true) {
var records = kafkaConsumer.poll(ofMillis(pollDurationMs));
for (ConsumerRecord<String, byte[]> record : records) {
List<Span> spans = decoder.decodeList(record.value());
spans.stream()
.filter(validator::isSpanValid)
.peek(span -> spansCounter.increment())
.forEach(fork::processSpan);
}
}
} catch (WakeupException exception) {
// ignore for shutdown
} finally {
kafkaConsumer.close();
}
}
@Override
public void run() {
try {
consumer.subscribe(topics);
log.info("Consumer successfully subscribed to topics {}", topics);
ConsumerRecords<Integer, String> records = consumer.poll(Long.MAX_VALUE);
records.forEach(record -> {
log.info("Received record with a key of {} and a value of {}", record.key(), record.value());
});
} catch (WakeupException e) {
// Ignore
} finally {
consumer.commitSync();
log.info("Consumer for topics {} temporarily closed", topics);
this.run();
}
}
/**
* Runnable interface implement.<BR/>
*/
@Override
public void run() {
log.debug("The handler:{} thread started.", id);
try {
subscribe(getTopicList());
while (!closed.get()) {
ConsumerRecords<String, String> records = getMessage();
log.debug("records count: {}", records.count());
if(records == null || records.isEmpty())
continue;
handle(records);
}
} catch (WakeupException e) {
log.error(e.getMessage());
//Ignore exception if closing
if(!closed.get())
throw e;
} finally {
this.close();
}
log.debug("The handler:{} thread ended.", id);
}
/**
* consumer handle process
*/
public void process() {
try {
subscribe(getTopicList());
while (!closed.get()) {
ConsumerRecords<K, V> records = getMessage();
log.debug("records count: {}", records.count());
if (records == null || records.isEmpty())
continue;
handle(records);
}
} catch (WakeupException e) {
log.error(e.getMessage());
//Ignore exception if closing
if (!closed.get())
throw e;
} finally {
this.close();
}
}
@Override
public void run() {
try {
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic + "." + version));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
String json = new String(record.value());
process(json);
}
}
} catch (WakeupException e) {
System.out.println("Consumer " + props.getProperty("client.id") + " from group " + props.getProperty("group.id") + " on topic " + topic + "." + version + " was waken up.");
} finally {
System.out.println("Consumer " + props.getProperty("client.id") + " from group " + props.getProperty("group.id") + " on topic " + topic + "." + version + " is being terminated.");
consumer.close();
}
}
/**
* Runnable interface implement.<BR/>
*/
@Override
public void run() {
log.debug("The handler:{} thread started.", id);
try {
subscribe(getTopicList());
while (!closed.get()) {
ConsumerRecords<String, String> records = getMessage();
log.debug("records count: {}", records.count());
if(records == null || records.isEmpty())
continue;
handle(records);
}
} catch (WakeupException e) {
log.error(e.getMessage());
//Ignore exception if closing
if(!closed.get())
throw e;
} finally {
this.close();
}
log.debug("The handler:{} thread ended.", id);
}
/**
* consumer handle process
*/
public void process() {
try {
subscribe(getTopicList());
while (!closed.get()) {
ConsumerRecords<K, V> records = getMessage();
log.debug("records count: {}", records.count());
if (records == null || records.isEmpty())
continue;
handle(records);
}
} catch (WakeupException e) {
log.error(e.getMessage());
//Ignore exception if closing
if (!closed.get())
throw e;
} finally {
this.close();
}
}
@Override
public void close() {
try {
if(!isShutdown.get()) {
super.close();
isShutdown.set(true);
}
}
catch(WakeupException we) {
//see https://issues.apache.org/jira/browse/STORM-2184
LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- {}", we.getMessage(), we);
}
catch(IllegalStateException ise) {
if(ise.getMessage().contains("This consumer has already been closed")) {
LOG.warn(ise.getMessage());
}
else {
throw ise;
}
}
}
@Override
public void run() {
try {
LOG.info("Configuring KafkaConsumer on thread: {} to subscribe to topic: {}", threadNumber, topic);
consumer.subscribe(Arrays.asList(topic));
while (!closed.get()) {
final ConsumerRecords<String, CommandNotification> records = consumer.poll(10000);
// Handle new records
for(final ConsumerRecord<String, CommandNotification> record: records) {
final CommandNotification notification = record.value();
LOG.info("Thread {} is adding notification: {}", threadNumber, notification);
coord.processNextCommandNotification(notification);
}
}
LOG.info("Finished polling.");
} catch (final WakeupException e) {
// Ignore exception if closing
if (!closed.get()) {
throw e;
}
} finally {
consumer.close();
}
}
private void consume(Runnable beforePollingTask) {
try {
beforePollingTask.run();
while (true) {
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(1000));
StreamSupport.stream(records.spliterator(), false)
.map(record -> new CountryPopulation(record.key(), record.value()))
.forEach(countryPopulationConsumer);
consumer.commitSync();
}
} catch (WakeupException e) {
logger.info("Shutting down...");
} catch (RuntimeException ex) {
exceptionConsumer.accept(ex);
} finally {
consumer.close();
}
}
@Test
public void shouldStopConsumptionWhenWakeupExceptionIsThrown() throws InterruptedException {
Worker worker = new ConsumerWorker("consumer", consumer, new WorkerState());
doThrow(new WakeupException()).when(consumer).consume();
new Thread(worker).start();
Thread.sleep(100);
verify(consumer).consume();
verify(consumer).close();
}
private void commitSync() {
Map<TopicPartition, OffsetAndMetadata> offsets = offsetsState.getOffsetsToCommit();
logger.debug("committing offsets sync: {}", offsets);
if (!offsets.isEmpty()) {
try {
consumer.commitSync(offsets);
} catch (WakeupException e) {
// this has to be repeated if consumer.wakeup() during thread shutdown hasn't woken up any pending poll
// operation
consumer.commitSync(offsets);
}
}
}
@Test
public void testProcessWakeException() {
when(kafkaConsumer.poll(anyLong())).thenThrow(WakeupException.class);
assertFalse(anomalyConsumer.process(kafkaConsumer, true));
verify(kafkaConsumer, times(1)).close();
}
/**
* Begin a continuous poll-execute loop for the command topic, stopping only if either a
* {@link WakeupException} is thrown or the {@link #close()} method is called.
*/
@Override
public void run() {
try {
while (!closed.get()) {
log.debug("Polling for new writes to command topic");
fetchAndRunCommands();
}
} catch (WakeupException wue) {
if (!closed.get()) {
throw wue;
}
}
}
private void executeStatement(Command command, CommandId commandId) {
log.info("Executing statement: " + command.getStatement());
try {
statementExecutor.handleStatement(command, commandId);
} catch (WakeupException wue) {
throw wue;
} catch (Exception exception) {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
exception.printStackTrace(printWriter);
log.error("Exception encountered during poll-parse-execute loop: " + stringWriter.toString());
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
try (final ConsumerLease lease = pool.obtainConsumer(session)) {
if (lease == null) {
context.yield();
return;
}
activeLeases.add(lease);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
}
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", new Object[] {lease});
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[]{lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[]{lease, t}, t);
} finally {
activeLeases.remove(lease);
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
try (final ConsumerLease lease = pool.obtainConsumer(session)) {
if (lease == null) {
context.yield();
return;
}
activeLeases.add(lease);
try {
while (this.isScheduled() && lease.continuePolling()) {
lease.poll();
}
if (this.isScheduled() && !lease.commit()) {
context.yield();
}
} catch (final WakeupException we) {
getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+ "Will roll back session and discard any partially received data.", new Object[] {lease});
} catch (final KafkaException kex) {
getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
new Object[] {lease, kex}, kex);
} catch (final Throwable t) {
getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
new Object[] {lease, t}, t);
} finally {
activeLeases.remove(lease);
}
}
}
private void fetch() {
if (!canUseTopicConfigForListener()) {
Logger.error("Could not start consumer. Topic config is invalid.");
return;
}
final KafkaTopicConfig topicConfig = listenerConfig.getRelatedConfig();
try {
receivedMessagesCount = 0;
receivedMessageLimit = Integer.parseInt(listenerConfig.getReceivedMsgLimitCount());
tryFetch(topicConfig);
} catch (WakeupException ignored) {
Logger.trace("Closing consumer due to wakeup()");
closeConsumer();
} catch (Throwable t) {
Logger.error("Exception for fetch()", t);
} finally {
if (isRunning.get()) {
Logger.info(String.format("Consumer stopped (topic:%s, consumer group:%s)", topicConfig.getTopicName(),
listenerConfig.getConsumerGroup()));
}
shouldBeRunning.set(false);
isRunning.set(false);
}
}
@Override
public void run() {
try {
while (!closed.get()) {
consume();
}
} catch (WakeupException e) {
// will wakeup for closing
} finally {
consumer.close();
}
}
@Override
public void run() {
try {
while (!closed.get()) {
consume();
}
} catch (WakeupException e) {
// will wakeup for closing
} finally {
consumer.close();
}
}
@Override
public void run() {
try {
while (!closed.get()) {
consume();
}
} catch (WakeupException e) {
// will wakeup for closing
} finally {
consumer.close();
}
}
@Override
public void run() {
try {
while (!closed.get()) {
consume();
}
} catch (WakeupException e) {
// will wakeup for closing
} finally {
consumer.close();
}
}
private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
if(this.polling.compareAndSet(false, true)){
this.worker.submit(() -> {
boolean submitted = false;
try {
if (!this.closed.get()) {
try {
ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
if (records != null && records.count() > 0) {
submitted = true; // sets false only when the iterator is overwritten
this.context.runOnContext(v -> {
this.polling.set(false);
handler.handle(records);
});
}
} catch (WakeupException ignore) {
} catch (Exception e) {
if (exceptionHandler != null) {
exceptionHandler.handle(e);
}
}
}
} finally {
if(!submitted){
this.context.runOnContext(v -> {
this.polling.set(false);
schedule(0);
});
}
}
});
}
}
@Override
public void poll(final Duration timeout, final Handler<AsyncResult<ConsumerRecords<K, V>>> handler) {
this.worker.submit(() -> {
if (!this.closed.get()) {
try {
ConsumerRecords<K, V> records = this.consumer.poll(timeout);
this.context.runOnContext(v -> handler.handle(Future.succeededFuture(records)));
} catch (WakeupException ignore) {
this.context.runOnContext(v -> handler.handle(Future.succeededFuture(ConsumerRecords.empty())));
} catch (Exception e) {
this.context.runOnContext(v -> handler.handle(Future.failedFuture(e)));
}
}
});
}
@Override
public void start() throws Exception {
new Thread(() -> {
try {
consumer.subscribe(Arrays.asList(APPLICATION_EVENT_STREAM));
while (!closed.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
try {
EventEnvelope envelope =
objectMapper.readValue(record.value(),
EventEnvelope.class);
envelope.eventId = Optional.of(record.key());
super.setChanged();
super.notifyObservers(envelope);
consumer.commitSync();
} catch (Exception ex) {
LOG.error("Can not process record", ex);
}
}
}
} catch (WakeupException e) {
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}).start();
}
@Override
public void start() throws Exception {
new Thread(() -> {
try {
List<String> actionTopics = resolver.getSupportedActions().stream()
.map(s -> Constants.COMMAND_TOPIC_PREFIX + s)
.collect(Collectors.toList());
consumer.subscribe(actionTopics);
LOG.info("Subscribed for [" + actionTopics + "]");
while (!closed.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
LOG.debug("Received record [" + record + "] from [" + record.topic() + "]");
String action = record.topic().replace(Constants.COMMAND_TOPIC_PREFIX, "");
try {
LOG.debug("Handling action [" + action + "]");
handleAction(action, record.value());
consumer.commitSync();
} catch (Exception ex) {
handleException(action, ex);
}
}
}
} catch (WakeupException e) {
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}).start();
}