org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory源码实例Demo

下面列出了 org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: ad   文件: KafkaConfiguration.java
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConcurrency(concurrency);

    Map<String, Object> config = Maps.newHashMap();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // 由于课程原版实现中广告的索引数据是存在于ConcurrentHashMap中, 即每个索引服务实例的jvm中。
    // 所以当每一个索引实例监听kafka topic数据时, 需要保证每个实例都处于不同的消费者组
    // 即各个实例之间需要各不相同的groupId, 保证在部署多实例时, 每个实例都可以加载到完整的索引数据

    // 但在本实现中由于将索引数据单独提出, 存放到了Redis数据库中, 所以应该让所有实例属于同一个消费者组
    // 共同消费kafka topic下的数据, 保证索引数据不会被重复消费。

    // 综上, 若索引数据的存放如果为各个实例自身的jvm, 应该考虑加上以下代码(或自行编写其他实现)保证各实例处于不同的消费者组
    // 若索引数据存放的位置, 是所有检索实例共享的位置, 应该将以下配置取消(或直接删除本类)
    config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
    DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<String, String>(config);
    containerFactory.setConsumerFactory(consumerFactory);
    return containerFactory;
}
 
源代码2 项目: stateful-functions   文件: KafkaConsumerConfig.java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  return factory;
}
 
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	factory.setConcurrency(concurrency);
	factory.getContainerProperties().setPollTimeout(1500);
	factory.setBatchListener(true);
	return factory;
}
 
/**
 * @param kafkaTemplate                           操作类
 * @param concurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory
 * @return 死信批量处理器
 */
@Bean
@ConditionalOnProperty(prefix = "app.kafka.error", name = "dead-letter", havingValue = "true")
@ConditionalOnMissingBean
public GenericErrorHandler kafkaDeadLetterBatchErrorHandler(KafkaTemplate<Object, Object> kafkaTemplate,
                                                            ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory) {
    //此处之所以要获取bean而非获取配置文件进行判断,因为spring-kafka允许注册自定义factory并且设置batchListener为true,此时配置文件参数可为空。
    if (concurrentKafkaListenerContainerFactory.isBatchListener() != null && concurrentKafkaListenerContainerFactory.isBatchListener()) {
        BatchErrorHandler batchErrorHandler = new KafkaDeadLetterBatchErrorHandler(kafkaTemplate);
        concurrentKafkaListenerContainerFactory.setBatchErrorHandler(batchErrorHandler);
        return batchErrorHandler;
    } else {
        ErrorHandler errorHandler = new KafkaDeadLetterErrorHandler(kafkaTemplate);
        concurrentKafkaListenerContainerFactory.setErrorHandler(errorHandler);
        return errorHandler;
    }
}
 
@Bean(BEAN_KAFKA_BATCH_FACTORY)
@EnableKafkaCollectReceiver
@SuppressWarnings({ "unchecked", "rawtypes" })
public KafkaListenerContainerFactory<?> batchFactory(ReceiverProperties conf) {
	// Create consumer factory.
	Properties properties = conf.getKafka().getProperties();
	Map<String, Object> config = (Map) properties;
	ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(config);

	// Create concurrent consumer container factory.
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(cf);
	factory.setConcurrency(conf.getKafka().getConcurrency());
	factory.setBatchListener(true);

	// Spring kafka container properties.
	ContainerProperties containerProps = factory.getContainerProperties();
	containerProps.setPollTimeout(conf.getKafka().getPollTimeout());
	// Bulk consumption change buffer queue size.
	containerProps.setQueueDepth(conf.getKafka().getQueueDepth());
	containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
	return factory;
}
 
源代码6 项目: eventapis   文件: EventApisFactory.java
@Bean({"eventsKafkaListenerContainerFactory", "kafkaListenerContainerFactory"})
public ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> eventsKafkaListenerContainerFactory(
        EventMessageConverter eventMessageConverter, ConsumerFactory<String, PublishedEventWrapper> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(eventApisConfiguration.getEventBus().getConsumer().getEventConcurrency());
    factory.setMessageConverter(eventMessageConverter);
    factory.getContainerProperties().setPollTimeout(3000);
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setPoolSize(eventApisConfiguration.getEventBus().getConsumer().getEventSchedulerPoolSize());
    scheduler.setBeanName("EventsFactory-Scheduler");
    scheduler.initialize();

    factory.getContainerProperties().setScheduler(scheduler);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    return factory;
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryKafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory =
			new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	factory.setRetryTemplate(new RetryTemplate());
	factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {

		@Override
		public boolean filter(ConsumerRecord<String, String> consumerRecord) {
			return consumerRecord.value().equals("bar");
		}

	});
	return factory;
}
 
源代码8 项目: ChengFeng1.5   文件: KafkaConfig.java
@Bean
ConcurrentKafkaListenerContainerFactory<String, PurchaseInfoDto> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PurchaseInfoDto> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
 
源代码9 项目: spring-examples   文件: KafkaConfiguration.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KMessage> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, KMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
 
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	factory.setConcurrency(concurrency);
	factory.getContainerProperties().setPollTimeout(1500);
	factory.setBatchListener(true);
	return factory;
}
 
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	factory.setConcurrency(concurrency);
	factory.getContainerProperties().setPollTimeout(1500);
	factory.setBatchListener(true);
	return factory;
}
 
源代码12 项目: gpmall   文件: KafKaConfig.java
/**
 * 消费则的监听工厂
 * @return
 */
@Bean
public KafkaListenerContainerFactory userRegisterSuccKafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory conFactory = new ConcurrentKafkaListenerContainerFactory<>();
    conFactory.setConsumerFactory(kafKaRegisterSuccConsumerFactory());
    conFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);//  设置消费者消费消息后的提交方式为手动提交
    return conFactory;
}
 
源代码13 项目: ext-opensource-netty   文件: KafkaConsumerConfig.java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}
 
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	factory.setConcurrency(concurrency);
	factory.getContainerProperties().setPollTimeout(1500);
	factory.setBatchListener(true);
	return factory;
}
 
源代码15 项目: cubeai   文件: KafkaConfiguration.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}
 
源代码16 项目: cubeai   文件: KafkaConfiguration.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}
 
源代码17 项目: flink-statefun   文件: KafkaConsumerConfig.java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory);
  return factory;
}
 
源代码18 项目: spring-boot-tutorial   文件: DunwuKafkaConfig.java
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MailContext> mailContextListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, MailContext> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(mailContextConsumerFactory());
    return factory;
}
 
@Bean
@Primary
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}
 
@Bean(name = "kafkaListenerContainerFactoryWith6Consumer")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryWith3Consumer() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(6); //3 partition -> 6 thread in parallel in a single consumer
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
 
@Bean(name = "kafkaListenerContainerFactoryForBatchConsumer")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactoryForBatchConsumer() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
 
源代码23 项目: eventeum   文件: KafkaConfiguration.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventeumMessage> eventeumKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, EventeumMessage> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(eventeumConsumerFactory());
    factory.setConcurrency(1);
    return factory;
}
 
源代码24 项目: java-specialagent   文件: SpringMessagingITest.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
  final ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setReplyTemplate(kafkaTemplate());
  return factory;
}
 
源代码25 项目: java-specialagent   文件: SpringKafkaITest.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
  final ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setReplyTemplate(kafkaTemplate());
  return factory;
}
 
/**
 * @param concurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory
 * @return 普通处理器
 */
@Bean
@ConditionalOnMissingBean
public GenericErrorHandler kafkaBatchErrorHandler(ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory) {
    //此处之所以要获取bean而非获取配置文件进行判断,因为spring-kafka允许注册自定义factory并且设置batchListener为true,此时配置文件参数可为空。
    if (concurrentKafkaListenerContainerFactory.isBatchListener() != null && concurrentKafkaListenerContainerFactory.isBatchListener()) {
        BatchErrorHandler batchErrorHandler = new KafkaBatchErrorHandler();
        concurrentKafkaListenerContainerFactory.setBatchErrorHandler(batchErrorHandler);
        return batchErrorHandler;
    } else {
        ErrorHandler errorHandler = new KafkaErrorHandler();
        concurrentKafkaListenerContainerFactory.setErrorHandler(errorHandler);
        return errorHandler;
    }
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DebeziumEvent>
kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, DebeziumEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.getContainerProperties().setAckMode(MANUAL);
  factory.setBatchListener(true);

  return factory;
}
 
@Bean
public Object kafkaListenerContainerFactory() throws Exception {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = //
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
 
源代码29 项目: MicroCommunity   文件: KafkaConsumerConfig.java
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}
 
源代码30 项目: SpringAll   文件: KafkaConsumerConfig.java
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // factory.setRecordFilterStrategy(
    //         r -> r.value().contains("fuck")
    // );
    return factory;
}