The following examples show how to use the org.apache.hadoop.fs.ChecksumFileSystem API class, or you can follow the project links to view the full source on GitHub.
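Before the longer excerpts, here is a minimal, self-contained sketch (not taken from any of the projects below) of the core idea: a ChecksumFileSystem wraps a raw FileSystem, writes a hidden ".<name>.crc" sidecar next to each file, and verifies it on read, while getRawFileSystem() exposes the unchecked view. The class name ChecksumFsDemo and the path /tmp/demo.dat are illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class ChecksumFsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // LocalFileSystem extends ChecksumFileSystem, so the local FS is the
    // easiest place to see the checked and raw views of the same path.
    LocalFileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("/tmp/demo.dat");
    // Writing through the checked view also creates a hidden sidecar file.
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeBytes("hello checksum");
    }
    System.out.println("sidecar: " + fs.getChecksumFile(file)); // /tmp/.demo.dat.crc
    // The raw view bypasses checksum generation and verification entirely.
    FileSystem raw = fs.getRawFileSystem();
    System.out.println("raw length: " + raw.getFileStatus(file).getLen());
  }
}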
/**
 * Tests read/seek/getPos/skip operations for input stream.
 */
private void testChecker(ChecksumFileSystem fileSys, boolean readCS)
    throws Exception {
  Path file = new Path("try.dat");
  if (readCS) {
    writeFile(fileSys, file);
  } else {
    writeFile(fileSys.getRawFileSystem(), file);
  }
  stm = fileSys.open(file);
  checkReadAndGetPos();
  checkSeek();
  checkSkip();
  // checkMark
  assertFalse(stm.markSupported());
  stm.close();
  cleanupFile(fileSys, file);
}
@Override
public synchronized int read(ByteBuffer dst) throws IOException {
  int res;
  if (fs instanceof ChecksumFileSystem) {
    // Checksummed streams do not support ByteBuffer reads, so copy
    // through a heap array instead.
    byte[] bytes = new byte[dst.remaining()];
    res = fsDataInputStream.read(bytes);
    if (res > 0) {
      // Only copy the bytes actually read; a short read would otherwise
      // push stale array contents into the buffer.
      dst.put(bytes, 0, res);
    }
  } else {
    res = fsDataInputStream.read(dst);
  }
  return res;
}
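The instanceof guard above exists because FSDataInputStream.read(ByteBuffer) only delegates when the wrapped stream implements ByteBufferReadable, and ChecksumFileSystem's checker-based input streams historically do not, so a direct ByteBuffer read throws UnsupportedOperationException. A hedged, self-contained sketch of the failure mode the branch avoids (the class name and path are illustrative):
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class ByteBufferReadDemo {
  public static void main(String[] args) throws Exception {
    LocalFileSystem fs = FileSystem.getLocal(new Configuration()); // a ChecksumFileSystem
    Path file = new Path("/tmp/bb-demo.dat"); // illustrative path
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeBytes("some bytes to read back");
    }
    FSDataInputStream in = fs.open(file);
    ByteBuffer buf = ByteBuffer.allocate(64);
    try {
      in.read(buf); // checker streams are not ByteBufferReadable
    } catch (UnsupportedOperationException e) {
      // Fall back to a heap array, as the wrapper above does.
      byte[] tmp = new byte[buf.remaining()];
      int n = in.read(tmp);
      if (n > 0) {
        buf.put(tmp, 0, n);
      }
    }
    in.close();
  }
}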
private void smallReadSeek(FileSystem fileSys, Path name) throws IOException {
  if (fileSys instanceof ChecksumFileSystem) {
    fileSys = ((ChecksumFileSystem) fileSys).getRawFileSystem();
  }
  // Make the buffer size small to trigger code for HADOOP-922
  FSDataInputStream stmRaw = fileSys.open(name, 1);
  byte[] expected = new byte[ONEMB];
  Random rand = new Random(seed);
  rand.nextBytes(expected);
  // Issue a simple read first.
  byte[] actual = new byte[128];
  stmRaw.seek(100000);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, 100000, expected, "First Small Read Test");
  // now do a small seek of 4 bytes, within the same block.
  int newpos1 = 100000 + 128 + 4;
  stmRaw.seek(newpos1);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos1, expected, "Small Seek Bug 1");
  // seek another 256 bytes this time
  int newpos2 = newpos1 + 256;
  stmRaw.seek(newpos2);
  stmRaw.read(actual, 0, actual.length);
  checkAndEraseData(actual, newpos2, expected, "Small Seek Bug 2");
  // all done
  stmRaw.close();
}
private FSDataOutputStream create(String filename, boolean noChecksum, boolean overwrite)
    throws IOException {
  Path filePath = qualifiedPath(filename);
  // even though it was qualified using the default FS, it may not be in it
  FileSystem fs = filePath.getFileSystem(getConf());
  if (noChecksum && fs instanceof ChecksumFileSystem) {
    fs = ((ChecksumFileSystem) fs).getRawFileSystem();
  }
  return fs.create(filePath, overwrite);
}
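Note the pattern shared by this helper and the two-argument variant further down: when the caller asks for an unchecksummed file and the destination is a ChecksumFileSystem, unwrapping to getRawFileSystem() before create() skips the hidden ".<name>.crc" sidecar, which is useful when the data carries its own integrity information.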
public void testFSInputChecker() throws Exception {
  Configuration conf = new Configuration();
  conf.setLong("dfs.block.size", BLOCK_SIZE);
  conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
  conf.set("fs.hdfs.impl",
           "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
  rand.nextBytes(expected);
  // test DFS
  MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  ChecksumFileSystem fileSys = (ChecksumFileSystem) cluster.getFileSystem();
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
    testSeekAndRead(fileSys);
  } finally {
    fileSys.close();
    cluster.shutdown();
  }
  // test Local FS
  fileSys = FileSystem.getLocal(conf);
  try {
    testChecker(fileSys, true);
    testChecker(fileSys, false);
    testFileCorruption((LocalFileSystem) fileSys);
    testSeekAndRead(fileSys);
  } finally {
    fileSys.close();
  }
}
private void testSeekAndRead(ChecksumFileSystem fileSys)
    throws IOException {
  Path file = new Path("try.dat");
  writeFile(fileSys, file);
  stm = fileSys.open(file,
      fileSys.getConf().getInt("io.file.buffer.size", 4096));
  checkSeekAndRead();
  stm.close();
  cleanupFile(fileSys, file);
}
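testFileCorruption, called in testFSInputChecker above, exercises the read-side guarantee: data modified behind ChecksumFileSystem's back fails verification. A minimal sketch of that behavior on the local file system (the class name and path are illustrative, not from the test above):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class CorruptionDemo {
  public static void main(String[] args) throws Exception {
    LocalFileSystem fs = FileSystem.getLocal(new Configuration());
    Path file = new Path("/tmp/corrupt-demo.dat"); // illustrative path
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.writeBytes("original content"); // 16 bytes, checksummed on write
    }
    // Rewrite the data through the raw view; the stale .crc sidecar survives.
    try (FSDataOutputStream out = fs.getRawFileSystem().create(file, true)) {
      out.writeBytes("tampered_content"); // same length, different bytes
    }
    try (FSDataInputStream in = fs.open(file)) {
      in.read(new byte[32]); // verification happens on this read
    } catch (ChecksumException e) {
      System.out.println("corruption detected: " + e.getMessage());
    }
  }
}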
private FSDataOutputStream create(String filename, boolean noChecksum)
    throws IOException {
  Path filePath = qualifiedPath(filename);
  // even though it was qualified using the default FS, it may not be in it
  FileSystem fs = filePath.getFileSystem(getConf());
  if (noChecksum && fs instanceof ChecksumFileSystem) {
    fs = ((ChecksumFileSystem) fs).getRawFileSystem();
  }
  return fs.create(filePath, true /* overwrite */);
}
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  long approxOutputSize = 0;
  int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512);
  LOG.info("OnDiskMerger: We have " + inputs.size() +
      " map outputs on disk. Triggering merge...");
  // 1. Prepare the list of files to be merged.
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }
  // add the checksum length
  approxOutputSize +=
      ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
  // 2. Start the on-disk merge process
  Path outputPath =
      localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
          approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
  FSDataOutputStream out =
      CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
  RawKeyValueIterator iter = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs,
        (Class<K>) jobConf.getMapOutputKeyClass(),
        (Class<V>) jobConf.getMapOutputValueClass(),
        codec, inputs.toArray(new Path[inputs.size()]),
        true, ioSortFactor, tmpDir,
        (RawComparator<K>) jobConf.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null,
        mergedMapOutputsCounter, null);
    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }
  closeOnDiskFile(compressAwarePath);
  LOG.info(reduceId +
      " Finished merging " + inputs.size() +
      " map output files on disk of total-size " +
      approxOutputSize + "." +
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
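The size reservation above accounts for the ".crc" sidecar via the static helper ChecksumFileSystem.getChecksumLength(size, bytesPerSum): with 512 bytes per CRC32 checksum, that is 4 bytes of checksum data per 512-byte chunk plus a small header. A quick illustration of the arithmetic (the values and class name are chosen for illustration):
import org.apache.hadoop.fs.ChecksumFileSystem;

public class CrcOverheadDemo {
  public static void main(String[] args) {
    long dataSize = 1L << 20; // 1 MB of merged output, chosen arbitrarily
    int bytesPerSum = 512;    // the io.bytes.per.checksum default used above
    // One 4-byte CRC per 512-byte chunk plus a small header: roughly 8 KB here.
    System.out.println(ChecksumFileSystem.getChecksumLength(dataSize, bytesPerSum));
  }
}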
@Override
public int run(String[] args) throws IOException {
  // silence the minidfs cluster
  Log hadoopLog = LogFactory.getLog("org");
  if (hadoopLog instanceof Log4JLogger) {
    ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  }
  int reps = 1;
  if (args.length == 1) {
    try {
      reps = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      printUsage();
      return -1;
    }
  } else if (args.length > 1) {
    printUsage();
    return -1;
  }
  Configuration conf = getConf();
  // the size of the file to write
  long SIZE = conf.getLong("dfsthroughput.file.size",
      10L * 1024 * 1024 * 1024);
  BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);
  String localDir = conf.get("mapred.temp.dir");
  if (localDir == null) {
    localDir = conf.get("hadoop.tmp.dir");
    conf.set("mapred.temp.dir", localDir);
  }
  dir = new LocalDirAllocator("mapred.temp.dir");
  System.setProperty("test.build.data", localDir);
  System.out.println("Local = " + localDir);
  ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  FileSystem rawLocal = checkedLocal.getRawFileSystem();
  for (int i = 0; i < reps; ++i) {
    writeAndReadLocalFile("local", conf, SIZE);
    writeAndReadFile(rawLocal, "raw", conf, SIZE);
    writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  }
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf)
        .racks(new String[]{"/foo"}).build();
    cluster.waitActive();
    FileSystem dfs = cluster.getFileSystem();
    for (int i = 0; i < reps; ++i) {
      writeAndReadFile(dfs, "dfs", conf, SIZE);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // clean up minidfs junk
      rawLocal.delete(new Path(localDir, "dfs"), true);
    }
  }
  return 0;
}
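This benchmark (an earlier revision of it appears below) runs the same write/read workload three ways: against a plain local path, against the raw local file system, and against the checksummed local file system, before repeating it against a MiniDFSCluster; comparing the "raw" and "checked" timings isolates the cost of checksum generation and verification.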
public int run(String[] args) throws IOException {
  // silence the minidfs cluster
  Log hadoopLog = LogFactory.getLog("org");
  if (hadoopLog instanceof Log4JLogger) {
    ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  }
  int reps = 1;
  if (args.length == 1) {
    try {
      reps = Integer.parseInt(args[0]);
    } catch (NumberFormatException e) {
      printUsage();
      return -1;
    }
  } else if (args.length > 1) {
    printUsage();
    return -1;
  }
  Configuration conf = getConf();
  // the size of the file to write
  long SIZE = conf.getLong("dfsthroughput.file.size",
      10L * 1024 * 1024 * 1024);
  BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);
  String localDir = conf.get("mapred.temp.dir");
  dir = new LocalDirAllocator("mapred.temp.dir");
  System.setProperty("test.build.data", localDir);
  System.out.println("Local = " + localDir);
  ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  FileSystem rawLocal = checkedLocal.getRawFileSystem();
  for (int i = 0; i < reps; ++i) {
    writeAndReadLocalFile("local", conf, SIZE);
    writeAndReadFile(rawLocal, "raw", conf, SIZE);
    writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  }
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster(conf, 1, true, new String[]{"/foo"});
    cluster.waitActive();
    FileSystem dfs = cluster.getFileSystem();
    for (int i = 0; i < reps; ++i) {
      writeAndReadFile(dfs, "dfs", conf, SIZE);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
      // clean up minidfs junk
      rawLocal.delete(new Path(localDir, "dfs"), true);
    }
  }
  return 0;
}
@Override
public void merge(List<Path> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }
  numDiskToDiskMerges.increment(1);
  long approxOutputSize = 0;
  int bytesPerSum = conf.getInt("io.bytes.per.checksum", 512);
  LOG.info("OnDiskMerger: We have " + inputs.size() +
      " map outputs on disk. Triggering merge...");
  // 1. Prepare the list of files to be merged.
  for (Path file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }
  // add the checksum length
  approxOutputSize +=
      ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
  // 2. Start the on-disk merge process
  Path outputPath =
      localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
          approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
  Writer writer =
      new Writer(conf, rfs, outputPath,
          (Class) ConfigUtils.getIntermediateInputKeyClass(conf),
          (Class) ConfigUtils.getIntermediateInputValueClass(conf),
          codec, null, null);
  TezRawKeyValueIterator iter = null;
  Path tmpDir = new Path(inputContext.getUniqueIdentifier());
  try {
    iter = TezMerger.merge(conf, rfs,
        (Class) ConfigUtils.getIntermediateInputKeyClass(conf),
        (Class) ConfigUtils.getIntermediateInputValueClass(conf),
        codec, ifileReadAhead, ifileReadAheadLength, ifileBufferSize,
        inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir,
        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
        nullProgressable, spilledRecordsCounter, null,
        mergedMapOutputsCounter, null);
    // TODO Maybe differentiate between data written because of Merges and
    // the finalMerge (i.e. final mem available may be different from
    // initial merge mem)
    TezMerger.writeFile(iter, writer, nullProgressable,
        TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
    writer.close();
    additionalBytesWritten.increment(writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }
  closeOnDiskFile(outputPath);
  LOG.info(inputContext.getUniqueIdentifier() +
      " Finished merging " + inputs.size() +
      " map output files on disk of total-size " +
      approxOutputSize + "." +
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}