下面列出了怎么用org.apache.hadoop.util.ToolRunner的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Verifies the corrupt-file count reported by fsck after two blocks of the
 * same file have been removed within one stripe.
 */
@Test
public void test2FileBlocksMissingInSameStripe()
    throws Exception {
  LOG.info("test2FileBlocksMissingInSameStripe");
  setUp(false);
  waitUntilCorruptFileCount(dfs, 0);

  // First removal: the file shows up as corrupt.
  removeFileBlock(FILE_PATH0, 1, 1);
  waitUntilCorruptFileCount(dfs, 1);

  // Second removal on the same file: the corrupt-file count stays at one.
  removeFileBlock(FILE_PATH0, 1, 0);
  waitUntilCorruptFileCount(dfs, 1);

  ToolRunner.run(shell, args);
  final int corruptCount = shell.getCorruptCount();
  final String failureMessage =
      "fsck should return 1, but returns " + corruptCount;
  assertTrue(failureMessage, corruptCount == 1);
}
/**
 * Checks that the restore driver prints its usage text, including the
 * table-list option description, for both the {@code -help} and {@code -h}
 * flags.
 * <p>
 * Standard output is redirected into a buffer while the driver runs. The
 * original stream is always restored afterwards so the redirection cannot
 * leak into other tests, and the captured text is echoed to the real
 * standard output for diagnosis.
 *
 * @throws Exception if running the driver fails
 */
@Test
public void testRestoreDriverHelp() throws Exception {
  final PrintStream originalOut = System.out;
  try {
    for (String helpFlag : new String[] { "-help", "-h" }) {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      System.setOut(new PrintStream(baos));
      String[] args = new String[] { helpFlag };
      ToolRunner.run(conf, new RestoreDriver(), args);
      String output = baos.toString();
      // Echo to the real stdout; System.out still points at the buffer here.
      originalOut.println(output);
      assertTrue(output.indexOf(USAGE_RESTORE) >= 0);
      assertTrue(output.indexOf(BackupRestoreConstants.OPTION_TABLE_LIST_DESC) > 0);
    }
  } finally {
    System.setOut(originalOut);
  }
}
/**
 * Entry point for executing mrgeo commands.
 * <p/>
 * No status is returned to the caller. Instead the process terminates through
 * {@link System#exit(int)}: with the value produced by the tool run, or with
 * -1 when the run throws an exception (which is logged first).
 *
 * @param args String[] Command line arguments
 */
public static void main(String[] args)
{
  final Configuration configuration = HadoopUtils.createConfiguration();
  int exitCode;
  try
  {
    exitCode = ToolRunner.run(configuration, new MrGeo(), args);
  }
  catch (Exception e)
  {
    log.error("Exception thrown", e);
    exitCode = -1;
  }
  System.exit(exitCode);
}
/**
 * Command-line entry point: runs a new TraceBuilder through ToolRunner and
 * always calls builder.finish() afterwards.
 * <p>
 * Returns normally when the run result is 0; otherwise terminates the JVM via
 * System.exit with that result. The result starts out as
 * RUN_METHOD_FAILED_EXIT_CODE, so that value is used when the run throws.
 *
 * @param args command-line arguments handed to ToolRunner.run
 */
public static void main(String[] args) {
TraceBuilder builder = new TraceBuilder();
// Assume failure until the run reports a result.
int result = RUN_METHOD_FAILED_EXIT_CODE;
try {
result = ToolRunner.run(builder, args);
} catch (Throwable t) {
// Report on stderr; result keeps its failure default.
t.printStackTrace(System.err);
} finally {
try {
builder.finish();
} finally {
// NOTE(review): returning from inside a finally block discards any exception
// thrown by builder.finish() when result is 0 -- confirm this is intended.
if (result == 0) {
return;
}
System.exit(result);
}
}
}
/**
 * Runs DGARunner.main for the "wcc" analytic with ToolRunner and System.exit
 * stubbed out, then checks that every command-line argument produced by the
 * configuration appears somewhere in the expected argument list.
 */
@Test
public void testWCCDefaults() throws Exception {
  final String[] expected = {
      "-D", "giraph.useSuperstepCounters=false",
      "-D", "giraph.zkList=localhost:2181",
      "com.soteradefense.dga.wcc.WeaklyConnectedComponentComputation",
      "-eip", "/input/",
      "-op", "/output/",
      "-eof", "com.soteradefense.dga.io.formats.DGAEdgeTTTOutputFormat",
      "-eif", "com.soteradefense.dga.io.formats.DGATextEdgeValueInputFormat",
      "-w", "1",
      "-ca", "write.vertex.value=true",
      "-ca", "mapred.task.timeout=600000"};
  PowerMockito.mockStatic(ToolRunner.class);
  final String[] args = {"libs/","wcc", "/input/", "/output/"};
  final GiraphRunner runner = mock(GiraphRunner.class);
  final DGAConfiguration conf = new DGAConfiguration();
  whenNew(GiraphRunner.class).withNoArguments().thenReturn(runner);
  whenNew(DGAConfiguration.class).withNoArguments().thenReturn(conf);
  stub(method(ToolRunner.class, "run", runner.getClass(), String[].class)).toReturn(0);
  stub(method(System.class, "exit")).toReturn(0);
  DGARunner.main(args);

  final String computation = WeaklyConnectedComponentComputation.class.getCanonicalName();
  for (String actualArg : conf.convertToCommandLineArguments(computation)) {
    // Membership check only: position within the expected list is not verified.
    boolean found = false;
    for (String candidate : expected) {
      if (actualArg.equals(candidate)) {
        found = true;
        break;
      }
    }
    assertTrue(found);
  }
}
/**
 * Runs the given FsShell command line, then connects to the file system at
 * {@code namenode}, asserts that /data is a directory and deletes it.
 * <p>
 * Exceptions are printed to stderr and not rethrown; a failure to close the
 * file system is ignored.
 *
 * @param args     arguments passed to the shell
 * @param namenode namenode address used to open the file system
 */
private void execute(String [] args, String namenode) {
  final FsShell fsShell = new FsShell();
  FileSystem fileSystem = null;
  try {
    ToolRunner.run(fsShell, args);
    fileSystem = new DistributedFileSystem(
        NameNode.getAddress(namenode), fsShell.getConf());
    final boolean dataDirExists = fileSystem.isDirectory(new Path("/data"));
    assertTrue("Directory does not get created", dataDirExists);
    fileSystem.delete(new Path("/data"), true);
  } catch (Exception e) {
    System.err.println(e.getMessage());
    e.printStackTrace();
  } finally {
    if (fileSystem != null) {
      try {
        fileSystem.close();
      } catch (IOException ignored) {
        // best-effort close
      }
    }
  }
}
/**
 * Removes blocks from the source files and from the parity file, then checks
 * the corrupt count that fsck reports.
 */
@Test
public void testCorruptionInAllStripes() throws Exception {
  LOG.info("testCorruptionInAllStripes");
  final int parityLength = 3;
  setUpCluster(parityLength);
  TestRaidShellFsck.waitUntilCorruptFileCount(dfs, 0);

  LOG.info(" Corrupt 2 blocks of source file in stripe 0, 1, 2 ");
  final int[][] sourceBlocks = { {0, 1}, {1}, {2, 3} };
  for (int fileIdx = 0; fileIdx < sourceBlocks.length; fileIdx++) {
    removeAndReportBlock(cluster, srcStats[fileIdx], sourceBlocks[fileIdx]);
  }
  TestRaidShellFsck.waitUntilCorruptFileCount(dfs, 3);

  LOG.info(" Corrupt 2 blocks of parity file in stripe 0, 2 ");
  final int[] parityBlocks = {0, 1, 6, 7};
  removeAndReportBlock(cluster, parityStat, parityBlocks);
  TestRaidShellFsck.waitUntilCorruptFileCount(dfs, 4);

  ToolRunner.run(shell, args);
  final int corruptCount = shell.getCorruptCount();
  // the second file is not recoverable
  assertEquals("fsck should return 2", 2, corruptCount);
}
/**
 * Creates the benchmark with a fresh HdfsConfiguration, runs it through
 * ToolRunner and closes it afterwards, whether or not the run succeeded.
 *
 * @param args command-line arguments for the benchmark
 */
public static void main(String[] args) throws Exception {
  // If construction throws there is nothing to close, so it sits outside the try.
  final NNThroughputBenchmark benchmark =
      new NNThroughputBenchmark(new HdfsConfiguration());
  try {
    ToolRunner.run(benchmark, args);
  } finally {
    benchmark.close();
  }
}
/**
 * Command-line entry point for fsck.
 * <p>
 * Exits with -1 after printing usage when no arguments are given or when the
 * first argument is {@code -files}, with 0 when a help argument was handled,
 * and otherwise with the result of running DFSck.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception {
  final int exitCode;
  // -files is also a GenericOptionsParser option, so it must not come first.
  final boolean unusable = args.length == 0 || "-files".equals(args[0]);
  if (unusable) {
    printUsage(System.err);
    ToolRunner.printGenericCommandUsage(System.err);
    exitCode = -1;
  } else if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
    exitCode = 0;
  } else {
    exitCode = ToolRunner.run(new DFSck(new HdfsConfiguration()), args);
  }
  System.exit(exitCode);
}
/**
 * Runs FilterLinesByWord through ToolRunner and, when the tool's
 * exitOnCompletion flag is set, exits the JVM with the run's status.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception {
  final FilterLinesByWord tool = new FilterLinesByWord(true);
  final int exitCode = ToolRunner.run(new Configuration(), tool, args);
  if (!tool.exitOnCompletion) {
    return;
  }
  System.exit(exitCode);
}
/**
 * Copies files from a mini DFS cluster to the local file system with DistCpV1
 * and verifies the copied files and the log directory. The body is skipped
 * when the default file system URI is not an hdfs:// URI.
 */
public void testCopyFromDfsToLocal() throws Exception {
  MiniDFSCluster cluster = null;
  try {
    final Configuration conf = new Configuration();
    final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
    cluster = new MiniDFSCluster.Builder(conf).build();
    final FileSystem hdfs = cluster.getFileSystem();
    final String namenode = FileSystem.getDefaultUri(conf).toString();
    if (!namenode.startsWith("hdfs://")) {
      return;
    }

    final MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
    final String localDest = TEST_ROOT_DIR + "/destdat";
    final String[] distCpArgs = {
        "-log",
        "/logs",
        namenode + "/srcdat",
        "file:///" + localDest};
    ToolRunner.run(new DistCpV1(conf), distCpArgs);

    final boolean copiesMatch = checkFiles(localfs, localDest, files);
    assertTrue("Source and destination directories do not match.", copiesMatch);
    final boolean logDirExists = hdfs.exists(new Path("/logs"));
    assertTrue("Log directory does not exist.", logDirExists);

    deldir(localfs, localDest);
    deldir(hdfs, "/logs");
    deldir(hdfs, "/srcdat");
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Runs RPCCallBenchmark with the protobuf engine and expects exit code 0.
 */
@Test(timeout=20000)
public void testBenchmarkWithProto() throws Exception {
  final String[] benchmarkArgs = {
      "--clientThreads", "30",
      "--serverThreads", "30",
      "--time", "5",
      "--serverReaderThreads", "4",
      "--messageSize", "1024",
      "--engine", "protobuf"};
  final int exitCode = ToolRunner.run(new RPCCallBenchmark(), benchmarkArgs);
  assertEquals(0, exitCode);
}
/**
 * Prints the sampler usage text, the default sampler and the generic Hadoop
 * command usage to standard output.
 *
 * @return always -1
 */
static int printUsage() {
  // One literal per printed line; the concatenated text is unchanged.
  final String usage =
      "sampler -r <reduces>\n"
      + " [-inFormat <input format class>]\n"
      + " [-keyClass <map input & output key class>]\n"
      + " [-splitRandom <double pcnt> <numSamples> <maxsplits> | // Sample from random splits at random (general)\n"
      + " -splitSample <numSamples> <maxsplits> |  // Sample from first records in splits (random data)\n"
      + " -splitInterval <double pcnt> <maxsplits>] // Sample from splits at intervals (sorted data)";
  System.out.println(usage);
  System.out.println("Default sampler: -splitRandom 0.1 10000 10");
  ToolRunner.printGenericCommandUsage(System.out);
  return -1;
}
/**
 * Runs WordMedian on INPUT and compares the median it reports with the value
 * WordMedianReader reads for the same input.
 */
@Test public void testGetTheMedian() throws Exception {
  final String[] jobArgs = { INPUT, MEDIAN_OUTPUT };
  final WordMedian medianTool = new WordMedian();
  ToolRunner.run(new Configuration(), medianTool, jobArgs);
  final double computedMedian = medianTool.getMedian();

  // outputs MUST match
  final WordMedianReader reader = new WordMedianReader();
  assertEquals(computedMedian, reader.read(INPUT), 0.0);
}
/**
 * Runs {@code -cp} from {@code src} to a freshly named target under
 * {@code dest} and asserts the shell's exit code.
 *
 * @param shell            shell used to run the copy
 * @param dest             parent path of the copy target
 * @param src              path to copy from
 * @param cpArgs           extra argument placed after {@code -cp}, or null for none
 * @param expectedExitCode exit code the copy must produce
 * @return the target path that was copied to
 */
private Path doCopyAndTest(FsShell shell, Path dest, Path src,
    String cpArgs, int expectedExitCode) throws Exception {
  final Path target = new Path(dest, "targetfile" + counter.getAndIncrement());
  final String srcUri = src.toUri().toString();
  final String targetUri = target.toUri().toString();

  final String[] argv;
  if (cpArgs == null) {
    argv = new String[] { "-cp", srcUri, targetUri };
  } else {
    argv = new String[] { "-cp", cpArgs, srcUri, targetUri };
  }

  final int exitCode = ToolRunner.run(shell, argv);
  assertEquals("cp -p is not working", expectedExitCode, exitCode);
  return target;
}
/**
 * Builds an HBase configuration, marks it for distributed-cluster use, runs
 * IntegrationTestMobCompaction and exits with the run's status.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception {
  final Configuration configuration = HBaseConfiguration.create();
  initConf(configuration);
  IntegrationTestingUtility.setUseDistributedCluster(configuration);
  final int exitCode =
      ToolRunner.run(configuration, new IntegrationTestMobCompaction(), args);
  System.exit(exitCode);
}
/**
 * Verifies that the -delete option works for other {@link FileSystem}
 * implementations (here: a local destination). See MAPREDUCE-1285.
 */
public void testDeleteLocal() throws Exception {
  MiniDFSCluster cluster = null;
  try {
    final Configuration conf = new Configuration();
    final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
    cluster = new MiniDFSCluster.Builder(conf).build();
    final FileSystem hdfs = cluster.getFileSystem();
    final String namenode = FileSystem.getDefaultUri(conf).toString();
    if (!namenode.startsWith("hdfs://")) {
      return;
    }

    final MyFile[] sourceFiles = createFiles(URI.create(namenode), "/srcdat");
    final String destdir = TEST_ROOT_DIR + "/destdat";
    // Pre-populate the destination; the returned handles are not needed.
    createFiles(localfs, destdir);

    final String[] distCpArgs = {
        "-delete",
        "-update",
        "-log",
        "/logs",
        namenode + "/srcdat",
        "file:///" + destdir};
    ToolRunner.run(new DistCpV1(conf), distCpArgs);

    final boolean copiesMatch = checkFiles(localfs, destdir, sourceFiles);
    assertTrue("Source and destination directories do not match.", copiesMatch);
    final boolean logDirExists = hdfs.exists(new Path("/logs"));
    assertTrue("Log directory does not exist.", logDirExists);

    deldir(localfs, destdir);
    deldir(hdfs, "/logs");
    deldir(hdfs, "/srcdat");
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
/**
 * Finalizes one namenode by running {@code DFSAdmin -finalizeUpgrade} with
 * the given configuration.
 *
 * @param nn   the namenode to finalize; only checked for being non-null
 * @param conf configuration handed to DFSAdmin
 * @throws IllegalStateException if {@code nn} is null
 * @throws Exception if running the admin command fails
 */
public void finalizeNameNode(NameNode nn, Configuration conf) throws Exception{
  final boolean running = nn != null;
  if (!running) {
    throw new IllegalStateException(
        "Attempting to finalize Namenode but it is not running");
  }
  final String[] adminArgs = {"-finalizeUpgrade"};
  ToolRunner.run(new DFSAdmin(conf), adminArgs);
}
/**
 * Strips the generic Hadoop options from the command line, runs
 * FilterLinesByWordOneToOne with the remaining arguments and exits with the
 * run's status.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception {
  final Configuration configuration = new Configuration();
  final GenericOptionsParser optionsParser =
      new GenericOptionsParser(configuration, args);
  final String[] toolArgs = optionsParser.getRemainingArgs();
  final int exitCode =
      ToolRunner.run(configuration, new FilterLinesByWordOneToOne(), toolArgs);
  System.exit(exitCode);
}
/**
 * Loads testData.trig into the statsTable3 table, runs HalyardStats
 * restricted to the graph http://whatever/graph0 with a file target, and
 * compares the generated stats0.trig with the reference model in
 * testStatsTargetPartial.trig.
 * <p>
 * The sail is closed in a finally block so it is released even when loading
 * fails. The output directory is created with Files.createTempDirectory
 * rather than by deleting a temp file and re-creating its name as a
 * directory.
 *
 * @throws Exception if loading, running the tool or parsing fails
 */
@Test
public void testStatsTargetPartial() throws Exception {
  final HBaseSail sail = new HBaseSail(HBaseServerTestInstance.getInstanceConfig(), "statsTable3", true, -1, true, 0, null, null);
  sail.initialize();
  try {
    try (InputStream ref = HalyardStatsTest.class.getResourceAsStream("testData.trig")) {
      RDFParser p = Rio.createParser(RDFFormat.TRIG);
      p.setPreserveBNodeIDs(true);
      p.setRDFHandler(new AbstractRDFHandler() {
        @Override
        public void handleStatement(Statement st) throws RDFHandlerException {
          sail.addStatement(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
        }
      }).parse(ref, "");
    }
    sail.commit();
  } finally {
    sail.close();
  }

  // Fully qualified because the file's import block is not visible here.
  File root = java.nio.file.Files.createTempDirectory("test_stats").toFile();
  assertEquals(0, ToolRunner.run(HBaseServerTestInstance.getInstanceConfig(), new HalyardStats(),
      new String[]{"-s", "statsTable3", "-t", root.toURI().toURL().toString() + "stats{0}.trig", "-r", "100", "-g", "http://whatever/myStats", "-c", "http://whatever/graph0"}));

  File stats = new File(root, "stats0.trig");
  assertTrue(stats.isFile());
  try (InputStream statsStream = new FileInputStream(stats);
       InputStream refStream = HalyardStatsTest.class.getResourceAsStream("testStatsTargetPartial.trig")) {
    Model statsM = Rio.parse(statsStream, "", RDFFormat.TRIG, new ParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true), SimpleValueFactory.getInstance(), new ParseErrorLogger());
    Model refM = Rio.parse(refStream, "", RDFFormat.TRIG, new ParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true), SimpleValueFactory.getInstance(), new ParseErrorLogger(), SimpleValueFactory.getInstance().createIRI("http://whatever/myStats"));
    assertEqualModels(refM, statsM);
  }
}
/**
 * Prints the randomtextwriter usage line followed by the generic Hadoop
 * command usage to standard output.
 *
 * @return always -1
 */
static int printUsage() {
  final String usage =
      "randomtextwriter [-outFormat <output format class>] <output>";
  System.out.println(usage);
  ToolRunner.printGenericCommandUsage(System.out);
  return -1;
}
/**
 * Runs RandomWriter with scaled-down settings, writing to {@code sortInput},
 * and asserts that it exits with 0.
 *
 * @param job       job configuration to adjust and run with
 * @param sortInput output path handed to RandomWriter
 * @throws Exception if running the tool fails
 */
private static void runRandomWriter(JobConf job, Path sortInput)
    throws Exception {
  // Scale down the default settings for RandomWriter for the test-case
  // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP -> 1MB
  job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
  job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
  String[] rwArgs = {sortInput.toString()};
  // Run RandomWriter; expected value goes first so a failure message
  // reports "expected 0 but was <exit code>".
  assertEquals(0, ToolRunner.run(job, new RandomWriter(), rwArgs));
}
/**
 * Runs NDCuboidJob at level 1 over the base cuboid test data, after clearing
 * the output directory, and expects exit code 0.
 */
@Test
public void testsJob8D() throws Exception {
  final String inputDir = "src/test/resources/data/base_cuboid/";
  final String outputDir = "target/test-output/8d_cuboid";
  FileUtil.fullyDelete(new File(outputDir));

  final String[] jobArgs = {
      "-input", inputDir,
      "-cubename", "test_kylin_cube_with_slr_1_new_segment",
      "-segmentname", "20130331080000_20131212080000",
      "-output", outputDir,
      "-jobname", "8d_cuboid",
      "-level", "1" };
  final int exitCode = ToolRunner.run(conf, new NDCuboidJob(), jobArgs);
  assertEquals("Job failed", 0, exitCode);
}
/**
 * Checks fsck with blocks missing from file blocks but not from parity
 * blocks: the corrupt count must be 0 and the first two per-index
 * missing-block counters must each be 1.
 */
@Test
public void testFileBlockMissing() throws Exception {
  LOG.info("testFileBlockMissing");
  setUp(false);
  waitUntilCorruptFileCount(dfs, 0);
  removeFileBlock(FILE_PATH0, 0, 0);
  // corrupting two source blocks in the same stripe
  removeFileBlock(FILE_PATH1, 0, 0);
  removeFileBlock(FILE_PATH1, 0, 2);
  waitUntilCorruptFileCount(dfs, 2);

  ToolRunner.run(shell, args);
  final int corruptCount = shell.getCorruptCount();

  final int stripeLength = Codec.getCodec("rs").stripeLength;
  final int parityLength = Codec.getCodec("rs").parityLength;
  final int counterCount = stripeLength + parityLength;
  final long[] missingCounts = new long[counterCount];
  int idx = 0;
  while (idx < counterCount) {
    System.err.println("Reading the resulting array");
    missingCounts[idx] = shell.getStrpMissingBlks(idx);
    idx++;
  }

  assertTrue(
      "Assertion1: New fsck should return 1, but returns " + missingCounts[0],
      missingCounts[0] == 1);
  assertTrue(
      "Assertion2: New fsck should return 1, but returns " + missingCounts[1],
      missingCounts[1] == 1);
  assertTrue(
      "fsck should return 0, but returns " + corruptCount,
      corruptCount == 0);
}
/**
 * Bulk-loads the HFiles under {@code hfiles} into {@code tableName} and
 * verifies that scanning the table yields exactly the cells in
 * {@code simple_expected}, in the same order.
 * <p>
 * Both the table and its scanner are closed when the check finishes, whether
 * it passes or fails.
 *
 * @param hfiles    directory containing the HFiles to load
 * @param tableName table to load into and then scan
 * @throws Exception if the load or the scan fails
 */
protected void doLoadIncrementalHFiles(Path hfiles, TableName tableName)
    throws Exception {
  String[] args = { hfiles.toString(), tableName.getNameAsString() };
  LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args)));
  assertEquals("Loading HFiles failed.", 0,
      ToolRunner.run(new BulkLoadHFilesTool(getConf()), args));

  // Plain setters instead of double-brace initialisation, which would create
  // an anonymous Scan subclass.
  Scan scan = new Scan();
  scan.setCacheBlocks(false);
  scan.setCaching(1000);

  // ResultScanner is fully qualified because the file's import block is not
  // visible from here.
  try (Table table = util.getConnection().getTable(tableName);
       org.apache.hadoop.hbase.client.ResultScanner scanner = table.getScanner(scan)) {
    Iterator<Result> resultsIt = scanner.iterator();
    Iterator<KeyValue> expectedIt = simple_expected.iterator();
    while (resultsIt.hasNext() && expectedIt.hasNext()) {
      Result r = resultsIt.next();
      for (Cell actual : r.rawCells()) {
        assertTrue(
            "Ran out of expected values prematurely!",
            expectedIt.hasNext());
        KeyValue expected = expectedIt.next();
        assertEquals("Scan produced surprising result", 0,
            CellComparator.getInstance().compare(expected, actual));
      }
    }
    assertFalse("Did not consume all expected values.", expectedIt.hasNext());
    assertFalse("Did not consume all scan results.", resultsIt.hasNext());
  }
}
/**
 * Runs DGARunner.main for the "louvain" analytic with ToolRunner, Job,
 * FileSystem and System.exit stubbed out, then checks that every
 * command-line argument produced by the configuration appears somewhere in
 * the expected argument list.
 */
@Test
public void testLouvainDefaults() throws Exception {
  final String[] expected = {
      "-D", "giraph.useSuperstepCounters=false",
      "-D", "giraph.zkList=localhost:2181",
      "com.soteradefense.dga.louvain.giraph.LouvainComputation",
      "-eip", "/input/",
      "-op", "/output/giraph_0",
      "-vof", "com.soteradefense.dga.io.formats.LouvainVertexOutputFormat",
      "-eif", "com.soteradefense.dga.io.formats.DGALongEdgeValueInputFormat",
      "-mc", "com.soteradefense.dga.louvain.giraph.LouvainMasterCompute",
      "-esd", "/output/giraph_0",
      "-w", "1",
      "-ca", "minimum.progress=2000",
      "-ca", "progress.tries=1",
      "-ca", "mapred.task.timeout=600000",
      "-ca", "actual.Q.aggregators=1"};
  PowerMockito.mockStatic(ToolRunner.class);
  PowerMockito.mockStatic(Job.class);
  final Job job = mock(Job.class);
  final GiraphRunner runner = mock(GiraphRunner.class);
  final Configuration hadoopConf = mock(Configuration.class);
  final FileSystem fs = mock(FileSystem.class);
  final String[] args = {"libs/","louvain", "/input/", "/output/"};
  final DGAConfiguration conf = new DGAConfiguration();
  whenNew(GiraphRunner.class).withNoArguments().thenReturn(runner);
  whenNew(DGAConfiguration.class).withNoArguments().thenReturn(conf);
  whenNew(FileSystem.class).withAnyArguments().thenReturn(fs);
  whenNew(Configuration.class).withNoArguments().thenReturn(hadoopConf);
  stub(method(ToolRunner.class, "run", runner.getClass(), String[].class)).toReturn(1);
  whenNew(Job.class).withAnyArguments().thenReturn(job);
  stub(method(Job.class, "getInstance", Configuration.class)).toReturn(job);
  stub(method(Job.class, "waitForCompletion", boolean.class)).toReturn(0);
  stub(method(System.class, "exit")).toReturn(0);
  stub(method(FileSystem.class, "get", hadoopConf.getClass())).toReturn(fs);
  stub(method(FileSystem.class, "exists", Path.class)).toReturn(true);
  stub(method(FileSystem.class, "isFile", Path.class)).toReturn(true);
  DGARunner.main(args);

  final String computation = LouvainComputation.class.getCanonicalName();
  for (String actualArg : conf.convertToCommandLineArguments(computation)) {
    // Membership check only: position within the expected list is not verified.
    boolean found = false;
    for (String candidate : expected) {
      if (actualArg.equals(candidate)) {
        found = true;
        break;
      }
    }
    assertTrue(found);
  }
}
/**
 * Starts the LogCountsPerHour MapReduce application and exits with its
 * status.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception
{
  final Configuration configuration = new Configuration();
  final int exitCode = ToolRunner.run(configuration, new LogCountsPerHour(), args);
  System.exit(exitCode);
}
/**
 * Runs SleepJob under the given configuration with a fixed argument list.
 * The tool's exit code is not checked.
 *
 * @param conf job configuration to run with
 */
private void runSleepJob(JobConf conf) throws Exception {
  final String[] sleepArgs = {
      "-m", "3",
      "-r", "1",
      "-mt", "3000",
      "-rt", "1000" };
  ToolRunner.run(conf, new SleepJob(), sleepArgs);
}
/**
 * Simple end-to-end test: writes and partially deletes a row in one table,
 * replays the WAL into a second table and checks what arrived there.
 *
 * @throws Exception if cluster access or the replay fails
 */
@Test
public void testWALPlayer() throws Exception {
  final TableName sourceName = TableName.valueOf(name.getMethodName() + "1");
  final TableName targetName = TableName.valueOf(name.getMethodName() + "2");
  final byte[] family = Bytes.toBytes("family");
  final byte[] column1 = Bytes.toBytes("c1");
  final byte[] column2 = Bytes.toBytes("c2");
  final byte[] row = Bytes.toBytes("row");

  final Table sourceTable = TEST_UTIL.createTable(sourceName, family);
  final Table targetTable = TEST_UTIL.createTable(targetName, family);

  // put a row into the first table
  final Put put = new Put(row);
  put.addColumn(family, column1, column1);
  put.addColumn(family, column2, column2);
  sourceTable.put(put);

  // delete one column
  final Delete delete = new Delete(row);
  delete.addColumns(family, column1);
  sourceTable.delete(delete);

  // replay the WAL, map table 1 to table 2
  final WAL wal = cluster.getRegionServer(0).getWAL(null);
  wal.rollWriter();
  final Path walDir = new Path(
      cluster.getMaster().getMasterFileSystem().getWALRootDir(),
      HConstants.HREGION_LOGDIR_NAME);
  final String walInputDir = walDir.toString();

  final Configuration configuration = TEST_UTIL.getConfiguration();
  final WALPlayer player = new WALPlayer(configuration);
  final String optionName = "_test_.name";
  configuration.set(optionName, "1000");
  player.setupTime(configuration, optionName);
  assertEquals(1000, configuration.getLong(optionName, 0));

  final String[] playerArgs = {
      walInputDir,
      sourceName.getNameAsString(),
      targetName.getNameAsString() };
  assertEquals(0, ToolRunner.run(configuration, player, playerArgs));

  // verify the WAL was played into table 2
  final Result replayed = targetTable.get(new Get(row));
  assertEquals(1, replayed.size());
  assertTrue(CellUtil.matchingQualifier(replayed.rawCells()[0], column2));
}
/**
 * Runs RandomWriter with a fresh Configuration and exits with its status.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception {
  final Configuration configuration = new Configuration();
  final int exitCode = ToolRunner.run(configuration, new RandomWriter(), args);
  System.exit(exitCode);
}