Listed below are example usages of org.apache.hadoop.mapreduce.Reducer#Context, drawn from several open-source projects; you can follow the links to view the full source on GitHub.
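For orientation before the project-specific examples, here is a minimal, self-contained sketch of the typical pattern: the framework hands a Context into setup() and reduce(), and the reducer reads configuration through it and writes output through it. The WordCountReducer class and the "wordcount.min.count" key are illustrative names, not from any of the projects below.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative reducer: Context supplies the job Configuration and the output sink.
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int minCount;

    @Override
    protected void setup(Context context) {
        // Read a job-level setting from the Context (key name is hypothetical).
        minCount = context.getConfiguration().getInt("wordcount.min.count", 1);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        if (sum >= minCount) {
            // Emit the aggregated pair through the Context.
            context.write(key, new IntWritable(sum));
        }
    }
}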
public NormalTransformer(Reducer.Context context, IndexInfo indexInfo) throws Exception {
    Map<String, String> esTypeMap = indexInfo.getTypeMap();
    HCatSchema hCatSchema = HCatInputFormat.getTableSchema(context.getConfiguration());
    List<HCatFieldSchema> hCatFieldSchemas = hCatSchema.getFields();
    for (HCatFieldSchema hCatFieldSchema : hCatFieldSchemas) {
        String fieldName = hCatFieldSchema.getName();
        String hiveType = hCatFieldSchema.getTypeString();
        if (esTypeMap.containsKey(fieldName)) {
            String esType = esTypeMap.get(fieldName);
            typeList.add(Type.matchESType(fieldName, esType, indexInfo));
        } else {
            typeList.add(Type.matchHiveType(fieldName, hiveType, indexInfo));
        }
    }
}
/**
 * Add reducer that reads from context and writes to a queue
 */
@SuppressWarnings("unchecked")
void addReducer(TaskInputOutputContext inputContext,
        ChainBlockingQueue<KeyValuePair<?, ?>> outputQueue) throws IOException,
        InterruptedException {
    Class<?> keyOutClass = rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, Object.class);
    Class<?> valueOutClass = rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, Object.class);
    RecordWriter rw = new ChainRecordWriter(keyOutClass, valueOutClass, outputQueue, rConf);
    Reducer.Context reducerContext = createReduceContext(rw, (ReduceContext) inputContext, rConf);
    ReduceRunner runner = new ReduceRunner(reducerContext, reducer, rw);
    threads.add(runner);
}
@Override
public void reduce(final IntermediateProspect prospect, final Iterable<LongWritable> counts, final Date timestamp, final Reducer.Context context) throws IOException, InterruptedException {
    long sum = 0;
    for (final LongWritable count : counts) {
        sum += count.get();
    }
    final String indexType = prospect.getTripleValueType().getIndexType();
    // not sure if this is the best idea..
    if ((sum >= 0) || indexType.equals(TripleValueType.PREDICATE.getIndexType())) {
        final Mutation m = new Mutation(indexType + DELIM + prospect.getData() + DELIM + ProspectorUtils.getReverseIndexDateTime(timestamp));
        final String dataType = prospect.getDataType();
        final ColumnVisibility visibility = new ColumnVisibility(prospect.getVisibility());
        final Value sumValue = new Value(("" + sum).getBytes(StandardCharsets.UTF_8));
        m.put(COUNT, dataType, visibility, timestamp.getTime(), sumValue);
        context.write(null, m);
    }
}
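The null key in context.write(null, m) is worth noting: with Accumulo's AccumuloOutputFormat, the output key names the destination table, and a null key falls back to the configured default table. As a hedged sketch against the Accumulo 1.x mapreduce API (the user, password, instance, and table names here are placeholders, not taken from the project above), the driver for such a job might be configured along these lines:

import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.hadoop.mapreduce.Job;

Job job = Job.getInstance(conf, "prospector");
job.setOutputFormatClass(AccumuloOutputFormat.class);
// Placeholder credentials and instance settings.
AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken("secret"));
AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault()
        .withInstance("instance").withZkHosts("zk1:2181"));
// Mutations written with a null key go to this table.
AccumuloOutputFormat.setDefaultTableName(job, "prospects");
AccumuloOutputFormat.setCreateTables(job, true);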
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
    super.setup(context);
    maxViolationsInReport = 1000;
    dirPath = context.getConfiguration().get(JsonDataVaildationConstants.SLAVE_DIR);
    ViolationPersistenceBean bean = new ViolationPersistenceBean();
    bean.setLineNum(Integer.MAX_VALUE);
    fileHandlerMap = new DVLRUCache(10);
    offsetLinesMap = new TreeMap<>();
    regexArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
    dataArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
    nullTypeArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
    missingArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
    schemaArray = new ArrayList<ViolationPersistenceBean>(maxViolationsInReport);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException, InterruptedException {
    super.setup(context);
    maxViolationsInReport = context.getConfiguration().getInt(DataValidationConstants.DV_NUM_REPORT_VIOLATION, 1000);
    String dir = context.getConfiguration().get(SLAVE_FILE_LOC);
    dirPath = JobUtil.getAndReplaceHolders(dir);
    fileHandlerMap = new DVLRUCache(DataValidationConstants.TEN);
    offsetLinesMap = new TreeMap<>();
    ViolationPersistenceBean bean = new ViolationPersistenceBean();
    bean.setLineNum(Integer.MAX_VALUE);
    nullMap = new MultiValueTreeMap<String, ViolationPersistenceBean>(maxViolationsInReport);
    dataTypeMap = new MultiValueTreeMap<String, ViolationPersistenceBean>(maxViolationsInReport);
    regexMap = new MultiValueTreeMap<String, ViolationPersistenceBean>(maxViolationsInReport);
    numFieldsMap = new MultiValueTreeMap<String, ViolationPersistenceBean>(maxViolationsInReport);
    fileNames = new HashSet<String>();
}
@SuppressWarnings("unchecked")
<KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runReducer(
        TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context)
        throws IOException, InterruptedException {
    RecordWriter<KEYOUT, VALUEOUT> rw = new ChainRecordWriter<KEYOUT, VALUEOUT>(context);
    Reducer.Context reducerContext = createReduceContext(rw, (ReduceContext) context, rConf);
    reducer.run(reducerContext);
    rw.close(context);
}
@Test
public void reduce(@Mocked final Reducer.Context defaultContext) throws IOException, InterruptedException {
    BitcoinTransactionReducer reducer = new BitcoinTransactionReducer();
    final Text defaultKey = new Text("Transaction Input Count:");
    final IntWritable oneInt = new IntWritable(1);
    final IntWritable twoInt = new IntWritable(2);
    final LongWritable resultLong = new LongWritable(3);
    final ArrayList<IntWritable> al = new ArrayList<>();
    al.add(oneInt);
    al.add(twoInt);
    new Expectations() {{
        defaultContext.write(defaultKey, resultLong); times = 1;
    }};
    reducer.reduce(defaultKey, al, defaultContext);
}
@Test
public void reduce(@Mocked final Reducer.Context defaultContext) throws IOException, InterruptedException {
    EthereumBlockReducer reducer = new EthereumBlockReducer();
    final Text defaultKey = new Text("Transaction Count:");
    final IntWritable oneInt = new IntWritable(1);
    final IntWritable twoInt = new IntWritable(2);
    final LongWritable resultLong = new LongWritable(3);
    final ArrayList<IntWritable> al = new ArrayList<>();
    al.add(oneInt);
    al.add(twoInt);
    new Expectations() {{
        defaultContext.write(defaultKey, resultLong); times = 1;
    }};
    reducer.reduce(defaultKey, al, defaultContext);
}
public int streamElPrep(Reducer.Context context, String output, String rg,
        int threads, SAMRecordIterator SAMit,
        SAMFileHeader header, String dictFile, boolean updateRG, boolean keepDups, String RGID)
        throws InterruptedException, IOException, QualityException {
    long startTime = System.currentTimeMillis();
    String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "elprep", "");
    String[] command = CommandGenerator.elPrep(bin, "/dev/stdin", output, threads, true, rg, null, !keepDups, customArgs);
    // runProcessAndWait(command);
    ProcessBuilderWrapper builder = new ProcessBuilderWrapper(command, null);
    builder.startProcess(true);
    BufferedWriter localWriter = builder.getSTDINWriter();
    // write header
    final StringWriter headerTextBuffer = new StringWriter();
    new SAMTextHeaderCodec().encode(headerTextBuffer, header);
    final String headerText = headerTextBuffer.toString();
    localWriter.write(headerText, 0, headerText.length());
    SAMRecord sam;
    int reads = 0;
    while (SAMit.hasNext()) {
        sam = SAMit.next();
        if (updateRG)
            sam.setAttribute(SAMTag.RG.name(), RGID);
        String samString = sam.getSAMString();
        localWriter.write(samString, 0, samString.length());
        reads++;
    }
    localWriter.flush();
    localWriter.close();
    int error = builder.waitForCompletion();
    if (error != 0)
        throw new ProcessException("elPrep", error);
    long estimatedTime = System.currentTimeMillis() - startTime;
    Logger.DEBUG("estimated time: " + estimatedTime / 1000);
    if (context != null)
        context.getCounter(HalvadeCounters.TIME_ELPREP).increment(estimatedTime);
    return reads;
}
public void setContext(Reducer.Context context) {
    this.context = context;
    mem = "-Xmx" + (int) (0.8 * Integer.parseInt(context.getConfiguration().get("mapreduce.reduce.memory.mb"))) + "m";
    // mem = context.getConfiguration().get("mapreduce.reduce.java.opts");
    String customArgs = HalvadeConf.getCustomArgs(context.getConfiguration(), "java", "");
    if (customArgs != null)
        java.add(customArgs);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) {
    Configuration conf = context.getConfiguration();
    Type type = new TypeToken<Map<String, String>>() {
    }.getType();
    filesMap = gson.fromJson(conf.get("filesMap"), type);
    validationInfo = gson.fromJson(conf.get("validationInfoJson"), DataSourceCompValidationInfo.class);
    multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
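One caveat with the MultipleOutputs created in this setup(): it manages its own record writers, so the reducer must close it when the task finishes or buffered side outputs can be lost. A minimal sketch of the matching cleanup() (the multipleOutputs field name follows the snippet above):

@Override
protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
    // Flush and close the side outputs opened in setup().
    if (multipleOutputs != null) {
        multipleOutputs.close();
    }
}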
protected String checkBinaries(Reducer.Context context) throws IOException {
    Logger.DEBUG("Checking for binaries...");
    String binDir = null;
    URI[] localPaths = context.getCacheArchives();
    for (int i = 0; i < localPaths.length; i++) {
        Path path = new Path(localPaths[i].getPath());
        if (path.getName().endsWith("bin.tar.gz")) {
            binDir = "./" + path.getName() + "/bin/";
        }
    }
    printDirectoryTree(new File(binDir), 0);
    return binDir;
}
public static Transformer getTransoformer(Reducer.Context context, TaskConfig taskConfig, IndexInfo indexInfo) throws Exception {
    // use the normal transformer by default
    LogUtils.info("use transformer name:default");
    return new NormalTransformer(context, indexInfo);
}
public void init(Reducer.Context context) throws Exception {
    // clean up the ES working directory
    HdfsUtil.deleteDir(context.getConfiguration(), localWorkPath);
    // start the ES process
    start();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
protected void setup(Reducer.Context context) throws IOException,
        InterruptedException {
    super.setup(context);
    String dir = context.getConfiguration().get(XmlDataValidationConstants.SLAVE_FILE_LOC);
    dirPath = JobUtil.getAndReplaceHolders(dir);
    fileHandlerMap = new DVLRUCache(XmlDataValidationConstants.TEN);
}
/**
 * This method is invoked by {@link ProspectorReducer}. It is used to reduce
 * the counts to their final states and write them to output via the
 * {@code context}.
 *
 * @param prospect - The intermediate prospect that is being reduced.
 * @param counts - The counts that need to be reduced.
 * @param timestamp - The timestamp that identifies this Prospector run.
 * @param context - The reducer context the reduced values will be written to.
 * @throws IOException A problem was encountered while writing to the context.
 * @throws InterruptedException Writes to the context were interrupted.
 */
public void reduce(IntermediateProspect prospect, Iterable<LongWritable> counts, Date timestamp, Reducer.Context context) throws IOException, InterruptedException;