The following examples show how to use the org.apache.hadoop.mapreduce.Reducer.Context API class, or you can follow the links to view the source code on GitHub.
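For orientation, here is a minimal self-contained Reducer sketch showing the typical uses of Context that recur in the examples below (emitting output records, reading configuration, incrementing counters). The class, configuration key, and counter names are illustrative assumptions, not taken from any of the projects quoted here.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative only: sums values per key, skipping sums below a configured floor.
public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Context exposes configuration values set by the driver.
    int floor = context.getConfiguration().getInt("sum.floor", 0);
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    if (sum < floor) {
      // Counters are the standard way to report statistics from tasks.
      context.getCounter("SumReducer", "BELOW_FLOOR").increment(1);
      return;
    }
    result.set(sum);
    context.write(key, result); // emit one record per key
  }
}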
@Override
public void reduce(Text key, Iterable<MergeRecord> vals, Context c)
    throws IOException, InterruptedException {
  SqoopRecord bestRecord = null;
  try {
    for (MergeRecord val : vals) {
      if (null == bestRecord && !val.isNewRecord()) {
        // Use an old record if we don't have a new record.
        bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
      } else if (val.isNewRecord()) {
        bestRecord = (SqoopRecord) val.getSqoopRecord().clone();
      }
    }
  } catch (CloneNotSupportedException cnse) {
    throw new IOException(cnse);
  }
  if (null != bestRecord) {
    writeRecord(bestRecord, c);
  }
}
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  // Input lines look like "movieID:user_rating,user_rating,...";
  // emit one (rating, 1) pair per review.
  String line = value.toString();
  int movieIndex = line.indexOf(":");
  if (movieIndex > 0) {
    String reviews = line.substring(movieIndex + 1);
    StringTokenizer token = new StringTokenizer(reviews, ",");
    while (token.hasMoreTokens()) {
      String tok = token.nextToken();
      int reviewIndex = tok.indexOf("_");
      String ratingStr = tok.substring(reviewIndex + 1);
      int rating = Integer.parseInt(ratingStr);
      context.write(new IntWritable(rating), one);
    }
  }
}
public void map(VarLongWritable key, VectorWritable value, Context context)
    throws IOException, InterruptedException {
  long userID = key.get();
  Vector userVector = value.get();
  // Re-key the user's preference vector by item index.
  Iterator<Vector.Element> it = userVector.nonZeroes().iterator();
  IntWritable itemi = new IntWritable();
  while (it.hasNext()) {
    Vector.Element e = it.next();
    int itemIndex = e.index();
    float preferenceValue = (float) e.get();
    itemi.set(itemIndex);
    context.write(itemi, new VectorOrPrefWritable(userID, preferenceValue));
    System.out.println("item: " + itemi + ", user and value: " + userID + "," + preferenceValue);
  }
}
public void map(Object key, Text value, Context context)
    throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}
public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}
public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  context.write(key, new IntWritable(sum));
}
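The map/reduce methods above follow the classic word-count pattern. As a usage sketch, a driver along the following lines would wire them into a job; the WordCountMapper and WordCountReducer class names are assumptions, standing in for whatever classes declare those methods. Note that the framework itself constructs the Mapper.Context and Reducer.Context instances the methods receive at runtime.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);    // assumed name for the map() above
    job.setCombinerClass(WordCountReducer.class); // safe here: summing is associative
    job.setReducerClass(WordCountReducer.class);  // assumed name for the reduce() above
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}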
private void setContextTaskId(final int taskId) {
  Context context = reduceDriver.getContext();
  when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() {
    @Override
    public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable {
      return TaskAttemptID.forName("attempt__0000_r_" + taskId + "_0");
    }
  });
}
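The helper above stubs getTaskAttemptID() on a mocked Context, presumably obtained via Mockito. In the same spirit, a mocked Context can verify what a reducer emits. A sketch assuming Mockito and JUnit, with MyReducer as a hypothetical reducer declaring a public reduce(Text, Iterable<IntWritable>, Context) method like the word-count example above:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.junit.Test;

public class MyReducerTest {
  @Test
  @SuppressWarnings("unchecked")
  public void reduceSumsValues() throws Exception {
    // Mockito can mock the abstract Reducer.Context class directly.
    Reducer<Text, IntWritable, Text, IntWritable>.Context context =
        mock(Reducer.Context.class);
    MyReducer reducer = new MyReducer(); // hypothetical reducer under test
    reducer.reduce(new Text("k"),
        Arrays.asList(new IntWritable(1), new IntWritable(2)), context);
    // Text and IntWritable implement equals(), so verify() can match the emitted pair.
    verify(context).write(new Text("k"), new IntWritable(3));
  }
}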
@Override
public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
  SolrInputDocument firstUpdate = null;
  while (updates.hasNext()) {
    if (firstUpdate == null) {
      firstUpdate = updates.next();
      assert firstUpdate != null;
    } else {
      throw new IllegalArgumentException("Update conflict! Documents with the same unique key are forbidden: "
          + key);
    }
  }
  assert firstUpdate != null;
  return Collections.singletonList(firstUpdate).iterator();
}
/** Returns the most recent document among the colliding updates */
protected Iterator<SolrInputDocument> getMaximum(Iterator<SolrInputDocument> updates, String fieldName,
    Comparator child, Context context) {
  SolrInputDocumentComparator comp = new SolrInputDocumentComparator(fieldName, child);
  SolrInputDocument max = null;
  long numDupes = 0;
  long numOutdated = 0;
  while (updates.hasNext()) {
    SolrInputDocument next = updates.next();
    assert next != null;
    if (max == null) {
      max = next;
    } else {
      int c = comp.compare(next, max);
      if (c == 0) {
        LOG.debug("Ignoring document version because it is a duplicate: {}", next);
        numDupes++;
      } else if (c > 0) {
        LOG.debug("Ignoring document version because it is outdated: {}", max);
        max = next;
        numOutdated++;
      } else {
        LOG.debug("Ignoring document version because it is outdated: {}", next);
        numOutdated++;
      }
    }
  }
  assert max != null;
  if (numDupes > 0) {
    context.getCounter(COUNTER_GROUP, DUPLICATES_COUNTER_NAME).increment(numDupes);
  }
  if (numOutdated > 0) {
    context.getCounter(COUNTER_GROUP, OUTDATED_COUNTER_NAME).increment(numOutdated);
  }
  return Collections.singletonList(max).iterator();
}
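The counters incremented above surface in the job's aggregate counters, so a driver can read them back after the job finishes. A short sketch, assuming the job object and the COUNTER_GROUP, DUPLICATES_COUNTER_NAME, and OUTDATED_COUNTER_NAME constants referenced above are in scope:

// Sketch: reading the conflict-resolution counters once the job has completed.
if (job.waitForCompletion(true)) {
  long dupes = job.getCounters()
      .findCounter(COUNTER_GROUP, DUPLICATES_COUNTER_NAME).getValue();
  long outdated = job.getCounters()
      .findCounter(COUNTER_GROUP, OUTDATED_COUNTER_NAME).getValue();
  System.out.printf("dropped %d duplicate and %d outdated updates%n", dupes, outdated);
}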
public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values, Context context)
    throws IOException, InterruptedException {
  for (VectorOrPrefWritable va : values) {
    context.write(key, va);
    System.err.println("key " + key.toString() + ", value " + va);
  }
}
protected abstract void writeRecord(SqoopRecord record, Context c)
    throws IOException, InterruptedException;
public void setContext(@SuppressWarnings("rawtypes") final Context context) {
  this.context = context;
}

@SuppressWarnings("rawtypes")
public Context getContext() {
  return this.context;
}
@Override
public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
  return getMaximum(updates, getOrderByFieldName(), new SolrInputDocumentComparator.TimeStampComparator(), ctx);
}

@Override
public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
  return updates;
}

@Override
public Iterator<SolrInputDocument> orderUpdates(Text key, Iterator<SolrInputDocument> updates, Context ctx) {
  return sort(updates, getOrderByFieldName(), new SolrInputDocumentComparator.TimeStampComparator());
}
@Override
public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) {
  return (context instanceof PigMapReduce.Reduce.IllustratorContext);
}

@Override
public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) {
  return ((PigMapReduce.Reduce.IllustratorContext) context).pack;
}

@Override
public boolean inIllustrator(Context context) {
  return ((WrappedMapper.Context) context).getConfiguration().get("inIllustrator") != null;
}

@Override
public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) {
  return (context instanceof PigMapReduce.IllustrateReducerContext.IllustratorContext);
}

@Override
public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) {
  return ((PigMapReduce.IllustrateReducerContext.IllustratorContext) context).getPack();
}
/**
 * Get mapper's illustrator context
 *
 * @param conf Configuration
 * @param input Input bag to serve as data source
 * @param output Map output buffer
 * @param split the split
 * @return Illustrator's context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public Context getIllustratorContext(Configuration conf, DataBag input,
    List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
      new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>()
          .getMapContext(new IllustratorContext(conf, input, output, split));
  return mapperContext;
}
/**
 * Get reducer's illustrator context
 *
 * @param input Input buffer as output by maps
 * @param pkg package
 * @return reducer's illustrator context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public Context getIllustratorContext(Job job,
    List<Pair<PigNullableWritable, Writable>> input, POPackage pkg)
    throws IOException, InterruptedException {
  org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
      new IllustrateReducerContext().getReducerContext(new IllustratorContextImpl(job, input, pkg));
  return reducerContext;
}
/**
 * Given a list of all colliding document updates for the same unique document
 * key, this method returns zero or more documents in an application-specific
 * order.
 *
 * The caller will then apply the updates for this key to Solr in the order
 * returned by the orderUpdates() method.
 *
 * @param uniqueKey
 *          the document key common to all collidingUpdates mentioned below
 * @param collidingUpdates
 *          all updates in the MapReduce job that have a key equal to
 *          {@code uniqueKey} mentioned above. The input order is unspecified.
 * @param context
 *          The <code>Context</code> passed from the {@link Reducer}
 *          implementations.
 * @return the order in which the updates shall be applied to Solr
 */
Iterator<SolrInputDocument> orderUpdates(
    Text uniqueKey, Iterator<SolrInputDocument> collidingUpdates, Context context);
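For comparison with the resolver implementations above, here is a minimal hypothetical implementation of this contract that keeps whichever colliding update it sees first and silently drops the rest:

// Hypothetical resolver: keep the first colliding update, discard the others.
@Override
public Iterator<SolrInputDocument> orderUpdates(Text uniqueKey,
    Iterator<SolrInputDocument> collidingUpdates, Context context) {
  SolrInputDocument first = collidingUpdates.next(); // at least one update exists per key
  return Collections.singletonList(first).iterator();
}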
/**
 * Get reducer's illustrator context
 *
 * @param input Input buffer as output by maps
 * @param pkg package
 * @return reducer's illustrator context
 * @throws IOException
 * @throws InterruptedException
 */
@Override
public Context getIllustratorContext(Job job,
    List<Pair<PigNullableWritable, Writable>> input, POPackage pkg)
    throws IOException, InterruptedException {
  return new IllustratorContext(job, input, pkg);
}