下面列出了 org.springframework.boot.autoconfigure.kafka.KafkaProperties 类的 API 用法示例代码；也可以点击链接到 GitHub 查看源代码。
/**
 * Registers a {@link KafkaMessageLogReceiverEndpointFactory} unless a bean named
 * {@code kafkaMessageLogReceiverEndpointFactory} already exists.
 *
 * <p>The factory is handed a cached thread pool whose threads are named
 * {@code kafka-message-log-%d} and one consumer created by the given consumer factory.
 *
 * @param kafkaProperties the Spring Boot Kafka properties (not read by this method)
 * @param interceptorRegistry the interceptor registry passed on to the factory
 * @param eventPublisher the event publisher passed on to the factory
 * @param kafkaConsumerFactory the factory used to create the Kafka consumer
 * @return the configured endpoint factory
 */
@Bean
@ConditionalOnMissingBean(name = "kafkaMessageLogReceiverEndpointFactory")
public MessageLogReceiverEndpointFactory kafkaMessageLogReceiverEndpointFactory(final KafkaProperties kafkaProperties,
        final MessageInterceptorRegistry interceptorRegistry,
        final ApplicationEventPublisher eventPublisher,
        final ConsumerFactory<String, String> kafkaConsumerFactory) {
    LOG.info("Auto-configuring Kafka MessageLogReceiverEndpointFactory");

    // Worker threads get a recognisable name so they are easy to spot in thread dumps.
    final ExecutorService receiverThreads = newCachedThreadPool(
            new ThreadFactoryBuilder()
                    .setNameFormat("kafka-message-log-%d")
                    .build());

    // The endpoint factory requires the concrete KafkaConsumer type, hence the downcast.
    final KafkaConsumer<String, String> consumer =
            (KafkaConsumer<String, String>) kafkaConsumerFactory.createConsumer();

    return new KafkaMessageLogReceiverEndpointFactory(
            interceptorRegistry, consumer, receiverThreads, eventPublisher);
}
/**
 * Verifies that putting bootstrap servers into the binder's {@code configuration} map
 * makes construction of the {@link KafkaTopicProvisioner} fail with an
 * {@link IllegalStateException} carrying the expected message.
 */
@Test
public void brokersInvalid() throws Exception {
    KafkaProperties bootProperties = new KafkaProperties();
    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);
    binderProperties.getConfiguration()
            .put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:1234");

    try {
        new KafkaTopicProvisioner(binderProperties, bootProperties);
        fail("Expected illegal state");
    }
    catch (IllegalStateException ex) {
        String message = ex.getMessage();
        assertThat(message).isEqualTo(
                "Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
    }
}
/**
 * Verifies that, with the state store retry configured for three attempts, a store
 * lookup through {@link InteractiveQueryService} asks the mocked {@link KafkaStreams}
 * for the store exactly three times.
 */
@Test
public void testStateStoreRetrievalRetry() {
    KafkaStreams kafkaStreams = Mockito.mock(KafkaStreams.class);
    StreamsBuilderFactoryBean factoryBean = Mockito.mock(StreamsBuilderFactoryBean.class);
    Mockito.when(factoryBean.getKafkaStreams()).thenReturn(kafkaStreams);

    KafkaStreamsRegistry registry = new KafkaStreamsRegistry();
    registry.registerKafkaStreams(factoryBean);

    KafkaStreamsBinderConfigurationProperties binderProperties =
            new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
    binderProperties.getStateStoreRetry().setMaxAttempts(3);

    InteractiveQueryService queryService =
            new InteractiveQueryService(registry, binderProperties);
    QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> storeType =
            QueryableStoreTypes.keyValueStore();

    try {
        queryService.getQueryableStore("foo", storeType);
    }
    catch (Exception ignored) {
        // Any failure of the lookup is deliberately ignored; only the number of
        // store() attempts verified below matters to this test.
    }

    Mockito.verify(kafkaStreams, times(3)).store("foo", storeType);
}
/**
 * Verifies the precedence of the merged consumer configuration for one key: the boot
 * consumer property is visible first, is then overridden by the binder
 * {@code configuration} entry, which in turn is overridden by the binder consumer
 * properties entry.
 */
@Test
public void testMergedConsumerProperties() {
    final String key = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;

    KafkaProperties bootProperties = new TestKafkaProperties();
    bootProperties.getConsumer().getProperties().put(key, "bar");
    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);

    // Only the boot value is present.
    assertThat(binderProperties.mergedConsumerConfiguration().get(key)).isEqualTo("bar");

    // The binder-wide configuration wins over boot.
    binderProperties.getConfiguration().put(key, "baz");
    assertThat(binderProperties.mergedConsumerConfiguration().get(key)).isEqualTo("baz");

    // The binder consumer properties win over the binder-wide configuration.
    binderProperties.getConsumerProperties().put(key, "qux");
    assertThat(binderProperties.mergedConsumerConfiguration().get(key)).isEqualTo("qux");
}
/**
 * Registers a String/String {@link ConsumerFactory} unless a bean named
 * {@code kafkaConsumerFactory} already exists.
 *
 * @param kafkaProperties the Spring Boot Kafka properties supplying the consumer config
 * @return a consumer factory using {@link StringDeserializer} for keys and values
 */
@Bean
@ConditionalOnMissingBean(name="kafkaConsumerFactory")
public ConsumerFactory<String, String> kafkaConsumerFactory(KafkaProperties kafkaProperties) {
    StringDeserializer keyDeserializer = new StringDeserializer();
    StringDeserializer valueDeserializer = new StringDeserializer();
    return new DefaultKafkaConsumerFactory<>(
            kafkaProperties.buildConsumerProperties(), keyDeserializer, valueDeserializer);
}
/**
 * Create an instance.
 *
 * <p>The admin client properties are built from the boot properties and then
 * normalized against the binder properties.
 * @param kafkaBinderConfigurationProperties the binder configuration properties.
 * @param kafkaProperties the boot Kafka properties used to build the
 * {@link AdminClient}; must not be {@code null}.
 */
public KafkaTopicProvisioner(
        KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
        KafkaProperties kafkaProperties) {
    Assert.notNull(kafkaProperties, "KafkaProperties cannot be null");
    this.configurationProperties = kafkaBinderConfigurationProperties;
    this.adminClientProperties = kafkaProperties.buildAdminProperties();
    normalalizeBootPropsWithBinder(
            this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
}
/**
 * Merge binder settings into the admin client properties derived from boot.
 *
 * <p>Binder properties generally take precedence over boot kafka properties. Bootstrap
 * servers are the exception: the boot value is replaced only when it is empty or when
 * the binder connection string differs from the binder default. This special case
 * exists because the binder property never returns a null value.
 * @param adminProps the admin properties to normalize (modified in place).
 * @param bootProps the boot kafka properties.
 * @param binderProps the binder kafka properties.
 * @throws IllegalStateException if the binder {@code configuration} map contains the
 * bootstrap servers key.
 */
public static void normalalizeBootPropsWithBinder(Map<String, Object> adminProps,
        KafkaProperties bootProps, KafkaBinderConfigurationProperties binderProps) {
    // Bootstrap servers are handled first because they follow their own rule.
    String binderServers = binderProps.getKafkaConnectionString();
    boolean bootServersMissing = ObjectUtils
            .isEmpty(adminProps.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
    if (bootServersMissing
            || !binderServers.equals(binderProps.getDefaultKafkaConnectionString())) {
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, binderServers);
    }

    // Every remaining binder value that is a known admin config replaces the boot value.
    Set<String> adminConfigNames = AdminClientConfig.configNames();
    Map<String, String> binderConfiguration = binderProps.getConfiguration();
    for (Map.Entry<String, String> entry : binderConfiguration.entrySet()) {
        String key = entry.getKey();
        String value = entry.getValue();
        if (key.equals(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) {
            throw new IllegalStateException(
                    "Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
        }
        if (!adminConfigNames.contains(key)) {
            continue;
        }
        Object previous = adminProps.put(key, value);
        if (previous != null && KafkaTopicProvisioner.logger.isDebugEnabled()) {
            KafkaTopicProvisioner.logger.debug("Overrode boot property: [" + key + "], from: ["
                    + previous + "] to: [" + value + "]");
        }
    }
}
/**
 * Verifies that binder configuration overrides boot properties (security protocol
 * switches to SSL) while the boot bootstrap servers are retained when the binder
 * brokers are set to {@code localhost:9092}.
 *
 * <p>The admin client is opened in a try-with-resources block so it is closed even
 * when one of the assertions fails.
 */
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenExceptServers() throws Exception {
    KafkaProperties bootConfig = new KafkaProperties();
    bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
            "PLAINTEXT");
    bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234"));
    KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
            bootConfig);
    binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
            "SSL");
    ClassPathResource ts = new ClassPathResource("test.truststore.ks");
    binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            ts.getFile().getAbsolutePath());
    binderConfig.setBrokers("localhost:9092");
    KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
            bootConfig);
    try (AdminClient adminClient = provisioner.createAdminClient()) {
        // The binder's SSL setting must have replaced the boot PLAINTEXT setting.
        assertThat(KafkaTestUtils.getPropertyValue(adminClient,
                "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
        Map configs = KafkaTestUtils.getPropertyValue(adminClient,
                "client.selector.channelBuilder.configs", Map.class);
        // The boot bootstrap servers must still be in effect.
        assertThat(
                ((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
                        .isEqualTo("localhost:1234");
    }
}
/**
 * Verifies that binder configuration overrides boot properties (security protocol
 * switches to SSL) and that binder brokers set to {@code localhost:1234} also replace
 * the boot bootstrap servers.
 *
 * <p>The admin client is opened in a try-with-resources block so it is closed even
 * when one of the assertions fails.
 */
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenIncludingServers() throws Exception {
    KafkaProperties bootConfig = new KafkaProperties();
    bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
            "PLAINTEXT");
    bootConfig.setBootstrapServers(Collections.singletonList("localhost:9092"));
    KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
            bootConfig);
    binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
            "SSL");
    ClassPathResource ts = new ClassPathResource("test.truststore.ks");
    binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
            ts.getFile().getAbsolutePath());
    binderConfig.setBrokers("localhost:1234");
    KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
            bootConfig);
    try (AdminClient adminClient = provisioner.createAdminClient()) {
        // The binder's SSL setting must have replaced the boot PLAINTEXT setting.
        assertThat(KafkaTestUtils.getPropertyValue(adminClient,
                "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
        Map configs = KafkaTestUtils.getPropertyValue(adminClient,
                "client.selector.channelBuilder.configs", Map.class);
        // The binder brokers must have replaced the boot bootstrap servers.
        assertThat(
                ((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
                        .isEqualTo("localhost:1234");
    }
}
/**
 * Verifies that a group id set on the boot consumer properties does not appear in the
 * merged consumer configuration.
 */
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaProperties() {
    KafkaProperties bootProperties = new KafkaProperties();
    bootProperties.getConsumer().setGroupId("group1");

    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);
    Map<String, Object> merged = binderProperties.mergedConsumerConfiguration();

    assertThat(merged).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
/**
 * Verifies that {@code enable.auto.commit} set on the boot consumer properties does
 * not appear in the merged consumer configuration.
 */
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaProperties() {
    KafkaProperties bootProperties = new KafkaProperties();
    bootProperties.getConsumer().setEnableAutoCommit(true);

    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);
    Map<String, Object> merged = binderProperties.mergedConsumerConfiguration();

    assertThat(merged).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
/**
 * Verifies that a group id supplied through the binder {@code configuration} map does
 * not appear in the merged consumer configuration.
 */
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConfiguration() {
    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(new KafkaProperties());
    binderProperties.setConfiguration(
            Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));

    Map<String, Object> merged = binderProperties.mergedConsumerConfiguration();

    assertThat(merged).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
/**
 * Verifies that {@code enable.auto.commit} supplied through the binder
 * {@code configuration} map does not appear in the merged consumer configuration.
 */
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConfiguration() {
    KafkaProperties kafkaProperties = new KafkaProperties();
    KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
            new KafkaBinderConfigurationProperties(kafkaProperties);
    kafkaBinderConfigurationProperties
            .setConfiguration(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
    Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
    // Assert on the key this test actually configures; checking the group id here
    // would pass regardless of whether enable.auto.commit is filtered.
    assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
/**
 * Verifies that a group id supplied through the binder consumer properties does not
 * appear in the merged consumer configuration.
 */
@Test
public void mergedConsumerConfigurationFiltersGroupIdFromKafkaBinderConfigurationPropertiesConsumerProperties() {
    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(new KafkaProperties());
    binderProperties.setConsumerProperties(
            Collections.singletonMap(ConsumerConfig.GROUP_ID_CONFIG, "group1"));

    Map<String, Object> merged = binderProperties.mergedConsumerConfiguration();

    assertThat(merged).doesNotContainKeys(ConsumerConfig.GROUP_ID_CONFIG);
}
/**
 * Verifies that {@code enable.auto.commit} supplied through the binder consumer
 * properties does not appear in the merged consumer configuration.
 */
@Test
public void mergedConsumerConfigurationFiltersEnableAutoCommitFromKafkaBinderConfigurationPropertiesConsumerProps() {
    KafkaProperties kafkaProperties = new KafkaProperties();
    KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties =
            new KafkaBinderConfigurationProperties(kafkaProperties);
    kafkaBinderConfigurationProperties
            .setConsumerProperties(Collections.singletonMap(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"));
    Map<String, Object> mergedConsumerConfiguration = kafkaBinderConfigurationProperties.mergedConsumerConfiguration();
    // Assert on the key this test actually configures; checking the group id here
    // would pass regardless of whether enable.auto.commit is filtered.
    assertThat(mergedConsumerConfiguration).doesNotContainKeys(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
}
/**
 * Creates the health indicator.
 *
 * <p>The admin client properties are built once from the boot properties and then
 * normalized against the binder properties via
 * {@code KafkaTopicProvisioner.normalalizeBootPropsWithBinder}.
 * @param kafkaStreamsRegistry the registry of Kafka Streams instances.
 * @param kafkaStreamsBinderConfigurationProperties the Kafka Streams binder properties.
 * @param kafkaProperties the boot Kafka properties used to build the admin properties.
 * @param kafkaStreamsBindingInformationCatalogue the binding information catalogue.
 */
KafkaStreamsBinderHealthIndicator(KafkaStreamsRegistry kafkaStreamsRegistry,
        KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
        KafkaProperties kafkaProperties,
        KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
    super("Kafka-streams health check failed");
    this.configurationProperties = kafkaStreamsBinderConfigurationProperties;
    // Build the admin properties exactly once; an earlier call whose result was
    // discarded has been removed.
    this.adminClientProperties = kafkaProperties.buildAdminProperties();
    KafkaTopicProvisioner.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties,
            kafkaStreamsBinderConfigurationProperties);
    this.kafkaStreamsRegistry = kafkaStreamsRegistry;
    this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
}
/**
 * Registers the {@link KafkaTopicProvisioner} built from the Kafka Streams binder
 * properties and the boot Kafka properties.
 *
 * @param kafkaStreamsBinderConfigurationProperties the Kafka Streams binder properties
 * @param kafkaProperties the boot Kafka properties
 * @return the topic provisioner
 */
@Bean
public KafkaTopicProvisioner provisioningProvider(
        KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
        KafkaProperties kafkaProperties) {
    KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(
            kafkaStreamsBinderConfigurationProperties, kafkaProperties);
    return provisioner;
}
/**
 * Registers the Kafka Streams binder health indicator when a
 * {@link KafkaStreamsRegistry} bean is present.
 *
 * @param kafkaStreamsRegistry the registry of Kafka Streams instances
 * @param kafkaStreamsBinderConfigurationProperties the binder properties bean
 * qualified as {@code binderConfigurationProperties}
 * @param kafkaProperties the boot Kafka properties
 * @param kafkaStreamsBindingInformationCatalogue the binding information catalogue
 * @return the health indicator
 */
@Bean
@ConditionalOnBean(KafkaStreamsRegistry.class)
KafkaStreamsBinderHealthIndicator kafkaStreamsBinderHealthIndicator(
        KafkaStreamsRegistry kafkaStreamsRegistry,
        @Qualifier("binderConfigurationProperties")
        KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties,
        KafkaProperties kafkaProperties,
        KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
    KafkaStreamsBinderHealthIndicator indicator = new KafkaStreamsBinderHealthIndicator(
            kafkaStreamsRegistry,
            kafkaStreamsBinderConfigurationProperties,
            kafkaProperties,
            kafkaStreamsBindingInformationCatalogue);
    return indicator;
}
/**
 * Registers the default Kafka Streams binder configuration properties and, for every
 * configured binder of type KStream, KTable or GlobalKTable, an additional singleton
 * named {@code <binderName>-KafkaStreamsBinderConfigurationProperties}.
 *
 * <p>For each such binder its flattened properties are added as the first property
 * source of the environment, and a fresh properties instance is bound against the
 * {@code spring.cloud.stream.kafka.streams.binder} prefix before being registered.
 *
 * @param kafkaProperties the boot Kafka properties backing every properties instance
 * @param environment the environment that receives the per-binder property sources
 * @param properties the binding service properties holding the binder configurations
 * @param context the application context whose bean factory receives the singletons
 * @return the default binder configuration properties
 * @throws Exception if resolving the constructor or binding fails
 */
@Bean
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.streams.binder")
public KafkaStreamsBinderConfigurationProperties binderConfigurationProperties(
        KafkaProperties kafkaProperties, ConfigurableEnvironment environment,
        BindingServiceProperties properties, ConfigurableApplicationContext context) throws Exception {
    final Map<String, BinderConfiguration> configurationsByName = getBinderConfigurations(properties);
    for (Map.Entry<String, BinderConfiguration> candidate : configurationsByName.entrySet()) {
        final BinderConfiguration configuration = candidate.getValue();
        final String type = configuration.getBinderType();
        final boolean streamsBinder = type != null
                && (type.equals(KSTREAM_BINDER_TYPE)
                        || type.equals(KTABLE_BINDER_TYPE)
                        || type.equals(GLOBALKTABLE_BINDER_TYPE));
        if (!streamsBinder) {
            continue;
        }

        // Expose this binder's own properties with highest precedence before binding.
        Map<String, Object> flattened = new HashMap<>();
        this.flatten(null, configuration.getProperties(), flattened);
        environment.getPropertySources().addFirst(
                new MapPropertySource(candidate.getKey() + "-kafkaStreamsBinderEnv", flattened));

        Binder binder = new Binder(
                ConfigurationPropertySources.get(environment),
                new PropertySourcesPlaceholdersResolver(environment),
                IntegrationUtils.getConversionService(context.getBeanFactory()),
                null);

        // Instantiate through the constructor taking KafkaProperties, then bind onto it.
        final Constructor<KafkaStreamsBinderConfigurationProperties> constructor =
                ReflectionUtils.accessibleConstructor(
                        KafkaStreamsBinderConfigurationProperties.class, KafkaProperties.class);
        final KafkaStreamsBinderConfigurationProperties target =
                BeanUtils.instantiateClass(constructor, kafkaProperties);
        final BindResult<KafkaStreamsBinderConfigurationProperties> bound = binder.bind(
                "spring.cloud.stream.kafka.streams.binder", Bindable.ofInstance(target));

        context.getBeanFactory().registerSingleton(
                candidate.getKey() + "-KafkaStreamsBinderConfigurationProperties",
                bound.get());
    }
    return new KafkaStreamsBinderConfigurationProperties(kafkaProperties);
}
/**
 * Builds the {@link KafkaStreamsConfiguration} from the boot streams properties.
 *
 * <p>When no streams application id is configured, {@code spring.application.name}
 * (if set) is used as the application id.
 *
 * @param properties the binder properties giving access to the boot Kafka properties
 * @param environment the environment consulted for {@code spring.application.name}
 * @return the Kafka Streams configuration
 */
@Bean
public KafkaStreamsConfiguration kafkaStreamsConfiguration(
        @Qualifier("binderConfigurationProperties") KafkaStreamsBinderConfigurationProperties properties,
        Environment environment) {
    KafkaProperties bootProperties = properties.getKafkaProperties();
    Map<String, Object> streamsConfig = bootProperties.buildStreamsProperties();

    boolean applicationIdMissing = bootProperties.getStreams().getApplicationId() == null;
    if (applicationIdMissing) {
        String fallbackId = environment.getProperty("spring.application.name");
        if (fallbackId != null) {
            streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, fallbackId);
        }
    }
    return new KafkaStreamsConfiguration(streamsConfig);
}
/**
 * Verifies the precedence of the merged producer configuration for one key: the boot
 * producer property is visible first, is then overridden by the binder
 * {@code configuration} entry, which in turn is overridden by the binder producer
 * properties entry.
 */
@Test
public void testMergedProducerProperties() {
    final String key = ProducerConfig.RETRIES_CONFIG;

    KafkaProperties bootProperties = new TestKafkaProperties();
    bootProperties.getProducer().getProperties().put(key, "bar");
    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);

    // Only the boot value is present.
    assertThat(binderProperties.mergedProducerConfiguration().get(key)).isEqualTo("bar");

    // The binder-wide configuration wins over boot.
    binderProperties.getConfiguration().put(key, "baz");
    assertThat(binderProperties.mergedProducerConfiguration().get(key)).isEqualTo("baz");

    // The binder producer properties win over the binder-wide configuration.
    binderProperties.getProducerProperties().put(key, "qux");
    assertThat(binderProperties.mergedProducerConfiguration().get(key)).isEqualTo("qux");
}
/**
 * Verifies that, with topic auto-creation disabled on the binder, creating a consumer
 * endpoint for a topic name generated for this run fails with a
 * {@link BinderException} caused by an {@link UnknownTopicOrPartitionException}.
 */
@Test
public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker()
        throws Throwable {
    String brokers = embeddedKafka.getEmbeddedKafka().getBrokersAsString();
    KafkaProperties bootProperties = new TestKafkaProperties();
    bootProperties.setBootstrapServers(Collections.singletonList(brokers));

    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);
    // The binder must not create the missing topic itself.
    binderProperties.setAutoCreateTopics(false);

    KafkaTopicProvisioner provisioner =
            new KafkaTopicProvisioner(binderProperties, bootProperties);
    provisioner.setMetadataRetryOperations(new RetryTemplate());
    KafkaMessageChannelBinder binder =
            new KafkaMessageChannelBinder(binderProperties, provisioner);

    final String testTopicName = "nonExistent" + System.currentTimeMillis();
    ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties =
            new ExtendedConsumerProperties<>(new KafkaConsumerProperties());

    expectedException.expect(BinderException.class);
    expectedException.expectCause(isA(UnknownTopicOrPartitionException.class));
    binder.createConsumerEndpoint(() -> testTopicName, "group", consumerProperties);
}
/**
 * Verifies that, with topic auto-creation disabled on the binder, binding a producer
 * to a topic name generated for this run fails with a {@link BinderException} caused
 * by an {@link UnknownTopicOrPartitionException}.
 */
@Test
public void testAutoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker()
        throws Throwable {
    String brokers = embeddedKafka.getEmbeddedKafka().getBrokersAsString();
    KafkaProperties bootProperties = new TestKafkaProperties();
    bootProperties.setBootstrapServers(Collections.singletonList(brokers));

    KafkaBinderConfigurationProperties binderProperties =
            new KafkaBinderConfigurationProperties(bootProperties);
    // The binder must not create the missing topic itself.
    binderProperties.setAutoCreateTopics(false);
    // Keep blocking producer operations short so the test fails fast.
    binderProperties.getConfiguration().put("max.block.ms", "3000");

    KafkaTopicProvisioner provisioner =
            new KafkaTopicProvisioner(binderProperties, bootProperties);
    // Metadata lookups use a retry policy constructed with an attempt limit of 1.
    SimpleRetryPolicy singleAttempt = new SimpleRetryPolicy(1);
    final RetryTemplate metadataRetry = new RetryTemplate();
    metadataRetry.setRetryPolicy(singleAttempt);
    provisioner.setMetadataRetryOperations(metadataRetry);

    KafkaMessageChannelBinder binder =
            new KafkaMessageChannelBinder(binderProperties, provisioner);

    final String testTopicName = "nonExistent" + System.currentTimeMillis();
    ExtendedProducerProperties<KafkaProducerProperties> producerProperties =
            new ExtendedProducerProperties<>(new KafkaProducerProperties());

    expectedException.expect(BinderException.class);
    expectedException.expectCause(isA(UnknownTopicOrPartitionException.class));
    binder.bindProducer(testTopicName, new DirectChannel(), producerProperties);
}
/**
 * Registers a producer factory with String keys and JSON-serialized values.
 *
 * @param aObjectMapper the object mapper handed to the {@link JsonSerializer}
 * @param aKafkaProperties the boot Kafka properties supplying the producer config
 * @return the producer factory
 */
@Bean
public ProducerFactory<String, Object> producerFactory(ObjectMapper aObjectMapper, KafkaProperties aKafkaProperties) {
    StringSerializer keySerializer = new StringSerializer();
    JsonSerializer<Object> valueSerializer = new JsonSerializer<>(aObjectMapper);
    return new DefaultKafkaProducerFactory<>(
            producerConfigs(aKafkaProperties), keySerializer, valueSerializer);
}
/**
 * Builds the producer configuration from the boot Kafka properties and sets the key
 * serializer to {@link StringSerializer} and the value serializer to
 * {@link JsonSerializer}.
 *
 * @param aKafkaProperties the boot Kafka properties
 * @return the producer configuration map
 */
@Bean
public Map<String, Object> producerConfigs(KafkaProperties aKafkaProperties) {
    Map<String, Object> config = aKafkaProperties.buildProducerProperties();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return config;
}
/**
 * Registers the Kafka-based {@link MessageVerifier} unless another bean of that type
 * is already defined.
 *
 * @param kafkaTemplate the template handed to the stub messages
 * @param broker the embedded Kafka broker
 * @param kafkaProperties the boot Kafka properties
 * @param initializer the initializer handed to the stub messages
 * @return the message verifier
 */
@Bean
@ConditionalOnMissingBean
MessageVerifier<Message<?>> contractVerifierKafkaMessageExchange(
        KafkaTemplate kafkaTemplate, EmbeddedKafkaBroker broker,
        KafkaProperties kafkaProperties, KafkaStubMessagesInitializer initializer) {
    KafkaStubMessages stubMessages =
            new KafkaStubMessages(kafkaTemplate, broker, kafkaProperties, initializer);
    return stubMessages;
}
/**
 * Stores the template and creates the receiver from the topic-to-consumer map
 * returned by the initializer.
 *
 * @param kafkaTemplate the Kafka template to keep
 * @param broker the embedded Kafka broker passed to the initializer
 * @param kafkaProperties the boot Kafka properties passed to the initializer
 * @param initializer produces the topic-to-consumer map
 */
KafkaStubMessages(KafkaTemplate kafkaTemplate, EmbeddedKafkaBroker broker,
        KafkaProperties kafkaProperties, KafkaStubMessagesInitializer initializer) {
    this.kafkaTemplate = kafkaTemplate;
    this.receiver = new Receiver(initializer.initialize(broker, kafkaProperties));
}
/**
 * Prepares one consumer per topic reported by the embedded broker.
 *
 * @param broker the embedded Kafka broker whose topics are consumed
 * @param kafkaProperties the boot Kafka properties used when preparing each consumer
 * @return a map from topic name to its prepared consumer
 */
@Override
public Map<String, Consumer> initialize(EmbeddedKafkaBroker broker,
        KafkaProperties kafkaProperties) {
    Map<String, Consumer> consumersByTopic = new HashMap<>();
    broker.getTopics().forEach(topic ->
            consumersByTopic.put(topic, prepareListener(broker, topic, kafkaProperties)));
    return consumersByTopic;
}
/**
 * Creates a consumer for the given destination on the embedded broker.
 *
 * <p>The consumer uses the group id from the boot consumer properties and has
 * {@code auto.offset.reset} set to {@code earliest}.
 *
 * @param broker the embedded Kafka broker
 * @param destination the topic to consume from
 * @param kafkaProperties the boot Kafka properties supplying the group id
 * @return the consumer attached to the destination
 */
private Consumer prepareListener(EmbeddedKafkaBroker broker, String destination,
        KafkaProperties kafkaProperties) {
    String groupId = kafkaProperties.getConsumer().getGroupId();
    Map<String, Object> config = KafkaTestUtils.consumerProps(groupId, "false", broker);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    DefaultKafkaConsumerFactory<String, String> factory =
            new DefaultKafkaConsumerFactory<>(config);
    Consumer<String, String> listener = factory.createConsumer();
    broker.consumeFromAnEmbeddedTopic(listener, destination);

    if (log.isDebugEnabled()) {
        log.debug("Prepared consumer for destination [" + destination + "]");
    }
    return listener;
}
/**
 * Registers the Kafka {@link Sender} built from the boot producer properties, with
 * byte-array serializers for keys and values, targeting the configured topic.
 *
 * @param config the boot Kafka properties
 * @return the Kafka sender
 */
@Bean(ZipkinAutoConfiguration.SENDER_BEAN_NAME)
Sender kafkaSender(KafkaProperties config) {
    Map<String, Object> overrides = config.buildProducerProperties();
    String byteArraySerializer = ByteArraySerializer.class.getName();
    overrides.put("key.serializer", byteArraySerializer);
    overrides.put("value.serializer", byteArraySerializer);

    // KafkaProperties yields the servers as a list, while Kafka expects a single String.
    Object servers = overrides.get("bootstrap.servers");
    if (servers instanceof List) {
        overrides.put("bootstrap.servers", join((List) servers));
    }

    return KafkaSender.newBuilder()
            .topic(this.topic)
            .overrides(overrides)
            .build();
}