org.apache.hadoop.fs.FSDataOutputStream#writeBytes() 源码实例 Demo

下面列出了 org.apache.hadoop.fs.FSDataOutputStream#writeBytes() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: big-c   文件: TestOverReplicatedBlocks.java
/**
 * Checks that lowering the replication factor of a file whose block is still
 * partial leaves exactly one live replica once the file is closed.
 */
@Test
public void testInvalidateOverReplicatedBlock() throws Exception {
  final Configuration config = new HdfsConfiguration();
  final MiniDFSCluster cluster =
      new MiniDFSCluster.Builder(config).numDataNodes(3).build();
  try {
    final BlockManager blockManager =
        cluster.getNamesystem().getBlockManager();
    final FileSystem fileSys = cluster.getFileSystem();
    final Path file = new Path(MiniDFSCluster.getBaseDirectory(), "/foo1");

    // Write with replication 2, sync, then drop to 1 while the stream is
    // still open; only afterwards close the file.
    final FSDataOutputStream stream = fileSys.create(file, (short) 2);
    stream.writeBytes("HDFS-3119: " + file);
    stream.hsync();
    fileSys.setReplication(file, (short) 1);
    stream.close();

    final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
    final int liveReplicas =
        blockManager.countNodes(firstBlock.getLocalBlock()).liveReplicas();
    assertEquals("Expected only one live replica for the block", 1,
        liveReplicas);
  } finally {
    cluster.shutdown();
  }
}
 
源代码2 项目: kite   文件: TestCSVFileReader.java
/**
 * Initializes the shared file system handle and fixture paths, then writes
 * the CSV/TSV fixture files, overwriting any file already present at those
 * paths.
 *
 * @throws IOException if a fixture file cannot be created or written
 */
@BeforeClass
public static void createCSVFiles() throws IOException {
  localfs = LocalFileSystem.getInstance();
  csvFile = new Path("target/temp.csv");
  reorderedFile = new Path("target/reordered.csv");
  tsvFile = new Path("target/temp.tsv");
  validatorFile = new Path("target/validator.csv");

  writeFixtureFile(csvFile, CSV_CONTENT);
  writeFixtureFile(reorderedFile, REORDERED_CSV_CONTENT);
  writeFixtureFile(validatorFile, VALIDATOR_CSV_CONTENT);
  writeFixtureFile(tsvFile, TSV_CONTENT);
}

/**
 * Writes {@code content} to {@code file} on {@code localfs}, overwriting an
 * existing file. The stream is closed even when the write fails.
 */
private static void writeFixtureFile(Path file, String content)
    throws IOException {
  FSDataOutputStream out = localfs.create(file, true);
  try {
    out.writeBytes(content);
  } finally {
    out.close();
  }
}
 
源代码3 项目: RDFS   文件: TestHftpFileSystem.java
/**
 * Checks that isUnderConstruction(), as seen through hftp, is true while the
 * writer is open and false after the writer is closed.
 */
public void testIsUnderConstruction() throws Exception {
  // Start writing and keep the writer open.
  FSDataOutputStream writer = hdfs.create(TEST_FILE, true);
  writer.writeBytes("test");

  // With the writer still open, the file must be under construction.
  FSDataInputStream openReader = hftpFs.open(TEST_FILE);
  assertTrue(openReader.isUnderConstruction());
  openReader.close();

  // Finish the write.
  writer.close();

  // After close, the file must no longer be under construction.
  FSDataInputStream closedReader = hftpFs.open(TEST_FILE);
  assertFalse(closedReader.isUnderConstruction());
  closedReader.close();
}
 
源代码4 项目: hadoop   文件: NameNodeConnector.java
/**
 * Checks whether another instance is already running and, if not, marks
 * this one as running.
 *
 * To make sure that no more than one instance runs in an HDFS, a file is
 * created in the HDFS, optionally the hostname of the machine running the
 * instance is written to it, and the file is left open until the instance
 * exits. A second instance cannot create the file while the first one is
 * still running.
 *
 * @return null if there is a running instance;
 *         otherwise, the output stream to the newly created file.
 */
private OutputStream checkAndMarkRunning() throws IOException {
  try {
    if (fs.exists(idPath)) {
      // Appending fails fast if another balancer is running; when it
      // succeeds the leftover file is removed before re-creating it.
      IOUtils.closeStream(fs.append(idPath));
      fs.delete(idPath, true);
    }
    final FSDataOutputStream idStream = fs.create(idPath, false);
    // have the filesystem delete idPath when it is closed
    fs.deleteOnExit(idPath);
    if (write2IdFile) {
      final String hostName = InetAddress.getLocalHost().getHostName();
      idStream.writeBytes(hostName);
      idStream.hflush();
    }
    return idStream;
  } catch (RemoteException e) {
    final String beingCreated = AlreadyBeingCreatedException.class.getName();
    if (!beingCreated.equals(e.getClassName())) {
      throw e;
    }
    // someone else holds the file open: another instance is running
    return null;
  }
}
 
源代码5 项目: big-c   文件: TestDecommission.java
/**
 * (Re)creates {@code name} on the local file system and writes each entry of
 * {@code nodes} to it followed by a newline. When {@code nodes} is null an
 * empty file is created.
 *
 * @param name  path of the file to write
 * @param nodes entries to write, one per line; may be null
 * @throws IOException if the file cannot be deleted, created or written
 */
private void writeConfigFile(Path name, List<String> nodes) 
  throws IOException {
  // delete if it already exists
  if (localFileSys.exists(name)) {
    localFileSys.delete(name, true);
  }

  FSDataOutputStream stm = localFileSys.create(name);
  try {
    if (nodes != null) {
      for (String node : nodes) {
        stm.writeBytes(node);
        stm.writeBytes("\n");
      }
    }
  } finally {
    // close even when a write fails so the stream is not leaked
    stm.close();
  }
}
 
源代码6 项目: RDFS   文件: TestHftpFileSystem.java
/**
 * Scenario: a file that is still open for writing is read through hftp.
 *
 * Expected: after the writer has synced (but not closed) the file, hftp can
 * read every synced byte, both sequentially and after a seek.
 *
 * @throws IOException
 */
public void testConcurrentRead() throws IOException {
  // Write the test data and sync it, keeping the writer open.
  FSDataOutputStream writer = hdfs.create(TEST_FILE, true);
  writer.writeBytes("123");
  writer.sync();

  // Sequential read through hftp.
  FSDataInputStream reader = hftpFs.open(TEST_FILE);
  for (char expected : new char[] {'1', '2', '3'}) {
    assertEquals(expected, reader.read());
  }
  reader.close();

  // Seek on a fresh stream, then read.
  reader = hftpFs.open(TEST_FILE);
  reader.seek(2);
  assertEquals('3', reader.read());
  reader.close();

  writer.close();
}
 
源代码7 项目: hadoop   文件: TestDistributedFileSystem.java
/**
 * Checks that isFileClosed() is false while the writer is open (even after a
 * flush) and true once the writer has been closed.
 */
@Test(timeout=60000)
public void testFileCloseStatus() throws IOException {
  final Configuration config = new HdfsConfiguration();
  final MiniDFSCluster cluster = new MiniDFSCluster.Builder(config).build();
  final DistributedFileSystem dfs = cluster.getFileSystem();
  try {
    final Path target = new Path("/simpleFlush.dat");
    // open the file for writing and push some data through flush()
    final FSDataOutputStream writer = dfs.create(target);
    writer.writeBytes("Some test data");
    writer.flush();
    assertFalse("File status should be open", dfs.isFileClosed(target));
    writer.close();
    assertTrue("File status should be closed", dfs.isFileClosed(target));
  } finally {
    // cluster was assigned from a constructor call above, so it is non-null
    cluster.shutdown();
  }
}
 
源代码8 项目: spliceengine   文件: SpliceTableAdmin.java
/**
 * Writes every map entry as a header line {@code <key>:} followed by one
 * tab-indented line per message.
 *
 * @param out    stream to write to
 * @param errors messages grouped by key
 * @throws IOException if writing to {@code out} fails
 */
private static void printErrorMessages(FSDataOutputStream out, Map<String, List<String>> errors) throws IOException {
    for (Map.Entry<String, List<String>> group : errors.entrySet()) {
        out.writeBytes(group.getKey() + ":\n");
        for (String text : group.getValue()) {
            out.writeBytes("\t" + text + "\n");
        }
    }
}
 
源代码9 项目: kite   文件: JobClasspathHelper.java
/**
 * Writes the given MD5 sum string to a file, overwriting an existing file.
 * <p>
 * This is best effort: a failure while creating, writing or flushing the
 * file is logged and not propagated to the caller.
 *
 * @param fs
 *            FileSystem where to create the file.
 * @param md5sum
 *            The string containing the MD5 sum.
 * @param remoteMd5Path
 *            The path where to save the file.
 * @throws IOException
 *             if closing the output stream fails.
 */
private void createMd5SumFile(FileSystem fs, String md5sum, Path remoteMd5Path) throws IOException {
  FSDataOutputStream os = null;
  try {
    os = fs.create(remoteMd5Path, true);
    os.writeBytes(md5sum);
    os.flush();
  } catch (Exception e) {
    // The previous call LOG.error("{}", e) bound to the (String, Throwable)
    // overload, so the logged message was the literal "{}". Log a message
    // that names the file instead.
    LOG.error("Failed to write MD5 sum file " + remoteMd5Path, e);
  } finally {
    if (os != null) {
      os.close();
    }
  }
}
 
/**
 * Writes 113 files containing arbitrary text under
 * {@code Store-1/region-1/<i>/1-1-1.hop}, creates region-1 on that store and
 * expects a get on the region to fail with an HDFSIOException.
 */
public void testCorruptHfileBucketFail() throws Exception {
  final int nameNodePort = AvailablePortHelper.getRandomAvailableTCPPort();
  final MiniDFSCluster cluster = initMiniCluster(nameNodePort, 1);

  hsf.setHomeDir("Store-1");
  hsf.setNameNodeURL("hdfs://127.0.0.1:" + nameNodePort);
  final HDFSStoreImpl store = (HDFSStoreImpl) hsf.create("Store-1");

  // write files whose content is not a valid store file
  final FileSystem storeFs = store.getFileSystem();
  for (int dirIndex = 0; dirIndex < 113; dirIndex++) {
    final Path corruptFile =
        new Path("Store-1/region-1/" + dirIndex + "/1-1-1.hop");
    final FSDataOutputStream corruptOut = storeFs.create(corruptFile);
    corruptOut.writeBytes("Some random corrupt file");
    corruptOut.close();
  }

  // create the region on top of the store
  regionfactory.setHDFSStoreName(store.getName());
  final Region<Object, Object> region = regionfactory.create("region-1");
  final ExpectedException expected =
      TestUtils.addExpectedException("CorruptHFileException");
  try {
    region.get("key");
    fail("get should have failed with corrupt file error");
  } catch (HDFSIOException e) {
    // expected
  } finally {
    expected.remove();
  }

  region.destroyRegion();
  store.destroy();
  cluster.shutdown();
  FileUtils.deleteDirectory(new File("hdfs-test-cluster"));
}
 
源代码11 项目: hadoop   文件: TestSpeculativeExecution.java
/**
 * Creates {@code filename} under {@code TEST_ROOT_DIR} on the local file
 * system, writes {@code contents} to it and sets its permission to 700.
 *
 * @param filename name of the file, relative to {@code TEST_ROOT_DIR}
 * @param contents text to write to the file
 * @return the path of the created file
 * @throws IOException if the file cannot be created, written or chmod-ed
 */
private Path createTempFile(String filename, String contents)
    throws IOException {
  Path path = new Path(TEST_ROOT_DIR, filename);
  FSDataOutputStream os = localFs.create(path);
  try {
    os.writeBytes(contents);
  } finally {
    // close even when the write fails so the stream is not leaked
    os.close();
  }
  localFs.setPermission(path, new FsPermission("700"));
  return path;
}
 
源代码12 项目: big-c   文件: TestPipelines.java
/**
 * Creates and closes a file of a fixed length, reopens it with append,
 * writes some bytes and calls hflush() on the wrapped stream. It then checks
 * that, on every datanode, the replica of the file's last block exists and
 * is in the RBW state.
 *
 * @throws IOException in case of an error
 */
@Test
public void pipeline_01() throws IOException {
  final String testName = GenericTestUtils.getMethodName();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Running " + testName);
  }
  final Path file = new Path("/" + testName + ".dat");

  DFSTestUtil.createFile(fs, file, FILE_SIZE, REPL_FACTOR, rand.nextLong());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Invoking append but doing nothing otherwise...");
  }
  final FSDataOutputStream appendStream = fs.append(file);
  appendStream.writeBytes("Some more stuff to write");
  final DFSOutputStream wrapped =
      (DFSOutputStream) appendStream.getWrappedStream();
  wrapped.hflush();

  // locate the block that covers the last byte of the original file
  final List<LocatedBlock> located = cluster.getNameNodeRpc()
      .getBlockLocations(file.toString(), FILE_SIZE - 1, FILE_SIZE)
      .getLocatedBlocks();

  final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
  for (DataNode dataNode : cluster.getDataNodes()) {
    final Replica replica = DataNodeTestUtils.fetchReplicaInfo(dataNode,
        blockPoolId, located.get(0).getBlock().getBlockId());

    assertTrue("Replica on DN " + dataNode + " shouldn't be null",
        replica != null);
    assertEquals("Should be RBW replica on " + dataNode
        + " after sequence of calls append()/write()/hflush()",
        HdfsServerConstants.ReplicaState.RBW, replica.getState());
  }
  appendStream.close();
}
 
源代码13 项目: emr-sample-apps   文件: CopyFromS3.java
/**
 * Builds the JobConf for the map reduce job that downloads the files from
 * S3. The input file listing is obtained from the input file system and
 * written, one path per line, to a temporary index file that serves as the
 * job input. Because of the listing this method is potentially expensive;
 * clients are encouraged to cache the returned JobConf rather than call this
 * method repeatedly.
 * 
 * @return the JobConf to be used to run the map reduce job to download the
 *         files from S3.
 */
public JobConf getJobConf() throws IOException, ParseException {
  final JobConf jobConf = new JobConf(CopyFromS3.class);
  jobConf.setJobName("CopyFromS3");
  jobConf.setOutputKeyClass(NullWritable.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setMapperClass(S3CopyMapper.class);
  // A reducer class is configured for possible future use, but the number
  // of reduce tasks is zero, so it does not run right now.
  jobConf.setReducerClass(HDFSWriterReducer.class);
  jobConf.setNumReduceTasks(0);

  final Path indexFile = new Path(tempFile);
  FileInputFormat.setInputPaths(jobConf, indexFile);
  FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
  jobConf.setOutputFormat(TextOutputFormat.class);
  jobConf.setCompressMapOutput(true);

  final JobClient client = new JobClient(jobConf);

  final FileSystem sourceFs =
      FileSystem.get(URI.create(inputPathPrefix), jobConf);
  final DatePathFilter dateFilter = new DatePathFilter(startDate, endDate);
  final List<Path> inputFiles = getFilePaths(sourceFs,
      new Path(inputPathPrefix), dateFilter, client.getDefaultMaps());

  // The map tasks read the names of the files to copy from a temporary
  // index file, so write one path per line.
  final FileSystem indexFs = FileSystem.get(URI.create(tempFile), jobConf);
  final FSDataOutputStream indexStream =
      indexFs.create(new Path(tempFile), true);
  try {
    for (Path inputFile : inputFiles) {
      indexStream.writeBytes(inputFile.toString() + "\n");
    }
  }
  finally {
    indexStream.close();
  }

  final int mapTasks = Math.min(inputFiles.size(), client.getDefaultMaps());
  jobConf.setNumMapTasks(mapTasks);

  return jobConf;
}
 
源代码14 项目: big-c   文件: TestUserResolve.java
/**
 * Writes {@code usersFileContent} to the users file, overwriting any
 * existing file at that path.
 * @param usersFilePath    the path to the file that is to be created
 * @param usersFileContent Content of users file
 * @throws IOException
 */
private static void writeUserList(Path usersFilePath, String usersFileContent)
    throws IOException {
  // If create() itself fails there is no stream to close.
  final FSDataOutputStream usersOut = fs.create(usersFilePath, true);
  try {
    usersOut.writeBytes(usersFileContent);
  } finally {
    usersOut.close();
  }
}
 
源代码15 项目: recsys-offline   文件: HdfsHelper.java
/**
 * Appends {@code data} followed by CRLF to the HDFS file at {@code path},
 * calling {@code createNewFile} on the path first.
 * <p>
 * Failures are logged and not propagated to the caller.
 *
 * @param path HDFS path of the file to append to
 * @param data text to write; a "\r\n" terminator is added
 */
public static void writeLine(String path, String data) {
	try {
		Path dst = new Path(path);
		hdfs.createNewFile(dst);
		FSDataOutputStream dos = hdfs.append(dst);
		try {
			dos.writeBytes(data + "\r\n");
		} finally {
			// close even when the write fails so the stream is not leaked;
			// a failing close is reported by the outer catch
			dos.close();
		}
		log.info("write hdfs " + path + " successed. ");
	} catch (Exception e) {
		// the logger already records the stack trace, so no printStackTrace()
		log.error("write hdfs " + path + " failed. ", e);
	}
}
 
/**
 * Renames a file while an append stream on it is still open, then checks
 * that data written through that stream can be read from the destination.
 *
 * @throws Exception If failed.
 */
@Test
public void testRenameIfSrcPathIsAlreadyBeingOpenedToWrite() throws Exception {
    final Path home = new Path(primaryFsUri);
    final Path src = new Path(home, "srcFile");
    final Path dst = new Path(home, "dstFile");

    // Create the source file and close it right away.
    FSDataOutputStream out = fs.create(src, EnumSet.noneOf(CreateFlag.class),
        Options.CreateOpts.perms(FsPermission.getDefault()));

    out.close();

    // Reopen it for append and rename it while the stream is still open.
    out = fs.create(src, EnumSet.of(CreateFlag.APPEND),
        Options.CreateOpts.perms(FsPermission.getDefault()));

    fs.rename(src, dst);

    assertPathExists(fs, dst);

    final String payload = "Test";

    try {
        out.writeBytes(payload);
    }
    finally {
        out.close();
    }

    // The bytes written through the pre-rename stream must be readable
    // from the destination path.
    try (FSDataInputStream in = fs.open(dst)) {
        final byte[] readBack = new byte[payload.getBytes().length];

        in.readFully(readBack);

        assertEquals(payload, new String(readBack));
    }
}
 
源代码17 项目: big-c   文件: TestFSImage.java
/**
 * Starts a mini cluster with {@code conf}, creates one closed empty file and
 * one file that is still open for writing, saves the namespace, restarts the
 * NameNode and checks that both files exist and that the open file is still
 * under construction with a lease.
 *
 * @param conf configuration used to build the cluster
 * @throws IOException in case of an error
 */
private void testPersistHelper(Configuration conf) throws IOException {
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).build();
    cluster.waitActive();
    FSNamesystem namesystem = cluster.getNamesystem();
    DistributedFileSystem dfs = cluster.getFileSystem();

    final Path parent = new Path("/abc/def");
    final Path closedFile = new Path(parent, "f1");
    final Path openFile = new Path(parent, "f2");
    final String content = "hello";

    // f1: empty and closed
    dfs.create(closedFile).close();

    // f2: written and synced, but deliberately left open
    FSDataOutputStream openStream = dfs.create(openFile);
    openStream.writeBytes(content);
    DFSOutputStream wrapped = (DFSOutputStream) openStream.getWrappedStream();
    wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));

    // checkpoint
    dfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    dfs.saveNamespace();
    dfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);

    cluster.restartNameNode();
    cluster.waitActive();
    dfs = cluster.getFileSystem();

    assertTrue(dfs.isDirectory(parent));
    assertTrue(dfs.exists(closedFile));
    assertTrue(dfs.exists(openFile));

    // check internals of f2
    final String openFilePath = openFile.toString();
    INodeFile openNode = namesystem.dir.getINode4Write(openFilePath).asFile();
    assertEquals(content.length(), openNode.computeFileSize());
    assertTrue(openNode.isUnderConstruction());
    BlockInfoContiguous[] blocks = openNode.getBlocks();
    assertEquals(1, blocks.length);
    assertEquals(BlockUCState.UNDER_CONSTRUCTION,
        blocks[0].getBlockUCState());
    // check lease manager
    Lease lease = namesystem.leaseManager.getLeaseByPath(openFilePath);
    Assert.assertNotNull(lease);
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
 
源代码18 项目: hadoop   文件: TestStorageMover.java
/**
 * Moves a file that is still open for writing into archival storage.
 *
 * The test creates /foo/bar with BLOCK_SIZE bytes, appends to it without
 * closing, sets the COLD policy on /foo and runs the migration. It then
 * checks the located blocks, continues writing, closes the stream and
 * verifies the appended content.
 */
@Test
public void testMigrateOpenFileToArchival() throws Exception {
  LOG.info("testMigrateOpenFileToArchival");
  final Path fooDir = new Path("/foo");
  Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
  policyMap.put(fooDir, COLD);
  NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null,
      BLOCK_SIZE, null, policyMap);
  ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
      NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
  MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
  test.setupCluster();

  // create an open file: BLOCK_SIZE bytes first, then an append that is
  // synced but left open
  banner("writing to file /foo/bar");
  final Path barFile = new Path(fooDir, "bar");
  DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L);
  FSDataOutputStream out = test.dfs.append(barFile);
  out.writeBytes("hello, ");
  ((DFSOutputStream) out.getWrappedStream()).hsync();

  try {
    banner("start data migration");
    test.setStoragePolicy(); // set /foo to COLD
    test.migrate();

    // make sure the under construction block has not been migrated
    LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
        barFile.toString(), BLOCK_SIZE);
    LOG.info("Locations: " + lbs);
    List<LocatedBlock> blks = lbs.getLocatedBlocks();
    Assert.assertEquals(1, blks.size());
    Assert.assertEquals(1, blks.get(0).getLocations().length);

    banner("finish the migration, continue writing");
    // make sure the writing can continue
    out.writeBytes("world!");
    ((DFSOutputStream) out.getWrappedStream()).hsync();
    IOUtils.cleanup(LOG, out);

    // same block/location checks again, now that the stream is closed
    lbs = test.dfs.getClient().getLocatedBlocks(
        barFile.toString(), BLOCK_SIZE);
    LOG.info("Locations: " + lbs);
    blks = lbs.getLocatedBlocks();
    Assert.assertEquals(1, blks.size());
    Assert.assertEquals(1, blks.get(0).getLocations().length);

    banner("finish writing, starting reading");
    // check the content of /foo/bar
    FSDataInputStream in = test.dfs.open(barFile);
    // 13 bytes == "hello, world!", the two appended pieces
    byte[] buf = new byte[13];
    // read from offset BLOCK_SIZE, i.e. right after the initial content
    in.readFully(BLOCK_SIZE, buf, 0, buf.length);
    IOUtils.cleanup(LOG, in);
    Assert.assertEquals("hello, world!", new String(buf));
  } finally {
    test.shutdownCluster();
  }
}
 
源代码19 项目: RDFS   文件: TestJobHistoryVersion.java
/**
 * Writes a minimal job history (one job with one successful map task and
 * attempt) in the layout of the given version. This method must be updated
 * if the format/content of future job history versions changes.
 *
 * @param out     stream the history lines are written to
 * @param version history format version; 0 selects the oldest layout
 * @throws IOException if writing to {@code out} fails
 */
private void writeHistoryFile(FSDataOutputStream out, long version)
throws IOException {
  String eol = "\n"; // version 0 ends each line with '\n' only
  String confName = "job.xml";
  String counterText = COUNTERS;
  if (version > 0) {
    // later versions put DELIM in front of the newline
    eol = DELIM + eol;

    // the version line only exists in later versions
    out.writeBytes(RecordTypes.Meta.name() + " VERSION=\""
                   + JobHistory.VERSION + "\" " + eol);
    confName = JobHistory.escapeString(confName);
    counterText = JobHistory.escapeString(counterText);
  }

  // fragments shared by several lines
  final String jobPrefix = "Job JOBID=\"" + JOB + "\"";
  final String taskPrefix =
      "Task TASKID=\"" + TASK_ID + "\" TASK_TYPE=\"MAP\"";
  final String attemptPrefix =
      "MapAttempt TASK_TYPE=\"MAP\" TASKID=\"" + TASK_ID + "\""
      + " TASK_ATTEMPT_ID=\"" + TASK_ATTEMPT_ID + "\"";
  final String totals = " TOTAL_MAPS=\"1\" TOTAL_REDUCES=\"0\"";
  final String hostSuffix = " HOSTNAME=\"" + HOSTNAME + "\" " + eol;
  final String countersSuffix = " COUNTERS=\"" + counterText + "\" " + eol;

  // job-start line
  out.writeBytes(jobPrefix + " JOBNAME=\"" + JOBNAME + "\""
                 + " USER=\"" + USER + "\" SUBMIT_TIME=\"" + TIME + "\""
                 + " JOBCONF=\"" + confName + "\" " + eol);

  // job-launch line
  out.writeBytes(jobPrefix + " LAUNCH_TIME=\"" + TIME + "\""
                 + totals + " " + eol);

  // task start line
  out.writeBytes(taskPrefix + " START_TIME=\"" + TIME + "\" SPLITS=\"\""
                 + totals + " " + eol);

  // attempt start line
  out.writeBytes(attemptPrefix + " START_TIME=\"" + TIME + "\""
                 + hostSuffix);

  // attempt finish line
  out.writeBytes(attemptPrefix + " FINISH_TIME=\"" + TIME + "\""
                 + " TASK_STATUS=\"SUCCESS\"" + hostSuffix);

  // task finish line
  out.writeBytes(taskPrefix + " TASK_STATUS=\"SUCCESS\""
                 + " FINISH_TIME=\"" + TIME + "\"" + countersSuffix);

  // job-finish line
  out.writeBytes(jobPrefix + " FINISH_TIME=\"" + TIME + "\""
                 + totals
                 + " JOB_STATUS=\"SUCCESS\" FINISHED_MAPS=\"1\""
                 + " FINISHED_REDUCES=\"0\" FAILED_MAPS=\"0\""
                 + " FAILED_REDUCES=\"0\""
                 + countersSuffix);
}
 
源代码20 项目: hadoop   文件: TestBalancer.java
/**
 * Test running many balancers simultaneously.
 *
 * Case-1: First balancer is running. Now, running a second one should get
 * "Another balancer is running. Exiting.." IOException and fail immediately.
 *
 * Case-2: When running the second balancer the 'balancer.id' file exists but
 * the lease doesn't exist. Now, the second balancer should run successfully.
 */
@Test(timeout = 100000)
public void testManyBalancerSimultaneously() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);
  // The cluster starts with one node of capacity 4 * CAPACITY; the node
  // added later is empty, has half of that capacity and is on the same rack.
  long[] capacities = new long[] { 4 * CAPACITY };
  String[] racks = new String[] { RACK0 };
  long newCapacity = 2 * CAPACITY;
  String newRack = RACK0;
  LOG.info("capacities = " + long2String(capacities));
  LOG.info("racks      = " + Arrays.asList(racks));
  LOG.info("newCapacity= " + newCapacity);
  LOG.info("newRack    = " + newRack);
  LOG.info("useTool    = " + false);
  assertEquals(capacities.length, racks.length);
  int numOfDatanodes = capacities.length;
  cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
      .racks(racks).simulatedCapacities(capacities).build();
  try {
    cluster.waitActive();
    client = NameNodeProxies.createProxy(conf,
        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();

    long totalCapacity = sum(capacities);

    // fill up the cluster to be 30% full
    final long totalUsedSpace = totalCapacity * 3 / 10;
    createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
        (short) numOfDatanodes, 0);
    // start up an empty node with newCapacity on the same rack
    cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
        new long[] { newCapacity });

    // Case1: Simulate first balancer by creating 'balancer.id' file. It
    // will keep this file until the balancing operation is completed.
    FileSystem fs = cluster.getFileSystem(0);
    final FSDataOutputStream out = fs
        .create(Balancer.BALANCER_ID_PATH, false);
    out.writeBytes(InetAddress.getLocalHost().getHostName());
    out.hflush();
    assertTrue("'balancer.id' file doesn't exist!",
        fs.exists(Balancer.BALANCER_ID_PATH));

    // start second balancer; it is expected to exit with IO_EXCEPTION
    // while the id file is still held open above
    final String[] args = { "-policy", "datanode" };
    final Tool tool = new Cli();
    tool.setConf(conf);
    int exitCode = tool.run(args); // start balancing
    assertEquals("Exit status code mismatches",
        ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);

    // Case2: Release lease so that another balancer would be able to
    // perform balancing. The id file itself still exists after the close.
    out.close();
    assertTrue("'balancer.id' file doesn't exist!",
        fs.exists(Balancer.BALANCER_ID_PATH));
    exitCode = tool.run(args); // start balancing
    assertEquals("Exit status code mismatches",
        ExitStatus.SUCCESS.getExitCode(), exitCode);
  } finally {
    cluster.shutdown();
  }
}