下面列出了org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.core.ConsumerFactory 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
private ConsumerFactory<String, Message> consumerFactory() throws Exception {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId());
props.put(ConsumerConfig.GROUP_ID_CONFIG, app.name);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return new TracingConsumerFactory<>( //
new DefaultKafkaConsumerFactory<String, Message>( //
props, //
new StringDeserializer(), //
new JsonDeserializer<>(Message.class)));
}
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(
ConsumerConfig.GROUP_ID_CONFIG,
consumerGroupId);
props.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
autoOffsetReset);
// props.put(
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
// StringDeserializer.class);
// props.put(
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
// StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}
@Bean({"eventsKafkaListenerContainerFactory", "kafkaListenerContainerFactory"})
public ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> eventsKafkaListenerContainerFactory(
EventMessageConverter eventMessageConverter, ConsumerFactory<String, PublishedEventWrapper> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(eventApisConfiguration.getEventBus().getConsumer().getEventConcurrency());
factory.setMessageConverter(eventMessageConverter);
factory.getContainerProperties().setPollTimeout(3000);
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(eventApisConfiguration.getEventBus().getConsumer().getEventSchedulerPoolSize());
scheduler.setBeanName("EventsFactory-Scheduler");
scheduler.initialize();
factory.getContainerProperties().setScheduler(scheduler);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<String, String>(properties);
}
@Bean
@ConditionalOnMissingBean(name = "kafkaMessageLogReceiverEndpointFactory")
public MessageLogReceiverEndpointFactory kafkaMessageLogReceiverEndpointFactory(final KafkaProperties kafkaProperties,
final MessageInterceptorRegistry interceptorRegistry,
final ApplicationEventPublisher eventPublisher,
final ConsumerFactory<String, String> kafkaConsumerFactory) {
LOG.info("Auto-configuring Kafka MessageLogReceiverEndpointFactory");
final ExecutorService executorService = newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("kafka-message-log-%d").build()
);
final KafkaConsumer<String, String> kafkaConsumer = (KafkaConsumer<String, String>)kafkaConsumerFactory.createConsumer();
return new KafkaMessageLogReceiverEndpointFactory(
interceptorRegistry,
kafkaConsumer,
executorService,
eventPublisher);
}
private synchronized ConsumerFactory<?, ?> createConsumerFactory() {
if (this.defaultConsumerFactory == null) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = this.binderConfigurationProperties
.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.binderConfigurationProperties
.getKafkaConnectionString());
}
this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(
props);
}
return this.defaultConsumerFactory;
}
@Bean
@ConditionalOnMissingBean(name = "binderClientFactoryCustomizer")
public ClientFactoryCustomizer binderClientFactoryCustomizer(MeterRegistry meterRegistry) {
return new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
if (pf instanceof DefaultKafkaProducerFactory) {
((DefaultKafkaProducerFactory<?, ?>) pf)
.addListener(new MicrometerProducerListener<>(meterRegistry));
}
}
@Override
public void configure(ConsumerFactory<?, ?> cf) {
if (cf instanceof DefaultKafkaConsumerFactory) {
((DefaultKafkaConsumerFactory<?, ?>) cf)
.addListener(new MicrometerConsumerListener<>(meterRegistry));
}
}
};
}
@Bean
KafkaBinderHealthIndicator kafkaBinderHealthIndicator(
KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = configurationProperties
.mergedConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(
kafkaMessageChannelBinder, consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
@Test
@SuppressWarnings("unchecked")
public void testKafkaHealthIndicatorProperties() {
assertThat(this.kafkaBinderHealthIndicator).isNotNull();
Field consumerFactoryField = ReflectionUtils.findField(
KafkaBinderHealthIndicator.class, "consumerFactory",
ConsumerFactory.class);
ReflectionUtils.makeAccessible(consumerFactoryField);
DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) ReflectionUtils
.getField(consumerFactoryField, this.kafkaBinderHealthIndicator);
Field configField = ReflectionUtils.findField(DefaultKafkaConsumerFactory.class,
"configs", Map.class);
ReflectionUtils.makeAccessible(configField);
Map<String, Object> configs = (Map<String, Object>) ReflectionUtils
.getField(configField, consumerFactory);
assertThat(configs.containsKey("bootstrap.servers")).isTrue();
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
assertThat(((List<String>) configs.get("bootstrap.servers"))
.containsAll(bootstrapServers)).isTrue();
}
private void registerContainers(ConfigurableListableBeanFactory beanFactory,
List<Contract> matchingContracts, String flowName,
StubRunnerKafkaRouter listener) {
// listener's container
ConsumerFactory consumerFactory = beanFactory.getBean(ConsumerFactory.class);
for (Contract matchingContract : matchingContracts) {
if (matchingContract.getInput() == null) {
continue;
}
String destination = MapConverter.getStubSideValuesForNonBody(
matchingContract.getInput().getMessageFrom()).toString();
ContainerProperties containerProperties = new ContainerProperties(
destination);
KafkaMessageListenerContainer container = listenerContainer(consumerFactory,
containerProperties, listener);
String containerName = flowName + ".container";
Object initializedContainer = beanFactory.initializeBean(container,
containerName);
beanFactory.registerSingleton(containerName, initializedContainer);
if (log.isDebugEnabled()) {
log.debug(
"Initialized kafka message container with name [" + containerName
+ "] listening to destination [" + destination + "]");
}
}
}
@Bean
public ConsumerFactory<String, KMessage> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KMessage.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* 根据consumerProps填写的参数创建消费者工厂
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.DEFAULT_PRODUCER_GROUP);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaMessageListenerContainer<String, String> commandListenerContainer(KafkaCommandListener commandListener, ConsumerFactory<String, String> consumerFactory) {
ContainerProperties properties = new ContainerProperties(commandTopic);
properties.setGroupId(Constants.DEFAULT_CONSUMER_GROUP);
properties.setMessageListener(commandListener);
properties.setMissingTopicsFatal(false);
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
@Bean
public KafkaMessageListenerContainer<String, String> domainEventListenerContainer(KafkaDomainEventListener domainEventListener, ConsumerFactory<String, String> consumerFactory) {
ContainerProperties properties = new ContainerProperties(eventTopic);
properties.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
properties.setMessageListener(domainEventListener);
properties.setMissingTopicsFatal(false);
properties.setAckMode(ContainerProperties.AckMode.MANUAL);
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
@Bean
public KafkaMessageListenerContainer<String, String> applicationMessageListenerContainer(KafkaApplicationMessageListener applicationMessageListener, ConsumerFactory<String, String> consumerFactory) {
ContainerProperties properties = new ContainerProperties(applicationTopic);
properties.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
properties.setMessageListener(applicationMessageListener);
properties.setMissingTopicsFatal(false);
properties.setAckMode(ContainerProperties.AckMode.MANUAL);
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
@Bean
public KafkaMessageListenerContainer<String, String> publishableExceptionListenerContainer(KafkaPublishableExceptionListener publishableExceptionListener, ConsumerFactory<String, String> consumerFactory) {
ContainerProperties properties = new ContainerProperties(exceptionTopic);
properties.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
properties.setMessageListener(publishableExceptionListener);
properties.setMissingTopicsFatal(false);
properties.setAckMode(ContainerProperties.AckMode.MANUAL);
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
/**
* 根据consumerProps填写的参数创建消费者工厂
*/
@Bean
public ConsumerFactory consumerFactory() {
// 消费者配置参数
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, DEFAULT_CONSUMER_GROUP);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* 根据consumerProps填写的参数创建消费者工厂
*/
@Bean
public ConsumerFactory consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, QueueProperties.KAFKA_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, QueueProperties.DEFAULT_PRODUCER_GROUP);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConsumerFactory<String, String> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simulator-java.group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
public ConsumerFactory<String, MailContext> mailContextConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("kafka.bootstrap.address"));
props.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("consumer.group.name"));
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MailContext.class));
}
public ReplyConfiguration(KafkaProperties properties,
org.springframework.boot.autoconfigure.kafka.KafkaProperties kafkaProperties,
ObjectProvider<RecordMessageConverter> messageConverter,
ConsumerFactory<Object, Object> consumerFactory,
ProducerFactory<Object, Object> producerFactory,
ProducerListener<Object, Object> producerListener) {
this.kafkaProperties = kafkaProperties;
this.properties = properties;
this.messageConverter = messageConverter.getIfUnique();
this.consumerFactory = consumerFactory;
this.producerFactory = producerFactory;
this.producerListener = producerListener;
}
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(
ConsumerFactory<String, String> cf) {
ContainerProperties containerProperties = new ContainerProperties("asyncReplies");
containerProperties.setGroupId("async");
return new KafkaMessageListenerContainer<>(cf, containerProperties);
}
public TracingConsumerFactory(ConsumerFactory<K, V> consumerFactory, Tracer tracer,
Collection<SpanDecorator> spanDecorators,
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
this.tracer = tracer;
this.consumerFactory = consumerFactory;
this.spanDecorators = (spanDecorators == null)
? Collections.singletonList(STANDARD_TAGS)
: spanDecorators;
this.consumerSpanNameProvider = (consumerSpanNameProvider == null)
? ClientSpanNameProvider.CONSUMER_OPERATION_NAME
: consumerSpanNameProvider;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
final Map<String, Object> consumerProps = KafkaTestUtils
.consumerProps("sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put("auto.offset.reset", "earliest");
return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps), tracer());
}
@Bean
public ConsumerFactory<String, PublishedEventWrapper> kafkaConsumerFactory() {
KafkaProperties properties = eventApisConfiguration.getEventBus().clone();
properties.getConsumer().setEnableAutoCommit(false);
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
new StringDeserializer(), new JsonDeserializer<>(PublishedEventWrapper.class, objectMapper));
}
@Bean
public ConsumerFactory<String, Operation> kafkaOperationsFactory() {
KafkaProperties properties = eventApisConfiguration.getEventBus().clone();
properties.getConsumer().setEnableAutoCommit(false);
return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
new StringDeserializer(), new JsonDeserializer<>(Operation.class, objectMapper));
}