下面列出了 org.apache.hadoop.util.Tool API 类的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Command-line entry point: runs the {@code SQLCLI} tool through {@code ToolRunner} and
 * terminates the JVM with the resulting status.
 *
 * <p>The exit status is the value returned by the tool, or {@code 1} when running the tool
 * throws an exception.
 *
 * @param args command-line arguments forwarded to the tool
 */
public static void main(String[] args) {
  Tool shell = new SQLCLI(new OzoneConfiguration());
  int res;
  try {
    // Propagate the tool's own return code so that a non-zero status reaches the caller
    // instead of always exiting with 0 when no exception was thrown.
    res = ToolRunner.run(shell, args);
  } catch (Exception ex) {
    LOG.error(ex.toString());
    // The full stack trace is only emitted at debug level.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Command execution failed", ex);
    }
    res = 1;
  }
  System.exit(res);
}
/**
 * Runs {@code job} through {@code ToolRunner} and exits the JVM with the status it returns.
 * When running the tool throws, the stack trace is written to standard error and the JVM
 * exits with status 5 instead.
 *
 * @param job  the tool to execute
 * @param args command-line arguments handed to the tool
 */
protected static void runJob(Tool job, String[] args) {
  try {
    System.exit(ToolRunner.run(job, args));
  } catch (Exception failure) {
    failure.printStackTrace(System.err);
    System.exit(5);
  }
}
/**
 * Runs {@code tool} via {@code ToolRunner} while {@link System#out} is redirected to
 * {@code out}; the previous standard output stream is restored afterwards, even when the
 * tool throws.
 *
 * @param conf configuration passed to {@code ToolRunner.run}
 * @param tool the tool to run
 * @param args arguments passed to the tool
 * @param out  destination that receives everything written to {@code System.out} during the run
 * @return the value returned by {@code ToolRunner.run}
 * @throws Exception whatever {@code ToolRunner.run} throws
 */
public static int runTool(Configuration conf, Tool tool, String[] args,
    OutputStream out) throws Exception {
  final PrintStream savedStdout = System.out;
  final PrintStream redirected = new PrintStream(out, true);
  int exitCode;
  try {
    System.setOut(redirected);
    exitCode = ToolRunner.run(conf, tool, args);
  } finally {
    // Always put the original stream back, whatever happened above.
    System.setOut(savedStdout);
  }
  return exitCode;
}
/**
 * Executes a tool with standard output captured into the supplied stream.
 *
 * <p>{@link System#out} is swapped for an auto-flushing {@link PrintStream} wrapping
 * {@code out} for the duration of the run and is put back in a {@code finally} block.
 *
 * @param conf configuration handed to {@code ToolRunner.run}
 * @param tool the tool to execute
 * @param args the tool's arguments
 * @param out  stream that receives the captured standard output
 * @return the status returned by {@code ToolRunner.run}
 * @throws Exception if {@code ToolRunner.run} throws
 */
public static int runTool(Configuration conf, Tool tool, String[] args,
    OutputStream out) throws Exception {
  final PrintStream original = System.out;
  final PrintStream capture = new PrintStream(out, true);
  try {
    System.setOut(capture);
    final int status = ToolRunner.run(conf, tool, args);
    return status;
  } finally {
    System.setOut(original);
  }
}
/**
 * Executes the given tool with {@code ToolRunner} and ends the JVM with the tool's status.
 * An exception from the run is printed to standard error and turned into exit status 5.
 *
 * @param job  tool to run
 * @param args arguments for the tool
 */
protected static void runJob(Tool job, String[] args) {
  // Status reported when the tool fails with an exception rather than a return code.
  final int failureStatus = 5;
  try {
    final int status = ToolRunner.run(job, args);
    System.exit(status);
  } catch (Exception cause) {
    cause.printStackTrace(System.err);
    System.exit(failureStatus);
  }
}
/**
 * Runs {@code WALPlayer} in bulk-output mode over the given directories.
 *
 * <p>The player is configured through the shared {@code conf}: bulk output directory, input
 * file separator, multi-table support and job name. The separator and job-name keys are
 * removed from {@code conf} again once the player has finished, whether it succeeded or
 * failed, so a failed run does not leave them behind in the shared configuration.
 *
 * @param dirPaths  directories to replay; joined with ';' into the player's first argument
 * @param tableList tables to replay; joined with ',' into the player's second argument
 * @throws IOException if the player returns a non-zero status or fails with any exception
 */
protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
  Tool player = new WALPlayer();

  // Player reads all files in arbitrary directory structure and creates
  // a Map task for each file. We use ';' as separator
  // because WAL file names contain ','
  String dirs = StringUtils.join(dirPaths, ';');
  String jobname = "Incremental_Backup-" + backupId;

  Path bulkOutputPath = getBulkOutputDir();
  conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
  conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
  conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
  conf.set(JOB_NAME_CONF_KEY, jobname);
  String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };

  try {
    player.setConf(conf);
    int result = player.run(playerArgs);
    if (result != 0) {
      throw new IOException("WAL Player failed");
    }
  } catch (IOException e) {
    throw e;
  } catch (Exception ee) {
    throw new IOException("Can not convert from directory " + dirs
        + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
  } finally {
    // Remove the per-run keys on every exit path, not only on success.
    conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
    conf.unset(JOB_NAME_CONF_KEY);
  }
}
/**
 * Selects a validator tool from the first argument and runs it with the remaining arguments.
 *
 * <p>With no arguments, with {@code -h}, or with an unrecognised command, the usage text is
 * printed and {@code AbstractHBaseTool.EXIT_FAILURE} is returned; an unrecognised command is
 * additionally reported on standard error.
 *
 * @param args command name followed by the arguments for the selected tool
 * @return the status returned by the selected tool, or {@code AbstractHBaseTool.EXIT_FAILURE}
 * @throws Exception if the selected tool throws
 */
@Override
public int run(String[] args) throws Exception {
  if (args.length == 0) {
    printUsage();
    return AbstractHBaseTool.EXIT_FAILURE;
  }

  final String command = args[0];
  final Tool validator;
  switch (command) {
    case VALIDATE_CP_NAME:
      validator = new CoprocessorValidator();
      break;
    case VALIDATE_DBE_NAME:
      validator = new DataBlockEncodingValidator();
      break;
    case VALIDATE_HFILE:
      validator = new HFileContentValidator();
      break;
    case "-h":
      printUsage();
      return AbstractHBaseTool.EXIT_FAILURE;
    default:
      System.err.println("Unknown command: " + command);
      printUsage();
      return AbstractHBaseTool.EXIT_FAILURE;
  }

  validator.setConf(getConf());
  // Strip the command name; the selected tool only sees its own arguments.
  final String[] validatorArgs = Arrays.copyOfRange(args, 1, args.length);
  return validator.run(validatorArgs);
}
/**
 * Writes {@code data} to an input file on the test filesystem, runs ImportTsv over it and
 * asserts that the job exits with status 0. The test data directory is removed afterwards
 * (also when the job fails) unless {@code DELETE_AFTER_LOAD_CONF} is set to false.
 *
 * @param util            testing utility supplying the configuration and test data directory
 * @param family          column family name; not used by this method
 * @param data            content written to the input file
 * @param args            arguments passed to ImportTsv BEFORE the input file path is
 *                        appended; the last element is used as the table name
 * @param valueMultiplier not used by this method
 * @return the Tool instance used to run the import
 * @throws Exception if writing the input file, running the job or cleaning up fails
 */
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
    String[] args, int valueMultiplier) throws Exception {
  TableName table = TableName.valueOf(args[args.length - 1]);
  Configuration conf = new Configuration(util.getConfiguration());

  // populate input file
  FileSystem fs = FileSystem.get(conf);
  Path inputPath = fs.makeQualified(
      new Path(util.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
  // try-with-resources closes the stream even when the write fails
  try (FSDataOutputStream op = fs.create(inputPath, true)) {
    op.write(Bytes.toBytes(data));
  }
  LOG.debug(String.format("Wrote test data to file: %s", inputPath));

  if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
    LOG.debug("Forcing combiner.");
    conf.setInt("mapreduce.map.combine.minspills", 1);
  }

  // run the import
  List<String> argv = new ArrayList<>(Arrays.asList(args));
  argv.add(inputPath.toString());
  Tool tool = new ImportTsv();
  LOG.debug("Running ImportTsv with arguments: " + argv);
  try {
    // Job will fail if observer rejects entries without TTL
    assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));
  } finally {
    // Clean up
    if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
      LOG.debug("Deleting test subdirectory");
      util.cleanupDataTestDirOnTestFS(table.getNameAsString());
    }
  }
  return tool;
}
/**
 * Run an ImportTsv job and perform basic validation on the results. Returns
 * the ImportTsv <code>Tool</code> instance so that other tests can inspect it
 * for further validation as necessary.
 *
 * @param util
 *          Testing utility supplying the configuration and the test data directory.
 * @param family
 *          Column family forwarded to <code>validateTable</code>.
 * @param data
 *          Content written to the input file.
 * @param args
 *          Any arguments to pass BEFORE inputFile path is appended. The last
 *          element is used as the table name.
 * @param valueMultiplier
 *          Forwarded to <code>validateTable</code>.
 * @param dataAvailable
 *          Forwarded to <code>validateTable</code>.
 * @return The Tool instance used to run the test.
 * @throws Exception
 *           If writing the input file, running the job or validating fails.
 */
private Tool doMROnTableTest(HBaseTestingUtility util, String family, String data, String[] args,
    int valueMultiplier, boolean dataAvailable) throws Exception {
  String table = args[args.length - 1];
  Configuration conf = new Configuration(util.getConfiguration());

  // populate input file
  FileSystem fs = FileSystem.get(conf);
  Path inputPath = fs.makeQualified(new Path(util.getDataTestDirOnTestFS(table), "input.dat"));
  // try-with-resources closes the stream even when the write fails
  try (FSDataOutputStream op = fs.create(inputPath, true)) {
    op.write(Bytes.toBytes(data));
  }
  LOG.debug(String.format("Wrote test data to file: %s", inputPath));

  if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
    LOG.debug("Forcing combiner.");
    conf.setInt("mapreduce.map.combine.minspills", 1);
  }

  // run the import
  List<String> argv = new ArrayList<>(Arrays.asList(args));
  argv.add(inputPath.toString());
  Tool tool = new ImportTsv();
  LOG.debug("Running ImportTsv with arguments: " + argv);
  assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[0])));

  validateTable(conf, TableName.valueOf(table), family, valueMultiplier, dataAvailable);

  if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
    LOG.debug("Deleting test subdirectory");
    util.cleanupDataTestDirOnTestFS(table);
  }
  return tool;
}
/**
 * Creates {@code table}, generates HFiles for it with the ImportTsv test harness, loads them
 * with {@code doLoadIncrementalHFiles}, validates the partitions file and finally drops the
 * table and its test data directory.
 *
 * @param table the table to create, load and delete
 * @throws Exception if any step fails
 */
void generateAndLoad(final TableName table) throws Exception {
  LOG.info("Running test testGenerateAndLoad.");
  final String family = "d";
  final Path hfileDir = new Path(
      util.getDataTestDirOnTestFS(table.getNameAsString()), "hfiles");
  final String columnSpec = format("HBASE_ROW_KEY,HBASE_TS_KEY,%s:c1,%s:c2", family, family);

  final Map<String, String> importArgs = new HashMap<>();
  importArgs.put(ImportTsv.BULK_OUTPUT_CONF_KEY, hfileDir.toString());
  importArgs.put(ImportTsv.COLUMNS_CONF_KEY, columnSpec);
  // The harness must NOT delete the generated HFiles:
  // doLoadIncrementalHFiles still needs them below.
  importArgs.put(TestImportTsv.DELETE_AFTER_LOAD_CONF, "false");

  // Run the job, then complete the load.
  util.createTable(table, new String[]{family});
  final Tool importTool =
      TestImportTsv.doMROnTableTest(util, table, family, simple_tsv, importArgs);
  doLoadIncrementalHFiles(hfileDir, table);

  // Validate post-conditions.
  validateDeletedPartitionsFile(importTool.getConf());

  // Clean up after ourselves.
  util.deleteTable(table);
  util.cleanupDataTestDirOnTestFS(table.getNameAsString());
  LOG.info("testGenerateAndLoad completed successfully.");
}
/**
 * Runs a {@code VisibilityLoop} bound to this instance through {@code ToolRunner}.
 *
 * @return the status returned by {@code ToolRunner.run}
 * @throws Exception if the tool run throws
 */
@Override
public int runTestFromCommandLine() throws Exception {
  final Loop visibilityLoop = new VisibilityLoop();
  visibilityLoop.it = this;
  final Tool tool = visibilityLoop;
  return ToolRunner.run(getConf(), tool, otherArgs);
}
/**
 * Picks the tool named by {@code toRun} (compared case-insensitively) and runs it through
 * {@code ToolRunner} with {@code otherArgs}.
 *
 * @return the status returned by {@code ToolRunner.run}
 * @throws RuntimeException if {@code toRun} names no known tool (usage is printed first)
 * @throws Exception if the selected tool throws
 */
@Override
public int runTestFromCommandLine() throws Exception {
  final Tool selected;
  if (toRun.equalsIgnoreCase("Generator")) {
    selected = new Generator();
  } else if (toRun.equalsIgnoreCase("Verify")) {
    selected = new Verify();
  } else if (toRun.equalsIgnoreCase("Loop")) {
    // Loop needs a back-reference to this test instance.
    final Loop looper = new Loop();
    looper.it = this;
    selected = looper;
  } else if (toRun.equalsIgnoreCase("Walker")) {
    selected = new Walker();
  } else if (toRun.equalsIgnoreCase("Print")) {
    selected = new Print();
  } else if (toRun.equalsIgnoreCase("Delete")) {
    selected = new Delete();
  } else if (toRun.equalsIgnoreCase("Clean")) {
    selected = new Clean();
  } else if (toRun.equalsIgnoreCase("Search")) {
    selected = new Search();
  } else {
    usage();
    throw new RuntimeException("Unknown arg");
  }

  return ToolRunner.run(getConf(), selected, otherArgs);
}
/**
 * Launches {@code job} with {@code ToolRunner} and exits the JVM with the returned status;
 * if the launch throws, prints the stack trace to standard error and exits with status 5.
 *
 * @param job  the tool to launch
 * @param args the tool's command-line arguments
 */
protected static void runJob(Tool job, String[] args) {
  try {
    final int toolStatus = ToolRunner.run(job, args);
    System.exit(toolStatus);
  } catch (Exception thrown) {
    thrown.printStackTrace(System.err);
    System.exit(5);
  }
}
/**
 * Runs {@code tool} through {@code ToolRunner} with {@link System#out} temporarily pointed
 * at {@code out}. The previous stream is reinstated in a {@code finally} block.
 *
 * @param conf configuration passed to {@code ToolRunner.run}
 * @param tool the tool to run
 * @param args arguments for the tool
 * @param out  stream that receives standard output while the tool runs
 * @return the status returned by {@code ToolRunner.run}
 * @throws Exception if {@code ToolRunner.run} throws
 */
private int runTool(Configuration conf, Tool tool, String[] args, OutputStream out)
    throws Exception {
  final PrintStream previousOut = System.out;
  final PrintStream toolOut = new PrintStream(out, true);
  int result;
  try {
    System.setOut(toolOut);
    result = ToolRunner.run(conf, tool, args);
  } finally {
    System.setOut(previousOut);
  }
  return result;
}
/**
 * Captures standard output into {@code out} while running {@code tool} via
 * {@code ToolRunner}, then restores the stream that was installed before the call.
 *
 * @param conf configuration handed to {@code ToolRunner.run}
 * @param tool tool to execute
 * @param args tool arguments
 * @param out  sink for the captured standard output
 * @return whatever {@code ToolRunner.run} returns
 * @throws Exception if {@code ToolRunner.run} throws
 */
private int runTool(Configuration conf, Tool tool, String[] args, OutputStream out)
    throws Exception {
  final PrintStream stdoutBefore = System.out;
  final PrintStream sink = new PrintStream(out, true);
  try {
    System.setOut(sink);
    final int rc = ToolRunner.run(conf, tool, args);
    return rc;
  } finally {
    // Restore even if the tool threw.
    System.setOut(stdoutBefore);
  }
}
/**
 * Creates the tool for this test: a {@code GetGroupsForTesting} built from {@code conf} and
 * the given stream.
 *
 * @param o print stream handed to the tool's constructor
 * @return the newly created tool
 */
@Override
protected Tool getTool(PrintStream o) {
  final Tool groupsTool = new GetGroupsForTesting(conf, o);
  return groupsTool;
}
/**
 * Convenience wrapper that forwards all arguments unchanged to
 * {@code TestMRJobClient.runTool}.
 *
 * @param conf configuration to forward
 * @param tool tool to forward
 * @param args arguments to forward
 * @param out  output stream to forward
 * @return the value returned by {@code TestMRJobClient.runTool}
 * @throws Exception if the delegate throws
 */
public static int runTool(Configuration conf, Tool tool, String[] args,
    OutputStream out) throws Exception {
  final int status = TestMRJobClient.runTool(conf, tool, args, out);
  return status;
}
/**
 * Starts the job class named {@code jobClass} on a daemon thread, waits until the job has
 * left the PREP state, then runs the optional kill threads against it and finally waits for
 * the job thread to finish.
 *
 * <p>The JVM is terminated with status -1 if the job thread fails with an exception, or if
 * the job picked up from the client is already complete.
 *
 * @param jc                client used to find and poll the submitted job
 * @param conf              configuration used to load and run the job class
 * @param jobClass          fully qualified name of the job class, which must be a Tool
 * @param args              arguments passed to the job
 * @param killTaskThread    thread to run against the running job, or {@code null} to skip
 * @param killTrackerThread thread to run against the running job, or {@code null} to skip
 * @throws Exception if polling the job or joining a thread fails
 */
private void runTest(final JobClient jc, final Configuration conf,
    final String jobClass, final String[] args, KillTaskThread killTaskThread,
    KillTrackerThread killTrackerThread) throws Exception {
  Thread t = new Thread("Job Test") {
    @Override
    public void run() {
      try {
        Class<?> jobClassObj = conf.getClassByName(jobClass);
        int status = ToolRunner.run(conf, (Tool) (jobClassObj.newInstance()),
            args);
        checkJobExitStatus(status, jobClass);
      } catch (Exception e) {
        // Log the cause as well; the message alone gives no way to diagnose the failure.
        LOG.fatal("JOB " + jobClass + " failed to run", e);
        System.exit(-1);
      }
    }
  };
  t.setDaemon(true);
  t.start();

  JobStatus[] jobs;
  // get the job ID. This is the job that we just submitted
  while ((jobs = jc.jobsToComplete()).length == 0) {
    LOG.info("Waiting for the job " + jobClass + " to start");
    Thread.sleep(1000);
  }
  // The most recent entry is taken to be the job started above.
  JobID jobId = jobs[jobs.length - 1].getJobID();
  RunningJob rJob = jc.getJob(jobId);
  if (rJob.isComplete()) {
    LOG.error("The last job returned by the querying JobTracker is complete :"
        + rJob.getJobID() + " .Exiting the test");
    System.exit(-1);
  }
  // Poll once a second until the job has left the PREP state.
  while (rJob.getJobState() == JobStatus.PREP) {
    LOG.info("JobID : " + jobId + " not started RUNNING yet");
    Thread.sleep(1000);
    rJob = jc.getJob(jobId);
  }

  if (killTaskThread != null) {
    killTaskThread.setRunningJob(rJob);
    killTaskThread.start();
    killTaskThread.join();
    LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
  }
  if (killTrackerThread != null) {
    killTrackerThread.setRunningJob(rJob);
    killTrackerThread.start();
    killTrackerThread.join();
    LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
  }
  t.join();
}
/**
 * Supplies the tool for this test: a {@code GetGroups} constructed with {@code conf} and
 * the given stream.
 *
 * @param o print stream passed to the {@code GetGroups} constructor
 * @return a new {@code GetGroups} instance
 */
@Override
protected Tool getTool(PrintStream o) {
  final Tool tool = new GetGroups(conf, o);
  return tool;
}
/**
 * Tests running several balancers at the same time.
 *
 * Case 1: A first balancer is running. Starting a second one should hit the
 * "Another balancer is running. Exiting.." IOException and fail immediately.
 *
 * Case 2: When the second balancer starts, the 'balancer.id' file exists but
 * its lease does not. The second balancer should then run successfully.
 */
@Test(timeout = 100000)
public void testManyBalancerSimultaneously() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
// Start with a single datanode of capacity 4 * CAPACITY on RACK0. The node
// added later has half of that capacity (2 * CAPACITY) and is on the same rack.
long[] capacities = new long[] { 4 * CAPACITY };
String[] racks = new String[] { RACK0 };
long newCapacity = 2 * CAPACITY;
String newRack = RACK0;
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack);
LOG.info("useTool = " + false);
assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.racks(racks).simulatedCapacities(capacities).build();
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full
final long totalUsedSpace = totalCapacity * 3 / 10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
// start up one more datanode with newCapacity on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new long[] { newCapacity });
// Case 1: Simulate the first balancer by creating the 'balancer.id' file.
// The stream is deliberately left open here; it is only closed in Case 2.
FileSystem fs = cluster.getFileSystem(0);
final FSDataOutputStream out = fs
.create(Balancer.BALANCER_ID_PATH, false);
out.writeBytes(InetAddress.getLocalHost().getHostName());
out.hflush();
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
// start second balancer; it is expected to exit with IO_EXCEPTION
final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli();
tool.setConf(conf);
int exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
// Case 2: Release the lease so that another balancer is able to
// perform balancing.
out.close();
// The file itself must still be present after the stream is closed.
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.SUCCESS.getExitCode(), exitCode);
} finally {
// Always tear the mini cluster down, including after a failed assertion.
cluster.shutdown();
}
}
/**
 * Builds the {@code GetGroups} tool for this test from {@code conf} and the given stream.
 *
 * @param o print stream given to the tool's constructor
 * @return the created tool
 */
@Override
protected Tool getTool(PrintStream o) {
  final GetGroups getGroups = new GetGroups(conf, o);
  return getGroups;
}
/**
 * Returns a fresh {@code GetGroups} tool constructed with {@code conf} and {@code o}.
 *
 * @param o print stream forwarded to the {@code GetGroups} constructor
 * @return a new tool instance
 */
@Override
protected Tool getTool(PrintStream o) {
  final Tool created = new GetGroups(conf, o);
  return created;
}
/**
 * Returns a new {@code GetGroupsForTesting} tool constructed with {@code conf} and
 * {@code o}.
 *
 * @param o print stream forwarded to the tool's constructor
 * @return a new tool instance
 */
@Override
protected Tool getTool(PrintStream o) {
  final Tool testingTool = new GetGroupsForTesting(conf, o);
  return testingTool;
}
/**
 * Delegates to {@code TestMRJobClient.runTool} with the same arguments.
 *
 * @param conf configuration passed through
 * @param tool tool passed through
 * @param args arguments passed through
 * @param out  output stream passed through
 * @return the delegate's return value
 * @throws Exception if the delegate throws
 */
public static int runTool(Configuration conf, Tool tool, String[] args,
    OutputStream out) throws Exception {
  final int exitCode = TestMRJobClient.runTool(conf, tool, args, out);
  return exitCode;
}
/**
 * Runs the job class {@code jobClass} on a background daemon thread and exercises the
 * optional kill threads against it once the job has left the PREP state. Returns after the
 * job thread has finished.
 *
 * <p>Exits the JVM with status -1 when the job thread fails with an exception or when the
 * job obtained from the client is already complete.
 *
 * @param jc                client used to look up and poll the job
 * @param conf              configuration used to load the job class and run it
 * @param jobClass          name of the job class to instantiate; it is cast to Tool
 * @param args              arguments for the job
 * @param killTaskThread    optional thread started against the running job; may be null
 * @param killTrackerThread optional thread started against the running job; may be null
 * @throws Exception if polling the job or joining a thread fails
 */
private void runTest(final JobClient jc, final Configuration conf,
    final String jobClass, final String[] args, KillTaskThread killTaskThread,
    KillTrackerThread killTrackerThread) throws Exception {
  Thread t = new Thread("Job Test") {
    @Override
    public void run() {
      try {
        Class<?> jobClassObj = conf.getClassByName(jobClass);
        int status = ToolRunner.run(conf, (Tool) (jobClassObj.newInstance()),
            args);
        checkJobExitStatus(status, jobClass);
      } catch (Exception e) {
        // Pass the exception to the logger so the reason for the failure is recorded.
        LOG.fatal("JOB " + jobClass + " failed to run", e);
        System.exit(-1);
      }
    }
  };
  t.setDaemon(true);
  t.start();

  JobStatus[] jobs;
  // get the job ID. This is the job that we just submitted
  while ((jobs = jc.jobsToComplete()).length == 0) {
    LOG.info("Waiting for the job " + jobClass + " to start");
    Thread.sleep(1000);
  }
  // Use the last entry returned by the client as the job under test.
  JobID jobId = jobs[jobs.length - 1].getJobID();
  RunningJob rJob = jc.getJob(jobId);
  if (rJob.isComplete()) {
    LOG.error("The last job returned by the querying JobTracker is complete :"
        + rJob.getJobID() + " .Exiting the test");
    System.exit(-1);
  }
  // Re-fetch the job every second while it is still in the PREP state.
  while (rJob.getJobState() == JobStatus.PREP) {
    LOG.info("JobID : " + jobId + " not started RUNNING yet");
    Thread.sleep(1000);
    rJob = jc.getJob(jobId);
  }

  if (killTaskThread != null) {
    killTaskThread.setRunningJob(rJob);
    killTaskThread.start();
    killTaskThread.join();
    LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
  }
  if (killTrackerThread != null) {
    killTrackerThread.setRunningJob(rJob);
    killTrackerThread.start();
    killTrackerThread.join();
    LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
  }
  t.join();
}
/**
 * Instantiates {@code GetGroups} with {@code conf} and the supplied stream and returns it as
 * the tool for this test.
 *
 * @param o print stream passed to the constructor
 * @return the new {@code GetGroups} tool
 */
@Override
protected Tool getTool(PrintStream o) {
  final Tool instance = new GetGroups(conf, o);
  return instance;
}
/**
 * Verifies the behaviour of balancers that are started while another one
 * appears to be active.
 *
 * Case 1: The first balancer is running. Running a second one should get the
 * "Another balancer is running. Exiting.." IOException and fail immediately.
 *
 * Case 2: The 'balancer.id' file exists when the second balancer starts, but
 * the lease on it does not. The second balancer should run successfully.
 */
@Test(timeout = 100000)
public void testManyBalancerSimultaneously() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
// The cluster starts with one datanode of capacity 4 * CAPACITY on RACK0.
// A second node with half of that capacity (2 * CAPACITY), on the same
// rack, is added further down.
long[] capacities = new long[] { 4 * CAPACITY };
String[] racks = new String[] { RACK0 };
long newCapacity = 2 * CAPACITY;
String newRack = RACK0;
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack);
LOG.info("useTool = " + false);
assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.racks(racks).simulatedCapacities(capacities).build();
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full
final long totalUsedSpace = totalCapacity * 3 / 10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
// add one datanode of capacity newCapacity on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
new long[] { newCapacity });
// Case 1: Stand in for a running balancer by creating the 'balancer.id'
// file and keeping its output stream open until Case 2.
FileSystem fs = cluster.getFileSystem(0);
final FSDataOutputStream out = fs
.create(Balancer.BALANCER_ID_PATH, false);
out.writeBytes(InetAddress.getLocalHost().getHostName());
out.hflush();
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
// start second balancer; the expected exit status is IO_EXCEPTION
final String[] args = { "-policy", "datanode" };
final Tool tool = new Cli();
tool.setConf(conf);
int exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
// Case 2: Release the lease so that another balancer is able to
// perform balancing.
out.close();
// Closing the stream must not remove the file.
assertTrue("'balancer.id' file doesn't exist!",
fs.exists(Balancer.BALANCER_ID_PATH));
exitCode = tool.run(args); // start balancing
assertEquals("Exit status code mismatches",
ExitStatus.SUCCESS.getExitCode(), exitCode);
} finally {
// Shut the mini cluster down on every exit path.
cluster.shutdown();
}
}
/**
 * Provides the {@code GetGroups} tool used by this test, built from {@code conf} and
 * {@code o}.
 *
 * @param o print stream handed to {@code GetGroups}
 * @return the tool instance
 */
@Override
protected Tool getTool(PrintStream o) {
  final GetGroups groups = new GetGroups(conf, o);
  return groups;
}
/**
 * Constructs and returns a {@code GetGroups} tool using {@code conf} and the supplied stream.
 *
 * @param o print stream supplied to the constructor
 * @return a newly constructed {@code GetGroups}
 */
@Override
protected Tool getTool(PrintStream o) {
  final Tool result = new GetGroups(conf, o);
  return result;
}
/**
 * Shorthand for the seven-argument {@code doMROnTableTest} overload, supplying this test's
 * {@code util}, {@code tn}, {@code FAMILY} and {@code args}, and {@code -1} as the last
 * argument.
 *
 * @param data            input data forwarded to the overload
 * @param valueMultiplier forwarded to the overload
 * @return the Tool returned by the overload
 * @throws Exception if the overload throws
 */
private Tool doMROnTableTest(String data, int valueMultiplier) throws Exception {
  final Tool tool = doMROnTableTest(util, tn, FAMILY, data, args, valueMultiplier, -1);
  return tool;
}