下面列出了怎么用org.apache.hadoop.mapreduce.Counter的API类实例代码及写法,或者点击链接到github查看源代码。
private static Method findCounterMethod(TaskAttemptContext context) {
if (context != null) {
if (COUNTER_METHODS_BY_CLASS.containsKey(context.getClass())) {
return COUNTER_METHODS_BY_CLASS.get(context.getClass());
}
try {
Method method = context.getClass().getMethod("getCounter", String.class, String.class);
if (method.getReturnType().isAssignableFrom(Counter.class)) {
COUNTER_METHODS_BY_CLASS.put(context.getClass(), method);
return method;
}
} catch (NoSuchMethodException e) {
return null;
}
}
return null;
}
@Override
public JsonElement serialize(CounterGroup cg, Type t, JsonSerializationContext ctx) {
JsonObject obj = new JsonObject();
if (!cg.getName().equals(cg.getDisplayName()))
obj.addProperty("displayName", cg.getDisplayName());
JsonObject dns = new JsonObject();
boolean anyNamesDiffer = false;
for (Counter c : cg) {
obj.addProperty(c.getName(), c.getValue());
if (!c.getName().equals(c.getDisplayName()))
anyNamesDiffer = true;
dns.addProperty(c.getName(), c.getDisplayName());
}
if (anyNamesDiffer)
obj.add("displayNames", dns);
return obj;
}
/**
* Create a {@link org.apache.gobblin.metrics.GobblinMetrics} instance for this job run from the Hadoop counters.
*/
@VisibleForTesting
void countersToMetrics(GobblinMetrics metrics) throws IOException {
Optional<Counters> counters = Optional.fromNullable(this.job.getCounters());
if (counters.isPresent()) {
// Write job-level counters
CounterGroup jobCounterGroup = counters.get().getGroup(MetricGroup.JOB.name());
for (Counter jobCounter : jobCounterGroup) {
metrics.getCounter(jobCounter.getName()).inc(jobCounter.getValue());
}
// Write task-level counters
CounterGroup taskCounterGroup = counters.get().getGroup(MetricGroup.TASK.name());
for (Counter taskCounter : taskCounterGroup) {
metrics.getCounter(taskCounter.getName()).inc(taskCounter.getValue());
}
}
}
/**
* Construct from another counters object.
* @param <C1> type of the other counter
* @param <G1> type of the other counter group
* @param counters the counters object to copy
* @param groupFactory the factory for new groups
*/
@InterfaceAudience.Private
public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters,
CounterGroupFactory<C, G> groupFactory) {
this.groupFactory = groupFactory;
for(G1 group: counters) {
String name = group.getName();
G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
(isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
for(Counter counter: group) {
newGroup.addCounter(counter.getName(), counter.getDisplayName(),
counter.getValue());
}
}
}
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
BlurOutputFormat.setProgressable(context);
BlurOutputFormat.setGetCounter(new GetCounter() {
@Override
public Counter getCounter(Enum<?> counterName) {
return context.getCounter(counterName);
}
});
_newRecordsUpdate = context.getCounter(BLUR_UPDATE, NEW_RECORDS + SEP + UPDATE);
_newRecordsNoUpdate = context.getCounter(BLUR_UPDATE, NEW_RECORDS + SEP + NO_UPDATE);
_existingRecordsUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RECORDS + SEP + UPDATE);
_existingRecordsNoUpdate = context.getCounter(BLUR_UPDATE, EXISTING_RECORDS + SEP + NO_UPDATE);
_ignoredExistingRows = context.getCounter(BLUR_UPDATE, IGNORED_EXISTING_ROWS);
_debugRecordsWithSameRecordId = context.getCounter(BLUR_UPDATE_DEBUG, MULTIPLE_RECORD_W_SAME_RECORD_ID);
_debugMarkerRecordsNoUpdate = context.getCounter(BLUR_UPDATE_DEBUG, MARKER_RECORDS + SEP + NO_UPDATE);
_debugMarkerRecordsUpdate = context.getCounter(BLUR_UPDATE_DEBUG, MARKER_RECORDS + SEP + UPDATE);
_debugIndexValues = context.getCounter(BLUR_UPDATE_DEBUG, INDEX_VALUES);
_debugNullBlurRecords = context.getCounter(BLUR_UPDATE_DEBUG, NULL_BLUR_RECORDS);
}
protected static void updateCounters(ScanMetrics scanMetrics, long numScannerRestarts,
Method getCounter, TaskAttemptContext context, long numStale) {
// we can get access to counters only if hbase uses new mapreduce APIs
if (getCounter == null) {
return;
}
try {
for (Map.Entry<String, Long> entry:scanMetrics.getMetricsMap().entrySet()) {
Counter ct = (Counter)getCounter.invoke(context,
HBASE_COUNTER_GROUP_NAME, entry.getKey());
ct.increment(entry.getValue());
}
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCANNER_RESTARTS")).increment(numScannerRestarts);
((Counter) getCounter.invoke(context, HBASE_COUNTER_GROUP_NAME,
"NUM_SCAN_RESULTS_STALE")).increment(numStale);
} catch (Exception e) {
LOG.debug("can't update counter." + StringUtils.stringifyException(e));
}
}
/**
* Construct from another counters object.
* @param <C1> type of the other counter
* @param <G1> type of the other counter group
* @param counters the counters object to copy
* @param groupFactory the factory for new groups
*/
@InterfaceAudience.Private
public <C1 extends Counter, G1 extends CounterGroupBase<C1>>
AbstractCounters(AbstractCounters<C1, G1> counters,
CounterGroupFactory<C, G> groupFactory) {
this.groupFactory = groupFactory;
for(G1 group: counters) {
String name = group.getName();
G newGroup = groupFactory.newGroup(name, group.getDisplayName(), limits);
(isFrameworkGroup(name) ? fgroups : groups).put(name, newGroup);
for(Counter counter: group) {
newGroup.addCounter(counter.getName(), counter.getDisplayName(),
counter.getValue());
}
}
}
public static long run(String collection, Path input, Path output,
Configuration baseConf) throws IOException, ClassNotFoundException,
InterruptedException {
Configuration conf = new Configuration(baseConf);
Job job = Job.getInstance(conf);
job.setJarByClass(OnlineFeatureDriver.class);
job.setJobName("GROUP each record's feature BY identifier");
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OnlineVectorWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ListWritable.class);
job.setMapperClass(OnlineFeatureMapper.class);
job.setReducerClass(OnlineFeatureReducer.class);
HadoopUtil.delete(conf, output);
boolean succeeded = job.waitForCompletion(true);
if (!succeeded) {
throw new IllegalStateException("Job:Group feature, Failed!");
}
Counter counter = job.getCounters().findCounter(
"org.apache.hadoop.mapred.Task$Counter",
"REDUCE_OUTPUT_RECORDS");
long reduceOutputRecords = counter.getValue();
LOG.info(
"Job: GROUP each record's feature BY identifier, output recordes = {}",
reduceOutputRecords);
return reduceOutputRecords;
}
@Override
@SuppressWarnings("unchecked")
public void incrAllCounters(CounterGroupBase<C> other) {
if (checkNotNull(other.getUnderlyingGroup(), "other group")
instanceof FileSystemCounterGroup<?>) {
for (Counter counter : other) {
FSCounter c = (FSCounter) ((Counter)counter).getUnderlyingCounter();
findCounter(c.scheme, c.key) .increment(counter.getValue());
}
}
}
private void clearCounters() {
for (Group g : countersToMetrics) {
for (Counter c : g) {
c.setValue(0);
}
}
}
private static long getRecordCountFromCounter(TaskAttemptContext context, Enum<?> counterName) {
try {
Method getCounterMethod = context.getClass().getMethod("getCounter", Enum.class);
return ((Counter) getCounterMethod.invoke(context, counterName)).getValue();
} catch (Exception e) {
throw new RuntimeException("Error reading record count counter", e);
}
}
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
RawKeyValueIterator input,
Counter inputKeyCounter,
Counter inputValueCounter,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter,
RawComparator<KEYIN> comparator,
Class<KEYIN> keyClass,
Class<VALUEIN> valueClass
) throws InterruptedException, IOException{
super(conf, taskid, output, committer, reporter);
this.input = input;
this.inputKeyCounter = inputKeyCounter;
this.inputValueCounter = inputValueCounter;
this.comparator = comparator;
this.serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(buffer);
this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
this.valueDeserializer.open(buffer);
hasMore = input.next();
this.keyClass = keyClass;
this.valueClass = valueClass;
this.conf = conf;
this.taskid = taskid;
}
@Override
public Counter getCounter(String group, String name) {
try {
return context.getCounter(group, name);
} catch (NullPointerException npe) {
return null;
}
}
@Test
public void testMapper() throws IOException, InterruptedException {
Configuration conf = new Configuration();
// basic config
conf.set("statsd.host", "localhost");
conf.set("statsd.port", "8125");
conf.set("mapreduce.job.queuename", "queue1");
conf.set("mapreduce.job.name", "job1");
// some valid aspect configs
conf.set("statsd.final.gauge.MyGroup1", "CounterGroup1");
conf.set("statsd.final.counter.MyGroup2", "CounterGroup2/Counter1");
conf.set("statsd.live.time.MyGroup3.MyCounter2", "CounterGroup3/Counter2");
conf.set("statsd.live.counter.TestGroup", TestCounters.class.getName());
CounterToStatsDConfiguration config = new CounterToStatsDConfiguration(conf);
TestStatsDEnabledMapper mapper = new TestStatsDEnabledMapper();
Mapper.Context context = mapper.createTestContext(conf);
mapper.setup(context);
Assert.assertNotNull(mapper.getHelper());
TaskAttemptContext returnedContext = mapper.getContext(context);
Assert.assertEquals(CounterStatsDClient.class.getName() + '$' + "StatsDTaskAttemptContext", returnedContext.getClass().getName());
Counter testCounter = mapper.getCounter(context, TestCounters.COUNTER1);
Assert.assertEquals(CounterStatsDClient.class.getName() + '$' + "StatsDCounter", testCounter.getClass().getName());
testCounter = mapper.getCounter(context, "CounterGroup1", "Counter1");
Assert.assertEquals(CounterStatsDClient.class.getName() + '$' + "StatsDCounter", testCounter.getClass().getName());
Assert.assertFalse(((CounterStatsDClientTest.TestCounterStatsDClient) (mapper.getHelper()).getClient()).stopped);
mapper.cleanup(context);
Assert.assertNull(mapper.getHelper().getClient());
}
@Override
public boolean incrCounter(String group, String name, long delta) {
if (context == null) {
return false;
}
Counter counter = context.getCounter(group, name);
counter.increment(delta);
return true;
}
/** {@inheritDoc} */
@Override public Counter addCounter(String name, String displayName, long value) {
final Counter counter = cntrs.findCounter(this.name, name);
counter.setValue(value);
return counter;
}
@Override
public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
Counter c = context.getCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL);
c.increment(1);
LinkedHashMap<String, String> rowdata = new LinkedHashMap<>();
rowdata.put("row", Bytes.toString(key.get()));
Iterator<Cell> it = result.listCells().iterator();
while (it.hasNext()) {
Cell cell = it.next();
byte[] qualifier = extractFieldByteArray(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
byte[] value = extractFieldByteArray(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
String _qualifier = Bytes.toString(qualifier);
if (!HbaseMigrateUtils.isIgnoreHbaseQualifier(_qualifier)) {
rowdata.put(_qualifier, Bytes.toString(value));
}
}
// Insert sql.
try {
String insertSql = SimpleHfileToRmdbExporter.currentRmdbManager.buildInsertSql(rowdata);
if (SimpleHfileToRmdbExporter.verbose) {
log.info(format("Inserting [%s]: %s", c.getValue(), insertSql));
}
SimpleHfileToRmdbExporter.currentRmdbManager.getRmdbRepository().saveRowdata(insertSql);
context.getCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).increment(1);
} catch (Exception e) {
log.error(e);
}
}
public TaskCounterGroupInfo(String name, CounterGroup group) {
this.counterGroupName = name;
this.counter = new ArrayList<TaskCounterInfo>();
for (Counter c : group) {
TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
this.counter.add(cinfo);
}
}
public CounterGroupInfo(String name, CounterGroup group, CounterGroup mg,
CounterGroup rg) {
this.counterGroupName = name;
this.counter = new ArrayList<CounterInfo>();
for (Counter c : group) {
Counter mc = mg == null ? null : mg.findCounter(c.getName());
Counter rc = rg == null ? null : rg.findCounter(c.getName());
CounterInfo cinfo = new CounterInfo(c, mc, rc);
this.counter.add(cinfo);
}
}
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}
private void updateProgressSplits() {
double newProgress = reportedStatus.progress;
newProgress = Math.max(Math.min(newProgress, 1.0D), 0.0D);
Counters counters = reportedStatus.counters;
if (counters == null)
return;
WrappedProgressSplitsBlock splitsBlock = getProgressSplitBlock();
if (splitsBlock != null) {
long now = clock.getTime();
long start = getLaunchTime(); // TODO Ensure not 0
if (start != 0 && now - start <= Integer.MAX_VALUE) {
splitsBlock.getProgressWallclockTime().extend(newProgress,
(int) (now - start));
}
Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
splitsBlock.getProgressCPUTime().extend(newProgress,
(int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
}
Counter virtualBytes = counters
.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
if (virtualBytes != null) {
splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
(int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
Counter physicalBytes = counters
.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
if (physicalBytes != null) {
splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
(int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
}
}
}
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
}
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
}
}
private void updateCounter()
{
long[] operatorTime = getOperatorTime();
String profileCounterGroupName =
PhaseContext.isMapper() ? mapperProfileCounterGroupName
: reducerProfileCounterGroupName;
ArrayNode operatorsJson = multipassOperatorsJson.get(currentPassIndex);
for (int i = 0; i < operatorTime.length; i++)
{
if (operatorTime[i] > 0)
{
JsonNode operatorJson = operatorsJson.get(i);
OperatorType type =
OperatorType.valueOf(operatorJson.get("operator").getTextValue());
String outputName = operatorJson.get("output").getTextValue();
String counterName =
String.format("P%d-O%d-%s-%s",
currentPassIndex,
i,
type,
outputName);
Counter profileCounter =
PhaseContext.getCounter(profileCounterGroupName, counterName);
profileCounter.increment(operatorTime[i]);
}
}
}
@Override
public synchronized boolean equals(Object genericRight) {
if (genericRight instanceof Counter) {
synchronized (genericRight) {
Counter right = (Counter) genericRight;
return getName().equals(right.getName()) &&
getDisplayName().equals(right.getDisplayName()) &&
getValue() == right.getValue();
}
}
return false;
}
/**
* Make the pre 0.21 counter string (for e.g. old job history files)
* [(actual-name)(display-name)(value)]
* @param counter to stringify
* @return the stringified result
*/
public static String toEscapedCompactString(Counter counter) {
// First up, obtain the strings that need escaping. This will help us
// determine the buffer length apriori.
String escapedName, escapedDispName;
long currentValue;
synchronized(counter) {
escapedName = escape(counter.getName());
escapedDispName = escape(counter.getDisplayName());
currentValue = counter.getValue();
}
int length = escapedName.length() + escapedDispName.length() + 4;
length += 8; // For the following delimiting characters
StringBuilder builder = new StringBuilder(length);
builder.append(COUNTER_OPEN);
// Add the counter name
builder.append(UNIT_OPEN);
builder.append(escapedName);
builder.append(UNIT_CLOSE);
// Add the display name
builder.append(UNIT_OPEN);
builder.append(escapedDispName);
builder.append(UNIT_CLOSE);
// Add the value
builder.append(UNIT_OPEN);
builder.append(currentValue);
builder.append(UNIT_CLOSE);
builder.append(COUNTER_CLOSE);
return builder.toString();
}
/**
* FrameworkGroup ::= #counter (key value)*
*/
@Override
@SuppressWarnings("unchecked")
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, size());
for (int i = 0; i < counters.length; ++i) {
Counter counter = (C) counters[i];
if (counter != null) {
WritableUtils.writeVInt(out, i);
WritableUtils.writeVLong(out, counter.getValue());
}
}
}
@Override
public Counter getCounter(Enum<?> counterName) {
throw new NotImplementedException();
}
public Counter getBadLineCount() {
return badLineCount;
}
@Override
public Counter getCounter(Enum<?> counterName) {
return base.getCounter(counterName);
}
@Override
public Counter getCounter(final Enum<?> counterName) {
return context.getCounter(counterName);
}