下面列出了怎么用org.apache.kafka.common.config.ConfigResource的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void typical() throws Exception {
String topic = "topic";
Collection<String> topics = singleton(topic);
ConfigResource configResource = new ConfigResource(TOPIC, topic);
Config config = new Config(singleton(new ConfigEntry("retention.ms", "1")));
KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = completedFuture(singletonMap(configResource, config));
doReturn(describeConfigsResult).when(adminClient).describeConfigs(any());
doReturn(kafkaFuture).when(describeConfigsResult).all();
Map<String, Duration> result = underTest.apply(topics);
assertThat(result.size(), is(1));
Duration retention = result.get(topic);
assertThat(retention, is(Duration.ofMillis(1)));
}
private void verifyTopicConfiguration(
String topic, HashMap<String, String> config, List<String> removedConfigs)
throws ExecutionException, InterruptedException {
ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
Collection<ConfigResource> resources = Collections.singletonList(resource);
Map<ConfigResource, Config> configs = kafkaAdminClient.describeConfigs(resources).all().get();
Config topicConfig = configs.get(resource);
Assert.assertNotNull(topicConfig);
topicConfig
.entries()
.forEach(
entry -> {
if (!entry.isDefault()) {
if (config.get(entry.name()) != null)
Assert.assertEquals(config.get(entry.name()), entry.value());
Assert.assertFalse(removedConfigs.contains(entry.name()));
}
});
}
@Override
public Map<String, ConfiguredTopic> listExisting(boolean excludeInternal) {
try {
Set<String> topics = adminClient
.listTopics(excludeInternal ? EXCLUDE_INTERNAL : INCLUDE_INTERNAL)
.names().get();
Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get().values();
List<ConfigResource> resources = topics
.stream()
.map(t -> new ConfigResource(Type.TOPIC, t))
.collect(toList());
Map<ConfigResource, KafkaFuture<Config>> topicConfigs = adminClient.describeConfigs(resources).values();
return topicDescriptions
.stream()
.map(td -> configuredTopic(td, topicConfigs.get(new ConfigResource(Type.TOPIC, td.name()))))
.filter(t -> !INTERNAL_TOPIC.test(t))
.collect(toMap(ConfiguredTopic::getName, td -> td));
} catch (InterruptedException | ExecutionException e) {
// TODO: FA-10109: Improve exception handling
throw new RuntimeException(e);
}
}
@Test
public void testAlterConfiguration() {
TopicService service = new TopicServiceImpl(adminClient, true);
AlterConfigsResult result = mock(AlterConfigsResult.class);
when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
when(adminClient.alterConfigs(any(Map.class), any(AlterConfigsOptions.class))).thenReturn(result);
service.alterConfiguration(Collections.singletonList(
new ConfiguredTopic("test", 3, (short) 2, Collections.singletonMap("k", "v"))));
ArgumentCaptor<Map> alter = ArgumentCaptor.forClass(Map.class);
ArgumentCaptor<AlterConfigsOptions> options = ArgumentCaptor.forClass(AlterConfigsOptions.class);
verify(adminClient).alterConfigs((Map<ConfigResource, Config>) alter.capture(), options.capture());
Assert.assertEquals(1, alter.getValue().size());
ConfigResource expectedKey = new ConfigResource(TOPIC, "test");
Assert.assertTrue(alter.getValue().containsKey(expectedKey));
Assert.assertEquals("v", ((Config) alter.getValue().get(expectedKey)).get("k").value());
Assert.assertTrue(options.getValue().shouldValidateOnly());
}
public static void describeTopicConfig() throws ExecutionException,
InterruptedException {
String brokerList = "localhost:9092";
String topic = "topic-admin";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(props);
ConfigResource resource =
new ConfigResource(ConfigResource.Type.TOPIC, topic);
DescribeConfigsResult result =
client.describeConfigs(Collections.singleton(resource));
Config config = result.all().get().get(resource);
System.out.println(config);
client.close();
}
public static void alterTopicConfig() throws ExecutionException, InterruptedException {
String brokerList = "localhost:9092";
String topic = "topic-admin";
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
AdminClient client = AdminClient.create(props);
ConfigResource resource =
new ConfigResource(ConfigResource.Type.TOPIC, topic);
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config config = new Config(Collections.singleton(entry));
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AlterConfigsResult result = client.alterConfigs(configs);
result.all().get();
client.close();
}
/**
* Updates the given topic's config with the {@link Properties} provided. This is not additive but a full
* replacement
*
* @param topic
* the topic to update config for
* @param properties
* the properties to assign to the topic
* @throws IllegalArgumentException
* if topic is null, empty or blank, or properties is {@code null}
* @throws AdminOperationException
* if there is an issue updating the topic config
*/
public void updateTopicConfig(String topic, Properties properties) {
if (StringUtils.isBlank(topic))
throw new IllegalArgumentException("topic cannot be null, empty or blank");
if (properties == null)
throw new IllegalArgumentException("properties cannot be null");
LOG.debug("Updating topic config for topic [{}] with config [{}]", topic, properties);
try {
List<ConfigEntry> configEntries = new ArrayList<>();
for (String property : properties.stringPropertyNames()) {
configEntries.add(new ConfigEntry(property, properties.getProperty(property)));
}
getNewAdminClient()
.alterConfigs(
Collections.singletonMap(
new ConfigResource(ConfigResource.Type.TOPIC, topic),
new Config(configEntries)))
.all()
.get(operationTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new AdminOperationException("Unable to update configuration for topic: " + topic, e);
}
}
/**
* Retrieve topic minISR config information if it is not cached locally.
* @param topicsToCheck Set of topics to check.
*/
private void maybeRetrieveAndCacheTopicMinISR(Set<String> topicsToCheck) {
Set<ConfigResource> topicResourcesToCheck = new HashSet<>(topicsToCheck.size());
topicsToCheck.stream().filter(t -> !_cachedTopicMinISR.containsKey(t))
.forEach(t -> topicResourcesToCheck.add(new ConfigResource(ConfigResource.Type.TOPIC, t)));
if (topicResourcesToCheck.isEmpty()) {
return;
}
for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : _adminClient.describeConfigs(topicResourcesToCheck).values().entrySet()) {
try {
short topicMinISR = Short.parseShort(entry.getValue().get(DESCRIBE_TOPIC_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
_cachedTopicMinISR.put(entry.getKey().name(), new TopicMinISREntry(topicMinISR, System.currentTimeMillis()));
} catch (TimeoutException | InterruptedException | ExecutionException e) {
LOG.warn("Skip attempt to fix replication factor of topic {} due to unable to retrieve its minISR config.",
entry.getKey().name());
}
}
}
protected void maybeUpdateTopicConfig() {
try {
// Retrieve topic config to check and update.
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, _cruiseControlMetricsTopic);
DescribeConfigsResult describeConfigsResult = _adminClient.describeConfigs(Collections.singleton(topicResource));
Config topicConfig = describeConfigsResult.values().get(topicResource).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
Set<AlterConfigOp> alterConfigOps = new HashSet<>(2);
Map<String, String> configsToSet = new HashMap<>(2);
configsToSet.put(LogConfig.RetentionMsProp(), _metricsTopic.configs().get(LogConfig.RetentionMsProp()));
configsToSet.put(LogConfig.CleanupPolicyProp(), _metricsTopic.configs().get(LogConfig.CleanupPolicyProp()));
maybeUpdateConfig(alterConfigOps, configsToSet, topicConfig);
if (!alterConfigOps.isEmpty()) {
AlterConfigsResult alterConfigsResult = _adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps));
alterConfigsResult.values().get(topicResource).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.warn("Unable to update config of Cruise Cruise Control metrics topic {}", _cruiseControlMetricsTopic, e);
}
}
private String deleteTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
try {
String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
object.remove(configEntry.name());
List<ConfigEntry> configEntrys = new ArrayList<>();
for (String key : KConstants.Topic.getTopicConfigKeys()) {
if (object.containsKey(key)) {
configEntrys.add(new ConfigEntry(key, object.getString(key)));
}
}
Map<ConfigResource, Config> configs = new HashMap<>();
ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
Config config = new Config(configEntrys);
configs.put(configRes, config);
adminClient.alterConfigs(configs);
return KConstants.Topic.SUCCESS;
} catch (Exception e) {
e.printStackTrace();
LOG.error("Delete topic[" + topic + "] config has error, msg is " + e.getMessage());
return e.getMessage();
}
}
/**
* Simple smoke test to ensure broker running appropriate listeners.
*/
@Test
void validateListener() throws ExecutionException, InterruptedException {
try (final AdminClient adminClient = getKafkaTestUtils().getAdminClient()) {
final ConfigResource broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1");
// Pull broker configs
final Config configResult = adminClient
.describeConfigs(Collections.singletonList(broker1Resource))
.values()
.get(broker1Resource)
.get();
// Check listener
final String actualListener = configResult.get("listeners").value();
Assertions.assertTrue(
actualListener.contains(getExpectedListenerProtocol() + "://"),
"Expected " + getExpectedListenerProtocol() + ":// and found: " + actualListener);
// Check inter broker protocol
final String actualBrokerProtocol = configResult.get("security.inter.broker.protocol").value();
Assertions.assertEquals(getExpectedListenerProtocol(), actualBrokerProtocol, "Unexpected inter-broker protocol");
}
}
/**
* describe topic‘s config
*
* @param client
*/
public static void describeConfig(AdminClient client)
throws ExecutionException, InterruptedException {
DescribeConfigsResult ret = client.describeConfigs(
Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, TEST_TOPIC)));
Map<ConfigResource, Config> configs = ret.all().get();
for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
ConfigResource key = entry.getKey();
Config value = entry.getValue();
System.out.println(
String.format("Resource type: %s, resource name: %s", key.type(), key.name()));
Collection<ConfigEntry> configEntries = value.entries();
for (ConfigEntry each : configEntries) {
System.out.println(each.name() + " = " + each.value());
}
}
}
private Future<Map<String, Config>> topicConfigs(Collection<String> topicNames) {
log.debug("Getting topic configs for {} topics", topicNames.size());
List<ConfigResource> configs = topicNames.stream()
.map((String topicName) -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
.collect(Collectors.toList());
Promise<Map<String, Config>> promise = Promise.promise();
ac.describeConfigs(configs).all().whenComplete((topicNameToConfig, error) -> {
if (error != null) {
promise.fail(error);
} else {
log.debug("Got topic configs for {} topics", topicNames.size());
promise.complete(topicNameToConfig.entrySet().stream()
.collect(Collectors.toMap(
entry -> entry.getKey().name(),
entry -> entry.getValue())));
}
});
return promise.future();
}
/**
* Completes the returned Future on the Vertx event loop
* with the topic config obtained from the Kafka AdminClient API.
* The Future completes with a null result a topic with the given {@code topicName} does not exist.
*/
@Override
public Future<TopicMetadata> topicMetadata(TopicName topicName) {
LOGGER.debug("Getting metadata for topic {}", topicName);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName.toString());
Future<TopicDescription> topicDescriptionFuture = mapFuture(adminClient.describeTopics(
singleton(topicName.toString())).values().get(topicName.toString()));
Future<Config> configFuture = mapFuture(adminClient.describeConfigs(
singleton(resource)).values().get(resource));
return CompositeFuture.all(topicDescriptionFuture, configFuture)
.map(compositeFuture ->
new TopicMetadata(compositeFuture.resultAt(0), compositeFuture.resultAt(1)))
.recover(error -> {
if (error instanceof UnknownTopicOrPartitionException) {
return Future.succeededFuture(null);
} else {
return Future.failedFuture(error);
}
});
}
@Test
public void testToTopicConfig() {
Topic topic = new Topic.Builder()
.withTopicName("test-topic")
.withConfigEntry("foo", "bar")
.withNumPartitions(3)
.withNumReplicas((short) 2)
.withMapName("gee")
.build();
Map<ConfigResource, Config> config = TopicSerialization.toTopicConfig(topic);
assertThat(config.size(), is(1));
Map.Entry<ConfigResource, Config> c = config.entrySet().iterator().next();
assertThat(c.getKey().type(), is(ConfigResource.Type.TOPIC));
assertThat(c.getValue().entries().size(), is(1));
assertThat(c.getKey().name(), is("test-topic"));
assertThat(c.getValue().get("foo").value(), is("bar"));
}
protected String alterTopicConfigInKafka(String topicName, String key, Function<String, String> mutator) throws InterruptedException, ExecutionException {
// Get the topic config
ConfigResource configResource = topicConfigResource(topicName);
org.apache.kafka.clients.admin.Config config = getTopicConfig(configResource);
Map<String, ConfigEntry> m = new HashMap<>();
for (ConfigEntry entry: config.entries()) {
if (entry.name().equals(key)
|| entry.source() != ConfigEntry.ConfigSource.DEFAULT_CONFIG
&& entry.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) {
m.put(entry.name(), entry);
}
}
final String changedValue = mutator.apply(m.get(key).value());
m.put(key, new ConfigEntry(key, changedValue));
LOGGER.info("Changing topic config {} to {}", key, changedValue);
// Update the topic config
AlterConfigsResult cgf = adminClient.alterConfigs(singletonMap(configResource,
new org.apache.kafka.clients.admin.Config(m.values())));
cgf.all().get();
return changedValue;
}
public Map<String, Duration> apply(Collection<String> topics) {
return Flux
.fromIterable(topics)
.map(name -> new ConfigResource(TOPIC, name))
.collectList()
.map(crs -> client.describeConfigs(crs).all())
.map(KafkaFutures::join)
.flatMapIterable(Map::entrySet)
.collectMap(e -> e.getKey().name(), this::retention)
.block();
}
private void updateTopicConfigPostAK23(Topic topic, String fullTopicName)
throws ExecutionException, InterruptedException {
Config currentConfigs = getActualTopicConfig(fullTopicName);
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
ArrayList<AlterConfigOp> listOfValues = new ArrayList<>();
topic
.rawConfig()
.forEach(
(configKey, configValue) -> {
listOfValues.add(
new AlterConfigOp(new ConfigEntry(configKey, configValue), OpType.SET));
});
Set<String> newEntryKeys = topic.rawConfig().keySet();
currentConfigs
.entries()
.forEach(
entry -> {
if (!newEntryKeys.contains(entry.name())) {
listOfValues.add(new AlterConfigOp(entry, OpType.DELETE));
}
});
configs.put(new ConfigResource(Type.TOPIC, fullTopicName), listOfValues);
adminClient.incrementalAlterConfigs(configs).all().get();
}
private Config getActualTopicConfig(String topic)
throws ExecutionException, InterruptedException {
ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
Collection<ConfigResource> resources = Collections.singletonList(resource);
Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();
return configs.get(resource);
}
/**
* Find cluster inter protocol version, used to determine the minimum level of Api compatibility
*
* @return String, the current Kafka Protocol version
*/
private String findKafkaVersion() throws IOException {
ConfigResource resource = new ConfigResource(Type.BROKER, "inter.broker.protocol.version");
String kafkaVersion = "";
try {
Map<ConfigResource, Config> configs =
adminClient.describeConfigs(Collections.singletonList(resource)).all().get();
kafkaVersion =
configs.get(resource).get("inter.broker.protocol.version").value().split("-")[0];
} catch (ExecutionException | InterruptedException e) {
LOGGER.error(e);
throw new IOException(e);
}
return kafkaVersion;
}
@Override
public void alterConfiguration(List<ConfiguredTopic> topics) {
AlterConfigsOptions options = dryRun ? new AlterConfigsOptions().validateOnly(true) : new AlterConfigsOptions();
Map<ConfigResource, Config> configs = topics
.stream()
.collect(toMap(t -> new ConfigResource(Type.TOPIC, t.getName()), TopicServiceImpl::resourceConfig));
try {
adminClient.alterConfigs(configs, options).all().get();
} catch (InterruptedException | ExecutionException e) {
// TODO: FA-10109: Improve exception handling
throw new RuntimeException(e);
}
}
@Test
public void testListExisting() {
Cluster cluster = createCluster(1);
TopicPartitionInfo tp = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
ConfigEntry configEntry = new ConfigEntry("k", "v");
KafkaFuture<Config> kfc = KafkaFuture.completedFuture(new Config(Collections.singletonList(configEntry)));
Set<String> topicNames = new HashSet<>(Arrays.asList("a", "b", "_c"));
Map<String, TopicDescription> tds = new HashMap<String, TopicDescription>() {
{
put("a", new TopicDescription("a", false, Collections.singletonList(tp)));
put("b", new TopicDescription("b", false, Collections.singletonList(tp)));
put("c", new TopicDescription("_c", false, Collections.singletonList(tp)));
}
};
Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<ConfigResource, KafkaFuture<Config>>() {
{
put(new ConfigResource(TOPIC, "a"), kfc);
put(new ConfigResource(TOPIC, "b"), kfc);
put(new ConfigResource(TOPIC, "_c"), kfc);
}
};
TopicService service = new TopicServiceImpl(adminClient, true);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class);
DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
when(describeTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(tds));
when(listTopicsResult.names()).thenReturn(KafkaFuture.completedFuture(topicNames));
when(describeConfigsResult.values()).thenReturn(configs);
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
when(adminClient.describeTopics(topicNames)).thenReturn(describeTopicsResult);
when(adminClient.describeConfigs(any(Collection.class))).thenReturn(describeConfigsResult);
Map<String, ConfiguredTopic> actual = service.listExisting(true);
Assert.assertEquals(2, actual.size());
Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b")), actual.keySet());
}
private static boolean isTopicDeleteEnabled(final AdminClient adminClient) {
try {
DescribeClusterResult describeClusterResult = adminClient.describeCluster();
Collection<Node> nodes = describeClusterResult.nodes().get();
if (nodes.isEmpty()) {
log.warn("No available broker found to fetch config info.");
throw new KsqlException("Could not fetch broker information. KSQL cannot initialize");
}
ConfigResource resource = new ConfigResource(
ConfigResource.Type.BROKER,
String.valueOf(nodes.iterator().next().id())
);
Map<ConfigResource, Config> config = executeWithRetries(
() -> adminClient.describeConfigs(Collections.singleton(resource)).all());
return config.get(resource)
.entries()
.stream()
.anyMatch(configEntry -> configEntry.name().equalsIgnoreCase("delete.topic.enable")
&& configEntry.value().equalsIgnoreCase("true"));
} catch (final Exception e) {
log.error("Failed to initialize TopicClient: {}", e.getMessage());
throw new KsqlException("Could not fetch broker information. KSQL cannot initialize", e);
}
}
@Test
public void shouldAddTopicConfig() {
final Map<String, ?> overrides = ImmutableMap.of(
TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
);
expect(adminClient.describeConfigs(topicConfigsRequest("peter")))
.andReturn(topicConfigResponse(
"peter",
overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
));
expect(adminClient.alterConfigs(
withResourceConfig(
new ConfigResource(ConfigResource.Type.TOPIC, "peter"),
new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
)))
.andReturn(alterTopicConfigResponse());
replay(adminClient);
KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
kafkaTopicClient.addTopicConfig("peter", overrides);
verify(adminClient);
}
private DescribeConfigsResult describeBrokerResult() {
DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
List<ConfigEntry> configEntries = new ArrayList<>();
configEntries.add(configEntryDeleteEnable);
Map<ConfigResource, Config> config = ImmutableMap.of(
new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(configEntries));
expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config));
replay(describeConfigsResult);
return describeConfigsResult;
}
private static DescribeConfigsResult topicConfigResponse(final String topicName,
final ConfigEntry... entries) {
final Map<ConfigResource, Config> config = ImmutableMap.of(
new ConfigResource(ConfigResource.Type.TOPIC, topicName),
new Config(Arrays.asList(entries)));
final DescribeConfigsResult response = mock(DescribeConfigsResult.class);
expect(response.all()).andReturn(KafkaFuture.completedFuture(config));
replay(response);
return response;
}
private static Map<ConfigResource, Config> withResourceConfig(final ConfigResource resource,
final ConfigEntry... entries) {
final Set<ConfigEntry> expected = Arrays.stream(entries)
.collect(Collectors.toSet());
class ConfigMatcher implements IArgumentMatcher {
@SuppressWarnings("unchecked")
@Override
public boolean matches(final Object argument) {
final Map<ConfigResource, Config> request = (Map<ConfigResource, Config>)argument;
if (request.size() != 1) {
return false;
}
final Config config = request.get(resource);
if (config == null) {
return false;
}
final Set<ConfigEntry> actual = new HashSet<>(config.entries());
return actual.equals(expected);
}
@Override
public void appendTo(final StringBuffer buffer) {
buffer.append(resource).append("->")
.append("Config{").append(expected).append("}");
}
}
EasyMock.reportMatcher(new ConfigMatcher());
return null;
}
/**
* 查看 Topic 信息
*/
public static void describeTopicConfig() throws ExecutionException, InterruptedException {
AdminClient client = new AdminClientFactory().create();
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, AdminClientFactory.topic);
// 查看 topic 信息
DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
Config config = result.all().get().get(resource);
System.out.println(config);
// Config(entries=[ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.2-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1000012, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
client.close();
}
/**
* 修改 topic
*/
public static void alterTopicConfig() throws ExecutionException, InterruptedException {
AdminClient client = new AdminClientFactory().create();
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, AdminClientFactory.topic);
// 注意,设置log.cleanup.policy=compact启用压缩策略,发送数据将受影响,可能遇到发送失败的问题
ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
Config config = new Config(Collections.singleton(entry));
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(resource, config);
AlterConfigsResult result = client.alterConfigs(configs);
result.all().get();
client.close();
}
public Set<ConfigEntry> getConfigEntriesForTopic(String topicName) {
final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
final DescribeConfigsResult topicConfiEntries = kafkaClientsAdminClient.describeConfigs(Collections.singleton(configResource));
try {
final Config config = topicConfiEntries.all().get(ApplicationConstants.FUTURE_GET_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(configResource);
final Collection<ConfigEntry> entries = config.entries();
Logger.debug(String.format("Config entries for topic '%s' : %n%s", topicName, AppUtils.configEntriesToPrettyString(entries)));
return new HashSet<>(entries);
} catch (Exception e) {
Logger.error(String.format("Could not retrieve config resource for topic '%s'", topicName), e);
}
return Collections.emptySet();
}