下面列出了org.apache.kafka.clients.consumer.ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Validate that at least one seed broker is valid in case of a
* ClosedChannelException.
*
* @param seedBrokers
* array containing the seed brokers e.g. ["host1:port1",
* "host2:port2"]
* @param exception
* instance
*/
private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
if (!(exception instanceof ClosedChannelException)) {
return;
}
int unknownHosts = 0;
for (String broker : seedBrokers) {
URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
try {
InetAddress.getByName(brokerUrl.getHost());
} catch (UnknownHostException e) {
unknownHosts++;
}
}
// throw meaningful exception if all the provided hosts are invalid
if (unknownHosts == seedBrokers.length) {
throw new IllegalArgumentException("All the servers provided in: '"
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
}
}
/**
* Validate that at least one seed broker is valid in case of a
* ClosedChannelException.
*
* @param seedBrokers
* array containing the seed brokers e.g. ["host1:port1",
* "host2:port2"]
* @param exception
* instance
*/
private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
if (!(exception instanceof ClosedChannelException)) {
return;
}
int unknownHosts = 0;
for (String broker : seedBrokers) {
URL brokerUrl = NetUtils.getCorrectHostnamePort(broker.trim());
try {
InetAddress.getByName(brokerUrl.getHost());
} catch (UnknownHostException e) {
unknownHosts++;
}
}
// throw meaningful exception if all the provided hosts are invalid
if (unknownHosts == seedBrokers.length) {
throw new IllegalArgumentException("All the servers provided in: '"
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
}
}
protected Properties createAdminClientProperties() {
// populate brokerList from either consumer or producer configs
Properties props = new Properties();
// included SSL settings if needed
props.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true));
//validate brokerList
String brokerList = config.get(
String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (brokerList == null) {
brokerList = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
}
if (brokerList == null) {
throw new SamzaException(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
public static Properties toConsumerConfig(Config config) {
val keys = new String[]{
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ConsumerConfig.GROUP_ID_CONFIG,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
};
val props = new Properties();
copyProps(config, props, keys);
return props;
}
public ParaflowKafkaConsumer(List<TopicPartition> topicPartitions, Properties config)
{
// set the consumer configuration properties for kafka record key and value serializers
if (!config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
if (!config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
if (!config.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be specified in the config");
}
this.consumer = new KafkaConsumer<>(config);
this.consumer.assign(topicPartitions);
}
public ConsumerConfigsBuilder bootstrapServers(String bootstrapServersConfig) {
return new ConsumerConfigsBuilder(this, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
}
/**
* Create kafka consumer configs, based on the subset of global configs.
* @param config application config
* @param systemName system name
* @param clientId client id provided by the caller
* @return KafkaConsumerConfig
*/
public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {
Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);
final String groupId = createConsumerGroupId(config);
Map<String, Object> consumerProps = new HashMap<>(subConf);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
// These are values we enforce in sazma, and they cannot be overwritten.
// Disable consumer auto-commit because Samza controls commits
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// check if samza default offset value is defined
String systemOffsetDefault = new SystemConfig(config).getSystemOffsetDefault(systemName);
// Translate samza config value to kafka config value
String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault);
LOG.info("setting auto.offset.reset for system {} to {}", systemName, autoOffsetReset);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// if consumer bootstrap servers are not configured, get them from the producer configs
if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
String bootstrapServers =
config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
if (StringUtils.isEmpty(bootstrapServers)) {
throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName);
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}
// Always use default partition assignment strategy. Do not allow override.
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
// the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
// default to byte[]
if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
// Override default max poll config if there is no value
consumerProps.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);
return new KafkaConsumerConfig(consumerProps, systemName);
}