下面列出了如何使用 org.apache.hadoop.fs.FileSystem.Statistics 这个 API 类的示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates the record reader for the task's input split through the job's
 * input format and adds the bytes read while that reader is being created to
 * the BYTES_READ counter.
 *
 * @param reporter supplies the task counters and the input split
 * @param job job configuration used to obtain the input format
 * @throws IOException if looking up the file system statistics or creating
 *         the underlying reader fails
 */
TrackedRecordReader(TaskReporter reporter, JobConf job)
    throws IOException {
  this.reporter = reporter;
  inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);

  // File system statistics are only looked up for file-based splits.
  List<Statistics> splitStats = null;
  if (this.reporter.getInputSplit() instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) this.reporter.getInputSplit();
    splitStats = getFsStatistics(fileSplit.getPath(), job);
  }
  fsStats = splitStats;

  // Snapshot the byte count on both sides of reader creation.
  bytesInPrev = getInputBytes(fsStats);
  rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(),
      job, reporter);
  bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
/**
 * Creates the record reader for {@code split} through {@code inputFormat} and
 * adds the bytes read while that reader is being created to the BYTES_READ
 * counter.
 *
 * @param split the input split to read
 * @param inputFormat input format that creates the underlying reader
 * @param reporter supplies the task counters
 * @param taskContext task attempt context; also the source of the configuration
 * @throws InterruptedException propagated from creating the underlying reader
 * @throws IOException if looking up the file system statistics or creating
 *         the underlying reader fails
 */
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);

  // File system statistics are only looked up for file-based splits.
  List<Statistics> splitStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    org.apache.hadoop.mapreduce.lib.input.FileSplit fileSplit =
        (org.apache.hadoop.mapreduce.lib.input.FileSplit) split;
    splitStats = getFsStatistics(fileSplit.getPath(),
        taskContext.getConfiguration());
  }
  fsStats = splitStats;

  // Snapshot the byte count on both sides of reader creation.
  final long bytesBeforeOpen = getInputBytes(fsStats);
  this.real = inputFormat.createRecordReader(split, taskContext);
  final long bytesAfterOpen = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Obtains the record writer from {@code outputFormat} and adds the bytes
 * written while that writer is being created to the BYTES_WRITTEN counter.
 * {@code outputFormat} and {@code taskContext} are not parameters; they are
 * resolved from the enclosing scope.
 *
 * @param jobContext not read by this constructor body
 * @param job not read by this constructor body
 * @param umbilical not read by this constructor body
 * @param reporter supplies the task counters
 * @throws IOException if looking up the file system statistics or creating
 *         the record writer fails
 * @throws ClassNotFoundException declared by the signature
 * @throws InterruptedException propagated from creating the record writer
 */
@SuppressWarnings("unchecked")
NewDirectOutputCollector(MRJobConfig jobContext,
    JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.reporter = reporter;
  mapOutputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
    outputStats = getFsStatistics(
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(taskContext),
        taskContext.getConfiguration());
  }
  fsStats = outputStats;

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  out = outputFormat.getRecordWriter(taskContext);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Initializes this collector from {@code context}: resolves the counters,
 * obtains the record writer from the job's output format, and adds the bytes
 * written while that writer is being created to the BYTES_WRITTEN counter.
 *
 * @param context supplies the reporter and the job configuration
 * @throws IOException if obtaining the file system, its statistics, or the
 *         record writer fails
 * @throws ClassNotFoundException declared by the signature
 */
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
    ) throws IOException, ClassNotFoundException {
  this.reporter = context.getReporter();
  JobConf conf = context.getJobConf();
  String outputName = getOutputName(getPartition());
  FileSystem fileSystem = FileSystem.get(conf);
  OutputFormat<K, V> format = conf.getOutputFormat();

  mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (format instanceof FileOutputFormat) {
    outputStats = getFsStatistics(FileOutputFormat.getOutputPath(conf), conf);
  }
  fsStats = outputStats;

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  out = conf.getOutputFormat().getRecordWriter(fileSystem, conf, outputName, reporter);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Obtains the record writer from the reduce task's output format and adds the
 * bytes written while that writer is being created to the task's file output
 * byte counter.
 *
 * @param reduce reduce task supplying the counters and the output format
 * @param taskContext task attempt context; also the source of the configuration
 * @throws InterruptedException propagated from creating the record writer
 * @throws IOException if looking up the file system statistics or creating
 *         the record writer fails
 */
@SuppressWarnings("unchecked")
NewTrackingRecordWriter(ReduceTask reduce,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.outputRecordCounter = reduce.reduceOutputCounter;
  this.fileOutputByteCounter = reduce.fileOutputByteCounter;

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
    outputStats = getFsStatistics(
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(taskContext),
        taskContext.getConfiguration());
  }
  fsStats = outputStats;

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduce.outputFormat
      .getRecordWriter(taskContext);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Checks the read/write byte statistics reported for the test file system:
 * creating a file must not change the bytes-read figure, and reading from the
 * file must be reflected in the statistics obtained both by URI and through
 * the map of all statistics.
 *
 * @throws IOException if a file system operation fails
 * @throws URISyntaxException declared by the signature
 */
@Test
public void testStatistics() throws IOException, URISyntaxException {
  URI fsUri = getFsUri();
  Statistics stats = FileContext.getStatistics(fsUri);
  Assert.assertEquals(0, stats.getBytesRead());
  Path filePath = fileContextTestHelper.getTestRootPath(fc, "file1");
  createFile(fc, filePath, numBlocks, blockSize);
  // Writing the file must not count as reading.
  Assert.assertEquals(0, stats.getBytesRead());
  verifyWrittenBytes(stats);
  FSDataInputStream fstr = fc.open(filePath);
  try {
    byte[] buf = new byte[blockSize];
    // One sequential read and one positioned read of the same range.
    int bytesRead = fstr.read(buf, 0, blockSize);
    fstr.read(0, buf, 0, blockSize);
    Assert.assertEquals(blockSize, bytesRead);
  } finally {
    // Release the stream before the file is deleted below.
    fstr.close();
  }
  verifyReadBytes(stats);
  verifyWrittenBytes(stats);
  verifyReadBytes(FileContext.getStatistics(getFsUri()));
  Map<URI, Statistics> statsMap = FileContext.getAllStatistics();
  URI exactUri = getSchemeAuthorityUri();
  verifyWrittenBytes(statsMap.get(exactUri));
  fc.delete(filePath, true);
}
/**
 * Creates the record reader for the task's input split through the job's
 * input format and adds the bytes read while that reader is being created to
 * the BYTES_READ counter.
 *
 * @param reporter supplies the task counters and the input split
 * @param job job configuration used to obtain the input format
 * @throws IOException if looking up the file system statistics or creating
 *         the underlying reader fails
 */
TrackedRecordReader(TaskReporter reporter, JobConf job)
    throws IOException {
  this.reporter = reporter;
  inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
  fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);

  // File system statistics are only looked up for file-based splits.
  List<Statistics> splitStats = null;
  if (this.reporter.getInputSplit() instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) this.reporter.getInputSplit();
    splitStats = getFsStatistics(fileSplit.getPath(), job);
  }
  fsStats = splitStats;

  // Snapshot the byte count on both sides of reader creation.
  bytesInPrev = getInputBytes(fsStats);
  rawIn = job.getInputFormat().getRecordReader(reporter.getInputSplit(),
      job, reporter);
  bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}
/**
 * Creates the record reader for {@code split} through {@code inputFormat} and
 * adds the bytes read while that reader is being created to the BYTES_READ
 * counter.
 *
 * @param split the input split to read
 * @param inputFormat input format that creates the underlying reader
 * @param reporter supplies the task counters
 * @param taskContext task attempt context; also the source of the configuration
 * @throws InterruptedException propagated from creating the underlying reader
 * @throws IOException if looking up the file system statistics or creating
 *         the underlying reader fails
 */
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);

  // File system statistics are only looked up for file-based splits.
  List<Statistics> splitStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    org.apache.hadoop.mapreduce.lib.input.FileSplit fileSplit =
        (org.apache.hadoop.mapreduce.lib.input.FileSplit) split;
    splitStats = getFsStatistics(fileSplit.getPath(),
        taskContext.getConfiguration());
  }
  fsStats = splitStats;

  // Snapshot the byte count on both sides of reader creation.
  final long bytesBeforeOpen = getInputBytes(fsStats);
  this.real = inputFormat.createRecordReader(split, taskContext);
  final long bytesAfterOpen = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Obtains the record writer from {@code outputFormat} and adds the bytes
 * written while that writer is being created to the BYTES_WRITTEN counter.
 * {@code outputFormat} and {@code taskContext} are not parameters; they are
 * resolved from the enclosing scope.
 *
 * @param jobContext not read by this constructor body
 * @param job not read by this constructor body
 * @param umbilical not read by this constructor body
 * @param reporter supplies the task counters
 * @throws IOException if looking up the file system statistics or creating
 *         the record writer fails
 * @throws ClassNotFoundException declared by the signature
 * @throws InterruptedException propagated from creating the record writer
 */
@SuppressWarnings("unchecked")
NewDirectOutputCollector(MRJobConfig jobContext,
    JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.reporter = reporter;
  mapOutputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
    outputStats = getFsStatistics(
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(taskContext),
        taskContext.getConfiguration());
  }
  fsStats = outputStats;

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  out = outputFormat.getRecordWriter(taskContext);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Initializes this collector from {@code context}: resolves the counters,
 * obtains the record writer from the job's output format, and adds the bytes
 * written while that writer is being created to the BYTES_WRITTEN counter.
 *
 * @param context supplies the reporter and the job configuration
 * @throws IOException if obtaining the file system, its statistics, or the
 *         record writer fails
 * @throws ClassNotFoundException declared by the signature
 */
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
    ) throws IOException, ClassNotFoundException {
  this.reporter = context.getReporter();
  JobConf conf = context.getJobConf();
  String outputName = getOutputName(getPartition());
  FileSystem fileSystem = FileSystem.get(conf);
  OutputFormat<K, V> format = conf.getOutputFormat();

  mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(FileOutputFormatCounter.BYTES_WRITTEN);

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (format instanceof FileOutputFormat) {
    outputStats = getFsStatistics(FileOutputFormat.getOutputPath(conf), conf);
  }
  fsStats = outputStats;

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  out = conf.getOutputFormat().getRecordWriter(fileSystem, conf, outputName, reporter);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Obtains the record writer from the job's output format and adds the bytes
 * written while that writer is being created to the reduce task's file
 * output byte counter.
 *
 * @param reduce reduce task supplying the counters
 * @param job job configuration used to obtain the output format and file system
 * @param reporter passed through to the output format when creating the writer
 * @param finalName output name passed to the output format
 * @throws IOException if obtaining the file system, its statistics, or the
 *         record writer fails
 */
@SuppressWarnings({ "deprecation", "unchecked" })
public OldTrackingRecordWriter(ReduceTask reduce, JobConf job,
    TaskReporter reporter, String finalName) throws IOException {
  this.reduceOutputCounter = reduce.reduceOutputCounter;
  this.fileOutputByteCounter = reduce.fileOutputByteCounter;

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (job.getOutputFormat() instanceof FileOutputFormat) {
    outputStats = getFsStatistics(FileOutputFormat.getOutputPath(job), job);
  }
  fsStats = outputStats;

  FileSystem fileSystem = FileSystem.get(job);

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  this.real = job.getOutputFormat().getRecordWriter(fileSystem, job, finalName,
      reporter);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Obtains the record writer from the reduce task's output format and adds the
 * bytes written while that writer is being created to the task's file output
 * byte counter.
 *
 * @param reduce reduce task supplying the counters and the output format
 * @param taskContext task attempt context; also the source of the configuration
 * @throws InterruptedException propagated from creating the record writer
 * @throws IOException if looking up the file system statistics or creating
 *         the record writer fails
 */
@SuppressWarnings("unchecked")
NewTrackingRecordWriter(ReduceTask reduce,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.outputRecordCounter = reduce.reduceOutputCounter;
  this.fileOutputByteCounter = reduce.fileOutputByteCounter;

  // File system statistics are only looked up for file-based output formats.
  List<Statistics> outputStats = null;
  if (reduce.outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
    outputStats = getFsStatistics(
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.getOutputPath(taskContext),
        taskContext.getConfiguration());
  }
  fsStats = outputStats;

  // Snapshot the byte count on both sides of writer creation.
  final long bytesBeforeOpen = getOutputBytes(fsStats);
  this.real = (org.apache.hadoop.mapreduce.RecordWriter<K, V>) reduce.outputFormat
      .getRecordWriter(taskContext);
  final long bytesAfterOpen = getOutputBytes(fsStats);
  fileOutputByteCounter.increment(bytesAfterOpen - bytesBeforeOpen);
}
/**
 * Checks the read/write byte statistics reported for the test file system:
 * creating a file must not change the bytes-read figure, and reading from the
 * file must be reflected in the statistics obtained both by URI and through
 * the map of all statistics.
 *
 * @throws IOException if a file system operation fails
 * @throws URISyntaxException declared by the signature
 */
@Test
public void testStatistics() throws IOException, URISyntaxException {
  URI fsUri = getFsUri();
  Statistics stats = FileContext.getStatistics(fsUri);
  Assert.assertEquals(0, stats.getBytesRead());
  Path filePath = fileContextTestHelper.getTestRootPath(fc, "file1");
  createFile(fc, filePath, numBlocks, blockSize);
  // Writing the file must not count as reading.
  Assert.assertEquals(0, stats.getBytesRead());
  verifyWrittenBytes(stats);
  FSDataInputStream fstr = fc.open(filePath);
  try {
    byte[] buf = new byte[blockSize];
    // One sequential read and one positioned read of the same range.
    int bytesRead = fstr.read(buf, 0, blockSize);
    fstr.read(0, buf, 0, blockSize);
    Assert.assertEquals(blockSize, bytesRead);
  } finally {
    // Release the stream before the file is deleted below.
    fstr.close();
  }
  verifyReadBytes(stats);
  verifyWrittenBytes(stats);
  verifyReadBytes(FileContext.getStatistics(getFsUri()));
  Map<URI, Statistics> statsMap = FileContext.getAllStatistics();
  URI exactUri = getSchemeAuthorityUri();
  verifyWrittenBytes(statsMap.get(exactUri));
  fc.delete(filePath, true);
}
/**
 * Creates an output stream that uploads an object with a direct HTTP PUT
 * request, without going through the JOSS package. Any cached entry for the
 * object is removed first, because overwriting an object makes cached
 * metadata outdated.
 *
 * @param objName name of the object
 * @param contentType content type
 * @param metadata metadata passed to the underlying Swift output stream
 * @param statistics statistics object passed to the returned stream
 * @param overwrite not read by this method
 * @return an {@code FSDataOutputStream} wrapping the Swift output stream
 * @throws IOException if building the URL or creating the output stream fails
 */
@Override
public FSDataOutputStream createObject(String objName, String contentType,
    Map<String, String> metadata, Statistics statistics, boolean overwrite) throws IOException {
  final URL url = new URL(mJossAccount.getAccessURL() + "/" + getURLEncodedObjName(objName));
  // Pass the URL itself so it is only rendered when debug logging is enabled.
  LOG.debug("PUT {}. Content-Type : {}", url, contentType);
  // When overwriting an object, cached metadata will be outdated
  String cachedName = getObjName(container + "/", objName);
  objectCache.remove(cachedName);
  try {
    final OutputStream sos;
    if (nonStreamingUpload) {
      sos = new SwiftNoStreamingOutputStream(mJossAccount, url, contentType,
          metadata, swiftConnectionManager, this);
    } else {
      sos = new SwiftOutputStream(mJossAccount, url, contentType,
          metadata, swiftConnectionManager);
    }
    return new FSDataOutputStream(sos, statistics);
  } catch (IOException e) {
    // Log the exception itself so the stack trace is not lost.
    LOG.error(e.getMessage(), e);
    throw e;
  }
}
/**
 * Collects the Statistics instances whose scheme equals the scheme of
 * {@code path}.
 *
 * @param path the path; it is qualified against its file system before its
 *        scheme is read
 * @param conf the configuration used to obtain the file system of the path
 * @return the list of matching Statistics instances; empty if none match
 * @throws IOException propagated from obtaining the file system of the path
 */
protected static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
  final Path qualified = path.getFileSystem(conf).makeQualified(path);
  final String scheme = qualified.toUri().getScheme();
  final List<Statistics> matches = new ArrayList<FileSystem.Statistics>();
  for (Statistics candidate : FileSystem.getAllStatistics()) {
    if (!candidate.getScheme().equals(scheme)) {
      continue;
    }
    matches.add(candidate);
  }
  return matches;
}
/**
 * Refreshes the file system counters for {@code scheme}. Each counter is
 * looked up the first time it is needed, then set to the sum of the
 * corresponding figure over every entry in {@code stats}.
 */
void updateCounters() {
  // Resolve any counter that has not been looked up yet.
  if (readBytesCounter == null) {
    readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
  }
  if (writeBytesCounter == null) {
    writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
  }
  if (readOpsCounter == null) {
    readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
  }
  if (largeReadOpsCounter == null) {
    largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
  }
  if (writeOpsCounter == null) {
    writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
  }

  // Total every figure across the tracked statistics.
  long totalBytesRead = 0;
  long totalBytesWritten = 0;
  long totalReadOps = 0;
  long totalLargeReadOps = 0;
  long totalWriteOps = 0;
  for (FileSystem.Statistics fsStat : stats) {
    totalBytesRead += fsStat.getBytesRead();
    totalBytesWritten += fsStat.getBytesWritten();
    totalReadOps += fsStat.getReadOps();
    totalLargeReadOps += fsStat.getLargeReadOps();
    totalWriteOps += fsStat.getWriteOps();
  }

  readBytesCounter.setValue(totalBytesRead);
  writeBytesCounter.setValue(totalBytesWritten);
  readOpsCounter.setValue(totalReadOps);
  largeReadOpsCounter.setValue(totalLargeReadOps);
  writeOpsCounter.setValue(totalWriteOps);
}
/**
 * Totals the bytes-read figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesRead()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getInputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesRead();
    }
  }
  return total;
}
/**
 * Totals the bytes-read figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesRead()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getInputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesRead();
    }
  }
  return total;
}
/**
 * Totals the bytes-written figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesWritten()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getOutputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesWritten();
    }
  }
  return total;
}
/**
 * Totals the bytes-written figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesWritten()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getOutputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesWritten();
    }
  }
  return total;
}
/**
 * Walks every Statistics object reported by {@code FileSystem} and asks the
 * updater registered for its scheme to refresh its counters. An updater is
 * created and registered the first time a scheme is encountered.
 */
private synchronized void updateCounters() {
  for (Statistics fsStat : FileSystem.getAllStatistics()) {
    final String fsScheme = fsStat.getScheme();
    FileSystemStatisticUpdater schemeUpdater = statisticUpdaters.get(fsScheme);
    if (schemeUpdater == null) {
      // No updater registered for this scheme yet: create and remember one.
      schemeUpdater = new FileSystemStatisticUpdater(fsScheme, fsStat);
      statisticUpdaters.put(fsScheme, schemeUpdater);
    }
    schemeUpdater.updateCounters();
  }
}
/**
 * Totals the bytes-written figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesWritten()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getOutputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesWritten();
    }
  }
  return total;
}
/**
 * Totals the bytes-written figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesWritten()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getOutputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesWritten();
    }
  }
  return total;
}
/**
 * Writes one line per entry of the statistics table to standard output,
 * showing the scheme and authority of the entry's URI followed by its
 * statistics.
 */
public static synchronized void printStatistics() {
  for (Map.Entry<URI, Statistics> entry : STATISTICS_TABLE.entrySet()) {
    final URI fsUri = entry.getKey();
    final String line = " FileSystem " + fsUri.getScheme() + "://"
        + fsUri.getAuthority() + ": " + entry.getValue();
    System.out.println(line);
  }
}
/**
 * Builds a copy of the statistics table. Every key is re-created from its
 * string form and every value is copied through the Statistics copy
 * constructor, so the returned map holds new key and value objects.
 *
 * @return a new map with copied URIs and copied Statistics objects
 */
protected static synchronized Map<URI, Statistics> getAllStatistics() {
  final Map<URI, Statistics> snapshot = new HashMap<URI, Statistics>(
      STATISTICS_TABLE.size());
  for (Map.Entry<URI, Statistics> entry : STATISTICS_TABLE.entrySet()) {
    final URI sourceUri = entry.getKey();
    final Statistics statsCopy = new Statistics(entry.getValue());
    snapshot.put(URI.create(sourceUri.toString()), statsCopy);
  }
  return snapshot;
}
/**
 * Asserts that exactly one of the Statistics objects reported by
 * {@code FileSystem.getAllStatistics()} has the scheme "file".
 *
 * @throws Exception declared by the signature
 */
@Test(timeout = 1000)
public void testStatistics() throws Exception {
  int matches = 0;
  for (Statistics fsStat : FileSystem.getAllStatistics()) {
    if (!fsStat.getScheme().equals("file")) {
      continue;
    }
    matches++;
  }
  assertEquals(1, matches);
}
/**
 * Collects the Statistics instances whose scheme equals the scheme of
 * {@code path}.
 *
 * @param path the path; it is qualified against its file system before its
 *        scheme is read
 * @param conf the configuration used to obtain the file system of the path
 * @return the list of matching Statistics instances; empty if none match
 * @throws IOException propagated from obtaining the file system of the path
 */
protected static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
  final Path qualified = path.getFileSystem(conf).makeQualified(path);
  final String scheme = qualified.toUri().getScheme();
  final List<Statistics> matches = new ArrayList<FileSystem.Statistics>();
  for (Statistics candidate : FileSystem.getAllStatistics()) {
    if (!candidate.getScheme().equals(scheme)) {
      continue;
    }
    matches.add(candidate);
  }
  return matches;
}
/**
 * Refreshes the file system counters for {@code scheme}. Each counter is
 * looked up the first time it is needed, then set to the sum of the
 * corresponding figure over every entry in {@code stats}.
 */
void updateCounters() {
  // Resolve any counter that has not been looked up yet.
  if (readBytesCounter == null) {
    readBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_READ);
  }
  if (writeBytesCounter == null) {
    writeBytesCounter = counters.findCounter(scheme, FileSystemCounter.BYTES_WRITTEN);
  }
  if (readOpsCounter == null) {
    readOpsCounter = counters.findCounter(scheme, FileSystemCounter.READ_OPS);
  }
  if (largeReadOpsCounter == null) {
    largeReadOpsCounter = counters.findCounter(scheme, FileSystemCounter.LARGE_READ_OPS);
  }
  if (writeOpsCounter == null) {
    writeOpsCounter = counters.findCounter(scheme, FileSystemCounter.WRITE_OPS);
  }

  // Total every figure across the tracked statistics.
  long totalBytesRead = 0;
  long totalBytesWritten = 0;
  long totalReadOps = 0;
  long totalLargeReadOps = 0;
  long totalWriteOps = 0;
  for (FileSystem.Statistics fsStat : stats) {
    totalBytesRead += fsStat.getBytesRead();
    totalBytesWritten += fsStat.getBytesWritten();
    totalReadOps += fsStat.getReadOps();
    totalLargeReadOps += fsStat.getLargeReadOps();
    totalWriteOps += fsStat.getWriteOps();
  }

  readBytesCounter.setValue(totalBytesRead);
  writeBytesCounter.setValue(totalBytesWritten);
  readOpsCounter.setValue(totalReadOps);
  largeReadOpsCounter.setValue(totalLargeReadOps);
  writeOpsCounter.setValue(totalWriteOps);
}
/**
 * Totals the bytes-read figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesRead()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getInputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesRead();
    }
  }
  return total;
}
/**
 * Totals the bytes-read figure across the given statistics.
 *
 * @param stats statistics to add up; may be {@code null}
 * @return the sum of {@code getBytesRead()} over {@code stats}, or 0 when
 *         {@code stats} is {@code null}
 */
private long getInputBytes(List<Statistics> stats) {
  long total = 0;
  if (stats != null) {
    for (Statistics fsStat : stats) {
      total += fsStat.getBytesRead();
    }
  }
  return total;
}