Code examples for the class org.apache.hadoop.io.compress.DefaultCodec

The examples below show how the org.apache.hadoop.io.compress.DefaultCodec API is used in real projects; you can also follow the links through to GitHub to read the full source.
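
Before the project excerpts, here is a minimal, self-contained sketch of using DefaultCodec directly. It is not taken from any of the projects below, and the class name DefaultCodecRoundTrip is made up for illustration; it only shows the usual pattern of configuring the codec and wrapping plain streams with createOutputStream/createInputStream (DefaultCodec is Hadoop's built-in DEFLATE-based codec).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.DefaultCodec;

public class DefaultCodecRoundTrip {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    DefaultCodec codec = new DefaultCodec();
    codec.setConf(conf); // DefaultCodec is Configurable and needs a Configuration before use

    // Compress a small byte array into an in-memory buffer.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(buffer);
    out.write("hello, DefaultCodec".getBytes(StandardCharsets.UTF_8));
    out.close();

    // Decompress it again and print the round-tripped text.
    CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(buffer.toByteArray()));
    byte[] plain = new byte[64];
    int n = in.read(plain);
    in.close();
    System.out.println(new String(plain, 0, n, StandardCharsets.UTF_8));
  }
}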

Example 1  Project: hadoop   File: TestIFile.java

@Test
/**
 * Create an IFile.Writer using GzipCodec since this code does not
 * have a compressor when run via the tests (ie no native libraries).
 */
public void testIFileWriterWithCodec() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  Path path = new Path(new Path("build/test.ifile"), "data");
  DefaultCodec codec = new GzipCodec();
  codec.setConf(conf);
  IFile.Writer<Text, Text> writer =
    new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                 codec, null);
  writer.close();
}
 
Example 2  Project: hadoop   File: TestIFile.java

@Test
/** Same as above but create a reader. */
public void testIFileReaderWithCodec() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  Path path = new Path(new Path("build/test.ifile"), "data");
  DefaultCodec codec = new GzipCodec();
  codec.setConf(conf);
  FSDataOutputStream out = rfs.create(path);
  IFile.Writer<Text, Text> writer =
      new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                                   codec, null);
  writer.close();
  FSDataInputStream in = rfs.open(path);
  IFile.Reader<Text, Text> reader =
    new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
        codec, null);
  reader.close();
  
  // test checksum
  byte[] ab = new byte[100];
  int readed = reader.checksumIn.readWithChecksum(ab, 0, ab.length);
  assertEquals(readed, reader.checksumIn.getChecksum().length);
  
}
 

Example 3  Project: tez

private Configuration createConfiguration(OutputContext outputContext,
    Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
    boolean shouldCompress, int maxSingleBufferSizeBytes,
    Class<? extends Partitioner> partitionerClass) {
  Configuration conf = new Configuration(false);
  conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
  conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClass.getName());
  if (maxSingleBufferSizeBytes >= 0) {
    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
        maxSingleBufferSizeBytes);
  }
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, shouldCompress);
  if (shouldCompress) {
    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
        DefaultCodec.class.getName());
  }
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS,
      reportPartitionStats.getType());
  return conf;
}
 
Example 4  Project: big-c   File: TestIFile.java

@Test
/**
 * Create an IFile.Writer using GzipCodec since this code does not
 * have a compressor when run via the tests (ie no native libraries).
 */
public void testIFileWriterWithCodec() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  Path path = new Path(new Path("build/test.ifile"), "data");
  DefaultCodec codec = new GzipCodec();
  codec.setConf(conf);
  IFile.Writer<Text, Text> writer =
    new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                 codec, null);
  writer.close();
}
 
Example 5  Project: big-c   File: TestIFile.java

@Test
/** Same as above but create a reader. */
public void testIFileReaderWithCodec() throws Exception {
  Configuration conf = new Configuration();
  FileSystem localFs = FileSystem.getLocal(conf);
  FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  Path path = new Path(new Path("build/test.ifile"), "data");
  DefaultCodec codec = new GzipCodec();
  codec.setConf(conf);
  FSDataOutputStream out = rfs.create(path);
  IFile.Writer<Text, Text> writer =
      new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                                   codec, null);
  writer.close();
  FSDataInputStream in = rfs.open(path);
  IFile.Reader<Text, Text> reader =
    new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
        codec, null);
  reader.close();
  
  // test checksum
  byte[] ab = new byte[100];
  int readed = reader.checksumIn.readWithChecksum(ab, 0, ab.length);
  assertEquals(readed, reader.checksumIn.getChecksum().length);
  
}
 
Example 6  Project: tez   File: ConfigUtils.java

public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
    Configuration conf, Class<DefaultCodec> defaultValue) {
  Class<? extends CompressionCodec> codecClass = defaultValue;
  String name = conf
      .get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC);
  if (name != null) {
    try {
      codecClass = conf.getClassByName(name).asSubclass(
          CompressionCodec.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Compression codec " + name
          + " was not found.", e);
    }
  }
  return codecClass;
}
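
A hedged usage sketch of the helper above (a fragment, assuming the ConfigUtils class from this example plus the usual Hadoop and Tez imports; the SnappyCodec override is only an illustrative value):

Configuration conf = new Configuration();
// Optional override; when TEZ_RUNTIME_COMPRESS_CODEC is unset the default passed below wins.
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC,
    "org.apache.hadoop.io.compress.SnappyCodec");

Class<? extends CompressionCodec> codecClass =
    ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);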
 
Example 7  Project: hiped2   File: SequenceFileStoreFunc.java

@Override
public void setStoreLocation(String location, Job job)
    throws IOException {
  job.setOutputKeyClass(keyClass);
  job.setOutputValueClass(valueClass);
  if (compressionType != null && compressionCodecClass != null) {
    Class<? extends CompressionCodec> codecClass =
        FileOutputFormat.getOutputCompressorClass(job,
            DefaultCodec.class);
    SequenceFileOutputFormat.
        setOutputCompressorClass(job, codecClass);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        SequenceFile.CompressionType.valueOf(compressionType));
  }
  FileOutputFormat.setOutputPath(job, new Path(location));
}
 
Example 8  Project: secor   File: FileRegistryTest.java

private FileWriter createWriter() throws Exception {
    PowerMockito.mockStatic(FileUtil.class);

    PowerMockito.mockStatic(ReflectionUtil.class);
    FileWriter writer = Mockito.mock(FileWriter.class);
    Mockito.when(
            ReflectionUtil.createFileWriter(
                    Mockito.any(String.class),
                    Mockito.any(LogFilePath.class),
                    Mockito.any(CompressionCodec.class),
                    Mockito.any(SecorConfig.class)
            ))
            .thenReturn(writer);

    Mockito.when(writer.getLength()).thenReturn(123L);

    FileWriter createdWriter = mRegistry.getOrCreateWriter(
            mLogFilePath, new DefaultCodec());
    assertTrue(createdWriter == writer);

    return writer;
}
 

Example 9

private static CompressionCodecName getCodec(JobConf conf) {

  CompressionCodecName codec;

  if (ParquetOutputFormat.isCompressionSet(conf)) { // explicit parquet config
    codec = ParquetOutputFormat.getCompression(conf);
  } else if (getCompressOutput(conf)) { // from hadoop config
    // find the right codec
    Class<?> codecClass = getOutputCompressorClass(conf, DefaultCodec.class);
    LOG.info("Compression set through hadoop codec: " + codecClass.getName());
    codec = CompressionCodecName.fromCompressionCodec(codecClass);
  } else {
    codec = CompressionCodecName.UNCOMPRESSED;
  }

  LOG.info("Compression: " + codec.name());
  return codec;
}
 
Example 10  Project: RDFS   File: ReduceTask.java

public MapOutputCopier(JobConf job, Reporter reporter) {
  setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
  LOG.debug(getName() + " created");
  this.reporter = reporter;

  shuffleConnectionTimeout =
    job.getInt("mapreduce.reduce.shuffle.connect.timeout", STALLED_COPY_TIMEOUT);
  shuffleReadTimeout =
    job.getInt("mapreduce.reduce.shuffle.read.timeout", DEFAULT_READ_TIMEOUT);

  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
    decompressor = CodecPool.getDecompressor(codec);
  }
  setDaemon(true);
}
 
Example 11  Project: sylph   File: HdfsSink2.java

public HdfsSink2(Hdfs2SinkConfig config)
        throws ClassNotFoundException
{
    this.batchSize = config.getBatchBufferSize();
    this.writerDir = config.getWriteDir();
    switch (config.getZipType().trim().toLowerCase()) {
        case "lzo":
            codecClass = (Class<? extends CompressionCodec>) Class.forName("com.hadoop.compression.lzo.LzopCodec");
            break;
        case "lz4":
            codecClass = Lz4Codec.class;
            break;
        case "snappy":
            codecClass = SnappyCodec.class;
            break;
        case "gzip":
            codecClass = GzipCodec.class;
            break;
        case "bzip2":
            codecClass = BZip2Codec.class;
            break;
        case "default":
            codecClass = DefaultCodec.class;
            break;
        default:
            codecClass = NoneCodec.class;
    }
}
 
Example 12  Project: hadoop   File: TestReduceTask.java

public void testValueIteratorWithCompression() throws Exception {
  Path tmpDir = new Path("build/test/test.reduce.task.compression");
  Configuration conf = new Configuration();
  DefaultCodec codec = new DefaultCodec();
  codec.setConf(conf);
  for (Pair[] testCase: testCases) {
    runValueIterator(tmpDir, testCase, conf, codec);
  }
}
 
Example 13  Project: hadoop   File: MapFileOutputFormat.java

public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
                                    String name, Progressable progress)
  throws IOException {
  // get the path of the temporary output file 
  Path file = FileOutputFormat.getTaskOutputPath(job, name);
  
  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);

    // find the right codec
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
        DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  
  // ignore the progress parameter, since MapFile is local
  final MapFile.Writer out =
    new MapFile.Writer(job, fs, file.toString(),
                       job.getOutputKeyClass().asSubclass(WritableComparable.class),
                       job.getOutputValueClass().asSubclass(Writable.class),
                       compressionType, codec,
                       progress);

  return new RecordWriter<WritableComparable, Writable>() {

      public void write(WritableComparable key, Writable value)
        throws IOException {

        out.append(key, value);
      }

      public void close(Reporter reporter) throws IOException { out.close();}
    };
}
 
Example 14  Project: hadoop-gpu   File: ReduceTask.java

private CompressionCodec initCodec() {
  // check if map-outputs are to be compressed
  if (conf.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      conf.getMapOutputCompressorClass(DefaultCodec.class);
    return ReflectionUtils.newInstance(codecClass, conf);
  } 

  return null;
}
 
Example 15  Project: hadoop   File: SequenceFileOutputFormat.java

public RecordWriter<K, V> getRecordWriter(
                                        FileSystem ignored, JobConf job,
                                        String name, Progressable progress)
  throws IOException {
  // get the path of the temporary output file 
  Path file = FileOutputFormat.getTaskOutputPath(job, name);
  
  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = getOutputCompressionType(job);

    // find the right codec
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
        DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  final SequenceFile.Writer out = 
    SequenceFile.createWriter(fs, job, file,
                              job.getOutputKeyClass(),
                              job.getOutputValueClass(),
                              compressionType,
                              codec,
                              progress);

  return new RecordWriter<K, V>() {

      public void write(K key, V value)
        throws IOException {

        out.append(key, value);
      }

      public void close(Reporter reporter) throws IOException { out.close();}
    };
}
 
Example 16  Project: hadoop   File: TestHSync.java

/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

  final FileSystem fs = cluster.getFileSystem();
  final Path p = new Path("/testSequenceFileSync/foo");
  final int len = 1 << 16;
  FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
      4096, (short) 1, len, null);
  Writer w = SequenceFile.createWriter(new Configuration(),
      Writer.stream(out),
      Writer.keyClass(RandomDatum.class),
      Writer.valueClass(RandomDatum.class),
      Writer.compression(CompressionType.NONE, new DefaultCodec()));
  w.hflush();
  checkSyncMetric(cluster, 0);
  w.hsync();
  checkSyncMetric(cluster, 1);
  int seed = new Random().nextInt();
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  generator.next();
  w.append(generator.getKey(), generator.getValue());
  w.hsync();
  checkSyncMetric(cluster, 2);
  w.close();
  checkSyncMetric(cluster, 2);
  out.close();
  checkSyncMetric(cluster, 3);
  cluster.shutdown();
}
 
Example 17  Project: big-c   File: TestReduceTask.java

public void testValueIteratorWithCompression() throws Exception {
  Path tmpDir = new Path("build/test/test.reduce.task.compression");
  Configuration conf = new Configuration();
  DefaultCodec codec = new DefaultCodec();
  codec.setConf(conf);
  for (Pair[] testCase: testCases) {
    runValueIterator(tmpDir, testCase, conf, codec);
  }
}
 
Example 18  Project: hadoop-gpu   File: MapFile.java

/** Create the named map using the named key comparator. */
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress,
              Progressable progress)
  throws IOException {
  this(conf, fs, dirName, comparator, valClass, 
       compress, new DefaultCodec(), progress);
}
 
Example 19  Project: big-c   File: ReduceTask.java

private CompressionCodec initCodec() {
  // check if map-outputs are to be compressed
  if (conf.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      conf.getMapOutputCompressorClass(DefaultCodec.class);
    return ReflectionUtils.newInstance(codecClass, conf);
  } 

  return null;
}
 
Example 20  Project: big-c   File: SequenceFileOutputFormat.java

public RecordWriter<K, V> getRecordWriter(
                                        FileSystem ignored, JobConf job,
                                        String name, Progressable progress)
  throws IOException {
  // get the path of the temporary output file 
  Path file = FileOutputFormat.getTaskOutputPath(job, name);
  
  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = getOutputCompressionType(job);

    // find the right codec
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
        DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  final SequenceFile.Writer out = 
    SequenceFile.createWriter(fs, job, file,
                              job.getOutputKeyClass(),
                              job.getOutputValueClass(),
                              compressionType,
                              codec,
                              progress);

  return new RecordWriter<K, V>() {

      public void write(K key, V value)
        throws IOException {

        out.append(key, value);
      }

      public void close(Reporter reporter) throws IOException { out.close();}
    };
}
 
Example 21  Project: big-c   File: TestHSync.java

/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();

  final FileSystem fs = cluster.getFileSystem();
  final Path p = new Path("/testSequenceFileSync/foo");
  final int len = 1 << 16;
  FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
      EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
      4096, (short) 1, len, null);
  Writer w = SequenceFile.createWriter(new Configuration(),
      Writer.stream(out),
      Writer.keyClass(RandomDatum.class),
      Writer.valueClass(RandomDatum.class),
      Writer.compression(CompressionType.NONE, new DefaultCodec()));
  w.hflush();
  checkSyncMetric(cluster, 0);
  w.hsync();
  checkSyncMetric(cluster, 1);
  int seed = new Random().nextInt();
  RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  generator.next();
  w.append(generator.getKey(), generator.getValue());
  w.hsync();
  checkSyncMetric(cluster, 2);
  w.close();
  checkSyncMetric(cluster, 2);
  out.close();
  checkSyncMetric(cluster, 3);
  cluster.shutdown();
}
 
Example 22  Project: pulsar   File: AbstractHdfsConnector.java

protected CompressionCodec getCompressionCodec() {
   if (connectorConfig.getCompression() == null) {
       return null;
   }

   CompressionCodec codec = getCompressionCodecFactory()
           .getCodecByName(connectorConfig.getCompression().name());

   return (codec != null) ? codec : new DefaultCodec();
}
 
Example 23  Project: pulsar   File: AbstractHdfsConnector.java

protected CompressionCodec getCompressionCodec() {
   if (connectorConfig.getCompression() == null) {
       return null;
   }

   CompressionCodec codec = getCompressionCodecFactory()
           .getCodecByName(connectorConfig.getCompression().name());

   return (codec != null) ? codec : new DefaultCodec();
}
 
Example 24  Project: sqoop-on-spark   File: TestExtractor.java

@DataProvider(name="test-hdfs-extractor")
public static Object[][] data() {
  List<Object[]> parameters = new ArrayList<Object[]>();
  for (Class<?> compressionClass : new Class<?>[]{null, DefaultCodec.class, BZip2Codec.class}) {
    for (Object outputFileType : new Object[]{TEXT_FILE, SEQUENCE_FILE}) {
      parameters.add(new Object[]{outputFileType, compressionClass});
    }
  }
  return parameters.toArray(new Object[0][]);
}
 
Example 25  Project: hadoop-gpu   File: MapFileOutputFormat.java

public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
                                    String name, Progressable progress)
  throws IOException {
  // get the path of the temporary output file 
  Path file = FileOutputFormat.getTaskOutputPath(job, name);
  
  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);

    // find the right codec
    Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
        DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  
  // ignore the progress parameter, since MapFile is local
  final MapFile.Writer out =
    new MapFile.Writer(job, fs, file.toString(),
                       job.getOutputKeyClass().asSubclass(WritableComparable.class),
                       job.getOutputValueClass().asSubclass(Writable.class),
                       compressionType, codec,
                       progress);

  return new RecordWriter<WritableComparable, Writable>() {

      public void write(WritableComparable key, Writable value)
        throws IOException {

        out.append(key, value);
      }

      public void close(Reporter reporter) throws IOException { out.close();}
    };
}
 
Example 26  Project: hadoop-gpu   File: TestReduceTask.java

public void testValueIteratorWithCompression() throws Exception {
  Path tmpDir = new Path("build/test/test.reduce.task.compression");
  Configuration conf = new Configuration();
  DefaultCodec codec = new DefaultCodec();
  codec.setConf(conf);
  for (Pair[] testCase: testCases) {
    runValueIterator(tmpDir, testCase, conf, codec);
  }
}
 
Example 27  Project: hbase   File: Compression.java

@Override
DefaultCodec getCodec(Configuration conf) {
  if (codec == null) {
    synchronized (lock) {
      if (codec == null) {
        codec = buildCodec(conf);
      }
    }
  }

  return codec;
}
 
Example 28  Project: incubator-gobblin   File: FsStateStore.java

/**
 * See {@link StateStore#put(String, String, T)}.
 *
 * <p>
 *   This implementation does not support putting the state object into an existing store as
 *   append is to be supported by the Hadoop SequenceFile (HADOOP-7139).
 * </p>
 */
@Override
public void put(String storeName, String tableName, T state) throws IOException {
  String tmpTableName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
  Path tmpTablePath = new Path(new Path(this.storeRootDir, storeName), tmpTableName);

  if (!this.fs.exists(tmpTablePath) && !create(storeName, tmpTableName)) {
    throw new IOException("Failed to create a state file for table " + tmpTableName);
  }

  Closer closer = Closer.create();
  try {
    @SuppressWarnings("deprecation")
    SequenceFile.Writer writer = closer.register(SequenceFile.createWriter(this.fs, this.conf, tmpTablePath,
        Text.class, this.stateClass, SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
    writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
  } catch (Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }

  if (this.useTmpFileForPut) {
    Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
    renamePath(tmpTablePath, tablePath);
  }
}
 
Example 29  Project: incubator-gobblin   File: FsStateStore.java

/**
 * See {@link StateStore#putAll(String, String, Collection)}.
 *
 * <p>
 *   This implementation does not support putting the state objects into an existing store as
 *   append is to be supported by the Hadoop SequenceFile (HADOOP-7139).
 * </p>
 */
@Override
public void putAll(String storeName, String tableName, Collection<T> states) throws IOException {
  String tmpTableName = this.useTmpFileForPut ? TMP_FILE_PREFIX + tableName : tableName;
  Path tmpTablePath = new Path(new Path(this.storeRootDir, storeName), tmpTableName);

  if (!this.fs.exists(tmpTablePath) && !create(storeName, tmpTableName)) {
    throw new IOException("Failed to create a state file for table " + tmpTableName);
  }

  Closer closer = Closer.create();
  try {
    @SuppressWarnings("deprecation")
    SequenceFile.Writer writer = closer.register(SequenceFile.createWriter(this.fs, this.conf, tmpTablePath,
        Text.class, this.stateClass, SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
    for (T state : states) {
      writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
    }
  } catch (Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }

  if (this.useTmpFileForPut) {
    Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
    renamePath(tmpTablePath, tablePath);
  }
}
 

Example 30  Project: hiped2

  /**
   * Write the sequence file.
   *
   * @param args the command-line arguments
   * @return the process exit code
   * @throws Exception if something goes wrong
   */
  public int run(final String[] args) throws Exception {

    Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
    int result = cli.runCmd();

    if (result != 0) {
      return result;
    }

    Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
    Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));

    Configuration conf = super.getConf();

    Job job = new Job(conf);
    job.setJarByClass(SequenceFileProtobufMapReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Stock.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setMapperClass(PbMapper.class);
    job.setReducerClass(PbReducer.class);

    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
    SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

    ProtobufSerialization.register(job.getConfiguration());

    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    if (job.waitForCompletion(true)) {
      return 0;
    }
    return 1;
  }
 