Listed below are example usages of org.apache.hadoop.io.compress.CompressionCodecFactory#getCodecByClassName(), each taken from an open-source project.
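All of the examples follow the same basic pattern: build a CompressionCodecFactory from a Hadoop Configuration, look the codec up by its fully qualified class name, and wrap a raw stream with it. Here is a minimal, self-contained sketch of that pattern (the output path and payload are placeholders, not taken from any of the projects below):

import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class GetCodecByClassNameExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // GzipCodec is in the default codec list; the lookup returns null
        // for class names not registered via io.compression.codecs.
        CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.GzipCodec");
        if (codec == null) {
            throw new IOException("Codec not registered");
        }
        FileSystem fs = FileSystem.getLocal(conf);
        try (OutputStream out = codec.createOutputStream(fs.create(new Path("/tmp/example.gz")))) {
            out.write("hello".getBytes("UTF-8"));
        }
    }
}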
public OutputStream getOutputStream() throws IOException {
    // Filename
    String fullyQualifiedExportFilePath = buildOutputFilePath();
    // OutputStream
    OutputStream rawOutputStream = fileSystem.newOutputStream(fullyQualifiedExportFilePath,
            new DistributedFileOpenOption(exportParams.getReplicationCount(), StandardOpenOption.CREATE_NEW));
    if (exportParams.getCompression() == COMPRESSION.BZ2) {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.BZip2Codec");
        return codec.createOutputStream(rawOutputStream);
    } else if (exportParams.getCompression() == COMPRESSION.GZ) {
        return new GZIPOutputStream(rawOutputStream);
    } else {
        return rawOutputStream;
    }
}
/**
 * Returns the configured CompressionCodec, or null if none is configured.
 *
 * @param context
 *            the ProcessContext
 * @param configuration
 *            the Hadoop Configuration
 * @return CompressionCodec or null
 */
protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
    org.apache.hadoop.io.compress.CompressionCodec codec = null;
    if (context.getProperty(COMPRESSION_CODEC).isSet()) {
        String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
        CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
        codec = ccf.getCodecByClassName(compressionClassname);
    }
    return codec;
}
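A caller of the method above (a hypothetical sketch, not part of the original processor) would typically fall back to the uncompressed stream when no codec is configured:

OutputStream out = rawStream;
CompressionCodec codec = getCompressionCodec(context, configuration);
if (codec != null) {
    out = codec.createOutputStream(rawStream);
}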
/**
 * Create a compression instance using the codec specified by
 * <code>codecClassName</code>
 */
static FSImageCompression createCompression(Configuration conf,
                                            String codecClassName)
        throws IOException {
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodecByClassName(codecClassName);
    if (codec == null) {
        throw new IOException("Not a supported codec: " + codecClassName);
    }
    return new FSImageCompression(codec);
}
public static CompressionCodec createCompressionCodec(String className)
        throws Exception {
    Configuration configuration = new Configuration();
    CompressionCodecFactory.setCodecClasses(configuration,
            new LinkedList<Class>(Collections.singletonList(Class.forName(className))));
    CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
    return ccf.getCodecByClassName(className);
}
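Because the helper above registers the class with CompressionCodecFactory.setCodecClasses before the lookup, getCodecByClassName succeeds for any codec on the classpath, not just those listed in io.compression.codecs. A hypothetical usage (BZip2Codec ships with Hadoop):

CompressionCodec codec = createCompressionCodec("org.apache.hadoop.io.compress.BZip2Codec");
System.out.println(codec.getDefaultExtension()); // prints ".bz2"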
/**
 * Create a compression instance using the codec specified by
 * <code>codecClassName</code>
 */
private static FSImageCompression createCompression(Configuration conf,
                                                    String codecClassName)
        throws IOException {
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodecByClassName(codecClassName);
    if (codec == null) {
        throw new IOException("Not a supported codec: " + codecClassName);
    }
    return new FSImageCompression(codec);
}
@Before
public void setUp() throws Exception {
    CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
    codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
    outputPath = new Path(workDir, outputFileName);
}
@Override
public void init() throws IOException {
    os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
    this.fs = path.getFileSystem(conf);

    // Set value of non-deprecated key for backward compatibility.
    if (!meta.containsProperty(StorageConstants.TEXT_DELIMITER)
        && meta.containsProperty(StorageConstants.SEQUENCEFILE_DELIMITER)) {
        this.delimiter = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER,
            StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
    } else {
        this.delimiter = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.TEXT_DELIMITER,
            StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
    }

    String nullCharacters;
    if (!meta.containsProperty(StorageConstants.TEXT_NULL)
        && meta.containsProperty(StorageConstants.SEQUENCEFILE_NULL)) {
        nullCharacters = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.SEQUENCEFILE_NULL,
            NullDatum.DEFAULT_TEXT));
    } else {
        nullCharacters = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.TEXT_NULL,
            NullDatum.DEFAULT_TEXT));
    }
    if (StringUtils.isEmpty(nullCharacters)) {
        nullChars = NullDatum.get().asTextBytes();
    } else {
        nullChars = nullCharacters.getBytes();
    }

    this.columnNum = schema.size();

    if (this.meta.containsProperty(StorageConstants.COMPRESSION_CODEC)) {
        String codecName = this.meta.getProperty(StorageConstants.COMPRESSION_CODEC);
        codecFactory = new CompressionCodecFactory(conf);
        codec = codecFactory.getCodecByClassName(codecName);
    } else {
        if (fs.exists(path)) {
            throw new AlreadyExistsStorageException(path);
        }
    }

    try {
        String serdeClass = this.meta.getProperty(StorageConstants.SEQUENCEFILE_SERDE,
            TextSerializerDeserializer.class.getName());
        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
        serde.init(schema);
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        throw new IOException(e);
    }

    Class<? extends Writable> keyClass, valueClass;
    if (serde instanceof BinarySerializerDeserializer) {
        keyClass = BytesWritable.class;
        EMPTY_KEY = new BytesWritable();
        valueClass = BytesWritable.class;
    } else {
        keyClass = LongWritable.class;
        EMPTY_KEY = new LongWritable();
        valueClass = Text.class;
    }

    String type = this.meta.getProperty(StorageConstants.COMPRESSION_TYPE, CompressionType.NONE.name());
    if (type.equals(CompressionType.BLOCK.name())) {
        writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.BLOCK, codec);
    } else if (type.equals(CompressionType.RECORD.name())) {
        writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.RECORD, codec);
    } else {
        writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec);
    }

    if (tableStatsEnabled) {
        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
    }

    super.init();
}
static CompressionCodec getLZOCodec(Configuration hconf) {
    CompressionCodecFactory factory = new CompressionCodecFactory(hconf);
    return factory.getCodecByClassName("org.apache.hadoop.io.compress.LzoCodec");
}
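Note that the LZO codec is not bundled with stock Apache Hadoop (it was removed over GPL licensing and now lives in the separate hadoop-lzo project as com.hadoop.compression.lzo.LzoCodec), so this lookup may return null. A defensive caller (a sketch, not from the original project) might guard it:

CompressionCodec lzo = getLZOCodec(new Configuration());
if (lzo == null) {
    throw new IllegalStateException(
        "LZO codec not found; add it to the classpath and io.compression.codecs");
}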
@Override
public DataSet<ExecRow> write() throws StandardException {
    Integer count;
    String extension = ".csv";
    long start = System.currentTimeMillis();
    SpliceOperation operation = exportFunction.getOperation();
    COMPRESSION compressionAlgorithm = null;
    CompressionCodec codec = null;
    if (operation instanceof ExportOperation) {
        ExportOperation op = (ExportOperation) exportFunction.getOperation();
        compressionAlgorithm = op.getExportParams().getCompression();
        if (compressionAlgorithm == COMPRESSION.BZ2) {
            extension += ".bz2";
        } else if (compressionAlgorithm == COMPRESSION.GZ) {
            extension += ".gz";
        }
    }
    try {
        final DistributedFileSystem dfs = SIDriver.driver().getSIEnvironment().fileSystem(path);
        dfs.createDirectory(path, false);
        // The 'part-r-00000' naming convention is what Spark uses, so we are consistent on the control side.
        try (OutputStream fileOut = dfs.newOutputStream(path /* directory */, "part-r-00000" + extension /* file */, StandardOpenOption.CREATE)) {
            OutputStream toWrite = fileOut;
            if (compressionAlgorithm == COMPRESSION.BZ2) {
                Configuration conf = new Configuration();
                CompressionCodecFactory factory = new CompressionCodecFactory(conf);
                codec = factory.getCodecByClassName("org.apache.hadoop.io.compress.BZip2Codec");
                toWrite = codec.createOutputStream(fileOut);
            } else if (compressionAlgorithm == COMPRESSION.GZ) {
                toWrite = new GZIPOutputStream(fileOut);
            }
            count = exportFunction.call(toWrite, dataSet.toLocalIterator());
        }
        dfs.touchFile(path, ExportFile.SUCCESS_FILE);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    long end = System.currentTimeMillis();
    ValueRow valueRow = new ValueRow(2);
    valueRow.setColumn(1, new SQLLongint(count));
    valueRow.setColumn(2, new SQLLongint(end - start));
    return new ControlDataSet<>(new SingletonIterator(valueRow));
}
@Test(timeout = 5000)
public void testConcatenatedZlibPadding()
        throws IOException, URISyntaxException {
    byte[] bytes;
    long compTotal = 0;
    // Known raw and compressed lengths of input
    long[] raws = { 2392, 102314, 42576, 31432, 25090 };
    long[] compressed = { 723, 25396, 10926, 8203, 6665 };

    CompressionCodecFactory codecFactory = new CompressionCodecFactory(new Configuration());
    codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");

    URL url = getClass().getClassLoader()
        .getResource("TestIFile_concatenated_compressed.bin");
    assertNotEquals("IFile input file must exist", null, url);
    Path p = new Path(url.toURI());
    FSDataInputStream inStream = localFs.open(p);

    for (int i = 0; i < 5; i++) {
        bytes = new byte[(int) raws[i]];
        assertEquals("Compressed stream out-of-sync", inStream.getPos(), compTotal);
        IFile.Reader.readToMemory(bytes, inStream, (int) compressed[i], codec,
            false, -1);
        compTotal += compressed[i];

        // Now read the data
        InMemoryReader inMemReader = new InMemoryReader(null,
            new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
        DataInputBuffer keyIn = new DataInputBuffer();
        DataInputBuffer valIn = new DataInputBuffer();
        Deserializer<Text> keyDeserializer;
        Deserializer<IntWritable> valDeserializer;
        SerializationFactory serializationFactory =
            new SerializationFactory(defaultConf);
        keyDeserializer = serializationFactory.getDeserializer(Text.class);
        valDeserializer = serializationFactory.getDeserializer(IntWritable.class);
        keyDeserializer.open(keyIn);
        valDeserializer.open(valIn);
        while (inMemReader.nextRawKey(keyIn)) {
            inMemReader.nextRawValue(valIn);
        }
    }
    inStream.close();
}