org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.core.ConsumerFactory源码实例Demo

下面列出了 org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.core.ConsumerFactory 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: stateful-functions   文件: KafkaConsumerConfig.java
/**
 * Exposes the listener container factory, backed by the supplied consumer factory.
 *
 * @param consumerFactory factory handed to the container factory for creating consumers
 * @return a {@link ConcurrentKafkaListenerContainerFactory} with only its consumer factory set
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
  final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  containerFactory.setConsumerFactory(consumerFactory);
  return containerFactory;
}
 
/**
 * Builds a consumer factory for {@code String} keys and {@code Message} values and
 * wraps it in a {@link TracingConsumerFactory}.
 *
 * <p>The broker address is fixed to {@code localhost:9092}; auto-commit is enabled
 * with a 100 ms interval and offsets reset to {@code earliest}.
 *
 * @return the tracing wrapper around a {@link DefaultKafkaConsumerFactory}
 * @throws Exception declared by the original signature
 */
private ConsumerFactory<String, Message> consumerFactory() throws Exception {
    final Map<String, Object> config = new HashMap<>();

    // Connection and identity.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, app.name);

    // Offset handling.
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

    // Polling and session limits.
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

    final DefaultKafkaConsumerFactory<String, Message> delegate =
            new DefaultKafkaConsumerFactory<String, Message>(
                    config,
                    new StringDeserializer(),
                    new JsonDeserializer<>(Message.class));
    return new TracingConsumerFactory<>(delegate);
}
 
源代码3 项目: SpringAll   文件: KafkaConsumerConfig.java
/**
 * Creates the consumer factory for {@code String} keys and {@code Message} values.
 *
 * <p>The key and value deserializers are passed to the factory as instances, so no
 * deserializer class entries are placed in the configuration map.
 *
 * @return a {@link DefaultKafkaConsumerFactory} configured with the bootstrap servers,
 *         consumer group id and auto-offset-reset policy held by this configuration
 */
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

    final StringDeserializer keyDeserializer = new StringDeserializer();
    final JsonDeserializer<Message> valueDeserializer = new JsonDeserializer<>(Message.class);
    return new DefaultKafkaConsumerFactory<>(config, keyDeserializer, valueDeserializer);
}
 
源代码4 项目: eventapis   文件: EventApisFactory.java
/**
 * Builds the listener container factory used for published events.
 *
 * <p>The factory acknowledges per record, polls with a 3000 ms timeout and uses a
 * dedicated {@link ThreadPoolTaskScheduler} that is created and initialized inside
 * this method (it is not returned or registered as a bean here).
 *
 * @param eventMessageConverter converter installed on the factory
 * @param consumerFactory       consumer factory installed on the factory
 * @return the configured container factory
 */
@Bean({"eventsKafkaListenerContainerFactory", "kafkaListenerContainerFactory"})
public ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> eventsKafkaListenerContainerFactory(
        EventMessageConverter eventMessageConverter, ConsumerFactory<String, PublishedEventWrapper> consumerFactory) {

    // Scheduler handed to the container properties below.
    final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.setPoolSize(eventApisConfiguration.getEventBus().getConsumer().getEventSchedulerPoolSize());
    taskScheduler.setBeanName("EventsFactory-Scheduler");
    taskScheduler.initialize();

    final ConcurrentKafkaListenerContainerFactory<String, PublishedEventWrapper> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.setConcurrency(eventApisConfiguration.getEventBus().getConsumer().getEventConcurrency());
    containerFactory.setMessageConverter(eventMessageConverter);

    final ContainerProperties containerProperties = containerFactory.getContainerProperties();
    containerProperties.setPollTimeout(3000);
    containerProperties.setScheduler(taskScheduler);
    containerProperties.setAckMode(ContainerProperties.AckMode.RECORD);
    return containerFactory;
}
 
源代码5 项目: springboot_cwenao   文件: KafkaConsumerConfig.java
/**
 * Creates a consumer factory for {@code String} keys and values.
 *
 * <p>Auto-commit is disabled and offsets reset to {@code latest}; the broker list
 * and group id come from this configuration's {@code brokers} and {@code group}.
 *
 * @return a {@link DefaultKafkaConsumerFactory} built from those settings
 */
public ConsumerFactory<String, String> consumerFactory() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

    // Offset handling.
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    // Deserialization.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<String, String>(config);
}
 
源代码6 项目: synapse   文件: SynapseKafkaAutoConfiguration.java
/**
 * Auto-configures the Kafka-backed {@link MessageLogReceiverEndpointFactory} unless a bean
 * named {@code kafkaMessageLogReceiverEndpointFactory} already exists.
 *
 * <p>A cached thread pool (threads named {@code kafka-message-log-%d}) and a consumer
 * obtained from {@code kafkaConsumerFactory} are created here and handed to the factory.
 *
 * @param kafkaProperties      Kafka properties (not read by this method)
 * @param interceptorRegistry  interceptor registry passed to the endpoint factory
 * @param eventPublisher       event publisher passed to the endpoint factory
 * @param kafkaConsumerFactory source of the consumer passed to the endpoint factory
 * @return the new {@link KafkaMessageLogReceiverEndpointFactory}
 */
@Bean
@ConditionalOnMissingBean(name = "kafkaMessageLogReceiverEndpointFactory")
public MessageLogReceiverEndpointFactory kafkaMessageLogReceiverEndpointFactory(final KafkaProperties kafkaProperties,
                                                                                final MessageInterceptorRegistry interceptorRegistry,
                                                                                final ApplicationEventPublisher eventPublisher,
                                                                                final ConsumerFactory<String, String> kafkaConsumerFactory) {
    LOG.info("Auto-configuring Kafka MessageLogReceiverEndpointFactory");

    final ExecutorService workerPool = newCachedThreadPool(
            new ThreadFactoryBuilder()
                    .setNameFormat("kafka-message-log-%d")
                    .build());

    // NOTE(review): assumes the factory hands out a KafkaConsumer instance; confirm for wrapping factories.
    final KafkaConsumer<String, String> consumer =
            (KafkaConsumer<String, String>) kafkaConsumerFactory.createConsumer();

    return new KafkaMessageLogReceiverEndpointFactory(interceptorRegistry, consumer, workerPool, eventPublisher);
}
 
/**
 * Returns the default consumer factory, creating it on first use.
 *
 * <p>The configuration starts with byte-array key/value deserializers, is overlaid with
 * the binder's merged consumer configuration, and falls back to the binder's Kafka
 * connection string when no bootstrap servers entry is present.
 *
 * @return the cached (or newly created) consumer factory
 */
private synchronized ConsumerFactory<?, ?> createConsumerFactory() {
	if (this.defaultConsumerFactory != null) {
		return this.defaultConsumerFactory;
	}

	Map<String, Object> consumerConfig = new HashMap<>();
	consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

	// Binder-level settings take precedence over the defaults above.
	Map<String, Object> binderOverrides = this.binderConfigurationProperties.mergedConsumerConfiguration();
	if (!ObjectUtils.isEmpty(binderOverrides)) {
		consumerConfig.putAll(binderOverrides);
	}

	boolean hasBootstrapServers = consumerConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
	if (!hasBootstrapServers) {
		consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
				this.binderConfigurationProperties.getKafkaConnectionString());
	}

	this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);
	return this.defaultConsumerFactory;
}
 
/**
 * Provides a {@link ClientFactoryCustomizer} that attaches Micrometer listeners to the
 * default producer and consumer factory implementations, unless a bean named
 * {@code binderClientFactoryCustomizer} already exists.
 *
 * @param meterRegistry registry passed to each Micrometer listener
 * @return the customizer; factories of other implementation types are left untouched
 */
@Bean
@ConditionalOnMissingBean(name = "binderClientFactoryCustomizer")
public ClientFactoryCustomizer binderClientFactoryCustomizer(MeterRegistry meterRegistry) {
	return new ClientFactoryCustomizer() {

		@Override
		public void configure(ProducerFactory<?, ?> producerFactory) {
			if (!(producerFactory instanceof DefaultKafkaProducerFactory)) {
				return;
			}
			DefaultKafkaProducerFactory<?, ?> defaultFactory =
					(DefaultKafkaProducerFactory<?, ?>) producerFactory;
			defaultFactory.addListener(new MicrometerProducerListener<>(meterRegistry));
		}

		@Override
		public void configure(ConsumerFactory<?, ?> consumerFactory) {
			if (!(consumerFactory instanceof DefaultKafkaConsumerFactory)) {
				return;
			}
			DefaultKafkaConsumerFactory<?, ?> defaultFactory =
					(DefaultKafkaConsumerFactory<?, ?>) consumerFactory;
			defaultFactory.addListener(new MicrometerConsumerListener<>(meterRegistry));
		}

	};
}
 
/**
 * Creates the binder health indicator together with a dedicated consumer factory.
 *
 * <p>The consumer configuration starts with byte-array deserializers, is overlaid with
 * the merged consumer configuration, and falls back to the configured Kafka connection
 * string when no bootstrap servers entry is present.
 *
 * @param kafkaMessageChannelBinder binder passed to the indicator
 * @param configurationProperties   source of the consumer configuration and health timeout
 * @return the indicator with its timeout set from {@code getHealthTimeout()}
 */
@Bean
KafkaBinderHealthIndicator kafkaBinderHealthIndicator(
		KafkaMessageChannelBinder kafkaMessageChannelBinder,
		KafkaBinderConfigurationProperties configurationProperties) {
	Map<String, Object> consumerConfig = new HashMap<>();
	consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
	consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

	// Binder-level settings take precedence over the defaults above.
	Map<String, Object> binderOverrides = configurationProperties.mergedConsumerConfiguration();
	if (!ObjectUtils.isEmpty(binderOverrides)) {
		consumerConfig.putAll(binderOverrides);
	}

	boolean hasBootstrapServers = consumerConfig.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
	if (!hasBootstrapServers) {
		consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
				configurationProperties.getKafkaConnectionString());
	}

	ConsumerFactory<?, ?> healthConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);
	KafkaBinderHealthIndicator healthIndicator =
			new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, healthConsumerFactory);
	healthIndicator.setTimeout(configurationProperties.getHealthTimeout());
	return healthIndicator;
}
 
/**
 * Checks, via reflection, that the consumer factory held by the health indicator
 * carries the expected {@code bootstrap.servers} entries.
 */
@Test
@SuppressWarnings("unchecked")
public void testKafkaHealthIndicatorProperties() {
	assertThat(this.kafkaBinderHealthIndicator).isNotNull();

	// Read the indicator's consumerFactory field reflectively.
	Field factoryField = ReflectionUtils.findField(
			KafkaBinderHealthIndicator.class, "consumerFactory", ConsumerFactory.class);
	ReflectionUtils.makeAccessible(factoryField);
	DefaultKafkaConsumerFactory factory = (DefaultKafkaConsumerFactory) ReflectionUtils
			.getField(factoryField, this.kafkaBinderHealthIndicator);

	// Read the factory's configs map the same way.
	Field configsField = ReflectionUtils.findField(
			DefaultKafkaConsumerFactory.class, "configs", Map.class);
	ReflectionUtils.makeAccessible(configsField);
	Map<String, Object> actualConfigs = (Map<String, Object>) ReflectionUtils
			.getField(configsField, factory);

	assertThat(actualConfigs.containsKey("bootstrap.servers")).isTrue();

	List<String> expectedServers = new ArrayList<>();
	expectedServers.add("10.98.09.199:9092");
	expectedServers.add("10.98.09.196:9092");
	List<String> actualServers = (List<String>) actualConfigs.get("bootstrap.servers");
	assertThat(actualServers.containsAll(expectedServers)).isTrue();
}
 
/**
 * Creates, initializes and registers a Kafka listener container for every matching
 * contract that declares an input; contracts without an input are skipped.
 *
 * @param beanFactory       bean factory that supplies the {@code ConsumerFactory} and
 *                          receives the initialized containers as singletons
 * @param matchingContracts contracts to create containers for
 * @param flowName          prefix of the container bean name ({@code flowName + ".container"})
 * @param listener          router passed to each created container
 */
private void registerContainers(ConfigurableListableBeanFactory beanFactory,
		List<Contract> matchingContracts, String flowName,
		StubRunnerKafkaRouter listener) {
	// listener's container
	ConsumerFactory consumerFactory = beanFactory.getBean(ConsumerFactory.class);
	// NOTE(review): the same bean name is used on every iteration, so more than one
	// contract with an input leads to repeated registerSingleton calls under one name.
	// Confirm that callers guarantee this is safe.
	final String containerName = flowName + ".container";
	for (Contract contract : matchingContracts) {
		if (contract.getInput() != null) {
			String destination = MapConverter
					.getStubSideValuesForNonBody(contract.getInput().getMessageFrom())
					.toString();
			KafkaMessageListenerContainer container = listenerContainer(consumerFactory,
					new ContainerProperties(destination), listener);
			Object initializedContainer = beanFactory.initializeBean(container, containerName);
			beanFactory.registerSingleton(containerName, initializedContainer);
			if (log.isDebugEnabled()) {
				log.debug(
						"Initialized kafka message container with name [" + containerName
								+ "] listening to destination [" + destination + "]");
			}
		}
	}
}
 
源代码12 项目: spring-examples   文件: KafkaConfiguration.java
/**
 * Creates the consumer factory for {@code String} keys and JSON-decoded {@code KMessage} values.
 *
 * <p>Deserializers are configured by class, with {@code KMessage} set as the JSON
 * deserializer's default value type.
 *
 * @return a {@link DefaultKafkaConsumerFactory} built from the broker address and group id
 */
@Bean
public ConsumerFactory<String, KMessage> consumerFactory() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // Deserialization.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KMessage.class);

    return new DefaultKafkaConsumerFactory<>(config);
}
 
源代码13 项目: enode   文件: EnodeTestKafkaConfig.java
/**
 * Creates the consumer factory from the consumer properties assembled in this method.
 *
 * <p>Keys and values are deserialized as strings and auto-commit is disabled.
 *
 * @return a {@link DefaultKafkaConsumerFactory} for {@code String} keys and values
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVER);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.DEFAULT_PRODUCER_GROUP);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");

    // Offset handling.
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");

    // Deserialization.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}
 
源代码14 项目: enode   文件: EnodeTestKafkaConfig.java
/**
 * Creates the listener container for the command topic.
 *
 * @param commandListener listener installed on the container
 * @param consumerFactory factory the container uses to create its consumer
 * @return a container on {@code commandTopic} in the default consumer group, with
 *         missing topics treated as non-fatal
 */
@Bean
public KafkaMessageListenerContainer<String, String> commandListenerContainer(KafkaCommandListener commandListener, ConsumerFactory<String, String> consumerFactory) {
    final ContainerProperties containerProps = new ContainerProperties(commandTopic);
    containerProps.setMessageListener(commandListener);
    containerProps.setGroupId(Constants.DEFAULT_CONSUMER_GROUP);
    containerProps.setMissingTopicsFatal(false);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}
 
源代码15 项目: enode   文件: EnodeTestKafkaConfig.java
/**
 * Creates the listener container for the domain event topic.
 *
 * @param domainEventListener listener installed on the container
 * @param consumerFactory     factory the container uses to create its consumer
 * @return a container on {@code eventTopic} with manual acknowledgement and
 *         missing topics treated as non-fatal
 */
@Bean
public KafkaMessageListenerContainer<String, String> domainEventListenerContainer(KafkaDomainEventListener domainEventListener, ConsumerFactory<String, String> consumerFactory) {
    final ContainerProperties containerProps = new ContainerProperties(eventTopic);
    containerProps.setMessageListener(domainEventListener);
    containerProps.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
    containerProps.setMissingTopicsFatal(false);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}
 
源代码16 项目: enode   文件: EnodeTestKafkaConfig.java
/**
 * Creates the listener container for the application message topic.
 *
 * @param applicationMessageListener listener installed on the container
 * @param consumerFactory            factory the container uses to create its consumer
 * @return a container on {@code applicationTopic} with manual acknowledgement and
 *         missing topics treated as non-fatal
 */
@Bean
public KafkaMessageListenerContainer<String, String> applicationMessageListenerContainer(KafkaApplicationMessageListener applicationMessageListener, ConsumerFactory<String, String> consumerFactory) {
    final ContainerProperties containerProps = new ContainerProperties(applicationTopic);
    containerProps.setMessageListener(applicationMessageListener);
    containerProps.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
    containerProps.setMissingTopicsFatal(false);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}
 
源代码17 项目: enode   文件: EnodeTestKafkaConfig.java
/**
 * Creates the listener container for the publishable exception topic.
 *
 * @param publishableExceptionListener listener installed on the container
 * @param consumerFactory              factory the container uses to create its consumer
 * @return a container on {@code exceptionTopic} with manual acknowledgement and
 *         missing topics treated as non-fatal
 */
@Bean
public KafkaMessageListenerContainer<String, String> publishableExceptionListenerContainer(KafkaPublishableExceptionListener publishableExceptionListener, ConsumerFactory<String, String> consumerFactory) {
    final ContainerProperties containerProps = new ContainerProperties(exceptionTopic);
    containerProps.setMessageListener(publishableExceptionListener);
    containerProps.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
    containerProps.setAckMode(ContainerProperties.AckMode.MANUAL);
    containerProps.setMissingTopicsFatal(false);
    return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
}
 
源代码18 项目: enode   文件: KafkaCommandConfig.java
/**
 * Creates the consumer factory from the consumer properties assembled in this method.
 *
 * <p>Keys and values are deserialized as strings and auto-commit is disabled.
 *
 * @return a {@link DefaultKafkaConsumerFactory} built from those properties
 */
@Bean
public ConsumerFactory consumerFactory() {
    // Consumer configuration parameters.
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, DEFAULT_CONSUMER_GROUP);

    // Offsets are not committed automatically.
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // Deserialization.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}
 
源代码19 项目: enode   文件: KafkaEventConfig.java
/**
 * Creates the consumer factory from the consumer properties assembled in this method.
 *
 * <p>Keys and values are deserialized as strings and auto-commit is disabled.
 *
 * @return a {@link DefaultKafkaConsumerFactory} built from those properties
 */
@Bean
public ConsumerFactory consumerFactory() {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, QueueProperties.KAFKA_SERVER);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, QueueProperties.DEFAULT_PRODUCER_GROUP);

    // Offsets are not committed automatically.
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // Deserialization.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}
 
源代码20 项目: tutorials   文件: KafkaConsumerConfig.java
/**
 * Creates a string-keyed, string-valued consumer factory for the given consumer group.
 *
 * @param groupId value stored under the group id configuration key
 * @return a {@link DefaultKafkaConsumerFactory} pointing at {@code bootstrapAddress}
 */
public ConsumerFactory<String, String> consumerFactory(String groupId) {
    final Map<String, Object> config = new HashMap<>();

    // Connection and group membership.
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    // Deserialization.
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}
 
源代码21 项目: flink-statefun   文件: KafkaConsumerConfig.java
/**
 * Creates the consumer factory for the {@code simulator-java.group} consumer group,
 * reading from the latest offset when none is stored.
 *
 * <p>NOTE(review): the declared key/value types are {@code String} while both configured
 * deserializers are {@code ByteArrayDeserializer}; confirm this pairing is intended.
 *
 * @return a {@link DefaultKafkaConsumerFactory} pointing at {@code bootstrapServer}
 */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
  final Map<String, Object> config = new HashMap<>();

  // Connection and group membership.
  config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  config.put(ConsumerConfig.GROUP_ID_CONFIG, "simulator-java.group");
  config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

  // Deserialization.
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

  return new DefaultKafkaConsumerFactory<>(config);
}
 
源代码22 项目: flink-statefun   文件: KafkaConsumerConfig.java
/**
 * Exposes the listener container factory, backed by the supplied consumer factory.
 *
 * @param consumerFactory factory handed to the container factory for creating consumers
 * @return a {@link ConcurrentKafkaListenerContainerFactory} with only its consumer factory set
 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
  final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  containerFactory.setConsumerFactory(consumerFactory);
  return containerFactory;
}
 
源代码23 项目: spring-boot-tutorial   文件: DunwuKafkaConfig.java
/**
 * Creates a listener container factory and lets the supplied configurer apply its
 * settings together with the given consumer factory.
 *
 * @param configurer           configurer invoked on the new container factory
 * @param kafkaConsumerFactory consumer factory passed to the configurer
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> concurrentKafkaListenerContainerFactory(
    ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
    ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(containerFactory, kafkaConsumerFactory);
    return containerFactory;
}
 
/**
 * Creates the consumer factory for {@code String} keys and JSON-decoded {@code MailContext} values.
 *
 * <p>The broker address and group id are read from the {@code kafka.bootstrap.address}
 * and {@code consumer.group.name} environment properties.
 *
 * @return a {@link DefaultKafkaConsumerFactory} using explicit deserializer instances
 */
public ConsumerFactory<String, MailContext> mailContextConsumerFactory() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("kafka.bootstrap.address"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty("consumer.group.name"));

    final StringDeserializer keyDeserializer = new StringDeserializer();
    final JsonDeserializer<MailContext> valueDeserializer = new JsonDeserializer<>(MailContext.class);
    return new DefaultKafkaConsumerFactory<>(config, keyDeserializer, valueDeserializer);
}
 
/**
 * Stores the collaborators needed by the reply configuration.
 *
 * @param properties       properties kept in {@code this.properties}
 * @param kafkaProperties  Spring Boot Kafka properties kept in {@code this.kafkaProperties}
 * @param messageConverter provider resolved once via {@code getIfUnique()}
 * @param consumerFactory  consumer factory kept for later use
 * @param producerFactory  producer factory kept for later use
 * @param producerListener producer listener kept for later use
 */
public ReplyConfiguration(KafkaProperties properties,
                          org.springframework.boot.autoconfigure.kafka.KafkaProperties kafkaProperties,
                          ObjectProvider<RecordMessageConverter> messageConverter,
                          ConsumerFactory<Object, Object> consumerFactory,
                          ProducerFactory<Object, Object> producerFactory,
                          ProducerListener<Object, Object> producerListener) {
    // Property sources.
    this.properties = properties;
    this.kafkaProperties = kafkaProperties;

    // Kafka client collaborators.
    this.consumerFactory = consumerFactory;
    this.producerFactory = producerFactory;
    this.producerListener = producerListener;

    // Resolve the converter from its provider once, at construction time.
    this.messageConverter = messageConverter.getIfUnique();
}
 
/**
 * Creates the listener container for the {@code asyncReplies} topic in the
 * {@code async} consumer group.
 *
 * @param cf factory the container uses to create its consumer
 * @return the reply listener container
 */
@Bean
public KafkaMessageListenerContainer<String, String> replyContainer(
        ConsumerFactory<String, String> cf) {
    final ContainerProperties replyProperties = new ContainerProperties("asyncReplies");
    replyProperties.setGroupId("async");
    return new KafkaMessageListenerContainer<>(cf, replyProperties);
}
 
源代码27 项目: java-kafka-client   文件: TracingConsumerFactory.java
/**
 * Creates a tracing wrapper around the given consumer factory.
 *
 * @param consumerFactory          factory being wrapped
 * @param tracer                   tracer stored for later use
 * @param spanDecorators           decorators to use; when {@code null}, a single-element
 *                                 list holding {@code STANDARD_TAGS} is used instead
 * @param consumerSpanNameProvider span name provider; when {@code null},
 *                                 {@code ClientSpanNameProvider.CONSUMER_OPERATION_NAME}
 *                                 is used instead
 */
public TracingConsumerFactory(ConsumerFactory<K, V> consumerFactory, Tracer tracer,
    Collection<SpanDecorator> spanDecorators,
    BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider) {
  this.consumerFactory = consumerFactory;
  this.tracer = tracer;

  if (spanDecorators != null) {
    this.spanDecorators = spanDecorators;
  } else {
    this.spanDecorators = Collections.singletonList(STANDARD_TAGS);
  }

  if (consumerSpanNameProvider != null) {
    this.consumerSpanNameProvider = consumerSpanNameProvider;
  } else {
    this.consumerSpanNameProvider = ClientSpanNameProvider.CONSUMER_OPERATION_NAME;
  }
}
 
源代码28 项目: java-kafka-client   文件: TestConfiguration.java
/**
 * Creates a tracing consumer factory on top of the embedded Kafka broker's test
 * consumer properties, with {@code auto.offset.reset} set to {@code earliest}.
 *
 * @return a {@link TracingConsumerFactory} wrapping a {@link DefaultKafkaConsumerFactory}
 */
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
  final Map<String, Object> testConsumerConfig = KafkaTestUtils.consumerProps(
      "sampleRawConsumer", "false", embeddedKafka.getEmbeddedKafka());
  testConsumerConfig.put("auto.offset.reset", "earliest");

  return new TracingConsumerFactory<>(
      new DefaultKafkaConsumerFactory<>(testConsumerConfig),
      tracer());
}
 
源代码29 项目: eventapis   文件: EventApisFactory.java
/**
 * Creates the consumer factory for {@code PublishedEventWrapper} values.
 *
 * <p>Works on a clone of the event bus properties, on which auto-commit is switched off
 * before the consumer properties are built.
 *
 * @return a {@link DefaultKafkaConsumerFactory} with a string key deserializer and a JSON
 *         value deserializer that uses {@code objectMapper}
 */
@Bean
public ConsumerFactory<String, PublishedEventWrapper> kafkaConsumerFactory() {
    final KafkaProperties busProperties = eventApisConfiguration.getEventBus().clone();
    busProperties.getConsumer().setEnableAutoCommit(false);

    final StringDeserializer keyDeserializer = new StringDeserializer();
    final JsonDeserializer<PublishedEventWrapper> valueDeserializer =
            new JsonDeserializer<>(PublishedEventWrapper.class, objectMapper);
    return new DefaultKafkaConsumerFactory<>(
            busProperties.buildConsumerProperties(), keyDeserializer, valueDeserializer);
}
 
源代码30 项目: eventapis   文件: EventApisFactory.java
/**
 * Creates the consumer factory for {@code Operation} values.
 *
 * <p>Works on a clone of the event bus properties, on which auto-commit is switched off
 * before the consumer properties are built.
 *
 * @return a {@link DefaultKafkaConsumerFactory} with a string key deserializer and a JSON
 *         value deserializer that uses {@code objectMapper}
 */
@Bean
public ConsumerFactory<String, Operation> kafkaOperationsFactory() {
    final KafkaProperties busProperties = eventApisConfiguration.getEventBus().clone();
    busProperties.getConsumer().setEnableAutoCommit(false);

    final StringDeserializer keyDeserializer = new StringDeserializer();
    final JsonDeserializer<Operation> valueDeserializer =
            new JsonDeserializer<>(Operation.class, objectMapper);
    return new DefaultKafkaConsumerFactory<>(
            busProperties.buildConsumerProperties(), keyDeserializer, valueDeserializer);
}