org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.core.KafkaTemplate Source Code Examples

Listed below is example code for org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer#org.springframework.kafka.core.KafkaTemplate, collected from open-source projects on GitHub.
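
Most of the snippets below share the same basic pattern: build producer properties, wrap them in a DefaultKafkaProducerFactory, create a KafkaTemplate (often with autoFlush enabled), and send. The following is a minimal, self-contained sketch of that pattern; the broker address, topic name and payload are placeholders, not taken from any of the projects listed here.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaTemplateQuickstart {

    public static void main(String[] args) {
        // Minimal producer configuration; the broker address is a placeholder.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        // autoFlush = true makes each send block until the producer has flushed it.
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("demo-topic");

        template.sendDefault("key", "hello kafka");
        pf.destroy(); // close the underlying producer
    }
}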

Example 1  Project: spring-cloud-stream-samples  File: Producers.java
public static void main(String... args) {

		ObjectMapper mapper = new ObjectMapper();
		Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);


		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ProducerConfig.RETRIES_CONFIG, 0);
		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, domainEventSerde.serializer().getClass());

		DomainEvent ddEvent = new DomainEvent();
		ddEvent.setBoardUuid("12345");
		ddEvent.setEventType("thisisanevent");

		DefaultKafkaProducerFactory<String, DomainEvent> pf = new DefaultKafkaProducerFactory<>(props);
		KafkaTemplate<String, DomainEvent> template = new KafkaTemplate<>(pf, true);
		template.setDefaultTopic("foobar");

		template.sendDefault("", ddEvent);
	}
 
private void sendTombStoneRecordsAndVerifyGracefulHandling() throws Exception {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
			senderProps);
	try {
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
		template.setDefaultTopic("words-1");
		template.sendDefault(null);
		ConsumerRecords<String, String> received = consumer
				.poll(Duration.ofMillis(5000));
		// Asserting that nothing was received verifies that the binder
		// handled the tombstone record gracefully.
		assertThat(received.isEmpty()).isTrue();
	}
	finally {
		pf.destroy();
	}
}
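
A tombstone is simply a record whose value is null (typically sent for a key that should be removed from a compacted topic). On the consuming side, spring-kafka can deliver such records as null payloads when the parameter is marked optional; the following is a small illustrative sketch, with topic, group and class names invented for the example rather than taken from the test above.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

public class TombstoneListenerSketch {

    @KafkaListener(topics = "words-1", groupId = "tombstone-demo")
    public void listen(@Payload(required = false) String value,
            @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key) {
        if (value == null) {
            // Tombstone: a null value marks the key as deleted.
            return;
        }
        // normal processing of non-null values goes here
    }
}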
 
Example 3  Project: spring-boot-tutorial  File: SpringKafkaTest.java
@Test
public void test() throws InterruptedException {
    log.info("Start auto");
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    final CountDownLatch latch = new CountDownLatch(4);
    containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
        log.info("received: " + message);
        latch.countDown();
    });
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();
    Thread.sleep(1000); // wait a bit for the container to start
    KafkaTemplate<Integer, String> template = createTemplate();
    template.setDefaultTopic("zptest");
    template.sendDefault(0, "foo");
    template.sendDefault(2, "bar");
    template.sendDefault(0, "baz");
    template.sendDefault(2, "qux");
    template.flush();
    assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
    container.stop();
    log.info("Stop auto");
}
 
private void receiveAndValidate()
		throws Exception {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
			senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	template.setDefaultTopic("words1");
	template.sendDefault("foobar1");
	template.setDefaultTopic("words2");
	template.sendDefault("foobar2");
	// Sleep a bit so that both messages are processed before reading from the
	// output topic; otherwise the assertions may fail intermittently.
	Thread.sleep(5000);
	ConsumerRecords<String, String> received = KafkaTestUtils.getRecords(consumer);
	List<String> wordCounts = new ArrayList<>(2);

	received.records("counts")
			.forEach((consumerRecord) -> wordCounts.add((consumerRecord.value())));
	System.out.println(wordCounts);
	assertThat(wordCounts.contains("{\"word\":\"foobar1\",\"count\":1}")).isTrue();
	assertThat(wordCounts.contains("{\"word\":\"foobar2\",\"count\":1}")).isTrue();
}
 
/**
 * @param kafkaTemplate                           the KafkaTemplate used to publish dead-letter records
 * @param concurrentKafkaListenerContainerFactory the listener container factory the handler is attached to
 * @return the dead-letter error handler (batch or record-level, depending on the factory)
 */
@Bean
@ConditionalOnProperty(prefix = "app.kafka.error", name = "dead-letter", havingValue = "true")
@ConditionalOnMissingBean
public GenericErrorHandler kafkaDeadLetterBatchErrorHandler(KafkaTemplate<Object, Object> kafkaTemplate,
                                                            ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory) {
    // The batch/record decision is based on the factory bean rather than on configuration properties,
    // because spring-kafka allows registering a custom factory with batchListener set to true,
    // in which case the corresponding configuration property may be empty.
    if (concurrentKafkaListenerContainerFactory.isBatchListener() != null && concurrentKafkaListenerContainerFactory.isBatchListener()) {
        BatchErrorHandler batchErrorHandler = new KafkaDeadLetterBatchErrorHandler(kafkaTemplate);
        concurrentKafkaListenerContainerFactory.setBatchErrorHandler(batchErrorHandler);
        return batchErrorHandler;
    } else {
        ErrorHandler errorHandler = new KafkaDeadLetterErrorHandler(kafkaTemplate);
        concurrentKafkaListenerContainerFactory.setErrorHandler(errorHandler);
        return errorHandler;
    }
}
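
The KafkaDeadLetterErrorHandler and KafkaDeadLetterBatchErrorHandler classes used above are project-specific and their implementations are not shown here. As a hedged sketch only: a comparable record-level handler can be assembled from spring-kafka's own DeadLetterPublishingRecoverer, which republishes failed records via the injected KafkaTemplate (by default to a "<topic>.DLT" destination); the class name and retry settings below are illustrative.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class DeadLetterErrorHandlerSketch {

    // Retries a failed record twice, then forwards it to "<originalTopic>.DLT".
    public static SeekToCurrentErrorHandler deadLetterErrorHandler(
            KafkaTemplate<Object, Object> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate);
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    }
}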
 
private void receiveAndValidate(String in, String... out) throws InterruptedException {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	try {
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
		template.setDefaultTopic(in);
		template.sendDefault("coffee");
		ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, out[0]);
		assertThat(cr.value().contains("coffee")).isTrue();

		template.sendDefault("electronics");
		cr = KafkaTestUtils.getSingleRecord(consumer, out[1]);
		assertThat(cr.value().contains("electronics")).isTrue();

		Assert.isTrue(countDownLatch.await(5, TimeUnit.SECONDS), "Analyze (BiConsumer) method didn't receive all the expected records");
	}
	finally {
		pf.destroy();
	}
}
 
Example 7  Project: feast  File: JobCoordinatorService.java
@Autowired
public JobCoordinatorService(
    JobRepository jobRepository,
    FeatureSetRepository featureSetRepository,
    SpecService specService,
    JobManager jobManager,
    FeastProperties feastProperties,
    JobGroupingStrategy groupingStrategy,
    KafkaTemplate<String, FeatureSetProto.FeatureSetSpec> specPublisher) {
  this.jobRepository = jobRepository;
  this.featureSetRepository = featureSetRepository;
  this.specService = specService;
  this.jobManager = jobManager;
  this.jobProperties = feastProperties.getJobs();
  this.specPublisher = specPublisher;
  this.groupingStrategy = groupingStrategy;
}
 
@Test
@Ignore
public void test() {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
			senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	template.setDefaultTopic("abc-DeserializationErrorHandlerByKafkaTests-In");
	template.sendDefault(1, null, "foobar");

	Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
			"false", embeddedKafka);
	consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
	DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
			consumerProps);
	Consumer<String, String> consumer1 = cf.createConsumer();
	embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.abc-DeserializationErrorHandlerByKafkaTests-In.group");

	ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
			"error.abc-DeserializationErrorHandlerByKafkaTests-In.group");
	assertThat(cr.value()).isEqualTo("foobar");
	assertThat(cr.partition()).isEqualTo(0); // custom partition function

	// Ensuring that the deserialization was indeed done by Kafka natively
	verify(conversionDelegate, never()).deserializeOnInbound(any(Class.class),
			any(KStream.class));
	verify(conversionDelegate, never()).serializeOnOutbound(any(KStream.class));
}
 
ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate,
		String topic,
		ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
		ProducerFactory<byte[], byte[]> producerFactory) {

	super(kafkaTemplate);
	if (producerProperties.getExtension().isUseTopicHeader()) {
		setTopicExpression(PARSER.parseExpression("headers['" + KafkaHeaders.TOPIC + "'] ?: '" + topic + "'"));
	}
	else {
		setTopicExpression(new LiteralExpression(topic));
	}
	Expression messageKeyExpression = producerProperties.getExtension().getMessageKeyExpression();
	if (expressionInterceptorNeeded(producerProperties)) {
		messageKeyExpression = PARSER.parseExpression("headers['"
				+ KafkaExpressionEvaluatingInterceptor.MESSAGE_KEY_HEADER
				+ "']");
	}
	setMessageKeyExpression(messageKeyExpression);
	setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
	if (producerProperties.isPartitioned()) {
		setPartitionIdExpression(PARSER.parseExpression(
				"headers['" + BinderHeaders.PARTITION_HEADER + "']"));
	}
	if (producerProperties.getExtension().isSync()) {
		setSync(true);
	}
	if (producerProperties.getExtension().getSendTimeoutExpression() != null) {
		setSendTimeoutExpression(producerProperties.getExtension().getSendTimeoutExpression());
	}
	this.producerFactory = producerFactory;
}
 
private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	try {
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
		template.setDefaultTopic("words");
		template.sendDefault(1, "foobar");
		Thread.sleep(2000L);
		StateStoreTestApplication processorApplication = context
				.getBean(StateStoreTestApplication.class);

		KeyValueStore<Long, Long> state1 = processorApplication.state1;
		assertThat(processorApplication.processed1).isTrue();
		assertThat(state1 != null).isTrue();
		assertThat(state1.name()).isEqualTo("my-store");
		WindowStore<Long, Long> state2 = processorApplication.state2;
		assertThat(state2 != null).isTrue();
		assertThat(state2.name()).isEqualTo("other-store");
		assertThat(state2.persistent()).isTrue();

		KeyValueStore<Long, Long> state3 = processorApplication.state1;
		assertThat(processorApplication.processed2).isTrue();
		assertThat(state3 != null).isTrue();
		assertThat(state3.name()).isEqualTo("my-store");
		WindowStore<Long, Long> state4 = processorApplication.state2;
		assertThat(state4 != null).isTrue();
		assertThat(state4.name()).isEqualTo("other-store");
		assertThat(state4.persistent()).isTrue();
	}
	finally {
		pf.destroy();
	}
}
 
@Autowired
public KafkaPassengerPublisher(
    KafkaTemplate<Object, Object> kafkaTemplate,
    @Value("${kafka.topic.from-passenger}") String topic) {
  this.kafkaTemplate = Objects.requireNonNull(kafkaTemplate);
  this.topic = Objects.requireNonNull(topic);
}
 
private void receiveAndValidateFoo() {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
			senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	template.setDefaultTopic("foos");
	template.sendDefault("{\"id\":\"123\"}");
	ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
			"counts-id");
	assertThat(cr.value().contains("Count for product with ID 123: 1")).isTrue();
}
 
private void receiveAndValidateFoo() {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
			senderProps);
	KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
	template.setDefaultTopic("foos");
	template.sendDefault("{\"id\":\"123\"}");
	ConsumerRecord<Integer, Long> cr = KafkaTestUtils.getSingleRecord(consumer,
			"counts-id");

	assertThat(cr.key()).isEqualTo(123);
	assertThat(cr.value()).isEqualTo(1L);
}
 
Example 14  Project: integration-patterns  File: KafkaGateway.java
@Inject
public KafkaGateway(final KafkaTemplate<String, String> kafkaTemplate, final ObjectMapper objectMapper,
    @Value("${eventing.topic.product}") final String topic) {
    this.kafkaTemplate = kafkaTemplate;
    this.objectMapper = objectMapper;
    this.topic = topic;
}
 
Example 15  Project: flink-statefun  File: KafkaDriverPublisher.java
@Autowired
public KafkaDriverPublisher(
    KafkaTemplate<Object, Object> kafkaTemplateForJson,
    @Value("${kafka.topic.from-driver}") String topic) {
  this.kafkaTemplate = kafkaTemplateForJson;
  this.topic = topic;
}
 
@Test
public void testKafkaStreamsWordCountProcessor() {
	Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
	DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
	try {
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
		template.setDefaultTopic("words");
		template.sendDefault("foobar");
		ConsumerRecords<String, String> cr = KafkaTestUtils.getRecords(consumer);
		assertThat(cr.count()).isGreaterThanOrEqualTo(1);
	}
	finally {
		pf.destroy();
	}
}
 
public KafkaTemplate<SpecificRecord, SpecificRecord> kafkaTemplate() {
  KafkaTemplate<SpecificRecord, SpecificRecord> template = new KafkaTemplate<>(producerFactory());

  template.setProducerListener(new ProducerListener<>() {
    @Override
    public void onSuccess(ProducerRecord<SpecificRecord, SpecificRecord> producerRecord, RecordMetadata recordMetadata) {
      log.info("Produced record {}", producerRecord);
      producedEvents.put((AvroKey) producerRecord.key(), (AvroEvent) producerRecord.value());
      producedHeaders.put((AvroKey) producerRecord.key(), producerRecord.headers());
    }
  });

  return template;
}
 
@Test
public void test() {
	SpringApplication app = new SpringApplication(ConsumingApplication.class);
	app.setWebApplicationType(WebApplicationType.NONE);
	ConfigurableApplicationContext context = app.run("--server.port=0",
			"--spring.cloud.stream.bindings.process-out-0.destination=out",
			"--spring.cloud.stream.bindings.process-in-0.destination=in.*",
			"--spring.cloud.stream.bindings.process-in-0.consumer.use-native-decoding=false",
			"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.destinationIsPattern=true",
			"--spring.cloud.stream.kafka.streams.binder.brokers="
					+ embeddedKafka.getBrokersAsString());
	try {
		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
		DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(
				senderProps);
		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory, true);

		// send message to both topics that fit the pattern
		template.send("in.1", "foo1");
		assertThat(KafkaTestUtils.getSingleRecord(consumer, "out").value())
				.isEqualTo("foo1");
		template.send("in.2", "foo2");
		assertThat(KafkaTestUtils.getSingleRecord(consumer, "out").value())
				.isEqualTo("foo2");
	}
	finally {
		context.close();
	}
}
 
@Test
public void testOutboundNullValueIsHandledGracefully()
		throws Exception {
	SpringApplication app = new SpringApplication(
			OutboundNullApplication.class);
	app.setWebApplicationType(WebApplicationType.NONE);

	try (ConfigurableApplicationContext context = app.run("--server.port=0",
			"--spring.jmx.enabled=false",
			"--spring.cloud.stream.bindings.input.destination=words",
			"--spring.cloud.stream.bindings.output.destination=counts",
			"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
			"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testOutboundNullValueIsHandledGracefully",
			"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
			"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
					+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
			"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
					+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
			"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
			"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
			"--spring.cloud.stream.kafka.binder.brokers="
					+ embeddedKafka.getBrokersAsString())) {

		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
				senderProps);
		try {
			KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
			template.setDefaultTopic("words");
			template.sendDefault("foobar");
			ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
					"counts");
			assertThat(cr.value() == null).isTrue();
		}
		finally {
			pf.destroy();
		}
	}
}
 
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name= BROADCASTER_PROPERTY, havingValue="KAFKA")
public BlockchainEventBroadcaster kafkaBlockchainEventBroadcaster(KafkaTemplate<String, EventeumMessage> kafkaTemplate,
                                                                  KafkaSettings kafkaSettings,
                                                                  CrudRepository<ContractEventFilter, String> filterRepository) {
    final BlockchainEventBroadcaster broadcaster =
            new KafkaBlockchainEventBroadcaster(kafkaTemplate, kafkaSettings, filterRepository);

    return onlyOnceWrap(broadcaster);
}
 
@Bean
@ConditionalOnMissingBean
public KafkaMessagePublisher kafkaMessagePublisher() {
  Map<String, Object> map = Maps.newHashMap();
  map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
  map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  map.put(ProducerConfig.RETRIES_CONFIG, retries);
  map.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  map.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  return new KafkaMessagePublisher(topic,
      new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(map)));
}
 
@Test
public void shouldMatchMessageLogSelectors() {
    final MessageInterceptorRegistry interceptorRegistry = mock(MessageInterceptorRegistry.class);
    final KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);

    final KafkaMessageSenderEndpointFactory factory = new KafkaMessageSenderEndpointFactory(interceptorRegistry, kafkaTemplate);

    assertThat(factory.matches(MessageLog.class), is(true));
    assertThat(factory.matches(Kafka.class), is(true));
}
 
Example 23  Project: synapse  File: SynapseKafkaAutoConfiguration.java
@Bean
@ConditionalOnMissingBean(name = "kafkaMessageLogSenderEndpointFactory")
public MessageSenderEndpointFactory kafkaMessageLogSenderEndpointFactory(final MessageInterceptorRegistry registry,
                                                                         final KafkaTemplate<String, String> kafkaTemplate) {
    LOG.info("Auto-configuring Kafka MessageSenderEndpointFactory");
    return new KafkaMessageSenderEndpointFactory(registry, kafkaTemplate);
}
 
@Test
public void shouldRegisterMessageInterceptor() {
    final KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);

    final MessageInterceptorRegistry registry = new MessageInterceptorRegistry();
    final MessageInterceptor interceptor = mock(MessageInterceptor.class);
    registry.register(MessageInterceptorRegistration.allChannelsWith(interceptor));

    final KafkaMessageSenderEndpointFactory factory = new KafkaMessageSenderEndpointFactory(registry, kafkaTemplate);

    final MessageSenderEndpoint sender = factory.create("foo-stream", MessageFormat.V1);

    assertThat(sender.getInterceptorChain().getInterceptors(), contains(interceptor));
}
 
@Test
public void shouldNotMatchMessageQueueSelectors() {
    final MessageInterceptorRegistry interceptorRegistry = mock(MessageInterceptorRegistry.class);
    final KafkaTemplate kafkaTemplate = mock(KafkaTemplate.class);

    final KafkaMessageSenderEndpointFactory factory = new KafkaMessageSenderEndpointFactory(interceptorRegistry, kafkaTemplate);

    assertThat(factory.matches(MessageQueue.class), is(false));
}
 
Example 26  Project: reliable  File: KafkaProducer.java
public KafkaProducer(KafkaTemplate kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}
 
Example 27  Project: spring-examples  File: KafkaConfiguration.java
@Bean
public KafkaTemplate<String, KMessage> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
 
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
	return new KafkaTemplate<String, String>(producerFactory());
}
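
The kafkaTemplate() bean definitions above call a producerFactory() method that is not included in these snippets. A typical configuration class providing it might look roughly like the following sketch; the broker address and serializer choices are assumptions, not taken from those projects.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfigSketch {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // Typical producer settings; the broker address is a placeholder.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}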
 
Example 29  Project: ZTuoExchange_framework  File: CoinTrader.java
public void setKafkaTemplate(KafkaTemplate<String,String> template){
    this.kafkaTemplate = template;
}
 