Usage examples for org.apache.hadoop.io.compress.CompressionCodecFactory

The examples below show how the org.apache.hadoop.io.compress.CompressionCodecFactory API is used in real projects. Each snippet names the project and file it was taken from, so the full source can be viewed on GitHub.
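
As a quick orientation before the project examples: most snippets follow the same pattern of resolving a codec and wrapping a raw stream. Here is a minimal, self-contained sketch of that pattern (the class and method names in this sketch are ours for illustration, not taken from any project below):

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupSketch {

    // Open a file, transparently decompressing it when its extension maps to a codec.
    public static InputStream openPossiblyCompressed(Configuration conf, Path path)
            throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Lookup by file extension, e.g. ".gz" -> GzipCodec; null means "not compressed".
        CompressionCodec codec = factory.getCodec(path);
        // The other lookups used in the examples below:
        //   factory.getCodecByName("gzip")
        //   factory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec")
        FileSystem fs = path.getFileSystem(conf);
        InputStream raw = fs.open(path);
        return (codec == null) ? raw : codec.createInputStream(raw);
    }
}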

Example 1 (project: flink, file: CompressionFactoryITCase.java)

@Test
public void testWriteCompressedFile() throws Exception {
	final File folder = TEMPORARY_FOLDER.newFolder();
	final Path testPath = Path.fromLocalFile(folder);

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(1);
	env.enableCheckpointing(100);

	DataStream<String> stream = env.addSource(
			new FiniteTestSource<>(testData),
			TypeInformation.of(String.class)
	);

	stream.map(str -> str).addSink(
			StreamingFileSink.forBulkFormat(
					testPath,
					CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(TEST_CODEC_NAME)
			).build());

	env.execute();

	validateResults(folder, testData, new CompressionCodecFactory(configuration).getCodecByName(TEST_CODEC_NAME));
}
 
Example 2 (project: tajo, file: TestInsertQuery.java)

@Test
public final void testInsertOverwriteLocationWithCompression() throws Exception {
  if (!testingCluster.isHiveCatalogStoreRunning()) {
    ResultSet res = executeQuery();
    res.close();
    FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
    Path path = new Path("/tajo-data/testInsertOverwriteLocationWithCompression");
    assertTrue(fs.exists(path));
    assertEquals(1, fs.listStatus(path).length);

    CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
    for (FileStatus file : fs.listStatus(path)){
      CompressionCodec codec = factory.getCodec(file.getPath());
      assertTrue(codec instanceof DeflateCodec);
    }
  }
}
 
Example 3 (project: tinkerpop, file: GryoRecordReader.java)

@Override
public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
    final FileSplit split = (FileSplit) genericSplit;
    final Configuration configuration = context.getConfiguration();
    if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
        this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
    this.gryoReader = GryoReader.build().mapper(
            GryoMapper.build().addRegistries(IoRegistryHelper.createRegistries(ConfUtil.makeApacheConfiguration(configuration))).create()).create();
    long start = split.getStart();
    final Path file = split.getPath();
    if (null != new CompressionCodecFactory(configuration).getCodec(file)) {
        throw new IllegalStateException("Compression is not supported for the (binary) Gryo format");
    }
    // open the file and seek to the start of the split
    this.inputStream = file.getFileSystem(configuration).open(split.getPath());
    this.splitLength = split.getLength();
    if (this.splitLength > 0) this.splitLength -= (seekToHeader(this.inputStream, start) - start);
}
 
Example 4 (project: hadoop, file: TestStandbyCheckpoints.java)

protected Configuration setupCommonConfig() {
  tmpOivImgDir = Files.createTempDir();

  Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  conf.set(DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY,
      tmpOivImgDir.getAbsolutePath());
  conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
  conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
      SlowCodec.class.getCanonicalName());
  CompressionCodecFactory.setCodecClasses(conf,
      ImmutableList.<Class>of(SlowCodec.class));
  return conf;
}
 

@Before
public void setUp() throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
  conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
  conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
      .createJournalURI("/bootstrapStandby").toString());
  BKJMUtil.addJournalManagerDefinition(conf);
  conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
  conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
      SlowCodec.class.getCanonicalName());
  CompressionCodecFactory.setCodecClasses(conf,
      ImmutableList.<Class> of(SlowCodec.class));
  MiniDFSNNTopology topology = new MiniDFSNNTopology()
      .addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN(
          new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)).addNN(
          new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
  cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
      .numDataNodes(1).manageNameDfsSharedDirs(false).build();
  cluster.waitActive();
}
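
In both setups above, CompressionCodecFactory.setCodecClasses is the write side of the factory: it records the given codec classes in the configuration (the io.compression.codecs property), so that any factory later constructed from that configuration resolves exactly those codecs. A minimal sketch, with GzipCodec standing in for SlowCodec:

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.GzipCodec;

public class CodecConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Register exactly one codec class in the configuration.
        CompressionCodecFactory.setCodecClasses(conf, ImmutableList.<Class>of(GzipCodec.class));
        // A factory built from this configuration now only resolves gzip extensions:
        System.out.println(new CompressionCodecFactory(conf).getCodec(new Path("data.gz")));
    }
}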
 

public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);

  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);

  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
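
This constructor borrows a Decompressor from CodecPool, but the snippet does not show it being handed back; the enclosing class presumably does so when the stream is closed. A hedged sketch of the matching cleanup method for the same class (not shown in the original):

@Override
public void close() throws IOException {
  coreInputStream.close();
  if (decompressor != null) {
    // Return the borrowed decompressor so CodecPool can reuse it.
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}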
 
Example 7 (project: hadoop, file: Anonymizer.java)

private JsonGenerator createJsonGenerator(Configuration conf, Path path) 
throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output, 
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();
  
  return outGen;
}
 

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    if (currentPath >= split.getNumPaths()) {
        return false;
    }

    Path path = split.getPath(currentPath);
    currentPath++;

    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(path);
    key = path.toString();
    FSDataInputStream fileIn = fs.open(path);

    value = (codec != null) ? codec.createInputStream(fileIn) : fileIn;
    return true;
}
 
Example 9 (project: RDFS, file: JsonObjectMapperParser.java)

/**
 * Constructor.
 * 
 * @param path 
 *          Path to the JSON data file, possibly compressed.
 * @param conf
 * @throws IOException
 */
public JsonObjectMapperParser(Path path, Class<? extends T> clazz,
    Configuration conf) throws IOException {
  mapper = new ObjectMapper();
  mapper.configure(
      DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
  this.clazz = clazz;
  FileSystem fs = path.getFileSystem(conf);
  CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
  InputStream input;
  if (codec == null) {
    input = fs.open(path);
    decompressor = null;
  } else {
    FSDataInputStream fsdis = fs.open(path);
    decompressor = CodecPool.getDecompressor(codec);
    input = codec.createInputStream(fsdis, decompressor);
  }
  jsonParser = mapper.getJsonFactory().createJsonParser(input);
}
 
Example 10 (project: Hive-XML-SerDe, file: XmlInputFormat.java)

public XmlRecordReader(FileSplit input, JobConf jobConf) throws IOException {
    Configuration conf = jobConf;
    this.startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
    this.endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
    FileSplit split = (FileSplit) input;

    Path file = split.getPath();
    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
    CompressionCodec codec = compressionCodecs.getCodec(file);
    FileSystem fs = file.getFileSystem(conf);
    if (codec != null) {
        this.fsin = new DataInputStream(codec.createInputStream(fs.open(file)));
        // For a compressed file, only the first split reads data; the other
        // splits are made invalid to avoid reading the same data twice.
        this.start = (split.getStart() == 0) ? 0 : Long.MAX_VALUE;
        this.end = Long.MAX_VALUE;
    } else {
        this.start = split.getStart();
        this.end = this.start + split.getLength();
        FSDataInputStream fileIn = fs.open(file);
        fileIn.seek(this.start);
        this.fsin = fileIn;
    }
    this.recordStartPos = this.start;
    this.pos = this.start;
}
 

public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);

  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);

  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
 
Example 12 (project: big-c, file: Anonymizer.java)

private JsonGenerator createJsonGenerator(Configuration conf, Path path) 
throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output, 
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();
  
  return outGen;
}
 
Example 13 (project: tajo, file: TestInsertQuery.java)

@Test
public final void testInsertOverwriteWithCompression() throws Exception {
  String tableName = IdentifierUtil.normalizeIdentifier("testInsertOverwriteWithCompression");
  ResultSet res = executeFile("testInsertOverwriteWithCompression_ddl.sql");
  res.close();

  CatalogService catalog = testingCluster.getMaster().getCatalog();
  assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));

  res = executeQuery();
  res.close();
  TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
  if (!testingCluster.isHiveCatalogStoreRunning()) {
    assertEquals(2, desc.getStats().getNumRows().intValue());
  }

  FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
  assertTrue(fs.exists(new Path(desc.getUri())));
  CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());

  for (FileStatus file : fs.listStatus(new Path(desc.getUri()))) {
    CompressionCodec codec = factory.getCodec(file.getPath());
    assertTrue(codec instanceof DeflateCodec);
  }
  executeString("DROP TABLE " + tableName + " PURGE");
}
 

private InputStream openFile(Path path) throws IOException {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
    FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
    // check if compressed
    if (codec == null) { // uncompressed
        return fileIn;
    } else { // compressed
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        this.openDecompressors.add(decompressor); // to be returned later using close
        if (codec instanceof SplittableCompressionCodec) {
            long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
            final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
                    .createInputStream(fileIn, decompressor, 0, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
            return cIn;
        } else {
            return codec.createInputStream(fileIn, decompressor);
        }
    }
}
 

private InputStream openFile(Path path) throws IOException {
    CompressionCodec codec = new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
    FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
    // check if compressed
    if (codec == null) { // uncompressed
        return fileIn;
    } else { // compressed
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        this.openDecompressors.add(decompressor); // to be returned later using close
        if (codec instanceof SplittableCompressionCodec) {
            long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
            final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
                    .createInputStream(fileIn, decompressor, 0, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
            return cIn;
        } else {
            return codec.createInputStream(fileIn, decompressor);
        }
    }
}
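
Both openFile variants above decode the whole file (range 0 to the file length) in CONTINUOUS mode. A splittable codec can also decode just one split of a file; a hedged sketch of that variant (splitStart and splitEnd are illustrative variables, not from the original code):

// Assumes codec instanceof SplittableCompressionCodec (e.g. BZip2Codec).
SplittableCompressionCodec splittable = (SplittableCompressionCodec) codec;
Decompressor decompressor = CodecPool.getDecompressor(codec);
SplitCompressionInputStream cIn = splittable.createInputStream(
        fileIn, decompressor, splitStart, splitEnd,
        SplittableCompressionCodec.READ_MODE.BYBLOCK);
// The codec may move the boundaries to the nearest compression block edges:
long adjustedStart = cIn.getAdjustedStart();
long adjustedEnd = cIn.getAdjustedEnd();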
 
Example 16 (project: Flink-CEPplus, file: SequenceFileWriter.java)

@Override
public void open(FileSystem fs, Path path) throws IOException {
	super.open(fs, path);
	if (keyClass == null) {
		throw new IllegalStateException("Key Class has not been initialized.");
	}
	if (valueClass == null) {
		throw new IllegalStateException("Value Class has not been initialized.");
	}

	CompressionCodec codec = null;

	Configuration conf = fs.getConf();

	if (!compressionCodecName.equals("None")) {
		CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
		codec = codecFactory.getCodecByName(compressionCodecName);
		if (codec == null) {
			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
		}
	}

	// the non-deprecated constructor syntax is only available in recent hadoop versions...
	writer = SequenceFile.createWriter(conf,
			getStream(),
			keyClass,
			valueClass,
			compressionType,
			codec);
}
 
Example 17 (project: tez, file: TestIFile.java)

@Before
public void setUp() throws Exception {
  CompressionCodecFactory codecFactory = new CompressionCodecFactory(new
      Configuration());
  codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
  outputPath = new Path(workDir, outputFileName);
}
 
Example 18 (project: flink, file: SequenceFileWriter.java)

@Override
public void open(FileSystem fs, Path path) throws IOException {
	super.open(fs, path);
	if (keyClass == null) {
		throw new IllegalStateException("Key Class has not been initialized.");
	}
	if (valueClass == null) {
		throw new IllegalStateException("Value Class has not been initialized.");
	}

	CompressionCodec codec = null;

	Configuration conf = fs.getConf();

	if (!compressionCodecName.equals("None")) {
		CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
		codec = codecFactory.getCodecByName(compressionCodecName);
		if (codec == null) {
			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
		}
	}

	// the non-deprecated constructor syntax is only available in recent hadoop versions...
	writer = SequenceFile.createWriter(conf,
			getStream(),
			keyClass,
			valueClass,
			compressionType,
			codec);
}
 
Example 19 (project: flink, file: SequenceFileWriterFactory.java)

private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
	checkNotNull(conf);
	checkNotNull(compressionCodecName);

	if (compressionCodecName.equals(NO_COMPRESSION)) {
		return null;
	}

	CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
	CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
	if (codec == null) {
		throw new RuntimeException("Codec " + compressionCodecName + " not found.");
	}
	return codec;
}
 
Example 20 (project: jstorm, file: HdfsState.java)

@Override
void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
    LOG.info("Preparing Sequence File State...");
    if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");

    this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    this.codecFactory = new CompressionCodecFactory(hdfsConfig);
}
 
Example 21 (project: tajo, file: DelimitedLineReader.java)

public DelimitedLineReader(Configuration conf, final AbstractFileFragment fragment, int bufferSize)
    throws IOException {
  this.fragment = fragment;
  this.conf = conf;
  this.factory = new CompressionCodecFactory(conf);
  this.codec = factory.getCodec(fragment.getPath());
  this.bufferSize = bufferSize;
  if (this.codec instanceof SplittableCompressionCodec) {
    // splittable codecs such as bzip2 do not fit this reader's multi-thread model
    throw new TajoRuntimeException(
        new NotImplementedException(this.getClass() + " does not support " + this.codec.getDefaultExtension()));
  }
}
 

/**
 * Returns the configured CompressionCodec, or null if none is configured.
 *
 * @param context
 *            the ProcessContext
 * @param configuration
 *            the Hadoop Configuration
 * @return CompressionCodec or null
 */
protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
    org.apache.hadoop.io.compress.CompressionCodec codec = null;
    if (context.getProperty(COMPRESSION_CODEC).isSet()) {
        String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
        CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
        codec = ccf.getCodecByClassName(compressionClassname);
    }

    return codec;
}
 
Example 23 (project: pxf, file: JsonRecordReader.java)

/**
 * Create new multi-line json object reader.
 *
 * @param conf  Hadoop context
 * @param split HDFS split to start the reading from
 * @throws IOException IOException when reading the file
 */
public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {

    this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
    this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE);

    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(conf);
    final CompressionCodec codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream fileIn = fs.open(split.getPath());
    if (codec != null) {
        is = codec.createInputStream(fileIn);
        start = 0;
        end = Long.MAX_VALUE;
    } else {
        if (start != 0) {
            fileIn.seek(start);
        }
        is = fileIn;
    }
    parser = new PartitionedJsonParser(is);
    this.pos = start;
}
 
Example 24 (project: pxf, file: CodecFactory.java)

/**
 * Helper routine to get compression codec class by path (file suffix).
 *
 * @param path path of file to get codec for
 * @return matching codec class for the path. null if no codec is needed.
 */
private Class<? extends CompressionCodec> getCodecClassByPath(Configuration config, String path) {
    Class<? extends CompressionCodec> codecClass = null;
    CompressionCodecFactory factory = new CompressionCodecFactory(config);
    CompressionCodec codec = factory.getCodec(new Path(path));
    if (codec != null) {
        codecClass = codec.getClass();
    }
    if (LOG.isDebugEnabled()) {
        String msg = (codecClass == null ? "No codec" : "Codec " + codecClass);
        LOG.debug("{} was found for file {}", msg, path);
    }
    return codecClass;
}
 
Example 25 (project: ViraPipe, file: InterleaveMulti.java)

private static void decompress(FileSystem fs, String in, String outpath) throws IOException {

    Configuration conf = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(new Path(in));
    // Decompress the input file; the codec is resolved from the file extension.
    InputStream is = codec.createInputStream(fs.open(new Path(in)));
    OutputStream out = fs.create(new Path(outpath));
    // Write the decompressed bytes out
    IOUtils.copyBytes(is, out, conf);
    is.close();
    out.close();
}
 
Example 26 (project: ViraPipe, file: Decompress.java)

private static void decompress(FileSystem fs, String in, String outpath) throws IOException {
  Configuration conf = new Configuration();
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  // the correct codec is discovered from the extension of the file
  CompressionCodec codec = factory.getCodec(new Path(in));
  // Decompress the input file
  InputStream is = codec.createInputStream(fs.open(new Path(in)));
  OutputStream out = fs.create(new Path(outpath));
  // Write the decompressed bytes out
  IOUtils.copyBytes(is, out, conf);
  is.close();
  out.close();
}
 
Example 27 (project: ViraPipe, file: DecompressInterleave.java)

private static FileStatus decompress(FileSystem fs, String in, String outpath) throws IOException {
  Configuration conf = new Configuration();
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  CompressionCodec codec = factory.getCodec(new Path(in));
  // Decompress the input file; the codec is resolved from the file extension.
  InputStream is = codec.createInputStream(fs.open(new Path(in)));
  OutputStream out = fs.create(new Path(outpath));
  // Write the decompressed bytes out
  IOUtils.copyBytes(is, out, conf);
  is.close();
  out.close();
  return fs.getFileStatus(new Path(outpath));
}
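
All three ViraPipe helpers assume the factory recognizes the input path's extension; for an unknown extension getCodec returns null and codec.createInputStream throws a NullPointerException. A defensive variant (a sketch; the null guard and method name are ours):

private static void decompressIfNeeded(FileSystem fs, String in, String outpath) throws IOException {
  Configuration conf = new Configuration();
  CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(new Path(in));
  try (InputStream is = (codec == null)
          ? fs.open(new Path(in))                       // unknown extension: copy the bytes as-is
          : codec.createInputStream(fs.open(new Path(in)));
       OutputStream out = fs.create(new Path(outpath))) {
    IOUtils.copyBytes(is, out, conf);
  }
}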
 
Example 28 (project: hadoop-gpu, file: LineRecordReader.java)

public LineRecordReader(Configuration job, 
                        FileSplit split) throws IOException {
  this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                  Integer.MAX_VALUE);
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();
  compressionCodecs = new CompressionCodecFactory(job);
  final CompressionCodec codec = compressionCodecs.getCodec(file);

  // open the file and seek to the start of the split
  FileSystem fs = file.getFileSystem(job);
  FSDataInputStream fileIn = fs.open(split.getPath());
  boolean skipFirstLine = false;
  if (codec != null) {
    in = new LineReader(codec.createInputStream(fileIn), job);
    end = Long.MAX_VALUE;
  } else {
    if (start != 0) {
      skipFirstLine = true;
      --start;
      fileIn.seek(start);
    }
    in = new LineReader(fileIn, job);
  }
  if (skipFirstLine) {  // skip first line and re-establish "start".
    start += in.readLine(new Text(), 0,
                         (int)Math.min((long)Integer.MAX_VALUE, end - start));
  }
  this.pos = start;
}
 

@Test
public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
  String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
  ResultSet res = executeString(
      "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
          "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
          "PARTITION BY column(col1 int4)");
  res.close();
  assertTrue(catalog.existsTable(tableName));

  res = executeString(
      "insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
  res.close();
  TableDesc desc = catalog.getTableDesc(tableName);
  assertEquals(5, desc.getStats().getNumRows().intValue());

  FileSystem fs = FileSystem.get(conf);
  assertTrue(fs.exists(desc.getPath()));
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);

  Path path = desc.getPath();
  assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
  assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
  assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));

  for (FileStatus partition : fs.listStatus(path)){
    assertTrue(fs.isDirectory(partition.getPath()));
    for (FileStatus file : fs.listStatus(partition.getPath())) {
      CompressionCodec codec = factory.getCodec(file.getPath());
      assertTrue(codec instanceof DeflateCodec);
    }
  }
}
 

/**
 * Create a data file that gets exported to the db.
 * @param fileNum the number of the file (for multi-file export)
 * @param numRecords how many records to write to the file.
 * @param gzip is true if the file should be gzipped.
 */
protected void createTextFile(int fileNum, int numRecords, boolean gzip,
    ColumnGenerator... extraCols) throws IOException {
  int startId = fileNum * numRecords;

  String ext = ".txt";
  if (gzip) {
    ext = ext + ".gz";
  }
  Path tablePath = getTablePath();
  Path filePath = new Path(tablePath, "part" + fileNum + ext);

  Configuration conf = new Configuration();
  if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
    conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
  }
  FileSystem fs = FileSystem.get(conf);
  fs.mkdirs(tablePath);
  OutputStream os = fs.create(filePath);
  if (gzip) {
    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
    CompressionCodec codec = ccf.getCodec(filePath);
    os = codec.createOutputStream(os);
  }
  BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
  for (int i = 0; i < numRecords; i++) {
    w.write(getRecordLine(startId + i, extraCols));
  }
  w.close();
  os.close();

  if (gzip) {
    verifyCompressedFile(filePath, numRecords);
  }
}
 