下面列出了 org.apache.hadoop.io.SecureIOUtils#openForRead() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Opens a container log file for reading on behalf of the user who submitted
 * the application the container belongs to.
 *
 * @param containerIdStr string form of the container id whose log is requested
 * @param logFile the log file to open
 * @param context used to look up the application and its submitting user
 * @return an input stream over {@code logFile}
 * @throws IOException if the file cannot be opened; the message distinguishes
 *           an owner mismatch from every other failure
 */
public static FileInputStream openLogFileForRead(String containerIdStr, File logFile,
    Context context) throws IOException {
  ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
  ApplicationId applicationId = containerId.getApplicationAttemptId()
      .getApplicationId();
  String user = context.getApplications().get(
      applicationId).getUser();
  try {
    return SecureIOUtils.openForRead(logFile, user, null);
  } catch (IOException e) {
    // An IOException may carry no message at all; guard against null so the
    // original failure is reported instead of a NullPointerException.
    String message = e.getMessage();
    boolean ownerMismatch = message != null
        && message.contains("did not match expected owner '" + user + "'");
    if (ownerMismatch) {
      LOG.error(
          "Exception reading log file " + logFile.getAbsolutePath(), e);
      throw new IOException("Exception reading log file. Application submitted by '"
          + user
          + "' doesn't own requested log file : "
          + logFile.getName(), e);
    } else {
      throw new IOException("Exception reading log file. It might be because log "
          + "file was aggregated : " + logFile.getName(), e);
    }
  }
}
/**
 * Opens a container log file for reading on behalf of the user who submitted
 * the application the container belongs to.
 *
 * @param containerIdStr string form of the container id whose log is requested
 * @param logFile the log file to open
 * @param context used to look up the application and its submitting user
 * @return an input stream over {@code logFile}
 * @throws IOException if the file cannot be opened; the message distinguishes
 *           an owner mismatch from every other failure
 */
public static FileInputStream openLogFileForRead(String containerIdStr, File logFile,
    Context context) throws IOException {
  ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
  ApplicationId applicationId = containerId.getApplicationAttemptId()
      .getApplicationId();
  String user = context.getApplications().get(
      applicationId).getUser();
  try {
    return SecureIOUtils.openForRead(logFile, user, null);
  } catch (IOException e) {
    // An IOException may carry no message at all; guard against null so the
    // original failure is reported instead of a NullPointerException.
    String message = e.getMessage();
    boolean ownerMismatch = message != null
        && message.contains("did not match expected owner '" + user + "'");
    if (ownerMismatch) {
      LOG.error(
          "Exception reading log file " + logFile.getAbsolutePath(), e);
      throw new IOException("Exception reading log file. Application submitted by '"
          + user
          + "' doesn't own requested log file : "
          + logFile.getName(), e);
    } else {
      throw new IOException("Exception reading log file. It might be because log "
          + "file was aggregated : " + logFile.getName(), e);
    }
  }
}
/**
 * Read a log file from start to end positions. The offsets may be negative,
 * in which case they are relative to the end of the file. For example,
 * Reader(taskid, kind, 0, -1) is the entire file and
 * Reader(taskid, kind, -4197, -1) is the last 4196 bytes.
 * <p>
 * Both offsets are clamped to {@code [0, length]} of the log segment reported
 * by the index file and then shifted by that segment's start offset.
 *
 * @param taskid the id of the task to read the log file for
 * @param kind the kind of log to read
 * @param start the offset to read from (negative is relative to tail)
 * @param end the offset to read up to (negative is relative to tail)
 * @param isCleanup whether the attempt is cleanup attempt or not
 * @throws IOException if the index file or the log file cannot be read; the
 *           log stream is closed before the exception propagates
 */
public Reader(TaskAttemptID taskid, LogName kind,
    long start, long end, boolean isCleanup) throws IOException {
  // find the right log file
  LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
  // calculate the start and stop
  long size = fileDetail.length;
  if (start < 0) {
    start += size + 1;
  }
  if (end < 0) {
    end += size + 1;
  }
  start = Math.max(0, Math.min(start, size));
  end = Math.max(0, Math.min(end, size));
  start += fileDetail.start;
  end += fileDetail.start;
  bytesRemaining = end - start;
  String owner = obtainLogDirOwner(taskid);
  file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()),
      owner, null);
  // From here on the stream is open: if positioning fails the constructor
  // aborts and nobody else holds a reference, so it must be closed here.
  boolean positioned = false;
  try {
    // skip up to start
    long pos = 0;
    while (pos < start) {
      long result = file.skip(start - pos);
      if (result < 0) {
        bytesRemaining = 0;
        break;
      }
      pos += result;
    }
    positioned = true;
  } finally {
    if (!positioned) {
      try {
        file.close();
      } catch (IOException ignored) {
        // the failure that aborted positioning is already propagating
      }
    }
  }
}
/**
 * Read a log file from start to end positions. The offsets may be negative,
 * in which case they are relative to the end of the file. For example,
 * Reader(taskid, kind, 0, -1) is the entire file and
 * Reader(taskid, kind, -4197, -1) is the last 4196 bytes.
 * <p>
 * Both offsets are clamped to {@code [0, length]} of the log segment reported
 * by the index file and then shifted by that segment's start offset.
 *
 * @param taskid the id of the task to read the log file for
 * @param kind the kind of log to read
 * @param start the offset to read from (negative is relative to tail)
 * @param end the offset to read up to (negative is relative to tail)
 * @param isCleanup whether the attempt is cleanup attempt or not
 * @throws IOException if the index file or the log file cannot be read; the
 *           log stream is closed before the exception propagates
 */
public Reader(TaskAttemptID taskid, LogName kind,
    long start, long end, boolean isCleanup) throws IOException {
  // find the right log file
  LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
  // calculate the start and stop
  long size = fileDetail.length;
  if (start < 0) {
    start += size + 1;
  }
  if (end < 0) {
    end += size + 1;
  }
  start = Math.max(0, Math.min(start, size));
  end = Math.max(0, Math.min(end, size));
  start += fileDetail.start;
  end += fileDetail.start;
  bytesRemaining = end - start;
  String owner = obtainLogDirOwner(taskid);
  file = SecureIOUtils.openForRead(new File(fileDetail.location, kind.toString()),
      owner, null);
  // From here on the stream is open: if positioning fails the constructor
  // aborts and nobody else holds a reference, so it must be closed here.
  boolean positioned = false;
  try {
    // skip up to start
    long pos = 0;
    while (pos < start) {
      long result = file.skip(start - pos);
      if (result < 0) {
        bytesRemaining = 0;
        break;
      }
      pos += result;
    }
    positioned = true;
  } finally {
    if (!positioned) {
      try {
        file.close();
      } catch (IOException ignored) {
        // the failure that aborted positioning is already propagating
      }
    }
  }
}
/**
 * Opens {@code logFile} through {@link SecureIOUtils#openForRead}, passing the
 * value of {@code getUser()} as the user argument and no group.
 *
 * @param logFile the file to open
 * @return an input stream over {@code logFile}
 * @throws IOException if the secure open fails
 */
@VisibleForTesting
public FileInputStream secureOpenFile(File logFile) throws IOException {
  final String owner = getUser();
  final FileInputStream stream = SecureIOUtils.openForRead(logFile, owner, null);
  return stream;
}
/**
 * Reads the index file of a task attempt and returns the directory, start
 * offset and length of the requested log.
 * <p>
 * For {@code DEBUGOUT} and {@code PROFILE} the whole file in the log directory
 * is used (start 0, length taken from the file itself). For other log kinds
 * the values come from the first index line containing the log name; if no
 * such line exists, start and length keep the values a freshly constructed
 * {@code LogFileDetail} has.
 *
 * @param taskid the task attempt whose index file is read
 * @param filter the kind of log to look up
 * @param isCleanup whether the attempt is a cleanup attempt
 * @return the location details for the requested log
 * @throws IOException if the index file cannot be read, is empty, or is
 *           malformed
 */
private static LogFileDetail getLogFileDetail(TaskAttemptID taskid,
    LogName filter,
    boolean isCleanup)
    throws IOException {
  File indexFile = getIndexFile(taskid, isCleanup);
  BufferedReader fis = new BufferedReader(new InputStreamReader(
      SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid), null),
      Charsets.UTF_8));
  //the format of the index file is
  //LOG_DIR: <the dir where the task logs are really stored>
  //stdout:<start-offset in the stdout file> <length>
  //stderr:<start-offset in the stderr file> <length>
  //syslog:<start-offset in the syslog file> <length>
  LogFileDetail l = new LogFileDetail();
  String str = null;
  try {
    str = fis.readLine();
    if (str == null) { // the file doesn't have anything
      throw new IOException("Index file for the log of " + taskid
          + " doesn't exist.");
    }
    // Without this check a missing marker (indexOf == -1) would silently
    // produce a garbage location instead of an error.
    int locationIdx = str.indexOf(LogFileDetail.LOCATION);
    if (locationIdx < 0) {
      throw new IOException("Index file for the log of " + taskid
          + " is malformed: first line has no " + LogFileDetail.LOCATION
          + " entry.");
    }
    l.location = str.substring(locationIdx
        + LogFileDetail.LOCATION.length());
    // special cases are the debugout and profile.out files. They are
    // guaranteed
    // to be associated with each task attempt since jvm reuse is disabled
    // when profiling/debugging is enabled
    if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
      l.length = new File(l.location, filter.toString()).length();
      l.start = 0;
      fis.close();
      fis = null;
      return l;
    }
    String logName = filter.toString();
    str = fis.readLine();
    while (str != null) {
      // look for the exact line containing the logname
      if (str.contains(logName)) {
        // A truncated or non-numeric entry must surface as an IOException,
        // not as an unchecked exception escaping to the caller.
        try {
          String[] startAndLen = str.substring(logName.length() + 1).split(" ");
          l.start = Long.parseLong(startAndLen[0]);
          l.length = Long.parseLong(startAndLen[1]);
        } catch (NumberFormatException e) {
          throw new IOException("Index file for the log of " + taskid
              + " is malformed: bad entry '" + str + "'.", e);
        } catch (IndexOutOfBoundsException e) {
          throw new IOException("Index file for the log of " + taskid
              + " is malformed: bad entry '" + str + "'.", e);
        }
        break;
      }
      str = fis.readLine();
    }
    fis.close();
    fis = null;
  } finally {
    // fis is null here when it was already closed above; otherwise an
    // exception is propagating and the reader is handed to IOUtils.cleanup.
    IOUtils.cleanup(LOG, fis);
  }
  return l;
}
/**
 * Opens {@code logFile} through {@link SecureIOUtils#openForRead}, passing the
 * value of {@code getUser()} as the user argument and no group.
 *
 * @param logFile the file to open
 * @return an input stream over {@code logFile}
 * @throws IOException if the secure open fails
 */
@VisibleForTesting
public FileInputStream secureOpenFile(File logFile) throws IOException {
  final String owner = getUser();
  final FileInputStream stream = SecureIOUtils.openForRead(logFile, owner, null);
  return stream;
}
/**
 * Reads the index file of a task attempt and returns the directory, start
 * offset and length of the requested log.
 * <p>
 * For {@code DEBUGOUT} and {@code PROFILE} the whole file in the log directory
 * is used (start 0, length taken from the file itself). For other log kinds
 * the values come from the first index line containing the log name; if no
 * such line exists, start and length keep the values a freshly constructed
 * {@code LogFileDetail} has.
 *
 * @param taskid the task attempt whose index file is read
 * @param filter the kind of log to look up
 * @param isCleanup whether the attempt is a cleanup attempt
 * @return the location details for the requested log
 * @throws IOException if the index file cannot be read, is empty, or is
 *           malformed
 */
private static LogFileDetail getLogFileDetail(TaskAttemptID taskid,
    LogName filter,
    boolean isCleanup)
    throws IOException {
  File indexFile = getIndexFile(taskid, isCleanup);
  BufferedReader fis = new BufferedReader(new InputStreamReader(
      SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid), null),
      Charsets.UTF_8));
  //the format of the index file is
  //LOG_DIR: <the dir where the task logs are really stored>
  //stdout:<start-offset in the stdout file> <length>
  //stderr:<start-offset in the stderr file> <length>
  //syslog:<start-offset in the syslog file> <length>
  LogFileDetail l = new LogFileDetail();
  String str = null;
  try {
    str = fis.readLine();
    if (str == null) { // the file doesn't have anything
      throw new IOException("Index file for the log of " + taskid
          + " doesn't exist.");
    }
    // Without this check a missing marker (indexOf == -1) would silently
    // produce a garbage location instead of an error.
    int locationIdx = str.indexOf(LogFileDetail.LOCATION);
    if (locationIdx < 0) {
      throw new IOException("Index file for the log of " + taskid
          + " is malformed: first line has no " + LogFileDetail.LOCATION
          + " entry.");
    }
    l.location = str.substring(locationIdx
        + LogFileDetail.LOCATION.length());
    // special cases are the debugout and profile.out files. They are
    // guaranteed
    // to be associated with each task attempt since jvm reuse is disabled
    // when profiling/debugging is enabled
    if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
      l.length = new File(l.location, filter.toString()).length();
      l.start = 0;
      fis.close();
      fis = null;
      return l;
    }
    String logName = filter.toString();
    str = fis.readLine();
    while (str != null) {
      // look for the exact line containing the logname
      if (str.contains(logName)) {
        // A truncated or non-numeric entry must surface as an IOException,
        // not as an unchecked exception escaping to the caller.
        try {
          String[] startAndLen = str.substring(logName.length() + 1).split(" ");
          l.start = Long.parseLong(startAndLen[0]);
          l.length = Long.parseLong(startAndLen[1]);
        } catch (NumberFormatException e) {
          throw new IOException("Index file for the log of " + taskid
              + " is malformed: bad entry '" + str + "'.", e);
        } catch (IndexOutOfBoundsException e) {
          throw new IOException("Index file for the log of " + taskid
              + " is malformed: bad entry '" + str + "'.", e);
        }
        break;
      }
      str = fis.readLine();
    }
    fis.close();
    fis = null;
  } finally {
    // fis is null here when it was already closed above; otherwise an
    // exception is propagating and the reader is handed to IOUtils.cleanup.
    IOUtils.cleanup(LOG, fis);
  }
  return l;
}