The following are code examples showing how org.apache.hadoop.mapreduce.TaskType#MAP is used.
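TaskType.MAP is the enum constant the MapReduce APIs use to mark map tasks when building TaskID/TaskAttemptID objects and when branching between map- and reduce-specific behavior, which is the pattern running through every example below. As a quick orientation, here is a minimal, self-contained sketch of that pattern; it assumes only the public mapreduce ID classes, and the job name "demo" and the class name TaskTypeMapDemo are made up for illustration.

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskTypeMapDemo {
  public static void main(String[] args) {
    // Tag task #0 of a made-up job as a map task.
    TaskID taskId = new TaskID(new JobID("demo", 1), TaskType.MAP, 0);
    TaskAttemptID attemptId = new TaskAttemptID(taskId, 0);
    // The check most of the examples below perform before doing map-specific work.
    boolean isMap = attemptId.getTaskType() == TaskType.MAP;
    System.out.println(attemptId + " isMap=" + isMap);
  }
}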
/**
* Update the fair share for each JobInfo based on its weight, neededTasks,
* minTasks, and the size of the pool. We compute the share by finding the
* ratio of (# of slots / weight) using binary search.
*/
private void updateFairShares(double totalSlots, final TaskType type) {
// Find the proper ratio of (# of slots share / weight) by binary search
BinarySearcher searcher = new BinarySearcher() {
@Override
double targetFunction(double x) {
return slotsUsedWithWeightToSlotRatio(x, type);
}
};
double ratio = searcher.getSolution(totalSlots, lastWeightToFairShareRatio);
lastWeightToFairShareRatio = ratio;
// Set the fair shares based on the value of R we've converged to
for (JobInfo info : infos.values()) {
if (type == TaskType.MAP) {
info.mapFairShare = computeShare(info, ratio, type);
} else {
info.reduceFairShare = computeShare(info, ratio, type);
}
}
}
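The Javadoc above describes the computation: find a ratio R of (slots / weight) such that handing each job its share at ratio R consumes exactly the available slots, using binary search on a monotone function. Below is a minimal standalone sketch of that idea, with the pool, minimum-share and neededTasks details of the real scheduler simplified away; the helper names findRatio and slotsUsed are hypothetical, not part of the FairScheduler API.

// Binary-search sketch: each job i gets min(demands[i], weights[i] * r) slots,
// and we search for the ratio r at which the total equals totalSlots.
static double findRatio(double totalSlots, double[] weights, double[] demands) {
  double lo = 0.0;
  double hi = 1.0;
  // Grow the upper bound until it covers totalSlots (or demand is exhausted).
  while (slotsUsed(hi, weights, demands) < totalSlots && hi < 1e12) {
    hi *= 2;
  }
  // Standard binary search on the monotone function slotsUsed(r).
  for (int i = 0; i < 50; i++) {
    double mid = (lo + hi) / 2;
    if (slotsUsed(mid, weights, demands) < totalSlots) {
      lo = mid;
    } else {
      hi = mid;
    }
  }
  return hi;
}

// Total slots handed out at ratio r.
static double slotsUsed(double r, double[] weights, double[] demands) {
  double total = 0.0;
  for (int i = 0; i < weights.length; i++) {
    total += Math.min(demands[i], weights[i] * r);
  }
  return total;
}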
private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
int numMaps = getNumMapTasks(jobContext.getConfiguration());
final int nSplits = Math.min(numMaps, chunks.size());
List<InputSplit> splits = new ArrayList<>(nSplits);
for (int i = 0; i < nSplits; ++i) {
TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
chunks.get(i).assignTo(taskId);
splits.add(new FileSplit(chunks.get(i).getPath(), 0,
    // Setting non-zero length for FileSplit size, to avoid a possible
    // future when 0-sized file-splits are considered "empty" and skipped
    // over.
    getMinRecordsPerChunk(jobContext.getConfiguration()), null));
}
ConfigurationUtil.publish(jobContext.getConfiguration(), CONF_LABEL_NUM_SPLITS, splits.size());
return splits;
}
void runStreamJob(TaskType type, boolean isEmptyInput) throws IOException {
boolean mayExit = false;
StreamJob job = new StreamJob(genArgs(
mr.createJobConf().get(JTConfig.JT_IPC_ADDRESS), map, reduce), mayExit);
int returnValue = job.go();
assertEquals(0, returnValue);
// If input to the reducer is empty, a dummy reporter (which ignores all
// reporting lines) is set for MRErrorThread in waitOutputThreads(). So
// expectedCounterValue is 0 for the empty-input-to-reducer case.
// Output of the reducer is also empty in that case.
int expectedCounterValue = 0;
if (type == TaskType.MAP || !isEmptyInput) {
validateTaskStatus(job, type);
// output is from "print STDOUT" statements in perl script
validateJobOutput(job.getConf());
expectedCounterValue = 2;
}
validateUserCounter(job, expectedCounterValue);
validateTaskStderr(job, type);
deleteOutDir(fs);
}
public MergeQueue(Configuration conf, FileSystem fs,
Path[] inputs, boolean deleteInputs,
CompressionCodec codec, RawComparator<K> comparator,
Progressable reporter,
Counters.Counter mergedMapOutputsCounter,
TaskType taskType)
throws IOException {
this.conf = conf;
this.fs = fs;
this.codec = codec;
this.comparator = comparator;
this.reporter = reporter;
if (taskType == TaskType.MAP) {
considerFinalMergeForProgress();
}
for (Path file : inputs) {
LOG.debug("MergeQ: adding: " + file);
segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs,
(file.toString().endsWith(
Task.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter)));
}
// Sort segments on file-lengths
Collections.sort(segments, segmentComparator);
}
/** Get the event type */
public EventType getEventType() {
// Note that the task type can be setup/map/reduce/cleanup but the
// attempt-type can only be map/reduce.
// find out if the task failed or got killed
boolean failed = TaskStatus.State.FAILED.toString().equals(getTaskStatus());
return getTaskId().getTaskType() == TaskType.MAP
? (failed
? EventType.MAP_ATTEMPT_FAILED
: EventType.MAP_ATTEMPT_KILLED)
: (failed
? EventType.REDUCE_ATTEMPT_FAILED
: EventType.REDUCE_ATTEMPT_KILLED);
}
@Override
public void prepare(TSetContext ctx) {
this.context = ctx;
Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
jconf = new JobConf(hadoopConf);
try {
format = inputClazz.newInstance();
JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
context.getIndex()));
List<InputSplit> splits = format.getSplits(jobContext);
for (int i = 0; i < splits.size(); i++) {
if (i % context.getParallelism() == context.getIndex()) {
assignedSplits.add(splits.get(i));
}
}
if (assignedSplits.size() > 0) {
TaskID taskID = new TaskID(context.getId(), context.getIndex(),
TaskType.MAP, context.getIndex());
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(jconf, taskAttemptID);
currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
taskAttemptContext);
currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
}
} catch (InstantiationException | IllegalAccessException
| InterruptedException | IOException e) {
throw new RuntimeException("Failed to initialize hadoop input", e);
}
}
/**
* Find the memory that is already used by all the running tasks
* residing on the given TaskTracker.
*
* @param taskTracker status of the TaskTracker whose running tasks are inspected
* @param taskType type of tasks (MAP or REDUCE) to account for
* @return amount of memory that is used by the residing tasks,
* null if memory cannot be computed for some reason.
*/
synchronized Long getMemReservedForTasks(
TaskTrackerStatus taskTracker, TaskType taskType) {
long vmem = 0;
for (TaskStatus task : taskTracker.getTaskReports()) {
// The following task states are ones in which the slot is
// still occupied and hence the memory of the task should be
// accounted as used memory.
if ((task.getRunState() == TaskStatus.State.RUNNING) ||
(task.getRunState() == TaskStatus.State.UNASSIGNED) ||
(task.inTaskCleanupPhase())) {
// Get the memory "allotted" for this task based on number of slots
long myVmem = 0;
if (task.getIsMap() && taskType == TaskType.MAP) {
long memSizePerMapSlot = scheduler.getMemSizeForMapSlot();
myVmem =
memSizePerMapSlot * task.getNumSlots();
} else if (!task.getIsMap()
&& taskType == TaskType.REDUCE) {
long memSizePerReduceSlot = scheduler.getMemSizeForReduceSlot();
myVmem = memSizePerReduceSlot * task.getNumSlots();
}
vmem += myVmem;
}
}
return Long.valueOf(vmem);
}
static public boolean isMap(TaskAttemptID taskAttemptID) {
  TaskType type = taskAttemptID.getTaskType();
  return type == TaskType.MAP;
}
private TaskAttemptInfo getMockTaskAttemptInfo(TaskAttemptID tai,
TaskAttemptState tas) {
ContainerId ci = mock(ContainerId.class);
Counters counters = mock(Counters.class);
TaskType tt = TaskType.MAP;
long finishTime = System.currentTimeMillis();
TaskAttemptInfo mockTAinfo = mock(TaskAttemptInfo.class);
when(mockTAinfo.getAttemptId()).thenReturn(tai);
when(mockTAinfo.getContainerId()).thenReturn(ci);
when(mockTAinfo.getCounters()).thenReturn(counters);
when(mockTAinfo.getError()).thenReturn("");
when(mockTAinfo.getFinishTime()).thenReturn(finishTime);
when(mockTAinfo.getHostname()).thenReturn("localhost");
when(mockTAinfo.getHttpPort()).thenReturn(23);
when(mockTAinfo.getMapFinishTime()).thenReturn(finishTime - 1000L);
when(mockTAinfo.getPort()).thenReturn(24);
when(mockTAinfo.getRackname()).thenReturn("defaultRack");
when(mockTAinfo.getShuffleFinishTime()).thenReturn(finishTime - 2000L);
when(mockTAinfo.getShufflePort()).thenReturn(25);
when(mockTAinfo.getSortFinishTime()).thenReturn(finishTime - 3000L);
when(mockTAinfo.getStartTime()).thenReturn(finishTime - 10000);
when(mockTAinfo.getState()).thenReturn("task in progress");
when(mockTAinfo.getTaskStatus()).thenReturn(tas.toString());
when(mockTAinfo.getTaskType()).thenReturn(tt);
when(mockTAinfo.getTrackerName()).thenReturn("TrackerName");
return mockTAinfo;
}
public List<Event> initialize() throws IOException {
getContext().requestInitialMemory(0l, null); // mandatory call
MRRuntimeProtos.MRInputUserPayloadProto mrUserPayload =
MRHelpers.parseMRInputPayload(getContext().getUserPayload());
Preconditions.checkArgument(mrUserPayload.hasSplits() == false,
"Split information not expected in " + this.getClass().getName());
Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
this.jobConf = new JobConf(conf);
// Add tokens to the jobConf - in case they are accessed within the RR / IF
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
TaskAttemptID taskAttemptId = new TaskAttemptID(
new TaskID(
Long.toString(getContext().getApplicationId().getClusterTimestamp()),
getContext().getApplicationId().getId(), TaskType.MAP,
getContext().getTaskIndex()),
getContext().getTaskAttemptNumber());
jobConf.set(MRJobConfig.TASK_ATTEMPT_ID,
taskAttemptId.toString());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
getContext().getDAGAttemptNumber());
this.inputRecordCounter = getContext().getCounters().findCounter(
TaskCounter.INPUT_RECORDS_PROCESSED);
useNewApi = this.jobConf.getUseNewMapper();
return null;
}
String[] getOrderedQueues(TaskType type) {
if (type == TaskType.MAP) {
return mapScheduler.getOrderedQueues();
} else if (type == TaskType.REDUCE) {
return reduceScheduler.getOrderedQueues();
}
return null;
}
public MRCombiner(TezTaskContext taskContext) throws IOException {
this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext);
if (taskContext instanceof TezOutputContext) {
this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
this.reporter = new MRTaskReporter((TezOutputContext)taskContext);
} else {
this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
this.reporter = new MRTaskReporter((TezInputContext)taskContext);
}
this.useNewApi = ConfigUtils.useNewApi(conf);
combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false);
this.mrTaskAttemptID = new TaskAttemptID(
new TaskID(String.valueOf(taskContext.getApplicationId()
.getClusterTimestamp()), taskContext.getApplicationId().getId(),
isMap ? TaskType.MAP : TaskType.REDUCE,
taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
}
public static org.apache.hadoop.mapred.TaskID
createMockTaskAttemptIDFromTezTaskId(TezTaskID tezTaId, boolean isMap) {
TezVertexID vId = tezTaId.getVertexID();
ApplicationId appId = vId.getDAGId().getApplicationId();
return new org.apache.hadoop.mapred.TaskID(String.valueOf(appId.getClusterTimestamp())
+ String.valueOf(vId.getId()), appId.getId(),
isMap ? TaskType.MAP : TaskType.REDUCE, tezTaId.getId());
}
static public TaskAttemptID createTaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
int taskId, int id) {
if (isMap) {
return new TaskAttemptID(jtIdentifier, jobId, TaskType.MAP, taskId, id);
} else {
return new TaskAttemptID(jtIdentifier, jobId, TaskType.REDUCE, taskId, id);
}
}
/**
* Test {@link LoadJob.ResourceUsageMatcherRunner}.
*/
@Test
@SuppressWarnings("unchecked")
public void testResourceUsageMatcherRunner() throws Exception {
Configuration conf = new Configuration();
FakeProgressive progress = new FakeProgressive();
// set the resource calculator plugin
conf.setClass(TTConfig.TT_RESOURCE_CALCULATOR_PLUGIN,
DummyResourceCalculatorPlugin.class,
ResourceCalculatorPlugin.class);
// set the resources
// set the resource implementation class
conf.setClass(ResourceUsageMatcher.RESOURCE_USAGE_EMULATION_PLUGINS,
TestResourceUsageEmulatorPlugin.class,
ResourceUsageEmulatorPlugin.class);
long currentTime = System.currentTimeMillis();
// initialize the matcher class
TaskAttemptID id = new TaskAttemptID("test", 1, TaskType.MAP, 1, 1);
StatusReporter reporter = new DummyReporter(progress);
TaskInputOutputContext context =
new MapContextImpl(conf, id, null, null, null, reporter, null);
FakeResourceUsageMatcherRunner matcher =
new FakeResourceUsageMatcherRunner(context, null);
// check if the matcher initialized the plugin
String identifier = TestResourceUsageEmulatorPlugin.DEFAULT_IDENTIFIER;
long initTime =
TestResourceUsageEmulatorPlugin.testInitialization(identifier, conf);
assertTrue("ResourceUsageMatcherRunner failed to initialize the"
+ " configured plugin", initTime > currentTime);
// check the progress
assertEquals("Progress mismatch in ResourceUsageMatcherRunner",
0, progress.getProgress(), 0D);
// call match() and check progress
progress.setProgress(0.01f);
currentTime = System.currentTimeMillis();
matcher.test();
long emulateTime =
TestResourceUsageEmulatorPlugin.testEmulation(identifier, conf);
assertTrue("ProgressBasedResourceUsageMatcher failed to load and emulate"
+ " the configured plugin", emulateTime > currentTime);
}
@SuppressWarnings("rawtypes")
@Test
public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
//mock creation
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
Reporter mockReporter = mock(Reporter.class);
FileSystem mockFileSystem = mock(FileSystem.class);
Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
@SuppressWarnings("unchecked") // needed for mock with generic
CombineOutputCollector<K, V> mockCombineOutputCollector =
(CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
Counter mockCounter = mock(Counter.class);
TaskStatus mockTaskStatus = mock(TaskStatus.class);
Progress mockProgress = mock(Progress.class);
MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
Task mockTask = mock(Task.class);
@SuppressWarnings("unchecked")
MapOutput<K, V> output = mock(MapOutput.class);
ShuffleConsumerPlugin.Context<K, V> context =
new ShuffleConsumerPlugin.Context<K, V>(
mockTaskAttemptID, job, mockFileSystem,
mockUmbilical, mockLocalDirAllocator,
mockReporter, mockCompressionCodec,
combinerClass, mockCombineOutputCollector,
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
mockTask, mockMapOutputFile, null);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
status, null, null, progress, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
MapHost host1 = new MapHost("host1", null);
TaskAttemptID failedAttemptID = new TaskAttemptID(
new org.apache.hadoop.mapred.TaskID(
new JobID("test",0), TaskType.MAP, 0), 0);
TaskAttemptID succeedAttemptID = new TaskAttemptID(
new org.apache.hadoop.mapred.TaskID(
new JobID("test",0), TaskType.MAP, 1), 1);
// handle output fetch failure for failedAttemptID, part I
scheduler.hostFailed(host1.getHostName());
// handle output fetch succeed for succeedAttemptID
long bytes = (long)500 * 1024 * 1024;
scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
// handle output fetch failure for failedAttemptID, part II
// for MAPREDUCE-6361: verify that no NPE is thrown
scheduler.copyFailed(failedAttemptID, host1, true, false);
}
/**
* Obtain the overall slot limit of a TaskTracker for the given task type.
* @param status The status of the TaskTracker
* @param type The type of the task
* @return The maximum number of slots
*/
public int getMaxSlots(TaskTrackerStatus status, TaskType type) {
return (type == TaskType.MAP) ? status.getMaxMapSlots() :
status.getMaxReduceSlots();
}
/**
* Constructs a TaskInProgressId object from given parts.
* @param jtIdentifier jobTracker identifier
* @param jobId job number
* @param isMap whether the tip is a map
* @param id the tip number
* @deprecated Use {@link #TaskID(org.apache.hadoop.mapreduce.JobID, TaskType,
* int)}
*/
@Deprecated
public TaskID(String jtIdentifier, int jobId, boolean isMap, int id) {
this(jtIdentifier, jobId, isMap ? TaskType.MAP : TaskType.REDUCE, id);
}
/**
* Get the {@link JobInProgress} for which the fallow slot(s) are held.
* @param taskType {@link TaskType} of the task
* @return the task for which the fallow slot(s) are held,
* <code>null</code> if there are no fallow slots
*/
public JobInProgress getJobForFallowSlot(TaskType taskType) {
return
(taskType == TaskType.MAP) ? jobForFallowMapSlot : jobForFallowReduceSlot;
}