下面列出了怎么用org.apache.kafka.common.config.types.Password的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void injectKarelDbProperties(Properties props) {
super.injectKarelDbProperties(props);
props.put(KarelDbConfig.LISTENERS_CONFIG, "https://localhost:" + serverPort);
props.put(KarelDbConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
try {
File trustStoreFile = File.createTempFile("truststore", ".jks");
trustStoreFile.deleteOnExit();
List<X509Certificate> clientCerts = new ArrayList<>();
List<KeyPair> keyPairs = new ArrayList<>();
props.putAll(
SecureTestUtils.clientSslConfigsWithKeyStore(1, trustStoreFile, new Password
("TrustPassword"), clientCerts,
keyPairs
));
// Currently REQUIRED cannot be used as client does not use keystore
props.put(KarelDbConfig.SSL_CLIENT_AUTHENTICATION_CONFIG, KarelDbConfig.SSL_CLIENT_AUTHENTICATION_REQUESTED);
} catch (Exception e) {
throw new RuntimeException("Exception creation SSL properties ", e);
}
}
/**
* Configures KeyStore related settings in SslContextFactory.
*/
protected static void configureSslContextFactoryKeyStore(SslContextFactory ssl,
Map<String, Object> sslConfigValues) {
ssl.setKeyStoreType((String)
getOrDefault(sslConfigValues, SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE));
String sslKeystoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
if (sslKeystoreLocation != null) {
ssl.setKeyStorePath(sslKeystoreLocation);
}
Password sslKeystorePassword =
new Password((String) sslConfigValues.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
if (sslKeystorePassword != null) {
ssl.setKeyStorePassword(sslKeystorePassword.value());
}
Password sslKeyPassword =
new Password((String) sslConfigValues.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
if (sslKeyPassword != null) {
ssl.setKeyManagerPassword(sslKeyPassword.value());
}
}
/**
* Configures TrustStore related settings in SslContextFactory.
*/
protected static void configureSslContextFactoryTrustStore(SslContextFactory ssl,
Map<String, Object> sslConfigValues) {
ssl.setTrustStoreType(
(String) getOrDefault(
sslConfigValues,
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE));
String sslTruststoreLocation = (String) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
if (sslTruststoreLocation != null) {
ssl.setTrustStorePath(sslTruststoreLocation);
}
Password sslTruststorePassword =
new Password((String) sslConfigValues.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
if (sslTruststorePassword != null) {
ssl.setTrustStorePassword(sslTruststorePassword.value());
}
}
@Test
public void testSslPasswords() {
ConfigDef def = new ConfigDef();
SslConfigs.addClientSslSupport(def);
Properties props = new Properties();
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "key_password");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystore_password");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore_password");
Map<String, Object> vals = def.parse(props);
assertEquals(new Password("key_password"), vals.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
assertEquals(Password.HIDDEN, vals.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG).toString());
assertEquals(new Password("keystore_password"), vals.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
assertEquals(Password.HIDDEN, vals.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG).toString());
assertEquals(new Password("truststore_password"), vals.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
assertEquals(Password.HIDDEN, vals.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG).toString());
}
/**
* Get a new instance of an SSL KafkaCache and initialize it.
*/
public static Cache<String, String> createAndInitSSLKafkaCacheInstance(
String bootstrapServers, Map<String, Object> sslConfigs, boolean requireSSLClientAuth)
throws CacheInitializationException {
Properties props = new Properties();
props.put(KafkaCacheConfig.KAFKACACHE_SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SSL.toString());
props.put(KafkaCacheConfig.KAFKACACHE_SSL_TRUSTSTORE_LOCATION_CONFIG,
sslConfigs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
props.put(KafkaCacheConfig.KAFKACACHE_SSL_TRUSTSTORE_PASSWORD_CONFIG,
((Password) sslConfigs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
if (requireSSLClientAuth) {
props.put(KafkaCacheConfig.KAFKACACHE_SSL_KEYSTORE_LOCATION_CONFIG,
sslConfigs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
props.put(KafkaCacheConfig.KAFKACACHE_SSL_KEYSTORE_PASSWORD_CONFIG,
((Password) sslConfigs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)).value());
props.put(KafkaCacheConfig.KAFKACACHE_SSL_KEY_PASSWORD_CONFIG,
((Password) sslConfigs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)).value());
}
Cache<String, String> inMemoryCache = new InMemoryCache<>();
return createAndInitKafkaCacheInstance(bootstrapServers, inMemoryCache, props);
}
@Test(timeout = 120000)
public void isKafkaReadyWithSASLAndSSL() throws Exception {
Properties clientSecurityProps = kafka.getClientSecurityConfig();
Map<String, String> config = Utils.propsToStringMap(clientSecurityProps);
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapBroker
(SecurityProtocol.SASL_SSL));
// Set password and enabled protocol as the Utils.propsToStringMap just returns toString()
// representations and these properties don't have a valid representation.
Password trustStorePassword = (Password) clientSecurityProps.get("ssl.truststore.password");
config.put("ssl.truststore.password", trustStorePassword.value());
config.put("ssl.enabled.protocols", "TLSv1.2");
assertThat(ClusterStatus.isKafkaReady(config, 3, 10000)).isTrue();
}
@Test(timeout = 120000)
public void isKafkaReadyWithSASLAndSSLUsingZK() throws Exception {
Properties clientSecurityProps = kafka.getClientSecurityConfig();
boolean zkReady = ClusterStatus.isZookeeperReady(this.kafka.getZookeeperConnectString(), 30000);
if (!zkReady) {
throw new RuntimeException(
"Could not reach zookeeper " + this.kafka.getZookeeperConnectString());
}
Map<String, String> endpoints = ClusterStatus.getKafkaEndpointFromZookeeper(
this.kafka.getZookeeperConnectString(),
30000
);
String bootstrap_broker = endpoints.get("SASL_SSL");
Map<String, String> config = Utils.propsToStringMap(clientSecurityProps);
config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrap_broker);
// Set password and enabled protocol as the Utils.propsToStringMap just returns toString()
// representations and these properties don't have a valid representation.
Password trustStorePassword = (Password) clientSecurityProps.get("ssl.truststore.password");
config.put("ssl.truststore.password", trustStorePassword.value());
config.put("ssl.enabled.protocols", "TLSv1.2");
assertThat(ClusterStatus.isKafkaReady(config, 3, 10000)).isTrue();
}
private static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
File trustStoreFile, Password trustStorePassword) {
Map<String, Object> sslConfigs = new HashMap<>();
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
}
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
List<String> enabledProtocols = new ArrayList<>();
enabledProtocols.add("TLSv1.2");
sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
return sslConfigs;
}
protected void initTypeMap() {
javaClassToKafkaType.put(Boolean.class, ConfigDef.Type.BOOLEAN);
javaClassToKafkaType.put(Boolean.TYPE, ConfigDef.Type.BOOLEAN);
javaClassToKafkaType.put(String.class, ConfigDef.Type.STRING);
javaClassToKafkaType.put(Integer.class, ConfigDef.Type.INT);
javaClassToKafkaType.put(Integer.TYPE, ConfigDef.Type.INT);
javaClassToKafkaType.put(Short.class, ConfigDef.Type.SHORT);
javaClassToKafkaType.put(Short.TYPE, ConfigDef.Type.SHORT);
javaClassToKafkaType.put(Long.class, ConfigDef.Type.LONG);
javaClassToKafkaType.put(Long.TYPE, ConfigDef.Type.LONG);
javaClassToKafkaType.put(Double.class, ConfigDef.Type.DOUBLE);
javaClassToKafkaType.put(Double.TYPE, ConfigDef.Type.DOUBLE);
javaClassToKafkaType.put(List.class, ConfigDef.Type.LIST);
javaClassToKafkaType.put(Class.class, ConfigDef.Type.CLASS);
javaClassToKafkaType.put(Password.class, ConfigDef.Type.PASSWORD);
}
@BeforeClass
public static void setUp() throws Exception {
final File trustStore = File.createTempFile("ApiHeadersTest-truststore", ".jks");
final File clientKeystore = File.createTempFile("ApiHeadersTest-client-keystore", ".jks");
final File serverKeystore = File.createTempFile("ApiHeadersTest-server-keystore", ".jks");
clientKeystoreLocation = clientKeystore.getAbsolutePath();
final Map<String, X509Certificate> certs = new HashMap<>();
createKeystoreWithCert(clientKeystore, "client", certs);
createKeystoreWithCert(serverKeystore, "server", certs);
TestSslUtils.createTrustStore(trustStore.getAbsolutePath(), new Password(SSL_PASSWORD), certs);
final Properties props = new Properties();
props.put(RestConfig.LISTENERS_CONFIG, httpUri + "," + httpsUri);
props.put(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG, serverKeystore.getAbsolutePath());
props.put(RestConfig.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_PASSWORD);
props.put(RestConfig.SSL_KEY_PASSWORD_CONFIG, SSL_PASSWORD);
app = new TestApplication(new TestRestConfig(props));
app.start();
}
@Before
public void setUp() throws Exception {
try {
trustStore = File.createTempFile("SslTest-truststore", ".jks");
clientKeystore = File.createTempFile("SslTest-client-keystore", ".jks");
serverKeystore = File.createTempFile("SslTest-server-keystore", ".jks");
serverKeystoreBak = File.createTempFile("SslTest-server-keystore", ".jks.bak");
serverKeystoreErr = File.createTempFile("SslTest-server-keystore", ".jks.err");
} catch (IOException ioe) {
throw new RuntimeException("Unable to create temporary files for trust stores and keystores.");
}
Map<String, X509Certificate> certs = new HashMap<>();
createKeystoreWithCert(clientKeystore, "client", certs);
createKeystoreWithCert(serverKeystore, "server", certs);
TestSslUtils.createTrustStore(trustStore.getAbsolutePath(), new Password(SSL_PASSWORD), certs);
Files.copy(serverKeystore.toPath(), serverKeystoreBak.toPath(), StandardCopyOption.REPLACE_EXISTING);
certs = new HashMap<>();
createWrongKeystoreWithCert(serverKeystoreErr, "server", certs);
}
public static Properties clientSslConfigsWithKeyStore(
int numberOfCerts,
File trustStoreFile,
Password trustPassword,
List<X509Certificate> clientCerts,
List<KeyPair> keyPairs
) throws GeneralSecurityException, IOException {
Map<String, X509Certificate> certificateMap = new HashMap<>();
File clientKSFile = File.createTempFile("CKeystore", ".jks");
clientKSFile.deleteOnExit();
String keyStorePassword = new Password("Client-KS-Password").value();
for (int i = 0; i < numberOfCerts; i++) {
KeyPair kp = TestSslUtils.generateKeyPair("RSA");
X509Certificate cert = TestSslUtils.generateCertificate(
"CN=localhost, O=Client" + i, kp, 30, "SHA1withRSA");
clientCerts.add(cert);
keyPairs.add(kp);
certificateMap.put("client-" + i, cert);
}
createKeyStore(clientKSFile, keyStorePassword, clientCerts, keyPairs);
TestSslUtils.createTrustStore(trustStoreFile.toString(), trustPassword, certificateMap);
Properties sslConfigs =
getClientSslConfigs(trustStoreFile, trustPassword.value(), clientKSFile, keyStorePassword);
return sslConfigs;
}
@Test
public void testBasicTypes() {
ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs")
.define("b", Type.LONG, Importance.HIGH, "docs")
.define("c", Type.STRING, "hello", Importance.HIGH, "docs")
.define("d", Type.LIST, Importance.HIGH, "docs")
.define("e", Type.DOUBLE, Importance.HIGH, "docs")
.define("f", Type.CLASS, Importance.HIGH, "docs")
.define("g", Type.BOOLEAN, Importance.HIGH, "docs")
.define("h", Type.BOOLEAN, Importance.HIGH, "docs")
.define("i", Type.BOOLEAN, Importance.HIGH, "docs")
.define("j", Type.PASSWORD, Importance.HIGH, "docs");
Properties props = new Properties();
props.put("a", "1 ");
props.put("b", 2);
props.put("d", " a , b, c");
props.put("e", 42.5d);
props.put("f", String.class.getName());
props.put("g", "true");
props.put("h", "FalSE");
props.put("i", "TRUE");
props.put("j", "password");
Map<String, Object> vals = def.parse(props);
assertEquals(1, vals.get("a"));
assertEquals(2L, vals.get("b"));
assertEquals("hello", vals.get("c"));
assertEquals(asList("a", "b", "c"), vals.get("d"));
assertEquals(42.5d, vals.get("e"));
assertEquals(String.class, vals.get("f"));
assertEquals(true, vals.get("g"));
assertEquals(false, vals.get("h"));
assertEquals(true, vals.get("i"));
assertEquals(new Password("password"), vals.get("j"));
assertEquals(Password.HIDDEN, vals.get("j").toString());
}
public final GoogleCredentials getCredentials() {
final String credentialsPath = getString(GCS_CREDENTIALS_PATH_CONFIG);
final Password credentialsJsonPwd = getPassword(GCS_CREDENTIALS_JSON_CONFIG);
try {
String credentialsJson = null;
if (credentialsJsonPwd != null) {
credentialsJson = credentialsJsonPwd.value();
}
return GoogleCredentialsBuilder.build(credentialsPath, credentialsJson);
} catch (final Exception e) {
throw new ConfigException("Failed to create GCS credentials: " + e.getMessage());
}
}
@Test
public void testEnvConfigProvider() throws IOException {
KafkaCruiseControlConfig configs = KafkaCruiseControlUtils.readConfig(
Objects.requireNonNull(this.getClass().getClassLoader().getResource(ENV_CONFIG_PROVIDER_TEST_PROPERTIES)).getPath());
// Test password substitution
Password actualSslKeystorePassword = configs.getPassword(WebServerConfig.WEBSERVER_SSL_KEYSTORE_PASSWORD_CONFIG);
Password expectedSslKeystorePassword = new Password(TEST_PASSWORD);
assertEquals(expectedSslKeystorePassword, actualSslKeystorePassword);
// Test when the environment variable is not defined and the password isn't substituted
Password actualSslKeyPassword = configs.getPassword(WebServerConfig.WEBSERVER_SSL_KEY_PASSWORD_CONFIG);
Password expectedSslKeyPassword = new Password(NOT_SUBSTITUTED_CONFIG);
assertEquals(expectedSslKeyPassword, actualSslKeyPassword);
}
public ConfigDefSerializationModule() {
super();
addSerializer(Password.class, new Serializer());
addDeserializer(Password.class, new Deserializer());
addSerializer(ConfigDef.Validator.class, new JsonSerializer<ConfigDef.Validator>() {
@Override
public void serialize(ConfigDef.Validator validator, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeString(validator.toString());
}
});
}
public static void createKeyStore(String filename,
Password password, String alias,
Key privateKey, Certificate cert) throws GeneralSecurityException, IOException {
KeyStore ks = createEmptyKeyStore();
ks.setKeyEntry(alias, privateKey, password.value().toCharArray(),
new Certificate[]{cert});
saveKeyStore(ks, filename, password);
}
public static <T extends Certificate> void createTrustStore(
String filename, Password password, Map<String, T> certs) throws GeneralSecurityException, IOException {
KeyStore ks = KeyStore.getInstance("JKS");
try {
FileInputStream in = new FileInputStream(filename);
ks.load(in, password.value().toCharArray());
in.close();
} catch (EOFException e) {
ks = createEmptyKeyStore();
}
for (Map.Entry<String, T> cert : certs.entrySet()) {
ks.setCertificateEntry(cert.getKey(), cert.getValue());
}
saveKeyStore(ks, filename, password);
}
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias, String host)
throws IOException, GeneralSecurityException {
Map<String, X509Certificate> certs = new HashMap<>();
File keyStoreFile = null;
Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword");
Password trustStorePassword = new Password("TrustStorePassword");
if (mode == Mode.CLIENT && useClientCert) {
keyStoreFile = File.createTempFile("clientKS", ".jks");
KeyPair cKP = generateKeyPair("RSA");
X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP, 30, "SHA1withRSA");
createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
certs.put(certAlias, cCert);
keyStoreFile.deleteOnExit();
} else if (mode == Mode.SERVER) {
keyStoreFile = File.createTempFile("serverKS", ".jks");
KeyPair sKP = generateKeyPair("RSA");
X509Certificate sCert = generateCertificate("CN=" + host + ", O=A server", sKP, 30,
"SHA1withRSA");
createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
certs.put(certAlias, sCert);
keyStoreFile.deleteOnExit();
}
if (trustStore) {
createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
trustStoreFile.deleteOnExit();
}
return createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword);
}
private static void createKeystoreWithCert(File file, String alias, Map<String, X509Certificate> certs) throws Exception {
final KeyPair keypair = TestSslUtils.generateKeyPair("RSA");
final X509Certificate cert = new CertificateBuilder(30, "SHA1withRSA")
.sanDnsName("localhost").generate("CN=mymachine.local, O=A client", keypair);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias,
keypair.getPrivate(), cert);
certs.put(alias, cert);
}
private void createKeystoreWithCert(File file, String alias, Map<String, X509Certificate> certs) throws Exception {
KeyPair keypair = TestSslUtils.generateKeyPair("RSA");
CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA");
X509Certificate cCert = certificateBuilder.sanDnsName("localhost")
.generate("CN=mymachine.local, O=A client", keypair);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert);
certs.put(alias, cCert);
}
private void createWrongKeystoreWithCert(File file, String alias, Map<String, X509Certificate> certs) throws Exception {
KeyPair keypair = TestSslUtils.generateKeyPair("RSA");
CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA");
X509Certificate cCert = certificateBuilder.sanDnsName("fail")
.generate("CN=mymachine.local, O=A client", keypair);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert);
certs.put(alias, cCert);
}
@Test
public void testConvertValueToStringPassword() {
assertEquals(Password.HIDDEN, ConfigDef.convertToString(new Password("foobar"), Type.PASSWORD));
assertEquals("foobar", ConfigDef.convertToString("foobar", Type.PASSWORD));
assertNull(ConfigDef.convertToString(null, Type.PASSWORD));
}
public static void main(String... args) throws IOException {
if (args.length != 6) {
System.err.println(
"Usage : <command> <num_kafka_brokers> <num_zookeeper_nodes> " +
"<sasl_ssl_enabled> <client properties path> <jaas_file> " +
"<minikdc_working_dir>"
);
System.exit(1);
}
int numBrokers = Integer.parseInt(args[0]);
int numZKNodes = Integer.parseInt(args[1]);
boolean isSASLSSLEnabled = Boolean.parseBoolean(args[2]);
String clientPropsPath = args[3];
String jaasConfigPath = args[4];
String miniKDCDir = args[5];
System.out.println(
"Starting a " + numBrokers + " node Kafka cluster with " + numZKNodes +
" zookeeper nodes."
);
if (isSASLSSLEnabled) {
System.out.println("SASL_SSL is enabled. jaas.conf=" + jaasConfigPath);
System.out.println("SASL_SSL is enabled. krb.conf=" + miniKDCDir + "/krb.conf");
}
final EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(
numBrokers,
numZKNodes,
isSASLSSLEnabled,
jaasConfigPath,
miniKDCDir
);
System.out.println("Writing client properties to " + clientPropsPath);
Properties props = kafka.getClientSecurityConfig();
Password trustStorePassword = (Password) props.get("ssl.truststore.password");
props.put("ssl.truststore.password", trustStorePassword.value());
props.put("ssl.enabled.protocols", "TLSv1.2");
props.store(new FileOutputStream(clientPropsPath), null);
kafka.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
kafka.shutdown();
}
});
}
@Override
public void serialize(Password password, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
jsonGenerator.writeString(password.toString());
}
@Override
public Password deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
String text = jsonParser.readValueAs(String.class);
return new Password(text);
}
private static void saveKeyStore(KeyStore ks, String filename,
Password password) throws GeneralSecurityException, IOException {
try (FileOutputStream out = new FileOutputStream(filename)) {
ks.store(out, password.value().toCharArray());
}
}
/**
* Password of the Couchbase user.
*/
@Importance(HIGH)
@EnvironmentVariable("KAFKA_COUCHBASE_PASSWORD")
Password password();
/**
* Password to verify the integrity of the trust store.
*/
@EnvironmentVariable("KAFKA_COUCHBASE_TRUST_STORE_PASSWORD")
@Default
Password trustStorePassword();
@Default("swordfish")
Password passwordValue();