org.springframework.boot.autoconfigure.kafka.KafkaProperties#setBootstrapServers() 源码实例 Demo

下面列出了 org.springframework.boot.autoconfigure.kafka.KafkaProperties#setBootstrapServers() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Checks that binder-level configuration overrides the Spring Boot Kafka
 * properties while the Boot-configured bootstrap server is kept.
 *
 * <p>Boot is configured with {@code PLAINTEXT} and {@code localhost:1234}; the
 * binder is configured with {@code SSL}, a trust store and brokers
 * {@code localhost:9092}. The admin client built by the provisioner must use an
 * {@link SslChannelBuilder} and still report {@code localhost:1234} as its first
 * bootstrap server.
 *
 * <p>NOTE(review): {@code localhost:9092} is presumably treated by the
 * provisioner as the "not explicitly set" default, which is why the Boot value
 * wins here - confirm against {@code KafkaTopicProvisioner}.
 *
 * @throws Exception if the trust store resource cannot be resolved to a file
 */
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenExceptServers() throws Exception {
	KafkaProperties bootConfig = new KafkaProperties();
	bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
			"PLAINTEXT");
	bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234"));
	KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
			bootConfig);
	binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
			"SSL");
	ClassPathResource ts = new ClassPathResource("test.truststore.ks");
	binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
			ts.getFile().getAbsolutePath());
	binderConfig.setBrokers("localhost:9092");
	KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
			bootConfig);
	// try-with-resources: the admin client is closed even when an assertion
	// below fails, instead of being leaked for the rest of the test run.
	try (AdminClient adminClient = provisioner.createAdminClient()) {
		assertThat(KafkaTestUtils.getPropertyValue(adminClient,
				"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
		Map configs = KafkaTestUtils.getPropertyValue(adminClient,
				"client.selector.channelBuilder.configs", Map.class);
		assertThat(
				((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
						.isEqualTo("localhost:1234");
	}
}
 
/**
 * Checks that binder-level configuration overrides the Spring Boot Kafka
 * properties, including the bootstrap servers.
 *
 * <p>Boot is configured with {@code PLAINTEXT} and {@code localhost:9092}; the
 * binder is configured with {@code SSL}, a trust store and brokers
 * {@code localhost:1234}. The admin client built by the provisioner must use an
 * {@link SslChannelBuilder} and report the binder's {@code localhost:1234} as
 * its first bootstrap server.
 *
 * @throws Exception if the trust store resource cannot be resolved to a file
 */
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenIncludingServers() throws Exception {
	KafkaProperties bootConfig = new KafkaProperties();
	bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
			"PLAINTEXT");
	bootConfig.setBootstrapServers(Collections.singletonList("localhost:9092"));
	KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
			bootConfig);
	binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
			"SSL");
	ClassPathResource ts = new ClassPathResource("test.truststore.ks");
	binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
			ts.getFile().getAbsolutePath());
	binderConfig.setBrokers("localhost:1234");
	KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
			bootConfig);
	// try-with-resources: the admin client is closed even when an assertion
	// below fails, instead of being leaked for the rest of the test run.
	try (AdminClient adminClient = provisioner.createAdminClient()) {
		assertThat(KafkaTestUtils.getPropertyValue(adminClient,
				"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
		Map configs = KafkaTestUtils.getPropertyValue(adminClient,
				"client.selector.channelBuilder.configs", Map.class);
		assertThat(
				((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
						.isEqualTo("localhost:1234");
	}
}
 
/**
 * With topic auto-creation switched off on the binder, creating a consumer
 * endpoint for a freshly generated topic name is expected to fail with a
 * {@link BinderException} whose cause is an
 * {@link UnknownTopicOrPartitionException}.
 *
 * @throws Throwable propagated from the binder call; the expected-exception
 * rule evaluates it
 */
@Test
public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker()
		throws Throwable {

	// Point the Boot-level Kafka properties at the embedded broker.
	KafkaProperties bootProps = new TestKafkaProperties();
	String brokers = embeddedKafka.getEmbeddedKafka().getBrokersAsString();
	bootProps.setBootstrapServers(Collections.singletonList(brokers));

	KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(
			bootProps);
	// The binder must not create the missing topic on its own.
	binderProps.setAutoCreateTopics(false);

	KafkaTopicProvisioner topicProvisioner = new KafkaTopicProvisioner(binderProps,
			bootProps);
	topicProvisioner.setMetadataRetryOperations(new RetryTemplate());

	KafkaMessageChannelBinder kafkaBinder = new KafkaMessageChannelBinder(binderProps,
			topicProvisioner);

	// The timestamp suffix keeps the name distinct between runs.
	final String missingTopic = "nonExistent" + System.currentTimeMillis();

	ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = new ExtendedConsumerProperties<>(
			new KafkaConsumerProperties());

	// Register the expectation before triggering the failing call.
	expectedException.expect(BinderException.class);
	expectedException.expectCause(isA(UnknownTopicOrPartitionException.class));
	kafkaBinder.createConsumerEndpoint(() -> missingTopic, "group", consumerProps);
}
 
/**
 * With topic auto-creation switched off on the binder, binding a producer to a
 * freshly generated topic name is expected to fail with a
 * {@link BinderException} whose cause is an
 * {@link UnknownTopicOrPartitionException}.
 *
 * @throws Throwable propagated from the binder call; the expected-exception
 * rule evaluates it
 */
@Test
public void testAutoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker()
		throws Throwable {

	// Point the Boot-level Kafka properties at the embedded broker.
	KafkaProperties bootProps = new TestKafkaProperties();
	String brokers = embeddedKafka.getEmbeddedKafka().getBrokersAsString();
	bootProps.setBootstrapServers(Collections.singletonList(brokers));

	KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(
			bootProps);
	// The binder must not create the missing topic on its own.
	binderProps.setAutoCreateTopics(false);
	// Shorten how long blocking producer operations may wait.
	binderProps.getConfiguration().put("max.block.ms", "3000");

	KafkaTopicProvisioner topicProvisioner = new KafkaTopicProvisioner(binderProps,
			bootProps);
	// Metadata lookups use a retry policy constructed with a limit of 1.
	final RetryTemplate metadataRetry = new RetryTemplate();
	metadataRetry.setRetryPolicy(new SimpleRetryPolicy(1));
	topicProvisioner.setMetadataRetryOperations(metadataRetry);

	KafkaMessageChannelBinder kafkaBinder = new KafkaMessageChannelBinder(binderProps,
			topicProvisioner);

	// The timestamp suffix keeps the name distinct between runs.
	final String missingTopic = "nonExistent" + System.currentTimeMillis();

	ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
			new KafkaProducerProperties());

	// Register the expectation before triggering the failing call.
	expectedException.expect(BinderException.class);
	expectedException.expectCause(isA(UnknownTopicOrPartitionException.class));

	kafkaBinder.bindProducer(missingTopic, new DirectChannel(), producerProps);
}
 
/**
 * Binds a producer on a binder configured with the transaction id prefix
 * {@code "foo-"} and sends one message through the bound channel.
 *
 * <p>The producer factory is spied so that {@code createProducer("foo-")}
 * returns a mock {@link Producer}. The test then verifies, in order, that the
 * mock saw {@code beginTransaction}, {@code send}, {@code commitTransaction}
 * and {@code close}, with no further interactions. It also checks that the
 * channel's handler reports {@code useNativeEncoding} as {@code true}.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
	// Binder configuration: embedded broker, transactional, native encoding.
	KafkaProperties bootProps = new TestKafkaProperties();
	String brokers = embeddedKafka.getEmbeddedKafka().getBrokersAsString();
	bootProps.setBootstrapServers(Collections.singletonList(brokers));
	KafkaBinderConfigurationProperties binderProps = new KafkaBinderConfigurationProperties(
			bootProps);
	binderProps.getTransaction().setTransactionIdPrefix("foo-");
	binderProps.getTransaction().getProducer().setUseNativeEncoding(true);
	KafkaTopicProvisioner topicProvisioner = new KafkaTopicProvisioner(binderProps,
			bootProps);
	topicProvisioner.setMetadataRetryOperations(new RetryTemplate());

	// Mock producer that stands in for the real Kafka producer.
	final Producer producerMock = mock(Producer.class);
	given(producerMock.send(any(), any())).willReturn(new SettableListenableFuture<>());

	KafkaProducerProperties txProducerExtension = binderProps.getTransaction()
			.getProducer().getExtension();
	txProducerExtension.getConfiguration().put(ProducerConfig.RETRIES_CONFIG, "1");
	txProducerExtension.getConfiguration().put(ProducerConfig.ACKS_CONFIG, "all");

	willReturn(Collections.singletonList(new TopicPartition("foo", 0)))
			.given(producerMock).partitionsFor(anyString());

	KafkaMessageChannelBinder kafkaBinder = new KafkaMessageChannelBinder(binderProps,
			topicProvisioner) {

		@Override
		protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
				String transactionIdPrefix,
				ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName) {
			// Wrap the real factory so that only createProducer("foo-") is redirected.
			DefaultKafkaProducerFactory<byte[], byte[]> realFactory = super.getProducerFactory(
					transactionIdPrefix, producerProperties, beanName);
			DefaultKafkaProducerFactory<byte[], byte[]> spiedFactory = spy(realFactory);
			willReturn(producerMock).given(spiedFactory).createProducer("foo-");
			return spiedFactory;
		}

	};

	GenericApplicationContext context = new GenericApplicationContext();
	context.refresh();
	kafkaBinder.setApplicationContext(context);

	// Bind and push a single message through the channel.
	DirectChannel outputChannel = new DirectChannel();
	ExtendedProducerProperties<KafkaProducerProperties> producerProps = new ExtendedProducerProperties<>(
			new KafkaProducerProperties());
	kafkaBinder.bindProducer("foo", outputChannel, producerProps);
	outputChannel.send(new GenericMessage<>("foo".getBytes()));

	// The send must be wrapped in a transaction, in exactly this order.
	InOrder ordered = inOrder(producerMock);
	ordered.verify(producerMock).beginTransaction();
	ordered.verify(producerMock).send(any(ProducerRecord.class), any(Callback.class));
	ordered.verify(producerMock).commitTransaction();
	ordered.verify(producerMock).close(any());
	ordered.verifyNoMoreInteractions();

	Boolean nativeEncoding = TestUtils.getPropertyValue(outputChannel,
			"dispatcher.theOneHandler.useNativeEncoding", Boolean.class);
	assertThat(nativeEncoding).isTrue();
}