下面列出了如何使用 org.apache.hadoop.mapred.TaskLog.LogName API 类的实例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Tests TaskLog path resolution when the
 * {@code YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR} system property is
 * not set.
 *
 * @throws IOException if a TaskLog call fails with an I/O error
 */
@Test (timeout=50000)
public void testTaskLogWithoutTaskLogDir() throws IOException {
  System.clearProperty(YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR);
  // test TaskLog: with the property cleared no MRv2 log dir is reported.
  // JUnit's contract is assertEquals(expected, actual).
  assertEquals(null, TaskLog.getMRv2LogDir());
  TaskAttemptID taid = mock(TaskAttemptID.class);
  JobID jid = new JobID("job", 1);
  when(taid.getJobID()).thenReturn(jid);
  when(taid.toString()).thenReturn("JobId");
  File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
  assertTrue(f.getAbsolutePath().endsWith("stdout"));
}
/**
 * Tests TaskLog path resolution when the
 * {@code YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR} system property is
 * not set.
 *
 * @throws IOException if a TaskLog call fails with an I/O error
 */
@Test (timeout=50000)
public void testTaskLogWithoutTaskLogDir() throws IOException {
  System.clearProperty(YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR);
  // test TaskLog: with the property cleared no MRv2 log dir is reported.
  // JUnit's contract is assertEquals(expected, actual).
  assertEquals(null, TaskLog.getMRv2LogDir());
  TaskAttemptID taid = mock(TaskAttemptID.class);
  JobID jid = new JobID("job", 1);
  when(taid.getJobID()).thenReturn(jid);
  when(taid.toString()).thenReturn("JobId");
  File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
  assertTrue(f.getAbsolutePath().endsWith("stdout"));
}
/**
 * Copies, for every attempt in {@code lInfo.allAttempts}, the original
 * {@link LogFileDetail} of {@code logName} from {@code taskLogFileDetails}
 * into {@code updatedTaskLogFileDetails}. A per-attempt map is created in
 * the destination when none is present. Does nothing unless
 * {@code logName} is in {@code TaskLog.LOGS_TRACKED_BY_INDEX_FILES}.
 *
 * @param lInfo JVM info whose {@code allAttempts} are processed
 * @param taskLogFileDetails source of the original per-attempt log details
 * @param updatedTaskLogFileDetails destination map, modified in place
 * @param logName the log type whose details are copied
 */
private void revertIndexFileInfo(PerJVMInfo lInfo,
    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
    Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
    LogName logName) {
  // Only log types that index files track need their details restored.
  if (!TaskLog.LOGS_TRACKED_BY_INDEX_FILES.contains(logName)) {
    return;
  }
  for (Task attempt : lInfo.allAttempts) {
    boolean missing = !updatedTaskLogFileDetails.containsKey(attempt);
    if (missing) {
      updatedTaskLogFileDetails.put(attempt,
          new HashMap<LogName, LogFileDetail>());
    }
    Map<LogName, LogFileDetail> destination =
        updatedTaskLogFileDetails.get(attempt);
    LogFileDetail originalDetail =
        taskLogFileDetails.get(attempt).get(logName);
    destination.put(logName, originalDetail);
  }
}
/**
 * Decides whether the logs of a JVM need truncating. Truncation is needed
 * as soon as one attempt that ran in the JVM has a {@code logName} log
 * longer than its retain size. This applies only when that retain size
 * exceeds {@code MINIMUM_RETAIN_SIZE_FOR_TRUNCATION}.
 *
 * @param lInfo JVM info whose {@code allAttempts} are inspected
 * @param taskLogFileDetails per-attempt log-file details
 * @param logName the log type to check
 * @return true if truncation is needed, false otherwise
 */
private boolean isTruncationNeeded(PerJVMInfo lInfo,
    Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
    LogName logName) {
  for (Task attempt : lInfo.allAttempts) {
    long retainLimit;
    if (attempt.isMapTask()) {
      retainLimit = mapRetainSize;
    } else {
      retainLimit = reduceRetainSize;
    }
    Map<LogName, LogFileDetail> detailsByLog = taskLogFileDetails.get(attempt);
    LogFileDetail detail = detailsByLog.get(logName);
    // The length is only consulted when the limit is above the minimum.
    if (retainLimit > MINIMUM_RETAIN_SIZE_FOR_TRUNCATION
        && detail.length > retainLimit) {
      return true;
    }
  }
  return false;
}
/**
 * Reads a task log and returns it as a string after trimming it.
 *
 * @param filter
 *          Task log filter; can be STDOUT, STDERR, SYSLOG, DEBUGOUT, PROFILE
 * @param taskId
 *          The task id for which the log has to be collected
 * @param isCleanup
 *          whether the task is a cleanup attempt or not.
 * @return task log as string, trimmed of leading/trailing whitespace
 * @throws IOException if opening or reading the log fails
 */
public static String readTaskLog(TaskLog.LogName filter,
    org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
    throws IOException {
  // Collect raw bytes and decode once at the end. This way a multi-byte
  // character split across two read() calls is not corrupted.
  java.io.ByteArrayOutputStream collected =
      new java.io.ByteArrayOutputStream();
  // reads the whole tasklog into inputstream
  InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
      isCleanup);
  try {
    byte[] b = new byte[65536];
    int res;
    while ((res = taskLogReader.read(b)) > 0) {
      // Only the first res bytes of the buffer hold data from this read;
      // appending the whole buffer would add stale bytes.
      collected.write(b, 0, res);
    }
  } finally {
    // Close even when a read fails so the stream is not leaked.
    taskLogReader.close();
  }
  // Decoded with the platform default charset, as new String(byte[]) did.
  return collected.toString().trim();
}
/**
 * Reads a task log and returns it as a string after trimming it.
 *
 * @param filter which task log to read
 * @param taskId the task attempt whose log is read
 * @param isCleanup whether the task is a cleanup attempt or not
 * @return task log as string, trimmed of leading/trailing whitespace
 * @throws IOException if opening or reading the log fails
 */
public String readTaskLog(TaskLog.LogName filter,
    org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
    throws IOException {
  // Collect raw bytes and decode once at the end. This way a multi-byte
  // character split across two read() calls is not corrupted.
  java.io.ByteArrayOutputStream collected =
      new java.io.ByteArrayOutputStream();
  // reads the whole tasklog into inputstream
  InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
      isCleanup);
  try {
    byte[] b = new byte[65536];
    int res;
    while ((res = taskLogReader.read(b)) > 0) {
      // Only the first res bytes of the buffer hold data from this read.
      collected.write(b, 0, res);
    }
  } finally {
    // Close even when a read fails so the stream is not leaked.
    taskLogReader.close();
  }
  // Decoded with the platform default charset, as new String(byte[]) did.
  return collected.toString().trim();
}
/**
 * Reads a task log and returns it as a string after trimming it.
 *
 * @param filter
 *          Task log filter; can be STDOUT, STDERR, SYSLOG, DEBUGOUT, PROFILE
 * @param taskId
 *          The task id for which the log has to be collected
 * @param isCleanup
 *          whether the task is a cleanup attempt or not.
 * @return task log as string, trimmed of leading/trailing whitespace
 * @throws IOException if opening or reading the log fails
 */
public static String readTaskLog(TaskLog.LogName filter,
    org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
    throws IOException {
  // Collect raw bytes and decode once at the end. This way a multi-byte
  // character split across two read() calls is not corrupted.
  java.io.ByteArrayOutputStream collected =
      new java.io.ByteArrayOutputStream();
  // reads the whole tasklog into inputstream
  InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
      isCleanup);
  try {
    byte[] b = new byte[65536];
    int res;
    while ((res = taskLogReader.read(b)) > 0) {
      // Only the first res bytes of the buffer hold data from this read;
      // appending the whole buffer would add stale bytes.
      collected.write(b, 0, res);
    }
  } finally {
    // Close even when a read fails so the stream is not leaked.
    taskLogReader.close();
  }
  // Decoded with the platform default charset, as new String(byte[]) did.
  return collected.toString().trim();
}
/**
 * Reads a task log and returns it as a string after trimming it.
 *
 * @param filter which task log to read
 * @param taskId the task attempt whose log is read
 * @param isCleanup whether the task is a cleanup attempt or not
 * @return task log as string, trimmed of leading/trailing whitespace
 * @throws IOException if opening or reading the log fails
 */
public String readTaskLog(TaskLog.LogName filter,
    org.apache.hadoop.mapred.TaskAttemptID taskId, boolean isCleanup)
    throws IOException {
  // Collect raw bytes and decode once at the end. This way a multi-byte
  // character split across two read() calls is not corrupted.
  java.io.ByteArrayOutputStream collected =
      new java.io.ByteArrayOutputStream();
  // reads the whole tasklog into inputstream
  InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1,
      isCleanup);
  try {
    byte[] b = new byte[65536];
    int res;
    while ((res = taskLogReader.read(b)) > 0) {
      // Only the first res bytes of the buffer hold data from this read.
      collected.write(b, 0, res);
    }
  } finally {
    // Close even when a read fails so the stream is not leaked.
    taskLogReader.close();
  }
  // Decoded with the platform default charset, as new String(byte[]) did.
  return collected.toString().trim();
}
/**
 * Get the log-file details of every attempt in the given list.
 *
 * @param allAttempts the task attempts whose log-file details are looked up
 * @return a map from each task to its per-log-type {@link LogFileDetail}s
 * @throws IOException propagated from {@code TaskLog.getAllLogsFileDetails}
 */
private Map<Task, Map<LogName, LogFileDetail>> getAllLogsFileDetails(
    final List<Task> allAttempts) throws IOException {
  Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails =
      new HashMap<Task, Map<LogName, LogFileDetail>>();
  for (Task task : allAttempts) {
    Map<LogName, LogFileDetail> allLogsFileDetails =
        TaskLog.getAllLogsFileDetails(task.getTaskID(),
            task.isTaskCleanupTask());
    taskLogFileDetails.put(task, allLogsFileDetails);
  }
  return taskLogFileDetails;
}
/**
 * After truncation, rewrites each task's index file so that it reflects the
 * truncated sizes. For every log type tracked by index files, the recorded
 * previous length is {@code start} and the current length is
 * {@code start + length}. Both default to 0 when the task has no detail for
 * that log. A failure to write one task's index is logged and skipped.
 *
 * @param firstAttempt the first attempt, passed to {@code TaskLog.writeToIndexFile}
 * @param updatedTaskLogFileDetails per-task log details after truncation
 */
private void updateIndicesAfterLogTruncation(TaskAttemptID firstAttempt,
    Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails) {
  for (Entry<Task, Map<LogName, LogFileDetail>> e
      : updatedTaskLogFileDetails.entrySet()) {
    Task attempt = e.getKey();
    Map<LogName, LogFileDetail> details = e.getValue();
    Map<LogName, Long[]> lengths = new HashMap<LogName, Long[]>();
    for (LogName name : TaskLog.LOGS_TRACKED_BY_INDEX_FILES) {
      LogFileDetail detail = details.get(name);
      // Index 0: previous length, index 1: current length.
      Long[] previousAndCurrent;
      if (detail == null) {
        previousAndCurrent = new Long[] { Long.valueOf(0L), Long.valueOf(0L) };
      } else {
        previousAndCurrent = new Long[] { Long.valueOf(detail.start),
            Long.valueOf(detail.start + detail.length) };
      }
      lengths.put(name, previousAndCurrent);
    }
    try {
      TaskLog.writeToIndexFile(firstAttempt, attempt.getTaskID(),
          attempt.isTaskCleanupTask(), lengths);
    } catch (IOException ioe) {
      // Best effort: one bad index file must not stop the others.
      LOG.warn("Exception in updateIndicesAfterLogTruncation : "
          + StringUtils.stringifyException(ioe));
      LOG.warn("Exception encountered while updating index file of task "
          + attempt.getTaskID()
          + ". Ignoring and continuing with other tasks.");
    }
  }
}
/**
 * Appends {@code numBytes} copies of {@code data} to the {@code logName}
 * log file of {@code firstAttemptID}. TaskLog state is synced for
 * {@code attemptID} before and after the write.
 *
 * @param firstAttemptID attempt whose log file receives the data
 * @param attemptID attempt on whose behalf the data is written
 * @param logName which log file to write
 * @param numBytes number of characters to append
 * @param data the character written repeatedly
 * @throws IOException if a directory cannot be created or writing fails
 */
void writeRealBytes(TaskAttemptID firstAttemptID,
    TaskAttemptID attemptID, LogName logName, long numBytes, char data)
    throws IOException {
  File logFile = TaskLog.getTaskLogFile(firstAttemptID, logName);
  LOG.info("Going to write " + numBytes + " real bytes to the log file "
      + logFile);
  if (!logFile.getParentFile().exists()
      && !logFile.getParentFile().mkdirs()) {
    throw new IOException("Couldn't create all ancestor dirs for "
        + logFile);
  }
  File attemptDir = TaskLog.getBaseDir(attemptID.toString());
  if (!attemptDir.exists() && !attemptDir.mkdirs()) {
    // Report the directory that actually failed, not the log file.
    throw new IOException("Couldn't create all ancestor dirs for "
        + attemptDir);
  }
  // Need to call up front to set currenttaskid.
  TaskLog.syncLogs(firstAttemptID, attemptID);
  FileWriter writer = new FileWriter(logFile, true);
  try {
    for (long i = 0; i < numBytes; i++) {
      writer.write(data);
    }
  } finally {
    // Close even if a write fails so the file handle is not leaked.
    writer.close();
  }
  TaskLog.syncLogs(firstAttemptID, attemptID);
  LOG.info("Written " + numBytes + " real bytes to the log file "
      + logFile);
}
/**
 * Verifies that a SYSLOG smaller than the retain size is never truncated.
 * The monitor uses retain sizes of 1000 and the attempt writes 500 bytes.
 * JVM reuse is not involved.
 *
 * @throws IOException if writing or inspecting the logs fails
 */
@Test
public void testNoTruncationNeeded() throws IOException {
  TaskTracker tracker = new TaskTracker();
  TaskLogsMonitor monitor = new TaskLogsMonitor(1000L, 1000L);
  tracker.setTaskLogsMonitor(monitor);

  TaskID parentTask = new TaskID();
  TaskAttemptID attempt = new TaskAttemptID(parentTask, 0);
  Task mapTask = new MapTask(null, attempt, 0, null, null, 0, null);

  // Stay within the retain size.
  writeRealBytes(attempt, attempt, LogName.SYSLOG, 500, 'H');
  monitor.monitorTaskLogs();
  File attemptDir = TaskLog.getBaseDir(attempt.toString());
  assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());

  // Mark the task, and its JVM, as finished; nothing should be truncated.
  monitor.addProcessForLogTruncation(attempt, Arrays.asList(mapTask));
  monitor.monitorTaskLogs();
  assertTrue(attemptDir.exists());

  File syslog = TaskLog.getTaskLogFile(attempt, LogName.SYSLOG);
  assertEquals(500, syslog.length());
  // The index file must agree with the file on disk.
  assertEquals(500, getAllLogsFileLengths(attempt, false).get(
      LogName.SYSLOG).longValue());

  // Another monitoring pass leaves the file alone.
  monitor.monitorTaskLogs();
  assertEquals(500, syslog.length());
}
/**
 * Verifies that negative retain sizes disable truncation. A 1500-byte
 * SYSLOG must survive untouched.
 *
 * @throws IOException if writing or inspecting the logs fails
 */
@Test
public void testDisabledLogTruncation() throws IOException {
  TaskTracker tracker = new TaskTracker();
  // Anything less than 0 disables the truncation.
  TaskLogsMonitor monitor = new TaskLogsMonitor(-1L, -1L);
  tracker.setTaskLogsMonitor(monitor);

  TaskID parentTask = new TaskID();
  TaskAttemptID attempt = new TaskAttemptID(parentTask, 0);
  Task mapTask = new MapTask(null, attempt, 0, null, null, 0, null);

  writeRealBytes(attempt, attempt, LogName.SYSLOG, 1500, 'H');
  monitor.monitorTaskLogs();
  File attemptDir = TaskLog.getBaseDir(attempt.toString());
  assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());

  // Finish the task and its JVM; the log must keep its full size.
  monitor.addProcessForLogTruncation(attempt, Arrays.asList(mapTask));
  monitor.monitorTaskLogs();
  assertTrue(attemptDir.exists());

  File syslog = TaskLog.getTaskLogFile(attempt, LogName.SYSLOG);
  assertEquals(1500, syslog.length());
  // The index file must agree with the file on disk.
  assertEquals(1500, getAllLogsFileLengths(attempt, false).get(
      LogName.SYSLOG).longValue());
}
/**
 * Verifies that, without JVM reuse, a 1500-byte SYSLOG is cut down to the
 * 1000-byte retain size once the task finishes. The index file is checked
 * as well.
 *
 * @throws IOException if writing or inspecting the logs fails
 */
@Test
public void testLogTruncationOnFinishing() throws IOException {
  TaskTracker tracker = new TaskTracker();
  TaskLogsMonitor monitor = new TaskLogsMonitor(1000L, 1000L);
  tracker.setTaskLogsMonitor(monitor);

  TaskID parentTask = new TaskID();
  TaskAttemptID attempt = new TaskAttemptID(parentTask, 0);
  Task mapTask = new MapTask(null, attempt, 0, null, null, 0, null);

  // Exceed the retain size.
  writeRealBytes(attempt, attempt, LogName.SYSLOG, 1500, 'H');
  monitor.monitorTaskLogs();
  File attemptDir = TaskLog.getBaseDir(attempt.toString());
  assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());

  // Finishing the task and its JVM triggers truncation.
  monitor.addProcessForLogTruncation(attempt, Arrays.asList(mapTask));
  monitor.monitorTaskLogs();
  assertTrue(attemptDir.exists());

  File syslog = TaskLog.getTaskLogFile(attempt, LogName.SYSLOG);
  assertEquals(1000, syslog.length());
  assertEquals(1000, getAllLogsFileLengths(attempt, false).get(
      LogName.SYSLOG).longValue());

  // A later pass must not truncate again.
  monitor.monitorTaskLogs();
  assertEquals(1000, syslog.length());
}
/**
 * Builds the path of a task log as
 * {@code <LOG_DIR_EXPANSION_VAR><Path.SEPARATOR><filter name>}.
 *
 * @param filter the log type
 * @return the log path, relative to the container log-dir expansion variable
 */
private static String getTaskLogFile(LogName filter) {
  String logDirVar = ApplicationConstants.LOG_DIR_EXPANSION_VAR;
  String fileName = filter.toString();
  return logDirVar + Path.SEPARATOR + fileName;
}
/**
 * Populates {@code environment} for a task's child JVM. It adds the
 * user-supplied child environment and HADOOP_ROOT_LOGGER, plus
 * HADOOP_CLIENT_OPTS inherited from this process; explicit user values for
 * those two keys win. It also adds the stdout/stderr log-file locations.
 *
 * @param environment the environment map to fill, modified in place
 * @param task the task whose configuration drives the settings
 */
public static void setVMEnv(Map<String, String> environment,
    Task task) {
  JobConf conf = task.conf;
  String childEnv = getChildEnv(conf, task.isMapTask());

  // User-supplied variables first.
  MRApps.setEnvFromInputString(environment, childEnv, conf);

  // Propagate the log level so that a child forking another "bin/hadoop"
  // (common in streaming) logs at the same level.
  String rootLogger =
      MRApps.getChildLogLevel(conf, task.isMapTask()) + ",console";
  environment.put("HADOOP_ROOT_LOGGER", rootLogger);

  // TODO: The following is useful for instance in streaming tasks. Should be
  // set in ApplicationMaster's env by the RM.
  String inheritedOpts = System.getenv("HADOOP_CLIENT_OPTS");
  environment.put("HADOOP_CLIENT_OPTS",
      inheritedOpts == null ? "" : inheritedOpts + " ");

  // setEnvFromInputString appends to existing values. For these two keys an
  // explicit user setting must replace our default instead.
  Map<String, String> userOnly = new HashMap<String, String>();
  MRApps.setEnvFromInputString(userOnly, childEnv, conf);
  for (String key : new String[] { "HADOOP_ROOT_LOGGER", "HADOOP_CLIENT_OPTS" }) {
    if (userOnly.containsKey(key)) {
      environment.put(key, userOnly.get(key));
    }
  }

  // Add stdout/stderr env
  environment.put(MRJobConfig.STDOUT_LOGFILE_ENV,
      getTaskLogFile(TaskLog.LogName.STDOUT));
  environment.put(MRJobConfig.STDERR_LOGFILE_ENV,
      getTaskLogFile(TaskLog.LogName.STDERR));
}
/**
 * Builds the launch command for a task's child JVM. The pieces are: the
 * java binary, the child java opts with {@code @taskid@} substituted, the
 * tmpdir and log4j properties, optional profiler parameters, and the
 * YarnChild main class with the listener host/port, attempt id and JVM id.
 * Stdout/stderr redirections come last. The pieces are joined with spaces
 * into a single string, returned as the only element of the list.
 *
 * @param taskAttemptListenerAddr address the child reports back to
 * @param task the task to launch
 * @param jvmID id of the JVM being launched
 * @return a one-element list holding the full command line
 */
public static List<String> getVMCommand(
    InetSocketAddress taskAttemptListenerAddr, Task task,
    JVMId jvmID) {
  TaskAttemptID attemptID = task.getTaskID();
  JobConf conf = task.conf;
  Vector<String> vargs = new Vector<String>(8);
  vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
      + "/bin/java");
  // Add child (task) java-vm options.
  //
  // The following symbols if present in mapred.{map|reduce}.child.java.opts
  // value are replaced:
  // + @taskid@ is interpolated with value of TaskID.
  // Other occurrences of @ will not be altered.
  //
  // Example with multiple arguments and substitutions, showing
  // jvm GC logging, and start of a passwordless JVM JMX agent so can
  // connect with jconsole and the likes to watch child memory, threads
  // and get thread dumps.
  //
  // <property>
  //   <name>mapred.map.child.java.opts</name>
  //   <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  //          -Dcom.sun.management.jmxremote.authenticate=false \
  //          -Dcom.sun.management.jmxremote.ssl=false \
  //   </value>
  // </property>
  //
  // <property>
  //   <name>mapred.reduce.child.java.opts</name>
  //   <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  //          -Dcom.sun.management.jmxremote.authenticate=false \
  //          -Dcom.sun.management.jmxremote.ssl=false \
  //   </value>
  // </property>
  //
  String javaOpts = getChildJavaOpts(conf, task.isMapTask());
  // Substitute the documented "@taskid@" placeholder with the attempt id.
  javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
  String [] javaOptsSplit = javaOpts.split(" ");
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }
  Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
  vargs.add("-Djava.io.tmpdir=" + childTmpDir);
  MRApps.addLog4jSystemProperties(task, vargs, conf);
  if (conf.getProfileEnabled()) {
    if (conf.getProfileTaskRange(task.isMapTask()
        ).isIncluded(task.getPartition())) {
      final String profileParams = conf.get(task.isMapTask()
          ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
          : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
      vargs.add(String.format(profileParams,
          getTaskLogFile(TaskLog.LogName.PROFILE)));
    }
  }
  // Add main class and its arguments
  vargs.add(YarnChild.class.getName()); // main of Child
  // pass TaskAttemptListener's address
  vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
  vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
  vargs.add(attemptID.toString()); // pass task identifier
  // Finally add the jvmID
  vargs.add(String.valueOf(jvmID.getId()));
  vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
  vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));
  // Final command: every argument followed by a single space.
  StringBuilder mergedCommand = new StringBuilder();
  for (CharSequence str : vargs) {
    mergedCommand.append(str).append(" ");
  }
  Vector<String> vargsFinal = new Vector<String>(1);
  vargsFinal.add(mergedCommand.toString());
  return vargsFinal;
}
/**
 * Tests TaskLog with {@code YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR}
 * set to "testString". It covers log-file path resolution, index-file
 * syncing, {@code getRealTaskLogFileLocation}, {@code obtainLogDirOwner}
 * and reading through {@code TaskLog.Reader}.
 *
 * @throws IOException if creating or copying the test files fails
 */
@Test (timeout=50000)
public void testTaskLog() throws IOException {
  // test TaskLog; JUnit's contract is assertEquals(expected, actual).
  System.setProperty(
      YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR, "testString");
  assertEquals("testString", TaskLog.getMRv2LogDir());
  TaskAttemptID taid = mock(TaskAttemptID.class);
  JobID jid = new JobID("job", 1);
  when(taid.getJobID()).thenReturn(jid);
  when(taid.toString()).thenReturn("JobId");
  File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
  assertTrue(f.getAbsolutePath().endsWith("testString"
      + File.separatorChar + "stdout"));
  // test getRealTaskLogFileLocation
  File indexFile = TaskLog.getIndexFile(taid, true);
  if (!indexFile.getParentFile().exists()) {
    indexFile.getParentFile().mkdirs();
  }
  indexFile.delete();
  indexFile.createNewFile();
  TaskLog.syncLogs("location", taid, true);
  assertTrue(indexFile.getAbsolutePath().endsWith(
      "userlogs" + File.separatorChar + "job_job_0001"
      + File.separatorChar + "JobId.cleanup"
      + File.separatorChar + "log.index"));
  f = TaskLog.getRealTaskLogFileLocation(taid, true, LogName.DEBUGOUT);
  if (f != null) {
    assertTrue(f.getAbsolutePath().endsWith("location"
        + File.separatorChar + "debugout"));
    // Give DEBUGOUT some content so the reader check below has data.
    FileUtils.copyFile(indexFile, f);
  }
  // test obtainLogDirOwner
  assertTrue(TaskLog.obtainLogDirOwner(taid).length() > 0);
  // test TaskLog.Reader
  assertTrue(readTaskLog(TaskLog.LogName.DEBUGOUT, taid, true).length() > 0);
}
/**
 * Builds the path of a task log as
 * {@code <LOG_DIR_EXPANSION_VAR><Path.SEPARATOR><filter name>}.
 *
 * @param filter the log type
 * @return the log path, relative to the container log-dir expansion variable
 */
private static String getTaskLogFile(LogName filter) {
  String logDirVar = ApplicationConstants.LOG_DIR_EXPANSION_VAR;
  String fileName = filter.toString();
  return logDirVar + Path.SEPARATOR + fileName;
}
/**
 * Populates {@code environment} for a task's child JVM. It adds the
 * user-supplied child environment and HADOOP_ROOT_LOGGER, plus
 * HADOOP_CLIENT_OPTS inherited from this process; explicit user values for
 * those two keys win. It also adds the stdout/stderr log-file locations.
 *
 * @param environment the environment map to fill, modified in place
 * @param task the task whose configuration drives the settings
 */
public static void setVMEnv(Map<String, String> environment,
    Task task) {
  JobConf conf = task.conf;
  String childEnv = getChildEnv(conf, task.isMapTask());

  // User-supplied variables first.
  MRApps.setEnvFromInputString(environment, childEnv, conf);

  // Propagate the log level so that a child forking another "bin/hadoop"
  // (common in streaming) logs at the same level.
  String rootLogger =
      MRApps.getChildLogLevel(conf, task.isMapTask()) + ",console";
  environment.put("HADOOP_ROOT_LOGGER", rootLogger);

  // TODO: The following is useful for instance in streaming tasks. Should be
  // set in ApplicationMaster's env by the RM.
  String inheritedOpts = System.getenv("HADOOP_CLIENT_OPTS");
  environment.put("HADOOP_CLIENT_OPTS",
      inheritedOpts == null ? "" : inheritedOpts + " ");

  // setEnvFromInputString appends to existing values. For these two keys an
  // explicit user setting must replace our default instead.
  Map<String, String> userOnly = new HashMap<String, String>();
  MRApps.setEnvFromInputString(userOnly, childEnv, conf);
  for (String key : new String[] { "HADOOP_ROOT_LOGGER", "HADOOP_CLIENT_OPTS" }) {
    if (userOnly.containsKey(key)) {
      environment.put(key, userOnly.get(key));
    }
  }

  // Add stdout/stderr env
  environment.put(MRJobConfig.STDOUT_LOGFILE_ENV,
      getTaskLogFile(TaskLog.LogName.STDOUT));
  environment.put(MRJobConfig.STDERR_LOGFILE_ENV,
      getTaskLogFile(TaskLog.LogName.STDERR));
}
/**
 * Builds the launch command for a task's child JVM. The pieces are: the
 * java binary, the child java opts with {@code @taskid@} substituted, the
 * tmpdir and log4j properties, optional profiler parameters, and the
 * YarnChild main class with the listener host/port, attempt id and JVM id.
 * Stdout/stderr redirections come last. The pieces are joined with spaces
 * into a single string, returned as the only element of the list.
 *
 * @param taskAttemptListenerAddr address the child reports back to
 * @param task the task to launch
 * @param jvmID id of the JVM being launched
 * @return a one-element list holding the full command line
 */
public static List<String> getVMCommand(
    InetSocketAddress taskAttemptListenerAddr, Task task,
    JVMId jvmID) {
  TaskAttemptID attemptID = task.getTaskID();
  JobConf conf = task.conf;
  Vector<String> vargs = new Vector<String>(8);
  vargs.add(MRApps.crossPlatformifyMREnv(task.conf, Environment.JAVA_HOME)
      + "/bin/java");
  // Add child (task) java-vm options.
  //
  // The following symbols if present in mapred.{map|reduce}.child.java.opts
  // value are replaced:
  // + @taskid@ is interpolated with value of TaskID.
  // Other occurrences of @ will not be altered.
  //
  // Example with multiple arguments and substitutions, showing
  // jvm GC logging, and start of a passwordless JVM JMX agent so can
  // connect with jconsole and the likes to watch child memory, threads
  // and get thread dumps.
  //
  // <property>
  //   <name>mapred.map.child.java.opts</name>
  //   <value>-Xmx 512M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  //          -Dcom.sun.management.jmxremote.authenticate=false \
  //          -Dcom.sun.management.jmxremote.ssl=false \
  //   </value>
  // </property>
  //
  // <property>
  //   <name>mapred.reduce.child.java.opts</name>
  //   <value>-Xmx 1024M -verbose:gc -Xloggc:/tmp/@taskid@.gc \
  //          -Dcom.sun.management.jmxremote.authenticate=false \
  //          -Dcom.sun.management.jmxremote.ssl=false \
  //   </value>
  // </property>
  //
  String javaOpts = getChildJavaOpts(conf, task.isMapTask());
  // Substitute the documented "@taskid@" placeholder with the attempt id.
  javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
  String [] javaOptsSplit = javaOpts.split(" ");
  for (int i = 0; i < javaOptsSplit.length; i++) {
    vargs.add(javaOptsSplit[i]);
  }
  Path childTmpDir = new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
  vargs.add("-Djava.io.tmpdir=" + childTmpDir);
  MRApps.addLog4jSystemProperties(task, vargs, conf);
  if (conf.getProfileEnabled()) {
    if (conf.getProfileTaskRange(task.isMapTask()
        ).isIncluded(task.getPartition())) {
      final String profileParams = conf.get(task.isMapTask()
          ? MRJobConfig.TASK_MAP_PROFILE_PARAMS
          : MRJobConfig.TASK_REDUCE_PROFILE_PARAMS, conf.getProfileParams());
      vargs.add(String.format(profileParams,
          getTaskLogFile(TaskLog.LogName.PROFILE)));
    }
  }
  // Add main class and its arguments
  vargs.add(YarnChild.class.getName()); // main of Child
  // pass TaskAttemptListener's address
  vargs.add(taskAttemptListenerAddr.getAddress().getHostAddress());
  vargs.add(Integer.toString(taskAttemptListenerAddr.getPort()));
  vargs.add(attemptID.toString()); // pass task identifier
  // Finally add the jvmID
  vargs.add(String.valueOf(jvmID.getId()));
  vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDOUT));
  vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDERR));
  // Final command: every argument followed by a single space.
  StringBuilder mergedCommand = new StringBuilder();
  for (CharSequence str : vargs) {
    mergedCommand.append(str).append(" ");
  }
  Vector<String> vargsFinal = new Vector<String>(1);
  vargsFinal.add(mergedCommand.toString());
  return vargsFinal;
}
/**
 * Tests TaskLog with {@code YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR}
 * set to "testString". It covers log-file path resolution, index-file
 * syncing, {@code getRealTaskLogFileLocation}, {@code obtainLogDirOwner}
 * and reading through {@code TaskLog.Reader}.
 *
 * @throws IOException if creating or copying the test files fails
 */
@Test (timeout=50000)
public void testTaskLog() throws IOException {
  // test TaskLog; JUnit's contract is assertEquals(expected, actual).
  System.setProperty(
      YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR, "testString");
  assertEquals("testString", TaskLog.getMRv2LogDir());
  TaskAttemptID taid = mock(TaskAttemptID.class);
  JobID jid = new JobID("job", 1);
  when(taid.getJobID()).thenReturn(jid);
  when(taid.toString()).thenReturn("JobId");
  File f = TaskLog.getTaskLogFile(taid, true, LogName.STDOUT);
  assertTrue(f.getAbsolutePath().endsWith("testString"
      + File.separatorChar + "stdout"));
  // test getRealTaskLogFileLocation
  File indexFile = TaskLog.getIndexFile(taid, true);
  if (!indexFile.getParentFile().exists()) {
    indexFile.getParentFile().mkdirs();
  }
  indexFile.delete();
  indexFile.createNewFile();
  TaskLog.syncLogs("location", taid, true);
  assertTrue(indexFile.getAbsolutePath().endsWith(
      "userlogs" + File.separatorChar + "job_job_0001"
      + File.separatorChar + "JobId.cleanup"
      + File.separatorChar + "log.index"));
  f = TaskLog.getRealTaskLogFileLocation(taid, true, LogName.DEBUGOUT);
  if (f != null) {
    assertTrue(f.getAbsolutePath().endsWith("location"
        + File.separatorChar + "debugout"));
    // Give DEBUGOUT some content so the reader check below has data.
    FileUtils.copyFile(indexFile, f);
  }
  // test obtainLogDirOwner
  assertTrue(TaskLog.obtainLogDirOwner(taid).length() > 0);
  // test TaskLog.Reader
  assertTrue(readTaskLog(TaskLog.LogName.DEBUGOUT, taid, true).length() > 0);
}
/**
 * Test the truncation of log-file when JVM-reuse is enabled. Three attempts
 * share one JVM and write 200 'A', 100 'B' and 225 'C' bytes to the same
 * SYSLOG; the retain size is 150. After truncation the file must hold
 * 150 'A', 100 'B' and 150 'C' bytes (400 in total), with matching index
 * files.
 *
 * @throws IOException if writing or reading the logs fails
 */
@Test
public void testLogTruncationOnFinishingWithJVMReuse() throws IOException {
  TaskTracker taskTracker = new TaskTracker();
  TaskLogsMonitor logsMonitor = new TaskLogsMonitor(150L, 150L);
  taskTracker.setTaskLogsMonitor(logsMonitor);
  TaskID baseTaskID = new TaskID();
  int attemptsCount = 0;
  // Assuming the job's retain size is 150
  TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
  Task task1 = new MapTask(null, attempt1, 0, null, null, 0, null);
  // Let the tasks write logs more than retain-size
  writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
  logsMonitor.monitorTaskLogs();
  File attemptDir = TaskLog.getBaseDir(attempt1.toString());
  assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
  // Start another attempt in the same JVM
  TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
  Task task2 = new MapTask(null, attempt2, 0, null, null, 0, null);
  logsMonitor.monitorTaskLogs();
  // Let attempt2 also write some logs
  writeRealBytes(attempt1, attempt2, LogName.SYSLOG, 100, 'B');
  logsMonitor.monitorTaskLogs();
  // Start yet another attempt in the same JVM
  TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
  Task task3 = new MapTask(null, attempt3, 0, null, null, 0, null);
  logsMonitor.monitorTaskLogs();
  // Let attempt3 also write some logs
  writeRealBytes(attempt1, attempt3, LogName.SYSLOG, 225, 'C');
  logsMonitor.monitorTaskLogs();
  // Finish the JVM.
  logsMonitor.addProcessForLogTruncation(attempt1,
      Arrays.asList((new Task[] { task1, task2, task3 })));
  // The log-file should now be truncated.
  logsMonitor.monitorTaskLogs();
  assertTrue(attemptDir.exists());
  File logFile = TaskLog.getTaskLogFile(attempt1, LogName.SYSLOG);
  assertEquals(400, logFile.length());
  // The index files should also be proper.
  assertEquals(150, getAllLogsFileLengths(attempt1, false).get(
      LogName.SYSLOG).longValue());
  assertEquals(100, getAllLogsFileLengths(attempt2, false).get(
      LogName.SYSLOG).longValue());
  assertEquals(150, getAllLogsFileLengths(attempt3, false).get(
      LogName.SYSLOG).longValue());
  // assert the data: bytes 1-150 'A', 151-250 'B', the rest 'C'.
  boolean dataValid = true;
  FileReader reader = new FileReader(logFile);
  try {
    int ch;
    int position = 0; // 1-based position of the character just read
    while ((ch = reader.read()) != -1) {
      position++;
      char expected;
      if (position <= 150) {
        expected = 'A';
      } else if (position <= 250) {
        expected = 'B';
      } else {
        expected = 'C';
      }
      if ((char) ch != expected) {
        LOG.warn("Truncation didn't happen properly. At " + position
            + "th byte, expected '" + expected + "' but found "
            + (char) ch);
        dataValid = false;
      }
    }
  } finally {
    // Release the file handle even if reading fails.
    reader.close();
  }
  assertTrue("Log-truncation didn't happen properly!", dataValid);
  logsMonitor.monitorTaskLogs();
  assertEquals(400, logFile.length());
}
/**
 * Runs a single-map, map-only job on a {@link MiniMRCluster} whose map and
 * reduce user-log retain sizes are 10000. Checks that the job succeeds and
 * that every attempt's STDOUT log is at most 10000 bytes.
 *
 * @throws IOException if cluster setup, file I/O or the job fails
 */
@Test
public void testLogsMonitoringWithMiniMR() throws IOException {
  MiniMRCluster cluster = null;
  try {
    JobConf clusterConf = new JobConf();
    clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
    clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
    cluster = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);

    JobConf jobConf = cluster.createJobConf();
    Path inputDir = new Path(TEST_ROOT_DIR + "/input");
    Path outputDir = new Path(TEST_ROOT_DIR + "/output");
    FileSystem fs = FileSystem.get(jobConf);
    // Start from a clean output dir and make sure the input dir exists.
    if (fs.exists(outputDir)) {
      fs.delete(outputDir, true);
    }
    if (!fs.exists(inputDir)) {
      fs.mkdirs(inputDir);
    }

    DataOutputStream inputFile = fs.create(new Path(inputDir, "part-0"));
    inputFile.writeBytes("The quick brown fox jumped over the lazy dog");
    inputFile.close();

    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setOutputKeyClass(LongWritable.class);
    jobConf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(jobConf, inputDir);
    FileOutputFormat.setOutputPath(jobConf, outputDir);
    jobConf.setNumMapTasks(1);
    jobConf.setNumReduceTasks(0);
    jobConf.setMapperClass(LoggingMapper.class);

    RunningJob job = JobClient.runJob(jobConf);
    assertTrue(JobStatus.SUCCEEDED == job.getJobState());

    TaskCompletionEvent[] events = job.getTaskCompletionEvents(0);
    for (int i = 0; i < events.length; i++) {
      TaskCompletionEvent event = events[i];
      long stdoutLength = TaskLog.getTaskLogFile(event.getTaskAttemptId(),
          TaskLog.LogName.STDOUT).length();
      assertTrue("STDOUT log file length for " + event.getTaskAttemptId()
          + " is " + stdoutLength + " and not <=10000", stdoutLength <= 10000);
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Test the truncation of DEBUGOUT file by {@link TaskLogsMonitor}. A debug
 * script printing about 1000 lines is attached to a single-attempt map
 * task. Every DEBUGOUT file that exists afterwards must be exactly 10000
 * bytes, which is the configured retain size.
 *
 * @throws IOException if cluster setup, job submission or file I/O fails
 */
@Test
public void testDebugLogsTruncationWithMiniMR() throws IOException {
  MiniMRCluster mr = null;
  try {
    JobConf clusterConf = new JobConf();
    clusterConf.setLong(TaskTracker.MAP_USERLOG_RETAIN_SIZE, 10000L);
    clusterConf.setLong(TaskTracker.REDUCE_USERLOG_RETAIN_SIZE, 10000L);
    mr = new MiniMRCluster(1, "file:///", 3, null, null, clusterConf);
    JobConf conf = mr.createJobConf();
    Path inDir = new Path(TEST_ROOT_DIR + "/input");
    Path outDir = new Path(TEST_ROOT_DIR + "/output");
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outDir)) {
      fs.delete(outDir, true);
    }
    if (!fs.exists(inDir)) {
      fs.mkdirs(inDir);
    }
    String input = "The quick brown fox jumped over the lazy dog";
    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
    file.writeBytes(input);
    file.close();
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    conf.setNumMapTasks(1);
    conf.setMaxMapAttempts(1);
    conf.setNumReduceTasks(0);
    conf.setMapperClass(TestMiniMRMapRedDebugScript.MapClass.class);
    // copy debug script to cache from local file system.
    Path scriptPath = new Path(TEST_ROOT_DIR, "debug-script.txt");
    String debugScriptContent =
        "for ((i=0;i<1000;i++)); " + "do "
            + "echo \"Lots of logs! Lots of logs! "
            + "Waiting to be truncated! Lots of logs!\";" + "done";
    DataOutputStream scriptFile = fs.create(scriptPath);
    scriptFile.writeBytes(debugScriptContent);
    scriptFile.close();
    new File(scriptPath.toUri().getPath()).setExecutable(true);
    URI uri = scriptPath.toUri();
    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheFile(uri, conf);
    conf.setMapDebugScript(uri.getPath());

    // Submission failures propagate. Previously they were swallowed, which
    // left job null and produced an NPE that hid the real cause.
    JobClient jc = new JobClient(conf);
    RunningJob job = jc.submitJob(conf);
    try {
      jc.monitorAndPrintJob(conf, job);
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller / test runner.
      Thread.currentThread().interrupt();
    } catch (IOException ignored) {
      // Tolerated, as in the original test: the assertions below only
      // inspect the DEBUGOUT files. NOTE(review): presumably the mapper is
      // meant to fail so the debug script runs — confirm.
    }

    for (TaskCompletionEvent tce : job.getTaskCompletionEvents(0)) {
      File debugOutFile =
          TaskLog.getTaskLogFile(tce.getTaskAttemptId(),
              TaskLog.LogName.DEBUGOUT);
      if (debugOutFile.exists()) {
        long length = debugOutFile.length();
        assertTrue("DEBUGOUT log file length for "
            + tce.getTaskAttemptId() + " is " + length
            + " and not =10000", length == 10000);
      }
    }
  } finally {
    if (mr != null) {
      mr.shutdown();
    }
  }
}