The following examples show how to use the org.apache.hadoop.mapreduce.TaskType API class, with instance code and common usage patterns; you can also follow the links to GitHub to view the full source.
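For orientation before the examples: TaskType is a small enum whose constants are MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, and TASK_CLEANUP, and its most common use is building task and attempt IDs. A minimal, self-contained sketch (the "demo" identifier is a placeholder, not from any example below):

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskTypeDemo {
  public static void main(String[] args) {
    JobID jobId = new JobID("demo", 1);                      // jtIdentifier "demo", job 1
    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);      // first map task
    TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);  // first attempt
    // Prints something like: attempt_demo_0001_m_000000_0
    System.out.println(attemptId);
  }
}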
/**
 * Update the fair share for each JobInfo based on its weight, neededTasks,
 * minTasks, and the size of the pool. We compute the share by finding the
 * ratio of (# of slots / weight) using binary search.
 */
private void updateFairShares(double totalSlots, final TaskType type) {
  // Find the proper ratio of (# of slots share / weight) by binary search
  BinarySearcher searcher = new BinarySearcher() {
    @Override
    double targetFunction(double x) {
      return slotsUsedWithWeightToSlotRatio(x, type);
    }
  };
  double ratio = searcher.getSolution(totalSlots, lastWeightToFairShareRatio);
  lastWeightToFairShareRatio = ratio;

  // Set the fair shares based on the value of R we've converged to
  for (JobInfo info : infos.values()) {
    if (type == TaskType.MAP) {
      info.mapFairShare = computeShare(info, ratio, type);
    } else {
      info.reduceFairShare = computeShare(info, ratio, type);
    }
  }
}
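BinarySearcher and slotsUsedWithWeightToSlotRatio above are internal to the fair scheduler. As a rough illustration of the idea only (a hypothetical stand-in, assuming the target function is monotonically non-decreasing, which may not match the scheduler's actual function), a bisection solver looks like this:

// Hypothetical stand-in for the scheduler's BinarySearcher: find x such that
// targetFunction(x) ~= target, assuming targetFunction is non-decreasing.
abstract class SimpleBinarySearcher {
  abstract double targetFunction(double x);

  double getSolution(double target) {
    double lo = 0.0;
    double hi = 1.0;
    // Grow the upper bracket until it overshoots the target.
    while (targetFunction(hi) < target && hi < 1e18) {
      hi *= 2.0;
    }
    // Halve the bracket a fixed number of times; 100 iterations is plenty
    // for double precision.
    for (int i = 0; i < 100; i++) {
      double mid = (lo + hi) / 2.0;
      if (targetFunction(mid) < target) {
        lo = mid;
      } else {
        hi = mid;
      }
    }
    return hi;
  }
}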
@Override
public void init() throws IOException {
  super.init();

  Configuration taskConf = new Configuration();
  Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
  taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());

  ExecutionBlockId ebId = taskAttemptId.getTaskId().getExecutionBlockId();
  writerContext = new TaskAttemptContextImpl(taskConf,
      new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
          taskAttemptId.getTaskId().getId(), taskAttemptId.getId()));

  HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
  try {
    writer = hFileOutputFormat2.getRecordWriter(writerContext);
    committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
    workingFilePath = committer.getWorkPath();
  } catch (InterruptedException e) {
    throw new IOException(e.getMessage(), e);
  }

  LOG.info("Created hbase file writer: " + workingFilePath);
}
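The pattern above, fabricating a TaskAttemptID so a Hadoop OutputFormat can be driven outside a real MapReduce task, also works with the stock formats (this is not the Tajo/HBase code above). A minimal sketch using TextOutputFormat; the output path is a placeholder:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class StandaloneWriterDemo {
  public static void main(String[] args) throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set(FileOutputFormat.OUTDIR, "/tmp/standalone-out");  // placeholder path

    // Fabricate an attempt ID so the format has a task identity to write under.
    TaskAttemptID attemptId = new TaskAttemptID(
        new TaskID(new JobID("local", 1), TaskType.MAP, 0), 0);
    TaskAttemptContext ctx = new TaskAttemptContextImpl(conf, attemptId);

    TextOutputFormat<NullWritable, Text> format = new TextOutputFormat<>();
    RecordWriter<NullWritable, Text> writer = format.getRecordWriter(ctx);
    writer.write(NullWritable.get(), new Text("hello"));
    writer.close(ctx);
  }
}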
public void setDatum(Object oDatum) {
  this.datum = (MapAttemptFinished) oDatum;
  this.attemptId = TaskAttemptID.forName(datum.attemptId.toString());
  this.taskType = TaskType.valueOf(datum.taskType.toString());
  this.taskStatus = datum.taskStatus.toString();
  this.mapFinishTime = datum.mapFinishTime;
  this.finishTime = datum.finishTime;
  this.hostname = datum.hostname.toString();
  this.rackName = datum.rackname.toString();
  this.port = datum.port;
  this.state = datum.state.toString();
  this.counters = EventReader.fromAvro(datum.counters);
  this.clockSplits = AvroArrayUtils.fromAvro(datum.clockSplits);
  this.cpuUsages = AvroArrayUtils.fromAvro(datum.cpuUsages);
  this.gpuUsages = AvroArrayUtils.fromAvro(datum.gpuUsages);
  this.vMemKbytes = AvroArrayUtils.fromAvro(datum.vMemKbytes);
  this.physMemKbytes = AvroArrayUtils.fromAvro(datum.physMemKbytes);
}
@SuppressWarnings("deprecation")
private long getTaskCounterUsage(JobClient client, JobID id, int numReports,
                                 int taskId, TaskType type)
    throws Exception {
  TaskReport[] reports = null;
  if (TaskType.MAP.equals(type)) {
    reports = client.getMapTaskReports(id);
  } else if (TaskType.REDUCE.equals(type)) {
    reports = client.getReduceTaskReports(id);
  }

  assertNotNull("No reports found for task type '" + type.name()
      + "' in job " + id, reports);
  // make sure that the total number of reports matches the expected count
  assertEquals("Mismatch in task id", numReports, reports.length);

  Counters counters = reports[taskId].getCounters();
  return counters.getCounter(TaskCounter.COMMITTED_HEAP_BYTES);
}
private void checkTasksResource(TaskType type) throws IOException {
  synchronized (lockObject) {
    if (!job.inited()) {
      return;
    }
    if (type == TaskType.REDUCE && !job.areReducersInitialized()) {
      return;
    }
    TaskInProgress[] tasks = job.getTasks(type);
    for (TaskInProgress tip : tasks) {
      // Any tip that is still runnable must have outstanding resource requests
      if (tip.isRunnable()) {
        // There should be requests for this tip since it is not done yet
        List<ResourceRequest> requestIds =
            taskToContextMap.get(tip).resourceRequests;
        if (requestIds == null || requestIds.size() == 0) {
          // This task should be runnable, but it doesn't
          // have requests, which means it will never run
          throw new IOException("Tip " + tip.getTIPId() +
              " doesn't have resources requested");
        }
      }
    }
  }
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
  this.hadoopConfig = getHadoopConfig(this.config);

  /**
   * PLEASE NOTE:
   * If you are an Eclipse+Maven Integration user and you have two (or more) warnings here, please
   * close the pact-hbase project OR set the maven profile to hadoop_yarn
   *
   * pact-hbase requires hadoop_yarn, but Eclipse is not able to parse maven profiles properly. Therefore,
   * it imports the pact-hbase project even if it is not included in the standard profile (hadoop_v1)
   */
  final TaskAttemptID attemptId = new TaskAttemptID(this.jtID, this.jobId, TaskType.MAP, taskNumber - 1, 0);

  this.context = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(this.hadoopConfig, attemptId);
  final HFileOutputFormat outFormat = new HFileOutputFormat();
  try {
    this.writer = outFormat.getRecordWriter(this.context);
  } catch (InterruptedException iex) {
    throw new IOException("Opening the writer was interrupted.", iex);
  }
}
void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
  boolean mayExit = false;
  StreamJob job = new StreamJob(genArgs(
      mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS), map, reduce), mayExit);
  int returnValue = job.go();
  assertEquals(0, returnValue);

  // If input to reducer is empty, a dummy reporter (which ignores all
  // reporting lines) is set for MRErrorThread in waitOutputThreads(). So
  // expectedCounterValue is 0 for the empty-input-to-reducer case.
  // Output of reducer is also empty for the empty-input-to-reducer case.
  int expectedCounterValue = 0;
  if (type == TaskType.MAP || !isEmptyInput) {
    validateTaskStatus(job, type);
    // output is from "print STDOUT" statements in the perl script
    validateJobOutput(job.getConf());
    expectedCounterValue = 2;
  }
  validateUserCounter(job, expectedCounterValue);
  validateTaskStderr(job, type);

  deleteOutDir(fs);
}
/**
 * Constructor.
 *
 * @param taskType Type of the task (i.e. Map, Reduce)
 * @param numSlots Number of slots available for scheduling
 * @param actualNumSlots Actual number of slots on this TaskTracker
 *                       (metrics)
 */
public TaskLauncher(TaskType taskType, int numSlots, int actualNumSlots) {
  this.maxSlots = numSlots;
  this.actualMaxSlots = actualNumSlots;
  this.numFreeSlots = new IntWritable(numSlots);
  this.tasksToLaunch = new LinkedList<TaskLaunchData>();
  setDaemon(true);
  setName("TaskLauncher for " + taskType + " tasks");
  this.taskType = taskType;

  // Initialize the last free times for all the slots based on the actual
  // number of slots
  lastFreeMsecsQueue = new LinkedList<Long>();
  long currentTime = System.currentTimeMillis();
  for (int i = 0; i < actualNumSlots; ++i) {
    lastFreeMsecsQueue.add(currentTime);
  }
}
void validateTaskStatus(StreamJob job, TaskType type) throws IOException {
  // Map Task has 2 phases: map, sort
  // Reduce Task has 3 phases: copy, sort, reduce
  String finalPhaseInTask;
  TaskReport[] reports;
  if (type == TaskType.MAP) {
    reports = job.jc_.getMapTaskReports(job.jobId_);
    finalPhaseInTask = "sort";
  } else { // reduce task
    reports = job.jc_.getReduceTaskReports(job.jobId_);
    finalPhaseInTask = "reduce";
  }
  assertEquals(1, reports.length);
  assertEquals(expectedStatus + " > " + finalPhaseInTask,
      reports[0].getState());
}
private void addHostCapacity(String hostName) {
  synchronized (taskTrackers) {
    int numTrackersOnHost = 0;
    // add the capacity of trackers on the host
    for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
      updateTotalTaskCapacity(status);
      int mapSlots = taskScheduler.getMaxSlots(status, TaskType.MAP);
      int reduceSlots = taskScheduler.getMaxSlots(status, TaskType.REDUCE);
      numTrackersOnHost++;
      getInstrumentation().decBlackListedMapSlots(mapSlots);
      getInstrumentation().decBlackListedReduceSlots(reduceSlots);
    }
    uniqueHostsMap.put(hostName, numTrackersOnHost);
    decrBlackListedTrackers(numTrackersOnHost);
  }
}
@Override
public void recoverTask(int taskIndex, int attemptId) throws IOException {
  if (!initialized) {
    throw new RuntimeException("Committer not initialized");
  }
  TaskAttemptID taskAttemptID = new TaskAttemptID(
      Long.toString(context.getApplicationId().getClusterTimestamp())
          + String.valueOf(context.getVertexIndex()),
      context.getApplicationId().getId(),
      (jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false)
          ? TaskType.MAP : TaskType.REDUCE),
      taskIndex, attemptId);
  TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
      taskAttemptID);
  committer.recoverTask(taskContext);
}
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  // Make one mapper slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_map =
      conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);

  // IF TESTING MAPPER SPECULATIVE EXECUTION:
  //   Make the "*_m_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_m_000000_1", so sleepTime will still remain 100ms.
  if ((taid.getTaskType() == TaskType.MAP) && test_speculate_map
      && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try {
    Thread.sleep(sleepTime);
  } catch (InterruptedException ie) {
    // Ignore
  }
  context.write(value, new IntWritable(1));
}
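For context, the MRJobConfig.MAP_SPECULATIVE flag read by the mapper above is an ordinary job setting; a minimal driver sketch that enables it (the job name and the elided setup are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SpeculationConfigDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same key the mapper reads ("mapreduce.map.speculative");
    // reducers have the analogous MRJobConfig.REDUCE_SPECULATIVE.
    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
    Job job = Job.getInstance(conf, "speculation-demo");
    // ... set mapper/reducer, input/output paths, then job.waitForCompletion(true)
  }
}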
@Override
public void open(String uId) throws Exception {
  this.hash = uId.hashCode();
  Job job = ((ConfigurableHDFSFileSink<K, V>) getWriteOperation().getSink()).jobInstance();
  FileOutputFormat.setOutputPath(job, new Path(path));

  // Each Writer is responsible for writing one bundle of elements and is represented by one
  // unique Hadoop task based on uId/hash. All tasks share the same job ID. Since Dataflow
  // handles retrying of failed bundles, each task has one attempt only.
  JobID jobId = job.getJobID();
  TaskID taskId = new TaskID(jobId, TaskType.REDUCE, hash);
  configure(job);
  context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));

  FileOutputFormat<K, V> outputFormat = formatClass.newInstance();
  recordWriter = outputFormat.getRecordWriter(context);
  outputCommitter = (FileOutputCommitter) outputFormat.getOutputCommitter(context);
}
void validateTaskStderr(StreamJob job, TaskType type)
    throws IOException {
  TaskAttemptID attemptId =
      new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);

  String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
      attemptId, false);

  // trim() is called on expectedStderr here because the method
  // MapReduceTestUtil.readTaskLog() returns a trimmed String.
  assertTrue(log.equals(expectedStderr.trim()));
}
@Override
public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
  switch (taskType) {
    case MAP:
      return new TaskInfo(m_bytesIn[taskNumber], m_recsIn[taskNumber],
          m_bytesOut[taskNumber], m_recsOut[taskNumber], -1);
    case REDUCE:
      return new TaskInfo(r_bytesIn[taskNumber], r_recsIn[taskNumber],
          r_bytesOut[taskNumber], r_recsOut[taskNumber], -1);
    default:
      throw new IllegalArgumentException("Not interested");
  }
}
/**
 * Get all the tasks of the desired type in this job.
 * @param type {@link TaskType} of the tasks required
 * @return An array of {@link TaskInProgress} matching the given type.
 *         Returns an empty array if no tasks are found for the given type.
 */
TaskInProgress[] getTasks(TaskType type) {
  TaskInProgress[] tasks = null;
  switch (type) {
    case MAP:
      tasks = maps;
      break;
    case REDUCE:
      tasks = reduces;
      break;
    case JOB_SETUP:
      tasks = setup;
      break;
    case JOB_CLEANUP:
      tasks = cleanup;
      break;
    default:
      tasks = new TaskInProgress[0];
      break;
  }
  return tasks;
}
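On Java 14 and later, the same dispatch can be written as a switch expression; a sketch of that alternative (same fields as above, not from the original source):

// Alternative form using a Java 14+ switch expression; behavior is identical.
TaskInProgress[] getTasks(TaskType type) {
  return switch (type) {
    case MAP -> maps;
    case REDUCE -> reduces;
    case JOB_SETUP -> setup;
    case JOB_CLEANUP -> cleanup;
    default -> new TaskInProgress[0];
  };
}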
/** Get the event type. */
public EventType getEventType() {
  // Note that the task type can be setup/map/reduce/cleanup but the
  // attempt-type can only be map/reduce.
  return getTaskId().getTaskType() == TaskType.MAP
      ? EventType.MAP_ATTEMPT_STARTED
      : EventType.REDUCE_ATTEMPT_STARTED;
}
private static TaskAttemptCompletionEvent createTce(int eventId,
    boolean isMap, TaskAttemptCompletionEventStatus status) {
  JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
  TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
      isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
            : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
  TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
  RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
  TaskAttemptCompletionEvent tce = recordFactory
      .newRecordInstance(TaskAttemptCompletionEvent.class);
  tce.setEventId(eventId);
  tce.setAttemptId(attemptId);
  tce.setStatus(status);
  return tce;
}
/**
 * Refresh speculative task candidates and running tasks. This needs to be
 * called periodically to obtain fresh values.
 */
void refresh(long now) {
  refreshCandidateSpeculativeMaps(now);
  refreshCandidateSpeculativeReduces(now);
  refreshTaskCountsAndWaitTime(TaskType.MAP, now);
  refreshTaskCountsAndWaitTime(TaskType.REDUCE, now);
}
/**
 * Simple test of TaskUpdatedEvent and TaskUpdated.
 *
 * @throws Exception
 */
@Test(timeout = 10000)
public void testTaskUpdated() throws Exception {
  JobID jid = new JobID("001", 1);
  TaskID tid = new TaskID(jid, TaskType.REDUCE, 2);
  TaskUpdatedEvent test = new TaskUpdatedEvent(tid, 1234L);
  assertEquals(test.getTaskId().toString(), tid.toString());
  assertEquals(test.getFinishTime(), 1234L);
}
static TaskType get20TaskType(String taskType) {
  try {
    return TaskType.valueOf(taskType);
  } catch (IllegalArgumentException e) {
    if ("CLEANUP".equals(taskType)) {
      return TaskType.JOB_CLEANUP;
    }
    if ("SETUP".equals(taskType)) {
      return TaskType.JOB_SETUP;
    }
    return null;
  }
}
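To make the mapping concrete, a few illustrative checks (my own, not from the source):

// The 0.20 history format used the legacy names CLEANUP and SETUP.
assert get20TaskType("MAP") == TaskType.MAP;             // direct enum match
assert get20TaskType("CLEANUP") == TaskType.JOB_CLEANUP; // legacy name
assert get20TaskType("SETUP") == TaskType.JOB_SETUP;     // legacy name
assert get20TaskType("UNKNOWN") == null;                 // unrecognized name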
public TaskAttemptUnsuccessfulCompletionEvent
    (TaskAttemptID id, TaskType taskType,
     String status, long finishTime,
     String hostname, int port, String rackName,
     String error, int[][] allSplits) {
  this(id, taskType, status, finishTime, hostname, port,
      rackName, error, EMPTY_COUNTERS, allSplits);
}
@Override
public boolean canAssignReduce(TaskTrackerStatus tracker,
    int totalRunnableReduces, int totalReduceSlots) {
  int maxSlots = getFSMaxSlots(tracker.getTrackerName(), TaskType.REDUCE);
  if (LOG.isDebugEnabled()) {
    LOG.debug("fsMaxSlots:" + maxSlots +
        " ttMaxSlots:" + tracker.getMaxReduceSlots() +
        " ttOccupied:" + tracker.countOccupiedReduceSlots());
  }
  maxSlots = Math.min(maxSlots, tracker.getMaxReduceSlots());
  return tracker.countOccupiedReduceSlots() < getCap(totalRunnableReduces,
      maxSlots, totalReduceSlots);
}
/**
 * Create an event to record task failure
 * @param id Task ID
 * @param finishTime Finish time of the task
 * @param taskType Type of the task
 * @param error Error String
 * @param status Status
 * @param failedDueToAttempt The attempt id due to which the task failed
 * @param counters Counters for the task
 */
public TaskFailedEvent(TaskID id, long finishTime,
    TaskType taskType, String error, String status,
    TaskAttemptID failedDueToAttempt, Counters counters) {
  this.id = id;
  this.finishTime = finishTime;
  this.taskType = taskType;
  this.error = error;
  this.status = status;
  this.failedDueToAttempt = failedDueToAttempt;
  this.counters = counters;
}
@Test
public void testReinit() throws Exception {
  // Test that a split containing multiple files works correctly,
  // with the child RecordReader getting its initialize() method
  // called a second time.
  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf = new Configuration();
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path[] files = { new Path("file1"), new Path("file2") };
  long[] lengths = { 1, 1 };

  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // first initialize() call comes from MapTask. We'll do it here.
  rr.initialize(split, context);

  // First value is the first filename.
  assertTrue(rr.nextKeyValue());
  assertEquals("file1", rr.getCurrentValue().toString());

  // The inner RR will return false, because it only emits one (k, v) pair.
  // But there's another sub-split to process. This returns true to us.
  assertTrue(rr.nextKeyValue());

  // And the 2nd rr will have its initialize method called correctly.
  assertEquals("file2", rr.getCurrentValue().toString());

  // But after both child RRs have returned their singleton (k, v), this
  // should also return false.
  assertFalse(rr.nextKeyValue());
}
@Override
public synchronized void speculateReduce(TaskAttemptID taskAttemptID,
    boolean isUsingProcessingRate) {
  aggregateJobStats.incNumSpeculativeReduces();
  SpecStats.SpecType specType = isUsingProcessingRate ?
      SpecStats.SpecType.PROCESSING : SpecStats.SpecType.PROGRESS;
  specStats.incStat(SpecStats.TaskType.REDUCE, specType,
      SpecStats.StatType.LAUNCHED_TASKS, 1);
}
/**
 * Create an event to record start of a task
 * @param id Task Id
 * @param startTime Start time of the task
 * @param taskType Type of the task
 * @param splitLocations Split locations, applicable for map tasks
 */
public TaskStartedEvent(TaskID id, long startTime,
    TaskType taskType, String splitLocations) {
  datum.taskid = new Utf8(id.toString());
  datum.splitLocations = new Utf8(splitLocations);
  datum.startTime = startTime;
  datum.taskType = new Utf8(taskType.name());
}
@Deprecated
static StringBuilder getTaskAttemptIDsPatternWOPrefix(String jtIdentifier,
    Integer jobId, TaskType type, Integer taskId, Integer attemptId) {
  StringBuilder builder = new StringBuilder();
  builder.append(TaskID.getTaskIDsPatternWOPrefix(jtIdentifier,
          jobId, type, taskId))
      .append(SEPARATOR)
      .append(attemptId != null ? attemptId : "[0-9]*");
  return builder;
}