下面列出了org.springframework.boot.autoconfigure.kafka.KafkaProperties#setBootstrapServers ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenExceptServers() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();
bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"PLAINTEXT");
bootConfig.setBootstrapServers(Collections.singletonList("localhost:1234"));
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
bootConfig);
binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
"SSL");
ClassPathResource ts = new ClassPathResource("test.truststore.ks");
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:9092");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Map configs = KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder.configs", Map.class);
assertThat(
((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
.isEqualTo("localhost:1234");
adminClient.close();
}
@SuppressWarnings("rawtypes")
@Test
public void bootPropertiesOverriddenIncludingServers() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();
bootConfig.getProperties().put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"PLAINTEXT");
bootConfig.setBootstrapServers(Collections.singletonList("localhost:9092"));
KafkaBinderConfigurationProperties binderConfig = new KafkaBinderConfigurationProperties(
bootConfig);
binderConfig.getConfiguration().put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
"SSL");
ClassPathResource ts = new ClassPathResource("test.truststore.ks");
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:1234");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig,
bootConfig);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Map configs = KafkaTestUtils.getPropertyValue(adminClient,
"client.selector.channelBuilder.configs", Map.class);
assertThat(
((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0))
.isEqualTo("localhost:1234");
adminClient.close();
}
@Test
public void testAutoCreateTopicDisabledFailsOnConsumerIfTopicNonExistentOnBroker()
throws Throwable {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getEmbeddedKafka().getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties);
// disable auto create topic on the binder.
configurationProperties.setAutoCreateTopics(false);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider);
final String testTopicName = "nonExistent" + System.currentTimeMillis();
ExtendedConsumerProperties<KafkaConsumerProperties> properties = new ExtendedConsumerProperties<>(
new KafkaConsumerProperties());
expectedException.expect(BinderException.class);
expectedException.expectCause(isA(UnknownTopicOrPartitionException.class));
binder.createConsumerEndpoint(() -> testTopicName, "group", properties);
}
@Test
public void testAutoCreateTopicDisabledFailsOnProducerIfTopicNonExistentOnBroker()
throws Throwable {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getEmbeddedKafka().getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties);
// disable auto create topic on the binder.
configurationProperties.setAutoCreateTopics(false);
// reduce the wait time on the producer blocking operations.
configurationProperties.getConfiguration().put("max.block.ms", "3000");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(1);
final RetryTemplate metadataRetryOperations = new RetryTemplate();
metadataRetryOperations.setRetryPolicy(simpleRetryPolicy);
provisioningProvider.setMetadataRetryOperations(metadataRetryOperations);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider);
final String testTopicName = "nonExistent" + System.currentTimeMillis();
ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
expectedException.expect(BinderException.class);
expectedException.expectCause(isA(UnknownTopicOrPartitionException.class));
binder.bindProducer(testTopicName, new DirectChannel(), properties);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getEmbeddedKafka().getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties);
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");
configurationProperties.getTransaction().getProducer().setUseNativeEncoding(true);
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties);
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
final Producer mockProducer = mock(Producer.class);
given(mockProducer.send(any(), any())).willReturn(new SettableListenableFuture<>());
KafkaProducerProperties extension1 = configurationProperties
.getTransaction().getProducer().getExtension();
extension1.getConfiguration().put(ProducerConfig.RETRIES_CONFIG, "1");
extension1.getConfiguration().put(ProducerConfig.ACKS_CONFIG, "all");
willReturn(Collections.singletonList(new TopicPartition("foo", 0)))
.given(mockProducer).partitionsFor(anyString());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider) {
@Override
protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
String transactionIdPrefix,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties, String beanName) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = spy(
super.getProducerFactory(transactionIdPrefix,
producerProperties, beanName));
willReturn(mockProducer).given(producerFactory).createProducer("foo-");
return producerFactory;
}
};
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
DirectChannel channel = new DirectChannel();
KafkaProducerProperties extension = new KafkaProducerProperties();
ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(
extension);
binder.bindProducer("foo", channel, properties);
channel.send(new GenericMessage<>("foo".getBytes()));
InOrder inOrder = inOrder(mockProducer);
inOrder.verify(mockProducer).beginTransaction();
inOrder.verify(mockProducer).send(any(ProducerRecord.class), any(Callback.class));
inOrder.verify(mockProducer).commitTransaction();
inOrder.verify(mockProducer).close(any());
inOrder.verifyNoMoreInteractions();
assertThat(TestUtils.getPropertyValue(channel,
"dispatcher.theOneHandler.useNativeEncoding", Boolean.class)).isTrue();
}