The following are code examples for the org.apache.hadoop.mapred.TaskAttemptID class, collected from several open-source projects; you can follow the links through to GitHub to view the full source files.
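Before the project snippets, here is a minimal sketch, not taken from any of the projects below, of how a TaskAttemptID is composed from a JobID and a TaskID and round-tripped through its string form. It uses the org.apache.hadoop.mapreduce classes (the org.apache.hadoop.mapred subclasses expose equivalent constructors); the job identifier and the numeric ids are invented for illustration.
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;

public class TaskAttemptIdSketch {
    public static TaskAttemptID buildAttemptId() {
        JobID jobId = new JobID("20240101000", 1);              // invented jtIdentifier + job number
        TaskID taskId = new TaskID(jobId, TaskType.MAP, 3);     // map task 3 of that job
        TaskAttemptID attemptId = new TaskAttemptID(taskId, 0); // first attempt of that task
        // toString() yields the canonical form "attempt_20240101000_0001_m_000003_0";
        // TaskAttemptID.forName(...) parses that form back into an equal instance.
        return TaskAttemptID.forName(attemptId.toString());
    }
}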
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.addResource("config/all-config.xml");
ctx = new TaskAttemptContextImpl(conf, new TaskAttemptID());
ctx.getConfiguration().setInt(ContentIndexingColumnBasedHandler.NUM_SHARDS, 131);
ctx.getConfiguration().set(ContentIndexingColumnBasedHandler.SHARD_TNAME, "shard");
ctx.getConfiguration().set(ContentIndexingColumnBasedHandler.SHARD_GIDX_TNAME, "shardIndex");
ctx.getConfiguration().set(ContentIndexingColumnBasedHandler.SHARD_GRIDX_TNAME, "shardIndex");
ctx.getConfiguration().set(TypeRegistry.INGEST_DATA_TYPES, "test");
ctx.getConfiguration().set("data.name", "test");
ctx.getConfiguration().set("test.data.auth.id.mode", "NEVER");
ctx.getConfiguration().set("test" + BaseIngestHelper.DEFAULT_TYPE, LcNoDiacriticsType.class.getName());
ctx.getConfiguration().set("test" + TypeRegistry.HANDLER_CLASSES, TestContentIndexingColumnBasedHandler.class.getName());
ctx.getConfiguration().set("test" + TypeRegistry.RAW_READER, TestEventRecordReader.class.getName());
ctx.getConfiguration().set("test" + TypeRegistry.INGEST_HELPER, TestContentBaseIngestHelper.class.getName());
ctx.getConfiguration().set(TypeRegistry.EXCLUDED_HANDLER_CLASSES, "FAKE_HANDLER_CLASS"); // it will die if this field is not faked
helper = new TestContentBaseIngestHelper();
colVis = new ColumnVisibility("");
}
private TaskInputOutputContextImpl getTaskInputOutputContext(final String testFilePath, final Configuration conf) {
return new TaskInputOutputContextImpl(conf, new TaskAttemptID(), null, null, null) {
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public Object getCurrentKey() throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@Deprecated
public Path[] getLocalCacheFiles() throws IOException {
return new Path[] {new Path(testFilePath)};
}
};
}
@Override
public boolean hasNext() {
if (currentReader != null) {
try {
boolean current = currentReader.nextKeyValue();
while (!current && consumingSplit < assignedSplits.size() - 1) {
TaskID taskID = new TaskID(context.getId(), context.getIndex(),
TaskType.MAP, context.getIndex());
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
consumingSplit++;
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(jconf, taskAttemptID);
currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
taskAttemptContext);
currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
current = currentReader.nextKeyValue();
}
return current;
} catch (IOException | InterruptedException e) {
throw new RuntimeException("Failed to read the next key vale", e);
}
}
return false;
}
/**
* @param jobConf Job configuration.
* @param taskCtx Task context.
* @param directWrite Direct write flag.
* @param fileName File name.
* @throws IOException In case of IO exception.
*/
HadoopV1OutputCollector(JobConf jobConf, HadoopTaskContext taskCtx, boolean directWrite,
@Nullable String fileName, TaskAttemptID attempt) throws IOException {
this.jobConf = jobConf;
this.taskCtx = taskCtx;
this.attempt = attempt;
if (directWrite) {
jobConf.set("mapreduce.task.attempt.id", attempt.toString());
OutputFormat outFormat = jobConf.getOutputFormat();
writer = outFormat.getRecordWriter(null, jobConf, fileName, Reporter.NULL);
}
else
writer = null;
}
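The property set above, "mapreduce.task.attempt.id", is the same key that MRJobConfig.TASK_ATTEMPT_ID resolves to, so a later consumer can recover the attempt id from the configuration. A minimal sketch of that read-back (the helper method is hypothetical, not part of the collector above):
// Hypothetical helper: parse the attempt id back out of the JobConf.
// MRJobConfig.TASK_ATTEMPT_ID is the constant for "mapreduce.task.attempt.id".
static TaskAttemptID readAttemptId(JobConf jobConf) {
    return TaskAttemptID.forName(jobConf.get(MRJobConfig.TASK_ATTEMPT_ID));
}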
public static TaskID getTaskID(Configuration cfg) {
// first try with the attempt since some Hadoop versions mix the two
String taskAttemptId = HadoopCfgUtils.getTaskAttemptId(cfg);
if (StringUtils.hasText(taskAttemptId)) {
try {
return TaskAttemptID.forName(taskAttemptId).getTaskID();
} catch (IllegalArgumentException ex) {
// the task attempt is invalid (Tez in particular uses the wrong string - see #346)
// try to fallback to task id
return parseTaskIdFromTaskAttemptId(taskAttemptId);
}
}
String taskIdProp = HadoopCfgUtils.getTaskId(cfg);
// double-check task id bug in Hadoop 2.5.x
if (StringUtils.hasText(taskIdProp) && !taskIdProp.contains("attempt")) {
return TaskID.forName(taskIdProp);
}
return null;
}
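The fallback logic above relies on TaskAttemptID.forName rejecting malformed strings with an IllegalArgumentException. For reference, a small sketch of the happy path; the attempt string follows the canonical attempt_<jtIdentifier>_<job>_<m|r>_<task>_<attempt> layout, and the concrete values are illustrative only.
// Illustrative attempt string in the canonical Hadoop format.
String attemptStr = "attempt_200707121733_0003_m_000005_0";
TaskAttemptID attempt = TaskAttemptID.forName(attemptStr);
TaskID taskId = attempt.getTaskID();   // task_200707121733_0003_m_000005
JobID jobId = attempt.getJobID();      // job_200707121733_0003
int attemptNumber = attempt.getId();   // 0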
public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
throws IOException {
Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
if (taskOutputPath != null) {
FileSystem fs = taskOutputPath.getFileSystem(conf);
if (fs.exists(taskOutputPath)) {
Path jobOutputPath = taskOutputPath.getParent().getParent();
// Move the task outputs to their final place
moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath);
// Delete the temporary task-specific output directory
if (!fs.delete(taskOutputPath, true)) {
LOG.info("Failed to delete the temporary output" +
" directory of task: " + taskAttemptID + " - " + taskOutputPath);
}
LOG.info("Saved output of task '" + taskAttemptID + "' to " +
jobOutputPath);
}
}
}
public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
throws IOException {
try {
Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
if (taskOutputPath != null) {
// Get the file-system for the task output directory
FileSystem fs = taskOutputPath.getFileSystem(conf);
// since task output path is created on demand,
// if it exists, task needs a commit
if (fs.exists(taskOutputPath)) {
return true;
}
}
} catch (IOException ioe) {
throw ioe;
}
return false;
}
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path p = new Path(outputPath,
(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
"_" + taskAttemptID.toString()));
try {
FileSystem fs = p.getFileSystem(conf);
return p.makeQualified(fs);
} catch (IOException ie) {
LOG.warn(StringUtils.stringifyException(ie));
return p;
}
}
return null;
}
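For orientation, FileOutputCommitter.TEMP_DIR_NAME is "_temporary", so the path assembled above resolves to <outputPath>/_temporary/_<attemptId>. A small sketch with an invented output directory and attempt id, purely to show the resulting layout:
// Resulting layout (values invented):
//   /user/demo/out/_temporary/_attempt_20240101000_0001_m_000003_0
Path outputPath = new Path("/user/demo/out");
TaskAttemptID attemptId = TaskAttemptID.forName("attempt_20240101000_0001_m_000003_0");
Path tempTaskOutput = new Path(outputPath,
    FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + "_" + attemptId.toString());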
/**
* Creates the temporary output file for the Hadoop RecordWriter.
* @param taskNumber The number of the parallel instance.
* @param numTasks The number of parallel tasks.
* @throws java.io.IOException
*/
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// enforce sequential open() calls
synchronized (OPEN_MUTEX) {
if (Integer.toString(taskNumber + 1).length() > 6) {
throw new IOException("Task id too large.");
}
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_"
+ String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s", " ").replace(" ", "0")
+ Integer.toString(taskNumber + 1)
+ "_0");
this.jobConf.set("mapred.task.id", taskAttemptID.toString());
this.jobConf.setInt("mapred.task.partition", taskNumber + 1);
// for hadoop 2.2
this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1);
this.context = new TaskAttemptContextImpl(this.jobConf, taskAttemptID);
this.outputCommitter = this.jobConf.getOutputCommitter();
JobContext jobContext = new JobContextImpl(this.jobConf, new JobID());
this.outputCommitter.setupJob(jobContext);
this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
}
}
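The attempt string assembled above leaves the jtIdentifier empty and zero-pads the one-based task number to six digits, producing for example attempt__0000_r_000001_0 when taskNumber is 0; TaskAttemptID.forName accepts the empty identifier. The same padding can be written more directly with a %06d format, as in this sketch (values illustrative):
// Equivalent construction of the synthetic attempt id used above.
int taskNumber = 0;
TaskAttemptID taskAttemptID =
    TaskAttemptID.forName(String.format("attempt__0000_r_%06d_0", taskNumber + 1));
// taskAttemptID.toString() -> "attempt__0000_r_000001_0"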
@Override
public void prepare(TSetContext ctx) {
this.context = ctx;
Configuration hadoopConf = this.wrappedConfiguration.getConfiguration();
jconf = new JobConf(hadoopConf);
try {
format = inputClazz.newInstance();
JobContext jobContext = new JobContextImpl(hadoopConf, new JobID(context.getId(),
context.getIndex()));
List<InputSplit> splits = format.getSplits(jobContext);
for (int i = 0; i < splits.size(); i++) {
if (i % context.getParallelism() == context.getIndex()) {
assignedSplits.add(splits.get(i));
}
}
if (assignedSplits.size() > 0) {
TaskID taskID = new TaskID(context.getId(), context.getIndex(),
TaskType.MAP, context.getIndex());
TaskAttemptID taskAttemptID = new TaskAttemptID(taskID, context.getIndex());
TaskAttemptContextImpl taskAttemptContext =
new TaskAttemptContextImpl(jconf, taskAttemptID);
currentReader = format.createRecordReader(assignedSplits.get(consumingSplit),
taskAttemptContext);
currentReader.initialize(assignedSplits.get(consumingSplit), taskAttemptContext);
}
} catch (InstantiationException | IllegalAccessException
| InterruptedException | IOException e) {
throw new RuntimeException("Failed to initialize hadoop input", e);
}
}
/**
* Cleans up the previous standard error and standard output logs.
*/
private void initStdOut(JobConf configuration) {
TaskAttemptID taskId = TaskAttemptID.forName(configuration
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
File stdErr = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDERR);
// prepare folder
if (!stdOut.getParentFile().exists()) {
stdOut.getParentFile().mkdirs();
} else { // clean logs
stdOut.deleteOnExit();
stdErr.deleteOnExit();
}
}
private String readStdOut(JobConf conf) throws Exception {
TaskAttemptID taskId = TaskAttemptID.forName(conf
.get(MRJobConfig.TASK_ATTEMPT_ID));
File stdOut = TaskLog.getTaskLogFile(taskId, false, TaskLog.LogName.STDOUT);
return readFile(stdOut);
}
@SuppressWarnings("rawtypes")
@Test
public void testTipFailed() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
0, 0);
ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
scheduler.tipFailed(taskId1);
Assert.assertEquals("Progress should be 0.5", 0.5f, progress.getProgress(),
0.0f);
Assert.assertFalse(scheduler.waitUntilDone(1));
TaskID taskId0 = new TaskID(jobId, TaskType.REDUCE, 0);
scheduler.tipFailed(taskId0);
Assert.assertEquals("Progress should be 1.0", 1.0f, progress.getProgress(),
0.0f);
Assert.assertTrue(scheduler.waitUntilDone(1));
}
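The reduceId in the test above is built with the five-argument constructor; the sketch below simply annotates what each argument contributes, mirroring the values used in the test.
// new TaskAttemptID(jtIdentifier, jobNumber, taskType, taskNumber, attemptNumber)
TaskAttemptID reduceId = new TaskAttemptID(
    "314159",         // jtIdentifier (cluster timestamp / scheduler identity)
    0,                // job number under that identifier
    TaskType.REDUCE,  // task type
    0,                // task number within the job
    0);               // attempt number for that task
// reduceId.toString() -> "attempt_314159_0000_r_000000_0"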
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
ArrayList<TaskCompletionEvent> tceList =
new ArrayList<TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
}
TaskCompletionEvent[] events = {};
return new MapTaskCompletionEventsUpdate(tceList.toArray(events), false);
}
void validateTaskStderr(StreamJob job, TaskType type)
throws IOException {
TaskAttemptID attemptId =
new TaskAttemptID(new TaskID(job.jobId_, type, 0), 0);
String log = MapReduceTestUtil.readTaskLog(TaskLog.LogName.STDERR,
attemptId, false);
// trim() is called on expectedStderr here because
// MapReduceTestUtil.readTaskLog() returns a trimmed String.
assertTrue(log.equals(expectedStderr.trim()));
}
@BeforeClass
public void setUp() throws Exception {
m_workdir = new Path(
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
unsafe = Utils.getUnsafe();
try {
m_fs = FileSystem.getLocal(m_conf).getRaw();
m_fs.delete(m_workdir, true);
m_fs.mkdirs(m_workdir);
} catch (IOException e) {
throw new IllegalStateException("bad fs init", e);
}
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "chunk-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.CHUNK});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
}
@BeforeClass
public void setUp() throws IOException {
m_workdir = new Path(
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
try {
m_fs = FileSystem.getLocal(m_conf).getRaw();
m_fs.delete(m_workdir, true);
m_fs.mkdirs(m_workdir);
} catch (IOException e) {
throw new IllegalStateException("bad fs init", e);
}
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "long-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 2);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.LONG});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{});
}
@BeforeClass
public void setUp() throws IOException {
m_workdir = new Path(
System.getProperty("test.tmp.dir", DEFAULT_WORK_DIR));
m_conf = new JobConf();
m_rand = Utils.createRandom();
try {
m_fs = FileSystem.getLocal(m_conf).getRaw();
m_fs.delete(m_workdir, true);
m_fs.mkdirs(m_workdir);
} catch (IOException e) {
throw new IllegalStateException("bad fs init", e);
}
m_taid = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
m_tacontext = new TaskAttemptContextImpl(m_conf, m_taid);
MneConfigHelper.setDir(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, m_workdir.toString());
MneConfigHelper.setBaseOutputName(m_conf, null, "person-data");
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_INPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
MneConfigHelper.setMemServiceName(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SERVICE_NAME);
MneConfigHelper.setSlotKeyId(m_conf, MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, SLOT_KEY_ID);
MneConfigHelper.setMemPoolSize(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, 1024L * 1024 * 1024 * 4);
MneConfigHelper.setDurableTypes(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new DurableType[]{DurableType.DURABLE});
MneConfigHelper.setEntityFactoryProxies(m_conf,
MneConfigHelper.DEFAULT_OUTPUT_CONFIG_PREFIX, new Class<?>[]{PersonListEFProxy.class});
}
@Before
public void setup() throws IOException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
test_vcf_output = File.createTempFile("test_vcf_output", "");
test_vcf_output.delete();
writable = new VariantContextWritable();
Configuration conf = new Configuration();
conf.set("hadoopbam.vcf.output-format", "VCF");
KeyIgnoringVCFOutputFormat<Long> outputFormat = new KeyIgnoringVCFOutputFormat<Long>(conf);
outputFormat.setHeader(readHeader());
taskAttemptContext = new TaskAttemptContextImpl(conf, mock(TaskAttemptID.class));
writer = outputFormat.getRecordWriter(taskAttemptContext, new Path("file://" + test_vcf_output));
}
/**
* Retrieve the diagnostic messages for a given task attempt.
*
* @param taskAttemptId Identifier of the task
* @return an array of diagnostic messages for the task attempt with the id provided.
* @throws java.io.IOException
*/
@Override public String[] getTaskDiagnostics( Object taskAttemptId ) throws IOException {
TaskAttemptID id = (TaskAttemptID) taskAttemptId;
try {
return delegateJob.getTaskDiagnostics( id );
} catch ( InterruptedException e ) {
throw new RuntimeException( e );
}
}
@Test
public void getTaskAttemptId() {
final TaskAttemptID id = new TaskAttemptID( new TaskID(), 0 );
org.apache.hadoop.mapred.TaskCompletionEvent delegate = new org.apache.hadoop.mapred.TaskCompletionEvent() {
public org.apache.hadoop.mapred.TaskAttemptID getTaskAttemptId() {
return id;
}
};
TaskCompletionEventProxy proxy = new TaskCompletionEventProxy( delegate );
assertEquals( id, proxy.getTaskAttemptId() );
}
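The proxy test above works with org.apache.hadoop.mapred.TaskAttemptID, while the newer API uses org.apache.hadoop.mapreduce.TaskAttemptID. Because the mapred class extends the mapreduce one, converting between them is an implicit upcast in one direction and the static downgrade helper in the other; the attempt string in this sketch is invented.
// Invented attempt id, used only to show the conversion between the two APIs.
org.apache.hadoop.mapreduce.TaskAttemptID newApiId =
    org.apache.hadoop.mapreduce.TaskAttemptID.forName("attempt_20240101000_0001_r_000002_1");
// Going to the old API uses the static downgrade(...) helper...
org.apache.hadoop.mapred.TaskAttemptID oldApiId =
    org.apache.hadoop.mapred.TaskAttemptID.downgrade(newApiId);
// ...while going back to the new-API type is an implicit upcast.
org.apache.hadoop.mapreduce.TaskAttemptID backAgain = oldApiId;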