org.apache.kafka.clients.consumer.ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG源码实例Demo

下面列出了 org.apache.kafka.clients.consumer.ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG 的实例代码；也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。

/**
 * Sanity-checks the seed broker list after a {@link ClosedChannelException}:
 * if none of the configured hosts can be resolved via DNS, the channel
 * failure is almost certainly a configuration error, so fail loudly.
 *
 * @param seedBrokers
 *            seed brokers, each formatted as "host:port"
 * @param exception
 *            the exception that triggered this validation; anything other
 *            than a ClosedChannelException is ignored
 */
private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
	if (!(exception instanceof ClosedChannelException)) {
		return;
	}
	// Count how many of the configured seed hosts resolve in DNS.
	int resolvableHosts = 0;
	for (String seedBroker : seedBrokers) {
		URL hostAndPort = NetUtils.getCorrectHostnamePort(seedBroker.trim());
		try {
			InetAddress.getByName(hostAndPort.getHost());
			resolvableHosts++;
		} catch (UnknownHostException ignored) {
			// Unresolvable host; keep probing the remaining seeds.
		}
	}
	// Only if every single host is unresolvable do we surface a config error.
	if (resolvableHosts == 0 && seedBrokers.length >= 0) {
		throw new IllegalArgumentException("All the servers provided in: '"
			+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
	}
}
 
源代码2 项目: flink   文件: Kafka08PartitionDiscoverer.java
/**
 * Validate that at least one seed broker is resolvable when a
 * {@link ClosedChannelException} was raised; throws an
 * {@link IllegalArgumentException} when every configured host is unknown.
 *
 * @param seedBrokers
 *            seed brokers, e.g. ["host1:port1", "host2:port2"]
 * @param exception
 *            the exception that prompted the check; non-ClosedChannelException
 *            instances are ignored and the method returns immediately
 */
private static void validateSeedBrokers(String[] seedBrokers, Exception exception) {
	if (!(exception instanceof ClosedChannelException)) {
		return;
	}
	int failedLookups = 0;
	for (int i = 0; i < seedBrokers.length; i++) {
		// NetUtils tolerates/normalizes the "host:port" form before we resolve.
		URL hostAndPort = NetUtils.getCorrectHostnamePort(seedBrokers[i].trim());
		try {
			InetAddress.getByName(hostAndPort.getHost());
		} catch (UnknownHostException ignored) {
			// Tally unresolvable hosts; one bad entry alone is not fatal.
			failedLookups++;
		}
	}
	// Every configured host failed DNS resolution -> meaningful config error.
	if (failedLookups == seedBrokers.length) {
		throw new IllegalArgumentException("All the servers provided in: '"
			+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + "' config are invalid. (unknown hosts)");
	}
}
 
源代码3 项目: samza   文件: KafkaSystemAdmin.java
/**
 * Builds the {@link Properties} for this system's Kafka admin client.
 * Starts from the consumer-scoped config subset (which carries SSL settings
 * when present) and resolves the bootstrap servers from either the consumer
 * or, as a fallback, the producer configuration.
 *
 * @return admin client properties with bootstrap.servers guaranteed to be set
 * @throws SamzaException when neither consumer nor producer config defines
 *         bootstrap servers for this system
 */
protected Properties createAdminClientProperties() {
  Properties adminProps = new Properties();

  // Seed with all consumer-scoped settings for this system (includes SSL if configured).
  adminProps.putAll(config.subset(String.format("systems.%s.consumer.", systemName), true));

  // Resolve bootstrap servers: consumer config first, then producer config.
  String bootstrapServers = config.get(
      String.format(KafkaConfig.CONSUMER_CONFIGS_CONFIG_KEY(), systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
  if (bootstrapServers == null) {
    bootstrapServers = config.get(String.format(KafkaConfig.PRODUCER_CONFIGS_CONFIG_KEY(), systemName,
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
  }
  if (bootstrapServers == null) {
    // Without brokers the admin client cannot operate at all — fail fast.
    throw new SamzaException(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " is required for systemAdmin for system " + systemName);
  }
  adminProps.put(ConsumerConfig.BOOTSTRAPS_SERVERS_CONFIG_PLACEHOLDER_DO_NOT_USE, bootstrapServers);

  return adminProps;
}
 
源代码4 项目: adaptive-alerting   文件: ConfigUtil.java
/**
 * Extracts the standard Kafka consumer settings (bootstrap servers, group id,
 * key/value deserializers) from the given application config into a fresh
 * {@link Properties} object.
 *
 * @param config source application configuration
 * @return properties containing only the whitelisted consumer keys
 */
public static Properties toConsumerConfig(Config config) {
    final String[] consumerKeys = {
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        ConsumerConfig.GROUP_ID_CONFIG,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
    };
    final Properties consumerProps = new Properties();
    // Only the whitelisted keys are copied; everything else in config is ignored.
    copyProps(config, consumerProps, consumerKeys);
    return consumerProps;
}
 
源代码5 项目: paraflow   文件: ParaflowKafkaConsumer.java
/**
 * Creates a Kafka consumer assigned to the given partitions.
 * Key/value deserializers default to raw byte arrays when the caller did not
 * set them; bootstrap servers have no sensible default and must be provided.
 *
 * @throws IllegalArgumentException when bootstrap.servers is missing from config
 */
public ParaflowKafkaConsumer(List<TopicPartition> topicPartitions, Properties config)
{
    // Fill in byte-array deserializers only where the caller left them unset.
    config.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    config.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    // No default exists for the broker list — reject the config outright.
    if (!config.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
        throw new IllegalArgumentException(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be specified in the config");
    }
    this.consumer = new KafkaConsumer<>(config);
    // Manual partition assignment (no consumer-group rebalancing).
    this.consumer.assign(topicPartitions);
}
 
源代码6 项目: ameliant-tools   文件: ConsumerConfigsBuilder.java
/**
 * Returns a new builder with {@code bootstrap.servers} set to the given value.
 *
 * @param bootstrapServersConfig comma-separated list of "host:port" broker addresses
 * @return a new builder instance layered on this one with the setting applied
 */
public ConsumerConfigsBuilder bootstrapServers(String bootstrapServersConfig) {
    // Builder is copy-on-write: each setter constructs a fresh builder rather than mutating this one.
    return new ConsumerConfigsBuilder(this, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
}
 
源代码7 项目: samza   文件: KafkaConsumerConfig.java
/**
 * Create Kafka consumer configs, based on the subset of global configs.
 * Starts from the "systems.&lt;name&gt;.consumer." config subset and then layers
 * Samza-enforced settings (group id, client id, auto-commit off, assignment
 * strategy) plus sensible defaults (byte-array deserializers, max poll records)
 * on top.
 *
 * @param config application config
 * @param systemName system name
 * @param clientId client id provided by the caller
 * @return KafkaConsumerConfig wrapping the assembled consumer properties
 */
public static KafkaConsumerConfig getKafkaSystemConsumerConfig(Config config, String systemName, String clientId) {

  Config subConf = config.subset(String.format("systems.%s.consumer.", systemName), true);

  final String groupId = createConsumerGroupId(config);

  Map<String, Object> consumerProps = new HashMap<>(subConf);

  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

  // These are values we enforce in Samza, and they cannot be overwritten.

  // Disable consumer auto-commit because Samza controls commits
  consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

  // check if a Samza default offset value is defined
  String systemOffsetDefault = new SystemConfig(config).getSystemOffsetDefault(systemName);

  // Translate the Samza config value to the kafka config value
  String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault);
  LOG.info("setting auto.offset.reset for system {} to {}", systemName, autoOffsetReset);
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

  // if consumer bootstrap servers are not configured, get them from the producer configs
  if (!subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
    String bootstrapServers =
        config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
    if (StringUtils.isEmpty(bootstrapServers)) {
      throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config  for " + systemName);
    }
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  }

  // Always use default partition assignment strategy. Do not allow override.
  consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

  // the consumer is fully typed, and deserialization can be too. But in case it is not provided we should
  // default to byte[]
  if (!consumerProps.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
    LOG.info("setting key serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  }
  if (!consumerProps.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
    LOG.info("setting value serialization for the consumer(for system {}) to ByteArrayDeserializer", systemName);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
  }

  // Apply the default max-poll-records value only when the user did not set one
  consumerProps.putIfAbsent(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, DEFAULT_KAFKA_CONSUMER_MAX_POLL_RECORDS);

  return new KafkaConsumerConfig(consumerProps, systemName);
}