类 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs 的源码实例 Demo

下面列出了 org.apache.hadoop.mapreduce.lib.output.MultipleOutputs 这个 API 类的使用实例代码及写法，也可以点击链接到 GitHub 查看完整源代码。

/**
 * Prepares this task: binds the job configuration, creates the
 * {@code MultipleOutputs} writer and caches the sharding flag and the base
 * cuboid id of the configured cube.
 *
 * @param context task context supplying the job configuration
 * @throws IOException declared by the signature; propagated from the calls below
 */
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    mos = new MultipleOutputs(context);

    // Look up the cube named in the job configuration.
    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
    String nameOfCube = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
    CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(nameOfCube);

    // The configured segment is mapped back to the original segment it optimizes.
    String idOfSegment = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
    CubeSegment originalSegment = cubeInstance
            .getOriginalSegmentToOptimize(cubeInstance.getSegmentById(idOfSegment));

    this.enableSharding = originalSegment.isEnableSharding();
    this.baseCuboid = cubeInstance.getCuboidScheduler().getBaseCuboidId();
}
 
/**
 * Initialises the task: creates the {@code MultipleOutputs} writer, opens the
 * file-system backed store, reads the query stored under
 * {@code pirMR.queryInputDir} and builds an empty response for it.
 *
 * @param ctx task context supplying the job configuration
 * @throws IOException          declared by the signature; propagated from the calls below
 * @throws InterruptedException declared by the signature; propagated from {@code super.setup}
 */
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
  super.setup(ctx);

  mos = new MultipleOutputs<>(ctx);
  outputFile = ctx.getConfiguration().get("pirMR.outputFile");

  // The file-system handle stays referenced through the storage field.
  storage = new HadoopFileSystemStore(FileSystem.newInstance(ctx.getConfiguration()));

  String queryLocation = ctx.getConfiguration().get("pirMR.queryInputDir");
  QueryInfo info = storage.recall(queryLocation, Query.class).getQueryInfo();
  response = new Response(info);
}
 
源代码3 项目: dkpro-c4corpus   文件: WARCWriterReducerClass.java
/**
 * Writes one WARC record to the multiple-outputs writer, using an output file
 * prefix built from the record's license, language, no-boilerplate and
 * minimal-HTML header fields.
 *
 * @param warcWritable    warc record to write
 * @param multipleOutputs writer the record is sent to
 * @throws IOException          exception
 * @throws InterruptedException exception
 */
// TODO move somewhere else?
public static void writeSingleWARCWritableToOutput(WARCWritable warcWritable,
        MultipleOutputs<NullWritable, WARCWritable> multipleOutputs)
        throws IOException, InterruptedException
{
    WARCRecord.Header recordHeader = warcWritable.getRecord().getHeader();

    // the four header fields below determine the output file name prefix
    String outputPrefix = createOutputFilePrefix(
            recordHeader.getField(WARCRecord.WARCRecordFieldConstants.LICENSE),
            recordHeader.getField(WARCRecord.WARCRecordFieldConstants.LANGUAGE),
            recordHeader.getField(WARCRecord.WARCRecordFieldConstants.NO_BOILERPLATE),
            recordHeader.getField(WARCRecord.WARCRecordFieldConstants.MINIMAL_HTML));

    // Known bottleneck: a single reducer receives all "Lic_none_Lang_en" pages
    // (the majority of the Web). A SimHash-based bin number for those records
    // was experimented with at this point and is currently disabled.

    multipleOutputs.write(NullWritable.get(), warcWritable, outputPrefix);
}
 
源代码4 项目: kylin   文件: HadoopMultipleOutputFormat.java
/**
 * Opens this output format for one task and creates the
 * {@code MultipleOutputs} writer on top of a freshly built reduce context.
 *
 * @param taskNumber forwarded unchanged to {@code super.open}
 * @param numTasks   forwarded unchanged to {@code super.open}
 * @throws IOException if {@code super.open} fails, or if building the context
 *                     was interrupted (the interruption is kept as the cause)
 */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    super.open(taskNumber, numTasks);

    // Context and writer creation is serialized on the shared mutex.
    synchronized (OPEN_MULTIPLE_MUTEX) {
        try {
            TaskInputOutputContext taskInputOutputContext = new ReduceContextImpl(configuration,
                    context.getTaskAttemptID(), new InputIterator(), new GenericCounter(), new GenericCounter(),
                    recordWriter, outputCommitter, new DummyReporter(), null,
                    BytesWritable.class, BytesWritable.class);
            this.writer = new MultipleOutputs(taskInputOutputContext);
        } catch (InterruptedException e) {
            // Converting to IOException would otherwise lose the interrupt:
            // restore the flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Could not create MultipleOutputs.", e);
        }
    }
}
 
源代码5 项目: kylin   文件: ConvergeCuboidDataReducer.java
/**
 * Prepares this task: binds the job configuration, creates the
 * {@code MultipleOutputs} writer and caches the sharding flag and the base
 * cuboid id of the configured cube.
 *
 * @param context task context supplying the job configuration
 * @throws IOException declared by the signature; propagated from the calls below
 */
@Override
protected void doSetup(Context context) throws IOException {
    super.bindCurrentConfiguration(context.getConfiguration());
    mos = new MultipleOutputs(context);

    // Look up the cube named in the job configuration.
    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
    String nameOfCube = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
    CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(nameOfCube);

    // The configured segment is mapped back to the original segment it optimizes.
    String idOfSegment = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
    CubeSegment originalSegment = cubeInstance
            .getOriginalSegmentToOptimize(cubeInstance.getSegmentById(idOfSegment));

    this.enableSharding = originalSegment.isEnableSharding();
    this.baseCuboid = cubeInstance.getCuboidScheduler().getBaseCuboidId();
}
 
源代码6 项目: rya   文件: AbstractReasoningTool.java
/**
 * Configures the job to write text output: sets the output path resolved from
 * {@code destination}, installs a lazy text output format and registers the
 * named outputs with counters enabled.
 *
 * @param destination name passed to {@code MRReasoningUtils.getOutputPath}
 */
protected void configureTextOutput(String destination) {
    Path outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
    TextOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

    // These named outputs all share the NullWritable key / Text value layout.
    String[] valueOnlyOutputs = {
        MRReasoningUtils.INTERMEDIATE_OUT,
        MRReasoningUtils.TERMINAL_OUT,
        MRReasoningUtils.SCHEMA_OUT,
        MRReasoningUtils.INCONSISTENT_OUT,
    };
    for (String outputName : valueOnlyOutputs) {
        MultipleOutputs.addNamedOutput(job, outputName,
            TextOutputFormat.class, NullWritable.class, Text.class);
    }

    // The debug output is the only one with a Text key.
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
        TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
 
/**
 * Configures the reduce side of the job: reducer and partitioner classes, the
 * number of reduce tasks, the named outputs and the output path.
 *
 * @param output  output directory of the job; it is passed to {@code deletePath} at the end
 * @param cubeSeg segment whose cube instance determines the reducer count
 * @throws IOException declared by the signature; propagated from the calls below
 * @throws IllegalArgumentException if more than 250 reducers would be needed
 */
private void setupReducer(Path output, CubeSegment cubeSeg)
        throws IOException {
    int reducerCount = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance())
            .getTotalReducerNum();
    logger.info("{} has reducers {}.", this.getClass().getName(), reducerCount);
    if (reducerCount > 250) {
        String message = "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
                + reducerCount
                + ", decrease 'kylin.engine.mr.uhc-reducer-count'";
        throw new IllegalArgumentException(message);
    }

    job.setReducerClass(FactDistinctColumnsReducer.class);
    job.setPartitionerClass(FactDistinctColumnPartitioner.class);
    job.setNumReduceTasks(reducerCount);

    // one named output per kind of result, so each lands in its own directory
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN,
            SequenceFileOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT,
            SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS,
            SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION,
            TextOutputFormat.class, NullWritable.class, LongWritable.class);

    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

    // lazy format: avoids creating a zero-sized default output file
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
 
源代码8 项目: kylin-on-parquet-v2   文件: UHCDictionaryJob.java
/**
 * Configures the reduce side of the job: reducer and partitioner classes, the
 * number of reduce tasks, the dictionary named output and the output path.
 *
 * @param output           output directory of the job; it is passed to {@code deletePath} at the end
 * @param numberOfReducers number of reduce tasks to run
 * @throws IOException declared by the signature; propagated from the calls below
 */
private void setupReducer(Path output, int numberOfReducers) throws IOException {
    job.setNumReduceTasks(numberOfReducers);
    job.setPartitionerClass(UHCDictionaryPartitioner.class);
    job.setReducerClass(UHCDictionaryReducer.class);

    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT,
            SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);

    String outputLocation = output.toString();
    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, outputLocation);

    // lazy format: avoids creating a zero-sized default output file
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
 
/**
 * Prepares this task: binds the job configuration, creates the
 * {@code MultipleOutputs} writer, selects the UHC column handled by this task
 * and initialises the matching dictionary builder.
 *
 * @param context task context supplying the configuration and the task id
 * @throws IOException declared by the signature; propagated from the calls below
 */
@Override
protected void doSetup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    super.bindCurrentConfiguration(conf);
    mos = new MultipleOutputs(context);

    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
    CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig)
            .getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
    CubeDesc desc = cubeInstance.getDescriptor();

    // The task id is used as the index into the list of UHC columns.
    List<TblColRef> uhcCols = desc.getAllUHCColumns();
    col = uhcCols.get(context.getTaskAttemptID().getTaskID().getId());
    logger.info("column name: " + col.getIdentity());

    boolean shardByColumn = cubeInstance.getDescriptor().getShardByColumns().contains(col);
    if (!shardByColumn) {
        // global dictionary column: the builder class comes from the cube descriptor
        String globalDictDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);
        DictionaryInfo info = new DictionaryInfo(col.getColumnDesc(), col.getDatatype());
        String builderClassName = desc.getDictionaryBuilderClass(col);
        builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClassName);
        builder.init(info, 0, globalDictDir);
        return;
    }

    // shard-by column: use the builder chosen by DictionaryGenerator
    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
    builder.init(null, 0, null);
}
 
/**
 * Initialises per-task state: the input split, the {@code MultipleOutputs}
 * writer, the record counter and the reusable output key.
 *
 * @param context task context supplying the input split
 * @throws IOException          declared by the signature; propagated from {@code super.setup}
 * @throws InterruptedException declared by the signature; propagated from {@code super.setup}
 */
@Override
protected void setup(Context context)
    throws IOException, InterruptedException {
  super.setup(context);

  // The split is expected to be a MainframeDatasetInputSplit; the cast fails otherwise.
  inputSplit = (MainframeDatasetInputSplit) context.getInputSplit();
  mos = new MultipleOutputs<Text, NullWritable>(context);

  outkey = new Text();
  numberOfRecords = 0;
}
 
源代码11 项目: WIFIProbe   文件: AnalysisReducer.java
/**
 * Creates the per-task helpers: the {@code MultipleOutputs} writer, the
 * reusable {@code Text} instance and the {@code Gson} instance.
 *
 * @param context task context the writer is bound to
 * @throws IOException          declared by the signature; propagated from {@code super.setup}
 * @throws InterruptedException declared by the signature; propagated from {@code super.setup}
 */
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);

    multipleOutputs = new MultipleOutputs<LongWritable, Text>(context);
    text = new Text();
    gson = new Gson();
}
 
源代码12 项目: WIFIProbe   文件: Task.java
/**
 * Configures and runs the "analyze" MapReduce job and waits for it to finish.
 *
 * @param inputFilePath  path added as the job's input
 * @param outputFilePath path set as the job's output directory
 * @param startTime      value stored in the configuration under {@code Holistic.START_TIME}
 * @return the value returned by {@code Job.waitForCompletion(true)}
 * @throws Exception if creating, configuring or running the job fails
 */
private boolean analyze(final String inputFilePath,
                           final String outputFilePath,
                           final Long startTime) throws Exception {
    // The timing settings must be in the configuration before the job is created from it.
    Configuration configuration = new Configuration();
    configuration.setLong(Holistic.START_TIME, startTime);
    configuration.setLong(Holistic.EXECUTE_TIME, executeHourTime);

    Job analysisJob = Job.getInstance(configuration, "analyze");
    analysisJob.setJarByClass(Holistic.class);

    // Every named output uses the same text format with KeyWrapper keys and Text values.
    String[] namedOutputs = {
            MapKeyConfig.NEW_OLD_CUSTOMER,
            MapKeyConfig.CUSTOMER_FLOW_KEY,
            MapKeyConfig.CYCLE,
            MapKeyConfig.IN_STORE_HOUR,
    };
    for (String namedOutput : namedOutputs) {
        MultipleOutputs.addNamedOutput(analysisJob, namedOutput,
                TextOutputFormat.class, KeyWrapper.class, Text.class);
    }

    analysisJob.setMapperClass(AnalysisMapper.class);
    analysisJob.setReducerClass(AnalysisReducer.class);
    analysisJob.setCombinerClass(AnalysisCombiner.class);

    analysisJob.setOutputKeyClass(LongWritable.class);
    analysisJob.setOutputValueClass(Text.class);

    analysisJob.setMapOutputKeyClass(KeyWrapper.class);
    analysisJob.setMapOutputValueClass(ValueWrapper.class);

    FileInputFormat.addInputPath(analysisJob, new Path(inputFilePath));
    FileOutputFormat.setOutputPath(analysisJob, new Path(outputFilePath));

    boolean succeeded = analysisJob.waitForCompletion(true);
    return succeeded;
}
 
源代码13 项目: hadoop   文件: MultipleOutputs.java
/**
 * Creates and initializes multiple outputs support.
 * <p>
 * It should be instantiated in the Mapper/Reducer setup method. The set of
 * named outputs and the counters-enabled flag are read from the given context
 * once, at construction time.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
    TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
  this.context = context;
  countersEnabled = getCountersEnabled(context);
  // record writers are added to this map later; it starts out empty
  recordWriters = new HashMap<String, RecordWriter<?, ?>>();
  // copy the configured names into a set and expose it read-only
  namedOutputs = Collections.unmodifiableSet(
    new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
}
 
源代码14 项目: incubator-retired-pirk   文件: ExpTableReducer.java
/**
 * Creates the {@code MultipleOutputs} writer and derives the reducer id, a
 * zero-padded five-digit rendering of the task id.
 *
 * @param ctx task context supplying the task attempt id
 * @throws IOException          declared by the signature; propagated from {@code super.setup}
 * @throws InterruptedException declared by the signature; propagated from {@code super.setup}
 */
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
  super.setup(ctx);
  mos = new MultipleOutputs<>(ctx);

  int taskIndex = ctx.getTaskAttemptID().getTaskID().getId();
  reducerID = String.format("%05d", taskIndex);
  logger.info("reducerID = " + reducerID);
}
 
源代码15 项目: incubator-retired-pirk   文件: ColumnMultReducer.java
/**
 * Initialises the task: creates the reusable output value and the
 * {@code MultipleOutputs} writer, then loads the query stored under the
 * directory configured as {@code pirMR.queryInputDir}.
 *
 * @param ctx task context supplying the job configuration
 * @throws IOException          if the file system cannot be opened or closed, or the query cannot be read
 * @throws InterruptedException declared by the signature; propagated from {@code super.setup}
 */
@Override
public void setup(Context ctx) throws IOException, InterruptedException
{
  super.setup(ctx);

  outputValue = new Text();
  mos = new MultipleOutputs<>(ctx);

  String queryDir = ctx.getConfiguration().get("pirMR.queryInputDir");
  // FileSystem.newInstance hands out a private, non-cached instance that is only
  // used for this one read, so close it here instead of leaking it.
  try (FileSystem fs = FileSystem.newInstance(ctx.getConfiguration()))
  {
    query = new HadoopFileSystemStore(fs).recall(queryDir, Query.class);
  }
}
 
源代码16 项目: big-c   文件: MultipleOutputs.java
/**
 * Creates and initializes multiple outputs support.
 * <p>
 * It should be instantiated in the Mapper/Reducer setup method. The set of
 * named outputs and the counters-enabled flag are read from the given context
 * once, at construction time.
 *
 * @param context the TaskInputOutputContext object
 */
public MultipleOutputs(
    TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
  this.context = context;
  countersEnabled = getCountersEnabled(context);
  // record writers are added to this map later; it starts out empty
  recordWriters = new HashMap<String, RecordWriter<?, ?>>();
  // copy the configured names into a set and expose it read-only
  namedOutputs = Collections.unmodifiableSet(
    new HashSet<String>(MultipleOutputs.getNamedOutputsList(context)));
}
 
源代码17 项目: data-polygamy   文件: AggregationReducer.java
/**
 * Builds the dataset-id to dataset-name lookup from the comma-separated
 * {@code dataset-id} and {@code dataset-name} settings and creates the
 * {@code MultipleOutputs} writer.
 * <p>
 * When neither setting is present the lookup is left untouched instead of
 * failing on an empty id.
 *
 * @param context task context supplying the job configuration
 * @throws IOException              declared by the signature
 * @throws InterruptedException     declared by the signature
 * @throws IllegalArgumentException if there are fewer ids than names, or an id is not an integer
 */
@Override
public void setup(Context context)
        throws IOException, InterruptedException {
    String namesSetting = context.getConfiguration().get("dataset-name","");
    String idsSetting = context.getConfiguration().get("dataset-id","");

    // "".split(",") yields one empty token, which Integer.parseInt rejects;
    // only parse when at least one of the two settings was actually provided.
    if (!namesSetting.isEmpty() || !idsSetting.isEmpty()) {
        String[] datasetNames = namesSetting.split(",");
        String[] datasetIds = idsSetting.split(",");
        if (datasetIds.length < datasetNames.length) {
            // fail with a readable message rather than an index-out-of-bounds error
            throw new IllegalArgumentException(
                    "dataset-id lists " + datasetIds.length
                            + " entries but dataset-name lists " + datasetNames.length);
        }
        for (int i = 0; i < datasetNames.length; i++) {
            idToDataset.put(Integer.parseInt(datasetIds[i]), datasetNames[i]);
        }
    }

    out = new MultipleOutputs<SpatioTemporalWritable,FloatArrayWritable>(context);
}
 
源代码18 项目: data-polygamy   文件: ScalarFunctionDataMapper.java
/**
 * Derives the dataset name from the input split and creates the
 * {@code MultipleOutputs} writer.
 * <p>
 * The dataset name is the last "/"-separated token of the string form of the
 * split file's parent path.
 *
 * @param context task context supplying the input split
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
@Override
public void setup(Context context)
        throws IOException, InterruptedException {

    // The split is expected to be a FileSplit; the cast fails otherwise.
    String parentDirectory = ((FileSplit) context.getInputSplit())
            .getPath().getParent().toString();
    String[] pathTokens = parentDirectory.split("/");
    int lastIndex = pathTokens.length - 1;
    dataset = pathTokens[lastIndex];

    out = new MultipleOutputs<Text,Text>(context);
}
 
源代码19 项目: data-polygamy   文件: FeatureDataMapper.java
/**
 * Derives the dataset name from the input split and creates the
 * {@code MultipleOutputs} writer.
 * <p>
 * The dataset name is the last "/"-separated token of the string form of the
 * split file's parent path.
 *
 * @param context task context supplying the input split
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 */
@Override
public void setup(Context context)
        throws IOException, InterruptedException {

    // The split is expected to be a FileSplit; the cast fails otherwise.
    String parentDirectory = ((FileSplit) context.getInputSplit())
            .getPath().getParent().toString();
    String[] pathTokens = parentDirectory.split("/");
    int lastIndex = pathTokens.length - 1;
    dataset = pathTokens[lastIndex];

    out = new MultipleOutputs<Text,Text>(context);
}
 
/**
 * Creates the {@code MultipleOutputs} writer and reads the Hive dictionary
 * columns from the loaded Kylin configuration.
 *
 * @param context task context the writer is bound to
 * @throws IOException          declared by the signature
 * @throws InterruptedException declared by the signature
 * @throws RuntimeException     wrapping the {@code IOException} raised while loading the Kylin configuration
 */
@Override
protected void doSetup(Context context) throws IOException, InterruptedException {
    mos = new MultipleOutputs(context);

    final KylinConfig kylinConfig;
    try {
        kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
    } catch (IOException loadFailure) {
        // a failed load is surfaced unchecked, with the original exception as its cause
        throw new RuntimeException(loadFailure);
    }

    dicCols = kylinConfig.getMrHiveDictColumnsExcludeRefColumns();
}
 
源代码21 项目: kylin   文件: BuildGlobalHiveDictTotalBuildJob.java
/**
 * Registers one text named output per dictionary column, named after the
 * column's index in {@code dicColsArr}, and sets the job's output path.
 *
 * @param job        job being configured
 * @param dicColsArr dictionary columns; only the array length is used here
 * @param outputBase output directory of the job
 */
private void setOutput(Job job, String[] dicColsArr, String outputBase) {
    // make each reducer output to respective dir, e.g.
    // /user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
    int columnCount = dicColsArr.length;
    for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
        String outputName = String.valueOf(columnIndex);
        MultipleOutputs.addNamedOutput(job, outputName, TextOutputFormat.class, Text.class, LongWritable.class);
    }
    FileOutputFormat.setOutputPath(job, new Path(outputBase));
}
 
源代码22 项目: kylin   文件: BuildGlobalHiveDictPartBuildJob.java
/**
 * Registers one text named output per dictionary column, named after the
 * column's index in {@code dicColsArr}, and sets the job's output path.
 *
 * @param job        job being configured
 * @param dicColsArr dictionary columns; only the array length is used here
 * @param outputBase output directory of the job
 */
private void setOutput(Job job, String[] dicColsArr, String outputBase) {
    // make each reducer output to respective dir
    // eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
    int columnCount = dicColsArr.length;
    for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
        String outputName = String.valueOf(columnIndex);
        MultipleOutputs.addNamedOutput(job, outputName, TextOutputFormat.class, LongWritable.class, Text.class);
    }
    FileOutputFormat.setOutputPath(job, new Path(outputBase));
}
 
源代码23 项目: kylin   文件: FactDistinctColumnsJob.java
/**
 * Configures the reduce side of the job: reducer and partitioner classes, the
 * number of reduce tasks, the named outputs and the output path.
 *
 * @param output  output directory of the job; it is passed to {@code deletePath} at the end
 * @param cubeSeg segment whose cube instance determines the reducer count
 * @throws IOException declared by the signature; propagated from the calls below
 * @throws IllegalArgumentException if more than 250 reducers would be needed
 */
private void setupReducer(Path output, CubeSegment cubeSeg)
        throws IOException {
    int reducerCount = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance())
            .getTotalReducerNum();
    logger.info("{} has reducers {}.", this.getClass().getName(), reducerCount);
    if (reducerCount > 250) {
        String message = "The max reducer number for FactDistinctColumnsJob is 250, but now it is "
                + reducerCount
                + ", decrease 'kylin.engine.mr.uhc-reducer-count'";
        throw new IllegalArgumentException(message);
    }

    job.setReducerClass(FactDistinctColumnsReducer.class);
    job.setPartitionerClass(FactDistinctColumnPartitioner.class);
    job.setNumReduceTasks(reducerCount);

    // one named output per kind of result, so each lands in its own directory
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN,
            SequenceFileOutputFormat.class, NullWritable.class, Text.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT,
            SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS,
            SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION,
            TextOutputFormat.class, NullWritable.class, LongWritable.class);

    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());

    // lazy format: avoids creating a zero-sized default output file
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
 
源代码24 项目: kylin   文件: UHCDictionaryJob.java
/**
 * Configures the reduce side of the job: reducer and partitioner classes, the
 * number of reduce tasks, the dictionary named output and the output path.
 *
 * @param output           output directory of the job; it is passed to {@code deletePath} at the end
 * @param numberOfReducers number of reduce tasks to run
 * @throws IOException declared by the signature; propagated from the calls below
 */
private void setupReducer(Path output, int numberOfReducers) throws IOException {
    job.setNumReduceTasks(numberOfReducers);
    job.setPartitionerClass(UHCDictionaryPartitioner.class);
    job.setReducerClass(UHCDictionaryReducer.class);

    MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT,
            SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);

    String outputLocation = output.toString();
    FileOutputFormat.setOutputPath(job, output);
    job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, outputLocation);

    // lazy format: avoids creating a zero-sized default output file
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    deletePath(job.getConfiguration(), output);
}
 
源代码25 项目: kylin   文件: UHCDictionaryReducer.java
/**
 * Prepares this task: binds the job configuration, creates the
 * {@code MultipleOutputs} writer, selects the UHC column handled by this task
 * and initialises the matching dictionary builder.
 *
 * @param context task context supplying the configuration and the task id
 * @throws IOException declared by the signature; propagated from the calls below
 */
@Override
protected void doSetup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    super.bindCurrentConfiguration(conf);
    mos = new MultipleOutputs(context);

    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
    CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig)
            .getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
    CubeDesc desc = cubeInstance.getDescriptor();

    // The task id is used as the index into the list of UHC columns.
    List<TblColRef> uhcCols = desc.getAllUHCColumns();
    col = uhcCols.get(context.getTaskAttemptID().getTaskID().getId());
    logger.info("column name: " + col.getIdentity());

    boolean shardByColumn = cubeInstance.getDescriptor().getShardByColumns().contains(col);
    if (!shardByColumn) {
        // global dictionary column: the builder class comes from the cube descriptor
        String globalDictDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);
        DictionaryInfo info = new DictionaryInfo(col.getColumnDesc(), col.getDatatype());
        String builderClassName = desc.getDictionaryBuilderClass(col);
        builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClassName);
        builder.init(info, 0, globalDictDir);
        return;
    }

    // shard-by column: use the builder chosen by DictionaryGenerator
    builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
    builder.init(null, 0, null);
}
 
源代码26 项目: rya   文件: AbstractReasoningTool.java
/**
 * Configures the job to write a schema (TBox): sequence-file output at the
 * schema path with {@code NullWritable} keys and {@code SchemaWritable}
 * values, plus the "schemaobj" and debug named outputs with counters enabled.
 */
protected void configureSchemaOutput() {
    SequenceFileOutputFormat.setOutputPath(job,
        MRReasoningUtils.getSchemaPath(job.getConfiguration()));

    // main output: key/value classes and format
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(SchemaWritable.class);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    // named outputs: the schema object itself and a text debug channel
    MultipleOutputs.addNamedOutput(job, "schemaobj", SequenceFileOutputFormat.class,
        NullWritable.class, SchemaWritable.class);
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT, TextOutputFormat.class,
        Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(job, true);
}
 
源代码27 项目: rya   文件: AbstractReasoningTool.java
/**
 * Configures the job to write newly derived triples as sequence files.
 * <p>
 * The output name is {@code OUTPUT_BASE} followed by the current iteration;
 * for intermediate data {@code TEMP_SUFFIX} is appended as well.
 *
 * @param   intermediate    True if this is intermediate data. Outputs
 *                          to [base]-[iteration]-[temp].
 */
protected void configureDerivationOutput(boolean intermediate) {
    Configuration conf = job.getConfiguration();
    int iteration = MRReasoningUtils.getCurrentIteration(conf);

    String outputName = MRReasoningUtils.OUTPUT_BASE + iteration;
    if (intermediate) {
        outputName += MRReasoningUtils.TEMP_SUFFIX;
    }
    Path outPath = MRReasoningUtils.getOutputPath(conf, outputName);

    SequenceFileOutputFormat.setOutputPath(job, outPath);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    // These three named outputs share the Fact key / NullWritable value layout.
    String[] factOutputs = {
        MRReasoningUtils.INTERMEDIATE_OUT,
        MRReasoningUtils.TERMINAL_OUT,
        MRReasoningUtils.SCHEMA_OUT,
    };
    for (String factOutput : factOutputs) {
        MultipleOutputs.addNamedOutput(job, factOutput,
            SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
    }
    // Inconsistencies are keyed by Derivation rather than Fact.
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
        SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
    MultipleOutputs.setCountersEnabled(job, true);

    // Text output for diagnostic info, if needed
    MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
        TextOutputFormat.class, Text.class, Text.class);
}
 
源代码28 项目: rya   文件: ForwardChain.java
/**
 * Creates the debug {@code MultipleOutputs} writer, loads the schema if it
 * has not been set yet, and reads the debug flag from the configuration.
 *
 * @param context task context supplying the job configuration
 */
@Override
protected void setup(Context context) {
    Configuration conf = context.getConfiguration();
    debugOut = new MultipleOutputs<>(context);

    // load the schema only when none is present yet
    if (schema == null) {
        schema = MRReasoningUtils.loadSchema(conf);
    }
    debug = MRReasoningUtils.debug(conf);
}
 
源代码29 项目: rya   文件: ForwardChain.java
/**
 * Creates the {@code MultipleOutputs} writer, loads the schema if it has not
 * been set yet, and reads the debug flag from the configuration.
 *
 * @param context task context supplying the job configuration
 */
@Override
public void setup(Context context) {
    Configuration conf = context.getConfiguration();
    mout = new MultipleOutputs<>(context);

    // load the schema only when none is present yet
    if (schema == null) {
        schema = MRReasoningUtils.loadSchema(conf);
    }
    debug = MRReasoningUtils.debug(conf);
}
 
源代码30 项目: rya   文件: DuplicateElimination.java
/**
 * Reads the debug flag from the configuration and, only when it is set,
 * creates the debug {@code MultipleOutputs} writer.
 *
 * @param context task context supplying the job configuration
 */
@Override
public void setup(Context context) {
    debug = MRReasoningUtils.debug(context.getConfiguration());
    if (!debug) {
        // no debug writer is created when debugging is off
        return;
    }
    debugOut = new MultipleOutputs<>(context);
}
 
 同包方法