org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent#org.apache.hadoop.mapreduce.jobhistory.HistoryEvent源码实例Demo

下面列出了org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent#org.apache.hadoop.mapreduce.jobhistory.HistoryEvent 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: hadoop   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
 
源代码2 项目: hadoop   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}
 
源代码3 项目: hadoop   文件: TopologyBuilder.java
/**
 * Process one {@link HistoryEvent}
 * 
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
  } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
  } else if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
  } else if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
  } else if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }

  // I do NOT expect these if statements to be exhaustive.
}
 
源代码4 项目: hadoop   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  JobID jobID = JobID.forName(jobIDName);

  if (jobIDName == null) {
    return null;
  }

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
 
源代码5 项目: hadoop   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime != null && totalMaps != null && totalReduces != null) {
    return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
        .parseInt(totalMaps), Integer.parseInt(totalReduces), status,
        Boolean.parseBoolean(uberized));
  }

  return null;
}
 
源代码6 项目: hadoop   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String status = line.get("JOB_STATUS");

  if (status != null) {
    return new JobStatusChangedEvent(jobID, status);
  }

  return null;
}
 
源代码7 项目: hadoop   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");

  if (launchTime != null) {
    Job20LineHistoryEventEmitter that =
        (Job20LineHistoryEventEmitter) thatg;
    return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
        .parseLong(launchTime));
  }

  return null;
}
 
源代码8 项目: hadoop   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  if (status != null && !status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobUnsuccessfulCompletionEvent(jobID, Long
        .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
        .parseInt(finishedReduces), status);
  }

  return null;
}
 
源代码9 项目: big-c   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String taskType = line.get("TASK_TYPE");
  String startTime = line.get("START_TIME");
  String splits = line.get("SPLITS");

  if (startTime != null && taskType != null) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    return new TaskStartedEvent(taskID, that.originalStartTime,
        that.originalTaskType, splits);
  }

  return null;
}
 
源代码10 项目: big-c   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String finishTime = line.get("FINISH_TIME");

  if (finishTime != null) {
    return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime));
  }

  return null;
}
 
源代码11 项目: big-c   文件: TopologyBuilder.java
/**
 * Process one {@link HistoryEvent}
 * 
 * @param event
 *          The {@link HistoryEvent} to be processed.
 */
public void process(HistoryEvent event) {
  if (event instanceof TaskAttemptFinishedEvent) {
    processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
  } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
    processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
  } else if (event instanceof TaskStartedEvent) {
    processTaskStartedEvent((TaskStartedEvent) event);
  } else if (event instanceof MapAttemptFinishedEvent) {
    processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
  } else if (event instanceof ReduceAttemptFinishedEvent) {
    processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
  }

  // I do NOT expect these if statements to be exhaustive.
}
 
源代码12 项目: big-c   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  JobID jobID = JobID.forName(jobIDName);

  if (jobIDName == null) {
    return null;
  }

  String priority = line.get("JOB_PRIORITY");

  if (priority != null) {
    return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
  }

  return null;
}
 
源代码13 项目: big-c   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");
  String status = line.get("JOB_STATUS");
  String totalMaps = line.get("TOTAL_MAPS");
  String totalReduces = line.get("TOTAL_REDUCES");
  String uberized = line.get("UBERIZED");

  if (launchTime != null && totalMaps != null && totalReduces != null) {
    return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
        .parseInt(totalMaps), Integer.parseInt(totalReduces), status,
        Boolean.parseBoolean(uberized));
  }

  return null;
}
 
源代码14 项目: big-c   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String status = line.get("JOB_STATUS");

  if (status != null) {
    return new JobStatusChangedEvent(jobID, status);
  }

  return null;
}
 
源代码15 项目: big-c   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String launchTime = line.get("LAUNCH_TIME");

  if (launchTime != null) {
    Job20LineHistoryEventEmitter that =
        (Job20LineHistoryEventEmitter) thatg;
    return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
        .parseLong(launchTime));
  }

  return null;
}
 
源代码16 项目: big-c   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  if (status != null && !status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobUnsuccessfulCompletionEvent(jobID, Long
        .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
        .parseInt(finishedReduces), status);
  }

  return null;
}
 
源代码17 项目: jumbune   文件: DecoratedJobHistoryParser.java
@Override
public void handleEvent(HistoryEvent event)  { 
  EventType type = event.getEventType();

  switch (type) {
  case MAP_ATTEMPT_FINISHED:
    handleMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
super.handleEvent(event);
    break;
  case REDUCE_ATTEMPT_FINISHED:
    handleReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
super.handleEvent(event);
    break;
  default:
  	super.handleEvent(event);
    break;
  }
}
 
源代码18 项目: hadoop   文件: TestMRAppMaster.java
private void verifyFailedStatus(MRAppMasterTest appMaster,
    String expectedJobState) {
  ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
      .forClass(JobHistoryEvent.class);
  // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
  verify(appMaster.spyHistoryService, times(2))
      .handleEvent(captor.capture());
  HistoryEvent event = captor.getValue().getHistoryEvent();
  assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
  assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
      , expectedJobState);
}
 
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");
    String shuffleFinish = line.get("SHUFFLE_FINISHED");
    String sortFinish = line.get("SORT_FINISHED");

    if (shuffleFinish != null && sortFinish != null
        && "success".equalsIgnoreCase(status)) {
      ReduceAttempt20LineHistoryEventEmitter that =
          (ReduceAttempt20LineHistoryEventEmitter) thatg;

      return new ReduceAttemptFinishedEvent
        (taskAttemptID,
         that.originalTaskType, status,
         Long.parseLong(shuffleFinish),
         Long.parseLong(sortFinish),
         Long.parseLong(finishTime),
         hostName, -1, null,
         state, maybeParseCounters(counters),
         null);
    }
  }

  return null;
}
 
源代码20 项目: hadoop   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String status = line.get("TASK_STATUS");
  String finishTime = line.get("FINISH_TIME");

  String error = line.get("ERROR");

  String counters = line.get("COUNTERS");

  if (finishTime != null && error == null
      && (status != null && status.equalsIgnoreCase("success"))) {
    Counters eventCounters = maybeParseCounters(counters);

    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    if (that.originalTaskType == null) {
      return null;
    }

    return new TaskFinishedEvent(taskID, null, Long.parseLong(finishTime),
        that.originalTaskType, status, eventCounters);
  }

  return null;
}
 
源代码21 项目: hadoop   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String status = line.get("TASK_STATUS");
  String finishTime = line.get("FINISH_TIME");

  String taskType = line.get("TASK_TYPE");

  String error = line.get("ERROR");

  if (finishTime != null
      && (error != null || (status != null && !status
          .equalsIgnoreCase("success")))) {
    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    TaskType originalTaskType =
        that.originalTaskType == null ? Version20LogInterfaceUtils
            .get20TaskType(taskType) : that.originalTaskType;

    return new TaskFailedEvent(taskID, Long.parseLong(finishTime),
        originalTaskType, error, status, null);
  }

  return null;
}
 
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");

    MapAttempt20LineHistoryEventEmitter that =
        (MapAttempt20LineHistoryEventEmitter) thatg;

    if ("success".equalsIgnoreCase(status)) {
      return new MapAttemptFinishedEvent
        (taskAttemptID,
          that.originalTaskType, status,
         Long.parseLong(finishTime),
         Long.parseLong(finishTime),
         hostName, -1, null, state, maybeParseCounters(counters),
         null);
    }
  }

  return null;
}
 
源代码23 项目: hadoop   文件: Job20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
    HistoryEventEmitter thatg) {
  if (jobIDName == null) {
    return null;
  }

  JobID jobID = JobID.forName(jobIDName);

  String finishTime = line.get("FINISH_TIME");

  String status = line.get("JOB_STATUS");

  String finishedMaps = line.get("FINISHED_MAPS");
  String finishedReduces = line.get("FINISHED_REDUCES");

  String failedMaps = line.get("FAILED_MAPS");
  String failedReduces = line.get("FAILED_REDUCES");

  String counters = line.get("COUNTERS");

  if (status != null && status.equalsIgnoreCase("success")
      && finishTime != null && finishedMaps != null
      && finishedReduces != null) {
    return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
        .parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
        .parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
        maybeParseCounters(counters));
  }

  return null;
}
 
源代码24 项目: hadoop   文件: TaskAttempt20LineEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String startTime = line.get("START_TIME");
  String taskType = line.get("TASK_TYPE");
  String trackerName = line.get("TRACKER_NAME");
  String httpPort = line.get("HTTP_PORT");
  String locality = line.get("LOCALITY");
  if (locality == null) {
    locality = "";
  }
  String avataar = line.get("AVATAAR");
  if (avataar == null) {
    avataar = "";
  }

  if (startTime != null && taskType != null) {
    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    that.originalStartTime = Long.parseLong(startTime);
    that.originalTaskType =
        Version20LogInterfaceUtils.get20TaskType(taskType);

    int port =
        httpPort.equals("") ? DEFAULT_HTTP_PORT : Integer
            .parseInt(httpPort);

    return new TaskAttemptStartedEvent(taskAttemptID,
        that.originalTaskType, that.originalStartTime, trackerName, port, -1,
        locality, avataar);
  }

  return null;
}
 
源代码25 项目: hadoop   文件: TaskAttempt20LineEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");

    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    ParsedHost pHost = ParsedHost.parse(hostName);

    return new TaskAttemptFinishedEvent(taskAttemptID,
        that.originalTaskType, status, Long.parseLong(finishTime),
        pHost.getRackName(), pHost.getNodeName(), state, 
        maybeParseCounters(counters));
  }

  return null;
}
 
源代码26 项目: hadoop   文件: TaskAttempt20LineEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && !status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String error = line.get("ERROR");

    TaskAttempt20LineEventEmitter that =
        (TaskAttempt20LineEventEmitter) thatg;

    ParsedHost pHost = ParsedHost.parse(hostName);
    String rackName = null;
    
    // Earlier versions of MR logged on hostnames (without rackname) for
    // unsuccessful attempts
    if (pHost != null) {
      rackName = pHost.getRackName();
      hostName = pHost.getNodeName();
    }
    return new TaskAttemptUnsuccessfulCompletionEvent
      (taskAttemptID,
       that.originalTaskType, status, Long.parseLong(finishTime),
       hostName, -1, rackName, error, null);
  }

  return null;
}
 
源代码27 项目: hadoop   文件: TraceBuilder.java
void processJobHistory(JobHistoryParser parser, JobBuilder jobBuilder)
    throws IOException {
  HistoryEvent e;
  while ((e = parser.nextEvent()) != null) {
    jobBuilder.process(e);
    topologyBuilder.process(e);
  }

  parser.close();
}
 
源代码28 项目: big-c   文件: TestMRAppMaster.java
private void verifyFailedStatus(MRAppMasterTest appMaster,
    String expectedJobState) {
  ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
      .forClass(JobHistoryEvent.class);
  // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
  verify(appMaster.spyHistoryService, times(2))
      .handleEvent(captor.capture());
  HistoryEvent event = captor.getValue().getHistoryEvent();
  assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
  assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
      , expectedJobState);
}
 
HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
    HistoryEventEmitter thatg) {
  if (taskAttemptIDName == null) {
    return null;
  }

  TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);

  String finishTime = line.get("FINISH_TIME");
  String status = line.get("TASK_STATUS");

  if (finishTime != null && status != null
      && status.equalsIgnoreCase("success")) {
    String hostName = line.get("HOSTNAME");
    String counters = line.get("COUNTERS");
    String state = line.get("STATE_STRING");
    String shuffleFinish = line.get("SHUFFLE_FINISHED");
    String sortFinish = line.get("SORT_FINISHED");

    if (shuffleFinish != null && sortFinish != null
        && "success".equalsIgnoreCase(status)) {
      ReduceAttempt20LineHistoryEventEmitter that =
          (ReduceAttempt20LineHistoryEventEmitter) thatg;

      return new ReduceAttemptFinishedEvent
        (taskAttemptID,
         that.originalTaskType, status,
         Long.parseLong(shuffleFinish),
         Long.parseLong(sortFinish),
         Long.parseLong(finishTime),
         hostName, -1, null,
         state, maybeParseCounters(counters),
         null);
    }
  }

  return null;
}
 
源代码30 项目: big-c   文件: Task20LineHistoryEventEmitter.java
HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
    HistoryEventEmitter thatg) {
  if (taskIDName == null) {
    return null;
  }

  TaskID taskID = TaskID.forName(taskIDName);

  String status = line.get("TASK_STATUS");
  String finishTime = line.get("FINISH_TIME");

  String error = line.get("ERROR");

  String counters = line.get("COUNTERS");

  if (finishTime != null && error == null
      && (status != null && status.equalsIgnoreCase("success"))) {
    Counters eventCounters = maybeParseCounters(counters);

    Task20LineHistoryEventEmitter that =
        (Task20LineHistoryEventEmitter) thatg;

    if (that.originalTaskType == null) {
      return null;
    }

    return new TaskFinishedEvent(taskID, null, Long.parseLong(finishTime),
        that.originalTaskType, status, eventCounters);
  }

  return null;
}