The following are examples showing how to use the org.apache.hadoop.io.compress.CompressionCodecFactory API class and its typical call patterns; you can also follow the links to view the full source code on GitHub.
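Before the project-specific snippets, here is a minimal, self-contained sketch of the read-side pattern most of them share: build a CompressionCodecFactory from a Configuration, resolve a codec from the file path's extension, and wrap the raw stream only when a codec matches. The class and method names in this sketch are illustrative and not taken from any of the projects below.

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;

public class CodecFactoryReadSketch {
  // Open a possibly-compressed file; the codec (if any) is inferred from the path's extension.
  public static InputStream openPossiblyCompressed(Path path, Configuration conf) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
    if (codec == null) {
      return fs.open(path); // no matching codec: return the raw stream
    }
    // Borrow a decompressor from the pool and wrap the raw stream with it.
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    return codec.createInputStream(fs.open(path), decompressor);
  }
}

In the real readers below, the borrowed Decompressor is usually kept as a field and returned to CodecPool when the reader is closed.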
@Test
public void testWriteCompressedFile() throws Exception {
final File folder = TEMPORARY_FOLDER.newFolder();
final Path testPath = Path.fromLocalFile(folder);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
DataStream<String> stream = env.addSource(
new FiniteTestSource<>(testData),
TypeInformation.of(String.class)
);
stream.map(str -> str).addSink(
StreamingFileSink.forBulkFormat(
testPath,
CompressWriters.forExtractor(new DefaultExtractor<String>()).withHadoopCompression(TEST_CODEC_NAME)
).build());
env.execute();
validateResults(folder, testData, new CompressionCodecFactory(configuration).getCodecByName(TEST_CODEC_NAME));
}
@Test
public final void testInsertOverwriteLocationWithCompression() throws Exception {
if (!testingCluster.isHiveCatalogStoreRunning()) {
ResultSet res = executeQuery();
res.close();
FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
Path path = new Path("/tajo-data/testInsertOverwriteLocationWithCompression");
assertTrue(fs.exists(path));
assertEquals(1, fs.listStatus(path).length);
CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
for (FileStatus file : fs.listStatus(path)){
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
@Override
public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException {
final FileSplit split = (FileSplit) genericSplit;
final Configuration configuration = context.getConfiguration();
if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null)
this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER);
this.gryoReader = GryoReader.build().mapper(
GryoMapper.build().addRegistries(IoRegistryHelper.createRegistries(ConfUtil.makeApacheConfiguration(configuration))).create()).create();
long start = split.getStart();
final Path file = split.getPath();
if (null != new CompressionCodecFactory(configuration).getCodec(file)) {
throw new IllegalStateException("Compression is not supported for the (binary) Gryo format");
}
// open the file and seek to the start of the split
this.inputStream = file.getFileSystem(configuration).open(split.getPath());
this.splitLength = split.getLength();
if (this.splitLength > 0) this.splitLength -= (seekToHeader(this.inputStream, start) - start);
}
protected Configuration setupCommonConfig() {
tmpOivImgDir = Files.createTempDir();
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY,
tmpOivImgDir.getAbsolutePath());
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName());
CompressionCodecFactory.setCodecClasses(conf,
ImmutableList.<Class>of(SlowCodec.class));
return conf;
}
@Before
public void setUp() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
.createJournalURI("/bootstrapStandby").toString());
BKJMUtil.addJournalManagerDefinition(conf);
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName());
CompressionCodecFactory.setCodecClasses(conf,
ImmutableList.<Class>of(SlowCodec.class));
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN(
new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)).addNN(
new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
.numDataNodes(1).manageNameDfsSharedDirs(false).build();
cluster.waitActive();
}
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
throws IOException {
CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
CompressionCodec inputCodec = codecs.getCodec(inputPath);
FileSystem ifs = inputPath.getFileSystem(conf);
FSDataInputStream fileIn = ifs.open(inputPath);
if (inputCodec == null) {
decompressor = null;
coreInputStream = fileIn;
} else {
decompressor = CodecPool.getDecompressor(inputCodec);
coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
}
}
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
throws IOException {
FileSystem outFS = path.getFileSystem(conf);
CompressionCodec codec =
new CompressionCodecFactory(conf).getCodec(path);
OutputStream output;
Compressor compressor = null;
if (codec != null) {
compressor = CodecPool.getCompressor(codec);
output = codec.createOutputStream(outFS.create(path), compressor);
} else {
output = outFS.create(path);
}
JsonGenerator outGen = outFactory.createJsonGenerator(output,
JsonEncoding.UTF8);
outGen.useDefaultPrettyPrinter();
return outGen;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (currentPath >= split.getNumPaths()) {
return false;
}
Path path = split.getPath(currentPath);
currentPath++;
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(path);
key = path.toString();
FSDataInputStream fileIn = fs.open(path);
value = codec != null ? codec.createInputStream(fileIn) : fileIn;
return true;
}
/**
* Constructor.
*
* @param path
* Path to the JSON data file, possibly compressed.
* @param clazz
* the class of the objects deserialized from the JSON stream.
* @param conf
* the Hadoop configuration used to resolve the file system and codec.
* @throws IOException
*/
public JsonObjectMapperParser(Path path, Class<? extends T> clazz,
Configuration conf) throws IOException {
mapper = new ObjectMapper();
mapper.configure(
DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
this.clazz = clazz;
FileSystem fs = path.getFileSystem(conf);
CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
InputStream input;
if (codec == null) {
input = fs.open(path);
decompressor = null;
} else {
FSDataInputStream fsdis = fs.open(path);
decompressor = CodecPool.getDecompressor(codec);
input = codec.createInputStream(fsdis, decompressor);
}
jsonParser = mapper.getJsonFactory().createJsonParser(input);
}
public XmlRecordReader(FileSplit input, JobConf jobConf) throws IOException {
Configuration conf = jobConf;
this.startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
this.endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
FileSplit split = (FileSplit) input;
Path file = split.getPath();
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
if (codec != null) {
this.fsin = new DataInputStream(codec.createInputStream(fs.open(file)));
// Data is read only in the first split; the other splits are skipped.
// This avoids reading duplicate data for compressed files.
this.start = (split.getStart() == 0) ? 0 : Long.MAX_VALUE;
this.end = Long.MAX_VALUE;
} else {
this.start = split.getStart();
this.end = this.start + split.getLength();
FSDataInputStream fileIn = fs.open(file);
fileIn.seek(this.start);
this.fsin = fileIn;
}
this.recordStartPos = this.start;
this.pos = this.start;
}
@Test
public final void testInsertOverwriteWithCompression() throws Exception {
String tableName = IdentifierUtil.normalizeIdentifier("testInsertOverwriteWithCompression");
ResultSet res = executeFile("testInsertOverwriteWithCompression_ddl.sql");
res.close();
CatalogService catalog = testingCluster.getMaster().getCatalog();
assertTrue(catalog.existsTable(getCurrentDatabase(), tableName));
res = executeQuery();
res.close();
TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName);
if (!testingCluster.isHiveCatalogStoreRunning()) {
assertEquals(2, desc.getStats().getNumRows().intValue());
}
FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
assertTrue(fs.exists(new Path(desc.getUri())));
CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
for (FileStatus file : fs.listStatus(new Path(desc.getUri()))) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
executeString("DROP TABLE " + tableName + " PURGE");
}
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec == null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // returned to the pool later, in close()
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
    fileIn, decompressor, 0, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn, decompressor);
}
}
}
private InputStream openFile(Path path) throws IOException {
CompressionCodec codec = new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
FSDataInputStream fileIn = dfsCluster.getFileSystem().open(path);
// check if compressed
if (codec == null) { // uncompressed
return fileIn;
} else { // compressed
Decompressor decompressor = CodecPool.getDecompressor(codec);
this.openDecompressors.add(decompressor); // returned to the pool later, in close()
if (codec instanceof SplittableCompressionCodec) {
long end = dfsCluster.getFileSystem().getFileStatus(path).getLen();
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
    fileIn, decompressor, 0, end, SplittableCompressionCodec.READ_MODE.CONTINUOUS);
return cIn;
} else {
return codec.createInputStream(fileIn, decompressor);
}
}
}
@Override
public void open(FileSystem fs, Path path) throws IOException {
super.open(fs, path);
if (keyClass == null) {
throw new IllegalStateException("Key Class has not been initialized.");
}
if (valueClass == null) {
throw new IllegalStateException("Value Class has not been initialized.");
}
CompressionCodec codec = null;
Configuration conf = fs.getConf();
if (!compressionCodecName.equals("None")) {
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
codec = codecFactory.getCodecByName(compressionCodecName);
if (codec == null) {
throw new RuntimeException("Codec " + compressionCodecName + " not found.");
}
}
// the non-deprecated constructor syntax is only available in recent hadoop versions...
writer = SequenceFile.createWriter(conf,
getStream(),
keyClass,
valueClass,
compressionType,
codec);
}
@Before
public void setUp() throws Exception {
CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
outputPath = new Path(workDir, outputFileName);
}
private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
checkNotNull(conf);
checkNotNull(compressionCodecName);
if (compressionCodecName.equals(NO_COMPRESSION)) {
return null;
}
CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
if (codec == null) {
throw new RuntimeException("Codec " + compressionCodecName + " not found.");
}
return codec;
}
@Override
void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
LOG.info("Preparing Sequence File State...");
if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
this.codecFactory = new CompressionCodecFactory(hdfsConfig);
}
public DelimitedLineReader(Configuration conf, final AbstractFileFragment fragment, int bufferSize)
throws IOException {
this.fragment = fragment;
this.conf = conf;
this.factory = new CompressionCodecFactory(conf);
this.codec = factory.getCodec(fragment.getPath());
this.bufferSize = bufferSize;
if (this.codec instanceof SplittableCompressionCodec) {
// bzip2 (a splittable codec) does not support the multi-threaded model
throw new TajoRuntimeException(
new NotImplementedException(this.getClass() + " does not support " + this.codec.getDefaultExtension()));
}
}
/**
* Returns the configured CompressionCodec, or null if none is configured.
*
* @param context
* the ProcessContext
* @param configuration
* the Hadoop Configuration
* @return CompressionCodec or null
*/
protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
org.apache.hadoop.io.compress.CompressionCodec codec = null;
if (context.getProperty(COMPRESSION_CODEC).isSet()) {
String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
codec = ccf.getCodecByClassName(compressionClassname);
}
return codec;
}
/**
* Create new multi-line json object reader.
*
* @param conf Hadoop context
* @param split HDFS split to start the reading from
* @throws IOException IOException when reading the file
*/
public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {
this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(conf);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream fileIn = fs.open(split.getPath());
if (codec != null) {
is = codec.createInputStream(fileIn);
start = 0;
end = Long.MAX_VALUE;
} else {
if (start != 0) {
fileIn.seek(start);
}
is = fileIn;
}
parser = new PartitionedJsonParser(is);
this.pos = start;
}
/**
* Helper routine to get compression codec class by path (file suffix).
*
* @param path path of file to get codec for
* @return matching codec class for the path. null if no codec is needed.
*/
private Class<? extends CompressionCodec> getCodecClassByPath(Configuration config, String path) {
Class<? extends CompressionCodec> codecClass = null;
CompressionCodecFactory factory = new CompressionCodecFactory(config);
CompressionCodec codec = factory.getCodec(new Path(path));
if (codec != null) {
codecClass = codec.getClass();
}
if (LOG.isDebugEnabled()) {
String msg = (codecClass == null ? "No codec" : "Codec " + codecClass);
LOG.debug("{} was found for file {}", msg, path);
}
return codecClass;
}
private static void decompress(FileSystem fs, String in, String outpath) throws IOException {
Configuration conf = new Configuration();
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(new Path(in));
// Decompress the input file; the codec is chosen from the file's extension.
InputStream is = codec.createInputStream(fs.open(new Path(in)));
OutputStream out = fs.create(new Path(outpath));
// Write the decompressed data to the output path.
IOUtils.copyBytes(is, out, conf);
is.close();
out.close();
}
private static void decompress(FileSystem fs, String in, String outpath) throws IOException {
Configuration conf = new Configuration();
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
// the correct codec is discovered from the extension of the file
CompressionCodec codec = factory.getCodec(new Path(in));
// Decompress the input file.
InputStream is = codec.createInputStream(fs.open(new Path(in)));
OutputStream out = fs.create(new Path(outpath));
// Write the decompressed data to the output path.
IOUtils.copyBytes(is, out, conf);
is.close();
out.close();
}
private static FileStatus decompress(FileSystem fs, String in, String outpath) throws IOException {
Configuration conf = new Configuration();
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(new Path(in));
// Decompress the input file; the codec is chosen from the file's extension.
InputStream is = codec.createInputStream(fs.open(new Path(in)));
OutputStream out = fs.create(new Path(outpath));
// Write the decompressed data to the output path.
IOUtils.copyBytes(is, out, conf);
is.close();
out.close();
return fs.getFileStatus(new Path(outpath));
}
public LineRecordReader(Configuration job,
FileSplit split) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
@Test
public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
ResultSet res = executeString(
"create table " + tableName + " (col2 int4, col3 float8) USING csv " +
"WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
"PARTITION BY column(col1 int4)");
res.close();
assertTrue(catalog.existsTable(tableName));
res = executeString(
"insert overwrite into " + tableName + " select l_partkey, l_quantity, l_orderkey from lineitem");
res.close();
TableDesc desc = catalog.getTableDesc(tableName);
assertEquals(5, desc.getStats().getNumRows().intValue());
FileSystem fs = FileSystem.get(conf);
assertTrue(fs.exists(desc.getPath()));
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
Path path = desc.getPath();
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
for (FileStatus partition : fs.listStatus(path)){
assertTrue(fs.isDirectory(partition.getPath()));
for (FileStatus file : fs.listStatus(partition.getPath())) {
CompressionCodec codec = factory.getCodec(file.getPath());
assertTrue(codec instanceof DeflateCodec);
}
}
}
/**
* Create a data file that gets exported to the db.
* @param fileNum the number of the file (for multi-file export)
* @param numRecords how many records to write to the file.
* @param gzip is true if the file should be gzipped.
*/
protected void createTextFile(int fileNum, int numRecords, boolean gzip,
ColumnGenerator... extraCols) throws IOException {
int startId = fileNum * numRecords;
String ext = ".txt";
if (gzip) {
ext = ext + ".gz";
}
Path tablePath = getTablePath();
Path filePath = new Path(tablePath, "part" + fileNum + ext);
Configuration conf = new Configuration();
if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
}
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(tablePath);
OutputStream os = fs.create(filePath);
if (gzip) {
CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
CompressionCodec codec = ccf.getCodec(filePath);
os = codec.createOutputStream(os);
}
BufferedWriter w = new BufferedWriter(new OutputStreamWriter(os));
for (int i = 0; i < numRecords; i++) {
w.write(getRecordLine(startId + i, extraCols));
}
w.close();
os.close();
if (gzip) {
verifyCompressedFile(filePath, numRecords);
}
}
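The JsonGenerator and SequenceFile examples above use the mirror-image write-side pattern: look the codec up by name with getCodecByName, then wrap the output stream with a pooled Compressor. Below is a minimal sketch of that pattern, again with illustrative class and method names that are not taken from any of the projects listed here.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Compressor;

public class CodecFactoryWriteSketch {
  // Create a possibly-compressed output stream; codecName may be a short name or a full class name.
  public static OutputStream createPossiblyCompressed(Path path, String codecName, Configuration conf)
      throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    OutputStream raw = fs.create(path);
    if (codecName == null || codecName.equals("None")) {
      return raw; // no compression requested
    }
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodecByName(codecName);
    if (codec == null) {
      throw new IOException("Codec " + codecName + " not found.");
    }
    // Borrow a compressor from the pool; real writers return it to CodecPool when the stream is closed.
    Compressor compressor = CodecPool.getCompressor(codec);
    return codec.createOutputStream(raw, compressor);
  }
}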