The following lists example code showing how to use the org.apache.hadoop.mapred.FileOutputCommitter API class, or follow the links to view the source code on GitHub.
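Most of these examples revolve around the on-disk layout the committer maintains under a job's output directory: a _temporary staging directory for in-flight task attempts and a _SUCCESS marker written when the job commits. As a minimal, illustrative sketch (the helper below is not from any of the projects quoted here; it only uses the class's public constants), the same layout can be inspected and cleaned by hand:

// Illustrative helper, not from the quoted projects: uses the two
// directory-layout constants the examples below rely on.
public static boolean inspectAndClean(Configuration conf, Path outputPath)
    throws IOException {
  FileSystem fs = outputPath.getFileSystem(conf);

  // FileOutputCommitter.TEMP_DIR_NAME == "_temporary": per-attempt staging data
  Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
  if (fs.exists(tmpDir)) {
    fs.delete(tmpDir, true);   // drop leftovers from failed or killed attempts
  }

  // FileOutputCommitter.SUCCEEDED_FILE_NAME == "_SUCCESS": written on job commit
  return fs.exists(new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME));
}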
/**
 * Clean up after a successful HIVE import.
 *
 * @param outputPath path to the output directory
 * @throws IOException
 */
private void cleanUp(Path outputPath) throws IOException {
  FileSystem fs = FileSystem.get(configuration);

  // HIVE does not always remove the input directory after the LOAD DATA
  // statement (which is our export directory). We remove the export
  // directory when it is empty, so that users can periodically repopulate
  // the HIVE table (for example with --hive-overwrite).
  try {
    if (outputPath != null && fs.exists(outputPath)) {
      FileStatus[] statuses = fs.listStatus(outputPath);
      if (statuses.length == 0) {
        LOG.info("Export directory is empty, removing it.");
        fs.delete(outputPath, true);
      } else if (statuses.length == 1
          && statuses[0].getPath().getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
        LOG.info("Export directory contains only the _SUCCESS file, removing the directory.");
        fs.delete(outputPath, true);
      } else {
        LOG.info("Export directory is not empty, keeping it.");
      }
    }
  } catch (IOException e) {
    LOG.error("Issue with cleaning (safe to ignore)", e);
  }
}
public void localizeConfiguration(JobConf jobConf)
    throws IOException, InterruptedException {
  // Publish the task identity into the JobConf.
  jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
  jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
  jobConf.setInt(JobContext.TASK_PARTITION,
      taskAttemptId.getTaskID().getId());
  jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
  jobConf.setBoolean(MRJobConfig.TASK_ISMAP, isMap);

  // Point the work output path at the committer's per-task-attempt directory
  // so side-effect files land under the committer's temporary directory.
  Path outputPath = FileOutputFormat.getOutputPath(jobConf);
  if (outputPath != null) {
    if ((committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(jobConf,
          ((FileOutputCommitter) committer).getTaskAttemptPath(taskAttemptContext));
    } else {
      FileOutputFormat.setWorkOutputPath(jobConf, outputPath);
    }
  }
}
public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    Path p = new Path(outputPath,
        (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
         "_" + taskAttemptID.toString()));
    try {
      FileSystem fs = p.getFileSystem(conf);
      return p.makeQualified(fs);
    } catch (IOException ie) {
      LOG.warn(StringUtils.stringifyException(ie));
      return p;
    }
  }
  return null;
}
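As a worked example of the path this builds: with an output directory of /user/joe/out and an illustrative attempt id attempt_200707121733_0001_m_000000_0, the returned path is /user/joe/out/_temporary/_attempt_200707121733_0001_m_000000_0, since FileOutputCommitter.TEMP_DIR_NAME is "_temporary" and the attempt directory name is the attempt id prefixed with an underscore.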
public void initCommitter(JobConf job, boolean useNewApi)
    throws IOException, InterruptedException {
  if (useNewApi) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    this.committer = newOutputFormat.getOutputCommitter(
        newApiTaskAttemptContext);
  } else {
    this.committer = job.getOutputCommitter();
  }

  Path outputPath = FileOutputFormat.getOutputPath(job);
  if (outputPath != null) {
    if ((this.committer instanceof FileOutputCommitter)) {
      FileOutputFormat.setWorkOutputPath(job,
          ((FileOutputCommitter) this.committer).getTaskAttemptPath(
              oldApiTaskAttemptContext));
    } else {
      FileOutputFormat.setWorkOutputPath(job, outputPath);
    }
  }

  if (useNewApi) {
    this.committer.setupTask(newApiTaskAttemptContext);
  } else {
    this.committer.setupTask(oldApiTaskAttemptContext);
  }
}
public void setupJob(JobConf conf) throws IOException {
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
    FileSystem fileSys = tmpDir.getFileSystem(conf);
    if (!fileSys.mkdirs(tmpDir)) {
      LOG.error("Mkdirs failed to create " + tmpDir.toString());
    }
  }
}
public void cleanupJob(JobConf conf) throws IOException {
  // do the cleanup of the temporary directory
  Path outputPath = FileOutputFormat.getOutputPath(conf);
  if (outputPath != null) {
    Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
    FileSystem fileSys = tmpDir.getFileSystem(conf);
    if (fileSys.exists(tmpDir)) {
      fileSys.delete(tmpDir, true);
    }
  } else {
    LOG.warn("Output path is null in cleanup");
  }
}
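The setupJob/cleanupJob pair above manually creates and removes the _temporary staging directory that FileOutputCommitter works under. For context, here is a minimal sketch (the method name, paths, and task-attempt id are illustrative, not from the quoted sources) of driving the full commit lifecycle by hand with the new org.apache.hadoop.mapreduce API, which the mapred FileOutputCommitter delegates to in Hadoop 2.x; it produces and then cleans up the same layout:

// Illustrative sketch of the committer lifecycle using the new API.
public void runCommitterLifecycle() throws IOException {
  Configuration conf = new Configuration();
  Path out = new Path("/tmp/committer-demo");   // illustrative output directory

  org.apache.hadoop.mapreduce.TaskAttemptID attempt =
      new org.apache.hadoop.mapreduce.TaskAttemptID("demo", 1,
          org.apache.hadoop.mapreduce.TaskType.MAP, 0, 0);
  org.apache.hadoop.mapreduce.TaskAttemptContext taskCtx =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(conf, attempt);
  org.apache.hadoop.mapreduce.JobContext jobCtx =
      new org.apache.hadoop.mapreduce.task.JobContextImpl(conf, attempt.getJobID());

  org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter committer =
      new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(out, taskCtx);

  committer.setupJob(jobCtx);        // creates <out>/_temporary
  committer.setupTask(taskCtx);
  // ... the task would write its files under committer.getWorkPath() ...
  if (committer.needsTaskCommit(taskCtx)) {
    committer.commitTask(taskCtx);   // promotes the attempt's files
  }
  committer.commitJob(jobCtx);       // writes <out>/_SUCCESS and deletes _temporary
}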
@Override
public int run(String[] args) throws Exception {
  Options options = new Options();
  options.addOption(OPTION_INPUT_PATH);
  options.addOption(OPTION_HTABLE_NAME);
  options.addOption(OPTION_CUBE_NAME);
  parseOptions(options, args);

  String tableName = getOptionValue(OPTION_HTABLE_NAME);
  // e.g
  // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
  // end with "/"
  String input = getOptionValue(OPTION_INPUT_PATH);

  Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
  FsShell shell = new FsShell(conf);

  int exitCode = -1;
  int retryCount = 10;
  while (exitCode != 0 && retryCount >= 1) {
    exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
    retryCount--;
    Thread.sleep(5000);
  }

  if (exitCode != 0) {
    logger.error("Failed to change the file permissions: " + input);
    throw new IOException("Failed to change the file permissions: " + input);
  }

  String[] newArgs = new String[2];
  newArgs[0] = input;
  newArgs[1] = tableName;

  int count = 0;
  Path inputPath = new Path(input);
  FileSystem fs = HadoopUtil.getFileSystem(inputPath);
  FileStatus[] fileStatuses = fs.listStatus(inputPath);
  for (FileStatus fileStatus : fileStatuses) {
    if (fileStatus.isDirectory()) {
      Path path = fileStatus.getPath();
      if (path.getName().equals(FileOutputCommitter.TEMP_DIR_NAME)) {
        logger.info("Delete temporary path: " + path);
        fs.delete(path, true);
      } else {
        count++;
      }
    }
  }

  int ret = 0;
  if (count > 0) {
    logger.debug("Start to run LoadIncrementalHFiles");
    ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
    logger.debug("End to run LoadIncrementalHFiles");
    return ret;
  } else {
    logger.debug("Nothing to load, cube is empty");
    return ret;
  }
}
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.OutputCommitter
    getOutputCommitter(OutputCommitterContext context) {
  org.apache.hadoop.mapreduce.OutputCommitter committer = null;
  newApiCommitter = false;
  if (jobConf.getBoolean("mapred.reducer.new-api", false)
      || jobConf.getBoolean("mapred.mapper.new-api", false)) {
    newApiCommitter = true;
    LOG.info("Using mapred newApiCommitter.");
  }

  LOG.info("OutputCommitter set in config for outputName="
      + context.getOutputName()
      + ", vertexName=" + context.getVertexName()
      + ", outputCommitterClass="
      + jobConf.get("mapred.output.committer.class"));

  if (newApiCommitter) {
    TaskAttemptID taskAttemptID = new TaskAttemptID(
        Long.toString(context.getApplicationId().getClusterTimestamp()),
        context.getApplicationId().getId(),
        ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
            TaskType.MAP : TaskType.REDUCE)),
        0, context.getDAGAttemptNumber());
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
        taskAttemptID);
    try {
      OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
          .getOutputFormatClass(), jobConf);
      committer = outputFormat.getOutputCommitter(taskContext);
    } catch (Exception e) {
      throw new TezUncheckedException(e);
    }
  } else {
    committer = ReflectionUtils.newInstance(jobConf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
  }

  LOG.info("OutputCommitter for outputName="
      + context.getOutputName()
      + ", vertexName=" + context.getVertexName()
      + ", outputCommitterClass="
      + committer.getClass().getName());
  return committer;
}
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.OutputCommitter
    getOutputCommitter(OutputCommitterContext context) {
  org.apache.hadoop.mapreduce.OutputCommitter committer = null;
  newApiCommitter = false;
  if (jobConf.getBoolean("mapred.reducer.new-api", false)
      || jobConf.getBoolean("mapred.mapper.new-api", false)) {
    newApiCommitter = true;
  }
  LOG.info("Committer for " + getContext().getVertexName() + ":" + getContext().getOutputName() +
      " using " + (newApiCommitter ? "new" : "old") + " mapred API");

  if (newApiCommitter) {
    TaskAttemptID taskAttemptID = new TaskAttemptID(
        Long.toString(context.getApplicationId().getClusterTimestamp()),
        context.getApplicationId().getId(),
        ((jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false) ?
            TaskType.MAP : TaskType.REDUCE)),
        0, context.getDAGAttemptNumber());
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(jobConf,
        taskAttemptID);
    try {
      OutputFormat outputFormat = ReflectionUtils.newInstance(taskContext
          .getOutputFormatClass(), jobConf);
      committer = outputFormat.getOutputCommitter(taskContext);
    } catch (Exception e) {
      throw new TezUncheckedException(e);
    }
  } else {
    committer = ReflectionUtils.newInstance(jobConf.getClass(
        "mapred.output.committer.class", FileOutputCommitter.class,
        org.apache.hadoop.mapred.OutputCommitter.class), jobConf);
  }

  LOG.info("OutputCommitter for outputName="
      + context.getOutputName()
      + ", vertexName=" + context.getVertexName()
      + ", outputCommitterClass="
      + committer.getClass().getName());
  return committer;
}
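The old-API branch above instantiates whatever class is configured under "mapred.output.committer.class", defaulting to FileOutputCommitter. A minimal sketch of how a job would select a custom committer for that path (the MyCommitter class, method name, and output path below are hypothetical, not from the quoted sources):

// Hypothetical committer that only customizes the stock behaviour.
public static class MyCommitter extends FileOutputCommitter {
  // override commitTask()/commitJob() here as needed
}

public JobConf configureCommitter() {
  JobConf job = new JobConf();
  FileOutputFormat.setOutputPath(job, new Path("/tmp/committer-demo-out"));  // illustrative

  // Stored under "mapred.output.committer.class"; without this call,
  // job.getOutputCommitter() returns the default FileOutputCommitter.
  job.setOutputCommitter(MyCommitter.class);

  OutputCommitter committer = job.getOutputCommitter();
  assert committer instanceof MyCommitter;
  return job;
}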