下面列出了怎么用org.apache.kafka.common.config.ConfigResource.Type的API类实例代码及写法,或者点击链接到github查看源代码。
private void verifyTopicConfiguration(
String topic, HashMap<String, String> config, List<String> removedConfigs)
throws ExecutionException, InterruptedException {
ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
Collection<ConfigResource> resources = Collections.singletonList(resource);
Map<ConfigResource, Config> configs = kafkaAdminClient.describeConfigs(resources).all().get();
Config topicConfig = configs.get(resource);
Assert.assertNotNull(topicConfig);
topicConfig
.entries()
.forEach(
entry -> {
if (!entry.isDefault()) {
if (config.get(entry.name()) != null)
Assert.assertEquals(config.get(entry.name()), entry.value());
Assert.assertFalse(removedConfigs.contains(entry.name()));
}
});
}
@Override
public Map<String, ConfiguredTopic> listExisting(boolean excludeInternal) {
try {
Set<String> topics = adminClient
.listTopics(excludeInternal ? EXCLUDE_INTERNAL : INCLUDE_INTERNAL)
.names().get();
Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get().values();
List<ConfigResource> resources = topics
.stream()
.map(t -> new ConfigResource(Type.TOPIC, t))
.collect(toList());
Map<ConfigResource, KafkaFuture<Config>> topicConfigs = adminClient.describeConfigs(resources).values();
return topicDescriptions
.stream()
.map(td -> configuredTopic(td, topicConfigs.get(new ConfigResource(Type.TOPIC, td.name()))))
.filter(t -> !INTERNAL_TOPIC.test(t))
.collect(toMap(ConfiguredTopic::getName, td -> td));
} catch (InterruptedException | ExecutionException e) {
// TODO: FA-10109: Improve exception handling
throw new RuntimeException(e);
}
}
private String deleteTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
try {
String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
object.remove(configEntry.name());
List<ConfigEntry> configEntrys = new ArrayList<>();
for (String key : KConstants.Topic.getTopicConfigKeys()) {
if (object.containsKey(key)) {
configEntrys.add(new ConfigEntry(key, object.getString(key)));
}
}
Map<ConfigResource, Config> configs = new HashMap<>();
ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
Config config = new Config(configEntrys);
configs.put(configRes, config);
adminClient.alterConfigs(configs);
return KConstants.Topic.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
LOG.error("Delete topic[" + topic + "] config has error, msg is " + e.getMessage());
return e.getMessage();
}
}
private void updateTopicConfigPostAK23(Topic topic, String fullTopicName)
throws ExecutionException, InterruptedException {
Config currentConfigs = getActualTopicConfig(fullTopicName);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
ArrayList<AlterConfigOp> listOfValues = new ArrayList<>();
topic
.rawConfig()
.forEach(
(configKey, configValue) -> {
listOfValues.add(
new AlterConfigOp(new ConfigEntry(configKey, configValue), OpType.SET));
});
Set<String> newEntryKeys = topic.rawConfig().keySet();
currentConfigs
.entries()
.forEach(
entry -> {
if (!newEntryKeys.contains(entry.name())) {
listOfValues.add(new AlterConfigOp(entry, OpType.DELETE));
}
});
configs.put(new ConfigResource(Type.TOPIC, fullTopicName), listOfValues);
adminClient.incrementalAlterConfigs(configs).all().get();
}
private Config getActualTopicConfig(String topic)
throws ExecutionException, InterruptedException {
ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
Collection<ConfigResource> resources = Collections.singletonList(resource);
Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();
return configs.get(resource);
}
/**
* Find cluster inter protocol version, used to determine the minimum level of Api compatibility
*
* @return String, the current Kafka Protocol version
*/
private String findKafkaVersion() throws IOException {
ConfigResource resource = new ConfigResource(Type.BROKER, "inter.broker.protocol.version");
String kafkaVersion = "";
try {
Map<ConfigResource, Config> configs =
adminClient.describeConfigs(Collections.singletonList(resource)).all().get();
kafkaVersion =
configs.get(resource).get("inter.broker.protocol.version").value().split("-")[0];
} catch (ExecutionException | InterruptedException e) {
LOGGER.error(e);
throw new IOException(e);
}
return kafkaVersion;
}
@Override
public void alterConfiguration(List<ConfiguredTopic> topics) {
AlterConfigsOptions options = dryRun ? new AlterConfigsOptions().validateOnly(true) : new AlterConfigsOptions();
Map<ConfigResource, Config> configs = topics
.stream()
.collect(toMap(t -> new ConfigResource(Type.TOPIC, t.getName()), TopicServiceImpl::resourceConfig));
try {
adminClient.alterConfigs(configs, options).all().get();
} catch (InterruptedException | ExecutionException e) {
// TODO: FA-10109: Improve exception handling
throw new RuntimeException(e);
}
}
private String addTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
try {
String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
if (object.containsKey(configEntry.name())) {
object.remove(configEntry.name());
}
List<ConfigEntry> configEntrys = new ArrayList<>();
for (String key : KConstants.Topic.getTopicConfigKeys()) {
if (object.containsKey(key)) {
configEntrys.add(new ConfigEntry(key, object.getString(key)));
}
}
configEntrys.add(configEntry);
Map<ConfigResource, Config> configs = new HashMap<>();
ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
Config config = new Config(configEntrys);
configs.put(configRes, config);
AlterConfigsResult alterConfig = adminClient.alterConfigs(configs);
alterConfig.all().get();
return KConstants.Topic.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
LOG.error("Add topic[" + topic + "] config has error, msg is " + e.getMessage());
return e.getMessage();
}
}
/**
* @return the resource type
*/
public Type getType() {
return type;
}
/**
* Constructor
*
* @param type a non-null resource type
* @param name a non-null resource name
*/
public ConfigResource(Type type,
java.lang.String name) {
this.type = type;
this.name = name;
}
/**
* Set the resource type
*
* @param type the resource type
* @return current instance of the class to be fluent
*/
public ConfigResource setType(Type type) {
this.type = type;
return this;
}