下面列出了org.apache.hadoop.fs.FileContext#open ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static FSDataInputStream getPreviousJobHistoryFileStream(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
}
public static FSDataInputStream getPreviousJobHistoryFileStream(
Configuration conf, ApplicationAttemptId applicationAttemptId)
throws IOException {
FSDataInputStream in = null;
Path historyFile = null;
String jobId =
TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
.toString();
String jobhistoryDir =
JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
Path histDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
// read the previous history file
historyFile =
fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
jobId, (applicationAttemptId.getAttemptId() - 1)));
LOG.info("History file is at " + historyFile);
in = fc.open(historyFile);
return in;
}
private void testCopyPartialHelper(int dataSize, int offset, long size) throws IOException
{
FileUtils.deleteQuietly(new File("target/IOUtilsTest"));
File file = new File("target/IOUtilsTest/testCopyPartial/input");
createDataFile(file, dataSize);
FileContext fileContext = FileContext.getFileContext();
DataInputStream inputStream = fileContext.open(new Path(file.getAbsolutePath()));
Path output = new Path("target/IOUtilsTest/testCopyPartial/output");
DataOutputStream outputStream = fileContext.create(output, EnumSet
.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent());
if (offset == 0) {
IOUtils.copyPartial(inputStream, size, outputStream);
} else {
IOUtils.copyPartial(inputStream, offset, size, outputStream);
}
outputStream.close();
Assert.assertTrue("output exists", fileContext.util().exists(output));
Assert.assertEquals("output size", size, fileContext.getFileStatus(output).getLen());
// FileUtils.deleteQuietly(new File("target/IOUtilsTest"));
}
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext =
FileContext.getFileContext(remoteAppLogFile.toUri(), conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
}
private String getJobSummary(FileContext fc, Path path) throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
private static String getJobSummary(FileContext fc, Path path)
throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
public LogReader(Configuration conf, Path remoteAppLogFile)
throws IOException {
FileContext fileContext = FileContext.getFileContext(conf);
this.fsDataIStream = fileContext.open(remoteAppLogFile);
reader =
new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
remoteAppLogFile).getLen(), conf);
this.scanner = reader.createScanner();
}
private String getJobSummary(FileContext fc, Path path) throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
private static String getJobSummary(FileContext fc, Path path)
throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
@Override
public Object restore() throws IOException
{
FileContext fc = FileContext.getFileContext(fs.getUri());
// recover from wherever it was left
if (fc.util().exists(snapshotBackupPath)) {
LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);
// combine logs (w/o append, create new file)
Path tmpLogPath = new Path(basedir, "log.combined");
try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
IOUtils.copy(fsIn, fsOut);
}
try (FSDataInputStream fsIn = fc.open(logPath)) {
IOUtils.copy(fsIn, fsOut);
}
}
fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
fc.delete(logBackupPath, false);
} else {
// we have log backup, but no checkpoint backup
// failure between log rotation and writing checkpoint
if (fc.util().exists(logBackupPath)) {
LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
}
}
if (!fc.util().exists(snapshotPath)) {
LOG.debug("No existing checkpoint.");
return null;
}
LOG.debug("Reading checkpoint {}", snapshotPath);
InputStream is = fc.open(snapshotPath);
// indeterministic class loading behavior
// http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
final ClassLoader loader = Thread.currentThread().getContextClassLoader();
try (ObjectInputStream ois = new ObjectInputStream(is)
{
@Override
protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
throws IOException, ClassNotFoundException
{
return Class.forName(objectStreamClass.getName(), true, loader);
}
}) {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
throw new IOException("Failed to read checkpointed state", cnfe);
}
}
@Override
public Object restore() throws IOException
{
FileContext fc = FileContext.getFileContext(fs.getUri());
// recover from wherever it was left
if (fc.util().exists(snapshotBackupPath)) {
LOG.warn("Incomplete checkpoint, reverting to {}", snapshotBackupPath);
fc.rename(snapshotBackupPath, snapshotPath, Rename.OVERWRITE);
// combine logs (w/o append, create new file)
Path tmpLogPath = new Path(basedir, "log.combined");
try (FSDataOutputStream fsOut = fc.create(tmpLogPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) {
try (FSDataInputStream fsIn = fc.open(logBackupPath)) {
IOUtils.copy(fsIn, fsOut);
}
try (FSDataInputStream fsIn = fc.open(logPath)) {
IOUtils.copy(fsIn, fsOut);
}
}
fc.rename(tmpLogPath, logPath, Rename.OVERWRITE);
fc.delete(logBackupPath, false);
} else {
// we have log backup, but no checkpoint backup
// failure between log rotation and writing checkpoint
if (fc.util().exists(logBackupPath)) {
LOG.warn("Found {}, did checkpointing fail?", logBackupPath);
fc.rename(logBackupPath, logPath, Rename.OVERWRITE);
}
}
if (!fc.util().exists(snapshotPath)) {
LOG.debug("No existing checkpoint.");
return null;
}
LOG.debug("Reading checkpoint {}", snapshotPath);
InputStream is = fc.open(snapshotPath);
// indeterministic class loading behavior
// http://stackoverflow.com/questions/9110677/readresolve-not-working-an-instance-of-guavas-serializedform-appears
final ClassLoader loader = Thread.currentThread().getContextClassLoader();
try (ObjectInputStream ois = new ObjectInputStream(is)
{
@Override
protected Class<?> resolveClass(ObjectStreamClass objectStreamClass)
throws IOException, ClassNotFoundException
{
return Class.forName(objectStreamClass.getName(), true, loader);
}
}) {
return ois.readObject();
} catch (ClassNotFoundException cnfe) {
throw new IOException("Failed to read checkpointed state", cnfe);
}
}