Listed below are example usages of org.apache.hadoop.mapred.JobContextImpl and org.apache.hadoop.mapred.TaskAttemptContextImpl; you can also follow the links to view the full source code on GitHub.
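Before the excerpts, here is a minimal, self-contained sketch (not taken from any of the projects below; the class name ContextSketch is purely illustrative) showing how these mapred-API context objects are typically constructed from a JobConf and then handed to an OutputCommitter, which is the pattern the excerpts repeat:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;

public class ContextSketch {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    // Identify the attempt: jtIdentifier "jt", job 0, map task 0, attempt 0
    // (the same constructor used by the test setUp() methods further down).
    TaskAttemptID attemptId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
    // Per-attempt and per-job contexts built from the same JobConf.
    TaskAttemptContextImpl taskContext = new TaskAttemptContextImpl(jobConf, attemptId);
    JobContext jobContext = new JobContextImpl(jobConf, new JobID());
    // The contexts are what the OutputCommitter lifecycle methods consume.
    OutputCommitter committer = jobConf.getOutputCommitter();
    committer.setupJob(jobContext);
    committer.setupTask(taskContext);
  }
}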
@Override
public boolean hasNext() {
if (currentReader != null) {
try {
boolean current = currentReader.nextKeyValue();
while (!current && consumingSplit < assignedSplits.size() - 1) {
TaskID taskID = new TaskID(context.getId(), context.getIndex(),
TaskType.MAP, context.getIndex());
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
consumingSplit++;
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(jconf, taskAttemptID);
currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
taskAttemptContext);
currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
current = currentReader.nextKeyValue();
}
return current;
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to read the next key vale", e);
}
}
return false;
}
/**
* create the temporary output file for hadoop RecordWriter.
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws java.io.IOException
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
if (Integer.toString(taskNumber + 1).length() > 6) {
throw new IOException("Task id too large.");
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+ Integer.toString(taskNumber + 1)
+ "_0");
this.jobConf.set("mapred.task.id", taskAttemptID.toString());
this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
// for hadoop 2.2
this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
this.outputCommitter = this.jobConf.getOutputCommitter();
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
this.outputCommitter.setupJob(jobContext);
this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
}
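A note on the TaskAttemptID.forName(...) string padding used in open() above: it simply builds a reduce-side attempt id of the form attempt__0000_r_<zero-padded task number>_0. Assuming Hadoop 2.x's TaskType-based constructor, an equivalent and arguably clearer construction is sketched below (the class and method names are hypothetical):

import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;

public class AttemptIdSketch {
  // Equivalent to the forName(...) call above: empty jtIdentifier, job 0,
  // REDUCE task number (taskNumber + 1), attempt 0.
  public static TaskAttemptID reduceAttemptId(int taskNumber) {
    return new TaskAttemptID("", 0, TaskType.REDUCE, taskNumber + 1, 0);
  }

  public static void main(String[] args) {
    // Should print attempt__0000_r_000003_0 for taskNumber == 2.
    System.out.println(reduceAttemptId(2));
  }
}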
@Override
public void prepare(TSetContext ctx) {
this.context = ctx;
Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
jconf = new JobConf(hadoopConf);
try {
format = inputClazz.newInstance();
JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
context.getIndex()));
List<InputSplit> splits = format.getSplits(jobContext);
for (int i = 0; i < splits.size(); i++) {
if (i % context.getParallelism() == context.getIndex()) {
assignedSplits.add(splits.get(i));
}
}
if (assignedSplits.size() > 0) {
TaskID taskID = new TaskID(context.getId(), context.getIndex(),
TaskType.MAP, context.getIndex());
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(jconf, taskAttemptID);
currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
taskAttemptContext);
currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
}
} catch (InstantiationException | IllegalAccessException
| InterruptedException | IOException e) {
throw new RuntimeException("Failed to initialize hadoop input", e);
}
}
@SuppressWarnings("unchecked")
@Override
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(taskAttempt.conf,
TypeConverter.fromYarn(taskAttempt.attemptId));
taskAttempt.eventHandler.handle(new CommitterTaskAbortEvent(
taskAttempt.attemptId, taskContext));
}
public void testAbort() throws IOException {
JobConf job = new JobConf();
setConfForFileOutputCommitter(job);
JobContext jContext = new JobContextImpl(job, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job, committer
.getTaskAttemptPath(tContext));
// do setup
committer.setupJob(jContext);
committer.setupTask(tContext);
String file = "test.txt";
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
// write output
FileSystem localFs = FileSystem.getLocal(job);
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
job, file, reporter);
writeOutput(theRecordWriter, reporter);
// do abort
committer.abortTask(tContext);
File expectedFile = new File(new Path(committer
.getTaskAttemptPath(tContext), file).toString());
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
.toString());
assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
assertEquals("Output directory not empty", 0, new File(outDir.toString())
.listFiles().length);
FileUtil.fullyDelete(new File(outDir.toString()));
}
@BeforeClass
public void setUp() throws Exception {
m_workdir = new Path(
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
unsafe = Utils.getUnsafe();
try {
m_fs = FileSystem.getLocal(m_conf).getRaw();
m_fs.delete(m_workdir, true);
m_fs.mkdirs(m_workdir);
} catch (IOException e) {
throw new IllegalStateException("bad fs init", e);
}
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
}
@BeforeClass
public void setUp() throws IOException {
m_workdir = new Path(
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
try {
m_fs = FileSystem.getLocal(m_conf).getRaw();
m_fs.delete(m_workdir, true);
m_fs.mkdirs(m_workdir);
} catch (IOException e) {
throw new IllegalStateException("bad fs init", e);
}
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "long-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 2);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
}
@BeforeClass
public void setUp() throws IOException {
m_workdir = new Path(
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
try {
m_fs = FileSystem.getLocal(m_conf).getRaw();
m_fs.delete(m_workdir, true);
m_fs.mkdirs(m_workdir);
} catch (IOException e) {
throw new IllegalStateException("bad fs init", e);
}
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "person-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
}
/**
* Commit task.
*
* @throws IOException If failed.
*/
public void commit() throws IOException {
if (writer != null) {
OutputCommitter outputCommitter = jobConf.getOutputCommitter();
TaskAttemptContext taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
if (outputCommitter.needsTaskCommit(taskCtx))
outputCommitter.commitTask(taskCtx);
}
}
/**
* Abort task.
*/
public void abort() {
try {
if (writer != null)
jobConf.getOutputCommitter().abortTask(new TaskAttemptContextImpl(jobConf, attempt));
}
catch (IOException ignore) {
// No-op.
}
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
Object serdeLib = Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
"Expect a SerDe lib implementing both Serializer and Deserializer, but actually got " + serdeLib.getClass().getName());
recordSerDe = (Serializer) serdeLib;
ReflectionUtils.setConf(recordSerDe, jobConf);
// TODO: support partition properties, for now assume they're same as table properties
SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
outputClass = recordSerDe.getSerializedClass();
} catch (IllegalAccessException | SerDeException | InstantiationException | ClassNotFoundException e) {
throw new FlinkRuntimeException("Error initializing Hive serializer", e);
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ String.format("%" + (6 - Integer.toString(taskNumber).length()) + "s", " ").replace(" ", "0")
+ taskNumber + "_0");
this.jobConf.set("mapred.task.id", taskAttemptID.toString());
this.jobConf.setInt("mapred.task.partition", taskNumber);
// for hadoop 2.2
this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
this.jobConf.setInt("mapreduce.task.partition", taskNumber);
this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
if (!isDynamicPartition) {
staticWriter = writerForLocation(hiveTablePartition.getStorageDescriptor().getLocation());
} else {
dynamicPartitionOffset = fieldNames.length - partitionColumns.size() + hiveTablePartition.getPartitionSpec().size();
}
numNonPartitionColumns = isPartitioned ? fieldNames.length - partitionColumns.size() : fieldNames.length;
hiveConversions = new HiveObjectConversion[numNonPartitionColumns];
List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
for (int i = 0; i < numNonPartitionColumns; i++) {
ObjectInspector objectInspector = HiveInspectors.getObjectInspector(fieldTypes[i]);
objectInspectors.add(objectInspector);
hiveConversions[i] = HiveInspectors.getConversion(objectInspector, fieldTypes[i].getLogicalType());
}
if (!isPartitioned) {
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(fieldNames),
objectInspectors);
} else {
rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
Arrays.asList(fieldNames).subList(0, fieldNames.length - partitionColumns.size()),
objectInspectors);
defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
}
}
@SuppressWarnings("unchecked")
public void testCommitter() throws Exception {
JobConf job = new JobConf();
setConfForFileOutputCommitter(job);
JobContext jContext = new JobContextImpl(job, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job,
committer.getTaskAttemptPath(tContext));
committer.setupJob(jContext);
committer.setupTask(tContext);
String file = "test.txt";
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
// write output
FileSystem localFs = FileSystem.getLocal(job);
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter =
theOutputFormat.getRecordWriter(localFs, job, file, reporter);
writeOutput(theRecordWriter, reporter);
// do commit
committer.commitTask(tContext);
committer.commitJob(jContext);
// validate output
File expectedFile = new File(new Path(outDir, file).toString());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
expectedOutput.append(val2).append("\n");
expectedOutput.append(key2).append("\n");
expectedOutput.append(key1).append("\n");
expectedOutput.append(key2).append('\t').append(val2).append("\n");
String output = UtilsForTests.slurp(expectedFile);
assertEquals(output, expectedOutput.toString());
FileUtil.fullyDelete(new File(outDir.toString()));
}
/**
* Setup task.
*
* @throws IOException If failed.
*/
public void setup() throws IOException {
if (writer != null)
jobConf.getOutputCommitter().setupTask(new TaskAttemptContextImpl(jobConf, attempt));
}
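The setup(), commit() and abort() helpers shown in the excerpts above are thin wrappers around the standard mapred OutputCommitter task lifecycle. A minimal, self-contained sketch of how a caller might drive that lifecycle directly is shown below (class and method names are illustrative, not from any of the projects excerpted here):

import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;

public class CommitLifecycleSketch {
  public static void runTask(JobConf jobConf) throws IOException {
    TaskAttemptID attempt = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
    TaskAttemptContextImpl taskCtx = new TaskAttemptContextImpl(jobConf, attempt);
    OutputCommitter committer = jobConf.getOutputCommitter();
    committer.setupTask(taskCtx);                 // mirrors setup()
    try {
      // ... write records through the task's RecordWriter here ...
      if (committer.needsTaskCommit(taskCtx)) {   // mirrors commit()
        committer.commitTask(taskCtx);
      }
    } catch (IOException e) {
      committer.abortTask(taskCtx);               // mirrors abort()
      throw e;
    }
  }
}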