org.apache.hadoop.mapreduce.Reducer#Context() Source Code Examples

The following examples show org.apache.hadoop.mapreduce.Reducer#Context() in use, collected from open-source projects; each entry names the project and file so the full source can be found on GitHub.
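Before the examples, a quick orientation: Reducer.Context is the handle a reducer receives at runtime; it exposes the job Configuration, counters, and the write() method for emitting output. The sketch below is a minimal illustration, not taken from any of the projects listed here; the key/value types and the property name are hypothetical.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Minimal sketch: sums the values for each key and writes the total.
// The key/value types and "example.property" are illustrative choices,
// not drawn from any of the projects listed below.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void setup(Context context) {
        // The context exposes the job configuration during setup.
        String prop = context.getConfiguration().get("example.property", "default");
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // The context is also how the reducer emits its output pairs.
        context.write(key, new IntWritable(sum));
    }
}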

Example 1 - Project: ES-Fastloader, File: NormalTransformer.java
public NormalTransformer(Reducer.Context context, IndexInfo indexInfo) throws Exception {
    Map<String, String> esTypeMap = indexInfo.getTypeMap();

    HCatSchema hCatSchema = HCatInputFormat.getTableSchema(context.getConfiguration());
    List<HCatFieldSchema> hCatFieldSchemas = hCatSchema.getFields();
    for(HCatFieldSchema hCatFieldSchema : hCatFieldSchemas) {
        String fieldName = hCatFieldSchema.getName();
        String hiveType = hCatFieldSchema.getTypeString();

        if(esTypeMap.containsKey(fieldName)) {
            String esType = esTypeMap.get(fieldName);
            typeList.add(Type.matchESType(fieldName, esType, indexInfo));
        } else {
            typeList.add(Type.matchHiveType(fieldName, hiveType, indexInfo));
        }
    }
}
 
Example 2 - Project: hadoop, File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
 
Example 3 - Project: big-c, File: Chain.java
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
    InterruptedException {

  Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS,
      Object.class);
  Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS,
      Object.class);
  RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass,
      outputQueue, rConf);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) inputContext, rConf);
  ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
  threads.add(runner);
}
 
Example 4 - Project: rya, File: CountPlan.java
@Override
public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
    long sum = 0;
    for(final LongWritable count : counts) {
        sum += count.get();
    }

    final String indexType = prospect.getTripleValueType().getIndexType();

    // not sure if this is the best idea..
    if ((sum >= 0) || indexType.equals(TripleValueType.PREDICATE.getIndexType())) {
        final Mutation m = new Mutation(indexType + DELIM + prospect.getData() + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp));

        final String dataType = prospect.getDataType();
        final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
        final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
        m.put(COUNT, dataType, visibility, timestamp.getTime(), sumValue);

        context.write(null, m);
    }
}
 
Example 5 - Project: jumbune, File: JsonDataValidationReducer.java
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
	super.setup(context);
	maxViolationsInReport = 1000;
	dirPath = context.getConfiguration().get(JsonDataVaildationConstants.SLAVE_DIR);
	ViolationPersistenceBean bean = new ViolationPersistenceBean();
	bean.setLineNum(Integer.MAX_VALUE);
	fileHandlerMap = new DVLRUCache(10);
	offsetLinesMap = new TreeMap<>();

	regexArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
	dataArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
	nullTypeArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
	missingArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
	schemaArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
}
 
Example 6 - Project: jumbune, File: DataValidationReducer.java
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
	super.setup(context);
	maxViolationsInReport = context.getConfiguration().getInt(DataValidationConstants.DV_NUM_REPORT_VIOLATION, 1000);
	String dir = context.getConfiguration().get(SLAVE_FILE_LOC);
	dirPath = JobUtil.getAndReplaceHolders(dir);
	fileHandlerMap = new DVLRUCache(DataValidationConstants.TEN);
	
	offsetLinesMap = new TreeMap<>();
	
	ViolationPersistenceBean bean = new ViolationPersistenceBean();
	bean.setLineNum(Integer.MAX_VALUE);
	
	nullMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
	dataTypeMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
	regexMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport);
	numFieldsMap = new MultiValueTreeMap<String,ViolationPersistenceBean>(maxViolationsInReport); 
	fileNames = new HashSet<String>();
}
 
Example 7 - Project: hadoop, File: Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
 
Example 8 - Project: big-c, File: Chain.java
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
    TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
    throws IOException, InterruptedException {
  RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(
      context);
  Reducer.Context reducerContext = createReduceContext(rw,
      (ReduceContext) context, rConf);
  reducer.run(reducerContext);
  rw.close(context);
}
 
Example 9
@Test
public void reduce(@Mocked final Reducer.Context defaultContext) throws IOException, InterruptedException {
    BitcoinTransactionReducer reducer = new BitcoinTransactionReducer();
    final Text defaultKey = new Text("Transaction Input Count:");
    final IntWritable oneInt = new IntWritable(1);
    final IntWritable twoInt = new IntWritable(2);
    final LongWritable resultLong = new LongWritable(3);
    final ArrayList<IntWritable> al = new ArrayList<IntWritable>();
    al.add(oneInt);
    al.add(twoInt);
    new Expectations() {{
        defaultContext.write(defaultKey, resultLong); times = 1;
    }};
    reducer.reduce(defaultKey, al, defaultContext);
}
 
Example 10
@Test
public void reduce(@Mocked final Reducer.Context defaultContext) throws IOException, InterruptedException {
    EthereumBlockReducer reducer = new EthereumBlockReducer();
    final Text defaultKey = new Text("Transaction Count:");
    final IntWritable oneInt = new IntWritable(1);
    final IntWritable twoInt = new IntWritable(2);
    final LongWritable resultLong = new LongWritable(3);
    final ArrayList<IntWritable> al = new ArrayList<IntWritable>();
    al.add(oneInt);
    al.add(twoInt);
    new Expectations() {{
        defaultContext.write(defaultKey, resultLong); times = 1;
    }};
    reducer.reduce(defaultKey, al, defaultContext);
}
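Both tests above follow the same JMockit pattern: @Mocked injects a mock Reducer.Context, and the Expectations block (with times = 1) asserts that the reducer writes the aggregated value for the key exactly once.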
 
Example 11 - Project: halvade, File: PreprocessingTools.java
public int streamElPrep(Reducer.Context context, String output, String rg, 
            int threads, SAMRecordIterator SAMit, 
            SAMFileHeader header, String dictFile, boolean updateRG, boolean keepDups, String RGID) throws InterruptedException, IOException, QualityException {
        long startTime = System.currentTimeMillis();
        String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "elprep", "");  
        String[] command = CommandGenerator.elPrep(bin, "/dev/stdin", output, threads, true, rg, null, !keepDups, customArgs);
//        runProcessAndWait(command);
        ProcessBuilderWrapper builder = new ProcessBuilderWrapper(command, null);
        builder.startProcess(true);        
        BufferedWriter localWriter = builder.getSTDINWriter();
        
        // write header
        final StringWriter headerTextBuffer = new StringWriter();
        new SAMTextHeaderCodec().encode(headerTextBuffer, header);
        final String headerText = headerTextBuffer.toString();
        localWriter.write(headerText, 0, headerText.length());
        
        
        SAMRecord sam;
        int reads = 0;
        while(SAMit.hasNext()) {
            sam = SAMit.next();
            if(updateRG)
                sam.setAttribute(SAMTag.RG.name(), RGID);
            String samString = sam.getSAMString();
            localWriter.write(samString, 0, samString.length());
            reads++;
        }
        localWriter.flush();
        localWriter.close();
                
        int error = builder.waitForCompletion();
        if(error != 0)
            throw new ProcessException("elPrep", error);
        long estimatedTime = System.currentTimeMillis() - startTime;
        Logger.DEBUG("estimated time: " + estimatedTime / 1000);
        if(context != null)
            context.getCounter(HalvadeCounters.TIME_ELPREP).increment(estimatedTime);
        return reads;
    }
 
Example 12 - Project: halvade, File: PreprocessingTools.java
public void setContext(Reducer.Context context) {
        this.context = context;
        mem = "-Xmx" + (int)(0.8*Integer.parseInt(context.getConfiguration().get("mapreduce.reduce.memory.mb"))) + "m";
//        mem = context.getConfiguration().get("mapreduce.reduce.java.opts");
        String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "java", "");  
        if(customArgs != null)
            java.add(customArgs);
    }
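A note on the -Xmx value computed above (the same pattern appears again in Example 16): it caps the child JVM heap at 80% of the reducer container's allocation (mapreduce.reduce.memory.mb), presumably to leave the remaining 20% as headroom for non-heap memory so the container is not killed for exceeding its limit.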
 
Example 13 - Project: jumbune, File: DataSourceCompReducer.java
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) {
	
	Configuration conf = context.getConfiguration();
	Type type = new TypeToken<Map<String, String>>() {
	}.getType();

	filesMap = gson.fromJson(conf.get("filesMap"), type);
	validationInfo = gson.fromJson(conf.get("validationInfoJson"), DataSourceCompValidationInfo.class);
	multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
 
Example 14 - Project: halvade, File: HalvadeReducer.java
protected String checkBinaries(Reducer.Context context) throws IOException {
    Logger.DEBUG("Checking for binaries...");
    String binDir = null;
    URI[] localPaths = context.getCacheArchives();
    for(int i = 0; i < localPaths.length; i++ ) {
        Path path = new Path(localPaths[i].getPath());
        if(path.getName().endsWith("bin.tar.gz")) {
            binDir = "./" + path.getName() + "/bin/";
        }
    }
    printDirectoryTree(new File(binDir), 0);
    return binDir;
}
 
Example 15 - Project: halvade, File: RebuildStarGenomeReducer.java
protected String checkBinaries(Reducer.Context context) throws IOException {
    Logger.DEBUG("Checking for binaries...");
    String binDir = null;
    URI[] localPaths = context.getCacheArchives();
    for(int i = 0; i < localPaths.length; i++ ) {
        Path path = new Path(localPaths[i].getPath());
        if(path.getName().endsWith("bin.tar.gz")) {
            binDir = "./" + path.getName() + "/bin/";
        }
    }
    printDirectoryTree(new File(binDir), 0);
    return binDir;
}
 
Example 16 - Project: halvade, File: GATKTools.java
public void setContext(Reducer.Context context) {
        this.context = context;
//        mem = context.getConfiguration().get("mapreduce.reduce.java.opts");
        mem = "-Xmx" + (int)(0.8*Integer.parseInt(context.getConfiguration().get("mapreduce.reduce.memory.mb"))) + "m";
        String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "java", ""); 
        if(customArgs != null)
            java.add(customArgs);
    }
 
Example 17 - Project: ES-Fastloader, File: TransformerFactory.java
public static Transformer getTransoformer(Reducer.Context context, TaskConfig taskConfig, IndexInfo indexInfo) throws Exception {
    // Use the normal transformer by default
    LogUtils.info("use transormer name:default");
    return new NormalTransformer(context, indexInfo);
}
 
Example 18 - Project: ES-Fastloader, File: ESNode.java
public void init(Reducer.Context context) throws Exception {
    // Clean up the ES working directory
    HdfsUtil.deleteDir(context.getConfiguration(), localWorkPath);
    // Start the ES process
    start();
}
 
Example 19 - Project: jumbune, File: XmlDataValidationReducer.java
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
	super.setup(context);
	String dir = context.getConfiguration().get(XmlDataValidationConstants.SLAVE_FILE_LOC);
	dirPath = JobUtil.getAndReplaceHolders(dir);
	fileHandlerMap = new DVLRUCache(XmlDataValidationConstants.TEN);
}
 
Example 20 - Project: rya, File: IndexWorkPlan.java
/**
 * This method is invoked by {@link ProspectorReducer}. It is used to reduce
 * the counts to their final states and write them to output via the
 * {@code context}.
 *
 * @param prospect - The intermediate prospect that is being reduced.
 * @param counts - The counts that need to be reduced.
 * @param timestamp - The timestamp that identifies this Prospector run.
 * @param context - The reducer context the reduced values will be written to.
 * @throws IOException A problem was encountered while writing to the context.
 * @throws InterruptedException Writes to the context were interrupted.
 */
public void reduce(IntermediateProspect prospect, Iterable<LongWritable> counts, Date timestamp, Reducer.Context context) throws IOException, InterruptedException;
 