Listed below are code examples of org.apache.hadoop.mapreduce.Counters#readFields(), extracted from open-source projects.
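Counters implements Hadoop's Writable contract, so readFields() is the deserialization half of a write()/readFields() round trip over a DataInput. As a minimal, self-contained sketch of that round trip (the group and counter names are illustrative, not taken from the examples below):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Counters;

public class CountersRoundTrip {
  public static void main(String[] args) throws IOException {
    Counters original = new Counters();
    original.findCounter("my-group", "records-seen").increment(42); // hypothetical names

    // Serialize with write(): Counters is a Writable.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.write(new DataOutputStream(bytes));

    // Deserialize with readFields() into a fresh instance.
    Counters restored = new Counters();
    restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println(restored.findCounter("my-group", "records-seen").getValue()); // prints 42
  }
}

The first example below reads the serialized Counters for a job out of an Accumulo scan, tolerating malformed entries: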
private Counters getCounters(String jobId) {
  Counters counters = new Counters();
  Range r = new Range("jobId\0" + jobId);
  ingestScanner.setRange(r);
  for (Map.Entry<Key,Value> entry : ingestScanner) {
    try {
      counters.readFields(ByteStreams.newDataInput(entry.getValue().get()));
    } catch (IOException e) {
      System.err.println("Error parsing counters for job " + jobId);
      e.printStackTrace(System.err); // Called from main
      // Ignore for now -- bad counters, so we'll just return partial/empty ones.
    }
    processedJobs.add(jobId);
    break; // Only one counters entry is expected per job.
  }
  if (!processedJobs.contains(jobId)) {
    System.err.println("Couldn't find ingest counters for job " + jobId);
    processedJobs.add(jobId);
  }
  return counters;
}
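The next example is the getTopValue() override of an Accumulo-style iterator: it deserializes the stored Counters and returns them re-encoded as JSON, falling back to the raw value when parsing fails: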
@Override
public Value getTopValue() {
  Value topValue = super.getTopValue();
  if (topValue == null) {
    return null;
  } else {
    try {
      // Deserialize the stored Counters and re-encode them as JSON.
      Counters counters = new Counters();
      counters.readFields(ByteStreams.newDataInput(topValue.get()));
      String json = gson.toJson(counters);
      return new Value(json.getBytes());
    } catch (IOException e) {
      log.debug("Unable to parse value for key " + getTopKey() + " as Counters", e);
      return topValue; // Fall back to the raw, unparsed value.
    }
  }
}
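This next() implementation backs an Iterator<Entry<String,Counters>>, deserializing each counter payload from an in-memory buffer: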
@Override
public Entry<String,Counters> next() {
  Counters cntrs = new Counters();
  ByteArrayInputStream input = new ByteArrayInputStream(getNextCounterData());
  DataInputStream dataInput = new DataInputStream(input);
  try {
    cntrs.readFields(dataInput);
  } catch (IOException e) {
    // Reading from an in-memory buffer: an IOException here means truncated or corrupt data.
    throw new RuntimeException(e);
  }
  return Maps.immutableEntry(getNextIdentifier(), cntrs);
}
protected void run(String[] args) throws ParseException, AccumuloSecurityException, AccumuloException, TableNotFoundException, IOException {
  parseConfig(args);
  ZooKeeperInstance instance = new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zookeepers));
  Connector connector = instance.getConnector(username, new PasswordToken(password));
  Authorizations auths = connector.securityOperations().getUserAuthorizations(connector.whoami());
  try (BatchWriter writer = connector.createBatchWriter(tableName, new BatchWriterConfig().setMaxWriteThreads(bwThreads).setMaxMemory(bwMemory)
      .setMaxLatency(60, TimeUnit.SECONDS));
      BatchScanner scanner = connector.createBatchScanner(tableName, auths, bsThreads)) {
    scanner.setRanges(ranges);
    for (Entry<Key,Value> entry : scanner) {
      Key key = entry.getKey();
      ByteArrayDataInput in = ByteStreams.newDataInput(entry.getValue().get());
      Counters counters = new Counters();
      try {
        counters.readFields(in);
      } catch (IOException e) {
        // The IOException means the counters are in the wrong format. We *assume* they are in
        // the old (CDH3) format, deserialize according to that, and rewrite the key with the new value.
        in = ByteStreams.newDataInput(entry.getValue().get());
        int numGroups = in.readInt();
        while (numGroups-- > 0) {
          String groupName = Text.readString(in);
          String groupDisplayName = Text.readString(in);
          CounterGroup group = counters.addGroup(groupName, groupDisplayName);
          int groupSize = WritableUtils.readVInt(in);
          for (int i = 0; i < groupSize; i++) {
            String counterName = Text.readString(in);
            String counterDisplayName = counterName;
            if (in.readBoolean())
              counterDisplayName = Text.readString(in);
            long value = WritableUtils.readVLong(in);
            group.addCounter(counterName, counterDisplayName, value);
          }
        }
        // Re-serialize in the current format and overwrite the old cell with a newer timestamp.
        ByteArrayDataOutput out = ByteStreams.newDataOutput();
        counters.write(out);
        Mutation m = new Mutation(key.getRow());
        m.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp() + 1,
            new Value(out.toByteArray()));
        writer.addMutation(m);
      }
    }
  }
}