下面列出了 org.apache.zookeeper.client.ConnectStringParser 类的 API 用法示例代码及写法；也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a {@link CloudSolrClient} from a ZooKeeper connect string.
 *
 * <p>The connect string is split into its {@code host:port} server entries and its chroot path
 * (wrapped as an empty {@code Optional} when the parser returns {@code null}). The HTTP connection
 * and socket timeouts are taken from {@code settings}. The client is given a
 * {@link NoOpResponseParser} with writer type {@code json}, and {@code connect()} is called on it
 * before it is returned.
 *
 * @param zookeeperConnectionString ZooKeeper connect string, optionally ending in a chroot path
 * @return the client, after {@code connect()} has been invoked on it
 */
public CloudSolrClient createCloudSolrClient(String zookeeperConnectionString) {
    ConnectStringParser connectString = new ConnectStringParser(zookeeperConnectionString);

    // Builder input: one "host:port" entry per parsed server, plus the optional chroot.
    CloudSolrClient.Builder builder = new CloudSolrClient.Builder(
            connectString.getServerAddresses().stream()
                    .map(server -> String.format(Locale.ROOT, "%s:%s", server.getHostString(), server.getPort()))
                    .collect(Collectors.toList()),
            Optional.ofNullable(connectString.getChrootPath()));
    builder.withConnectionTimeout(settings.getHttpConnectionTimeout())
            .withSocketTimeout(settings.getHttpReadTimeout());

    // Responses are handed back unparsed, with "json" as the writer type.
    NoOpResponseParser rawJsonParser = new NoOpResponseParser();
    rawJsonParser.setWriterType("json");

    CloudSolrClient solrClient = builder.build();
    solrClient.setParser(rawJsonParser);
    solrClient.connect();
    return solrClient;
}
/**
 * Fails when the ZooKeeper connect string ends with a namespace (chroot) path that does not exist.
 * A connect string without a namespace path is accepted without contacting ZooKeeper.
 *
 * @param zkConnect connect string, possibly ending in a namespace path
 * @param zkClient client used to query ZooKeeper
 * @throws SamzaException if a namespace path is present but {@code "/"} does not exist
 *     according to {@code zkClient}
 */
public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) {
    String namespace = new ConnectStringParser(zkConnect).getChrootPath();
    boolean namespaceGiven = !Strings.isNullOrEmpty(namespace);
    if (namespaceGiven) {
        LOG.info("connectString = " + zkConnect + "; path =" + namespace);
        // NOTE(review): presumably zkClient was created from the same connect string, so "/"
        // resolves to the namespace root rather than the real ZooKeeper root - confirm with callers.
        boolean namespaceRootExists = zkClient.exists("/");
        if (!namespaceRootExists) {
            throw new SamzaException("Zookeeper namespace: " + namespace + " does not exist for zk at " + zkConnect);
        }
    }
}
/**
 * Creates a ZooKeeper monitor.
 *
 * <p>All arguments are forwarded to the superclass constructor. The parent znode, the session
 * timeout and the list of quorum servers are then read from the connection's configuration.
 * A warning is logged when {@code allowedFailures} exceeds {@code (ensembleSize - 1) / 2}.
 *
 * @param connection connection whose configuration supplies the ZooKeeper settings
 * @param monitorTargets passed through to the superclass
 * @param useRegExp passed through to the superclass
 * @param sink passed through to the superclass
 * @param executor passed through to the superclass
 * @param treatFailureAsError passed through to the superclass
 * @param allowedFailures number of failed ZooKeeper nodes that is tolerated
 */
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
    Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) {
  super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError,
      allowedFailures);

  Configuration conf = connection.getConfiguration();
  znode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT);
  timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);

  // One entry per server found in the configured quorum string.
  hosts = Lists.newArrayList();
  String quorum = ZKConfig.getZKQuorumServersString(conf);
  for (InetSocketAddress address : new ConnectStringParser(quorum).getServerAddresses()) {
    hosts.add(address.toString());
  }

  // An ensemble of N servers keeps quorum with at most (N - 1) / 2 of them down.
  int ensembleSize = hosts.size();
  int tolerableFailures = (ensembleSize - 1) / 2;
  if (allowedFailures > tolerableFailures) {
    LOG.warn(
        "Confirm allowable number of failed ZooKeeper nodes, as quorum will "
            + "already be lost. Setting of {} failures is unexpected for {} ensemble size.",
        allowedFailures, ensembleSize);
  }
}
/**
 * Appends the server addresses of every additional cluster to an existing ZooKeeper client.
 *
 * <p>Entries of {@code _serversList} from index 1 onwards are parsed as connect strings and their
 * addresses are added to the address list that is read reflectively from the client
 * ({@code zk -> ClientCnxn -> HostProvider -> address list}). The first entry is only used in the
 * error message. When fewer than two clusters are configured the method does nothing.
 *
 * <p>On any failure the client is closed (when non-null) and a {@link ZkException} wrapping the
 * cause is thrown. If closing is interrupted, the thread's interrupt status is restored.
 *
 * @param zk the ZooKeeper client whose server address list is extended
 * @throws ZkException if the reflective access or the parsing of a connect string fails
 */
public void configMutliCluster(ZooKeeper zk) {
    // Nothing to add unless at least a second cluster is configured; this also covers an
    // empty list, which previously failed on get(0).
    if (_serversList.size() < 2) {
        return;
    }
    String cluster1 = _serversList.get(0);
    try {
        // Force the private fields to be accessible.
        ReflectionUtils.makeAccessible(clientCnxnField);
        ReflectionUtils.makeAccessible(hostProviderField);
        ReflectionUtils.makeAccessible(serverAddressesField);

        // Reach into the client for its address list; the lookups do not depend on the
        // loop variable, so they are done once.
        ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
        HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
        @SuppressWarnings("unchecked")
        List<InetSocketAddress> serverAddrs =
                (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField, hostProvider);

        // Add the addresses of the second and any further cluster.
        for (int i = 1; i < _serversList.size(); i++) {
            String cluster = _serversList.get(i);
            serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
        }
    } catch (Exception e) {
        try {
            if (zk != null) {
                zk.close();
            }
        } catch (InterruptedException ie) {
            // Closing is best effort, but the interrupt must stay visible to the caller.
            Thread.currentThread().interrupt();
        }
        throw new ZkException("zookeeper_create_error, serveraddrs=" + cluster1, e);
    }
}
/**
 * Validates that the {@code CONNECTION_STRING} property can be parsed as a ZooKeeper connect
 * string.
 *
 * <p>The value that is checked is read from {@code context}; {@code subject} and {@code input}
 * are only copied into the returned result.
 *
 * @param subject subject reported in the result
 * @param input input reported in the result
 * @param context validation context supplying the {@code CONNECTION_STRING} property
 * @return a valid result when a {@link ConnectStringParser} can be constructed from the value,
 *     otherwise an invalid result whose explanation contains the value
 */
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
    final String connectionString = context.getProperty(CONNECTION_STRING).getValue();

    boolean parsable = true;
    try {
        new ConnectStringParser(connectionString);
    } catch (Exception ignored) {
        // Any failure while constructing the parser means the connect string is unusable.
        parsable = false;
    }

    final String explanation = parsable
            ? "Valid Connect String"
            : ("Invalid Connect String: " + connectionString);
    return new ValidationResult.Builder()
            .subject(subject)
            .input(input)
            .explanation(explanation)
            .valid(parsable)
            .build();
}
/**
 * Validates that the {@code CONNECTION_STRING} property can be parsed as a ZooKeeper connect
 * string.
 *
 * <p>The value that is checked is read from {@code context}; {@code subject} and {@code input}
 * are only copied into the returned result.
 *
 * @param subject subject reported in the result
 * @param input input reported in the result
 * @param context validation context supplying the {@code CONNECTION_STRING} property
 * @return a valid result when a {@link ConnectStringParser} can be constructed from the value,
 *     otherwise an invalid result whose explanation contains the value
 */
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
    final String connectionString = context.getProperty(CONNECTION_STRING).getValue();

    boolean parsable = true;
    try {
        new ConnectStringParser(connectionString);
    } catch (Exception ignored) {
        // Any failure while constructing the parser means the connect string is unusable.
        parsable = false;
    }

    final String explanation = parsable
            ? "Valid Connect String"
            : ("Invalid Connect String: " + connectionString);
    return new ValidationResult.Builder()
            .subject(subject)
            .input(input)
            .explanation(explanation)
            .valid(parsable)
            .build();
}
/**
 * Appends the server addresses of every additional cluster to an existing ZooKeeper client.
 *
 * <p>Entries of {@code _serversList} from index 1 onwards are parsed as connect strings and their
 * addresses are added to the address list that is read reflectively from the client
 * ({@code zk -> ClientCnxn -> HostProvider -> address list}). The first entry is only used in the
 * error message. When fewer than two clusters are configured the method does nothing.
 *
 * <p>On any failure the client is closed (when non-null) and a {@link ZkException} wrapping the
 * cause is thrown. If closing is interrupted, the thread's interrupt status is restored.
 *
 * @param zk the ZooKeeper client whose server address list is extended
 * @throws ZkException if the reflective access or the parsing of a connect string fails
 */
public void configMutliCluster(ZooKeeper zk) {
    // Nothing to add unless at least a second cluster is configured; this also covers an
    // empty list, which previously failed on get(0).
    if (_serversList.size() < 2) {
        return;
    }
    String cluster1 = _serversList.get(0);
    try {
        // Force the private fields to be accessible.
        ReflectionUtils.makeAccessible(clientCnxnField);
        ReflectionUtils.makeAccessible(hostProviderField);
        ReflectionUtils.makeAccessible(serverAddressesField);

        // Reach into the client for its address list; the lookups do not depend on the
        // loop variable, so they are done once.
        ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
        HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
        @SuppressWarnings("unchecked")
        List<InetSocketAddress> serverAddrs =
                (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField, hostProvider);

        // Add the addresses of the second and any further cluster.
        for (int i = 1; i < _serversList.size(); i++) {
            String cluster = _serversList.get(i);
            serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
        }
    } catch (Exception e) {
        try {
            if (zk != null) {
                zk.close();
            }
        } catch (InterruptedException ie) {
            // Closing is best effort, but the interrupt must stay visible to the caller.
            Thread.currentThread().interrupt();
        }
        throw new ZkException("zookeeper_create_error, serveraddrs=" + cluster1, e);
    }
}
/**
 * Creates a delegate from a connect string and an explicitly supplied resolver.
 *
 * @param connectString ZooKeeper connect string; parsed immediately
 * @param resolver resolver stored for later use by this delegate
 */
@VisibleForTesting
ResolvingEnsembleProviderDelegate(String connectString, Resolver resolver) {
    this._connectStringParser = new ConnectStringParser(connectString);
    this._resolver = resolver;
}
/**
 * Replaces the stored parser with one built from the given connect string.
 *
 * @param connectString new ZooKeeper connect string; parsed immediately
 */
@Override
public void setConnectionString(String connectString) {
    final ConnectStringParser reparsed = new ConnectStringParser(connectString);
    this._connectStringParser = reparsed;
}