Source code examples for the org.apache.kafka.common.config.ConfigResource class

The examples below show how to use the org.apache.kafka.common.config.ConfigResource API in real projects; each snippet's project and file header identifies the original source on GitHub.
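
Before the project examples, here is a minimal sketch of the pattern they all share, assuming a placeholder broker address and topic name: a ConfigResource is just a (type, name) pair identifying a TOPIC or BROKER, and the AdminClient describeConfigs/alterConfigs calls take collections or maps keyed by it.

public static void quickstart() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
    try (AdminClient client = AdminClient.create(props)) {
        // A ConfigResource is only a (type, name) pair; it carries no config values itself.
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
        // describeConfigs returns one future per resource; all() collapses them into a single map.
        Config config = client.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
        System.out.println(config.get("retention.ms").value());
    }
}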

@Test
public void typical() throws Exception {
  String topic = "topic";
  Collection<String> topics = singleton(topic);
  ConfigResource configResource = new ConfigResource(TOPIC, topic);
  Config config = new Config(singleton(new ConfigEntry("retention.ms", "1")));
  KafkaFuture<Map<ConfigResource, Config>> kafkaFuture = completedFuture(singletonMap(configResource, config));

  doReturn(describeConfigsResult).when(adminClient).describeConfigs(any());
  doReturn(kafkaFuture).when(describeConfigsResult).all();

  Map<String, Duration> result = underTest.apply(topics);

  assertThat(result.size(), is(1));
  Duration retention = result.get(topic);
  assertThat(retention, is(Duration.ofMillis(1)));
}
 
Example 2  Project: kafka-topology-builder  File: TopicManagerIT.java
private void verifyTopicConfiguration(
    String topic, HashMap<String, String> config, List<String> removedConfigs)
    throws ExecutionException, InterruptedException {

  ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
  Collection<ConfigResource> resources = Collections.singletonList(resource);

  Map<ConfigResource, Config> configs = kafkaAdminClient.describeConfigs(resources).all().get();

  Config topicConfig = configs.get(resource);
  Assert.assertNotNull(topicConfig);

  topicConfig
      .entries()
      .forEach(
          entry -> {
            if (!entry.isDefault()) {
              if (config.get(entry.name()) != null)
                Assert.assertEquals(config.get(entry.name()), entry.value());
              Assert.assertFalse(removedConfigs.contains(entry.name()));
            }
          });
}
 
Example 3  Project: kafka-helmsman  File: TopicServiceImpl.java
@Override
public Map<String, ConfiguredTopic> listExisting(boolean excludeInternal) {
  try {
    Set<String> topics = adminClient
        .listTopics(excludeInternal ? EXCLUDE_INTERNAL : INCLUDE_INTERNAL)
        .names().get();
    Collection<TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get().values();

    List<ConfigResource> resources = topics
        .stream()
        .map(t -> new ConfigResource(Type.TOPIC, t))
        .collect(toList());

    Map<ConfigResource, KafkaFuture<Config>> topicConfigs = adminClient.describeConfigs(resources).values();

    return topicDescriptions
        .stream()
        .map(td -> configuredTopic(td, topicConfigs.get(new ConfigResource(Type.TOPIC, td.name()))))
        .filter(t -> !INTERNAL_TOPIC.test(t))
        .collect(toMap(ConfiguredTopic::getName, td -> td));

  } catch (InterruptedException | ExecutionException e) {
    // TODO: FA-10109: Improve exception handling
    throw new RuntimeException(e);
  }
}
 
Example 4  Project: kafka-helmsman  File: TopicServiceImplTest.java
@Test
public void testAlterConfiguration() {
  TopicService service = new TopicServiceImpl(adminClient, true);
  AlterConfigsResult result = mock(AlterConfigsResult.class);
  when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
  when(adminClient.alterConfigs(any(Map.class), any(AlterConfigsOptions.class))).thenReturn(result);

  service.alterConfiguration(Collections.singletonList(
      new ConfiguredTopic("test", 3, (short) 2, Collections.singletonMap("k", "v"))));

  ArgumentCaptor<Map> alter = ArgumentCaptor.forClass(Map.class);
  ArgumentCaptor<AlterConfigsOptions> options = ArgumentCaptor.forClass(AlterConfigsOptions.class);
  verify(adminClient).alterConfigs((Map<ConfigResource, Config>) alter.capture(), options.capture());
  Assert.assertEquals(1, alter.getValue().size());
  ConfigResource expectedKey = new ConfigResource(TOPIC, "test");
  Assert.assertTrue(alter.getValue().containsKey(expectedKey));
  Assert.assertEquals("v", ((Config) alter.getValue().get(expectedKey)).get("k").value());
  Assert.assertTrue(options.getValue().shouldValidateOnly());
}
 
public static void describeTopicConfig() throws ExecutionException,
        InterruptedException {
    String brokerList = "localhost:9092";
    String topic = "topic-admin";

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    AdminClient client = AdminClient.create(props);

    ConfigResource resource =
            new ConfigResource(ConfigResource.Type.TOPIC, topic);
    DescribeConfigsResult result =
            client.describeConfigs(Collections.singleton(resource));
    Config config = result.all().get().get(resource);
    System.out.println(config);
    client.close();
}
 
public static void alterTopicConfig() throws ExecutionException, InterruptedException {
    String brokerList = "localhost:9092";
    String topic = "topic-admin";

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    AdminClient client = AdminClient.create(props);

    ConfigResource resource =
            new ConfigResource(ConfigResource.Type.TOPIC, topic);
    ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
    Config config = new Config(Collections.singleton(entry));
    Map<ConfigResource, Config> configs = new HashMap<>();
    configs.put(resource, config);
    AlterConfigsResult result = client.alterConfigs(configs);
    result.all().get();

    client.close();
}
 
Example 7  Project: common-kafka  File: KafkaAdminClient.java
/**
 * Updates the given topic's config with the {@link Properties} provided. This is not additive; it is a full
 * replacement (an additive alternative is sketched after this method).
 *
 * @param topic
 *      the topic to update config for
 * @param properties
 *      the properties to assign to the topic
 * @throws IllegalArgumentException
 *      if topic is null, empty or blank, or properties is {@code null}
 * @throws AdminOperationException
 *      if there is an issue updating the topic config
 */
public void updateTopicConfig(String topic, Properties properties) {
    if (StringUtils.isBlank(topic))
        throw new IllegalArgumentException("topic cannot be null, empty or blank");
    if (properties == null)
        throw new IllegalArgumentException("properties cannot be null");

    LOG.debug("Updating topic config for topic [{}] with config [{}]", topic, properties);

    try {
        List<ConfigEntry> configEntries = new ArrayList<>();
        for (String property : properties.stringPropertyNames()) {
            configEntries.add(new ConfigEntry(property, properties.getProperty(property)));
        }

        getNewAdminClient()
            .alterConfigs(
                Collections.singletonMap(
                    new ConfigResource(ConfigResource.Type.TOPIC, topic),
                    new Config(configEntries)))
            .all()
            .get(operationTimeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        throw new AdminOperationException("Unable to update configuration for topic: " + topic, e);
    }
}
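
Since alterConfigs performs this kind of full replacement, brokers on Kafka 2.3+ offer incrementalAlterConfigs for additive changes. A minimal sketch, assuming an AdminClient in scope and an illustrative topic and retention value (not part of common-kafka):

public void setRetentionMs(AdminClient adminClient, String topic, long retentionMs) throws Exception {
    // SET one entry incrementally; every other topic config is left untouched,
    // unlike the full-replacement alterConfigs call above. Requires brokers on Kafka 2.3+.
    AlterConfigOp op = new AlterConfigOp(
            new ConfigEntry("retention.ms", String.valueOf(retentionMs)), AlterConfigOp.OpType.SET);
    Map<ConfigResource, Collection<AlterConfigOp>> configs = Collections.singletonMap(
            new ConfigResource(ConfigResource.Type.TOPIC, topic), Collections.singleton(op));
    adminClient.incrementalAlterConfigs(configs).all().get();
}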
 
/**
 * Retrieve topic minISR config information if it is not cached locally.
 * @param topicsToCheck Set of topics to check.
 */
private void maybeRetrieveAndCacheTopicMinISR(Set<String> topicsToCheck) {
  Set<ConfigResource> topicResourcesToCheck = new HashSet<>(topicsToCheck.size());
  topicsToCheck.stream().filter(t -> !_cachedTopicMinISR.containsKey(t))
                        .forEach(t -> topicResourcesToCheck.add(new ConfigResource(ConfigResource.Type.TOPIC, t)));
  if (topicResourcesToCheck.isEmpty()) {
    return;
  }
  for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : _adminClient.describeConfigs(topicResourcesToCheck).values().entrySet()) {
    try {
      short topicMinISR = Short.parseShort(entry.getValue().get(DESCRIBE_TOPIC_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS)
                                                .get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());
      _cachedTopicMinISR.put(entry.getKey().name(), new TopicMinISREntry(topicMinISR, System.currentTimeMillis()));
    } catch (TimeoutException | InterruptedException | ExecutionException e) {
      LOG.warn("Skip attempt to fix replication factor of topic {} due to unable to retrieve its minISR config.",
               entry.getKey().name());
    }
  }
}
 
protected void maybeUpdateTopicConfig() {
  try {
    // Retrieve topic config to check and update.
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, _cruiseControlMetricsTopic);
    DescribeConfigsResult describeConfigsResult = _adminClient.describeConfigs(Collections.singleton(topicResource));
    Config topicConfig = describeConfigsResult.values().get(topicResource).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    Set<AlterConfigOp> alterConfigOps = new HashSet<>(2);
    Map<String, String> configsToSet = new HashMap<>(2);
    configsToSet.put(LogConfig.RetentionMsProp(), _metricsTopic.configs().get(LogConfig.RetentionMsProp()));
    configsToSet.put(LogConfig.CleanupPolicyProp(), _metricsTopic.configs().get(LogConfig.CleanupPolicyProp()));
    maybeUpdateConfig(alterConfigOps, configsToSet, topicConfig);
    if (!alterConfigOps.isEmpty()) {
      AlterConfigsResult alterConfigsResult = _adminClient.incrementalAlterConfigs(Collections.singletonMap(topicResource, alterConfigOps));
      alterConfigsResult.values().get(topicResource).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
    LOG.warn("Unable to update config of Cruise Cruise Control metrics topic {}", _cruiseControlMetricsTopic, e);
  }
}
 
Example 10  Project: kafka-eagle  File: KafkaMetricsServiceImpl.java
private String deleteTopicConfig(String clusterAlias, AdminClient adminClient, String topic, ConfigEntry configEntry) {
	try {
		String describeTopicConfigs = describeTopicConfig(clusterAlias, topic);
		JSONObject object = JSON.parseObject(describeTopicConfigs).getJSONObject("config");
		object.remove(configEntry.name());
		List<ConfigEntry> configEntries = new ArrayList<>();
		for (String key : KConstants.Topic.getTopicConfigKeys()) {
			if (object.containsKey(key)) {
				configEntries.add(new ConfigEntry(key, object.getString(key)));
			}
		}
		Map<ConfigResource, Config> configs = new HashMap<>();
		ConfigResource configRes = new ConfigResource(Type.TOPIC, topic);
		Config config = new Config(configEntries);
		configs.put(configRes, config);
		adminClient.alterConfigs(configs);
		return KConstants.Topic.SUCCESS;
	} catch (Exception e) {
		LOG.error("Deleting config for topic [" + topic + "] failed, msg is " + e.getMessage(), e);
		return e.getMessage();
	}
}
 
/**
 * Simple smoke test to ensure broker running appropriate listeners.
 */
@Test
void validateListener() throws ExecutionException, InterruptedException {
    try (final AdminClient adminClient  = getKafkaTestUtils().getAdminClient()) {
        final ConfigResource broker1Resource = new ConfigResource(ConfigResource.Type.BROKER, "1");

        // Pull broker configs
        final Config configResult = adminClient
            .describeConfigs(Collections.singletonList(broker1Resource))
            .values()
            .get(broker1Resource)
            .get();

        // Check listener
        final String actualListener = configResult.get("listeners").value();
        Assertions.assertTrue(
            actualListener.contains(getExpectedListenerProtocol() + "://"),
            "Expected " + getExpectedListenerProtocol() + ":// and found: " + actualListener);

        // Check inter broker protocol
        final String actualBrokerProtocol = configResult.get("security.inter.broker.protocol").value();
        Assertions.assertEquals(getExpectedListenerProtocol(), actualBrokerProtocol, "Unexpected inter-broker protocol");
    }
}
 
Example 12  Project: javabase  File: AdminClientTest.java
/**
 * Describe a topic's config.
 * 
 * @param client
 */
public static void describeConfig(AdminClient client)
        throws ExecutionException, InterruptedException {
    DescribeConfigsResult ret = client.describeConfigs(
            Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, TEST_TOPIC)));
    Map<ConfigResource, Config> configs = ret.all().get();
    for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
        ConfigResource key = entry.getKey();
        Config value = entry.getValue();
        System.out.println(
                String.format("Resource type: %s, resource name: %s", key.type(), key.name()));
        Collection<ConfigEntry> configEntries = value.entries();
        for (ConfigEntry each : configEntries) {
            System.out.println(each.name() + " = " + each.value());
        }
    }

}
 
Example 13  Project: strimzi-kafka-operator  File: KafkaAvailability.java
private Future<Map<String, Config>> topicConfigs(Collection<String> topicNames) {
    log.debug("Getting topic configs for {} topics", topicNames.size());
    List<ConfigResource> configs = topicNames.stream()
            .map((String topicName) -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
            .collect(Collectors.toList());
    Promise<Map<String, Config>> promise = Promise.promise();
    ac.describeConfigs(configs).all().whenComplete((topicNameToConfig, error) -> {
        if (error != null) {
            promise.fail(error);
        } else {
            log.debug("Got topic configs for {} topics", topicNames.size());
            promise.complete(topicNameToConfig.entrySet().stream()
                    .collect(Collectors.toMap(
                        entry -> entry.getKey().name(),
                        entry -> entry.getValue())));
        }
    });
    return promise.future();
}
 
Example 14  Project: strimzi-kafka-operator  File: KafkaImpl.java
/**
 * Completes the returned Future on the Vertx event loop
 * with the topic config obtained from the Kafka AdminClient API.
 * The Future completes with a null result if a topic with the given {@code topicName} does not exist.
 */
@Override
public Future<TopicMetadata> topicMetadata(TopicName topicName) {
    LOGGER.debug("Getting metadata for topic {}", topicName);
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName.toString());
    Future<TopicDescription> topicDescriptionFuture = mapFuture(adminClient.describeTopics(
            singleton(topicName.toString())).values().get(topicName.toString()));
    Future<Config> configFuture = mapFuture(adminClient.describeConfigs(
            singleton(resource)).values().get(resource));
    return CompositeFuture.all(topicDescriptionFuture, configFuture)
    .map(compositeFuture ->
        new TopicMetadata(compositeFuture.resultAt(0), compositeFuture.resultAt(1)))
        .recover(error -> {
            if (error instanceof UnknownTopicOrPartitionException) {
                return Future.succeededFuture(null);
            } else {
                return Future.failedFuture(error);
            }
        });
}
 
@Test
public void testToTopicConfig() {
    Topic topic = new Topic.Builder()
            .withTopicName("test-topic")
            .withConfigEntry("foo", "bar")
            .withNumPartitions(3)
            .withNumReplicas((short) 2)
            .withMapName("gee")
            .build();
    Map<ConfigResource, Config> config = TopicSerialization.toTopicConfig(topic);
    assertThat(config.size(), is(1));
    Map.Entry<ConfigResource, Config> c = config.entrySet().iterator().next();
    assertThat(c.getKey().type(), is(ConfigResource.Type.TOPIC));
    assertThat(c.getValue().entries().size(), is(1));
    assertThat(c.getKey().name(), is("test-topic"));
    assertThat(c.getValue().get("foo").value(), is("bar"));
}
 
protected String alterTopicConfigInKafka(String topicName, String key, Function<String, String> mutator) throws InterruptedException, ExecutionException {
    // Get the topic config
    ConfigResource configResource = topicConfigResource(topicName);
    org.apache.kafka.clients.admin.Config config = getTopicConfig(configResource);

    Map<String, ConfigEntry> m = new HashMap<>();
    for (ConfigEntry entry: config.entries()) {
        if (entry.name().equals(key)
            || entry.source() != ConfigEntry.ConfigSource.DEFAULT_CONFIG
                && entry.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) {
            m.put(entry.name(), entry);
        }
    }
    final String changedValue = mutator.apply(m.get(key).value());
    m.put(key, new ConfigEntry(key, changedValue));
    LOGGER.info("Changing topic config {} to {}", key, changedValue);

    // Update the topic config
    AlterConfigsResult result = adminClient.alterConfigs(singletonMap(configResource,
            new org.apache.kafka.clients.admin.Config(m.values())));
    result.all().get();
    return changedValue;
}
 
Example 17  Project: data-highway  File: RetentionByTopicFunction.java
public Map<String, Duration> apply(Collection<String> topics) {
  return Flux
      .fromIterable(topics)
      .map(name -> new ConfigResource(TOPIC, name))
      .collectList()
      .map(crs -> client.describeConfigs(crs).all())
      .map(KafkaFutures::join)
      .flatMapIterable(Map::entrySet)
      .collectMap(e -> e.getKey().name(), this::retention)
      .block();
}
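
The retention helper referenced by collectMap above is not part of this snippet; a hypothetical sketch consistent with the first test in this article (a retention.ms entry mapped to a Duration) could look like:

// Hypothetical sketch of the retention helper assumed above: parse a topic's
// retention.ms config entry into a Duration (e.g. "1" becomes Duration.ofMillis(1)).
private Duration retention(Map.Entry<ConfigResource, Config> entry) {
    String retentionMs = entry.getValue().get("retention.ms").value();
    return Duration.ofMillis(Long.parseLong(retentionMs));
}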
 
private void updateTopicConfigPostAK23(Topic topic, String fullTopicName)
    throws ExecutionException, InterruptedException {

  Config currentConfigs = getActualTopicConfig(fullTopicName);

  Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
  ArrayList<AlterConfigOp> listOfValues = new ArrayList<>();

  topic
      .rawConfig()
      .forEach(
          (configKey, configValue) -> {
            listOfValues.add(
                new AlterConfigOp(new ConfigEntry(configKey, configValue), OpType.SET));
          });
  Set<String> newEntryKeys = topic.rawConfig().keySet();

  currentConfigs
      .entries()
      .forEach(
          entry -> {
            if (!newEntryKeys.contains(entry.name())) {
              listOfValues.add(new AlterConfigOp(entry, OpType.DELETE));
            }
          });

  configs.put(new ConfigResource(Type.TOPIC, fullTopicName), listOfValues);

  adminClient.incrementalAlterConfigs(configs).all().get();
}
 
private Config getActualTopicConfig(String topic)
    throws ExecutionException, InterruptedException {
  ConfigResource resource = new ConfigResource(Type.TOPIC, topic);
  Collection<ConfigResource> resources = Collections.singletonList(resource);

  Map<ConfigResource, Config> configs = adminClient.describeConfigs(resources).all().get();

  return configs.get(resource);
}
 
/**
 * Find cluster inter protocol version, used to determine the minimum level of Api compatibility
 *
 * @return String, the current Kafka Protocol version
 */
private String findKafkaVersion() throws IOException {
  // Note: a BROKER ConfigResource is identified by broker id, not by a config key; "0" is an assumed id.
  ConfigResource resource = new ConfigResource(Type.BROKER, "0");
  String kafkaVersion = "";
  try {
    Map<ConfigResource, Config> configs =
        adminClient.describeConfigs(Collections.singletonList(resource)).all().get();
    kafkaVersion =
        configs.get(resource).get("inter.broker.protocol.version").value().split("-")[0];
  } catch (ExecutionException | InterruptedException e) {
    LOGGER.error(e);
    throw new IOException(e);
  }
  return kafkaVersion;
}
 
Example 21  Project: kafka-helmsman  File: TopicServiceImpl.java
@Override
public void alterConfiguration(List<ConfiguredTopic> topics) {
  AlterConfigsOptions options = dryRun ? new AlterConfigsOptions().validateOnly(true) : new AlterConfigsOptions();
  Map<ConfigResource, Config> configs = topics
      .stream()
      .collect(toMap(t -> new ConfigResource(Type.TOPIC, t.getName()), TopicServiceImpl::resourceConfig));
  try {
    adminClient.alterConfigs(configs, options).all().get();
  } catch (InterruptedException | ExecutionException e) {
    // TODO: FA-10109: Improve exception handling
    throw new RuntimeException(e);
  }
}
 
Example 22  Project: kafka-helmsman  File: TopicServiceImplTest.java
@Test
public void testListExisting() {
  Cluster cluster = createCluster(1);
  TopicPartitionInfo tp = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
  ConfigEntry configEntry = new ConfigEntry("k", "v");
  KafkaFuture<Config> kfc = KafkaFuture.completedFuture(new Config(Collections.singletonList(configEntry)));
  Set<String> topicNames = new HashSet<>(Arrays.asList("a", "b", "_c"));
  Map<String, TopicDescription> tds = new HashMap<String, TopicDescription>() {
    {
      put("a", new TopicDescription("a", false, Collections.singletonList(tp)));
      put("b", new TopicDescription("b", false, Collections.singletonList(tp)));
      put("c", new TopicDescription("_c", false, Collections.singletonList(tp)));
    }
  };
  Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<ConfigResource, KafkaFuture<Config>>() {
    {
      put(new ConfigResource(TOPIC, "a"), kfc);
      put(new ConfigResource(TOPIC, "b"), kfc);
      put(new ConfigResource(TOPIC, "_c"), kfc);
    }
  };

  TopicService service = new TopicServiceImpl(adminClient, true);
  ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
  DescribeTopicsResult describeTopicsResult = mock(DescribeTopicsResult.class);
  DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);

  when(describeTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(tds));
  when(listTopicsResult.names()).thenReturn(KafkaFuture.completedFuture(topicNames));
  when(describeConfigsResult.values()).thenReturn(configs);
  when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
  when(adminClient.describeTopics(topicNames)).thenReturn(describeTopicsResult);
  when(adminClient.describeConfigs(any(Collection.class))).thenReturn(describeConfigsResult);

  Map<String, ConfiguredTopic> actual = service.listExisting(true);
  Assert.assertEquals(2, actual.size());
  Assert.assertEquals(new HashSet<>(Arrays.asList("a", "b")), actual.keySet());
}
 
private static boolean isTopicDeleteEnabled(final AdminClient adminClient) {
  try {
    DescribeClusterResult describeClusterResult = adminClient.describeCluster();
    Collection<Node> nodes = describeClusterResult.nodes().get();
    if (nodes.isEmpty()) {
      log.warn("No available broker found to fetch config info.");
      throw new KsqlException("Could not fetch broker information. KSQL cannot initialize");
    }

    ConfigResource resource = new ConfigResource(
        ConfigResource.Type.BROKER,
        String.valueOf(nodes.iterator().next().id())
    );

    Map<ConfigResource, Config> config = executeWithRetries(
        () -> adminClient.describeConfigs(Collections.singleton(resource)).all());

    return config.get(resource)
        .entries()
        .stream()
        .anyMatch(configEntry -> configEntry.name().equalsIgnoreCase("delete.topic.enable")
                                 && configEntry.value().equalsIgnoreCase("true"));

  } catch (final Exception e) {
    log.error("Failed to initialize TopicClient: {}", e.getMessage());
    throw new KsqlException("Could not fetch broker information. KSQL cannot initialize", e);
  }
}
 
@Test
public void shouldAddTopicConfig() {
  final Map<String, ?> overrides = ImmutableMap.of(
      TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
  );

  expect(adminClient.describeConfigs(topicConfigsRequest("peter")))
      .andReturn(topicConfigResponse(
          "peter",
          overriddenConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
          defaultConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")
      ));

  expect(adminClient.alterConfigs(
      withResourceConfig(
          new ConfigResource(ConfigResource.Type.TOPIC, "peter"),
          new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "12345"),
          new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
      )))
      .andReturn(alterTopicConfigResponse());
  replay(adminClient);

  KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(adminClient);
  kafkaTopicClient.addTopicConfig("peter", overrides);

  verify(adminClient);
}
 
private DescribeConfigsResult describeBrokerResult() {
  DescribeConfigsResult describeConfigsResult = mock(DescribeConfigsResult.class);
  ConfigEntry configEntryDeleteEnable = new ConfigEntry("delete.topic.enable", "true");
  List<ConfigEntry> configEntries = new ArrayList<>();
  configEntries.add(configEntryDeleteEnable);
  Map<ConfigResource, Config> config = ImmutableMap.of(
      new ConfigResource(ConfigResource.Type.BROKER, node.idString()), new Config(configEntries));
  expect(describeConfigsResult.all()).andReturn(KafkaFuture.completedFuture(config));
  replay(describeConfigsResult);
  return describeConfigsResult;
}
 
private static DescribeConfigsResult topicConfigResponse(final String topicName,
                                                         final ConfigEntry... entries) {

  final Map<ConfigResource, Config> config = ImmutableMap.of(
      new ConfigResource(ConfigResource.Type.TOPIC, topicName),
      new Config(Arrays.asList(entries)));

  final DescribeConfigsResult response = mock(DescribeConfigsResult.class);
  expect(response.all()).andReturn(KafkaFuture.completedFuture(config));
  replay(response);
  return response;
}
 
private static Map<ConfigResource, Config> withResourceConfig(final ConfigResource resource,
                                                              final ConfigEntry... entries) {
  final Set<ConfigEntry> expected = Arrays.stream(entries)
      .collect(Collectors.toSet());

  class ConfigMatcher implements IArgumentMatcher {
    @SuppressWarnings("unchecked")
    @Override
    public boolean matches(final Object argument) {
      final Map<ConfigResource, Config> request = (Map<ConfigResource, Config>)argument;
      if (request.size() != 1) {
        return false;
      }

      final Config config = request.get(resource);
      if (config == null) {
        return false;
      }

      final Set<ConfigEntry> actual = new HashSet<>(config.entries());
      return actual.equals(expected);
    }

    @Override
    public void appendTo(final StringBuffer buffer) {
      buffer.append(resource).append("->")
          .append("Config{").append(expected).append("}");
    }
  }
  EasyMock.reportMatcher(new ConfigMatcher());
  return null;
}
 
/**
 * Describe the topic's configuration.
 */
public static void describeTopicConfig() throws ExecutionException, InterruptedException {
    AdminClient client = new AdminClientFactory().create();
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, AdminClientFactory.topic);
    // Describe the topic's config
    DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
    Config config = result.all().get().get(resource);
    System.out.println(config);
    // Config(entries=[ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.format.version, value=2.2-IV1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=max.message.bytes, value=1000012, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]), ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
    client.close();
}
 
/**
 * Alter the topic's configuration.
 */
public static void alterTopicConfig() throws ExecutionException, InterruptedException {
    AdminClient client = new AdminClientFactory().create();
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, AdminClientFactory.topic);
    // Note: setting log.cleanup.policy=compact enables compaction; producing will be affected and sends may fail (compacted topics require keyed records)
    ConfigEntry entry = new ConfigEntry("cleanup.policy", "compact");
    Config config = new Config(Collections.singleton(entry));
    Map<ConfigResource, Config> configs = new HashMap<>();
    configs.put(resource, config);
    AlterConfigsResult result = client.alterConfigs(configs);
    result.all().get();
    client.close();
}
 
Example 30  Project: kafka-message-tool  File: TopicAdmin.java
public Set<ConfigEntry> getConfigEntriesForTopic(String topicName) {
    final ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
    final DescribeConfigsResult topicConfigEntries = kafkaClientsAdminClient.describeConfigs(Collections.singleton(configResource));
    try {
        final Config config = topicConfigEntries.all().get(ApplicationConstants.FUTURE_GET_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(configResource);
        final Collection<ConfigEntry> entries = config.entries();
        Logger.debug(String.format("Config entries for topic '%s' : %n%s", topicName, AppUtils.configEntriesToPrettyString(entries)));
        return new HashSet<>(entries);
    } catch (Exception e) {
        Logger.error(String.format("Could not retrieve config resource for topic '%s'", topicName), e);
    }
    return Collections.emptySet();
}
 