下面列出了如何使用 org.apache.hadoop.io.file.tfile.TFile API 的实例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Reads the next key into {@code key} and returns a stream over its value.
 *
 * @param key the LogKey instance populated with the next entry's key fields
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException if the underlying TFile cannot be read
 */
public DataInputStream next(LogKey key) throws IOException {
  // The very first call must not advance past the initial entry.
  if (this.atBeginning) {
    this.atBeginning = false;
  } else {
    this.scanner.advance();
  }
  if (this.scanner.atEnd()) {
    return null;
  }
  TFile.Reader.Scanner.Entry currentEntry = this.scanner.entry();
  key.readFields(currentEntry.getKeyStream());
  // Reserved META entries are bookkeeping, not log data; recurse past them.
  return RESERVED_KEYS.containsKey(key.toString())
      ? next(key)
      : currentEntry.getValueStream();
}
/**
 * Reads the next key into {@code key} and returns a stream over its value.
 *
 * @param key the LogKey instance populated with the next entry's key fields
 * @return the valueStream if there are more keys or null otherwise.
 * @throws IOException if the underlying TFile cannot be read
 */
public DataInputStream next(LogKey key) throws IOException {
  // Iterate (instead of recursing) until a non-reserved entry is found
  // or the scanner is exhausted.
  while (true) {
    if (!this.atBeginning) {
      this.scanner.advance();
    } else {
      // First call: stay on the initial entry.
      this.atBeginning = false;
    }
    if (this.scanner.atEnd()) {
      return null;
    }
    TFile.Reader.Scanner.Entry entry = this.scanner.entry();
    key.readFields(entry.getKeyStream());
    // Skip META keys; loop around to the following entry.
    if (!RESERVED_KEYS.containsKey(key.toString())) {
      return entry.getValueStream();
    }
  }
}
/**
 * Opens a TFile reader over {@code path} and returns a scanner positioned
 * past the three bookkeeping entries at the head of the file.
 *
 * @param path the aggregated log file to scan
 * @return a scanner positioned at the first container-log entry
 * @throws IOException if the file cannot be opened or read
 */
private TFile.Reader.Scanner getScanner(final Path path) throws IOException {
  LOG.log(Level.FINE, "Creating Scanner for path {0}", path);
  final TFile.Reader reader = new TFile.Reader(this.fileSystem.open(path),
      this.fileSystem.getFileStatus(path).getLen(),
      this.configuration);
  final TFile.Reader.Scanner scanner = reader.createScanner();
  // Skip the VERSION, APPLICATION_ACL and APPLICATION_OWNER entries.
  int skipped = 0;
  while (skipped < 3 && !scanner.atEnd()) {
    scanner.advance();
    skipped++;
  }
  LOG.log(Level.FINE, "Created Scanner for path {0}", path);
  return scanner;
}
/**
 * Populates the reader's current key/value pair from a TFile entry.
 * The key is "fileName:tfileKey"; the value is the first line of the
 * entry's value stream.
 *
 * @param entry the TFile entry under the scanner's cursor
 * @throws IOException if the key or value stream cannot be read
 */
private void populateKV(TFile.Reader.Scanner.Entry entry) throws IOException {
  entry.getKey(keyBytesWritable);
  // splitPath contains the machine name. Create the key as splitPath + realKey.
  // Fix: BytesWritable.getBytes() exposes the whole backing array, which may
  // be longer than the valid data -- decode only getLength() bytes, with an
  // explicit charset instead of the platform default.
  String keyStr = new StringBuilder()
      .append(splitPath.getName()).append(":")
      .append(new String(keyBytesWritable.getBytes(), 0,
          keyBytesWritable.getLength(), "UTF-8"))
      .toString();
  /**
   * In certain cases, values can be huge (files > 2 GB). Stream is
   * better to handle such scenarios.
   */
  currentValueReader = new BufferedReader(
      new InputStreamReader(entry.getValueStream(), "UTF-8"));
  key.set(keyStr);
  String line = currentValueReader.readLine();
  value.set((line == null) ? "" : line);
}
/**
 * Creates the aggregated log file as {@code userUgi} and wraps it in a
 * TFile writer.
 *
 * @param conf configuration supplying the compression type
 * @param remoteAppLogFile path of the aggregated log file to create
 * @param userUgi user as whom the file is created
 * @throws IOException if creation fails or the doAs call is interrupted
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    // Create the file as the owning user so ownership/permissions are correct.
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Fix: restore the interrupt status before translating to IOException
    // so callers up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string.
  writeVersion();
}
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
}
/**
 * Returns the owner of the application.
 *
 * @return the application owner, or null if no owner entry exists.
 * @throws IOException if the log file cannot be read
 */
public String getApplicationOwner() throws IOException {
  // Fix: close the scanner on every exit path (early return, exception)
  // instead of leaking it.
  try (TFile.Reader.Scanner ownerScanner = reader.createScanner()) {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  }
}
public HistoryFileReader(Path historyFile) throws IOException {
fsdis = fs.open(historyFile);
reader =
new TFile.Reader(fsdis, fs.getFileStatus(historyFile).getLen(),
getConfig());
reset();
}
/**
 * Reads the entry under the scanner's cursor and advances the cursor.
 *
 * @return the key/value pair at the current position
 * @throws IOException if the entry cannot be fully read
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  DataInputStream dis = entry.getKeyStream();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(dis);
  dis = entry.getValueStream();
  byte[] value = new byte[entry.getValueLength()];
  // Fix: InputStream.read(byte[]) may return before filling the buffer;
  // readFully guarantees the whole value is read (or throws EOFException).
  dis.readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
public HistoryFileWriter(Path historyFile) throws IOException {
if (fs.exists(historyFile)) {
fsdos = fs.append(historyFile);
} else {
fsdos = fs.create(historyFile);
}
fs.setPermission(historyFile, HISTORY_FILE_UMASK);
writer =
new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
getConfig());
}
/**
 * Creates the aggregated log file as {@code userUgi} and wraps it in a
 * TFile writer.
 *
 * @param conf configuration supplying the compression type
 * @param remoteAppLogFile path of the aggregated log file to create
 * @param userUgi user as whom the file is created
 * @throws IOException if creation fails or the doAs call is interrupted
 */
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
    UserGroupInformation userUgi) throws IOException {
  try {
    // Create the file as the owning user so ownership/permissions are correct.
    this.fsDataOStream =
        userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream run() throws Exception {
            fc = FileContext.getFileContext(conf);
            fc.setUMask(APP_LOG_FILE_UMASK);
            return fc.create(
                remoteAppLogFile,
                EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
                new Options.CreateOpts[] {});
          }
        });
  } catch (InterruptedException e) {
    // Fix: restore the interrupt status before translating to IOException
    // so callers up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new IOException(e);
  }
  // Keys are not sorted: null arg
  // 256KB minBlockSize : Expected log size for each container too
  this.writer =
      new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
          YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
          YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE), null, conf);
  // Write the version string.
  writeVersion();
}
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext = FileContext.getFileContext(conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
}
/**
 * Returns the owner of the application.
 *
 * @return the application owner, or null if no owner entry exists.
 * @throws IOException if the log file cannot be read
 */
public String getApplicationOwner() throws IOException {
  // Fix: close the scanner on every exit path (early return, exception)
  // instead of leaking it.
  try (TFile.Reader.Scanner ownerScanner = reader.createScanner()) {
    LogKey key = new LogKey();
    while (!ownerScanner.atEnd()) {
      TFile.Reader.Scanner.Entry entry = ownerScanner.entry();
      key.readFields(entry.getKeyStream());
      if (key.toString().equals(APPLICATION_OWNER_KEY.toString())) {
        DataInputStream valueStream = entry.getValueStream();
        return valueStream.readUTF();
      }
      ownerScanner.advance();
    }
    return null;
  }
}
/**
 * Opens a TFile reader over {@code historyFile} and rewinds the scanner.
 *
 * @param historyFile the history file to read
 * @throws IOException if the file cannot be opened
 */
public HistoryFileReader(Path historyFile) throws IOException {
  fsdis = fs.open(historyFile);
  reader = new TFile.Reader(
      fsdis, fs.getFileStatus(historyFile).getLen(), getConfig());
  reset();
}
/**
 * Reads the entry under the scanner's cursor and advances the cursor.
 *
 * @return the key/value pair at the current position
 * @throws IOException if the entry cannot be fully read
 */
public Entry next() throws IOException {
  TFile.Reader.Scanner.Entry entry = scanner.entry();
  HistoryDataKey key = new HistoryDataKey();
  key.readFields(entry.getKeyStream());
  byte[] value = new byte[entry.getValueLength()];
  // Fix: InputStream.read(byte[]) may return a short read; readFully
  // guarantees the whole value is consumed (or throws EOFException).
  entry.getValueStream().readFully(value);
  scanner.advance();
  return new Entry(key, value);
}
public HistoryFileWriter(Path historyFile) throws IOException {
if (fs.exists(historyFile)) {
fsdos = fs.append(historyFile);
} else {
fsdos = fs.create(historyFile);
}
fs.setPermission(historyFile, HISTORY_FILE_UMASK);
writer =
new TFile.Writer(fsdos, MIN_BLOCK_SIZE, getConfig().get(
YarnConfiguration.FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_FS_APPLICATION_HISTORY_STORE_COMPRESSION_TYPE), null,
getConfig());
}
/**
 * Parses the given file and writes its contents into the outputWriter for all logs in it.
 *
 * @param inputPath the aggregated log file to parse
 * @param outputWriter destination for every log entry found
 * @throws IOException if reading or writing fails
 */
void parseOneFile(final Path inputPath, final Writer outputWriter) throws IOException {
  // try-with-resources closes the scanner even if writing fails mid-file.
  try (TFile.Reader.Scanner scanner = this.getScanner(inputPath)) {
    for (; !scanner.atEnd(); scanner.advance()) {
      new LogFileEntry(scanner.entry()).write(outputWriter);
    }
  }
}
/**
 * Parses the given file and stores the logs for each container in a file named after the container in the given.
 * outputFolder
 *
 * @param inputPath the aggregated log file to parse
 * @param outputFolder directory receiving one file per container
 * @throws IOException if reading or writing fails
 */
void parseOneFile(final Path inputPath, final File outputFolder) throws IOException {
  // try-with-resources closes the scanner even if writing fails mid-file.
  try (TFile.Reader.Scanner scanner = this.getScanner(inputPath)) {
    for (; !scanner.atEnd(); scanner.advance()) {
      new LogFileEntry(scanner.entry()).write(outputFolder);
    }
  }
}
/**
 * Opens the split's file and positions a TFile scanner over the byte range
 * covered by this split.
 *
 * @param split the split to read; expected to be a FileSplit
 * @param context task context supplying the job configuration
 * @throws IOException if the file cannot be opened
 * @throws InterruptedException declared by the RecordReader contract
 */
@Override public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  final FileSplit fileSplit = (FileSplit) split;
  splitPath = fileSplit.getPath();
  LOG.info("Initializing TFileRecordReader : " + splitPath.toString());
  start = fileSplit.getStart();
  end = start + fileSplit.getLength();
  final FileSystem fileSystem =
      splitPath.getFileSystem(context.getConfiguration());
  fin = fileSystem.open(splitPath);
  reader = new TFile.Reader(fin, fileSystem.getFileStatus(splitPath).getLen(),
      context.getConfiguration());
  scanner = reader.createScannerByByteRange(start, fileSplit.getLength());
}
/** Exposes the underlying TFile writer; intended for tests only. */
@VisibleForTesting
public TFile.Writer getWriter() {
  return writer;
}
/** Exposes the underlying TFile writer; intended for tests only. */
@VisibleForTesting
public TFile.Writer getWriter() {
  return writer;
}
// Wraps a single TFile scanner entry. NOTE(review): the entry is only
// stored here; how/when it is consumed is defined by this class's other
// methods, which are outside this view.
LogFileEntry(final TFile.Reader.Scanner.Entry entry) {
this.entry = entry;
}