下面列出了怎么用org.apache.kafka.common.network.Mode的API类实例代码及写法,或者点击链接到github查看源代码。
protected void setSecurityConfigs(Properties clientProps, String certAlias) {
SecurityProtocol protocol = securityProtocol();
if (protocol == SecurityProtocol.SSL) {
File trustStoreFile = trustStoreFile();
if (trustStoreFile == null) {
throw new AssertionError("ssl set but no trust store provided");
}
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
clientProps.setProperty(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
try {
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
private static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, Password password, Password keyPassword,
File trustStoreFile, Password trustStorePassword) {
Map<String, Object> sslConfigs = new HashMap<>();
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
}
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
List<String> enabledProtocols = new ArrayList<>();
enabledProtocols.add("TLSv1.2");
sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
return sslConfigs;
}
public KsqlSchemaRegistryClientFactory(final KsqlConfig config) {
this(config, () -> new RestService(config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY)),
new SslFactory(Mode.CLIENT));
// Force config exception now:
config.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY);
}
private static SSLContext getTestSslContext() {
final SslFactory sslFactory = new SslFactory(Mode.CLIENT);
final Map<String, Object> configs = new KsqlConfig(Collections.emptyMap())
.valuesWithPrefixOverride(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX);
sslFactory.configure(configs);
return sslFactory.sslContext();
}
@Override
protected KafkaConfig getKafkaConfig(int brokerId) {
File trustStoreFile;
try {
trustStoreFile = File.createTempFile("SSLClusterTestHarness-truststore", ".jks");
} catch (IOException ioe) {
throw new RuntimeException("Unable to create temporary file for the truststore.");
}
final Option<File> trustStoreFileOption = Option.apply(trustStoreFile);
final Option<SecurityProtocol> sslInterBrokerSecurityProtocol = Option.apply(SecurityProtocol.SSL);
Properties props = TestUtils.createBrokerConfig(
brokerId, zkConnect, false, false, TestUtils.RandomPort(), sslInterBrokerSecurityProtocol,
trustStoreFileOption, EMPTY_SASL_PROPERTIES, false, false, TestUtils.RandomPort(),
true, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.<String>empty(), 1, false,
1, (short) 1);
// setup client SSL. Needs to happen before the broker is initialized, because the client's cert
// needs to be added to the broker's trust store.
Map<String, Object> sslConfigs;
try {
this.clientSslConfigs = TestSslUtils.createSslConfig(true, true, Mode.CLIENT,
trustStoreFile, "client", "localhost");
} catch (Exception e) {
throw new RuntimeException(e);
}
injectProperties(props);
if (requireSSLClientAuth()) {
props.setProperty("ssl.client.auth", "required");
}
return KafkaConfig.fromProps(props);
}
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias, String host)
throws IOException, GeneralSecurityException {
Map<String, X509Certificate> certs = new HashMap<>();
File keyStoreFile = null;
Password password = mode == Mode.SERVER ? new Password("ServerPassword") : new Password("ClientPassword");
Password trustStorePassword = new Password("TrustStorePassword");
if (mode == Mode.CLIENT && useClientCert) {
keyStoreFile = File.createTempFile("clientKS", ".jks");
KeyPair cKP = generateKeyPair("RSA");
X509Certificate cCert = generateCertificate("CN=" + host + ", O=A client", cKP, 30, "SHA1withRSA");
createKeyStore(keyStoreFile.getPath(), password, "client", cKP.getPrivate(), cCert);
certs.put(certAlias, cCert);
keyStoreFile.deleteOnExit();
} else if (mode == Mode.SERVER) {
keyStoreFile = File.createTempFile("serverKS", ".jks");
KeyPair sKP = generateKeyPair("RSA");
X509Certificate sCert = generateCertificate("CN=" + host + ", O=A server", sKP, 30,
"SHA1withRSA");
createKeyStore(keyStoreFile.getPath(), password, password, "server", sKP.getPrivate(), sCert);
certs.put(certAlias, sCert);
keyStoreFile.deleteOnExit();
}
if (trustStore) {
createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
trustStoreFile.deleteOnExit();
}
return createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword);
}
protected void setSecurityConfigs(Properties clientProps, String certAlias) {
SecurityProtocol protocol = securityProtocol();
if (protocol == SecurityProtocol.SSL) {
File trustStoreFile = trustStoreFile();
if (trustStoreFile == null) {
throw new AssertionError("ssl set but no trust store provided");
}
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
try {
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
/**
* @return Config properties.
*/
public Map<Object, Object> buildConfig() {
applyDefaults();
validate();
Map<Object, Object> props = new HashMap<>();
StringJoiner csvJoiner = new StringJoiner(",");
if (_plaintextPort >= 0) {
csvJoiner.add(SecurityProtocol.PLAINTEXT.name + "://localhost:" + _plaintextPort);
}
if (_sslPort >= 0) {
csvJoiner.add(SecurityProtocol.SSL.name + "://localhost:" + _sslPort);
}
props.put(KafkaConfig.BrokerIdProp(), Integer.toString(_nodeId));
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
props.put(KafkaConfig.LogDirProp(), _logDirectory.getAbsolutePath());
props.put(KafkaConfig.ZkConnectProp(), _zkConnect);
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), Long.toString(_socketTimeout));
props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), Long.toString(_socketTimeout));
props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(_enableControlledShutdown));
props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(_enableDeleteTopic));
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), Long.toString(_controlledShutdownRetryBackoff));
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), Long.toString(_logCleanerDedupBufferSize));
props.put(KafkaConfig.LogCleanerEnableProp(), Boolean.toString(_enableLogCleaner));
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
props.put(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
if (_rack != null) {
props.put(KafkaConfig.RackProp(), _rack);
}
if (_trustStore != null || _sslPort > 0) {
try {
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
// Switch interbroker to ssl
props.put(KafkaConfig.InterBrokerSecurityProtocolProp(), SecurityProtocol.SSL.name);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
return props;
}
public Map<Object, Object> buildConfig() {
applyDefaults();
validate();
Map<Object, Object> props = new HashMap<>();
StringJoiner csvJoiner = new StringJoiner(",");
if (plaintextPort >= 0) {
csvJoiner.add(SecurityProtocol.PLAINTEXT.name + "://localhost:" + plaintextPort);
}
if (sslPort >= 0) {
csvJoiner.add(SecurityProtocol.SSL.name + "://localhost:" + sslPort);
}
props.put("broker.id", Integer.toString(nodeId));
props.put("listeners", csvJoiner.toString());
props.put("log.dir", logDirectory.getAbsolutePath());
props.put("log.retention.ms", Long.toString(logRetentionMs));
props.put("zookeeper.connect", zkConnect);
props.put("replica.socket.timeout.ms", Long.toString(socketTimeout));
props.put("controller.socket.timeout.ms", Long.toString(socketTimeout));
props.put("controlled.shutdown.enable", Boolean.toString(enableControlledShutdown));
props.put("delete.topic.enable", Boolean.toString(enableDeleteTopic));
props.put("controlled.shutdown.retry.backoff.ms", Long.toString(controlledShutdownRetryBackoff));
props.put("log.cleaner.dedupe.buffer.size", Long.toString(logCleanerDedupBufferSize));
props.put("log.cleaner.enable", Boolean.toString(enableLogCleaner));
props.put("offsets.topic.replication.factor", "1");
props.put("offsets.topic.num.partitions", "2");
props.put("group.initial.rebalance.delay.ms", "100");
if (rack != null) {
props.put("broker.rack", rack);
}
if (trustStore != null || sslPort > 0) {
try {
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStore, "server" + nodeId));
//switch interbroker to ssl
props.put("security.inter.broker.protocol", "SSL");
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
return props;
}
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
throws IOException, GeneralSecurityException {
return createSslConfig(useClientCert, trustStore, mode, trustStoreFile, certAlias, "localhost");
}