类org.apache.kafka.common.config.ConfigResource.Type源码实例Demo

下面列出了怎么用org.apache.kafka.common.config.ConfigResource.Type的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: kafka-topology-builder   文件: TopicManagerIT.java
private void verifyTopicConfiguration(
    String topic, HashMap<String, String> config, List<String> removedConfigs)
    throws ExecutionException, InterruptedException {

  ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
  Collection<ConfigResource> resources = Collections.singletonList(resource);

  Map<ConfigResource, Config> configs = kafkaAdminClient.describeConfigs(resources).all().get();

  Config topicConfig = configs.get(resource);
  Assert.assertNotNull(topicConfig);

  topicConfig
      .entries()
      .forEach(
          entry -> {
            if (!entry.isDefault()) {
              if (config.get(entry.name()) != null)
                Assert.assertEquals(config.get(entry.name()), entry.value());
              Assert.assertFalse(removedConfigs.contains(entry.name()));
            }
          });
}
 
源代码2 项目: kafka-helmsman   文件: TopicServiceImpl.java
@Override
public Map<String, ConfiguredTopic> listExisting(boolean excludeInternal) {
  try {
    Set<String> topics = adminClient
        .listTopics(excludeInternal ? EXCLUDE_INTERNAL : INCLUDE_INTERNAL)
        .names().get();
    Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get().values();

    List<ConfigResource> resources = topics
        .stream()
        .map(t -> new ConfigResource(Type.TOPIC, t))
        .collect(toList());

    Map<ConfigResource, KafkaFuture<Config>> topicConfigs = adminClient.describeConfigs(resources).values();

    return topicDescriptions
        .stream()
        .map(td -> configuredTopic(td, topicConfigs.get(new ConfigResource(Type.TOPIC, td.name()))))
        .filter(t -> !INTERNAL_TOPIC.test(t))
        .collect(toMap(ConfiguredTopic::getName, td -> td));

  } catch (InterruptedException | ExecutionException e) {
    // TODO: FA-10109: Improve exception handling
    throw new RuntimeException(e);
  }
}
 
源代码3 项目: kafka-eagle   文件: KafkaMetricsServiceImpl.java
private String deleteTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
	try {
		String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
		JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
		object.remove(configEntry.name());
		List<ConfigEntry> configEntrys = new ArrayList<>();
		for (String key : KConstants.Topic.getTopicConfigKeys()) {
			if (object.containsKey(key)) {
				configEntrys.add(new ConfigEntry(key, object.getString(key)));
			}
		}
		Map<ConfigResource, Config> configs = new HashMap<>();
		ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
		Config config = new Config(configEntrys);
		configs.put(configRes, config);
		adminClient.alterConfigs(configs);
		return KConstants.Topic.SUCCESS;
	} catch (Exception e) {
		e.printStackTrace();
		LOG.error("Delete topic[" + topic + "] config has error, msg is " + e.getMessage());
		return e.getMessage();
	}
}
 
private void updateTopicConfigPostAK23(Topic topic, String fullTopicName)
    throws ExecutionException, InterruptedException {

  Config currentConfigs = getActualTopicConfig(fullTopicName);

  Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
  ArrayList<AlterConfigOp> listOfValues = new ArrayList<>();

  topic
      .rawConfig()
      .forEach(
          (configKey, configValue) -> {
            listOfValues.add(
                new AlterConfigOp(new ConfigEntry(configKey, configValue), OpType.SET));
          });
  Set<String> newEntryKeys = topic.rawConfig().keySet();

  currentConfigs
      .entries()
      .forEach(
          entry -> {
            if (!newEntryKeys.contains(entry.name())) {
              listOfValues.add(new AlterConfigOp(entry, OpType.DELETE));
            }
          });

  configs.put(new ConfigResource(Type.TOPIC, fullTopicName), listOfValues);

  adminClient.incrementalAlterConfigs(configs).all().get();
}
 
private Config getActualTopicConfig(String topic)
    throws ExecutionException, InterruptedException {
  ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
  Collection<ConfigResource> resources = Collections.singletonList(resource);

  Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();

  return configs.get(resource);
}
 
/**
 * Find cluster inter protocol version, used to determine the minimum level of Api compatibility
 *
 * @return String, the current Kafka Protocol version
 */
private String findKafkaVersion() throws IOException {
  ConfigResource resource = new ConfigResource(Type.BROKER, "inter.broker.protocol.version");
  String kafkaVersion = "";
  try {
    Map<ConfigResource, Config> configs =
        adminClient.describeConfigs(Collections.singletonList(resource)).all().get();
    kafkaVersion =
        configs.get(resource).get("inter.broker.protocol.version").value().split("-")[0];
  } catch (ExecutionException | InterruptedException e) {
    LOGGER.error(e);
    throw new IOException(e);
  }
  return kafkaVersion;
}
 
源代码7 项目: kafka-helmsman   文件: TopicServiceImpl.java
@Override
public void alterConfiguration(List<ConfiguredTopic> topics) {
  AlterConfigsOptions options = dryRun ? new AlterConfigsOptions().validateOnly(true) : new AlterConfigsOptions();
  Map<ConfigResource, Config> configs = topics
      .stream()
      .collect(toMap(t -> new ConfigResource(Type.TOPIC, t.getName()), TopicServiceImpl::resourceConfig));
  try {
    adminClient.alterConfigs(configs, options).all().get();
  } catch (InterruptedException | ExecutionException e) {
    // TODO: FA-10109: Improve exception handling
    throw new RuntimeException(e);
  }
}
 
源代码8 项目: kafka-eagle   文件: KafkaMetricsServiceImpl.java
private String addTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
	try {
		String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
		JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
		if (object.containsKey(configEntry.name())) {
			object.remove(configEntry.name());
		}
		List<ConfigEntry> configEntrys = new ArrayList<>();
		for (String key : KConstants.Topic.getTopicConfigKeys()) {
			if (object.containsKey(key)) {
				configEntrys.add(new ConfigEntry(key, object.getString(key)));
			}
		}
		configEntrys.add(configEntry);
		Map<ConfigResource, Config> configs = new HashMap<>();
		ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
		Config config = new Config(configEntrys);
		configs.put(configRes, config);
		AlterConfigsResult alterConfig = adminClient.alterConfigs(configs);
		alterConfig.all().get();
		return KConstants.Topic.SUCCESS;
	} catch (Exception e) {
		e.printStackTrace();
		LOG.error("Add topic[" + topic + "] config has error, msg is " + e.getMessage());
		return e.getMessage();
	}
}
 
源代码9 项目: vertx-kafka-client   文件: ConfigResource.java
/**
 * @return the resource type
 */
public Type getType() {
  return type;
}
 
源代码10 项目: vertx-kafka-client   文件: ConfigResource.java
/**
 * Constructor
 *
 * @param type a non-null resource type
 * @param name a non-null resource name
 */
public ConfigResource(Type type,
                      java.lang.String name) {
  this.type = type;
  this.name = name;
}
 
源代码11 项目: vertx-kafka-client   文件: ConfigResource.java
/**
 * Set the resource type
 *
 * @param type the resource type
 * @return current instance of the class to be fluent
 */
public ConfigResource setType(Type type) {
  this.type = type;
  return this;
}
 
 类所在包
 类方法
 同包方法