org.apache.hadoop.hbase.zookeeper.ZKUtil#setData ( )源码实例Demo

下面列出了 org.apache.hadoop.hbase.zookeeper.ZKUtil#setData() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: hbase   文件: MirroringTableStateManager.java
/**
 * Writes the given table state into the table's znode using the deprecated protobuf
 * representation, so that hbase-1.x clients reading ZooKeeper still see it.
 * <p>
 * A {@link KeeperException} raised while talking to ZooKeeper is logged and not propagated.
 *
 * @param tableState the state to mirror; when {@code null} the method returns immediately
 * @throws IOException declared for callers; ZooKeeper failures are handled inside this method
 */
private void updateZooKeeper(TableState tableState) throws IOException {
  if (tableState == null) {
    return;
  }
  String tablesParent = this.master.getZooKeeper().getZNodePaths().tableZNode;
  String tableName = tableState.getTableName().getNameAsString();
  String tableStateZNode = ZNodePaths.joinZNode(tablesParent, tableName);
  try {
    // The znode has to exist before data can be set on it.
    boolean znodeMissing = ZKUtil.checkExists(this.master.getZooKeeper(), tableStateZNode) == -1;
    if (znodeMissing) {
      ZKUtil.createAndFailSilent(this.master.getZooKeeper(), tableStateZNode);
    }
    // Translate the state by name into the deprecated enum and serialize it with the PB magic.
    ZooKeeperProtos.DeprecatedTableState.State mirroredState =
      ZooKeeperProtos.DeprecatedTableState.State.valueOf(tableState.getState().toString());
    byte[] payload = ProtobufUtil.prependPBMagic(
      ZooKeeperProtos.DeprecatedTableState.newBuilder()
        .setState(mirroredState)
        .build()
        .toByteArray());
    ZKUtil.setData(this.master.getZooKeeper(), tableStateZNode, payload);
  } catch (KeeperException e) {
    // Only hbase1 clients suffer if this fails.
    LOG.warn("Failed setting table state to zookeeper mirrored for hbase-1.x clients", e);
  }
}
 
源代码2 项目: hbase   文件: TestSplitLogManager.java
/**
 * Writes a DONE state into a submitted task's znode and checks that the znode is gone once
 * the batch reports the task as done and the task-deleted counter has advanced.
 */
@Test
public void testTaskDone() throws Exception {
  LOG.info("TestTaskDone - cleanup task node once in DONE state");

  slm = new SplitLogManager(master, conf);
  TaskBatch taskBatch = new TaskBatch();
  String taskPath = submitTaskAndWait(taskBatch, "foo/1");

  // Overwrite the task znode with a DONE state attributed to worker1.
  final ServerName doneWorker = ServerName.valueOf("worker1,1,1");
  SplitLogTask doneTask = new SplitLogTask.Done(doneWorker);
  byte[] doneState = doneTask.toByteArray();
  ZKUtil.setData(zkw, taskPath, doneState);

  // Block until every installed task in the batch is accounted for as done.
  synchronized (taskBatch) {
    while (taskBatch.installed != taskBatch.done) {
      taskBatch.wait();
    }
  }
  waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);

  // checkExists yields -1 when the znode does not exist.
  int versionAfterCleanup = ZKUtil.checkExists(zkw, taskPath);
  assertTrue(versionAfterCleanup == -1);
}
 
源代码3 项目: hbase   文件: TestSplitLogManager.java
/**
 * Writes an ERR state into a submitted task's znode, with resubmission disabled, and checks
 * that the znode is gone once the batch reports the task as errored and the task-deleted
 * counter has advanced.
 * <p>
 * The {@code hbase.splitlog.max.resubmit} override is undone in a {@code finally} block so
 * that a failing assertion or wait cannot leave the shared configuration modified.
 */
@Test
public void testTaskErr() throws Exception {
  LOG.info("TestTaskErr - cleanup task node once in ERR state");

  conf.setInt("hbase.splitlog.max.resubmit", 0);
  try {
    slm = new SplitLogManager(master, conf);
    TaskBatch batch = new TaskBatch();

    String tasknode = submitTaskAndWait(batch, "foo/1");
    final ServerName worker1 = ServerName.valueOf("worker1,1,1");
    SplitLogTask slt = new SplitLogTask.Err(worker1);
    ZKUtil.setData(zkw, tasknode, slt.toByteArray());

    // Block until every installed task in the batch is accounted for as an error.
    synchronized (batch) {
      while (batch.installed != batch.error) {
        batch.wait();
      }
    }
    waitForCounter(tot_mgr_task_deleted, 0, 1, to/2);
    // checkExists yields -1 when the znode does not exist.
    assertTrue(ZKUtil.checkExists(zkw, tasknode) == -1);
  } finally {
    // Always put the default back, even when the test body throws.
    conf.setInt("hbase.splitlog.max.resubmit",
      ZKSplitLogManagerCoordination.DEFAULT_MAX_RESUBMIT);
  }
}
 
源代码4 项目: hbase   文件: TestSplitLogManager.java
/**
 * Writes a RESIGNED state into a submitted task's znode and checks that exactly one resubmit
 * is counted and that the znode afterwards holds a task unassigned by this master.
 * <p>
 * The resubmit counter is asserted to be zero after every setup step so that an unexpected
 * early resubmit is pinned to the step that caused it.
 */
@Test
public void testTaskResigned() throws Exception {
  LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
  assertEquals(0, tot_mgr_resubmit.sum());

  slm = new SplitLogManager(master, conf);
  assertEquals(0, tot_mgr_resubmit.sum());

  TaskBatch taskBatch = new TaskBatch();
  String taskPath = submitTaskAndWait(taskBatch, "foo/1");
  assertEquals(0, tot_mgr_resubmit.sum());

  final ServerName resigningWorker = ServerName.valueOf("worker1,1,1");
  assertEquals(0, tot_mgr_resubmit.sum());

  SplitLogTask resignedTask = new SplitLogTask.Resigned(resigningWorker);
  assertEquals(0, tot_mgr_resubmit.sum());

  byte[] resignedState = resignedTask.toByteArray();
  ZKUtil.setData(zkw, taskPath, resignedState);
  ZKUtil.checkExists(zkw, taskPath);

  // Could be small race here.
  boolean resubmitNotYetCounted = tot_mgr_resubmit.sum() == 0;
  if (resubmitNotYetCounted) {
    waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
  }
  assertEquals(1, tot_mgr_resubmit.sum());

  // Read the znode back and verify what state it is in now.
  byte[] storedState = ZKUtil.getData(zkw, taskPath);
  SplitLogTask storedTask = SplitLogTask.parseFrom(storedState);
  assertTrue(storedTask.isUnassigned(master.getServerName()));
}
 
源代码5 项目: hbase   文件: TestSplitLogManager.java
/**
 * Marks a task as owned by a worker, then makes the mocked server manager report that worker
 * as offline and checks that one resubmit is counted after a fixed sleep.
 */
@Test
public void testWorkerCrash() throws Exception {
  slm = new SplitLogManager(master, conf);
  TaskBatch taskBatch = new TaskBatch();
  String taskPath = submitTaskAndWait(taskBatch, "foo/1");

  final ServerName crashingWorker = ServerName.valueOf("worker1,1,1");
  SplitLogTask ownedTask = new SplitLogTask.Owned(crashingWorker);
  byte[] ownedState = ownedTask.toByteArray();
  ZKUtil.setData(zkw, taskPath, ownedState);

  boolean heartbeatNotYetCounted = tot_mgr_heartbeat.sum() == 0;
  if (heartbeatNotYetCounted) {
    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  }

  // Nothing should have been resubmitted while the worker is still considered online.
  Assert.assertEquals(0, tot_mgr_resubmit.sum());

  // From now on the server manager mock reports the worker as not online.
  Mockito.when(sm.isServerOnline(crashingWorker)).thenReturn(false);

  // The timeout checker is done every 1000 ms (hardcoded).
  Thread.sleep(1300);

  // The task is expected to have been resubmitted exactly once by now.
  Assert.assertEquals(1, tot_mgr_resubmit.sum());
}
 
源代码6 项目: hbase   文件: ZKReplicationPeerStorage.java
/**
 * Stores the enabled/disabled marker bytes in the peer's state znode.
 *
 * @param peerId id of the replication peer whose state znode is written
 * @param enabled {@code true} writes the enabled marker, {@code false} the disabled marker
 * @throws ReplicationException wrapping the {@link KeeperException} when the write fails
 */
@Override
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
  final byte[] marker;
  if (enabled) {
    marker = ENABLED_ZNODE_BYTES;
  } else {
    marker = DISABLED_ZNODE_BYTES;
  }
  try {
    String stateNode = getPeerStateNode(peerId);
    ZKUtil.setData(zookeeper, stateNode, marker);
  } catch (KeeperException e) {
    throw new ReplicationException("Unable to change state of the peer with id=" + peerId, e);
  }
}
 
源代码7 项目: hbase   文件: ZKReplicationPeerStorage.java
/**
 * Serializes the given peer configuration and stores it in the peer's znode.
 *
 * @param peerId id of the replication peer whose znode is written
 * @param peerConfig configuration to serialize and store
 * @throws ReplicationException wrapping the {@link KeeperException} when the write fails
 */
@Override
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
  try {
    String peerNode = getPeerNode(peerId);
    byte[] serializedConfig = ReplicationPeerConfigUtil.toByteArray(peerConfig);
    ZKUtil.setData(this.zookeeper, peerNode, serializedConfig);
  } catch (KeeperException e) {
    String message =
      "There was a problem trying to save changes to the replication peer " + peerId;
    throw new ReplicationException(message, e);
  }
}
 
源代码8 项目: hbase   文件: TestZKReplicationQueueStorage.java
/**
 * Builds a {@link ZKReplicationQueueStorage} whose version reads keep changing underneath the
 * caller: the queues cversion climbs by one per call until it reaches 4, and each of the first
 * 100 last-sequence-id reads is followed by a write to the same znode.
 *
 * @return the storage instance with the overridden version behaviour
 */
private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
  return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {

    /** Value handed out as the queues cversion; stops growing at 4. */
    private int reportedCversion = 0;
    /** Number of completed last-sequence-id reads. */
    private int lastSeqIdReads = 0;

    @Override
    protected int getQueuesZNodeCversion() throws KeeperException {
      // Report a fresh value on each of the first four calls, then stay at 4.
      return reportedCversion < 4 ? ++reportedCversion : reportedCversion;
    }

    @Override
    protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
        String peerId) throws KeeperException {
      Pair<Long, Integer> valueBeforeBump =
        super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
      if (lastSeqIdReads < 100) {
        // Write to the znode after reading it so its version moves past what was just read.
        String regionPeerNode = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
        ZKUtil.createWithParents(zookeeper, regionPeerNode);
        ZKUtil.setData(zookeeper, regionPeerNode, ZKUtil.positionToByteArray(100L));
      }
      lastSeqIdReads++;
      return valueBeforeBump;
    }
  };
}
 
源代码9 项目: hbase   文件: SplitOrMergeTracker.java
/**
 * Turns the switch on or off by storing the serialized flag in this tracker's znode, creating
 * the znode with that data if it does not exist yet, and then triggers the tracker's
 * data-changed handling for the znode.
 * @param enabled whether the switch should be enabled
 * @throws KeeperException if the ZooKeeper write (or the fallback create) fails
 */
public void setSwitchEnabled(boolean enabled) throws KeeperException {
  final byte[] serializedFlag = toByteArray(enabled);
  try {
    ZKUtil.setData(watcher, node, serializedFlag);
  } catch (KeeperException.NoNodeException missingNode) {
    // Nothing to update yet: create the znode carrying the same data instead.
    ZKUtil.createAndWatch(watcher, node, serializedFlag);
  }
  super.nodeDataChanged(node);
}
 
源代码10 项目: hbase   文件: TestSplitLogManager.java
/**
 * With the resubmit limit set to 2, hands the same task to three workers in turn and checks
 * that the first two hand-offs are each followed by a resubmit (with a growing znode version),
 * while the third only reaches the resubmit threshold; the non-forced resubmit count must end
 * at 2.
 */
@Test
public void testMultipleResubmits() throws Exception {
  LOG.info("TestMultipleResbmits - no indefinite resubmissions");
  conf.setInt("hbase.splitlog.max.resubmit", 2);
  slm = new SplitLogManager(master, conf);
  TaskBatch taskBatch = new TaskBatch();

  String taskPath = submitTaskAndWait(taskBatch, "foo/1");
  int lastSeenVersion = ZKUtil.checkExists(zkw, taskPath);
  final ServerName[] owners = {
    ServerName.valueOf("worker1,1,1"),
    ServerName.valueOf("worker2,1,1"),
    ServerName.valueOf("worker3,1,1")
  };

  // First two owners: each one heartbeats once and is then resubmitted.
  for (int round = 0; round < 2; round++) {
    SplitLogTask ownedTask = new SplitLogTask.Owned(owners[round]);
    ZKUtil.setData(zkw, taskPath, ownedTask.toByteArray());
    waitForCounter(tot_mgr_heartbeat, round, round + 1, to/2);
    waitForCounter(tot_mgr_resubmit, round, round + 1, to + to/2);
    int versionAfterResubmit = ZKUtil.checkExists(zkw, taskPath);
    assertTrue(versionAfterResubmit > lastSeenVersion);
    lastSeenVersion = versionAfterResubmit;
  }

  // Third owner: the threshold counter is expected to move instead of another resubmit.
  SplitLogTask lastOwnedTask = new SplitLogTask.Owned(owners[2]);
  ZKUtil.setData(zkw, taskPath, lastOwnedTask.toByteArray());
  waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
  waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
  Thread.sleep(to + to/2);
  long nonForcedResubmits = tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum();
  assertEquals(2L, nonForcedResubmits);
}
 
源代码11 项目: hbase   文件: TestSplitLogManager.java
/**
 * Marks a task as owned, waits for it to be resubmitted, verifies the task znode is back in
 * the unassigned state with a higher version, and finally waits for the rescan-deleted
 * counter to advance.
 */
@Test
public void testRescanCleanup() throws Exception {
  LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");

  slm = new SplitLogManager(master, conf);
  TaskBatch taskBatch = new TaskBatch();
  String taskPath = submitTaskAndWait(taskBatch, "foo/1");
  int versionBefore = ZKUtil.checkExists(zkw, taskPath);

  final ServerName owner = ServerName.valueOf("worker1,1,1");
  SplitLogTask ownedTask = new SplitLogTask.Owned(owner);
  ZKUtil.setData(zkw, taskPath, ownedTask.toByteArray());
  waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);

  // Either a successful or a failed resubmit attempt ends the wait.
  Expr resubmitAttempts = new Expr() {
    @Override
    public long eval() {
      return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
    }
  };
  waitForCounter(resubmitAttempts, 0, 1, 5*60000); // wait long enough
  Assert.assertEquals("Could not run test. Lost ZK connection?",
    0, tot_mgr_resubmit_failed.sum());

  int versionAfter = ZKUtil.checkExists(zkw, taskPath);
  assertTrue(versionAfter > versionBefore);

  // Read the znode back and verify what state it is in now.
  byte[] storedState = ZKUtil.getData(zkw, taskPath);
  SplitLogTask storedTask = SplitLogTask.parseFrom(storedState);
  assertTrue(storedTask.isUnassigned(master.getServerName()));

  waitForCounter(tot_mgr_rescan_deleted, 0, 1, to/2);
}
 
源代码12 项目: hbase   文件: TestSplitLogManager.java
/**
 * Creates an orphan task in OWNED state plus a second task that stays unassigned, keeps
 * rewriting the orphan's znode for a while, then stops and expects first a resubmit of the
 * orphan and afterwards a resubmit triggered by all tasks being unassigned.
 */
@Test
public void testUnassignedTimeout() throws Exception {
  LOG.info("TestUnassignedTimeout - iff all tasks are unassigned then" +
      " resubmit");

  // create an orphan task in OWNED state
  String tasknode1 = ZKSplitLog.getEncodedNodeName(zkw, "orphan/1");
  final ServerName worker1 = ServerName.valueOf("worker1,1,1");
  final SplitLogTask slt = new SplitLogTask.Owned(worker1);
  zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
      CreateMode.PERSISTENT);

  slm = new SplitLogManager(master, conf);
  waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);

  // submit another task which will stay in unassigned mode
  TaskBatch batch = new TaskBatch();
  submitTaskAndWait(batch, "foo/1");

  // keep rewriting the orphan owned node every 100 ms, for about 3 * to ms in total;
  // the owner never changes, so the same OWNED task for worker1 is written each time
  for (int i = 0; i < (3 * to)/100; i++) {
    Thread.sleep(100);
    ZKUtil.setData(zkw, tasknode1, slt.toByteArray());
  }

  // since we have stopped heartbeating the owned node therefore it should
  // get resubmitted
  LOG.info("waiting for manager to resubmit the orphan task");
  waitForCounter(tot_mgr_resubmit, 0, 1, to + to/2);

  // now all the nodes are unassigned. manager should post another rescan
  waitForCounter(tot_mgr_resubmit_unassigned, 0, 1, 2 * to + to/2);
}
 
源代码13 项目: hbase   文件: TestSplitLogManager.java
/**
 * Marks a task as owned by a worker, reports that worker as dead to the manager, and checks
 * that the task is resubmitted (also counted as a dead-server resubmit) and that its znode
 * ends up unassigned with a higher version.
 */
@Test
public void testDeadWorker() throws Exception {
  LOG.info("testDeadWorker");

  // Same key and value as before; setInt matches how the other tests here set this key.
  conf.setInt("hbase.splitlog.max.resubmit", 0);
  slm = new SplitLogManager(master, conf);
  TaskBatch batch = new TaskBatch();

  String tasknode = submitTaskAndWait(batch, "foo/1");
  int version = ZKUtil.checkExists(zkw, tasknode);
  final ServerName worker1 = ServerName.valueOf("worker1,1,1");
  SplitLogTask slt = new SplitLogTask.Owned(worker1);
  ZKUtil.setData(zkw, tasknode, slt.toByteArray());
  // Each counter is waited on only if it has not already advanced.
  if (tot_mgr_heartbeat.sum() == 0) {
    waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
  }
  slm.handleDeadWorker(worker1);
  if (tot_mgr_resubmit.sum() == 0) {
    waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
  }
  if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
    waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
  }

  int version1 = ZKUtil.checkExists(zkw, tasknode);
  assertTrue(version1 > version);
  // Read the znode back and verify what state it is in now.
  byte[] taskstate = ZKUtil.getData(zkw, tasknode);
  slt = SplitLogTask.parseFrom(taskstate);
  assertTrue(slt.isUnassigned(master.getServerName()));
}
 
源代码14 项目: hbase   文件: TestSplitLogWorker.java
/**
 * Starts a worker, lets it acquire a freshly created unassigned task, then rewrites the task
 * znode as owned by the manager and expects the worker's preempt counter to advance.
 */
@Test
public void testPreemptTask() throws Exception {
  LOG.info("testPreemptTask");
  SplitLogCounters.resetCounters();
  final ServerName workerServer = ServerName.valueOf("tpt_svr,1,1");
  final String taskPath = ZKSplitLog.getEncodedNodeName(zkw, "tpt_task");
  RegionServerServices regionServer = getRegionServer(workerServer);
  SplitLogWorker worker =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), regionServer, neverEndingTask);
  worker.start();
  try {
    Thread.yield(); // let the worker start
    Thread.sleep(1000);
    waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

    // The task znode is created only after the worker is already running.
    byte[] unassignedState = new SplitLogTask.Unassigned(MANAGER).toByteArray();
    zkw.getRecoverableZooKeeper().create(taskPath, unassignedState,
      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);
    assertEquals(1, slw.getTaskReadySeq());

    // The znode must now show the worker's server as the owner.
    byte[] acquiredState = ZKUtil.getData(zkw, taskPath);
    SplitLogTask acquiredTask = SplitLogTask.parseFrom(acquiredState);
    assertTrue(acquiredTask.isOwned(workerServer));

    // Take the task away by rewriting the znode as owned by MANAGER.
    SplitLogTask takenOver = new SplitLogTask.Owned(MANAGER);
    ZKUtil.setData(zkw, taskPath, takenOver.toByteArray());
    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);
  } finally {
    stopSplitLogWorker(worker);
  }
}
 
源代码15 项目: hbase   文件: TestReplicationSourceManager.java
/**
 * Prepares the shared static fixtures: creates and fills the replication znodes for peer "1",
 * sets the cluster id and root dir, initializes {@code Replication} and its manager, waits
 * for the peer to be registered, and builds the table descriptor, scopes map and region info
 * used by the tests.
 * <p>
 * {@code conf} must already have been set by the implementing class before this is called.
 */
protected static void setupZkAndReplication() throws Exception {
  // The implementing class should set up the conf
  assertNotNull(conf);
  zkw = new ZKWatcher(conf, "test", null);
  // Create the replication znode tree for peer "1" and fill each znode with its data.
  ZKUtil.createWithParents(zkw, "/hbase/replication");
  ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
  // The peer znode holds "<quorum>:<client port>:/1" built from the configuration.
  ZKUtil.setData(zkw, "/hbase/replication/peers/1",
      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
          + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
  ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
  ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
    ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
  ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state");
  ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state",
    ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
  ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state");
  ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state",
    ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES);
  ZKUtil.createWithParents(zkw, "/hbase/replication/state");
  ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);

  // File-system side: cluster id, root dir and the WAL directories under the test data dir.
  ZKClusterId.setClusterId(zkw, new ClusterId());
  CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
  fs = FileSystem.get(conf);
  oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
  logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
  remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
  replication = new Replication();
  replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
  // getManagerFromCluster() may return null; the cluster manager is only used when present.
  managerOfCluster = getManagerFromCluster();
  if (managerOfCluster != null) {
    // After replication procedure, we need to add peer by hand (other than by receiving
    // notification from zk)
    managerOfCluster.addPeer(slaveId);
  }

  manager = replication.getReplicationManager();
  manager.addSource(slaveId);
  // Wait for the peer on every manager in play before continuing.
  if (managerOfCluster != null) {
    waitPeer(slaveId, managerOfCluster, true);
  }
  waitPeer(slaveId, manager, true);

  // Table with two families: f1 carries the global replication scope, f2 uses the defaults.
  htd = TableDescriptorBuilder.newBuilder(test)
    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
      .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();

  // Every family of the table is entered into the scopes map with value 0.
  scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
  for(byte[] fam : htd.getColumnFamilyNames()) {
    scopes.put(fam, 0);
  }
  hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
}
 
源代码16 项目: hbase   文件: TestSplitLogWorker.java
/**
 * Starts a worker, lets it acquire a first task, creates a second task, then rewrites the
 * first task as owned by a different server and expects the worker to count a preemption
 * and go on to acquire and own the second task.
 */
@Test
public void testMultipleTasks() throws Exception {
  LOG.info("testMultipleTasks");
  SplitLogCounters.resetCounters();
  final ServerName workerServer = ServerName.valueOf("tmt_svr,1,1");
  final String firstTaskPath = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task");
  RegionServerServices regionServer = getRegionServer(workerServer);
  SplitLogWorker worker =
      new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), regionServer, neverEndingTask);
  worker.start();
  try {
    Thread.yield(); // let the worker start
    Thread.sleep(100);
    waitForCounter(SplitLogCounters.tot_wkr_task_grabing, 0, 1, WAIT_TIME);

    // First task: created unassigned, then picked up by the worker.
    SplitLogTask unassignedByManager = new SplitLogTask.Unassigned(MANAGER);
    zkw.getRecoverableZooKeeper().create(firstTaskPath, unassignedByManager.toByteArray(),
      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);

    // Second task: created while the worker is still busy with the first one.
    final String secondTaskPath = ZKSplitLog.getEncodedNodeName(zkw, "tmt_task_2");
    zkw.getRecoverableZooKeeper().create(secondTaskPath, unassignedByManager.toByteArray(),
      Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // Hand the first task to a different server so the worker gets preempted.
    final ServerName rivalServer = ServerName.valueOf("another-worker,1,1");
    SplitLogTask ownedByRival = new SplitLogTask.Owned(rivalServer);
    ZKUtil.setData(zkw, firstTaskPath, ownedByRival.toByteArray());
    waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

    // The worker is then expected to acquire the second task and own it.
    waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
    assertEquals(2, slw.getTaskReadySeq());
    byte[] secondTaskState = ZKUtil.getData(zkw, secondTaskPath);
    SplitLogTask secondTask = SplitLogTask.parseFrom(secondTaskState);
    assertTrue(secondTask.isOwned(workerServer));
  } finally {
    stopSplitLogWorker(worker);
  }
}
 
源代码17 项目: hbase   文件: TestSplitLogWorker.java
/**
 * Starts a worker, lets it acquire a task, rewrites the task znode twice around the creation
 * of a RESCAN znode, and finally checks that the split-log znode has exactly two children and
 * that every RESCAN child is marked done by the worker's server.
 */
@Test
public void testRescan() throws Exception {
  LOG.info("testRescan");
  SplitLogCounters.resetCounters();
  final ServerName workerServer = ServerName.valueOf("svr,1,1");
  RegionServerServices regionServer = getRegionServer(workerServer);
  slw = new SplitLogWorker(ds, TEST_UTIL.getConfiguration(), regionServer, neverEndingTask);
  slw.start();
  Thread.yield(); // let the worker start
  Thread.sleep(100);

  // Create an unassigned task and wait for the worker to pick it up.
  String taskPath = ZKSplitLog.getEncodedNodeName(zkw, "task");
  SplitLogTask unassignedTask = new SplitLogTask.Unassigned(MANAGER);
  zkw.getRecoverableZooKeeper().create(taskPath, unassignedTask.toByteArray(),
    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 1, WAIT_TIME);

  // Rewrite the task znode with the unassigned state again; the worker is expected to count
  // this as a preemption of the task it is busy with.
  ZKUtil.setData(zkw, taskPath, unassignedTask.toByteArray());
  waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 0, 1, WAIT_TIME);

  // Create a sequential RESCAN znode carrying the same unassigned state.
  String rescanPrefix = ZKSplitLog.getEncodedNodeName(zkw, "RESCAN");
  zkw.getRecoverableZooKeeper().create(rescanPrefix, unassignedTask.toByteArray(),
    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

  waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 1, 2, WAIT_TIME);
  // RESCAN node might not have been processed if the worker became busy
  // with the above task. preempt the task again so that now the RESCAN
  // node is processed
  ZKUtil.setData(zkw, taskPath, unassignedTask.toByteArray());
  waitForCounter(SplitLogCounters.tot_wkr_preempt_task, 1, 2, WAIT_TIME);
  waitForCounter(SplitLogCounters.tot_wkr_task_acquired_rescan, 0, 1, WAIT_TIME);

  List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().splitLogZNode);
  LOG.debug(Objects.toString(children));
  for (String child : children) {
    if (!child.startsWith("RESCAN")) {
      continue;
    }
    // Resolve the child's znode path and verify the state stored in it.
    String encodedName = ZKSplitLog.getEncodedNodeName(zkw, child);
    String fileName = ZKSplitLog.getFileName(encodedName);
    String rescanPath = ZNodePaths.joinZNode(zkw.getZNodePaths().splitLogZNode, fileName);
    byte[] rescanState = ZKUtil.getData(zkw, rescanPath);
    SplitLogTask rescanTask = SplitLogTask.parseFrom(rescanState);
    assertTrue(rescanTask.toString(), rescanTask.isDone(workerServer));
  }
  // Exactly the task znode and the RESCAN znode are expected.
  assertEquals(2, children.size());
}