下面列出了org.apache.hadoop.mapreduce.task.ReduceContextImpl#org.apache.hadoop.util.Progress 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Merges the given segments on behalf of a reduce task.
 *
 * <p>Builds a {@code MergeQueue} tagged with {@code TaskType.REDUCE} and
 * delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> reduceQueue =
      new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
          sortSegments, TaskType.REDUCE);
  return reduceQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given segments on behalf of a reduce task, passing an explicit
 * in-memory segment count through to the merge queue.
 *
 * <p>Builds a {@code MergeQueue} tagged with {@code TaskType.REDUCE} and
 * delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param inMemSegments in-memory segment count forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, int inMemSegments, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> reduceQueue =
      new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
          sortSegments, TaskType.REDUCE);
  return reduceQueue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
      tmpDir, readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given segments on behalf of a reduce task, with a compression
 * codec and an explicit in-memory segment count.
 *
 * <p>Builds a {@code MergeQueue} tagged with {@code TaskType.REDUCE} and
 * delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param inMemSegments in-memory segment count forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          List<Segment<K, V>> segments,
                          int mergeFactor, int inMemSegments, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> reduceQueue =
      new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
          sortSegments, codec, TaskType.REDUCE);
  return reduceQueue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
      tmpDir, readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given segments, letting the caller choose whether the final
 * merge is considered for progress.
 *
 * <p>Builds a raw {@code MergeQueue} from the arguments and delegates to its
 * {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param considerFinalMergeForProgress flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param bytesReadCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 * @throws InterruptedException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             CompressionCodec codec,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             boolean considerFinalMergeForProgress,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException, InterruptedException {
  MergeQueue segmentQueue =
      new MergeQueue(conf, fs, segments, comparator, reporter,
          sortSegments, codec, considerFinalMergeForProgress);
  return segmentQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Builds a {@code ValuesIterator} on top of a merge that was given no input.
 *
 * <p>Stores the merge result in {@code rawKeyValueIterator}; the on-disk
 * variant also resets {@code streamPaths} to an empty array. The resulting
 * iterator is wired to two freshly created counters.
 *
 * @param inMemory {@code true} to merge an empty list of in-memory segments,
 *     {@code false} to merge an empty array of file paths
 * @return a values iterator over the (expected to be empty) merge result
 * @throws IOException propagated from the merge or the iterator construction
 * @throws InterruptedException propagated from the merge
 */
@SuppressWarnings("unchecked")
private ValuesIterator createEmptyIterator(boolean inMemory)
    throws IOException, InterruptedException {
  if (inMemory) {
    // No segments at all, so this is expected to yield an empty iterator.
    List<TezMerger.Segment> noSegments = Lists.newLinkedList();
    rawKeyValueIterator =
        TezMerger.merge(conf, fs, keyClass, valClass, noSegments, mergeFactor, tmpDir,
            comparator, new ProgressReporter(),
            new GenericCounter("readsCounter", "y"),
            new GenericCounter("writesCounter", "y1"),
            new GenericCounter("bytesReadCounter", "y2"),
            new Progress());
  } else {
    // No input files at all, so this is expected to yield an empty iterator.
    streamPaths = new Path[0];
    rawKeyValueIterator =
        TezMerger.merge(conf, fs, keyClass, valClass, null,
            false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
            new ProgressReporter(), null, null, null, null);
  }

  TezCounter inputKeys = new GenericCounter("inputKeyCounter", "y3");
  TezCounter inputValues = new GenericCounter("inputValueCounter", "y4");
  return new ValuesIterator(rawKeyValueIterator, comparator,
      keyClass, valClass, conf, inputKeys, inputValues);
}
/**
 * Merges the given input files on behalf of a reduce task.
 *
 * <p>Builds a {@code MergeQueue} over the paths, tagged with
 * {@code TaskType.REDUCE}, and delegates to its {@code merge} method. The
 * counter slot of the queue constructor is left {@code null}.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param inputs input paths handed to the merge queue
 * @param deleteInputs flag handed to the merge queue constructor
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          Path[] inputs, boolean deleteInputs,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> fileQueue =
      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
          reporter, null, TaskType.REDUCE);
  return fileQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given input files on behalf of a reduce task, supplying a
 * counter for merged map outputs.
 *
 * <p>Builds a {@code MergeQueue} over the paths, tagged with
 * {@code TaskType.REDUCE}, and delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param inputs input paths handed to the merge queue
 * @param deleteInputs flag handed to the merge queue constructor
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergedMapOutputsCounter counter handed to the merge queue constructor
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          Path[] inputs, boolean deleteInputs,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator,
                          Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Counters.Counter mergedMapOutputsCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> fileQueue =
      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
          reporter, mergedMapOutputsCounter, TaskType.REDUCE);
  return fileQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given segments on behalf of a reduce task.
 *
 * <p>Builds a {@code MergeQueue} tagged with {@code TaskType.REDUCE} and
 * delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> reduceQueue =
      new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
          sortSegments, TaskType.REDUCE);
  return reduceQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given segments for a caller-specified task type.
 *
 * <p>Builds a {@code MergeQueue} from the arguments (including the codec and
 * task type) and delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @param taskType task type handed to the merge queue constructor
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase,
                          TaskType taskType)
    throws IOException {
  MergeQueue<K, V> typedQueue =
      new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
          sortSegments, codec, taskType);
  return typedQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, mergePhase);
}
/**
 * Merges the given segments on behalf of a reduce task, passing an explicit
 * in-memory segment count through to the merge queue.
 *
 * <p>Builds a {@code MergeQueue} tagged with {@code TaskType.REDUCE} and
 * delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param inMemSegments in-memory segment count forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, int inMemSegments, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  MergeQueue<K, V> reduceQueue =
      new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
          sortSegments, TaskType.REDUCE);
  return reduceQueue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
      tmpDir, readsCounter, writesCounter, mergePhase);
}
/**
 * Creates sample data, merges it, and wraps the merge result in a
 * {@code ValuesIterator} that is wired to the supplied counters.
 *
 * <p>The merge result is stored in {@code rawKeyValueIterator}; the on-disk
 * variant also stores the created files in {@code streamPaths}.
 *
 * @param inMemory {@code true} to merge segments from
 *     {@code createInMemStreams()}, {@code false} to merge files from
 *     {@code createFiles()}
 * @param keyCounter counter handed to the {@code ValuesIterator} as its key counter
 * @param tupleCounter counter handed to the {@code ValuesIterator} as its value counter
 * @return a values iterator over the merged sample data
 * @throws IOException propagated from data creation, the merge or the iterator
 * @throws InterruptedException propagated from the merge
 */
@SuppressWarnings("unchecked")
private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter)
    throws IOException, InterruptedException {
  if (inMemory) {
    // Merge freshly created in-memory segments.
    List<TezMerger.Segment> inMemSegments = createInMemStreams();
    rawKeyValueIterator =
        TezMerger.merge(conf, fs, keyClass, valClass, inMemSegments, mergeFactor, tmpDir,
            comparator, new ProgressReporter(),
            new GenericCounter("readsCounter", "y"),
            new GenericCounter("writesCounter", "y1"),
            new GenericCounter("bytesReadCounter", "y2"),
            new Progress());
  } else {
    // Merge all freshly created files to get the key/value iterator.
    streamPaths = createFiles();
    rawKeyValueIterator =
        TezMerger.merge(conf, fs, keyClass, valClass, null,
            false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
            new ProgressReporter(), null, null, null, null);
  }

  return new ValuesIterator(rawKeyValueIterator, comparator,
      keyClass, valClass, conf, keyCounter, tupleCounter);
}
/**
 * Merges the given input files.
 *
 * <p>Builds a raw {@code MergeQueue} over the paths and delegates to its
 * {@code merge} method. Two constructor slots are fixed here: a boolean
 * passed as {@code false} and a trailing argument passed as {@code null}.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param ifileReadAhead read-ahead flag handed to the merge queue constructor
 * @param ifileReadAheadLength read-ahead length handed to the merge queue constructor
 * @param ifileBufferSize buffer size handed to the merge queue constructor
 * @param inputs input paths handed to the merge queue
 * @param deleteInputs flag handed to the merge queue constructor
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param bytesReadCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 * @throws InterruptedException propagated from the merge queue
 */
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             CompressionCodec codec, boolean ifileReadAhead,
                             int ifileReadAheadLength, int ifileBufferSize,
                             Path[] inputs, boolean deleteInputs,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException, InterruptedException {
  MergeQueue fileQueue =
      new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
          ifileReadAheadLength, ifileBufferSize, false, comparator,
          reporter, null);
  return fileQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Merges the given segments with a compression codec and an explicit
 * in-memory segment count.
 *
 * <p>Builds a raw {@code MergeQueue} (its trailing boolean constructor
 * argument fixed to {@code false}) and delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param inMemSegments in-memory segment count forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param bytesReadCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 * @throws InterruptedException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             CompressionCodec codec,
                             List<Segment> segments,
                             int mergeFactor, int inMemSegments, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException, InterruptedException {
  MergeQueue segmentQueue =
      new MergeQueue(conf, fs, segments, comparator, reporter,
          sortSegments, codec, false);
  return segmentQueue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
      tmpDir, readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Merges the given segments.
 *
 * <p>Builds a raw {@code MergeQueue} (its trailing boolean constructor
 * argument fixed to {@code false}) and delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param bytesReadCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 * @throws InterruptedException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException, InterruptedException {
  MergeQueue segmentQueue =
      new MergeQueue(conf, fs, segments, comparator, reporter,
          sortSegments, false);
  return segmentQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Merges the given segments with a compression codec and an explicit
 * in-memory segment count.
 *
 * <p>Builds a raw {@code MergeQueue} (its trailing boolean constructor
 * argument fixed to {@code false}) and delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param codec compression codec handed to the merge queue constructor
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param inMemSegments in-memory segment count forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param bytesReadCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             CompressionCodec codec,
                             List<Segment> segments,
                             int mergeFactor, int inMemSegments, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException {
  MergeQueue segmentQueue =
      new MergeQueue(conf, fs, segments, comparator, reporter,
          sortSegments, codec, false);
  return segmentQueue.merge(keyClass, valueClass, mergeFactor, inMemSegments,
      tmpDir, readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Merges the given segments.
 *
 * <p>Builds a raw {@code MergeQueue} (its trailing boolean constructor
 * argument fixed to {@code false}) and delegates to its {@code merge} method.
 *
 * @param conf configuration handed to the merge queue
 * @param fs file system handed to the merge queue
 * @param keyClass key class forwarded to the queue's merge
 * @param valueClass value class forwarded to the queue's merge
 * @param segments segments handed to the merge queue
 * @param mergeFactor merge factor forwarded to the queue's merge
 * @param tmpDir temporary directory forwarded to the queue's merge
 * @param comparator raw key comparator handed to the merge queue
 * @param reporter progress callback handed to the merge queue
 * @param sortSegments flag handed to the merge queue constructor
 * @param readsCounter counter forwarded to the queue's merge
 * @param writesCounter counter forwarded to the queue's merge
 * @param bytesReadCounter counter forwarded to the queue's merge
 * @param mergePhase progress object forwarded to the queue's merge
 * @return whatever the queue's merge returns
 * @throws IOException propagated from the merge queue
 */
public static <K extends Object, V extends Object>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class keyClass, Class valueClass,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
                             boolean sortSegments,
                             TezCounter readsCounter,
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
    throws IOException {
  MergeQueue segmentQueue =
      new MergeQueue(conf, fs, segments, comparator, reporter,
          sortSegments, false);
  return segmentQueue.merge(keyClass, valueClass, mergeFactor, tmpDir,
      readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Captures everything passed in as the state of this context.
 *
 * <p>Each argument is stored as-is in the field of the same name; nothing is
 * validated, copied or otherwise processed here.
 *
 * @param reduceId stored in {@code reduceId}
 * @param jobConf stored in {@code jobConf}
 * @param localFS stored in {@code localFS}
 * @param umbilical stored in {@code umbilical}
 * @param localDirAllocator stored in {@code localDirAllocator}
 * @param reporter stored in {@code reporter}
 * @param codec stored in {@code codec}
 * @param combinerClass stored in {@code combinerClass}
 * @param combineCollector stored in {@code combineCollector}
 * @param spilledRecordsCounter stored in {@code spilledRecordsCounter}
 * @param reduceCombineInputCounter stored in {@code reduceCombineInputCounter}
 * @param shuffledMapsCounter stored in {@code shuffledMapsCounter}
 * @param reduceShuffleBytes stored in {@code reduceShuffleBytes}
 * @param failedShuffleCounter stored in {@code failedShuffleCounter}
 * @param mergedMapOutputsCounter stored in {@code mergedMapOutputsCounter}
 * @param status stored in {@code status}
 * @param copyPhase stored in {@code copyPhase}
 * @param mergePhase stored in {@code mergePhase}
 * @param reduceTask stored in {@code reduceTask}
 * @param mapOutputFile stored in {@code mapOutputFile}
 * @param localMapFiles stored in {@code localMapFiles}
 */
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
               JobConf jobConf, FileSystem localFS,
               TaskUmbilicalProtocol umbilical,
               LocalDirAllocator localDirAllocator,
               Reporter reporter, CompressionCodec codec,
               Class<? extends Reducer> combinerClass,
               CombineOutputCollector<K,V> combineCollector,
               Counters.Counter spilledRecordsCounter,
               Counters.Counter reduceCombineInputCounter,
               Counters.Counter shuffledMapsCounter,
               Counters.Counter reduceShuffleBytes,
               Counters.Counter failedShuffleCounter,
               Counters.Counter mergedMapOutputsCounter,
               TaskStatus status, Progress copyPhase, Progress mergePhase,
               Task reduceTask, MapOutputFile mapOutputFile,
               Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  // Task identity, status and progress.
  this.reduceId = reduceId;
  this.reduceTask = reduceTask;
  this.status = status;
  this.copyPhase = copyPhase;
  this.mergePhase = mergePhase;

  // Configuration, file systems and reporting channels.
  this.jobConf = jobConf;
  this.localFS = localFS;
  this.localDirAllocator = localDirAllocator;
  this.umbilical = umbilical;
  this.reporter = reporter;

  // Compression, combining and map-output locations.
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;
  this.mapOutputFile = mapOutputFile;
  this.localMapFiles = localMapFiles;

  // Counters.
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
}
/**
 * Convenience overload that delegates to the overload taking one extra
 * {@code int} after {@code factor}, passing {@code 0} for it.
 *
 * @param keyClass key class, forwarded unchanged
 * @param valueClass value class, forwarded unchanged
 * @param factor merge factor, forwarded unchanged
 * @param tmpDir temporary directory, forwarded unchanged
 * @param readsCounter counter, forwarded unchanged
 * @param writesCounter counter, forwarded unchanged
 * @param bytesReadCounter counter, forwarded unchanged
 * @param mergePhase progress object, forwarded unchanged
 * @return whatever the delegated overload returns
 * @throws IOException propagated from the delegated overload
 * @throws InterruptedException propagated from the delegated overload
 */
public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
                                    int factor, Path tmpDir,
                                    TezCounter readsCounter,
                                    TezCounter writesCounter,
                                    TezCounter bytesReadCounter,
                                    Progress mergePhase)
    throws IOException, InterruptedException {
  // NOTE(review): the extra int looks like the in-memory segment count; confirm against the delegated overload.
  final int extraIntArgument = 0;
  return merge(keyClass, valueClass, factor, extraIntArgument, tmpDir,
      readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
/**
 * Initialises the scheduler state from the job configuration and the
 * supplied collaborators.
 *
 * <p>Map bookkeeping is sized from {@code job.getNumMapTasks()}; the
 * remaining limits are read from {@code MRJobConfig} keys. The constructor
 * also calls {@code referee.start()} before the last configuration fields
 * are assigned.
 *
 * @param job job configuration supplying the map count and shuffle settings
 * @param status stored in {@code status}
 * @param reduceId stored in {@code reduceId}
 * @param reporter stored in {@code reporter}
 * @param progress stored in {@code progress}
 * @param shuffledMapsCounter stored in {@code shuffledMapsCounter}
 * @param reduceShuffleBytes stored in {@code reduceShuffleBytes}
 * @param failedShuffleCounter stored in {@code failedShuffleCounter}
 */
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
TaskAttemptID reduceId,
ExceptionReporter reporter,
Progress progress,
Counters.Counter shuffledMapsCounter,
Counters.Counter reduceShuffleBytes,
Counters.Counter failedShuffleCounter) {
totalMaps = job.getNumMapTasks();
// At least 30, otherwise a tenth of the map count.
abortFailureLimit = Math.max(30, totalMaps / 10);
copyTimeTracker = new CopyTimeTracker();
// Every map is outstanding at the start; one finished-flag slot per map.
remainingMaps = totalMaps;
finishedMaps = new boolean[remainingMaps];
this.reporter = reporter;
this.status = status;
this.reduceId = reduceId;
this.progress = progress;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.failedShuffleCounter = failedShuffleCounter;
this.startTime = Time.monotonicNow();
lastProgressTime = startTime;
// NOTE(review): referee is started before the fields below are assigned;
// presumably it is a background thread - confirm it does not read them.
referee.start();
// Never more than 5, and never more than the number of maps.
this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
this.maxFetchFailuresBeforeReporting = job.getInt(
MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
// Falls back to true when the key is not configured.
this.reportReadErrorImmediately = job.getBoolean(
MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
this.maxHostFailures = job.getInt(
MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
}
/**
 * Checks that reporting failed TIPs advances shuffle progress and eventually
 * lets {@code waitUntilDone} succeed.
 *
 * <p>With two map tasks configured, the first {@code tipFailed} call is
 * expected to bring progress to 0.5 with the scheduler still not done; the
 * second is expected to bring it to 1.0 with the scheduler done.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Minimal reduce-side status stub: not a map, ignores fetch failures.
  TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  Progress shuffleProgress = new Progress();
  TaskAttemptID reduceAttempt =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  ShuffleSchedulerImpl shuffleScheduler = new ShuffleSchedulerImpl(
      conf, reduceStatus, reduceAttempt, null, shuffleProgress, null, null, null);

  JobID job = new JobID();

  // First failed TIP: half of the work is accounted for, not done yet.
  TaskID firstFailed = new TaskID(job, TaskType.REDUCE, 1);
  shuffleScheduler.tipFailed(firstFailed);
  Assert.assertEquals("Progress should be 0.5", 0.5f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(shuffleScheduler.waitUntilDone(1));

  // Second failed TIP: everything is accounted for, scheduler is done.
  TaskID secondFailed = new TaskID(job, TaskType.REDUCE, 0);
  shuffleScheduler.tipFailed(secondFailed);
  Assert.assertEquals("Progress should be 1.0", 1.0f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(shuffleScheduler.waitUntilDone(1));
}
/**
 * Convenience overload that merges the given segments without asking for
 * them to be sorted: delegates to the overload that takes a
 * {@code sortSegments} flag, passing {@code false}.
 *
 * @param conf configuration, forwarded unchanged
 * @param fs file system, forwarded unchanged
 * @param keyClass key class, forwarded unchanged
 * @param valueClass value class, forwarded unchanged
 * @param segments segments to merge, forwarded unchanged
 * @param mergeFactor merge factor, forwarded unchanged
 * @param tmpDir temporary directory, forwarded unchanged
 * @param comparator raw key comparator, forwarded unchanged
 * @param reporter progress callback, forwarded unchanged
 * @param readsCounter counter, forwarded unchanged
 * @param writesCounter counter, forwarded unchanged
 * @param mergePhase progress object, forwarded unchanged
 * @return whatever the delegated overload returns
 * @throws IOException propagated from the delegated overload
 */
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  final boolean sortSegments = false;
  return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
      comparator, reporter, sortSegments, readsCounter, writesCounter,
      mergePhase);
}
/**
 * Convenience overload that delegates to the overload taking one extra
 * {@code int} after {@code factor}, passing {@code 0} for it.
 *
 * @param keyClass key class, forwarded unchanged
 * @param valueClass value class, forwarded unchanged
 * @param factor merge factor, forwarded unchanged
 * @param tmpDir temporary directory, forwarded unchanged
 * @param readsCounter counter, forwarded unchanged
 * @param writesCounter counter, forwarded unchanged
 * @param mergePhase progress object, forwarded unchanged
 * @return whatever the delegated overload returns
 * @throws IOException propagated from the delegated overload
 */
public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                 int factor, Path tmpDir,
                                 Counters.Counter readsCounter,
                                 Counters.Counter writesCounter,
                                 Progress mergePhase)
    throws IOException {
  // NOTE(review): the extra int looks like the in-memory segment count; confirm against the delegated overload.
  final int extraIntArgument = 0;
  return merge(keyClass, valueClass, factor, extraIntArgument, tmpDir,
      readsCounter, writesCounter, mergePhase);
}
/**
 * Captures everything passed in as the state of this context.
 *
 * <p>Each argument is stored as-is in the field of the same name; nothing is
 * validated, copied or otherwise processed here.
 *
 * @param reduceId stored in {@code reduceId}
 * @param jobConf stored in {@code jobConf}
 * @param localFS stored in {@code localFS}
 * @param umbilical stored in {@code umbilical}
 * @param localDirAllocator stored in {@code localDirAllocator}
 * @param reporter stored in {@code reporter}
 * @param codec stored in {@code codec}
 * @param combinerClass stored in {@code combinerClass}
 * @param combineCollector stored in {@code combineCollector}
 * @param spilledRecordsCounter stored in {@code spilledRecordsCounter}
 * @param reduceCombineInputCounter stored in {@code reduceCombineInputCounter}
 * @param shuffledMapsCounter stored in {@code shuffledMapsCounter}
 * @param reduceShuffleBytes stored in {@code reduceShuffleBytes}
 * @param failedShuffleCounter stored in {@code failedShuffleCounter}
 * @param mergedMapOutputsCounter stored in {@code mergedMapOutputsCounter}
 * @param status stored in {@code status}
 * @param copyPhase stored in {@code copyPhase}
 * @param mergePhase stored in {@code mergePhase}
 * @param reduceTask stored in {@code reduceTask}
 * @param mapOutputFile stored in {@code mapOutputFile}
 * @param localMapFiles stored in {@code localMapFiles}
 */
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
               JobConf jobConf, FileSystem localFS,
               TaskUmbilicalProtocol umbilical,
               LocalDirAllocator localDirAllocator,
               Reporter reporter, CompressionCodec codec,
               Class<? extends Reducer> combinerClass,
               CombineOutputCollector<K,V> combineCollector,
               Counters.Counter spilledRecordsCounter,
               Counters.Counter reduceCombineInputCounter,
               Counters.Counter shuffledMapsCounter,
               Counters.Counter reduceShuffleBytes,
               Counters.Counter failedShuffleCounter,
               Counters.Counter mergedMapOutputsCounter,
               TaskStatus status, Progress copyPhase, Progress mergePhase,
               Task reduceTask, MapOutputFile mapOutputFile,
               Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  // Task identity, status and progress.
  this.reduceId = reduceId;
  this.reduceTask = reduceTask;
  this.status = status;
  this.copyPhase = copyPhase;
  this.mergePhase = mergePhase;

  // Configuration, file systems and reporting channels.
  this.jobConf = jobConf;
  this.localFS = localFS;
  this.localDirAllocator = localDirAllocator;
  this.umbilical = umbilical;
  this.reporter = reporter;

  // Compression, combining and map-output locations.
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;
  this.mapOutputFile = mapOutputFile;
  this.localMapFiles = localMapFiles;

  // Counters.
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
}
/**
 * Initialises the scheduler state from the job configuration and the
 * supplied collaborators.
 *
 * <p>Map bookkeeping is sized from {@code job.getNumMapTasks()}; the
 * remaining limits are read from {@code MRJobConfig} keys. The constructor
 * also calls {@code referee.start()} before the last configuration fields
 * are assigned.
 *
 * @param job job configuration supplying the map count and shuffle settings
 * @param status stored in {@code status}
 * @param reduceId stored in {@code reduceId}
 * @param reporter stored in {@code reporter}
 * @param progress stored in {@code progress}
 * @param shuffledMapsCounter stored in {@code shuffledMapsCounter}
 * @param reduceShuffleBytes stored in {@code reduceShuffleBytes}
 * @param failedShuffleCounter stored in {@code failedShuffleCounter}
 */
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
TaskAttemptID reduceId,
ExceptionReporter reporter,
Progress progress,
Counters.Counter shuffledMapsCounter,
Counters.Counter reduceShuffleBytes,
Counters.Counter failedShuffleCounter) {
totalMaps = job.getNumMapTasks();
// At least 30, otherwise a tenth of the map count.
abortFailureLimit = Math.max(30, totalMaps / 10);
copyTimeTracker = new CopyTimeTracker();
// Every map is outstanding at the start; one finished-flag slot per map.
remainingMaps = totalMaps;
finishedMaps = new boolean[remainingMaps];
this.reporter = reporter;
this.status = status;
this.reduceId = reduceId;
this.progress = progress;
this.shuffledMapsCounter = shuffledMapsCounter;
this.reduceShuffleBytes = reduceShuffleBytes;
this.failedShuffleCounter = failedShuffleCounter;
this.startTime = Time.monotonicNow();
lastProgressTime = startTime;
// NOTE(review): referee is started before the fields below are assigned;
// presumably it is a background thread - confirm it does not read them.
referee.start();
// Never more than 5, and never more than the number of maps.
this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
this.maxFetchFailuresBeforeReporting = job.getInt(
MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
// Falls back to true when the key is not configured.
this.reportReadErrorImmediately = job.getBoolean(
MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
this.maxHostFailures = job.getInt(
MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
}
/**
 * Checks that reporting failed TIPs advances shuffle progress and eventually
 * lets {@code waitUntilDone} succeed.
 *
 * <p>With two map tasks configured, the first {@code tipFailed} call is
 * expected to bring progress to 0.5 with the scheduler still not done; the
 * second is expected to bring it to 1.0 with the scheduler done.
 */
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
  JobConf conf = new JobConf();
  conf.setNumMapTasks(2);

  // Minimal reduce-side status stub: not a map, ignores fetch failures.
  TaskStatus reduceStatus = new TaskStatus() {
    @Override
    public boolean getIsMap() {
      return false;
    }

    @Override
    public void addFetchFailedMap(TaskAttemptID mapTaskId) {
    }
  };
  Progress shuffleProgress = new Progress();
  TaskAttemptID reduceAttempt =
      new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
  ShuffleSchedulerImpl shuffleScheduler = new ShuffleSchedulerImpl(
      conf, reduceStatus, reduceAttempt, null, shuffleProgress, null, null, null);

  JobID job = new JobID();

  // First failed TIP: half of the work is accounted for, not done yet.
  TaskID firstFailed = new TaskID(job, TaskType.REDUCE, 1);
  shuffleScheduler.tipFailed(firstFailed);
  Assert.assertEquals("Progress should be 0.5", 0.5f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertFalse(shuffleScheduler.waitUntilDone(1));

  // Second failed TIP: everything is accounted for, scheduler is done.
  TaskID secondFailed = new TaskID(job, TaskType.REDUCE, 0);
  shuffleScheduler.tipFailed(secondFailed);
  Assert.assertEquals("Progress should be 1.0", 1.0f,
      shuffleProgress.getProgress(), 0.0f);
  Assert.assertTrue(shuffleScheduler.waitUntilDone(1));
}
/**
 * Merges the given files with {@code TezMerger}, verifies the merged records
 * via {@code verifyData}, and then clears {@code verificationDataSet}.
 *
 * <p>The merge is asked to delete its inputs and writes intermediate data
 * under a fresh {@code tmp_<nanoTime>} directory inside {@code workDir}.
 *
 * @param pathList files to merge
 * @param mergeFactor merge factor handed to the merger
 * @param rc comparator to use; when {@code null} the {@code comparator}
 *     field is used instead
 * @throws Exception propagated from the merge or the verification
 */
private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
  Path[] inputs = pathList.toArray(new Path[pathList.size()]);
  Path mergeTmpDir = new Path(workDir, "tmp_" + System.nanoTime());
  RawComparator effectiveComparator = (rc == null) ? comparator : rc;

  // TezMerger.merge is static: call it through the class rather than
  // allocating a throw-away TezMerger instance just to reach it.
  TezRawKeyValueIterator records = TezMerger.merge(defaultConf, localFs, IntWritable.class,
      LongWritable.class, null, false, 0, 1024, inputs,
      true, mergeFactor, mergeTmpDir,
      effectiveComparator, new Reporter(), null, null,
      null,
      new Progress());

  verifyData(records);
  verificationDataSet.clear();
}
/**
 * Returns the raw key/value iterator for this input.
 *
 * <p>When {@code getNumPhysicalInputs()} is zero, a stub iterator is returned
 * immediately: its {@code next()} is always {@code false}, its key/value
 * accessors throw {@code RuntimeException}, {@code close()} does nothing, and
 * {@code getProgress()} marks {@code progress} complete before returning it.
 * Otherwise the call goes through {@code waitForInputReady()} and then
 * returns {@code rawIter}.
 *
 * @return the stub iterator when there are no physical inputs, otherwise
 *     {@code rawIter}
 * @throws IOException declared by this method; presumably raised while
 *     waiting for input - confirm in {@code waitForInputReady()}
 * @throws InterruptedException declared by this method; presumably raised
 *     while waiting for input - confirm in {@code waitForInputReady()}
 */
@Private
public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
// wait for input so that iterator is available
synchronized(this) {
if (getNumPhysicalInputs() == 0) {
// No inputs at all: hand back a stub that reports "no more records".
return new TezRawKeyValueIterator() {
@Override
public DataInputBuffer getKey() throws IOException {
throw new RuntimeException("No data available in Input");
}
@Override
public DataInputBuffer getValue() throws IOException {
throw new RuntimeException("No data available in Input");
}
@Override
public boolean next() throws IOException {
return false;
}
@Override
public void close() throws IOException {
}
@Override
public Progress getProgress() {
// Nothing to read, so the enclosing progress is marked complete.
progress.complete();
return progress;
}
};
}
}
// The wait happens outside the synchronized blocks.
waitForInputReady();
synchronized(this) {
return rawIter;
}
}
/**
 * Always returns {@code null}; this implementation exposes no progress object.
 *
 * NOTE(review): callers must tolerate a null result - confirm that the
 * overridden contract allows it.
 */
@Override
public Progress getProgress() {
return null;
}
/**
 * Always returns {@code null}; this implementation exposes no progress object.
 *
 * NOTE(review): callers must tolerate a null result - confirm against the
 * declaring type's contract.
 */
public Progress getProgress() {
return null;
}