org.apache.hadoop.mapred.IFile.Writer#close() source code examples

Listed below are example usages of org.apache.hadoop.mapred.IFile.Writer#close(), taken from open-source projects; the full source of each file can be viewed on GitHub via the project links.
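As orientation, here is a minimal self-contained sketch of the Writer lifecycle that close() completes. It reuses the seven-argument Writer constructor that appears in Example 2 below; the output path and the IntWritable/Text key/value types are illustrative assumptions, not taken from any of these projects.

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.JobConf;

public class WriterCloseDemo {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf();
    RawLocalFileSystem fs = new RawLocalFileSystem();
    fs.setConf(conf);

    // Illustrative output location; any writable local path will do.
    Path outFile = new Path(System.getProperty("java.io.tmpdir"), "ifile-demo");
    FSDataOutputStream out = fs.create(outFile);

    // codec == null means no compression; the final 'true' hands stream
    // ownership to the writer, so close() also closes the stream.
    Writer<IntWritable, Text> writer = new Writer<IntWritable, Text>(
        conf, out, IntWritable.class, Text.class, null, null, true);
    try {
      writer.append(new IntWritable(3), new Text("txt"));
    } finally {
      // close() flushes pending records, writes the IFile end-of-file
      // marker and checksum, and fixes the raw/compressed lengths.
      writer.close();
    }
    System.out.println("raw=" + writer.getRawLength()
        + ", compressed=" + writer.getCompressedLength());
  }
}

Every example below is a variation on this create / write / close sequence; what differs is where the bytes go (a local file, an optionally encrypted stream, or an in-memory buffer) and how failures are cleaned up.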

Example 1  Project: hadoop  File: MergeManagerImpl.java (an identical copy appears in big-c)
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();
  
  InMemoryMapOutput<K, V> mergedMapOutputs = 
    unconditionalReserve(dummyMapId, mergeOutputSize, false);
  
  Writer<K, V> writer = 
    new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
  
  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
           " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter = 
    Merger.merge(jobConf, rfs,
                 (Class<K>)jobConf.getMapOutputKeyClass(),
                 (Class<V>)jobConf.getMapOutputValueClass(),
                 inMemorySegments, inMemorySegments.size(),
                 new Path(reduceId.toString()),
                 (RawComparator<K>)jobConf.getOutputKeyComparator(),
                 reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId +  
           " Memory-to-Memory merge of the " + noInMemorySegments +
           " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
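A usage note on this example: InMemoryWriter is a subclass of IFile.Writer that writes the same record format into a byte buffer instead of a file, and, in the Hadoop 2.x sources, it only accepts the raw append(DataInputBuffer, DataInputBuffer) form, which is what Merger.writeFile uses internally. The standalone sketch below is a hypothetical use assuming that 2.x API; the buffer size and key/value pair are illustrative.

import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryWriter;

public class InMemoryWriterSketch {
  public static void main(String[] args) throws Exception {
    // 64 KB cap, standing in for the reservation that
    // unconditionalReserve() makes in the example above.
    BoundedByteArrayOutputStream buffer =
        new BoundedByteArrayOutputStream(64 * 1024);
    InMemoryWriter<IntWritable, Text> writer =
        new InMemoryWriter<IntWritable, Text>(buffer);

    // Serialize one key/value pair into raw buffers by hand; this is
    // the shape Merger.writeFile feeds to the writer.
    DataOutputBuffer k = new DataOutputBuffer();
    new IntWritable(1).write(k);
    DataOutputBuffer v = new DataOutputBuffer();
    new Text("value").write(v);

    DataInputBuffer key = new DataInputBuffer();
    key.reset(k.getData(), k.getLength());
    DataInputBuffer value = new DataInputBuffer();
    value.reset(v.getData(), v.getLength());

    writer.append(key, value);
    writer.close();  // writes the IFile end-of-file marker into the buffer

    byte[] ifileBytes = buffer.getBuffer();  // the finished in-memory IFile
    System.out.println("bytes used: " + buffer.size()
        + " of " + ifileBytes.length);
  }
}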
 
Example 2  Project: hadoop  File: TestPipeApplication.java (an identical copy appears in big-c)
/**
 * Tests org.apache.hadoop.mapred.pipes.Application and its internal
 * message handling: MessageType.REGISTER_COUNTER, INCREMENT_COUNTER,
 * STATUS, PROGRESS...
 *
 * @throws Throwable
 */
@Test
public void testApplication() throws Throwable {
  JobConf conf = new JobConf();

  RecordReader<FloatWritable, NullWritable> rReader = new Reader();

  // client for test
  File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationStub");

  TestTaskReporter reporter = new TestTaskReporter();

  File[] psw = cleanTokenPasswordFile();
  try {

    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskName);
    conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());

    // token for authorization
    Token<AMRMTokenIdentifier> token = new Token<AMRMTokenIdentifier>(
            "user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
            "service"));

    TokenCache.setJobToken(token, conf.getCredentials());
    FakeCollector output = new FakeCollector(new Counters.Counter(),
            new Progress());
    FileSystem fs = new RawLocalFileSystem();
    fs.setConf(conf);
    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
            new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
            IntWritable.class, Text.class, null, null, true);
    output.setWriter(wr);
    conf.set(Submitter.PRESERVE_COMMANDFILE, "true");

    initStdOut(conf);

    Application<WritableComparable<IntWritable>, Writable, IntWritable, Text> application = new Application<WritableComparable<IntWritable>, Writable, IntWritable, Text>(
            conf, rReader, output, reporter, IntWritable.class, Text.class);
    application.getDownlink().flush();

    application.getDownlink().mapItem(new IntWritable(3), new Text("txt"));

    application.getDownlink().flush();

    application.waitForFinish();

    wr.close();

    // test getDownlink().mapItem();
    String stdOut = readStdOut(conf);
    assertTrue(stdOut.contains("key:3"));
    assertTrue(stdOut.contains("value:txt"));

    // the reporter's counter and status should have been sent
    // test MessageType.REGISTER_COUNTER and INCREMENT_COUNTER
    assertEquals(1.0, reporter.getProgress(), 0.01);
    assertNotNull(reporter.getCounter("group", "name"));
    // test status MessageType.STATUS
    assertEquals(reporter.getStatus(), "PROGRESS");
    stdOut = readFile(new File(workSpace.getAbsolutePath() + File.separator
            + "outfile"));
    // check MessageType.PROGRESS
    assertEquals(0.55f, rReader.getProgress(), 0.001);
    application.getDownlink().close();
    // test MessageType.OUTPUT
    Entry<IntWritable, Text> entry = output.getCollect().entrySet()
            .iterator().next();
    assertEquals(123, entry.getKey().get());
    assertEquals("value", entry.getValue().toString());
    try {
      // try to abort
      application.abort(new Throwable());
      fail();
    } catch (IOException e) {
      // abort is expected to surface the pipe child exception
      assertEquals("pipe child exception", e.getMessage());
    }
  } finally {
    if (psw != null) {
      // remove password files
      for (File file : psw) {
        file.deleteOnExit();
      }
    }
  }
}
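After wr.close() the test reads the output back as plain text, but a sealed IFile can also be re-opened with the matching IFile.Reader. The sketch below assumes the Hadoop 2.x Reader API (the (conf, fs, path, codec, counter) constructor and nextRawKey/nextRawValue); it is not part of this test, and the class and method names around it are illustrative.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;

public class IFileDump {
  // Print every key/value pair of an IFile written as <IntWritable, Text>.
  static void dump(JobConf conf, FileSystem fs, Path file) throws Exception {
    IFile.Reader<IntWritable, Text> reader =
        new IFile.Reader<IntWritable, Text>(conf, fs, file, null, null);
    DataInputBuffer keyBuf = new DataInputBuffer();
    DataInputBuffer valBuf = new DataInputBuffer();
    IntWritable key = new IntWritable();
    Text value = new Text();
    try {
      // nextRawKey() returns false once it reaches the end-of-file
      // marker that Writer#close() wrote.
      while (reader.nextRawKey(keyBuf)) {
        reader.nextRawValue(valBuf);
        key.readFields(keyBuf);    // deserialize the raw key bytes
        value.readFields(valBuf);  // deserialize the raw value bytes
        System.out.println(key.get() + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}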
 
Example 3  Project: hadoop  File: MergeManagerImpl.java (an identical copy appears in big-c)
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }
  
  //name this output file same as the name of the first file that is 
  //there in the current list of inmem files (this is guaranteed to
  //be absent on the disk currently. So we don't overwrite a prev. 
  //created spill). Also we need to create the output file now since
  //it is not guaranteed that this file will be present after merge
  //is called (we delete empty files as soon as we see them
  //in the merge method)

  //figure out the mapId 
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = 
    createInMemorySegments(inputs, inMemorySegments,0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath = 
    mapOutputFile.getInputFileForWrite(mapTaskId,
                                       mergeOutputSize).suffix(
                                           Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
    
    rIter = Merger.merge(jobConf, rfs,
                         (Class<K>)jobConf.getMapOutputKeyClass(),
                         (Class<V>)jobConf.getMapOutputValueClass(),
                         inMemorySegments, inMemorySegments.size(),
                         new Path(reduceId.toString()),
                         (RawComparator<K>)jobConf.getOutputKeyComparator(),
                         reporter, spilledRecordsCounter, null, null);
    
    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +  
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " + 
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) { 
    //make sure that we delete the ondisk file that we created 
    //earlier when we invoked cloneFileAttributes
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
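This example and the next share a close-or-clean-up discipline: if anything fails between creating the output file and writer.close(), the partial file is deleted before the exception propagates, and the raw/compressed lengths are read only after a successful close(). A generic sketch of that pattern follows; the helper class and its callback interface are hypothetical names, not Hadoop API.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.IFile.Writer;

class SafeIFileWrite {
  interface Body<K, V> {
    void write(Writer<K, V> writer) throws IOException;
  }

  // Run the write body, close the writer, and delete the partial
  // output if either step throws, mirroring the merges above.
  static <K, V> void run(FileSystem fs, Path out, Writer<K, V> writer,
                         Body<K, V> body) throws IOException {
    try {
      body.write(writer);
      writer.close();  // only a fully closed IFile is usable downstream
    } catch (IOException e) {
      fs.delete(out, true);  // drop the partial output, then rethrow
      throw e;
    }
  }
}

Only after run() returns are writer.getRawLength() and writer.getCompressedLength() meaningful, which is why both merge methods construct their CompressAwarePath after close().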
 
Example 4  Project: hadoop  File: MergeManagerImpl.java (an identical copy appears in big-c)
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  
  long approxOutputSize = 0;
  int bytesPerSum = 
    jobConf.getInt("io.bytes.per.checksum", 512);
  
  LOG.info("OnDiskMerger: We have  " + inputs.size() + 
           " map outputs on disk. Triggering merge...");
  
  // 1. Prepare the list of files to be merged. 
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize += 
    ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath = 
    localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
        approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter  = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs,
                        (Class<K>) jobConf.getMapOutputKeyClass(),
                        (Class<V>) jobConf.getMapOutputValueClass(),
                        codec, inputs.toArray(new Path[inputs.size()]), 
                        true, ioSortFactor, tmpDir, 
                        (RawComparator<K>) jobConf.getOutputKeyComparator(), 
                        reporter, spilledRecordsCounter, null, 
                        mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId +
      " Finished merging " + inputs.size() + 
      " map output files on disk of total-size " + 
      approxOutputSize + "." + 
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
 