下面列出了 org.apache.kafka.common.config.SslConfigs 类的 API 用法示例代码;也可以点击链接到 GitHub 查看完整源代码。
/**
 * Configures KeyStore related settings in SslContextFactory.
 *
 * <p>The key store type is resolved through {@code getOrDefault} with
 * {@link SslConfigs#DEFAULT_SSL_KEYSTORE_TYPE} as the default argument and is always applied.
 * The key store location, key store password and key password are applied only when the
 * corresponding entry is present in {@code sslConfigValues}.
 *
 * @param ssl the factory to configure
 * @param sslConfigValues SSL settings keyed by the {@link SslConfigs} constants; the location
 *     and password entries are read as plain strings
 */
protected static void configureSslContextFactoryKeyStore(SslContextFactory ssl,
                                                         Map<String, Object> sslConfigValues) {
    ssl.setKeyStoreType((String)
        getOrDefault(sslConfigValues, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE));

    String sslKeystoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
    if (sslKeystoreLocation != null) {
        ssl.setKeyStorePath(sslKeystoreLocation);
    }

    // Check the raw map value rather than the Password wrapper: a freshly constructed wrapper is
    // never null, so testing it would pass a null password to the factory for a missing entry.
    String sslKeystorePassword = (String) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
    if (sslKeystorePassword != null) {
        ssl.setKeyStorePassword(new Password(sslKeystorePassword).value());
    }

    String sslKeyPassword = (String) sslConfigValues.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
    if (sslKeyPassword != null) {
        ssl.setKeyManagerPassword(new Password(sslKeyPassword).value());
    }
}
/**
 * Configures TrustStore related settings in SslContextFactory.
 *
 * <p>The trust store type is resolved through {@code getOrDefault} with
 * {@link SslConfigs#DEFAULT_SSL_TRUSTSTORE_TYPE} as the default argument and is always applied.
 * The trust store location and password are applied only when the corresponding entry is
 * present in {@code sslConfigValues}.
 *
 * @param ssl the factory to configure
 * @param sslConfigValues SSL settings keyed by the {@link SslConfigs} constants; the location
 *     and password entries are read as plain strings
 */
protected static void configureSslContextFactoryTrustStore(SslContextFactory ssl,
                                                           Map<String, Object> sslConfigValues) {
    ssl.setTrustStoreType(
        (String) getOrDefault(
            sslConfigValues,
            SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
            SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE));

    String sslTruststoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
    if (sslTruststoreLocation != null) {
        ssl.setTrustStorePath(sslTruststoreLocation);
    }

    // Check the raw map value rather than the Password wrapper: a freshly constructed wrapper is
    // never null, so testing it would pass a null password to the factory for a missing entry.
    String sslTruststorePassword = (String) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
    if (sslTruststorePassword != null) {
        ssl.setTrustStorePassword(new Password(sslTruststorePassword).value());
    }
}
/**
 * Parses the three SSL password settings through a ConfigDef with client SSL support and checks
 * that each parsed value equals the expected Password and renders as Password.HIDDEN.
 */
@Test
public void testSslPasswords() {
    final ConfigDef configDef = new ConfigDef();
    SslConfigs.addClientSslSupport(configDef);

    final Properties rawProps = new Properties();
    rawProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "key_password");
    rawProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore_password");
    rawProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore_password");

    final Map<String, Object> parsed = configDef.parse(rawProps);

    // Key password.
    final Object keyPassword = parsed.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
    assertEquals(new Password("key_password"), keyPassword);
    assertEquals(Password.HIDDEN, keyPassword.toString());

    // Key store password.
    final Object keystorePassword = parsed.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
    assertEquals(new Password("keystore_password"), keystorePassword);
    assertEquals(Password.HIDDEN, keystorePassword.toString());

    // Trust store password.
    final Object truststorePassword = parsed.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
    assertEquals(new Password("truststore_password"), truststorePassword);
    assertEquals(Password.HIDDEN, truststorePassword.toString());
}
/**
 * Supplies the SSL protocol under its plain (non-prefixed) key and verifies the mocked rest
 * service after the schema registry client has been created.
 */
@Test
public void shouldPickUpNonPrefixedSslConfig() {
    // Given:
    final KsqlConfig ksqlConfig = config(SslConfigs.SSL_PROTOCOL_CONFIG, "SSLv3");

    final Map<String, Object> expected = defaultConfigs();
    expected.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSLv3");
    setUpMocksWithExpectedConfig(expected);

    // When:
    final KsqlSchemaRegistryClientFactory factory =
        new KsqlSchemaRegistryClientFactory(ksqlConfig, restServiceSupplier, sslFactory);
    final SchemaRegistryClient client = factory.create();

    // Then:
    assertThat(client, is(notNullValue()));
    EasyMock.verify(restService);
}
/**
 * Supplies the SSL protocol under the "ksql.schema.registry." prefix, expects it to reach the
 * mocks under the plain key, and verifies the mocked rest service after client creation.
 */
@Test
public void shouldPickUpPrefixedSslConfig() {
    // Given:
    final KsqlConfig ksqlConfig =
        config("ksql.schema.registry." + SslConfigs.SSL_PROTOCOL_CONFIG, "SSLv3");

    final Map<String, Object> expected = defaultConfigs();
    expected.put(SslConfigs.SSL_PROTOCOL_CONFIG, "SSLv3");
    setUpMocksWithExpectedConfig(expected);

    // When:
    final KsqlSchemaRegistryClientFactory factory =
        new KsqlSchemaRegistryClientFactory(ksqlConfig, restServiceSupplier, sslFactory);
    final SchemaRegistryClient client = factory.create();

    // Then:
    assertThat(client, is(notNullValue()));
    EasyMock.verify(restService);
}
/**
 * Get a new instance of an SSL KafkaCache and initialize it.
 *
 * <p>The trust store location and password are always copied from {@code sslConfigs}; the key
 * store location, key store password and key password are copied only when
 * {@code requireSSLClientAuth} is set. Password entries are read as {@link Password} values.
 *
 * @param bootstrapServers passed through to {@code createAndInitKafkaCacheInstance}
 * @param sslConfigs SSL settings keyed by the {@link SslConfigs} constants
 * @param requireSSLClientAuth whether the key store settings are forwarded as well
 * @return the cache returned by {@code createAndInitKafkaCacheInstance}
 * @throws CacheInitializationException declared by this method; presumably raised by the
 *     delegate when initialization fails
 */
public static Cache<String, String> createAndInitSSLKafkaCacheInstance(
    String bootstrapServers, Map<String, Object> sslConfigs, boolean requireSSLClientAuth)
    throws CacheInitializationException {
    final Properties cacheProps = new Properties();
    cacheProps.put(
        KafkaCacheConfig.KAFKACACHE_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.toString());

    // Trust store settings.
    cacheProps.put(
        KafkaCacheConfig.KAFKACACHE_SSL_TRUSTSTORE_LOCATION_CONFIG,
        sslConfigs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
    final Password trustStorePassword =
        (Password) sslConfigs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
    cacheProps.put(
        KafkaCacheConfig.KAFKACACHE_SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword.value());

    // Key store settings, only when client authentication is required.
    if (requireSSLClientAuth) {
        cacheProps.put(
            KafkaCacheConfig.KAFKACACHE_SSL_KEYSTORE_LOCATION_CONFIG,
            sslConfigs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
        final Password keyStorePassword =
            (Password) sslConfigs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
        cacheProps.put(
            KafkaCacheConfig.KAFKACACHE_SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword.value());
        final Password keyPassword =
            (Password) sslConfigs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
        cacheProps.put(
            KafkaCacheConfig.KAFKACACHE_SSL_KEY_PASSWORD_CONFIG, keyPassword.value());
    }

    Cache<String, String> backingCache = new InMemoryCache<>();
    return createAndInitKafkaCacheInstance(bootstrapServers, backingCache, cacheProps);
}
/**
 * If SSL is configured for this cluster, apply the settings.
 *
 * <p>Sets the security protocol to SASL_SSL or SSL depending on whether SASL is also enabled,
 * adds the key store location/password only in the SSL-without-SASL case, and adds the trust
 * store location/password only when a trust store file is defined.
 *
 * @param clusterConfig Cluster configuration definition to source values from.
 * @param config Config map to apply settings to.
 */
private void applySslSettings(final ClusterConfig clusterConfig, final Map<String, Object> config) {
    if (clusterConfig.isUseSsl()) {
        final boolean saslEnabled = clusterConfig.isUseSasl();
        config.put(
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
            saslEnabled ? SecurityProtocol.SASL_SSL.name : SecurityProtocol.SSL.name);

        // KeyStore and KeyStore password only needed if NOT using SASL
        if (!saslEnabled) {
            config.put(
                SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
                keyStoreRootPath + "/" + clusterConfig.getKeyStoreFile());
            config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, clusterConfig.getKeyStorePassword());
        }

        // Only put Trust properties if one is defined
        if (clusterConfig.getTrustStoreFile() != null) {
            config.put(
                SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                keyStoreRootPath + "/" + clusterConfig.getTrustStoreFile());
            config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, clusterConfig.getTrustStorePassword());
        }
    }
}
/**
 * If SASL is configured for this cluster, apply the settings.
 *
 * <p>Sets the security protocol to SASL_SSL (removing any key store location/password already
 * in the map) when SSL is enabled too, or to SASL_PLAINTEXT otherwise, then adds the SASL
 * mechanism and JAAS configuration.
 *
 * @param clusterConfig Cluster configuration definition to source values from.
 * @param config Config map to apply settings to.
 */
private void applySaslSettings(final ClusterConfig clusterConfig, final Map<String, Object> config) {
    if (clusterConfig.isUseSasl()) {
        if (!clusterConfig.isUseSsl()) {
            // Just SASL PLAINTEXT
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
        } else {
            // SASL+SSL
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);

            // Keystore and keystore password not required if using SASL+SSL
            config.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
            config.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
        }

        // Mechanism and JAAS settings apply to both SASL variants.
        config.put(SaslConfigs.SASL_MECHANISM, clusterConfig.getSaslMechanism());
        config.put(SaslConfigs.SASL_JAAS_CONFIG, clusterConfig.getSaslJaas());
    }
}
/**
 * Checks the SSL-related entries of a generated config map.
 *
 * <p>The security protocol is always checked through {@code validateKey}. The trust store and
 * key store location/password entries are checked through {@code validateKey} against the
 * expected fixture values when the matching flag is set, and through {@code validateNoKey}
 * otherwise.
 *
 * @param config the config map under test; must not be null
 * @param expectedSecurityProtocol expected value of the security protocol entry
 * @param shouldHaveKeyStoreConfiguration whether key store entries are expected
 * @param shouldHaveTrustStoreConfiguration whether trust store entries are expected
 */
private void validateSsl(
    final Map<String, Object> config,
    final String expectedSecurityProtocol,
    final boolean shouldHaveKeyStoreConfiguration,
    final boolean shouldHaveTrustStoreConfiguration
) {
    assertNotNull(config);
    validateKey(config, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, expectedSecurityProtocol);

    // Trust store entries.
    if (!shouldHaveTrustStoreConfiguration) {
        validateNoKey(config, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
        validateNoKey(config, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
    } else {
        validateKey(config, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/" + expectedTrustStoreFile);
        validateKey(config, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, expectedTrustStorePassword);
    }

    // Key store entries.
    if (!shouldHaveKeyStoreConfiguration) {
        validateNoKey(config, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
        validateNoKey(config, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
    } else {
        validateKey(config, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/" + expectedKeyStoreFile);
        validateKey(config, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, expectedKeyStorePassword);
    }
}
/**
 * Copies each non-null SSL setting of {@code ssl} into {@code properties} under the matching
 * {@link SslConfigs} key; settings that are null are left out.
 *
 * @param ssl source of the SSL settings
 * @param properties map that receives the settings
 */
static void buildSslOptions(Ssl ssl, Map<String, Object> properties) {
    final Object keyPassword = ssl.getKeyPassword();
    if (keyPassword != null) {
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
    }

    final Object keystoreLocation = ssl.getKeystoreLocation();
    if (keystoreLocation != null) {
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation);
    }

    final Object keystorePassword = ssl.getKeystorePassword();
    if (keystorePassword != null) {
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword);
    }

    final Object truststoreLocation = ssl.getTruststoreLocation();
    if (truststoreLocation != null) {
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
    }

    final Object truststorePassword = ssl.getTruststorePassword();
    if (truststorePassword != null) {
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword);
    }
}
/**
 * Builds an SSL config map that uses TLSv1.2 and JKS stores.
 *
 * <p>Key store entries are added for {@code Mode.SERVER}, and for {@code Mode.CLIENT} only when
 * a key store file is given. Trust store entries and the enabled-protocols list are always added.
 *
 * @param mode whether the config is for a server or a client
 * @param keyStoreFile key store file; may be null for a client
 * @param password key store password
 * @param keyPassword password of the key inside the key store
 * @param trustStoreFile trust store file
 * @param trustStorePassword trust store password
 * @return a new mutable map holding the SSL settings
 */
private static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
                                                   File trustStoreFile, Password trustStorePassword) {
    final Map<String, Object> config = new HashMap<>();
    config.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext

    final boolean clientWithKeyStore = mode == Mode.CLIENT && keyStoreFile != null;
    if (mode == Mode.SERVER || clientWithKeyStore) {
        config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
        config.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
        // NOTE(review): the key manager algorithm is taken from TrustManagerFactory, not
        // KeyManagerFactory - confirm this is intentional.
        config.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
        config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
        config.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
    }

    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
    config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
    config.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());

    final List<String> protocols = new ArrayList<>();
    protocols.add("TLSv1.2");
    config.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, protocols);

    return config;
}
/**
 * Provides the singleton AdminClient.
 *
 * <p>The client is created from the bootstrap servers and, when an SSL configuration is
 * supplied, the security protocol plus the trust store, key store and key password settings.
 *
 * @param bootstrapServers value for the bootstrap servers setting
 * @param sslConfiguration optional SSL settings; null leaves the SSL properties unset
 * @return the AdminClient created from the assembled properties
 */
@Provides
@Singleton
AdminClient provideAdminClient(@BootstrapServers String bootstrapServers, @Nullable SslConfiguration sslConfiguration) {
    final Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    if (sslConfiguration != null) {
        adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SslConfiguration.PROTOCOL);

        // Trust store.
        adminProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfiguration.getTrustStoreLocation());
        adminProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslConfiguration.getTrustStorePassword());

        // Key store and key.
        adminProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslConfiguration.getKeyStoreLocation());
        adminProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslConfiguration.getKeyStorePassword());
        adminProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslConfiguration.getKeyPassword());
    }

    return AdminClient.create(adminProps);
}
/**
 * Sets {@link SaslConfigs#SASL_JAAS_CONFIG} to an OAuthBearerLoginModule entry that carries the
 * OAuth client credentials, the token endpoint and the trust store settings already present in
 * {@code properties}.
 *
 * <p>Before building the entry, the Keycloak certificate import and the authz settings fix-up are
 * attempted; a failure there is printed and otherwise ignored.
 *
 * @param clientId OAuth client id; must not be empty
 * @param clientSecretName OAuth client secret value placed into the entry; must not be empty
 * @param oauthTokenEndpointUri OAuth token endpoint URI; must not be empty
 * @return this builder
 * @throws InvalidParameterException if any of the three arguments is empty
 */
public KafkaClientPropertiesBuilder withSaslJassConfigAndTls(String clientId, String clientSecretName, String oauthTokenEndpointUri) {
    try {
        importKeycloakCertificateToTruststore(properties);
        fixBadlyImportedAuthzSettings();
    } catch (Exception e) {
        // Best effort: the builder carries on even when the truststore preparation fails.
        e.printStackTrace();
    }

    if (clientId.isEmpty() || clientSecretName.isEmpty() || oauthTokenEndpointUri.isEmpty()) {
        throw new InvalidParameterException("You do not specify client-id, client-secret name or oauth-token-endpoint-uri inside kafka client!");
    }

    properties.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule " +
            "required " +
            "oauth.client.id=\"" + clientId + "\" " +
            "oauth.client.secret=\"" + clientSecretName + "\" " +
            "oauth.token.endpoint.uri=\"" + oauthTokenEndpointUri + "\" " +
            // Trailing space added so this option is separated from the next one, as all others are.
            "oauth.ssl.endpoint.identification.algorithm=\"\" " +
            "oauth.ssl.truststore.location=\"" + properties.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG) + "\" " +
            "oauth.ssl.truststore.password=\"" + properties.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG) + "\" " +
            "oauth.ssl.truststore.type=\"" + properties.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG) + "\" ;");

    return this;
}
/**
 * Builds the default producer properties: bootstrap servers, client id, String key serializer,
 * Avro generic value serializer and the Avro schema settings. When {@code useSsl} is set, the
 * security protocol and the trust store / key store / key password settings are added too.
 *
 * @param kafkaServer value for the bootstrap servers setting
 * @param kafkaTopic not used by this method
 * @param kafkaProducerID value for the client id setting
 * @param schemaFilename value for the Avro schema writer file setting
 * @return the assembled properties
 */
@Override
protected Properties getDefaultKafkaProducerProperties(String kafkaServer, String kafkaTopic, String kafkaProducerID, String schemaFilename){
    final Properties producerProps = new Properties();

    // Connection and serialization.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProducerID);
    producerProps.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        com.bbn.tc.schema.serialization.kafka.KafkaAvroGenericSerializer.class);

    // Avro schema handling.
    producerProps.put(AvroConfig.SCHEMA_WRITER_FILE, schemaFilename);
    producerProps.put(AvroConfig.SCHEMA_SERDE_IS_SPECIFIC, true);

    if (!useSsl) {
        return producerProps;
    }

    // SSL transport settings.
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreLocation);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
    return producerProps;
}
/**
 * Builds the Kafka connection properties for a datastore.
 *
 * <p>Always sets the bootstrap servers. Byte-array key/value deserializers are added unless
 * {@code isBeam} is set. When SSL is enabled, the security protocol and the trust store type,
 * location and password are added; when client authentication is also required, the key store
 * type, location and password are added as well.
 *
 * @param datastore datastore definition to read from; a null datastore yields an empty map
 * @param isBeam when true, the deserializer settings are left out
 * @return a new mutable map with the connection properties
 */
public static Map<String, String> createConnMaps(KafkaDatastoreProperties datastore, boolean isBeam) {
    Map<String, String> props = new HashMap<>();
    if (datastore != null) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, datastore.brokers.getValue());
        if (!isBeam) {
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        }
        if (datastore.useSsl.getValue()) {
            props.put("security.protocol", "SSL");
            props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, datastore.trustStoreType.getValue().toString());
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, datastore.trustStorePath.getValue());
            // The trust store password belongs under the trust store key, not the key store one.
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, datastore.trustStorePassword.getValue());
            if (datastore.needClientAuth.getValue()) {
                props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, datastore.keyStoreType.getValue().toString());
                // The key store path must not overwrite the trust store location set above.
                props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, datastore.keyStorePath.getValue());
                props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, datastore.keyStorePassword.getValue());
            }
        }
    }
    return props;
}
/**
 * Sends one record to the "dev" topic over SSL using the service key store and waits for the
 * send to complete; the test fails if the send does not succeed.
 */
@Test
public void testAuthorizedWrite() throws Exception {
    // Create the Producer
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + port);
    producerProps.put("acks", "all");
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass");
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security");

    // try-with-resources closes the producer even when the send or the wait below throws.
    try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
        // Send a message
        Future<RecordMetadata> record =
            producer.send(new ProducerRecord<String, String>("dev", "somekey", "somevalue"));
        producer.flush();
        record.get();
    }
}
/**
 * Creates a String/String producer that connects over SSL with the given user's JKS key store
 * and trust store, both looked up as classpath resources named after the user.
 *
 * @param user user name used to derive the store resource names and passwords
 * @return the new producer
 */
private KafkaProducer<String, String> createKafkaProducer(String user) {
    final Properties producerProps = new Properties();

    // Basic producer settings.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SentryKafkaProducer");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // SSL with the per-user key store.
    producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    final String keystorePath = KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath();
    producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystorePath);
    producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd");
    producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd");

    // Per-user trust store.
    final String truststorePath = KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath();
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd");

    return new KafkaProducer<String, String>(producerProps);
}
/**
 * Creates a String/String consumer in group "sentrykafkaconsumer" that connects over SSL with
 * the given user's JKS key store and trust store, both looked up as classpath resources named
 * after the user.
 *
 * @param user user name used to derive the store resource names and passwords
 * @return the new consumer
 */
private KafkaConsumer<String, String> createKafkaConsumer(String user) {
    final Properties consumerProps = new Properties();

    // Basic consumer settings.
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sentrykafkaconsumer");
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

    // SSL with the per-user key store.
    consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    consumerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
    final String keystorePath = KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath();
    consumerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keystorePath);
    consumerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd");
    consumerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd");

    // Per-user trust store.
    final String truststorePath = KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath();
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath);
    consumerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd");

    return new KafkaConsumer<String, String>(consumerProps);
}
/**
 * Configures Protocol, Algorithm and Provider related settings in SslContextFactory.
 *
 * <p>Enabled protocols, protocol, key manager algorithm and trust manager algorithm are always
 * applied, each resolved through {@code getOrDefault} with the matching {@link SslConfigs}
 * default. Provider, cipher suites and secure random implementation are applied only when
 * present in {@code sslConfigValues}.
 *
 * @param ssl the factory to configure
 * @param sslConfigValues SSL settings keyed by the {@link SslConfigs} constants; the enabled
 *     protocols and cipher suites may be any collection of strings
 */
@SuppressWarnings("unchecked")
protected static void configureSslContextFactoryAlgorithms(SslContextFactory ssl,
                                                           Map<String, Object> sslConfigValues) {
    // Read as a Collection: the default below is a List (Arrays.asList), so narrowing the
    // result to Set would fail with a ClassCastException whenever the entry is missing.
    java.util.Collection<String> sslEnabledProtocols =
        (java.util.Collection<String>) getOrDefault(
            sslConfigValues,
            SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
            Arrays.asList(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS.split("\\s*,\\s*")));
    ssl.setIncludeProtocols(sslEnabledProtocols.toArray(new String[0]));

    String sslProvider = (String) sslConfigValues.get(SslConfigs.SSL_PROVIDER_CONFIG);
    if (sslProvider != null) {
        ssl.setProvider(sslProvider);
    }

    ssl.setProtocol(
        (String) getOrDefault(sslConfigValues, SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));

    java.util.Collection<String> sslCipherSuites =
        (java.util.Collection<String>) sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
    if (sslCipherSuites != null) {
        ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
    }

    ssl.setKeyManagerFactoryAlgorithm((String) getOrDefault(
        sslConfigValues,
        SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
        SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM));

    String sslSecureRandomImpl = (String) sslConfigValues.get(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
    if (sslSecureRandomImpl != null) {
        ssl.setSecureRandomAlgorithm(sslSecureRandomImpl);
    }

    ssl.setTrustManagerFactoryAlgorithm((String) getOrDefault(
        sslConfigValues,
        SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
        SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM));
}
/**
 * Builds the trust store settings (location and password) as an immutable map.
 *
 * @return props consumers and producers will need to connect to a secure Kafka cluster over SSL.
 */
public static Map<String, ?> trustStoreProps() {
    final ImmutableMap.Builder<String, Object> trustProps = ImmutableMap.builder();
    trustProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath());
    trustProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword());
    return trustProps.build();
}
/**
 * Builds the key store, key and trust store settings as an immutable map.
 *
 * @return props brokers will need to connect to support SSL connections.
 */
public static Map<String, ?> keyStoreProps() {
    final ImmutableMap.Builder<String, Object> storeProps = ImmutableMap.builder();

    // Key store and key.
    storeProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath());
    storeProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, KEYSTORE_PASSWORD);
    storeProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, KEY_PASSWORD);

    // NOTE(review): the trust store location is also keyStorePath() - presumably the key store
    // doubles as the trust store; confirm.
    storeProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, keyStorePath());
    storeProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, TRUSTSTORE_PASSWORD);

    return storeProps.build();
}
/**
 * Populates {@code mapToPopulate} with Kafka client properties taken from the processor context.
 *
 * <p>For the SSL context service property, the configured key store and trust store settings
 * are translated into the matching {@link SslConfigs} entries. Every property with a non-null
 * value is then copied under its own name, provided that name is a static String field of one of
 * the given Kafka config classes; values of properties ending in ".ms" that are not plain
 * numbers are first converted to milliseconds.
 *
 * @param context processor context supplying the properties
 * @param kafkaConfigClass Kafka config class checked, next to the common, SSL and SASL config
 *     classes, for a field matching each property name
 * @param mapToPopulate map that receives the Kafka properties
 */
static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
    for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
        if (descriptor.equals(SSL_CONTEXT_SERVICE)) {
            // Translate SSLContext Service configuration into Kafka properties
            final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
            if (sslService != null) {
                if (sslService.isKeyStoreConfigured()) {
                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslService.getKeyStoreFile());
                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslService.getKeyStorePassword());
                    // The key password falls back to the key store password when it is not set.
                    final String keyPass = sslService.getKeyPassword() != null
                        ? sslService.getKeyPassword()
                        : sslService.getKeyStorePassword();
                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslService.getKeyStoreType());
                }
                if (sslService.isTrustStoreConfigured()) {
                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslService.getTrustStoreFile());
                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslService.getTrustStorePassword());
                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslService.getTrustStoreType());
                }
            }
        }

        final String name = descriptor.getName();
        String value;
        if (descriptor.isExpressionLanguageSupported()) {
            value = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
        } else {
            value = context.getProperty(descriptor).getValue();
        }
        if (value == null) {
            continue;
        }

        // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
        // or the standard NiFi time period such as "5 secs"
        final boolean needsConversion = name.endsWith(".ms") && !StringUtils.isNumeric(value.trim());
        if (needsConversion) { // kafka standard time notation
            value = String.valueOf(FormatUtils.getTimeDuration(value.trim(), TimeUnit.MILLISECONDS));
        }

        if (isStaticStringFieldNamePresent(name, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
            mapToPopulate.put(name, value);
        }
    }
}
/**
 * Populates {@code mapToPopulate} with Kafka client properties taken from the processor context.
 *
 * <p>For the SSL context service property, the configured key store and trust store settings
 * are translated into the matching {@link SslConfigs} entries. Every property with a non-null
 * value is then copied under its own name, provided that name is a static String field of one of
 * the given Kafka config classes; values of properties ending in ".ms" that are not plain
 * numbers are first converted to milliseconds.
 *
 * @param context processor context supplying the properties
 * @param kafkaConfigClass Kafka config class checked, next to the common, SSL and SASL config
 *     classes, for a field matching each property name
 * @param mapToPopulate map that receives the Kafka properties
 */
static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
    for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
        if (descriptor.equals(SSL_CONTEXT_SERVICE)) {
            // Translate SSLContext Service configuration into Kafka properties
            final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
            if (sslService != null) {
                if (sslService.isKeyStoreConfigured()) {
                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslService.getKeyStoreFile());
                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslService.getKeyStorePassword());
                    // The key password falls back to the key store password when it is not set.
                    final String keyPass = sslService.getKeyPassword() != null
                        ? sslService.getKeyPassword()
                        : sslService.getKeyStorePassword();
                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslService.getKeyStoreType());
                }
                if (sslService.isTrustStoreConfigured()) {
                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslService.getTrustStoreFile());
                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslService.getTrustStorePassword());
                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslService.getTrustStoreType());
                }
            }
        }

        final String name = descriptor.getName();
        String value;
        if (descriptor.isExpressionLanguageSupported()) {
            value = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
        } else {
            value = context.getProperty(descriptor).getValue();
        }
        if (value == null) {
            continue;
        }

        // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
        // or the standard NiFi time period such as "5 secs"
        final boolean needsConversion = name.endsWith(".ms") && !StringUtils.isNumeric(value.trim());
        if (needsConversion) { // kafka standard time notation
            value = String.valueOf(FormatUtils.getTimeDuration(value.trim(), TimeUnit.MILLISECONDS));
        }

        if (isStaticStringFieldNamePresent(name, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
            mapToPopulate.put(name, value);
        }
    }
}
/**
 * Adds SSL client settings to {@code props}: the SSL security protocol, the PKCS12 trust store
 * "ks-truststore.p12" from the test SSL directory with its password, and an empty endpoint
 * identification algorithm.
 *
 * @param props properties to extend
 * @throws UncheckedIOException if resolving the SSL directory fails with an IOException
 */
private static void addSSL(Properties props) {
    final File trustStore;
    try {
        trustStore = new File(KafkaTestResource.sslDir(null, false), "ks-truststore.p12");
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }

    props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getPath());
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
    props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
/**
 * Adds SSL client settings to {@code props}: the SSL security protocol, the PKCS12 trust store
 * "kafka-truststore.p12" under "src/test/resources" with its password, and an empty endpoint
 * identification algorithm.
 *
 * @param props properties to extend
 */
private static void addSSL(Properties props) {
    final File trustStore = new File(new File("src/test/resources"), "kafka-truststore.p12");
    final String trustStorePath = trustStore.getPath();

    props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
    props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
/**
 * Adds SSL client settings to {@code props}: the SSL security protocol, the PKCS12 trust store
 * "kafka-truststore.p12" from the test SSL directory with its password, and an empty endpoint
 * identification algorithm.
 *
 * @param props properties to extend
 * @throws UncheckedIOException if resolving the SSL directory fails with an IOException
 */
private static void addSsl(Properties props) {
    final File trustStore;
    try {
        trustStore = new File(KafkaSSLTestResource.sslDir(null, false), "kafka-truststore.p12");
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }

    props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStore.getPath());
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
    props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
    props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
/**
 * Builds the properties shared by the Kafka clients: bootstrap servers, client id and the SSL
 * key/key store/trust store settings, each added only when set. Store locations go through
 * {@code resourceToPath}. The free-form {@code properties} of this object are applied last, so
 * they override the dedicated settings.
 *
 * @return a new mutable map with the common client properties
 */
@SuppressWarnings("Duplicates")
private Map<String, Object> buildCommonProperties() {
    final Map<String, Object> common = new HashMap<>();

    // Connection identity.
    if (bootstrapServers != null) {
        common.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    }
    if (clientId != null) {
        common.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
    }

    // Key and key store.
    if (ssl.getKeyPassword() != null) {
        common.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ssl.getKeyPassword());
    }
    if (ssl.getKeystoreLocation() != null) {
        common.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, resourceToPath(ssl.getKeystoreLocation()));
    }
    if (ssl.getKeystorePassword() != null) {
        common.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ssl.getKeystorePassword());
    }

    // Trust store.
    if (ssl.getTruststoreLocation() != null) {
        common.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, resourceToPath(ssl.getTruststoreLocation()));
    }
    if (ssl.getTruststorePassword() != null) {
        common.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ssl.getTruststorePassword());
    }

    // Free-form overrides go in last.
    if (!CollectionUtils.isEmpty(this.properties)) {
        common.putAll(this.properties);
    }
    return common;
}
/**
 * Parse AdminClient configs based on the given {@link KafkaCruiseControlConfig configs}.
 *
 * <p>The bootstrap servers are always added, joined with commas. The security protocol, SASL
 * settings and, for SSL or SASL_SSL, the SSL settings are added when they can be read from
 * {@code configs}; a {@link ConfigException} raised while reading them is ignored and leaves
 * the map with whatever was added up to that point.
 *
 * @param configs Configs to be used for parsing AdminClient configs.
 * @return AdminClient configs.
 */
public static Map<String, Object> parseAdminClientConfigs(KafkaCruiseControlConfig configs) {
    Map<String, Object> adminClientConfigs = new HashMap<>();
    // Add bootstrap server.
    List<String> bootstrapServers = configs.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG);
    // Join the entries directly: going through List.toString() and stripping '[' / ']' would
    // also strip the brackets of IPv6 literals such as "[::1]:9092".
    String bootstrapServersString = String.join(",", bootstrapServers);
    adminClientConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersString);

    // Add security protocol (if specified).
    try {
        String securityProtocol = configs.getString(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
        adminClientConfigs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        setStringConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_MECHANISM);
        setClassConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
        setPasswordConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_JAAS_CONFIG);

        // Configure SSL configs (if security protocol is SSL or SASL_SSL)
        if (securityProtocol.equals(SecurityProtocol.SSL.name) || securityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
            setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
            setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
            setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
        }
    } catch (ConfigException ignored) {
        // Deliberately ignored: the security settings are optional, so the configs gathered so
        // far are returned as they are.
    }

    return adminClientConfigs;
}
/**
 * Adds the security protocol, SASL and SSL settings found in the given
 * {@link CruiseControlMetricsReporterConfig configs} to the AdminClient configs.
 *
 * <p>The SSL settings are added only when the security protocol is SSL or SASL_SSL. A
 * {@link ConfigException} raised while reading the settings is ignored and leaves
 * {@code adminClientConfigs} with whatever was added up to that point.
 *
 * @param adminClientConfigs Configs that will be returned with the SSL configs added.
 * @param configs Configs to be used for parsing AdminClient SSL configs.
 * @return the same {@code adminClientConfigs} instance.
 */
public static Properties addSslConfigs(Properties adminClientConfigs, CruiseControlMetricsReporterConfig configs) {
    try {
        // Security protocol and SASL settings (if specified).
        final String protocol = configs.getString(AdminClientConfig.SECURITY_PROTOCOL_CONFIG);
        adminClientConfigs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, protocol);
        setStringConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_MECHANISM);
        setPasswordConfigIfExists(configs, adminClientConfigs, SaslConfigs.SASL_JAAS_CONFIG);

        // SSL settings apply to both SSL and SASL_SSL.
        final boolean sslInUse =
            protocol.equals(SecurityProtocol.SSL.name) || protocol.equals(SecurityProtocol.SASL_SSL.name);
        if (sslInUse) {
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
            setStringConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
            setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
            setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_KEY_PASSWORD_CONFIG);
            setPasswordConfigIfExists(configs, adminClientConfigs, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
        }
    } catch (ConfigException ignored) {
        // let it go.
    }
    return adminClientConfigs;
}
/**
 * Set topic ssl.
 *
 * <p>Copies the SSL protocol, trust store, key store and key password settings of the given
 * cluster from {@code SystemConfigUtils} into {@code props}.
 *
 * @param props properties to extend
 * @param clusterAlias cluster alias used as the prefix of the system property names
 */
public void ssl(Properties props, String clusterAlias) {
    // SSL encryption: protocol plus trust store location and password.
    props.put(
        CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
        SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.ssl.protocol"));
    props.put(
        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.ssl.truststore.location"));
    props.put(
        SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
        SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.ssl.truststore.password"));

    // SSL authentication: key store location and password plus key password.
    props.put(
        SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
        SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.ssl.keystore.location"));
    props.put(
        SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
        SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.ssl.keystore.password"));
    props.put(
        SslConfigs.SSL_KEY_PASSWORD_CONFIG,
        SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.ssl.key.password"));
}