Listed below are examples showing how to use the org.apache.kafka.common.KafkaException API class; the full source code for each example is available on GitHub.
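Before the project examples, here is a minimal self-contained sketch (not taken from any of the projects below; the class and method names are made up for illustration) of the two patterns that recur throughout this page: throwing a KafkaException to reject an invalid configuration value, and catching one around a client call so it can be logged or translated.

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;

// Hypothetical helper class, only to illustrate the common usage patterns.
class KafkaExceptionUsageSketch {

    // Throwing: reject an unexpected configuration value, as several snippets below do.
    static Class<?> requireClass(Object configValue, String key) {
        if (!(configValue instanceof Class)) {
            throw new KafkaException("Unexpected type for '" + key + "': "
                    + (configValue == null ? "null" : configValue.getClass().getName()));
        }
        return (Class<?>) configValue;
    }

    // Catching: wrap a consumer call so a KafkaException can be handled or rethrown.
    static <K, V> ConsumerRecords<K, V> pollOrFail(Consumer<K, V> consumer) {
        try {
            return consumer.poll(Duration.ofMillis(100));
        } catch (KafkaException e) {
            // Translate into a domain-specific exception for the caller.
            throw new IllegalStateException("Kafka poll failed", e);
        }
    }
}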
@Test
public void validatePoolConsumerFails() throws Exception {
    when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
    try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
        try {
            lease.poll();
            fail();
        } catch (final KafkaException ke) {
            // expected: the poll failure is surfaced to the caller
        }
    }
    testPool.close();
    verify(mockSession, times(0)).create();
    verify(mockSession, times(0)).commit();
    final PoolStats stats = testPool.getPoolStats();
    assertEquals(1, stats.consumerCreatedCount);
    assertEquals(1, stats.consumerClosedCount);
    assertEquals(1, stats.leasesObtainedCount);
}
/**
 * Returns the reset offset used in situations where the consumer has no committed offset for a partition, or its committed
 * offset is out of range. The returned offset is ensured to be committed, if {@link ProcessingConfig#getCommitInitialOffset()
 * allowed} by the configuration.
 *
 * @return the reset offset
 */
private long getCommittedResetOffset() {
    // Get the reset offset
    long resetOffset = getResetOffset();
    LOGGER.debug("Using reset offset [{}] for partition [{}] as last committed offset", resetOffset, topicPartition);
    // Consumer doesn't have an offset so try to commit the offset. This can be helpful for monitoring in case
    // there are no messages in the queue or processing is failing
    if (config.getCommitInitialOffset()) {
        try {
            consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(resetOffset)));
        } catch (KafkaException e) {
            LOGGER.warn("Unable to commit reset offset {} during initialization of partition {} for group {}", resetOffset,
                    topicPartition, config.getGroupId(), e);
        }
    }
    return resetOffset;
}
@Override
protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) throws KafkaException {
    super.commitOffsets(offsetsToCommit);
    // Record the commit in the history for all applicable records
    for (RecordId recordId : recordsToBeCommitted) {
        OffsetAndMetadata offset = offsetsToCommit.get(new TopicPartition(recordId.topic, recordId.partition));
        if (offset != null && offset.offset() > recordId.offset) {
            recordsToBeCommitted.remove(recordId);
            // Delay history recording if there is an ack in progress so that we can verify ack/commit order
            if (ackInProgress) {
                recordsCommittedDuringAck.add(recordId);
            } else {
                addRecordHistory(recordId, Action.COMMITTED);
            }
        }
    }
}
@Override
public void put(Collection<SinkRecord> collection) {
    // Any retriable exception thrown here will be attempted again and not cause the task to pause
    for (SinkRecord sinkRecord : collection) {
        if (sinkRecord.keySchema() != Schema.OPTIONAL_BYTES_SCHEMA || sinkRecord.valueSchema() != Schema.OPTIONAL_BYTES_SCHEMA) {
            throw new IllegalStateException("Expected sink record key/value to be optional bytes, but saw instead key: "
                    + sinkRecord.keySchema() + " value: " + sinkRecord.valueSchema() + ". Must use converter: " +
                    "org.apache.kafka.connect.converters.ByteArrayConverter");
        }
        LOGGER.debug("Sending record {}", sinkRecord);
        try {
            producer.send(new ProducerRecord<>(sinkRecord.topic(), sinkRecord.kafkaPartition(), (byte[]) sinkRecord.key(),
                    (byte[]) sinkRecord.value()));
        } catch (KafkaException e) {
            // If send throws an exception ensure we always retry the record/collection
            throw new RetriableException(e);
        }
    }
}
void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
    try {
        // Store id in consumer group metadata for the partition.
        // NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
        // how long the pipeline could be down before resuming it. It does not look like
        // this TTL can be adjusted (asked about it on Kafka users list).
        ProducerSpEL.sendOffsetsToTransaction(
            producer,
            ImmutableMap.of(
                new TopicPartition(spec.getTopic(), shard),
                new OffsetAndMetadata(
                    0L,
                    JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId, writerId)))),
            spec.getSinkGroupId());
        ProducerSpEL.commitTransaction(producer);
        numTransactions.inc();
        LOG.debug("{} : committed {} records", shard, lastRecordId - committedId);
        committedId = lastRecordId;
    } catch (KafkaException e) {
        ProducerSpEL.abortTransaction(producer);
        throw e;
    }
}
@Test
void testTransactionalProducerBehaviorOnCommittingAnAbortedTransaction() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );
    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();
    try {
        testProducer.beginTransaction();
        send(testProducer, "testTransactionalProducerBehaviorOnCommittingAnAbortedTransaction", "bar");
        testProducer.abortTransaction();
        assertThrows(KafkaException.class, testProducer::commitTransaction);
    } finally {
        cleanup(producerFactory, testProducer);
    }
}
@Test
void testTransactionalProducerBehaviorOnSendingOffsetsWhenTransactionIsClosed() {
    assumeFalse(
            System.getProperty("os.name").contains("Windows"),
            "Transactional producers not supported on Windows"
    );
    ProducerFactory<String, String> producerFactory = transactionalProducerFactory(kafkaBroker, "xyz");
    Producer<String, String> testProducer = producerFactory.createProducer();
    testProducer.beginTransaction();
    testProducer.commitTransaction();
    assertThrows(KafkaException.class, () -> testProducer.sendOffsetsToTransaction(Collections.emptyMap(), "foo"));
    cleanup(producerFactory, testProducer);
}
public <T> T getConfiguredInstance(String key, Class<T> t, Producer<byte[], byte[]> producer) {
    Class<?> c = getClass(key);
    if (c == null) {
        return null;
    }
    Object o = Utils.newInstance(c);
    if (!t.isInstance(o)) {
        throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
    }
    if (o instanceof Configurable) {
        ((Configurable) o).configure(configsWithCurrentProducer(producer));
    }
    return t.cast(o);
}
@SuppressWarnings("unchecked")
protected <T> T newInstance(Map<String, ?> map, String key, Class<T> klass) throws KafkaException {
    Object val = map.get(key);
    if (val == null) {
        throw new KafkaException("No value for '" + key + "' found");
    } else if (val instanceof String) {
        try {
            return (T) Utils.newInstance(Class.forName((String) val));
        } catch (Exception e) {
            throw new KafkaException(e);
        }
    } else if (val instanceof Class) {
        return (T) Utils.newInstance((Class<T>) val);
    } else {
        throw new KafkaException("Unexpected type '" + val.getClass() + "' for '" + key + "'");
    }
}
@Test
public void consumerCreationFailsFirstTime() {
    org.mockito.BDDMockito
            .given(consumerFactory.createConsumer(ArgumentMatchers.any(),
                    ArgumentMatchers.any()))
            .willThrow(KafkaException.class).willReturn(consumer);
    final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
    topicsInUse.put(TEST_TOPIC,
            new TopicInformation("group5-metrics", partitions, false));
    metrics.bindTo(meterRegistry);
    Gauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME)
            .tag("group", "group5-metrics").tag("topic", TEST_TOPIC).gauge();
    // The first read fails because consumer creation throws, so the lag gauge reports 0;
    // the second read succeeds and returns the mocked lag.
    assertThat(gauge.value()).isEqualTo(0);
    assertThat(gauge.value()).isEqualTo(1000.0);
    org.mockito.Mockito.verify(this.consumerFactory, Mockito.times(2))
            .createConsumer(ArgumentMatchers.any(), ArgumentMatchers.any());
}
@Test
public void consumerCreationFailsFirstTime() {
    final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
    topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation(
            "foo-healthIndicator", partitions, false));
    org.mockito.BDDMockito.given(consumerFactory.createConsumer())
            .willThrow(KafkaException.class).willReturn(consumer);
    // The first health check hits the KafkaException and reports DOWN; the retry succeeds and reports UP.
    Health health = indicator.health();
    assertThat(health.getStatus()).isEqualTo(Status.DOWN);
    health = indicator.health();
    assertThat(health.getStatus()).isEqualTo(Status.UP);
    org.mockito.Mockito.verify(this.consumerFactory, Mockito.times(2))
            .createConsumer();
}
@Override
public void run() {
    try {
        for (Map.Entry<String, TopicManagementHelper> entry : _topicManagementByCluster.entrySet()) {
            String clusterName = entry.getKey();
            TopicManagementHelper helper = entry.getValue();
            try {
                helper.maybeElectLeader();
            } catch (IOException | KafkaException e) {
                LOGGER.warn(_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName, e);
            }
        }
    } catch (Throwable t) {
        /* Catch Throwable because the Scala API can throw NoSuchMethodError at runtime,
           and such an error is not caught at compile time. */
        LOGGER.error(_serviceName
                + "/MultiClusterTopicManagementService/PreferredLeaderElectionRunnable will stop due to an error.", t);
        stop();
    }
}
Future<RecordMetadata> sendRecord(
        TimestampedValue<ProducerRecord<K, V>> record, Counter sendCounter) {
    try {
        Long timestampMillis =
                spec.getPublishTimestampFunction() != null
                        ? spec.getPublishTimestampFunction()
                                .getTimestamp(record.getValue(), record.getTimestamp())
                                .getMillis()
                        : null;
        Future<RecordMetadata> result =
                producer.send(
                        new ProducerRecord<>(
                                spec.getTopic(),
                                null,
                                timestampMillis,
                                record.getValue().key(),
                                record.getValue().value()));
        sendCounter.inc();
        return result;
    } catch (KafkaException e) {
        ProducerSpEL.abortTransaction(producer);
        throw e;
    }
}
@Test(expected = KafkaException.class)
public void executeEFFECTIVEONCE() throws ExecutionException, InterruptedException {
    kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, EFFECTIVELY_ONCE),
            null, outputCollector);
    when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
    byte[] value = new byte[]{1, 2, 3};
    when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
    when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
    when(producer.send(producerRecord)).thenReturn(future);
    kafkaBolt.execute(tuple);
    verify(future).get();
    when(future.get()).thenThrow(ExecutionException.class);
    kafkaBolt.execute(tuple);
}
@Test
public void validatePoolConsumerFails() throws Exception {
    when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
    try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
        try {
            lease.poll();
            fail();
        } catch (final KafkaException ke) {
            // expected: the poll failure is surfaced to the caller
        }
    }
    testPool.close();
    verify(mockSession, times(0)).create();
    verify(mockSession, times(0)).commit();
    final PoolStats stats = testPool.getPoolStats();
    assertEquals(1, stats.consumerCreatedCount);
    assertEquals(1, stats.consumerClosedCount);
    assertEquals(1, stats.leasesObtainedCount);
}
@Override
public int getPartitionCount(
    String metadataBrokerList,
    String topic,
    Map<String, Object> kafkaClientConfigs,
    int messageSendMaxRetries,
    long retryBackoffMs
) throws StageException {
    int partitionCount = -1;
    Consumer<String, String> kafkaConsumer = null;
    try {
        kafkaConsumer = createTopicMetadataClient(metadataBrokerList, kafkaClientConfigs);
        List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
        if (partitionInfoList != null) {
            partitionCount = partitionInfoList.size();
        }
    } catch (KafkaException e) {
        LOG.error(KafkaErrors.KAFKA_41.getMessage(), topic, e.toString(), e);
        throw new StageException(KafkaErrors.KAFKA_41, topic, e.toString());
    } finally {
        if (kafkaConsumer != null) {
            kafkaConsumer.close();
        }
    }
    return partitionCount;
}
@Test
void whenStartingBySubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
    // GIVEN
    consumer.schedulePollTask(() -> consumer.setPollException(new KafkaException("poll exception")));
    consumer.schedulePollTask(() -> countryPopulationConsumer.stop());
    HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
    TopicPartition tp = new TopicPartition(TOPIC, 0);
    startOffsets.put(tp, 0L);
    consumer.updateBeginningOffsets(startOffsets);
    // WHEN
    countryPopulationConsumer.startBySubscribing(TOPIC);
    // THEN
    assertThat(pollException).isInstanceOf(KafkaException.class).hasMessage("poll exception");
    assertThat(consumer.closed()).isTrue();
}
@Override
public int getPartitionCount(
    String metadataBrokerList,
    String topic,
    Map<String, Object> kafkaClientConfigs,
    int messageSendMaxRetries,
    long retryBackoffMs
) throws StageException {
    int partitionCount = -1;
    try {
        Consumer<String, String> kafkaConsumer = createTopicMetadataClient();
        List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
        if (partitionInfoList != null) {
            partitionCount = partitionInfoList.size();
        }
    } catch (KafkaException e) {
        LOG.error(KafkaErrors.KAFKA_41.getMessage(), topic, e.toString(), e);
        throw new StageException(KafkaErrors.KAFKA_41, topic, e.toString(), e);
    }
    return partitionCount;
}
public void addAcls(scala.collection.immutable.Set<Acl> acls, final Resource resource) {
    verifyAcls(acls);
    LOG.info("Adding Acl: acl->" + acls + " resource->" + resource);
    final Iterator<Acl> iterator = acls.iterator();
    while (iterator.hasNext()) {
        final Acl acl = iterator.next();
        final String role = getRole(acl);
        if (!roleExists(role)) {
            throw new KafkaException("Can not add Acl for non-existent Role: " + role);
        }
        execute(new Command<Void>() {
            @Override
            public Void run(SentryGenericServiceClient client) throws Exception {
                client.grantPrivilege(
                        requestorName, role, COMPONENT_NAME, toTSentryPrivilege(acl, resource));
                return null;
            }
        });
    }
}
public static ConsumerRecords<byte[], byte[]> fetchMessages(
        KafkaConfig config, KafkaConsumer<byte[], byte[]> consumer, Partition partition,
        long offset) {
    String topic = (String) config._stateConf.get(Config.KAFKA_TOPIC);
    int partitionId = partition.partition;
    TopicPartition topicAndPartition = new TopicPartition(topic, partitionId);
    consumer.seek(topicAndPartition, offset);
    ConsumerRecords<byte[], byte[]> records;
    try {
        records = consumer.poll(config._fillFreqMs / 2);
    } catch (InvalidOffsetException ex) {
        throw new OutOfRangeException(ex.getMessage());
    } catch (Exception e) {
        if (e instanceof KafkaException || e instanceof ConnectException
                || e instanceof SocketTimeoutException || e instanceof IOException
                || e instanceof UnresolvedAddressException) {
            LOG.warn("Network error when fetching messages:", e);
            throw new FailedFetchException(e);
        } else {
            throw new RuntimeException(e);
        }
    }
    return records;
}
private Response handleException(final Throwable exception) {
    if (exception instanceof AuthenticationException) {
        return getResponse(exception, Status.UNAUTHORIZED,
                KAFKA_AUTHENTICATION_ERROR_CODE);
    } else if (exception instanceof AuthorizationException) {
        return getResponse(exception, Status.FORBIDDEN,
                KAFKA_AUTHORIZATION_ERROR_CODE);
    } else if (HANDLED.containsKey(exception.getClass())) {
        return getResponse(exception);
    } else if (exception instanceof RetriableException) {
        log.debug("Kafka retriable exception", exception);
        return getResponse(exception, Status.INTERNAL_SERVER_ERROR,
                KAFKA_RETRIABLE_ERROR_ERROR_CODE);
    } else if (exception instanceof KafkaException) {
        log.error("Kafka exception", exception);
        return getResponse(exception, Status.INTERNAL_SERVER_ERROR,
                KAFKA_ERROR_ERROR_CODE);
    } else if (exception instanceof InvalidFormatException) {
        return getResponse(exception, Status.BAD_REQUEST,
                KAFKA_BAD_REQUEST_ERROR_CODE);
    } else {
        log.error("Unhandled exception", exception);
        return super.toResponse(exception);
    }
}
private TrafficControlStatus checkAndUpdateTopic(KafkaRoad road) {
    String message = "";
    try {
        adminClient.checkAndUpdateTopic(road);
    } catch (KafkaException e) {
        log.warn("Problem updating Kafka topic for {}", road.getName(), e);
        message = String.format(UPDATE_EXCEPTION_FORMAT, e.toString());
    }
    KafkaTopicDetails topicDetails = adminClient.topicDetails(road.getTopicName());
    return status(true, topicDetails.getNumPartitions(), topicDetails.getNumReplicas(), message);
}
@Test
public void create_road_fails_when_there_is_a_kafka_error() throws Exception {
    willThrow(KafkaException.class).given(kafkaAdminClient).createTopic(testRoadModel);
    List<PatchOperation> operations = trafficControl.newModel("test_road", testRoadModel);
    verify(kafkaAdminClient).createTopic(testRoadModel);
    assertThat(operations.size(), is(1));
    assertThat(operations.get(0).getOperation(), is(Operation.ADD));
    assertThat(operations.get(0).getPath(), is("/status"));
    TrafficControlStatus status = (TrafficControlStatus) operations.get(0).getValue();
    assertFalse(status.isTopicCreated());
    assertThat(status.getMessage(), containsString(KafkaException.class.getSimpleName()));
}
@Test
public void commitFailure() throws Exception {
    doThrow(KafkaException.class).when(consumer).commitSync(any());
    underTest.init(1L, rebalanceListener);
    boolean result = underTest.commit(singletonMap(0, 1L));
    assertThat(result, is(false));
    verify(consumer).commitSync(singletonMap(topicPartition, new OffsetAndMetadata(1L)));
}
private static Schema schemaForKey(int version) {
    Schema schema = MESSAGE_TYPE_SCHEMAS.get(version);
    if (null == schema) {
        throw new KafkaException("Unknown offset schema version " + version);
    }
    return schema;
}
private static Schema schemaForOffset(int version) {
    Schema schema = OFFSET_VALUE_SCHEMAS.get(version);
    if (null == schema) {
        throw new KafkaException("Unknown offset schema version " + version);
    }
    return schema;
}
private static Schema schemaForGroup(int version) {
    Schema schema = GROUP_VALUE_SCHEMAS.get(version);
    if (null == schema) {
        throw new KafkaException("Unknown group metadata version " + version);
    }
    return schema;
}