下面列出了 org.apache.hadoop.mapreduce.CounterGroup#addCounter() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Checks the JSON produced by {@code JobHistoryEventHandler#countersToJSON}.
 *
 * Groups and counters are deliberately added out of alphabetical order; the
 * expected string lists both groups and the counters inside each group
 * ordered by name.
 */
@Test (timeout=50000)
public void testCountersToJSON() throws Exception {
  // Expected serialized form: COMPANIONS before DOCTORS, counters by name.
  String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
      + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
      + ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\","
      + "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":"
      + "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2},"
      + "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\","
      + "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory "
      + "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":"
      + "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\""
      + ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":"
      + "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10},"
      + "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
      + "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
      + "\"VALUE\":12}]}]";

  JobHistoryEventHandler handler = new JobHistoryEventHandler(null, 0);
  Counters input = new Counters();

  // Insertion order is intentionally not alphabetical (groups or counters).
  CounterGroup doctors =
      input.addGroup("DOCTORS", "Incarnations of the Doctor");
  doctors.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
  doctors.addCounter("MATT_SMITH", "Matt Smith", 11);
  doctors.addCounter("DAVID_TENNANT", "David Tennant", 10);

  CounterGroup companions =
      input.addGroup("COMPANIONS", "Companions of the Doctor");
  companions.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
  companions.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
  companions.addCounter("AMY_POND", "Amy Pond", 4);
  companions.addCounter("MARTHA_JONES", "Martha Jones", 3);
  companions.addCounter("DONNA_NOBLE", "Donna Noble", 2);
  companions.addCounter("ROSE_TYLER", "Rose Tyler", 1);

  String actual =
      new ObjectMapper().writeValueAsString(handler.countersToJSON(input));
  Assert.assertEquals(expected, actual);
}
/**
 * Builds a {@code Counters} object from its Avro record form.
 *
 * @param counters the Avro counters record; may be {@code null}
 * @return a new {@code Counters} holding one group per entry of
 *         {@code counters.groups} and one counter per entry of each group's
 *         {@code counts}; empty when {@code counters} is {@code null}
 */
static Counters fromAvro(JhCounters counters) {
  Counters converted = new Counters();
  if (counters == null) {
    return converted;
  }
  for (JhCounterGroup avroGroup : counters.groups) {
    // Names pass through StringInterner.weakIntern before being stored.
    String groupName = StringInterner.weakIntern(avroGroup.name.toString());
    String groupDisplayName =
        StringInterner.weakIntern(avroGroup.displayName.toString());
    CounterGroup target = converted.addGroup(groupName, groupDisplayName);
    for (JhCounter avroCounter : avroGroup.counts) {
      String counterName =
          StringInterner.weakIntern(avroCounter.name.toString());
      String counterDisplayName =
          StringInterner.weakIntern(avroCounter.displayName.toString());
      target.addCounter(counterName, counterDisplayName, avroCounter.value);
    }
  }
  return converted;
}
/**
 * Checks the JSON produced by {@code JobHistoryEventHandler#countersToJSON}.
 *
 * Groups and counters are deliberately added out of alphabetical order; the
 * expected string lists both groups and the counters inside each group
 * ordered by name.
 */
@Test (timeout=50000)
public void testCountersToJSON() throws Exception {
  // Expected serialized form: COMPANIONS before DOCTORS, counters by name.
  String expected = "[{\"NAME\":\"COMPANIONS\",\"DISPLAY_NAME\":\"Companions "
      + "of the Doctor\",\"COUNTERS\":[{\"NAME\":\"AMY_POND\",\"DISPLAY_NAME\""
      + ":\"Amy Pond\",\"VALUE\":4},{\"NAME\":\"CLARA_OSWALD\","
      + "\"DISPLAY_NAME\":\"Clara Oswald\",\"VALUE\":6},{\"NAME\":"
      + "\"DONNA_NOBLE\",\"DISPLAY_NAME\":\"Donna Noble\",\"VALUE\":2},"
      + "{\"NAME\":\"MARTHA_JONES\",\"DISPLAY_NAME\":\"Martha Jones\","
      + "\"VALUE\":3},{\"NAME\":\"RORY_WILLIAMS\",\"DISPLAY_NAME\":\"Rory "
      + "Williams\",\"VALUE\":5},{\"NAME\":\"ROSE_TYLER\",\"DISPLAY_NAME\":"
      + "\"Rose Tyler\",\"VALUE\":1}]},{\"NAME\":\"DOCTORS\",\"DISPLAY_NAME\""
      + ":\"Incarnations of the Doctor\",\"COUNTERS\":[{\"NAME\":"
      + "\"DAVID_TENNANT\",\"DISPLAY_NAME\":\"David Tennant\",\"VALUE\":10},"
      + "{\"NAME\":\"MATT_SMITH\",\"DISPLAY_NAME\":\"Matt Smith\",\"VALUE\":"
      + "11},{\"NAME\":\"PETER_CAPALDI\",\"DISPLAY_NAME\":\"Peter Capaldi\","
      + "\"VALUE\":12}]}]";

  JobHistoryEventHandler handler = new JobHistoryEventHandler(null, 0);
  Counters input = new Counters();

  // Insertion order is intentionally not alphabetical (groups or counters).
  CounterGroup doctors =
      input.addGroup("DOCTORS", "Incarnations of the Doctor");
  doctors.addCounter("PETER_CAPALDI", "Peter Capaldi", 12);
  doctors.addCounter("MATT_SMITH", "Matt Smith", 11);
  doctors.addCounter("DAVID_TENNANT", "David Tennant", 10);

  CounterGroup companions =
      input.addGroup("COMPANIONS", "Companions of the Doctor");
  companions.addCounter("CLARA_OSWALD", "Clara Oswald", 6);
  companions.addCounter("RORY_WILLIAMS", "Rory Williams", 5);
  companions.addCounter("AMY_POND", "Amy Pond", 4);
  companions.addCounter("MARTHA_JONES", "Martha Jones", 3);
  companions.addCounter("DONNA_NOBLE", "Donna Noble", 2);
  companions.addCounter("ROSE_TYLER", "Rose Tyler", 1);

  String actual =
      new ObjectMapper().writeValueAsString(handler.countersToJSON(input));
  Assert.assertEquals(expected, actual);
}
/**
 * Builds a {@code Counters} object from its Avro record form.
 *
 * @param counters the Avro counters record; may be {@code null}
 * @return a new {@code Counters} holding one group per entry of
 *         {@code counters.groups} and one counter per entry of each group's
 *         {@code counts}; empty when {@code counters} is {@code null}
 */
static Counters fromAvro(JhCounters counters) {
  Counters converted = new Counters();
  if (counters == null) {
    return converted;
  }
  for (JhCounterGroup avroGroup : counters.groups) {
    // Names pass through StringInterner.weakIntern before being stored.
    String groupName = StringInterner.weakIntern(avroGroup.name.toString());
    String groupDisplayName =
        StringInterner.weakIntern(avroGroup.displayName.toString());
    CounterGroup target = converted.addGroup(groupName, groupDisplayName);
    for (JhCounter avroCounter : avroGroup.counts) {
      String counterName =
          StringInterner.weakIntern(avroCounter.name.toString());
      String counterDisplayName =
          StringInterner.weakIntern(avroCounter.displayName.toString());
      target.addCounter(counterName, counterDisplayName, avroCounter.value);
    }
  }
  return converted;
}
/**
 * Scans the configured ranges of {@code tableName} and rewrites every value
 * that cannot be read by {@code Counters#readFields}.
 *
 * For each scanned entry the value bytes are first parsed with
 * {@code Counters#readFields}. When that throws an {@link IOException}, the
 * bytes are re-parsed with the legacy layout handled below, serialized again
 * with {@code Counters#write}, and written back under the same row, column
 * family, column qualifier and visibility with the timestamp increased by one.
 * Entries that parse without an exception are left untouched.
 *
 * @param args command-line arguments, handed to {@code parseConfig}
 */
protected void run(String[] args) throws ParseException, AccumuloSecurityException, AccumuloException, TableNotFoundException, IOException {
  parseConfig(args);

  ClientConfiguration clientConfig = ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers);
  ZooKeeperInstance zkInstance = new ZooKeeperInstance(clientConfig);
  Connector conn = zkInstance.getConnector(username, new PasswordToken(password));
  Authorizations scanAuths = conn.securityOperations().getUserAuthorizations(conn.whoami());

  BatchWriterConfig writerConfig = new BatchWriterConfig().setMaxWriteThreads(bwThreads).setMaxMemory(bwMemory)
      .setMaxLatency(60, TimeUnit.SECONDS);

  try (BatchWriter batchWriter = conn.createBatchWriter(tableName, writerConfig);
      BatchScanner batchScanner = conn.createBatchScanner(tableName, scanAuths, bsThreads)) {
    batchScanner.setRanges(ranges);

    for (Entry<Key,Value> cell : batchScanner) {
      Key cellKey = cell.getKey();
      byte[] rawValue = cell.getValue().get();
      Counters parsed = new Counters();
      try {
        parsed.readFields(ByteStreams.newDataInput(rawValue));
      } catch (IOException e) {
        // The IO exception means the counters are in the wrong format. We *assume* that they are in
        // the old (CDH3) format, and de-serialize according to that, and re-write the key with the new value.
        // NOTE(review): 'parsed' is the same instance readFields just failed on; whether it can
        // already hold partial data at this point depends on readFields - confirm.
        ByteArrayDataInput legacyIn = ByteStreams.newDataInput(rawValue);

        // Legacy layout as read here: int group count, then per group its name, display name and a
        // vint counter count; per counter its name, a boolean flag, an optional display name
        // (present when the flag is true) and a vlong value.
        int groupCount = legacyIn.readInt();
        for (int g = 0; g < groupCount; g++) {
          String groupName = Text.readString(legacyIn);
          String groupDisplayName = Text.readString(legacyIn);
          CounterGroup legacyGroup = parsed.addGroup(groupName, groupDisplayName);

          int counterCount = WritableUtils.readVInt(legacyIn);
          for (int c = 0; c < counterCount; c++) {
            String counterName = Text.readString(legacyIn);
            // Without an explicit display name the counter name doubles as the display name.
            String counterDisplayName = legacyIn.readBoolean() ? Text.readString(legacyIn) : counterName;
            long counterValue = WritableUtils.readVLong(legacyIn);
            legacyGroup.addCounter(counterName, counterDisplayName, counterValue);
          }
        }

        ByteArrayDataOutput reserialized = ByteStreams.newDataOutput();
        parsed.write(reserialized);

        // Same cell coordinates, timestamp + 1 - presumably so the rewritten value supersedes the
        // original one; verify against the table's versioning configuration.
        Mutation rewrite = new Mutation(cellKey.getRow());
        rewrite.put(cellKey.getColumnFamily(), cellKey.getColumnQualifier(), cellKey.getColumnVisibilityParsed(),
            cellKey.getTimestamp() + 1, new Value(reserialized.toByteArray()));
        batchWriter.addMutation(rewrite);
      }
    }
  }
}