The examples below show how to use the org.apache.hadoop.mapreduce.lib.output.MultipleOutputs API class and how it is typically written; you can also follow the GitHub links to view the full source code.
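All of the snippets below share the same lifecycle: the driver registers each named output with MultipleOutputs.addNamedOutput(), the map or reduce task creates a MultipleOutputs instance in setup(), writes records to it by name, and closes it in cleanup(). The following is a minimal sketch of that pattern; the DemoReducer class, the "skipped" output name, and the word-count logic are illustrative only and are not taken from any of the projects quoted below.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class DemoReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Driver side, next to the usual input/output/mapper configuration:
    //   MultipleOutputs.addNamedOutput(job, "skipped", TextOutputFormat.class,
    //           Text.class, NullWritable.class);
    //   MultipleOutputs.setCountersEnabled(job, true);

    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context); // one instance per task attempt
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        if (sum == 0) {
            // written to files named skipped-r-NNNNN in the job output directory
            mos.write("skipped", key, NullWritable.get());
        } else {
            context.write(key, new IntWritable(sum)); // default output, part-r-NNNNN
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // flushes and closes every named-output RecordWriter
    }
}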
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    mos = new MultipleOutputs(context);

    String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
    String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
    KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
    CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
    CubeSegment cubeSegment = cube.getSegmentById(segmentID);
    CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);

    this.enableSharding = oldSegment.isEnableSharding();
    this.baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
}
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
    super.setup(ctx);

    mos = new MultipleOutputs<>(ctx);

    FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
    storage = new HadoopFileSystemStore(fs);
    String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
    Query query = storage.recall(queryDir, Query.class);
    QueryInfo queryInfo = query.getQueryInfo();

    outputFile = ctx.getConfiguration().get("pirMR.outputFile");
    response = new Response(queryInfo);
}
/**
 * Writes a single WARCWritable to the output with a specific output file prefix
 *
 * @param warcWritable warc record
 * @param multipleOutputs output
 * @throws IOException exception
 * @throws InterruptedException exception
 */
// TODO move somewhere else?
public static void writeSingleWARCWritableToOutput(WARCWritable warcWritable,
        MultipleOutputs<NullWritable, WARCWritable> multipleOutputs)
    throws IOException, InterruptedException
{
    WARCRecord.Header header = warcWritable.getRecord().getHeader();
    String license = header.getField(WARCRecord.WARCRecordFieldConstants.LICENSE);
    String language = header.getField(WARCRecord.WARCRecordFieldConstants.LANGUAGE);
    String noBoilerplate = header
            .getField(WARCRecord.WARCRecordFieldConstants.NO_BOILERPLATE);
    String minimalHtml = header.getField(WARCRecord.WARCRecordFieldConstants.MINIMAL_HTML);

    // set the file name prefix
    String fileName = createOutputFilePrefix(license, language, noBoilerplate, minimalHtml);

    // bottleneck of single reducer for all "Lic_none_Lang_en" pages (majority of Web)
    // if ("en".equals(language) && LicenseDetector.NO_LICENCE.equals(license)) {
    //     long simHash = Long
    //             .valueOf(header.getField(WARCRecord.WARCRecordFieldConstants.SIMHASH));
    //     int binNumber = getBinNumberFromSimHash(simHash);
    //     fileName = createOutputFilePrefix(license, language, noBoilerplate);
    // }

    multipleOutputs.write(NullWritable.get(), warcWritable, fileName);
}
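For context, a reduce() method in the same job would simply hand every incoming record to the helper above. The skeleton below is an illustrative sketch rather than code from the project; it assumes a Text grouping key and a multipleOutputs field of type MultipleOutputs<NullWritable, WARCWritable> created in setup().

@Override
protected void reduce(Text key, Iterable<WARCWritable> values, Context context)
        throws IOException, InterruptedException {
    for (WARCWritable warcWritable : values) {
        // the helper derives the output file prefix from the record's WARC headers
        writeSingleWARCWritableToOutput(warcWritable, multipleOutputs);
    }
}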
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);

    synchronized (OPEN_MULTIPLE_MUTEX) {
        try {
            // Build a synthetic TaskInputOutputContext from the output format's state
            // so that MultipleOutputs can be instantiated outside of a real reduce task.
            TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
                    context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
                    recordWriter, outputCommitter, new DummyReporter(), null,
                    BytesWritable.class, BytesWritable.class);
            this.writer = new MultipleOutputs(taskInputOutputContext);
        } catch (InterruptedException e) {
            throw new IOException("Could not create MultipleOutputs.", e);
        }
    }
}
/**
 * Set up a MapReduce job to output human-readable text.
 */
protected void configureTextOutput(String destination) {
    Path outPath;
    outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
    TextOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
        TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
        TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
        TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
        TextOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
        TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
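Because every record of this job leaves through one of the named outputs, the method wraps the default output format in LazyOutputFormat, which only creates a part file once the first record is written and therefore avoids empty part-r-NNNNN files for the unused default output. On the task side, records would then be emitted with calls such as the following (a hedged sketch; the mos field and the fact variable are illustrative):

mos.write(MRReasoningUtils.TERMINAL_OUT, NullWritable.get(), new Text(fact.toString()));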
private void setupReducer(Path output, CubeSegment cubeSeg)
        throws IOException {
    FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance());
    int numberOfReducers = reducerMapping.getTotalReducerNum();
    logger.info("{} has reducers {}.", this.getClass().getName(), numberOfReducers);
    if (numberOfReducers > 250) {
        throw new IllegalArgumentException(
                "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
                        + numberOfReducers
                        + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
    }

    job.setReducerClass(FactDistinctColumnsReducer.class);
    job.setPartitionerClass(FactDistinctColumnPartitioner.class);
    job.setNumReduceTasks(numberOfReducers);

    // make each reducer output to its respective dir
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);

    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

    // prevent the creation of a zero-sized default output
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
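The "respective dir" comment refers to the reducer side, which is not shown here: records are written with the four-argument write() overload, and because its baseOutputPath argument may contain '/', each named output can be placed in its own subdirectory of the job output path. A hedged sketch, with an illustrative column name and value:

// sketch: lands under <job-output>/<columnName>/part-r-NNNNN
mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(),
        new ArrayPrimitiveWritable(dictBytes), columnName + "/part");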
private void setupReducer(Path output, int numberOfReducers) throws IOException {
    job.setReducerClass(UHCDictionaryReducer.class);
    job.setPartitionerClass(UHCDictionaryPartitioner.class);
    job.setNumReduceTasks(numberOfReducers);

    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

    // prevent the creation of a zero-sized default output
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    Configuration conf = context.getConfiguration();
    mos = new MultipleOutputs(context);

    KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
    String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
    CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
    CubeDesc cubeDesc = cube.getDescriptor();
    List<TblColRef> uhcColumns = cubeDesc.getAllUHCColumns();

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    col = uhcColumns.get(taskId);
    logger.info("column name: " + col.getIdentity());

    if (cube.getDescriptor().getShardByColumns().contains(col)) {
        // for ShardByColumns
        builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
        builder.init(null, 0, null);
    } else {
        // for GlobalDictionaryColumns
        String hdfsDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);
        DictionaryInfo dictionaryInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype());
        String builderClass = cubeDesc.getDictionaryBuilderClass(col);
        builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass);
        builder.init(dictionaryInfo, 0, hdfsDir);
    }
}
@Override
protected void setup(Context context)
        throws IOException, InterruptedException {
    super.setup(context);
    inputSplit = (MainframeDatasetInputSplit) context.getInputSplit();
    mos = new MultipleOutputs<Text, NullWritable>(context);
    numberOfRecords = 0;
    outkey = new Text();
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    gson = new Gson();
    text = new Text();
    multipleOutputs = new MultipleOutputs<LongWritable, Text>(context);
}
private boolean analyze(final String inputFilePath,
        final String outputFilePath,
        final Long startTime) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong(Holistic.START_TIME, startTime);
    conf.setLong(Holistic.EXECUTE_TIME, executeHourTime);

    Job jobAnalyze = Job.getInstance(conf, "analyze");
    jobAnalyze.setJarByClass(Holistic.class);

    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.NEW_OLD_CUSTOMER,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CUSTOMER_FLOW_KEY,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.CYCLE,
            TextOutputFormat.class, KeyWrapper.class, Text.class);
    MultipleOutputs.addNamedOutput(jobAnalyze, MapKeyConfig.IN_STORE_HOUR,
            TextOutputFormat.class, KeyWrapper.class, Text.class);

    jobAnalyze.setMapperClass(AnalysisMapper.class);
    jobAnalyze.setReducerClass(AnalysisReducer.class);
    jobAnalyze.setCombinerClass(AnalysisCombiner.class);

    jobAnalyze.setOutputKeyClass(LongWritable.class);
    jobAnalyze.setOutputValueClass(Text.class);
    jobAnalyze.setMapOutputKeyClass(KeyWrapper.class);
    jobAnalyze.setMapOutputValueClass(ValueWrapper.class);

    FileInputFormat.addInputPath(jobAnalyze, new Path(inputFilePath));
    FileOutputFormat.setOutputPath(jobAnalyze, new Path(outputFilePath));

    return jobAnalyze.waitForCompletion(true);
}
/**
 * Creates and initializes multiple outputs support; it should be
 * instantiated in the Mapper/Reducer setup method.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
        TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
    this.context = context;
    namedOutputs = Collections.unmodifiableSet(
            new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
    countersEnabled = getCountersEnabled(context);
}
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
    super.setup(ctx);

    mos = new MultipleOutputs<>(ctx);

    reducerID = String.format("%05d", ctx.getTaskAttemptID().getTaskID().getId());
    logger.info("reducerID = " + reducerID);
}
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
    super.setup(ctx);

    outputValue = new Text();
    mos = new MultipleOutputs<>(ctx);

    FileSystem fs = FileSystem.newInstance(ctx.getConfiguration());
    String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
    query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
}
@Override
public void setup(Context context)
        throws IOException, InterruptedException {
    String[] datasetNames = context.getConfiguration().get("dataset-name", "").split(",");
    String[] datasetIds = context.getConfiguration().get("dataset-id", "").split(",");
    for (int i = 0; i < datasetNames.length; i++)
        idToDataset.put(Integer.parseInt(datasetIds[i]), datasetNames[i]);
    out = new MultipleOutputs<SpatioTemporalWritable, FloatArrayWritable>(context);
    //out = new MultipleOutputs<Text,Text>(context);
}
@Override
public void setup(Context context)
        throws IOException, InterruptedException {
    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    String[] fileSplitTokens = fileSplit.getPath().getParent().toString().split("/");
    dataset = fileSplitTokens[fileSplitTokens.length - 1];
    out = new MultipleOutputs<Text, Text>(context);
}
@Override
protected void doSetup(Context context) throws IOException, InterruptedException {
    mos = new MultipleOutputs(context);
    KylinConfig config;
    try {
        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    dicCols = config.getMrHiveDictColumnsExcludeRefColumns();
}
private void setOutput(Job job, String[] dicColsArr, String outputBase) {
    // make each reducer output to its respective dir, e.g.
    // /user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
    for (int i = 0; i < dicColsArr.length; i++) {
        MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, Text.class, LongWritable.class);
    }
    Path outputPath = new Path(outputBase);
    FileOutputFormat.setOutputPath(job, outputPath);
}
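Since the named outputs registered here are simply the column indexes ("0", "1", ...), the matching reducer presumably picks its output by the index of the dictionary column it is handling and writes under the dict_column=... directory mentioned in the comment above. A hypothetical sketch; colIndex, outputKey, outputValue and colName are assumptions, not code from the project:

// hypothetical: route this task's records to the named output for its column index
mos.write(String.valueOf(colIndex), outputKey, outputValue,
        "dict_column=" + colName + "/part_sort");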
private void setOutput(Job job, String[] dicColsArr, String outputBase) {
    // make each reducer output to its respective dir,
    // e.g. /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
    for (int i = 0; i < dicColsArr.length; i++) {
        MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, LongWritable.class, Text.class);
    }
    Path outputPath = new Path(outputBase);
    FileOutputFormat.setOutputPath(job, outputPath);
}
/**
 * Set up the MapReduce job to output a schema (TBox).
 */
protected void configureSchemaOutput() {
    Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
    SequenceFileOutputFormat.setOutputPath(job, outPath);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(SchemaWritable.class);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "schemaobj",
        SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
        TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
/**
 * Set up a MapReduce job to output newly derived triples.
 * @param intermediate True if this is intermediate data. Outputs
 *        to [base]-[iteration]-[temp].
 */
protected void configureDerivationOutput(boolean intermediate) {
    Path outPath;
    Configuration conf = job.getConfiguration();
    int iteration = MRReasoningUtils.getCurrentIteration(conf);
    if (intermediate) {
        outPath = MRReasoningUtils.getOutputPath(conf,
            MRReasoningUtils.OUTPUT_BASE + iteration
            + MRReasoningUtils.TEMP_SUFFIX);
    }
    else {
        outPath = MRReasoningUtils.getOutputPath(conf,
            MRReasoningUtils.OUTPUT_BASE + iteration);
    }
    SequenceFileOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
        SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
        SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
        SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
        SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
    MultipleOutputs.setCountersEnabled(job, true);
    // Set up an output for diagnostic info, if needed
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
        TextOutputFormat.class, Text.class, Text.class);
}
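A reducer in such a job would then route each derived fact to one of the named outputs registered above, optionally mirroring it to the debug output. The routing below is an illustrative sketch, not code from the project; fact, newlyDerived and iteration are assumptions:

String dest = newlyDerived ? MRReasoningUtils.INTERMEDIATE_OUT : MRReasoningUtils.TERMINAL_OUT;
mout.write(dest, fact, NullWritable.get());
if (debug) {
    mout.write(MRReasoningUtils.DEBUG_OUT, new Text(fact.toString()),
            new Text("derived in iteration " + iteration));
}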
@Override
protected void setup(Context context) {
    debugOut = new MultipleOutputs<>(context);
    Configuration conf = context.getConfiguration();
    if (schema == null) {
        schema = MRReasoningUtils.loadSchema(context.getConfiguration());
    }
    debug = MRReasoningUtils.debug(conf);
}
@Override
public void setup(Context context) {
    mout = new MultipleOutputs<>(context);
    Configuration conf = context.getConfiguration();
    if (schema == null) {
        schema = MRReasoningUtils.loadSchema(conf);
    }
    debug = MRReasoningUtils.debug(conf);
}
@Override
public void setup(Context context) {
    Configuration conf = context.getConfiguration();
    debug = MRReasoningUtils.debug(conf);
    if (debug) {
        debugOut = new MultipleOutputs<>(context);
    }
}