下面列出了org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Creates the listener container factory with String key/value deserializers and a
 * randomly generated consumer group id.
 *
 * <p>Original author's rationale (translated): in the course's reference implementation the
 * ad index data lives in a ConcurrentHashMap inside each index-service JVM, so every instance
 * listening to the Kafka topic must be in its own consumer group (a distinct groupId) to load
 * the complete index when several instances are deployed. In this implementation the index
 * data was moved out into Redis, so all instances should share one consumer group and consume
 * the topic together, which keeps index data from being consumed repeatedly. In short: keep
 * the random group id (or an equivalent of your own) when the index is stored per JVM; remove
 * that setting (or delete this class) when the index storage is shared by all instances.
 *
 * @return the configured container factory
 */
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConcurrency(concurrency);

    Map<String, Object> consumerProps = Maps.newHashMap();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // A fresh UUID per factory creation puts this instance in its own consumer group.
    String randomGroupId = UUID.randomUUID().toString();
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, randomGroupId);

    listenerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(consumerProps));
    return listenerFactory;
}
/**
 * Creates a concurrent listener container factory backed by the injected consumer factory.
 *
 * @param consumerFactory consumer factory handed to the container factory
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory);
    return listenerFactory;
}
/**
 * Creates a batch-listener container factory using {@code consumerFactory()}, the configured
 * {@code concurrency} and a poll timeout of 1500.
 *
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> batchFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    batchFactory.setConsumerFactory(consumerFactory());
    batchFactory.setConcurrency(concurrency);
    // Listeners created by this factory receive batches.
    batchFactory.setBatchListener(true);
    batchFactory.getContainerProperties().setPollTimeout(1500);
    return batchFactory;
}
/**
 * Installs a dead-letter error handler on the given container factory and returns it as a bean.
 *
 * <p>The factory bean is inspected instead of a configuration property because, per the
 * original author's note, spring-kafka allows a custom factory to be registered with
 * batchListener set to true while the corresponding property is left empty.
 *
 * @param kafkaTemplate template passed to the handler that is created
 * @param concurrentKafkaListenerContainerFactory factory on which the handler is set
 * @return a {@code KafkaDeadLetterBatchErrorHandler} when the factory reports a batch listener,
 *         otherwise a {@code KafkaDeadLetterErrorHandler}
 */
@Bean
@ConditionalOnProperty(prefix = "app.kafka.error", name = "dead-letter", havingValue = "true")
@ConditionalOnMissingBean
public GenericErrorHandler kafkaDeadLetterBatchErrorHandler(KafkaTemplate<Object, Object> kafkaTemplate,
        ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory) {
    // isBatchListener() may be null; only an explicit TRUE selects the batch handler.
    if (!Boolean.TRUE.equals(concurrentKafkaListenerContainerFactory.isBatchListener())) {
        ErrorHandler recordHandler = new KafkaDeadLetterErrorHandler(kafkaTemplate);
        concurrentKafkaListenerContainerFactory.setErrorHandler(recordHandler);
        return recordHandler;
    }
    BatchErrorHandler batchHandler = new KafkaDeadLetterBatchErrorHandler(kafkaTemplate);
    concurrentKafkaListenerContainerFactory.setBatchErrorHandler(batchHandler);
    return batchHandler;
}
/**
 * Builds the batch listener container factory from the receiver's Kafka settings.
 *
 * @param conf receiver properties; its {@code getKafka()} section supplies the consumer
 *             properties, concurrency, poll timeout and queue depth
 * @return a batch-listening concurrent container factory with MANUAL_IMMEDIATE ack mode
 */
@Bean(BEAN_KAFKA_BATCH_FACTORY)
@EnableKafkaCollectReceiver
@SuppressWarnings({ "unchecked", "rawtypes" })
public KafkaListenerContainerFactory<?> batchFactory(ReceiverProperties conf) {
    // The consumer factory is fed directly from the configured Kafka properties.
    Map<String, Object> consumerSettings = (Map) conf.getKafka().getProperties();
    ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerSettings);

    // Concurrent container factory operating in batch mode.
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory);
    listenerFactory.setConcurrency(conf.getKafka().getConcurrency());
    listenerFactory.setBatchListener(true);

    // Container-level tuning: poll timeout, buffer queue size for bulk consumption, ack mode.
    ContainerProperties props = listenerFactory.getContainerProperties();
    props.setPollTimeout(conf.getKafka().getPollTimeout());
    props.setQueueDepth(conf.getKafka().getQueueDepth());
    props.setAckMode(AckMode.MANUAL_IMMEDIATE);

    return listenerFactory;
}
/**
 * Creates the events listener container factory, registered under two bean names.
 *
 * @param eventMessageConverter message converter set on the factory
 * @param consumerFactory consumer factory set on the factory
 * @return the factory, configured with a dedicated scheduler and RECORD ack mode
 */
@Bean({"eventsKafkaListenerContainerFactory", "kafkaListenerContainerFactory"})
public ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> eventsKafkaListenerContainerFactory(
        EventMessageConverter eventMessageConverter, ConsumerFactory<String, PublishedEventWrapper> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> eventsFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    eventsFactory.setConsumerFactory(consumerFactory);
    eventsFactory.setConcurrency(eventApisConfiguration.getEventBus().getConsumer().getEventConcurrency());
    eventsFactory.setMessageConverter(eventMessageConverter);

    ContainerProperties containerProps = eventsFactory.getContainerProperties();
    containerProps.setPollTimeout(3000);

    // Scheduler is created and initialized here, then handed to the container properties.
    ThreadPoolTaskScheduler eventsScheduler = new ThreadPoolTaskScheduler();
    eventsScheduler.setPoolSize(eventApisConfiguration.getEventBus().getConsumer().getEventSchedulerPoolSize());
    eventsScheduler.setBeanName("EventsFactory-Scheduler");
    eventsScheduler.initialize();

    containerProps.setScheduler(eventsScheduler);
    containerProps.setAckMode(ContainerProperties.AckMode.RECORD);
    return eventsFactory;
}
/**
 * Creates a listener container factory with a default {@code RetryTemplate} and a record
 * filter that matches records whose value equals {@code "bar"}.
 *
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(new RetryTemplate());
    factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
        @Override
        public boolean filter(ConsumerRecord<String, String> consumerRecord) {
            // Constant-first comparison: a record with a null value (e.g. a tombstone)
            // yields false instead of throwing a NullPointerException inside the filter.
            return "bar".equals(consumerRecord.value());
        }
    });
    return factory;
}
/**
 * Creates the listener container factory for {@code PurchaseInfoDto} values, backed by
 * {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
ConcurrentKafkaListenerContainerFactory<String, PurchaseInfoDto> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PurchaseInfoDto> purchaseFactory = new ConcurrentKafkaListenerContainerFactory<>();
    purchaseFactory.setConsumerFactory(consumerFactory());
    return purchaseFactory;
}
/**
 * Creates the listener container factory for {@code KMessage} values, backed by
 * {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KMessage> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, KMessage> messageFactory = new ConcurrentKafkaListenerContainerFactory<>();
    messageFactory.setConsumerFactory(consumerFactory());
    return messageFactory;
}
/**
 * Creates a container factory for batch listeners.
 *
 * <p>Uses {@code consumerFactory()}, the {@code concurrency} value and a poll timeout of 1500.
 *
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setBatchListener(true);
    containerFactory.setConcurrency(concurrency);
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.getContainerProperties().setPollTimeout(1500);
    return containerFactory;
}
/**
 * Provides the listener container factory: batch listening enabled, concurrency taken from
 * {@code concurrency}, poll timeout 1500, consumers from {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> result =
            new ConcurrentKafkaListenerContainerFactory<>();
    result.setConsumerFactory(consumerFactory());
    result.getContainerProperties().setPollTimeout(1500);
    result.setConcurrency(concurrency);
    result.setBatchListener(true);
    return result;
}
/**
 * Listener container factory for the consumer.
 *
 * @return a container factory using {@code kafKaRegisterSuccConsumerFactory()} with MANUAL ack mode
 */
@Bean
public KafkaListenerContainerFactory userRegisterSuccKafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(kafKaRegisterSuccConsumerFactory());
    // Offsets are committed manually after the consumer has processed a message.
    ContainerProperties containerProps = listenerFactory.getContainerProperties();
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
    return listenerFactory;
}
/**
 * Creates the listener container factory using {@code consumerFactory()} and a poll timeout
 * of 1500.
 *
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory());
    listenerFactory.getContainerProperties().setPollTimeout(1500);
    return listenerFactory;
}
/**
 * Builds the batch listener container factory.
 *
 * @return a factory with {@code consumerFactory()}, the configured {@code concurrency},
 *         poll timeout 1500 and batch listening switched on
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> batchListenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    batchListenerFactory.setConsumerFactory(consumerFactory());
    batchListenerFactory.setBatchListener(true);
    batchListenerFactory.setConcurrency(concurrency);
    batchListenerFactory.getContainerProperties().setPollTimeout(1500);
    return batchListenerFactory;
}
/**
 * Creates the String/String listener container factory backed by {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory());
    return listenerFactory;
}
/**
 * Provides a concurrent listener container factory whose consumers come from
 * {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    return containerFactory;
}
/**
 * Wraps the injected consumer factory in a concurrent listener container factory.
 *
 * @param consumerFactory consumer factory to set on the container factory
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
        kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory);
    return containerFactory;
}
/**
 * Creates a container factory and lets the supplied configurer set it up.
 *
 * @param configurer configurer whose {@code configure} method is applied to the new factory
 * @param kafkaConsumerFactory consumer factory passed to the configurer
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(containerFactory, kafkaConsumerFactory);
    return containerFactory;
}
/**
 * Creates the listener container factory for {@code MailContext} values, backed by
 * {@code mailContextConsumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MailContext> mailContextListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MailContext> mailFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    mailFactory.setConsumerFactory(mailContextConsumerFactory());
    return mailFactory;
}
/**
 * Primary listener container factory, backed by {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
@Primary
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> primaryFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    primaryFactory.setConsumerFactory(consumerFactory());
    return primaryFactory;
}
/**
 * Listener container factory with a concurrency of 6, backed by {@code consumerFactory()}.
 *
 * <p>NOTE(review): the method name says "3Consumer" while the bean name and the concurrency
 * value say 6 - confirm which is intended.
 *
 * @return the container factory
 */
@Bean(name = "kafkaListenerContainerFactoryWith6Consumer")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryWith3Consumer() {
    ConcurrentKafkaListenerContainerFactory<String, String> sixThreadFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    sixThreadFactory.setConsumerFactory(consumerFactory());
    // Original author's note: 3 partitions -> 6 threads in parallel in a single consumer.
    sixThreadFactory.setConcurrency(6);
    return sixThreadFactory;
}
/**
 * Listener container factory for batch consumption: concurrency 1, batch listening enabled,
 * consumers from {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean(name = "kafkaListenerContainerFactoryForBatchConsumer")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryForBatchConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, String> batchFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    batchFactory.setConsumerFactory(consumerFactory());
    batchFactory.setBatchListener(true);
    batchFactory.setConcurrency(1);
    return batchFactory;
}
/**
 * Creates the listener container factory for {@code EventeumMessage} values with a
 * concurrency of 1, backed by {@code eventeumConsumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventeumMessage> eventeumKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, EventeumMessage> eventeumFactory = new ConcurrentKafkaListenerContainerFactory<>();
    eventeumFactory.setConcurrency(1);
    eventeumFactory.setConsumerFactory(eventeumConsumerFactory());
    return eventeumFactory;
}
/**
 * Creates the listener container factory with a reply template.
 *
 * @return a factory using {@code consumerFactory()} and {@code kafkaTemplate()} as reply template
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> replyingFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    replyingFactory.setConsumerFactory(consumerFactory());
    // Replies are sent through the template returned by kafkaTemplate().
    replyingFactory.setReplyTemplate(kafkaTemplate());
    return replyingFactory;
}
/**
 * Provides a listener container factory wired with {@code consumerFactory()} and with
 * {@code kafkaTemplate()} set as its reply template.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    containerFactory.setReplyTemplate(kafkaTemplate());
    return containerFactory;
}
/**
 * Installs the regular (non dead-letter) error handler on the given container factory and
 * returns it as a bean.
 *
 * <p>The factory bean is inspected instead of a configuration property because, per the
 * original author's note, spring-kafka allows a custom factory to be registered with
 * batchListener set to true while the corresponding property is left empty.
 *
 * @param concurrentKafkaListenerContainerFactory factory on which the handler is set
 * @return a {@code KafkaBatchErrorHandler} when the factory reports a batch listener,
 *         otherwise a {@code KafkaErrorHandler}
 */
@Bean
@ConditionalOnMissingBean
public GenericErrorHandler kafkaBatchErrorHandler(ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory) {
    // isBatchListener() may be null; only an explicit TRUE selects the batch handler.
    if (!Boolean.TRUE.equals(concurrentKafkaListenerContainerFactory.isBatchListener())) {
        ErrorHandler recordHandler = new KafkaErrorHandler();
        concurrentKafkaListenerContainerFactory.setErrorHandler(recordHandler);
        return recordHandler;
    }
    BatchErrorHandler batchHandler = new KafkaBatchErrorHandler();
    concurrentKafkaListenerContainerFactory.setBatchErrorHandler(batchHandler);
    return batchHandler;
}
/**
 * Creates the batch listener container factory for {@code DebeziumEvent} values with
 * MANUAL ack mode, backed by {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DebeziumEvent> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, DebeziumEvent> debeziumFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    debeziumFactory.setConsumerFactory(consumerFactory());
    debeziumFactory.setBatchListener(true);
    debeziumFactory.getContainerProperties().setAckMode(MANUAL);
    return debeziumFactory;
}
/**
 * Creates the listener container factory for {@code Message} values, backed by
 * {@code consumerFactory()}. The bean is exposed with a declared type of {@code Object}.
 *
 * @return the container factory
 * @throws Exception declared by the signature; propagated from {@code consumerFactory()} if it throws
 */
@Bean
public Object kafkaListenerContainerFactory() throws Exception {
    ConcurrentKafkaListenerContainerFactory<String, Message> messageFactory = new ConcurrentKafkaListenerContainerFactory<>();
    messageFactory.setConsumerFactory(consumerFactory());
    return messageFactory;
}
/**
 * Creates the listener container factory using {@code consumerFactory()}, the configured
 * {@code concurrency} and a poll timeout of 1500.
 *
 * @return the container factory
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    listenerFactory.setConsumerFactory(consumerFactory());
    listenerFactory.getContainerProperties().setPollTimeout(1500);
    listenerFactory.setConcurrency(concurrency);
    return listenerFactory;
}
/**
 * Creates the listener container factory for {@code Message} values, backed by
 * {@code consumerFactory()}.
 *
 * @return the container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Message> messageFactory = new ConcurrentKafkaListenerContainerFactory<>();
    messageFactory.setConsumerFactory(consumerFactory());
    // No record filter strategy is installed; a value-based filter was previously
    // sketched here but is intentionally left disabled.
    return messageFactory;
}