下面列出了org.apache.hadoop.mapreduce.Counter#increment ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
Method getCounter, TaskAttemptContext context, long numStale) {
// we can get access to counters only if hbase uses new mapreduce APIs
if (getCounter == null) {
return;
}
try {
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
Counter ct = (Counter)getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, entry.getKey());
ct.increment(entry.getValue());
}
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCAN_RESULTS_STALE")).increment(numStale);
} catch (Exception e) {
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
}
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Counter counter = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER);
counter.increment(1);
_configuration = context.getConfiguration();
_existingRecords = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_EXISTING_RECORDS);
_rowLookup = context.getCounter(BlurIndexCounter.LOOKUP_MAPPER_ROW_LOOKUP_ATTEMPT);
_blurPartitioner = new BlurPartitioner();
TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
_numberOfShardsInTable = tableDescriptor.getShardCount();
_tablePath = new Path(tableDescriptor.getTableUri());
_snapshot = getSnapshot(_configuration);
_totalNumberOfBytes = _configuration.getLong(LookupBuilderReducer.BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
_closer = Closer.create();
}
public long putIfAbsent(final Enum<?> cName, final long value) {
final Counter counter = getCounter(cName);
final long origValue = counter.getValue();
if (origValue == 0)
counter.increment(value);
return origValue;
}
public void map(LongWritable key, Text value, Context context) {
Counter errorCounter = context.getCounter("Log error", "Parse error");
try {
//解析每行日志
String user = parseLog(value.toString());
//写入reduce端,使用Text("test")进行占位,本想使用null占位,但是reduce端需要遍历Map的输出,所以不能使用NullWritable
context.write(new Text(user), new Text("test"));
} catch (Exception e) {
errorCounter.increment(1);
e.printStackTrace();
}
}
public void map(LongWritable key, Text value, Context context) {
Counter errorCounter = context.getCounter("Log error", "Parse error");
try {
//解析每行日志
String user = parseLog(value.toString());
//写入reduce端,使用Text("test")进行占位,本想使用null占位,但是reduce端需要遍历Map的输出,所以不能使用NullWritable
context.write(new Text(user), new Text("test"));
} catch (Exception e) {
errorCounter.increment(1);
e.printStackTrace();
}
}
public void map(LongWritable key, Text value, Context context) {
Counter errorCounter = context.getCounter("Log error", "Parse error");
try {
//解析每行日志
String user = parseLog(value.toString());
//写入reduce端,使用Text("test")进行占位,本想使用null占位,但是reduce端需要遍历Map的输出,所以不能使用NullWritable
context.write(new Text(user), new Text("test"));
} catch (Exception e) {
errorCounter.increment(1);
e.printStackTrace();
}
}
@Override
public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
Counter c = context.getCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL);
c.increment(1);
LinkedHashMap<String, String> rowdata = new LinkedHashMap<>();
rowdata.put("row", Bytes.toString(key.get()));
Iterator<Cell> it = result.listCells().iterator();
while (it.hasNext()) {
Cell cell = it.next();
byte[] qualifier = extractFieldByteArray(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
byte[] value = extractFieldByteArray(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
String _qualifier = Bytes.toString(qualifier);
if (!HbaseMigrateUtils.isIgnoreHbaseQualifier(_qualifier)) {
rowdata.put(_qualifier, Bytes.toString(value));
}
}
// Insert sql.
try {
String insertSql = SimpleHfileToRmdbExporter.currentRmdbManager.buildInsertSql(rowdata);
if (SimpleHfileToRmdbExporter.verbose) {
log.info(format("Inserting [%s]: %s", c.getValue(), insertSql));
}
SimpleHfileToRmdbExporter.currentRmdbManager.getRmdbRepository().saveRowdata(insertSql);
context.getCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).increment(1);
} catch (Exception e) {
log.error(e);
}
}
@Override
public void incrAllCounters(CounterGroupBase<T> rightGroup) {
try {
for (Counter right : rightGroup) {
Counter left = findCounter(right.getName(), right.getDisplayName());
left.increment(right.getValue());
}
} catch (LimitExceededException e) {
counters.clear();
throw e;
}
}
@Override
public void incrAllCounters(CounterGroupBase<T> rightGroup) {
try {
for (Counter right : rightGroup) {
Counter left = findCounter(right.getName(), right.getDisplayName());
left.increment(right.getValue());
}
} catch (LimitExceededException e) {
counters.clear();
throw e;
}
}
@Override
public boolean incrCounter(String group, String name, long delta) {
if (context == null) {
return false;
}
Counter counter = context.getCounter(group, name);
counter.increment(delta);
return true;
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
byte[] row = value.getRow();
Counter c = getCounter(row);
c.increment(1);
}
@Override
public boolean incrCounter(Enum<?> name, long delta) {
if (context == null) {
return false;
}
Counter counter = context.getCounter(name);
counter.increment(delta);
return true;
}
@Override
public boolean incrCounter(String group, String name, long delta) {
if (context == null) {
return false;
}
Counter counter = context.getCounter(group, name);
counter.increment(delta);
return true;
}
@Override
public boolean incrCounter(Enum<?> name, long delta) {
if (context == null) {
return false;
}
Counter counter = context.getCounter(name);
counter.increment(delta);
return true;
}
/**
* @param event
* @param fields
* @param reporter
*/
protected Value createBloomFilter(RawRecordContainer event, Multimap<String,NormalizedContentInterface> fields, StatusReporter reporter) {
Value filterValue = DataTypeHandler.NULL_VALUE;
if (this.bloomFiltersEnabled) {
try {
// Create and start the stopwatch
final Stopwatch stopWatch = new Stopwatch();
stopWatch.start();
// Create the bloom filter, which may involve NGram expansion
final BloomFilterWrapper result = this.createBloomFilter(fields);
final BloomFilter<String> bloomFilter = result.getFilter();
filterValue = MemberShipTest.toValue(bloomFilter);
// Stop the stopwatch
stopWatch.stop();
if (null != reporter) {
final Counter filterCounter = reporter.getCounter(MemberShipTest.class.getSimpleName(), "BloomFilterCreated");
if (null != filterCounter) {
filterCounter.increment(1);
}
final Counter sizeCounter = reporter.getCounter(MemberShipTest.class.getSimpleName(), "BloomFilterSize");
if (null != sizeCounter) {
sizeCounter.increment(filterValue.getSize());
}
final Counter fieldsCounter = reporter.getCounter(MemberShipTest.class.getSimpleName(), "BloomFilterAppliedFields");
if (null != fieldsCounter) {
fieldsCounter.increment(result.getFieldValuesAppliedToFilter());
}
final Counter ngramsCounter = reporter.getCounter(MemberShipTest.class.getSimpleName(), "BloomFilterAppliedNGrams");
if (null != ngramsCounter) {
ngramsCounter.increment(result.getNGramsAppliedToFilter());
}
final Counter prunedCounter = reporter.getCounter(MemberShipTest.class.getSimpleName(), "BloomFilterPrunedNGrams");
if (null != prunedCounter) {
prunedCounter.increment(result.getNGramsPrunedFromFilter());
}
final Counter creationTime = reporter.getCounter(MemberShipTest.class.getSimpleName(), "Creation Time-(ms)");
if (null != creationTime) {
creationTime.increment(stopWatch.elapsed(TimeUnit.MILLISECONDS));
}
}
} catch (Exception e) {
if (null != reporter) {
final Counter errorCounter = reporter.getCounter(MemberShipTest.class.getSimpleName(), "BloomFilterError");
if (null != errorCounter) {
errorCounter.increment(filterValue.getSize());
}
}
}
}
return filterValue;
}
/**
* Parse a pre 0.21 counters string into a counter object.
* @param <C> type of the counter
* @param <G> type of the counter group
* @param <T> type of the counters object
* @param compactString to parse
* @param counters an empty counters object to hold the result
* @return the counters object holding the result
* @throws ParseException
*/
@SuppressWarnings("deprecation")
public static <C extends Counter, G extends CounterGroupBase<C>,
T extends AbstractCounters<C, G>>
T parseEscapedCompactString(String compactString, T counters)
throws ParseException {
IntWritable index = new IntWritable(0);
// Get the group to work on
String groupString =
getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
while (groupString != null) {
IntWritable groupIndex = new IntWritable(0);
// Get the actual name
String groupName =
StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex));
groupName = StringInterner.weakIntern(unescape(groupName));
// Get the display name
String groupDisplayName =
StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex));
groupDisplayName = StringInterner.weakIntern(unescape(groupDisplayName));
// Get the counters
G group = counters.getGroup(groupName);
group.setDisplayName(groupDisplayName);
String counterString =
getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
while (counterString != null) {
IntWritable counterIndex = new IntWritable(0);
// Get the actual name
String counterName =
StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex));
counterName = StringInterner.weakIntern(unescape(counterName));
// Get the display name
String counterDisplayName =
StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex));
counterDisplayName = StringInterner.weakIntern(unescape(counterDisplayName));
// Get the value
long value =
Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE,
counterIndex));
// Add the counter
Counter counter = group.findCounter(counterName);
counter.setDisplayName(counterDisplayName);
counter.increment(value);
// Get the next counter
counterString =
getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
}
groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
}
return counters;
}
/**
* Parse a pre 0.21 counters string into a counter object.
* @param <C> type of the counter
* @param <G> type of the counter group
* @param <T> type of the counters object
* @param compactString to parse
* @param counters an empty counters object to hold the result
* @return the counters object holding the result
* @throws ParseException
*/
@SuppressWarnings("deprecation")
public static <C extends Counter, G extends CounterGroupBase<C>,
T extends AbstractCounters<C, G>>
T parseEscapedCompactString(String compactString, T counters)
throws ParseException {
IntWritable index = new IntWritable(0);
// Get the group to work on
String groupString =
getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
while (groupString != null) {
IntWritable groupIndex = new IntWritable(0);
// Get the actual name
String groupName =
StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex));
groupName = StringInterner.weakIntern(unescape(groupName));
// Get the display name
String groupDisplayName =
StringInterner.weakIntern(getBlock(groupString, UNIT_OPEN, UNIT_CLOSE, groupIndex));
groupDisplayName = StringInterner.weakIntern(unescape(groupDisplayName));
// Get the counters
G group = counters.getGroup(groupName);
group.setDisplayName(groupDisplayName);
String counterString =
getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
while (counterString != null) {
IntWritable counterIndex = new IntWritable(0);
// Get the actual name
String counterName =
StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex));
counterName = StringInterner.weakIntern(unescape(counterName));
// Get the display name
String counterDisplayName =
StringInterner.weakIntern(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE, counterIndex));
counterDisplayName = StringInterner.weakIntern(unescape(counterDisplayName));
// Get the value
long value =
Long.parseLong(getBlock(counterString, UNIT_OPEN, UNIT_CLOSE,
counterIndex));
// Add the counter
Counter counter = group.findCounter(counterName);
counter.setDisplayName(counterDisplayName);
counter.increment(value);
// Get the next counter
counterString =
getBlock(groupString, COUNTER_OPEN, COUNTER_CLOSE, groupIndex);
}
groupString = getBlock(compactString, GROUP_OPEN, GROUP_CLOSE, index);
}
return counters;
}
/**
* Tests job counters retrieval.
*
* @throws Exception If failed.
*/
@Test
public void testJobCounters() throws Exception {
IgniteFileSystem igfs = grid(0).fileSystem(HadoopAbstractSelfTest.igfsName);
igfs.mkdirs(new IgfsPath(PATH_INPUT));
try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(igfs.create(
new IgfsPath(PATH_INPUT + "/test.file"), true)))) {
bw.write(
"alpha\n" +
"beta\n" +
"gamma\n" +
"alpha\n" +
"beta\n" +
"gamma\n" +
"alpha\n" +
"beta\n" +
"gamma\n"
);
}
Configuration conf = config(HadoopAbstractSelfTest.REST_PORT);
final Job job = Job.getInstance(conf);
try {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(TestCountingMapper.class);
job.setReducerClass(TestCountingReducer.class);
job.setCombinerClass(TestCountingCombiner.class);
FileInputFormat.setInputPaths(job, new Path("igfs://" + igfsName + "@" + PATH_INPUT));
FileOutputFormat.setOutputPath(job, new Path("igfs://" + igfsName + "@" + PATH_OUTPUT));
job.submit();
final Counter cntr = job.getCounters().findCounter(TestCounter.COUNTER1);
assertEquals(0, cntr.getValue());
cntr.increment(10);
assertEquals(10, cntr.getValue());
// Transferring to map phase.
setupLockFile.delete();
// Transferring to reduce phase.
mapLockFile.delete();
job.waitForCompletion(false);
assertEquals("job must end successfully", JobStatus.State.SUCCEEDED, job.getStatus().getState());
final Counters counters = job.getCounters();
assertNotNull("counters cannot be null", counters);
assertEquals("wrong counters count", 3, counters.countCounters());
assertEquals("wrong counter value", 15, counters.findCounter(TestCounter.COUNTER1).getValue());
assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER2).getValue());
assertEquals("wrong counter value", 3, counters.findCounter(TestCounter.COUNTER3).getValue());
}
catch (Throwable t) {
log.error("Unexpected exception", t);
}
finally {
job.getCluster().close();
}
}
public void reduce(Text key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
int minDistance = Node.INFINITE;
System.out.println("input -> K[" + key + "]");
Node shortestAdjacentNode = null;
Node originalNode = null;
for (Text textValue : values) {
System.out.println(" input -> V[" + textValue + "]");
Node node = Node.fromMR(textValue.toString());
if(node.containsAdjacentNodes()) {
// the original data
//
originalNode = node;
}
if(node.getDistance() < minDistance) {
minDistance = node.getDistance();
shortestAdjacentNode = node;
}
}
if(shortestAdjacentNode != null) {
originalNode.setDistance(minDistance);
originalNode.setBackpointer(shortestAdjacentNode.getBackpointer());
}
outValue.set(originalNode.toString());
System.out.println(
" output -> K[" + key + "],V[" + outValue + "]");
context.write(key, outValue);
if (minDistance != Node.INFINITE &&
targetNode.equals(key.toString())) {
Counter counter = context.getCounter(
PathCounter.TARGET_NODE_DISTANCE_COMPUTED);
counter.increment(minDistance);
context.getCounter(PathCounter.PATH.toString(),
shortestAdjacentNode.getBackpointer()).increment(1);
}
}
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Counter counter = context.getCounter(BlurIndexCounter.INPUT_FORMAT_MAPPER);
counter.increment(1);
_existingRecords = context.getCounter(BlurIndexCounter.INPUT_FORMAT_EXISTING_RECORDS);
}