下面列出了org.apache.hadoop.hbase.mapreduce.TableMapper#org.apache.zookeeper.ZooKeeper 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Lists {@code path} and every znode beneath it in depth-first order, with the
 * children of each node sorted by name.
 *
 * @param zk   ZooKeeper handle used for the lookups
 * @param path root of the subtree to list
 * @return the full paths of the subtree, parent before children; an empty array
 *         when {@code path} does not exist
 * @throws Exception whatever the ZooKeeper calls throw
 */
public static String[] getSortedTree(ZooKeeper zk, String path) throws Exception {
    if (zk.exists(path, false) == null) {
        return new String[0];
    }
    // The list is both the result and the work queue: each visited entry gets its
    // children spliced in directly behind it.
    List<String> ordered = new ArrayList<String>();
    ordered.add(path);
    for (int cursor = 0; cursor < ordered.size(); cursor++) {
        String parent = ordered.get(cursor);
        List<String> names = zk.getChildren(parent, false);
        Collections.sort(names);
        // The root path already ends with a separator.
        String prefix = parent.equalsIgnoreCase("/") ? parent : parent + "/";
        List<String> expanded = new ArrayList<String>(names.size());
        for (String name : names) {
            expanded.add(prefix + name);
        }
        ordered.addAll(cursor + 1, expanded);
    }
    return ordered.toArray(new String[0]);
}
/**
 * Breadth-first lookup: collects the two levels of distributed-configuration
 * data stored under the given path.
 *
 * @param path znode whose children are inspected
 * @return map from child name to the data resolved for that child; children for
 *         which no data is resolved are left out, and the map is empty when
 *         {@code path} does not exist
 *
 * @throws InterruptedException
 * @throws KeeperException
 */
private Map<String, ZkDisconfData> getDisconfData(String path) throws KeeperException, InterruptedException {
    Map<String, ZkDisconfData> result = new HashMap<String, ZkDisconfData>();
    ZooKeeper zk = ZookeeperMgr.getInstance().getZk();
    if (zk.exists(path, false) == null) {
        return result;
    }
    for (String childName : zk.getChildren(path, false)) {
        ZkDisconfData data = getDisconfData(path, childName, zk);
        if (data == null) {
            continue;
        }
        result.put(childName, data);
    }
    return result;
}
/**
 * Hands out a single shared ZooKeeper client. The first call creates the client and
 * remembers its parameters; later calls only register the new watcher on the existing
 * client, after checking that the parameters have not changed.
 */
@Override
@Synchronized
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
    // Sticking to the client created earlier keeps curator from re-creating the
    // ZK client on session expiry.
    if (client != null) {
        try {
            Preconditions.checkArgument(this.connectString.equals(connectString), "connectString differs");
            Preconditions.checkArgument(this.sessionTimeout == sessionTimeout, "sessionTimeout differs");
            Preconditions.checkArgument(this.canBeReadOnly == canBeReadOnly, "canBeReadOnly differs");
            this.client.register(watcher);
        } catch (IllegalArgumentException e) {
            // Parameters differ from the ones the existing client was built with.
            log.warn("Input argument for new ZooKeeper client ({}, {}, {}) changed with respect to existing client ({}, {}, {}).",
                    connectString, sessionTimeout, canBeReadOnly, this.connectString, this.sessionTimeout, this.canBeReadOnly);
            closeClient(client);
        }
        return this.client;
    }

    // First call: validate, record the parameters and build the client.
    Exceptions.checkNotNullOrEmpty(connectString, "connectString");
    Preconditions.checkArgument(sessionTimeout > 0, "sessionTimeout should be a positive integer");
    this.connectString = connectString;
    this.sessionTimeout = sessionTimeout;
    this.canBeReadOnly = canBeReadOnly;
    this.client = new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    return this.client;
}
/**
 * Walks every ancestor of {@code path} (the last path segment itself is not examined)
 * and rejects the path when an existing ancestor's data contains the schedule marker
 * string.
 *
 * @param zk   ZooKeeper handle used for the lookups
 * @param path slash-separated znode path whose ancestors are checked
 * @throws Exception when an ancestor is already marked, or on any ZooKeeper failure
 */
public static void checkParent(ZooKeeper zk, String path) throws Exception {
    String[] segments = path.split("/");
    StringBuilder ancestor = new StringBuilder();
    for (int i = 0; i < segments.length - 1; i++) {
        String segment = segments[i];
        if (segment.isEmpty()) {
            continue;
        }
        ancestor.append("/").append(segment);
        String zkPath = ancestor.toString();
        if (zk.exists(zkPath, false) == null) {
            continue;
        }
        byte[] value = zk.getData(zkPath, false, null);
        if (value == null) {
            continue;
        }
        String marker = new String(value);
        if (marker.contains("taobao-pamirs-schedule-")) {
            throw new Exception("\"" + zkPath +"\" is already a schedule instance's root directory, its any subdirectory cannot as the root directory of others");
        }
    }
}
/**
 * Probes whether a ZooKeeper server answers at {@code host}.
 *
 * @param host    connect string handed to the ZooKeeper client
 * @param timeout used both as the session timeout and as the maximum time, in
 *                milliseconds, to wait for the connection
 * @return {@code true} when a SyncConnected event arrives within the timeout,
 *         {@code false} on timeout or on any failure
 */
private Boolean isZookeeperRunning(String host, int timeout) {
    final CountDownLatch connectedSignal = new CountDownLatch(1);
    ZooKeeper zk = null;
    try {
        zk = new ZooKeeper(host, timeout,
                new Watcher() {
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            connectedSignal.countDown();
                        }
                    }
                });
        return connectedSignal.await(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller; the probe itself reports failure.
        Thread.currentThread().interrupt();
        return Boolean.FALSE;
    } catch (Throwable e) {
        return Boolean.FALSE;
    } finally {
        // The client exists only for this probe: always release it, otherwise every
        // call leaks a session.
        if (zk != null) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Throwable ignored) {
                // Best effort: a failed close must not change the probe result.
            }
        }
    }
}
/**
 * Writes a JSON backup of the configured root path to {@code os}.
 * The output is a single JSON object; it stays empty (with a warning logged) when
 * the root path does not exist.
 *
 * @param os stream the JSON document is written to
 * @throws InterruptedException if a ZooKeeper call is interrupted
 * @throws IOException          on JSON generation or stream failures
 * @throws KeeperException      on ZooKeeper failures
 */
public void backup(OutputStream os) throws InterruptedException, IOException, KeeperException {
    JsonGenerator jgen = null;
    ZooKeeper zk = null;
    try {
        zk = options.createZooKeeper(LOGGER);
        jgen = JSON_FACTORY.createGenerator(os);
        if (options.prettyPrint) {
            jgen.setPrettyPrinter(new DefaultPrettyPrinter());
        }

        jgen.writeStartObject();
        if (zk.exists(options.rootPath, false) == null) {
            LOGGER.warn("Root path not found: {}", options.rootPath);
        } else {
            doBackup(zk, jgen, options.rootPath);
        }
        jgen.writeEndObject();
    } finally {
        // Nested so that a failure while closing the generator cannot skip closing
        // the ZooKeeper session.
        try {
            if (jgen != null) {
                jgen.close();
            }
        } finally {
            if (zk != null) {
                zk.close();
            }
        }
    }
}
/**
 * Ensuring a three-level path performs exactly three {@code exists} lookups the first
 * time, and no further ZooKeeper interaction on repeated calls.
 */
@Test
public void testBasic() throws Exception
{
    // ZooKeeper mock: every exists() lookup reports an existing node.
    ZooKeeper zk = mock(ZooKeeper.class, Mockito.RETURNS_MOCKS);
    Stat existingStat = mock(Stat.class);
    when(zk.exists(Mockito.<String>any(), anyBoolean())).thenReturn(existingStat);

    // Curator client mock wired to the ZooKeeper mock and a one-shot retry policy.
    RetryPolicy oneRetry = new RetryOneTime(1);
    RetryLoop loop = new RetryLoop(oneRetry, null);
    CuratorZookeeperClient curatorClient = mock(CuratorZookeeperClient.class);
    when(curatorClient.getZooKeeper()).thenReturn(zk);
    when(curatorClient.getRetryPolicy()).thenReturn(oneRetry);
    when(curatorClient.newRetryLoop()).thenReturn(loop);

    EnsurePath ensurePath = new EnsurePath("/one/two/three");

    ensurePath.ensure(curatorClient);
    verify(zk, times(3)).exists(Mockito.<String>any(), anyBoolean());

    // Subsequent calls must not touch ZooKeeper again.
    for (int repeat = 0; repeat < 2; repeat++) {
        ensurePath.ensure(curatorClient);
        verifyNoMoreInteractions(zk);
    }
}
/**
 * Starts a test ZooKeeper server backed by {@code path} and waits, for at most the
 * default session timeout, until a throw-away client reports a connection.
 * A connection that is not established in time is only logged; construction
 * still succeeds.
 *
 * @param path directory handed to the testing server
 * @throws Exception if the server or the probe client cannot be created, or the
 *                   wait is interrupted
 */
public ZKTestEnv(Path path) throws Exception {
    zkServer = new TestingServer(1282, path.toFile(), true);
    // waiting for ZK to be reachable
    CountDownLatch latch = new CountDownLatch(1);
    ZooKeeper zk = new ZooKeeper(zkServer.getConnectString(),
            herddb.server.ServerConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT_DEFAULT, (WatchedEvent event) -> {
                LOG.log(Level.INFO, "ZK EVENT {0}", event);
                if (event.getState() == KeeperState.SyncConnected) {
                    latch.countDown();
                }
            });
    try {
        if (!latch.await(herddb.server.ServerConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT_DEFAULT, TimeUnit.MILLISECONDS)) {
            // The wait above is expressed in milliseconds, so report the same unit.
            LOG.log(Level.INFO, "ZK client did not connect within {0} ms, maybe the server did not start up",
                    herddb.server.ServerConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT_DEFAULT);
        }
    } finally {
        // The probe client is no longer needed once the wait is over.
        zk.close(1000);
    }
    this.path = path;
}
/**
 * Reads the data of every child of the registry path into {@code this.dataList} and
 * leaves a watch on the registry path that re-runs this method when the set of
 * children changes.
 *
 * @param zk ZooKeeper handle used for the reads and the watch
 */
private void watchNode(final ZooKeeper zk) {
    try {
        List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Re-read the children and register a fresh watcher.
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    watchNode(zk);
                }
            }
        });
        List<String> dataList = new ArrayList<>();
        for (String node : nodeList) {
            byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, false, null);
            dataList.add(new String(bytes));
        }
        Logger.info("node data: {}", dataList);
        // Publish the complete snapshot only after every child was read.
        this.dataList = dataList;
    } catch (KeeperException e) {
        Logger.error("", e);
    } catch (InterruptedException e) {
        // Restore the flag so the interrupt is not lost to the calling thread.
        Thread.currentThread().interrupt();
        Logger.error("", e);
    }
}
/**
 * Appends the data of {@code currentZnode}, followed by a newline stream, to
 * {@code seqIsCollector}; optionally does the same for every descendant.
 * Both the data read and the children read set a watch (watch flag {@code true}).
 *
 * @param currentZnode     znode to read
 * @param seqIsCollector   receives one stream per znode plus one separator stream each
 * @param regressionZnodes when {@code true}, descend into the children recursively
 * @throws KeeperException      on ZooKeeper failures
 * @throws InterruptedException if a ZooKeeper call is interrupted
 */
private void getDataInputStream(String currentZnode, List<InputStream> seqIsCollector, boolean regressionZnodes)
        throws KeeperException, InterruptedException {
    ZooKeeper zk = getZk();
    // A znode may carry no data at all; treat that as empty content instead of
    // failing inside ByteArrayInputStream.
    byte[] data = zk.getData(currentZnode, true, null);
    seqIsCollector.add(new ByteArrayInputStream(data != null ? data : new byte[0]));
    // need add return between every stream otherwise top/last line will be
    // join to one line.
    seqIsCollector.add(new ByteArrayInputStream("\n".getBytes()));
    if (regressionZnodes) {
        List<String> children = zk.getChildren(currentZnode, true);
        if (children != null) {
            // Avoid producing "//child" when the current znode is the root.
            String prefix = currentZnode.endsWith("/") ? currentZnode : currentZnode + "/";
            for (String child : children) {
                getDataInputStream(prefix + child, seqIsCollector, regressionZnodes);
            }
        }
    }
}
/**
 * Creates the given znode through {@code create(zookeeper, znode)} and treats
 * "node already exists" as success.
 *
 * @param zookeeper ZooKeeper instance to work with.
 * @param znode     Znode to create.
 * @throws KeeperException      for every ZooKeeper failure other than NODEEXISTS
 * @throws InterruptedException
 */
static void createIfNotThere(ZooKeeper zookeeper, String znode) throws KeeperException, InterruptedException {
    try {
        create(zookeeper, znode);
    } catch (KeeperException e) {
        boolean alreadyThere = e.code() == KeeperException.Code.NODEEXISTS;
        if (alreadyThere) {
            // The node is present, which is all this method is asked to guarantee.
            return;
        }
        throw e;
    }
}
/**
 * Overwrites the data of znode {@code /cn_dev} with the content obtained from
 * {@code getFileContent} for the test configuration file, regardless of the znode's
 * current version.
 */
private void setData2ZkDevBack() throws FileNotFoundException, IOException, KeeperException, InterruptedException {
    log.info("set data to zk dev back.");
    ZooKeeper zk = getZkResource().getZk();
    byte[] payload = getFileContent("/zk_cnfig_cn_test.txt").getBytes();
    // Version -1 matches any version of the znode.
    zk.setData("/cn_dev", payload, -1);
}
/**
 * Stores the collaborators and immediately triggers {@code process(null)}.
 *
 * @param path      path this watcher is concerned with
 * @param zkClient  ZooKeeper handle kept for later use
 * @param arguments shell arguments kept for later use
 */
public LoadReportWatcher(final String path, final ZooKeeper zkClient, final ShellArguments arguments) {
    this.arguments = arguments;
    this.zkClient = zkClient;
    this.path = path;

    // Calling process up front fetches the initial topics and, presumably, installs
    // the first watch (confirm against process()).
    process(null);
}
/**
 * createOrSetNode must fail for a node whose parent is missing unless the message is
 * built with the extra {@code true} flag, in which case the node ends up existing.
 */
@Test
public void createOrSetNodeTest() throws Exception {
    ZooKeeper zooKeeper = createZookeeper();
    try {
        String message = createTestMessage();
        String testNodePath = createTestNodePath() + "/temp";

        ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(testNodePath);
        String path = split.getPath();
        String node = split.getNode();

        // Without the flag the call is expected to throw.
        try {
            curatorZookeeperClient.createOrSetNode(new CreateNodeMessage(testNodePath, message.getBytes()));
            Assert.fail();
        } catch (Exception e) {
        }

        // Neither the parent nor the node itself may have been created.
        Assert.assertFalse(isExistNode(zooKeeper, path));
        Assert.assertFalse(isExistNode(zooKeeper, testNodePath));

        // With the flag set the node must exist afterwards.
        curatorZookeeperClient.createOrSetNode(new CreateNodeMessage(testNodePath, message.getBytes(), true));
        Assert.assertTrue(isExistNode(zooKeeper, testNodePath));

        curatorZookeeperClient.delete(testNodePath);
    } finally {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }
}
/**
 * Returns a supplier that defers to {@link #getZooKeeper()} on every {@code get()},
 * so callers always see whatever that method returns at call time.
 */
@Override
public Supplier<ZooKeeper> getZooKeeperSupplier() {
    Supplier<ZooKeeper> delegating = new Supplier<ZooKeeper>() {
        @Override
        public ZooKeeper get() {
            return getZooKeeper();
        }
    };
    return delegating;
}
/**
 * Creates the underlying ZooKeeper client and blocks until it is connected or
 * {@code startupTimeOut} has elapsed.
 *
 * @param connectString  ZooKeeper connect string
 * @param sessionTimeout requested session timeout, passed to the client
 * @param startupTimeOut maximum time in milliseconds to wait for the connection
 * @throws IOException if the client cannot be created, or is not connected once the
 *                     wait ends (timeout or interruption)
 */
public StateWatchingZooKeeper(String connectString, int sessionTimeout, int startupTimeOut) throws IOException {
    this.requestedSessionTimeout = sessionTimeout;
    this.sessionTimeout = sessionTimeout;

    ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new MyWatcher());
    setDelegate(zk);
    ready = true;

    // Wait for connection to come up: if we fail to connect to ZK now, we do not want to continue
    // starting up the Indexer node.
    long waitUntil = System.currentTimeMillis() + startupTimeOut;
    int count = 0;
    boolean interrupted = false;
    while (zk.getState() != CONNECTED && waitUntil > System.currentTimeMillis()) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Stop waiting, but remember the interrupt so it can be restored below.
            interrupted = true;
            break;
        }
        count++;
        if (count == 30) {
            // Output a message every 3s
            log.info("Waiting for ZooKeeper connection to be established");
            count = 0;
        }
    }

    try {
        if (zk.getState() != CONNECTED) {
            stopping = true;
            try {
                zk.close();
            } catch (Throwable t) {
                // ignore
            }
            throw new IOException("Failed to connect with Zookeeper within timeout " + startupTimeOut +
                    ", connection string: " + connectString);
        }

        log.info("ZooKeeper session ID is 0x" + Long.toHexString(zk.getSessionId()));
    } finally {
        // Restored only after close() so that the close itself is not cut short.
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Starts the lookup of missing paths via {@code existPath} and returns the future
 * that was handed to it.
 *
 * @param zk            ZooKeeper handle
 * @param basePath      base path passed to {@code existPath}
 * @param logStreamPath log stream path passed to {@code existPath}
 * @return the future passed to {@code existPath}; presumably completed there with
 *         the collected paths (confirm against existPath)
 */
@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
    CompletableFuture<List<String>> result = FutureUtils.createFuture();
    LinkedList<String> collected = Lists.newLinkedList();
    existPath(zk, logStreamPath, basePath, collected, result);
    return result;
}
/**
 * After the ZooKeeper session is expired, calls fail with connection-loss or
 * session-expired errors, and the manager then works again on a different ZooKeeper
 * handle.
 */
@Test
public void testSessionExpired() throws Exception {
    try (ZookeeperMetadataStorageManager man = new ZookeeperMetadataStorageManager(testEnv.getAddress(),
            testEnv.getTimeout(), testEnv.getPath())) {
        man.start();

        TableSpace tableSpace = TableSpace
                .builder()
                .leader("test")
                .replica("test")
                .name(TableSpace.DEFAULT)
                .build();
        man.registerTableSpace(tableSpace);
        assertEquals(1, man.listTableSpaces().size());

        // Expire the session currently used by the manager.
        ZooKeeper actual = man.getZooKeeper();
        expireZkSession(actual.getSessionId(), actual.getSessionPasswd());

        // Every attempt must fail until the session-expired error shows up.
        for (int attempt = 0; attempt < 10; attempt++) {
            try {
                man.listTableSpaces();
                fail("session should be expired or not connected");
            } catch (MetadataStorageManagerException ok) {
                System.out.println("ok: " + ok);
                Throwable cause = ok.getCause();
                boolean expired = cause instanceof KeeperException.SessionExpiredException;
                assertTrue(cause instanceof KeeperException.ConnectionLossException || expired);
                if (expired) {
                    break;
                }
            }
            Thread.sleep(500);
        }

        // The manager must have switched to a new handle and still see the table space.
        assertNotSame(actual, man.getZooKeeper());
        assertEquals(1, man.listTableSpaces().size());

        tableSpace = man.describeTableSpace(TableSpace.DEFAULT);
        assertNotNull(tableSpace);
        man.dropTableSpace(TableSpace.DEFAULT, tableSpace);
    }
}
/**
 * Closes the ZooKeeper client obtained from the given SPI and fails the test when
 * closing throws.
 *
 * @param spi Spi instance.
 */
private static void closeZkClient(ZookeeperDiscoverySpi spi) {
    ZooKeeper client = ZookeeperDiscoverySpiTestHelper.zkClient(spi);

    try {
        client.close();
    }
    catch (Exception closeErr) {
        fail("Unexpected error: " + closeErr);
    }
}
/**
 * Appends the server addresses of every additional cluster in {@code _serversList}
 * (entries after the first) to the address list held inside the given client, which
 * is reached through reflection.
 * Does nothing when fewer than two clusters are configured.
 *
 * @param zk client whose internal server address list is extended; it is closed when
 *           the reflective update fails
 * @throws ZkException when the reflective update fails
 */
public void configMutliCluster(ZooKeeper zk) {
    // Nothing to add for a single cluster; an empty list is handled the same way
    // instead of failing on get(0).
    if (_serversList.size() <= 1) {
        return;
    }
    String cluster1 = _serversList.get(0);
    try {
        // Force the private fields to be accessible.
        ReflectionUtils.makeAccessible(clientCnxnField);
        ReflectionUtils.makeAccessible(hostProviderField);
        ReflectionUtils.makeAccessible(serverAddressesField);

        // Add the address list of each additional cluster.
        for (int i = 1; i < _serversList.size(); i++) {
            String cluster = _serversList.get(i);
            // Reach into the client to get hold of its address list.
            ClientCnxn cnxn = (ClientCnxn) ReflectionUtils.getField(clientCnxnField, zk);
            HostProvider hostProvider = (HostProvider) ReflectionUtils.getField(hostProviderField, cnxn);
            List<InetSocketAddress> serverAddrs = (List<InetSocketAddress>) ReflectionUtils.getField(serverAddressesField,
                    hostProvider);
            // Append this cluster's addresses.
            serverAddrs.addAll(new ConnectStringParser(cluster).getServerAddresses());
        }
    } catch (Exception e) {
        try {
            if (zk != null) {
                zk.close();
            }
        } catch (InterruptedException ie) {
            // The failure below is what gets reported; keep the interrupt flag set
            // so the caller can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new ZkException("zookeeper_create_error, serveraddrs=" + cluster1, e);
    }
}
/**
 * This method should be used only for debug purposes. Returns the ZooKeeper client
 * if clustering mode (ZooKeeper-based) is on.
 *
 * @return the ZooKeeper client obtained from the cluster manager, or {@code null}
 *         when no cluster manager is set
 */
ZooKeeper getZooKeeper() {
    return this.clusterManager == null ? null : this.clusterManager.getZooKeeper();
}
/**
 * Issues one data read per metadata path of a log and collects the results into a
 * single future. The reads are added in a fixed order: parent of the log root, log
 * root, max-txid, version, lock, read-lock, log segments and, only when
 * {@code ownAllocator} is set, allocation.
 *
 * @param zk           ZooKeeper handle used for the reads
 * @param logRootPath  root path of the log
 * @param ownAllocator whether the allocation path is read as well
 * @return future of the read results, in the order listed above
 */
static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
                                                             String logRootPath,
                                                             boolean ownAllocator) {
    // Note on the read lock: its persistent state (path) is covered here although it
    // is only used by the read handler, because managing the whole stream structure
    // in one place is more convenient and less error prone.
    int expectedCount = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
    List<Future<Versioned<byte[]>>> pendingReads = Lists.newArrayListWithExpectedSize(expectedCount);

    // Order matters: callers address the results by position.
    String[] alwaysRead = {
        new File(logRootPath).getParent(),
        logRootPath,
        logRootPath + MAX_TXID_PATH,
        logRootPath + VERSION_PATH,
        logRootPath + LOCK_PATH,
        logRootPath + READ_LOCK_PATH,
        logRootPath + LOGSEGMENTS_PATH
    };
    for (String metadataPath : alwaysRead) {
        pendingReads.add(Utils.zkGetData(zk, metadataPath, false));
    }
    if (ownAllocator) {
        pendingReads.add(Utils.zkGetData(zk, logRootPath + ALLOCATION_PATH, false));
    }
    return Future.collect(pendingReads);
}
/**
 * Lists one class per dependency that this implementation declares as an HBase
 * dependency.
 *
 * @return the dependency marker classes
 */
@Override
public Class[] getHbaseDependencyClasses() {
    Class[] dependencies = {
        HConstants.class,
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.class,
        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.class,
        Put.class,
        RpcServer.class,
        CompatibilityFactory.class,
        JobUtil.class,
        TableMapper.class,
        FastLongHistogram.class,
        Snapshot.class,
        ZooKeeper.class,
        Channel.class,
        Message.class,
        UnsafeByteOperations.class,
        Lists.class,
        Tracer.class,
        MetricRegistry.class,
        ArrayUtils.class,
        ObjectMapper.class,
        Versioned.class,
        JsonView.class,
        ZKWatcher.class
    };
    return dependencies;
}
/**
 * Registers {@code data} as an ephemeral sequential znode under the data path,
 * creating the persistent registry path first when it is missing.
 * Failures are logged and not propagated.
 *
 * @param zk   ZooKeeper handle used for the writes
 * @param data payload stored in the new znode
 */
private void createNode(ZooKeeper zk, String data) {
    try {
        byte[] bytes = data.getBytes();
        if (zk.exists(Constant.ZK_REGISTRY_PATH, false) == null) {
            try {
                zk.create(Constant.ZK_REGISTRY_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException.NodeExistsException ignored) {
                // Another client created the registry path between the exists check
                // and the create; the path is there, so registration can go on.
            }
        }
        String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        LOGGER.debug("create zookeeper node ({} => {})", path, data);
    } catch (KeeperException e) {
        LOGGER.error("", e);
    } catch (InterruptedException e) {
        // Restore the flag so the interrupt is not lost to the calling thread.
        Thread.currentThread().interrupt();
        LOGGER.error("", e);
    }
}
/**
 * Makes sure the policies znode of a namespace exists and lists {@code cluster} among
 * its replication clusters. A missing znode is created with 16 bundles and the single
 * cluster; an existing one is rewritten only when the cluster is not listed yet.
 *
 * @param configStoreZk ZooKeeper handle of the configuration store
 * @param namespaceName namespace whose policies are ensured
 * @param cluster       cluster that must appear in the replication clusters
 */
static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
        throws KeeperException, InterruptedException, IOException {
    String namespacePath = POLICIES_ROOT + "/" +namespaceName.toString();
    Stat stat = configStoreZk.exists(namespacePath, false);

    if (stat != null) {
        byte[] content = configStoreZk.getData(namespacePath, false, null);
        Policies existing = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);
        // Only update z-node if the list of clusters should be modified
        if (existing.replication_clusters.contains(cluster)) {
            return;
        }
        existing.replication_clusters.add(cluster);
        byte[] updated = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(existing);
        // Conditional write against the version that was read above.
        configStoreZk.setData(namespacePath, updated, stat.getVersion());
        return;
    }

    Policies fresh = new Policies();
    fresh.bundles = getBundles(16);
    fresh.replication_clusters = Collections.singleton(cluster);
    createZkNode(
            configStoreZk,
            namespacePath,
            ObjectMapperFactory.getThreadLocal().writeValueAsBytes(fresh),
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
}
/**
 * A client created with the read-write session type must be connected and must not
 * be in the read-only connected state.
 */
@Test
public void testZKCreationRW() throws Exception {
    ZooKeeperClientFactory factory = new ZookeeperClientFactoryImpl();
    String connectString = "127.0.0.1:" + localZkS.getZookeeperPort();
    CompletableFuture<ZooKeeper> pendingClient = factory.create(connectString, SessionType.ReadWrite,
            (int) ZOOKEEPER_SESSION_TIMEOUT_MILLIS);

    // Wait at most one session timeout for the client to become available.
    localZkc = pendingClient.get(ZOOKEEPER_SESSION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);

    assertTrue(localZkc.getState().isConnected());
    assertNotEquals(localZkc.getState(), States.CONNECTEDREADONLY);
    localZkc.close();
}
/**
 * Recursively deletes {@code storagePath} and everything beneath it; a path that does
 * not exist is silently skipped.
 *
 * @param zooKeeper   ZooKeeper handle used for the deletes
 * @param storagePath root of the subtree to remove
 */
private void rmr(ZooKeeper zooKeeper, String storagePath) throws KeeperException, InterruptedException {
    if (zooKeeper.exists(storagePath, false) == null) {
        return;
    }
    // Children first: the parent is deleted only after its subtree is gone.
    for (String child : zooKeeper.getChildren(storagePath, false)) {
        rmr(zooKeeper, storagePath + "/" + child);
    }
    // Version -1 matches any version of the znode.
    zooKeeper.delete(storagePath, -1);
}
/**
 * Dumps the znode at {@code path} and then, recursively and in sorted order, every
 * child that passes the include/exclude options.
 * ACL and data come from two separate reads, so both are re-read (up to
 * {@code options.numRetries} times) until their stats agree. Ephemeral nodes are
 * skipped unless {@code options.backupEphemeral} is set. A node that vanishes during
 * the backup is logged and skipped.
 *
 * @param zk   ZooKeeper handle used for the reads
 * @param jgen generator receiving the dump
 * @param path znode to back up
 * @throws IllegalStateException when no consistent ACL/data pair could be read
 */
private void doBackup(ZooKeeper zk, JsonGenerator jgen, String path)
        throws KeeperException, InterruptedException, IOException {
    try {
        final Stat stat = new Stat();
        List<ACL> acls = nullToEmpty(zk.getACL(path, stat));
        if (stat.getEphemeralOwner() != 0 && !options.backupEphemeral) {
            LOGGER.debug("Skipping ephemeral node: {}", path);
            return;
        }

        final Stat dataStat = new Stat();
        byte[] data = zk.getData(path, false, dataStat);
        for (int i = 0; stat.compareTo(dataStat) != 0 && i < options.numRetries; i++) {
            LOGGER.warn("Retrying getACL / getData to read consistent state");
            // Same null guard as on the first read, so a retry cannot hand a null
            // ACL list to dumpNode.
            acls = nullToEmpty(zk.getACL(path, stat));
            data = zk.getData(path, false, dataStat);
        }
        if (stat.compareTo(dataStat) != 0) {
            throw new IllegalStateException("Unable to read consistent data for znode: " + path);
        }

        LOGGER.debug("Backing up node: {}", path);
        dumpNode(jgen, path, stat, acls, data);

        final List<String> childPaths = nullToEmpty(zk.getChildren(path, false, null));
        Collections.sort(childPaths);
        for (String childPath : childPaths) {
            final String fullChildPath = createFullPath(path, childPath);
            // The exclusion check runs first; the inclusion check only for
            // non-excluded paths.
            if (!this.options.isPathExcluded(LOGGER, fullChildPath)
                    && this.options.isPathIncluded(LOGGER, fullChildPath)) {
                doBackup(zk, jgen, fullChildPath);
            }
        }
    } catch (NoNodeException e) {
        LOGGER.warn("Node disappeared during backup: {}", path);
    }
}
/**
 * Recursively deletes {@code path} and everything beneath it. Unlike a guarded
 * delete, there is no existence check: a missing path surfaces as whatever
 * {@code getChildren} throws.
 *
 * @param zk   ZooKeeper handle used for the deletes
 * @param path root of the subtree to remove
 */
private static void rm(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
    // Children first: the parent is deleted only after its subtree is gone.
    for (String childName : zk.getChildren(path, false)) {
        String childPath = path + "/" + childName;
        rm(zk, childPath);
    }
    // Version -1 matches any version of the znode.
    zk.delete(path, -1);
}
/**
 * Closes the given client. An interrupt during close is logged and the thread's
 * interrupt flag is set again; nothing is thrown to the caller.
 *
 * @param zooKeeper client to close
 */
private void closeZooKeeper(ZooKeeper zooKeeper) {
    try {
        zooKeeper.close();
    } catch (InterruptedException interrupted) {
        LOGGER.warn("could not close ZooKeeper client due to interrupt", interrupted);
        // Hand the interrupt back to the calling thread.
        Thread.currentThread().interrupt();
    }
}