The following lists example code showing how the org.apache.hadoop.mapreduce.TaskCompletionEvent API class is used in practice; you can also follow the links to view the source code on GitHub.
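Before the individual examples, here is a minimal sketch of the pattern most of them build on: asking the Job for completion events in batches and advancing the offset until an empty array comes back. The class name and the batch size of 10 are illustrative choices, not taken from any of the projects below.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;

public class TaskEventDump {
    /** Print every completion event of a submitted job, one batch at a time. */
    public static void dumpEvents(Job job) throws IOException, InterruptedException {
        int from = 0;
        TaskCompletionEvent[] batch = job.getTaskCompletionEvents(from, 10);
        while (batch != null && batch.length > 0) {
            for (TaskCompletionEvent event : batch) {
                System.out.println(event.getStatus() + " " + event.getTaskAttemptId());
            }
            from += batch.length;                          // advance by the number of events returned
            batch = job.getTaskCompletionEvents(from, 10); // fetch the next batch
        }
    }
}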
private TaskCompletionEvent getOneTaskFailure(Job job) throws IOException, InterruptedException {
    TaskCompletionEvent lastEvent = null;
    int index = 0;
    int failCount = 0;
    TaskCompletionEvent[] events = job.getTaskCompletionEvents(index);
    // Returns null if there were no task executions or no failures at all; otherwise returns the last
    // failure event within the first batch of completion events that contains a failure.
    if (events == null) {
        return lastEvent;
    }
    while (events.length > 0 && failCount == 0) {
        for (TaskCompletionEvent event : events) {
            if (event.getStatus().equals(TaskCompletionEvent.Status.FAILED)) {
                failCount++;
                lastEvent = event;
            }
        }
        index += 10;
        events = job.getTaskCompletionEvents(index);
    }
    return lastEvent;
}
private static List<TaskCompletionEvent> getAllTaskCompletionEvent(Job completedJob) {
    List<TaskCompletionEvent> completionEvents = new LinkedList<>();
    while (true) {
        try {
            TaskCompletionEvent[] bunchOfEvents;
            bunchOfEvents = completedJob.getTaskCompletionEvents(completionEvents.size());
            if (bunchOfEvents == null || bunchOfEvents.length == 0) {
                break;
            }
            completionEvents.addAll(Arrays.asList(bunchOfEvents));
        } catch (IOException e) {
            break;
        }
    }
    return completionEvents;
}
/**
 * Get good files.
 * The problem happens when a speculative task attempt is initialized but then killed in the middle of processing.
 * A partial file is generated at {tmp_output}/_temporary/1/_temporary/attempt_xxx_xxx/xxxx (an Avro file
 * might have .avro as its extension) without being committed to its final destination
 * at {tmp_output}/xxxx.
 *
 * @param job Completed MR job
 * @param tmpPath Temporary output path to scan
 * @param fs File system that the output path resides on
 * @param acceptableExtension file extensions acceptable as "good files".
 * @return all successful files that have been committed
 */
public static List<Path> getGoodFiles(Job job, Path tmpPath, FileSystem fs, List<String> acceptableExtension)
        throws IOException {
    List<TaskCompletionEvent> failedEvents = getUnsuccessfulTaskCompletionEvent(job);
    List<Path> allFilePaths = DatasetHelper.getApplicableFilePaths(fs, tmpPath, acceptableExtension);
    List<Path> goodPaths = new ArrayList<>();
    for (Path filePath : allFilePaths) {
        if (isFailedPath(filePath, failedEvents)) {
            fs.delete(filePath, false);
            log.error("{} is a bad path so it was deleted", filePath);
        } else {
            goodPaths.add(filePath);
        }
    }
    return goodPaths;
}
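A hypothetical call site for the getGoodFiles helper above, purely for illustration: the temporary directory and the ".avro" extension are made-up values, and the sketch assumes the helper is visible in the calling class (for example via a static import).

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class GoodFileCleanupExample {
    // Keep only files committed by successful task attempts; partial speculative output gets deleted.
    static List<Path> cleanTmpOutput(Job completedJob, String tmpDir) throws IOException {
        FileSystem fs = FileSystem.get(completedJob.getConfiguration());
        // getGoodFiles(...) is the helper from the snippet above (assumed statically imported here).
        return getGoodFiles(completedJob, new Path(tmpDir), fs, Collections.singletonList(".avro"));
    }
}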
/**
 * List the events for the given job
 * @param job the job whose events are to be listed
 * @param fromEventId the event id to start listing from
 * @param numEvents the maximum number of events to list
 * @throws IOException
 */
private void listEvents(Job job, int fromEventId, int numEvents)
        throws IOException, InterruptedException {
    TaskCompletionEvent[] events = job.getTaskCompletionEvents(fromEventId, numEvents);
    System.out.println("Task completion events for " + job.getJobID());
    System.out.println("Number of events (from " + fromEventId + ") are: "
        + events.length);
    for (TaskCompletionEvent event : events) {
        System.out.println(event.getStatus() + " " +
            event.getTaskAttemptId() + " " +
            getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
    }
}
public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
        int fromEventId, int maxEvents)
        throws IOException, InterruptedException {
    // FIXME: the client seems to support querying task-failure related
    // information; however, the API does not make sense for a DAG.
    return new TaskCompletionEvent[0];
}
public void updateJobCounter() {
    try {
        Counters counters = job.getCounters();
        if (counters == null) {
            String errorMsg = "no counters for job " + getMrJobId();
            logger.warn(errorMsg);
            output.append(errorMsg);
        } else {
            this.output.append(counters.toString()).append("\n");
            logger.debug(counters.toString());
            mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
            rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
            String outputFolder = job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir",
                KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
            logger.debug("outputFolder is " + outputFolder);
            Path outputPath = new Path(outputFolder);
            String fsScheme = outputPath.getFileSystem(job.getConfiguration()).getScheme();
            long bytesWritten = counters.findCounter(fsScheme, FileSystemCounter.BYTES_WRITTEN).getValue();
            if (bytesWritten == 0) {
                logger.debug("Seems no counter found for " + fsScheme);
                bytesWritten = counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue();
            }
            hdfsBytesWritten = String.valueOf(bytesWritten);
        }
        JobStatus jobStatus = job.getStatus();
        if (jobStatus.getState() == JobStatus.State.FAILED) {
            logger.warn("Job Diagnostics:" + jobStatus.getFailureInfo());
            output.append("Job Diagnostics:").append(jobStatus.getFailureInfo()).append("\n");
            TaskCompletionEvent taskEvent = getOneTaskFailure(job);
            if (taskEvent != null) {
                String[] fails = job.getTaskDiagnostics(taskEvent.getTaskAttemptId());
                logger.warn("Failure task Diagnostics:");
                output.append("Failure task Diagnostics:").append("\n");
                for (String failure : fails) {
                    logger.warn(failure);
                    output.append(failure).append("\n");
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getLocalizedMessage(), e);
        output.append(e.getLocalizedMessage());
    }
}
@Override
public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
        int arg2) throws IOException, InterruptedException {
    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
}
public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
        int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
    return new MapTaskCompletionEventsUpdate(
        org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
}
public TaskCompletionEvent[] getTaskCompletionEvents(
        org.apache.hadoop.mapreduce.JobID jobid,
        int fromEventId, int maxEvents) throws IOException {
    return TaskCompletionEvent.EMPTY_ARRAY;
}
/** {@inheritDoc} */
@Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
        throws IOException, InterruptedException {
    return new TaskCompletionEvent[0];
}
private static List<TaskCompletionEvent> getUnsuccessfulTaskCompletionEvent(Job completedJob) {
    return getAllTaskCompletionEvent(completedJob).stream()
        .filter(te -> te.getStatus() != TaskCompletionEvent.Status.SUCCEEDED)
        .collect(Collectors.toList());
}
private static boolean isFailedPath(Path path, List<TaskCompletionEvent> failedEvents) {
    return path.toString().contains("_temporary") || failedEvents.stream()
        .anyMatch(
            event -> path.toString().contains(Path.SEPARATOR + event.getTaskAttemptId().toString() + Path.SEPARATOR));
}
/**
 * Get task completion events for the jobid, starting from fromEventId.
 * Returns empty array if no events are available.
 * @param jobid job id
 * @param fromEventId event id to start from.
 * @param maxEvents the max number of events we want to look at
 * @return array of task completion events.
 * @throws IOException
 */
public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid,
    int fromEventId, int maxEvents) throws IOException, InterruptedException;