Listed below are examples showing how to use the org.apache.hadoop.io.SequenceFile.CompressionType API class; you can also follow the links to view the source code on GitHub.
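For orientation before the project snippets below, here is a minimal, self-contained sketch (the path and key/value types are illustrative, not taken from any of the examples): it writes a few records with record-level compression and reads them back, the reader detecting the compression type from the file header.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class CompressionTypeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/compression-type-example.seq"); // illustrative path
    // Write a few records with record-level compression.
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
        SequenceFile.Writer.file(file),
        SequenceFile.Writer.keyClass(Text.class),
        SequenceFile.Writer.valueClass(IntWritable.class),
        SequenceFile.Writer.compression(CompressionType.RECORD));
    try {
      for (int i = 0; i < 3; i++) {
        writer.append(new Text("key-" + i), new IntWritable(i));
      }
    } finally {
      writer.close();
    }
    // Read the records back; the compression type is taken from the header.
    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(file));
    try {
      Text key = new Text();
      IntWritable value = new IntWritable();
      while (reader.next(key, value)) {
        System.out.println(key + " => " + value);
      }
    } finally {
      reader.close();
    }
  }
}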
/**
* This is used for verification.
* Each mapper writes one control file. The control file contains only the
* base directory written by this mapper and the checksum file path, so that
* we can create a read mapper which scans the files under the base directory
* and verifies the checksums of the files against the information given in
* the checksum file.
* @param fs
* @param outputPath base directory of mapper
* @param checksumFile location of checksum file
* @param name name of control file
* @throws IOException
*/
private void writeControlFile(FileSystem fs, Path outputPath,
Path checksumFile, String name) throws IOException {
SequenceFile.Writer write = null;
try {
Path parentDir = new Path(rtc.input, "filelists");
if (!fs.exists(parentDir)) {
fs.mkdirs(parentDir);
}
Path controlFile = new Path(parentDir, name);
write = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
Text.class, Text.class, CompressionType.NONE);
write.append(new Text(outputPath.toString()),
new Text(checksumFile.toString()));
} finally {
if (write != null)
write.close();
write = null;
}
}
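The matching read side is not shown in this snippet; a minimal sketch (assuming the same fs and controlFile as the writer above) would scan the (outputPath, checksumFile) pairs with a SequenceFile.Reader:
// Sketch only: read back the (outputPath, checksumFile) pair written above.
// 'fs' and 'controlFile' are assumed to be the writer's arguments.
SequenceFile.Reader reader = new SequenceFile.Reader(fs, controlFile, fs.getConf());
try {
  Text outputPath = new Text();
  Text checksumFile = new Text();
  while (reader.next(outputPath, checksumFile)) {
    // Hand each pair to the verification logic of the read mapper.
  }
} finally {
  reader.close();
}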
public void run() {
try {
for(int i=start; i < end; i++) {
String name = getFileName(i);
Path controlFile = new Path(INPUT_DIR, "in_file_" + name);
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fs.getConf(), controlFile,
Text.class, LongWritable.class,
CompressionType.NONE);
String logFile = jhLogFiles[i].getPath().toString();
writer.append(new Text(logFile), new LongWritable(0));
} catch(Exception e) {
throw new IOException(e);
} finally {
if (writer != null)
writer.close();
writer = null;
}
}
} catch(IOException ex) {
LOG.error("FileCreateDaemon failed.", ex);
}
numFinishedThreads++;
}
public void control(JobConf fsConfig, String fileName)
throws IOException {
String name = fileName;
FileSystem fs = FileSystem.get(fsConfig);
SequenceFile.Writer write = null;
for (int i = 0; i < nmaps; i++) {
try {
Path controlFile = new Path(dfs_input, name + i);
write = SequenceFile.createWriter(fs, fsConfig, controlFile,
Text.class, Text.class, CompressionType.NONE);
write.append(new Text(name + i), new Text(workdir));
} finally {
if (write != null)
write.close();
write = null;
}
}
}
@Test
public void testFileSuffixNotGiven() throws IOException, InterruptedException {
final int ROLL_INTERVAL = 1000; // seconds; make sure it doesn't change during the test
final String suffix = null;
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
"/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
0, null, null, 30000, Executors.newSingleThreadExecutor());
// Need to override system time use for test so we know what to expect
final long testTime = System.currentTimeMillis();
Clock testClock = new Clock() {
public long currentTimeMillis() {
return testTime;
}
};
bucketWriter.setClock(testClock);
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
bucketWriter.append(e);
Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + ".tmp"));
}
private void writeMetadataTest(FileSystem fs, int count, int seed, Path file,
CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
throws IOException {
fs.delete(file, true);
LOG.info("creating " + count + " records with metadata and with " + compressionType +
" compression");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, file,
RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
for (int i = 0; i < count; i++) {
generator.next();
RandomDatum key = generator.getKey();
RandomDatum value = generator.getValue();
writer.append(key, value);
}
writer.close();
}
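A complementary check (a sketch, not part of the original test; it assumes the same fs, conf, file and metadata objects used by writeMetadataTest) can confirm that the metadata survives the round trip, since SequenceFile.Reader exposes it via getMetadata():
// Sketch of the corresponding verification step.
SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
try {
  SequenceFile.Metadata readMetadata = reader.getMetadata();
  if (!metadata.equals(readMetadata)) {
    throw new IOException("metadata mismatch: expected " + metadata
        + " but read " + readMetadata);
  }
} finally {
  reader.close();
}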
private synchronized void storeGenerations() throws IOException {
FileSystem fileSystem = _path.getFileSystem(_configuration);
FileStatus[] listStatus = fileSystem.listStatus(_path);
SortedSet<FileStatus> existing = new TreeSet<FileStatus>(Arrays.asList(listStatus));
long currentFile;
if (!existing.isEmpty()) {
FileStatus last = existing.last();
currentFile = Long.parseLong(last.getPath().getName());
} else {
currentFile = 0;
}
Path path = new Path(_path, buffer(currentFile + 1));
LOG.info("Creating new snapshot file [{0}]", path);
FSDataOutputStream outputStream = fileSystem.create(path, false);
Writer writer = SequenceFile.createWriter(_configuration, outputStream, Text.class, LongWritable.class,
CompressionType.NONE, null);
for (Entry<String, Long> e : _namesToGenerations.entrySet()) {
writer.append(new Text(e.getKey()), new LongWritable(e.getValue()));
}
writer.close();
outputStream.close();
cleanupOldFiles(fileSystem, existing);
}
@Test
public void testSizeRoller() throws IOException, InterruptedException {
int maxBytes = 300;
MockHDFSWriter hdfsWriter = new MockHDFSWriter();
BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
"/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
hdfsWriter, timedRollerPool, null,
new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
0, null, null, 30000, Executors.newSingleThreadExecutor());
Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
for (int i = 0; i < 1000; i++) {
bucketWriter.append(e);
}
logger.info("Number of events written: {}", hdfsWriter.getEventsWritten());
logger.info("Number of bytes written: {}", hdfsWriter.getBytesWritten());
logger.info("Number of files opened: {}", hdfsWriter.getFilesOpened());
Assert.assertEquals("events written", 1000, hdfsWriter.getEventsWritten());
Assert.assertEquals("bytes written", 3000, hdfsWriter.getBytesWritten());
Assert.assertEquals("files opened", 10, hdfsWriter.getFilesOpened());
}
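For reference, the asserted numbers follow directly from the setup: the body "foo" is 3 bytes, so 1000 appends write 3000 bytes, and with a 300-byte roll threshold the writer rolls over into 10 files.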
private void codecTestMapFile(Class<? extends CompressionCodec> clazz,
CompressionType type, int records) throws Exception {
FileSystem fs = FileSystem.get(conf);
LOG.info("Creating MapFiles with " + records +
" records using codec " + clazz.getSimpleName());
Path path = new Path(new Path(
System.getProperty("test.build.data", "/tmp")),
clazz.getSimpleName() + "-" + type + "-" + records);
LOG.info("Writing " + path);
createMapFile(conf, fs, path, clazz.newInstance(), type, records);
MapFile.Reader reader = new MapFile.Reader(path, conf);
Text key1 = new Text("002");
assertNotNull(reader.get(key1, new Text()));
Text key2 = new Text("004");
assertNotNull(reader.get(key2, new Text()));
}
/**
* Each mapper will write one checksum file.
* The checksum file contains N pairs, where N is the number of threads.
* Each pair has two entries: outputPath and checksum.
* outputPath is the directory of files written by the thread;
* checksum is the CRC checksum of all files under that directory.
* @param name checksum file name
* @param threads array of writer threads
* @return checksum file path
* @throws IOException
*/
private Path writeChecksumFile(FileSystem fs, String name,
GenThread[] threads) throws IOException {
Path checksumFile = new Path(rtc.output_dir, name + ".checksum");
SequenceFile.Writer write = null;
write = SequenceFile.createWriter(fs, fs.getConf(), checksumFile,
Text.class, Text.class, CompressionType.NONE);
try {
for (GenThread rawThread: threads) {
GenWriterThread thread = (GenWriterThread)rawThread;
write.append(new Text(thread.outputPath.toString()),
new Text(Long.toString(thread.dc.getDirectoryChecksum())));
}
} finally {
if (write != null)
write.close();
write = null;
}
return checksumFile;
}
public Configuration build()
{
when(configuration.getInt(ZOOM_LEVEL, 1)).thenReturn(zoomLevel);
when(configuration.getInt(TILE_SIZE, MrGeoConstants.MRGEO_MRS_TILESIZE_DEFAULT_INT)).thenReturn(tileSize);
when(configuration.get(BOUNDS)).thenReturn(boundsString);
when(configuration.getBoolean(FILE_OUTPT_FORMAT_COMPRESS, false)).thenReturn(compressOutput);
when(configuration.get(FILE_OUTPUT_COMPRESSION_TYPE, CompressionType.RECORD.toString()))
.thenReturn(outputCompressionType);
when(configuration.get(FILE_OUTPUT_COMPRESSION_CODEC)).thenReturn(outputCompressionCodec);
when(configuration.get(FILE_OUTPUT_PATH)).thenReturn(outputFilePath);
// when(configuration.getClassByName(anyString())).thenAnswer(new Answer<Class>() {
//
// @Override
// public Class answer(InvocationOnMock invocationOnMock) throws Throwable {
// return Class.forName(invocationOnMock.getArguments()[0].toString());
// }
// });
return configuration;
}
public void testRecursiveSeqFileCreate() throws IOException {
FileSystem fs = FileSystem.getLocal(conf);
Path name = new Path(new Path(System.getProperty("test.build.data","."),
"recursiveCreateDir") , "file");
boolean createParent = false;
try {
SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
RandomDatum.class, 512, (short) 1, 4096, createParent,
CompressionType.NONE, null, new Metadata());
fail("Expected an IOException due to missing parent");
} catch (IOException ioe) {
// Expected
}
createParent = true;
SequenceFile.createWriter(fs, conf, name, RandomDatum.class,
RandomDatum.class, 512, (short) 1, 4096, createParent,
CompressionType.NONE, null, new Metadata());
// should succeed, fails if exception thrown
}
/** Test hsync via SequenceFiles */
@Test
public void testSequenceFileSync() throws Exception {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
final FileSystem fs = cluster.getFileSystem();
final Path p = new Path("/testSequenceFileSync/foo");
final int len = 1 << 16;
FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
4096, (short) 1, len, null);
Writer w = SequenceFile.createWriter(new Configuration(),
Writer.stream(out),
Writer.keyClass(RandomDatum.class),
Writer.valueClass(RandomDatum.class),
Writer.compression(CompressionType.NONE, new DefaultCodec()));
w.hflush();
checkSyncMetric(cluster, 0);
w.hsync();
checkSyncMetric(cluster, 1);
int seed = new Random().nextInt();
RandomDatum.Generator generator = new RandomDatum.Generator(seed);
generator.next();
w.append(generator.getKey(), generator.getValue());
w.hsync();
checkSyncMetric(cluster, 2);
w.close();
checkSyncMetric(cluster, 2);
out.close();
checkSyncMetric(cluster, 3);
cluster.shutdown();
}
public void testCompression() throws Exception {
EnumSet<SequenceFile.CompressionType> seq =
EnumSet.allOf(SequenceFile.CompressionType.class);
for (CompressionType redCompression : seq) {
for(int combine=0; combine < 2; ++combine) {
checkCompression(false, redCompression, combine == 1);
checkCompression(true, redCompression, combine == 1);
}
}
}
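The loop above covers all three values of the enum: NONE, RECORD and BLOCK. A standalone sketch that lists them, e.g. to validate a user-supplied configuration string:
import org.apache.hadoop.io.SequenceFile.CompressionType;

public class ListCompressionTypes {
  public static void main(String[] args) {
    // Prints NONE, RECORD and BLOCK.
    for (CompressionType type : CompressionType.values()) {
      System.out.println(type);
    }
    // valueOf() parses a configuration string back into the enum,
    // throwing IllegalArgumentException for anything else.
    CompressionType parsed = CompressionType.valueOf("BLOCK");
    System.out.println("parsed: " + parsed);
  }
}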
public void testSetFile() throws Exception {
FileSystem fs = FileSystem.getLocal(conf);
try {
RandomDatum[] data = generate(10000);
writeTest(fs, data, FILE, CompressionType.NONE);
readTest(fs, data, FILE);
writeTest(fs, data, FILE, CompressionType.BLOCK);
readTest(fs, data, FILE);
} finally {
fs.close();
}
}
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass, CompressionType compress)
throws IOException {
super(conf, fs, dirName, comparator, valClass, compress);
this.fs = fs;
this.dir = new Path(dirName);
initBloomFilter(conf);
}
private static void writeTest(FileSystem fs, RandomDatum[] data,
String file, CompressionType compress)
throws IOException {
MapFile.delete(fs, file);
LOG.info("creating with " + data.length + " records");
SetFile.Writer writer =
new SetFile.Writer(conf, fs, file,
WritableComparator.get(RandomDatum.class),
compress);
for (int i = 0; i < data.length; i++)
writer.append(data[i]);
writer.close();
}
/** Create the named map for keys of the named class.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
CompressionType compress,
Progressable progress) throws IOException {
this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
compression(compress), progressable(progress));
}
/** Create the named map for keys of the named class.
* @deprecated Use Writer(Configuration, Path, Option...) instead.
*/
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
CompressionType compress) throws IOException {
this(conf, new Path(dirName), keyClass(keyClass),
valueClass(valClass), compression(compress));
}
@Test
public void testCompression() throws Exception {
EnumSet<SequenceFile.CompressionType> seq =
EnumSet.allOf(SequenceFile.CompressionType.class);
for (CompressionType redCompression : seq) {
for(int combine=0; combine < 2; ++combine) {
checkCompression(false, redCompression, combine == 1);
checkCompression(true, redCompression, combine == 1);
}
}
}
/** Create the named map for keys of the named class. */
public Writer(Configuration conf, FileSystem fs, String dirName,
Class<? extends WritableComparable> keyClass, Class valClass,
CompressionType compress, CompressionCodec codec,
Progressable progress)
throws IOException {
this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
compress, codec, progress);
}
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws IOException {
super.checkOutputSpecs(ignored, job);
if (getCompressOutput(job) &&
getOutputCompressionType(job) == CompressionType.RECORD ){
throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+ "doesn't support Record Compression" );
}
}
@Override
public void open(String filePath, CompressionCodec codeC,
CompressionType compType) throws IOException {
super.open(filePath, codeC, compType);
if(closed) {
opened = true;
}
}
@SuppressWarnings("unchecked")
private void writeSkippedRec(KEY key, VALUE value) throws IOException{
if(skipWriter==null) {
Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
Path skipFile = new Path(skipDir, getTaskID().toString());
skipWriter = SequenceFile.createWriter(
skipFile.getFileSystem(conf), conf, skipFile,
keyClass, valClass,
CompressionType.BLOCK, reporter);
}
skipWriter.append(key, value);
}
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
WritableComparator comparator, Class valClass,
CompressionType compress, CompressionCodec codec, Progressable progress)
throws IOException {
this(conf, new Path(dirName), comparator(comparator),
valueClass(valClass), compression(compress, codec),
progressable(progress));
}
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
super.checkOutputSpecs(job);
if (getCompressOutput(job) &&
getOutputCompressionType(job) == CompressionType.RECORD ) {
throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
+ "doesn't support Record Compression" );
}
}
public RecordWriter<K, V> getRecordWriter(
FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
// get the path of the temporary output file
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
// find the kind of compression to do
compressionType = getOutputCompressionType(job);
// find the right codec
Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
final SequenceFile.Writer out =
SequenceFile.createWriter(fs, job, file,
job.getOutputKeyClass(),
job.getOutputValueClass(),
compressionType,
codec,
progress);
return new RecordWriter<K, V>() {
public void write(K key, V value)
throws IOException {
out.append(key, value);
}
public void close(Reporter reporter) throws IOException { out.close();}
};
}
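The submitting side of this old org.apache.hadoop.mapred path would typically be configured as follows (a hedged sketch; the JobConf and the chosen values are illustrative):
// Hypothetical driver fragment for the old mapred API: enable output
// compression so that getRecordWriter() above picks up the type and codec.
JobConf job = new JobConf();
job.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(job, true);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);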
/**
* Set the {@link CompressionType} for the output {@link SequenceFile}.
* @param job the {@link Job} to modify
* @param style the {@link CompressionType} for the output
* {@link SequenceFile}
*/
public static void setOutputCompressionType(Job job,
CompressionType style) {
setCompressOutput(job, true);
job.getConfiguration().set(FileOutputFormat.COMPRESS_TYPE,
style.toString());
}
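A typical call site for the new org.apache.hadoop.mapreduce API (illustrative driver code, not from the snippet's project):
// Hypothetical driver fragment: 'job' is an org.apache.hadoop.mapreduce.Job.
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
// Optionally pick the codec as well (DefaultCodec is used if unset).
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);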
protected MapFile.Writer createMapFileWriter(TaskAttemptContext context, CompressionCodec codec,
CompressionType compressionType, Path file) throws IOException
{
return new MapFile.Writer(context.getConfiguration(), file,
MapFile.Writer.keyClass(context.getOutputKeyClass().asSubclass(WritableComparable.class)),
MapFile.Writer.valueClass(context.getOutputValueClass().asSubclass(Writable.class)),
MapFile.Writer.compression(compressionType, codec),
MapFile.Writer.progressable(context));
}