Class org.apache.hadoop.fs.ChecksumFileSystem: Source Code Examples

The examples below show how the org.apache.hadoop.fs.ChecksumFileSystem API is used in practice. Each snippet is taken from an open-source project; the full source can be found in the corresponding GitHub repository.
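
Before the project snippets, here is a minimal self-contained sketch (not taken from any of the projects below; the file paths are hypothetical) of the pattern most of them rely on: the local file system is a ChecksumFileSystem that keeps a hidden .crc sidecar file per data file, and getRawFileSystem() exposes the underlying file system with checksumming bypassed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ChecksumFileSystemSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // The local file system is a ChecksumFileSystem subclass, so every file
    // it writes gets a hidden ".<name>.crc" sidecar file.
    ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);

    Path file = new Path("/tmp/checksum-demo.dat");   // hypothetical path
    try (FSDataOutputStream out = checkedLocal.create(file, true)) {
      out.writeBytes("some data");                    // also writes /tmp/.checksum-demo.dat.crc
    }
    System.out.println("checksum sidecar: " + checkedLocal.getChecksumFile(file));

    // Reads through checkedLocal verify the data against the sidecar;
    // the wrapped raw file system skips checksums entirely.
    FileSystem rawLocal = checkedLocal.getRawFileSystem();
    try (FSDataOutputStream out = rawLocal.create(new Path("/tmp/raw-demo.dat"), true)) {
      out.writeBytes("no .crc sidecar for this one");
    }

    // Verification can also be switched off on the checksummed view.
    checkedLocal.setVerifyChecksum(false);
  }
}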

Example 1 (Project: RDFS, File: TestFSInputChecker.java)
/**
 * Tests read/seek/getPos/skip operations for an input stream.
 */
private void testChecker(ChecksumFileSystem fileSys, boolean readCS)
throws Exception {
  Path file = new Path("try.dat");
  if( readCS ) {
    writeFile(fileSys, file);
  } else {
    writeFile(fileSys.getRawFileSystem(), file);
  }
  stm = fileSys.open(file);
  checkReadAndGetPos();
  checkSeek();
  checkSkip();
  //checkMark
  assertFalse(stm.markSupported());
  stm.close();
  cleanupFile(fileSys, file);
}
 
Example 2 (Project: hadoop-gpu, File: TestFSInputChecker.java)
/**
 * Tests read/seek/getPos/skip operations for an input stream.
 */
private void testChecker(ChecksumFileSystem fileSys, boolean readCS)
throws Exception {
  Path file = new Path("try.dat");
  if( readCS ) {
    writeFile(fileSys, file);
  } else {
    writeFile(fileSys.getRawFileSystem(), file);
  }
  stm = fileSys.open(file);
  checkReadAndGetPos();
  checkSeek();
  checkSkip();
  //checkMark
  assertFalse(stm.markSupported());
  stm.close();
  cleanupFile(fileSys, file);
}
 
Example 3 (Project: incubator-iotdb, File: HDFSInput.java)
@Override
public synchronized int read(ByteBuffer dst) throws IOException {
  int res;
  if (fs instanceof ChecksumFileSystem) {
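    // ChecksumFileSystem's input stream is not ByteBufferReadable, so a direct
    // read(ByteBuffer) would throw UnsupportedOperationException; read into a
    // byte[] and copy it into the buffer instead.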
    byte[] bytes = new byte[dst.remaining()];
    res = fsDataInputStream.read(bytes);
    dst.put(bytes);
  } else {
    res = fsDataInputStream.read(dst);
  }
  return res;
}
 
Example 4 (Project: hadoop, File: TestSeekBug.java)
private void smallReadSeek(FileSystem fileSys, Path name) throws IOException {
  if (fileSys instanceof ChecksumFileSystem) {
    fileSys = ((ChecksumFileSystem)fileSys).getRawFileSystem();
  }
  // Make the buffer size small to trigger code for HADOOP-922
  FSDataInputStream stmRaw = fileSys.open(name, 1);
  byte[] expected = new byte[ONEMB];
  Random rand = new Random(seed);
  rand.nextBytes(expected);
  
  // Issue a simple read first.
  byte[] actual = new byte[128];
  stmRaw.seek(100000);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, 100000, expected, "First Small Read Test");

  // now do a small seek of 4 bytes, within the same block.
  int newpos1 = 100000 + 128 + 4;
  stmRaw.seek(newpos1);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos1, expected, "Small Seek Bug 1");

  // seek another 256 bytes this time
  int newpos2 = newpos1 + 256;
  stmRaw.seek(newpos2);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos2, expected, "Small Seek Bug 2");

  // all done
  stmRaw.close();
}
 
Example 5 (Project: big-c, File: TestSeekBug.java)
private void smallReadSeek(FileSystem fileSys, Path name) throws IOException {
  if (fileSys instanceof ChecksumFileSystem) {
    fileSys = ((ChecksumFileSystem)fileSys).getRawFileSystem();
  }
  // Make the buffer size small to trigger code for HADOOP-922
  FSDataInputStream stmRaw = fileSys.open(name, 1);
  byte[] expected = new byte[ONEMB];
  Random rand = new Random(seed);
  rand.nextBytes(expected);
  
  // Issue a simple read first.
  byte[] actual = new byte[128];
  stmRaw.seek(100000);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, 100000, expected, "First Small Read Test");

  // now do a small seek of 4 bytes, within the same block.
  int newpos1 = 100000 + 128 + 4;
  stmRaw.seek(newpos1);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos1, expected, "Small Seek Bug 1");

  // seek another 256 bytes this time
  int newpos2 = newpos1 + 256;
  stmRaw.seek(newpos2);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos2, expected, "Small Seek Bug 2");

  // all done
  stmRaw.close();
}
 
Example 6 (Project: parquet-mr, File: BaseCommand.java)
private FSDataOutputStream create(String filename, boolean noChecksum, boolean overwrite)
  throws IOException {
  Path filePath = qualifiedPath(filename);
  // even though it was qualified using the default FS, it may not be in it
  FileSystem fs = filePath.getFileSystem(getConf());
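  // getRawFileSystem() bypasses ChecksumFileSystem, so no hidden .<name>.crc
  // sidecar file is written alongside the output when noChecksum is requested.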
  if (noChecksum && fs instanceof ChecksumFileSystem) {
    fs = ((ChecksumFileSystem) fs).getRawFileSystem();
  }
  return fs.create(filePath, overwrite);
}
 
Example 7 (Project: RDFS, File: TestSeekBug.java)
private void smallReadSeek(FileSystem fileSys, Path name) throws IOException {
  if (fileSys instanceof ChecksumFileSystem) {
    fileSys = ((ChecksumFileSystem)fileSys).getRawFileSystem();
  }
  // Make the buffer size small to trigger code for HADOOP-922
  FSDataInputStream stmRaw = fileSys.open(name, 1);
  byte[] expected = new byte[ONEMB];
  Random rand = new Random(seed);
  rand.nextBytes(expected);
  
  // Issue a simple read first.
  byte[] actual = new byte[128];
  stmRaw.seek(100000);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, 100000, expected, "First Small Read Test");

  // now do a small seek of 4 bytes, within the same block.
  int newpos1 = 100000 + 128 + 4;
  stmRaw.seek(newpos1);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos1, expected, "Small Seek Bug 1");

  // seek another 256 bytes this time
  int newpos2 = newpos1 + 256;
  stmRaw.seek(newpos2);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos2, expected, "Small Seek Bug 2");

  // all done
  stmRaw.close();
}
 
Example 8 (Project: RDFS, File: TestFSInputChecker.java)
public void testFSInputChecker() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong("dfs.block.size", BLOCK_SIZE);
  conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
  conf.set("fs.hdfs.impl",
           "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
  rand.nextBytes(expected);

  // test DFS
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem();
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
    testSeekAndRead(fileSys);
  } finally {
    fileSys.close();
    cluster.shutdown();
  }
  
  
  // test Local FS
  fileSys = FileSystem.getLocal(conf);
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
    testFileCorruption((LocalFileSystem)fileSys);
    testSeekAndRead(fileSys);
  }finally {
    fileSys.close();
  }
}
 
Example 9 (Project: RDFS, File: TestFSInputChecker.java)
private void testSeekAndRead(ChecksumFileSystem fileSys)
throws IOException {
  Path file = new Path("try.dat");
  writeFile(fileSys, file);
  stm = fileSys.open(file,
      fileSys.getConf().getInt("io.file.buffer.size", 4096));
  checkSeekAndRead();
  stm.close();
  cleanupFile(fileSys, file);
}
 
Example 10 (Project: hadoop-gpu, File: TestSeekBug.java)
private void smallReadSeek(FileSystem fileSys, Path name) throws IOException {
  if (fileSys instanceof ChecksumFileSystem) {
    fileSys = ((ChecksumFileSystem)fileSys).getRawFileSystem();
  }
  // Make the buffer size small to trigger code for HADOOP-922
  FSDataInputStream stmRaw = fileSys.open(name, 1);
  byte[] expected = new byte[ONEMB];
  Random rand = new Random(seed);
  rand.nextBytes(expected);
  
  // Issue a simple read first.
  byte[] actual = new byte[128];
  stmRaw.seek(100000);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, 100000, expected, "First Small Read Test");

  // now do a small seek of 4 bytes, within the same block.
  int newpos1 = 100000 + 128 + 4;
  stmRaw.seek(newpos1);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos1, expected, "Small Seek Bug 1");

  // seek another 256 bytes this time
  int newpos2 = newpos1 + 256;
  stmRaw.seek(newpos2);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos2, expected, "Small Seek Bug 2");

  // all done
  stmRaw.close();
}
 
Example 11 (Project: hadoop-gpu, File: TestFSInputChecker.java)
public void testFSInputChecker() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong("dfs.block.size", BLOCK_SIZE);
  conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
  conf.set("fs.hdfs.impl",
           "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
  rand.nextBytes(expected);

  // test DFS
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem();
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
    testSeekAndRead(fileSys);
  } finally {
    fileSys.close();
    cluster.shutdown();
  }
  
  
  // test Local FS
  fileSys = FileSystem.getLocal(conf);
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
    testFileCorruption((LocalFileSystem)fileSys);
    testSeekAndRead(fileSys);
  }finally {
    fileSys.close();
  }
}
 
Example 12 (Project: hadoop-gpu, File: TestFSInputChecker.java)
private void testSeekAndRead(ChecksumFileSystem fileSys)
throws IOException {
  Path file = new Path("try.dat");
  writeFile(fileSys, file);
  stm = fileSys.open(file,
      fileSys.getConf().getInt("io.file.buffer.size", 4096));
  checkSeekAndRead();
  stm.close();
  cleanupFile(fileSys, file);
}
 
Example 13 (Project: kite, File: BaseCommand.java)
private FSDataOutputStream create(String filename, boolean noChecksum)
    throws IOException {
  Path filePath = qualifiedPath(filename);
  // even though it was qualified using the default FS, it may not be in it
  FileSystem fs = filePath.getFileSystem(getConf());
  if (noChecksum && fs instanceof ChecksumFileSystem) {
    fs = ((ChecksumFileSystem) fs).getRawFileSystem();
  }
  return fs.create(filePath, true /* overwrite */);
}
 
Example 14 (Project: hadoop, File: MergeManagerImpl.java)
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  
  long approxOutputSize = 0;
  int bytesPerSum = 
    jobConf.getInt("io.bytes.per.checksum", 512);
  
  LOG.info("OnDiskMerger: We have  " + inputs.size() + 
           " map outputs on disk. Triggering merge...");
  
  // 1. Prepare the list of files to be merged. 
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize += 
    ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath = 
    localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
        approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter  = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs,
                        (Class<K>) jobConf.getMapOutputKeyClass(),
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, inputs.toArray(new Path[inputs.size()]), 
                        true, ioSortFactor, tmpDir, 
                        (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                        reporter, spilledRecordsCounter, null, 
                        mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId +
      " Finished merging " + inputs.size() + 
      " map output files on disk of total-size " + 
      approxOutputSize + "." + 
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
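
A note on the call to ChecksumFileSystem.getChecksumLength(size, bytesPerSum) above: it estimates the extra space the checksum sidecar needs, roughly one CRC value (4 bytes) per bytesPerSum bytes of data plus a small fixed header. As a rough worked example, assuming the default bytesPerSum of 512, a 1 GiB merge output adds about 1,073,741,824 / 512 * 4 ≈ 8 MiB of checksum data to the size estimate.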
 
Example 15 (Project: hadoop, File: BenchmarkThroughput.java)
@Override
public int run(String[] args) throws IOException {
  // silence the minidfs cluster
  Log hadoopLog = LogFactory.getLog("org");
  if (hadoopLog instanceof Log4JLogger) {
    ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  }
  int reps = 1;
  if (args.length == 1) {
    try {
      reps = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      printUsage();
      return -1;
    }
  } else if (args.length > 1) {
    printUsage();
    return -1;
  }
  Configuration conf = getConf();
  // the size of the file to write
  long SIZE = conf.getLong("dfsthroughput.file.size",
      10L * 1024 * 1024 * 1024);
  BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);

  String localDir = conf.get("mapred.temp.dir");
  if (localDir == null) {
    localDir = conf.get("hadoop.tmp.dir");
    conf.set("mapred.temp.dir", localDir);
  }
  dir = new LocalDirAllocator("mapred.temp.dir");

  System.setProperty("test.build.data", localDir);
  System.out.println("Local = " + localDir);
  ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  FileSystem rawLocal = checkedLocal.getRawFileSystem();
  for(int i=0; i < reps; ++i) {
    writeAndReadLocalFile("local", conf, SIZE);
    writeAndReadFile(rawLocal, "raw", conf, SIZE);
    writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  }
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf)
                                .racks(new String[]{"/foo"}).build();
    cluster.waitActive();
    FileSystem dfs = cluster.getFileSystem();
    for(int i=0; i < reps; ++i) {
      writeAndReadFile(dfs, "dfs", conf, SIZE);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // clean up minidfs junk
      rawLocal.delete(new Path(localDir, "dfs"), true);
    }
  }
  return 0;
}
 
Example 16 (Project: big-c, File: MergeManagerImpl.java)
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  
  long approxOutputSize = 0;
  int bytesPerSum = 
    jobConf.getInt("io.bytes.per.checksum", 512);
  
  LOG.info("OnDiskMerger: We have  " + inputs.size() + 
           " map outputs on disk. Triggering merge...");
  
  // 1. Prepare the list of files to be merged. 
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize += 
    ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath = 
    localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
        approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter  = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs,
                        (Class<K>) jobConf.getMapOutputKeyClass(),
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, inputs.toArray(new Path[inputs.size()]), 
                        true, ioSortFactor, tmpDir, 
                        (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                        reporter, spilledRecordsCounter, null, 
                        mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId +
      " Finished merging " + inputs.size() + 
      " map output files on disk of total-size " + 
      approxOutputSize + "." + 
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
 
Example 17 (Project: big-c, File: BenchmarkThroughput.java)
@Override
public int run(String[] args) throws IOException {
  // silence the minidfs cluster
  Log hadoopLog = LogFactory.getLog("org");
  if (hadoopLog instanceof Log4JLogger) {
    ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  }
  int reps = 1;
  if (args.length == 1) {
    try {
      reps = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      printUsage();
      return -1;
    }
  } else if (args.length > 1) {
    printUsage();
    return -1;
  }
  Configuration conf = getConf();
  // the size of the file to write
  long SIZE = conf.getLong("dfsthroughput.file.size",
      10L * 1024 * 1024 * 1024);
  BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);

  String localDir = conf.get("mapred.temp.dir");
  if (localDir == null) {
    localDir = conf.get("hadoop.tmp.dir");
    conf.set("mapred.temp.dir", localDir);
  }
  dir = new LocalDirAllocator("mapred.temp.dir");

  System.setProperty("test.build.data", localDir);
  System.out.println("Local = " + localDir);
  ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  FileSystem rawLocal = checkedLocal.getRawFileSystem();
  for(int i=0; i < reps; ++i) {
    writeAndReadLocalFile("local", conf, SIZE);
    writeAndReadFile(rawLocal, "raw", conf, SIZE);
    writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  }
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf)
                                .racks(new String[]{"/foo"}).build();
    cluster.waitActive();
    FileSystem dfs = cluster.getFileSystem();
    for(int i=0; i < reps; ++i) {
      writeAndReadFile(dfs, "dfs", conf, SIZE);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // clean up minidfs junk
      rawLocal.delete(new Path(localDir, "dfs"), true);
    }
  }
  return 0;
}
 
Example 18 (Project: RDFS, File: BenchmarkThroughput.java)
public int run(String[] args) throws IOException {
  // silence the minidfs cluster
  Log hadoopLog = LogFactory.getLog("org");
  if (hadoopLog instanceof Log4JLogger) {
    ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  }
  int reps = 1;
  if (args.length == 1) {
    try {
      reps = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      printUsage();
      return -1;
    }
  } else if (args.length > 1) {
    printUsage();
    return -1;
  }
  Configuration conf = getConf();
  // the size of the file to write
  long SIZE = conf.getLong("dfsthroughput.file.size",
      10L * 1024 * 1024 * 1024);
  BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);

  String localDir = conf.get("mapred.temp.dir");
  dir = new LocalDirAllocator("mapred.temp.dir");

  System.setProperty("test.build.data", localDir);
  System.out.println("Local = " + localDir);
  ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  FileSystem rawLocal = checkedLocal.getRawFileSystem();
  for(int i=0; i < reps; ++i) {
    writeAndReadLocalFile("local", conf, SIZE);
    writeAndReadFile(rawLocal, "raw", conf, SIZE);
    writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  }
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster(conf, 1, true, new String[]{"/foo"});
    cluster.waitActive();
    FileSystem dfs = cluster.getFileSystem();
    for(int i=0; i < reps; ++i) {
      writeAndReadFile(dfs, "dfs", conf, SIZE);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // clean up minidfs junk
      rawLocal.delete(new Path(localDir, "dfs"), true);
    }
  }
  return 0;
}
 
Example 19 (Project: incubator-tez, File: MergeManager.java)
@Override
public void merge(List<Path> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  numDiskToDiskMerges.increment(1);
  
  long approxOutputSize = 0;
  int bytesPerSum = 
    conf.getInt("io.bytes.per.checksum", 512);
  
  LOG.info("OnDiskMerger: We have  " + inputs.size() + 
           " map outputs on disk. Triggering merge...");
  
  // 1. Prepare the list of files to be merged. 
  for (Path file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize += 
    ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath = 
    localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
        approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
  Writer writer = 
    new Writer(conf, rfs, outputPath, 
                    (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
                    (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                    codec, null, null);
  TezRawKeyValueIterator iter  = null;
  Path tmpDir = new Path(inputContext.getUniqueIdentifier());
  try {
    iter = TezMerger.merge(conf, rfs,
                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                        codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
                        inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir, 
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
                        nullProgressable, spilledRecordsCounter, null, 
                        mergedMapOutputsCounter, null);

    // TODO Maybe differentiate between data written because of Merges and
    // the finalMerge (i.e. final mem available may be different from
    // initial merge mem)
    TezMerger.writeFile(iter, writer, nullProgressable, TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
    writer.close();
    additionalBytesWritten.increment(writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(outputPath);

  LOG.info(inputContext.getUniqueIdentifier() +
      " Finished merging " + inputs.size() + 
      " map output files on disk of total-size " + 
      approxOutputSize + "." + 
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
 
Example 20 (Project: hadoop-gpu, File: BenchmarkThroughput.java)
public int run(String[] args) throws IOException {
  // silence the minidfs cluster
  Log hadoopLog = LogFactory.getLog("org");
  if (hadoopLog instanceof Log4JLogger) {
    ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  }
  int reps = 1;
  if (args.length == 1) {
    try {
      reps = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      printUsage();
      return -1;
    }
  } else if (args.length > 1) {
    printUsage();
    return -1;
  }
  Configuration conf = getConf();
  // the size of the file to write
  long SIZE = conf.getLong("dfsthroughput.file.size",
      10L * 1024 * 1024 * 1024);
  BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);

  String localDir = conf.get("mapred.temp.dir");
  dir = new LocalDirAllocator("mapred.temp.dir");

  System.setProperty("test.build.data", localDir);
  System.out.println("Local = " + localDir);
  ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  FileSystem rawLocal = checkedLocal.getRawFileSystem();
  for(int i=0; i < reps; ++i) {
    writeAndReadLocalFile("local", conf, SIZE);
    writeAndReadFile(rawLocal, "raw", conf, SIZE);
    writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  }
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster(conf, 1, true, new String[]{"/foo"});
    cluster.waitActive();
    FileSystem dfs = cluster.getFileSystem();
    for(int i=0; i < reps; ++i) {
      writeAndReadFile(dfs, "dfs", conf, SIZE);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // clean up minidfs junk
      rawLocal.delete(new Path(localDir, "dfs"), true);
    }
  }
  return 0;
}
 