Listed below are example usages of org.apache.hadoop.mapreduce.Job#getCounters(); the full source of each example is available on GitHub.
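To set the scene, here is a minimal sketch of the pattern the snippets below share: run a Job, then read its Counters once it has completed. Names such as conf and the "counter demo" job name are placeholders, and getCounters() may return null when job history is unavailable, so the result is checked before use.
// Minimal sketch (placeholder names): submit a job, then dump all of its counters.
Job myJob = Job.getInstance(conf, "counter demo"); // conf is assumed to be a configured org.apache.hadoop.conf.Configuration
// ... set mapper, reducer, input and output formats here ...
boolean ok = myJob.waitForCompletion(true);
Counters counters = myJob.getCounters(); // may be null if the job history is not available
if (counters != null) {
  for (CounterGroup group : counters) {
    for (Counter counter : group) {
      System.out.println(group.getName() + "." + counter.getName() + " = " + counter.getValue());
    }
  }
}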
public JobMetrics(Job job, String bytesReplicatedKey) {
Builder<String, Long> builder = ImmutableMap.builder();
if (job != null) {
Counters counters;
try {
counters = job.getCounters();
} catch (IOException e) {
throw new CircusTrainException("Unable to get counters from job.", e);
}
if (counters != null) {
for (CounterGroup group : counters) {
for (Counter counter : group) {
builder.put(DotJoiner.join(group.getName(), counter.getName()), counter.getValue());
}
}
}
}
metrics = builder.build();
Long bytesReplicatedValue = metrics.get(bytesReplicatedKey);
if (bytesReplicatedValue != null) {
bytesReplicated = bytesReplicatedValue;
} else {
bytesReplicated = 0L;
}
}
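A plausible call site for the constructor above, assuming a simple getBytesReplicated() accessor for the field set at the end of the constructor; the accessor name and the counter key are illustrative, not taken from the snippet.
// Hypothetical usage: the key is "<group>.<counter>", matching the DotJoiner format used above.
JobMetrics jobMetrics = new JobMetrics(job, "com.example.ReplicationCounter.BYTES_REPLICATED");
long bytesReplicated = jobMetrics.getBytesReplicated(); // assumed accessor for the bytesReplicated field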
protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException {
Counters counters = job.getCounters();
Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
.getValue());
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert.assertEquals(numSleepReducers,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
Assert.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
}
/**
* Tests an index where the index PK is correct (indexed column values are indexed correctly), but
* a covered index value is incorrect. Scrutiny should report the invalid row.
*/
@Test public void testCoveredValueIncorrect() throws Exception {
// insert one valid row
upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
conn.commit();
// disable index and insert another data row
disableIndex();
upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
conn.commit();
// insert a bad index row for the above data row
upsertIndexRow("name-2", 2, 9999);
conn.commit();
// scrutiny should report the bad row
List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
Job job = completedJobs.get(0);
assertTrue(job.isSuccessful());
Counters counters = job.getCounters();
assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
}
/**
* Tests an index with the same number of rows as the data table, but one of the index rows is
* incorrect. Scrutiny should report the invalid rows.
*/
@Test public void testEqualRowCountIndexIncorrect() throws Exception {
// insert one valid row
upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
conn.commit();
// disable the index and insert another row which is not indexed
disableIndex();
upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
conn.commit();
// insert a bad row into the index
upsertIndexRow("badName", 2, 9999);
conn.commit();
// scrutiny should report the bad row
List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
Job job = completedJobs.get(0);
assertTrue(job.isSuccessful());
Counters counters = job.getCounters();
assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
}
/**
* Tests the case where there are more data table rows than index table rows. Scrutiny should
* report the number of incorrect rows.
*/
@Test public void testMoreDataRows() throws Exception {
upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
conn.commit();
disableIndex();
// these rows won't have a corresponding index row
upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
conn.commit();
List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName);
Job job = completedJobs.get(0);
assertTrue(job.isSuccessful());
Counters counters = job.getCounters();
assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
}
@Override
public TimingResult call() throws Exception {
PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
PerformanceEvaluation.checkTable(admin, opts);
PerformanceEvaluation.RunResult[] results = null;
long numRows = opts.totalRows;
long elapsedTime = 0;
if (opts.nomapred) {
results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
for (PerformanceEvaluation.RunResult r : results) {
elapsedTime = Math.max(elapsedTime, r.duration);
}
} else {
Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
Counters counters = job.getCounters();
numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
}
return new TimingResult(numRows, elapsedTime, results);
}
private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
Path fileCache, Path outputPath, int reducerMultipler, Path tmpPath, TableStats tableStats, String snapshot)
throws ClassNotFoundException, IOException, InterruptedException {
PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
fileCache);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
ExistingDataIndexLookupMapper.setSnapshot(job, MRUPDATE_SNAPSHOT);
FileInputFormat.addInputPath(job, result._partitionedInputData);
MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
ExistingDataIndexLookupMapper.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, NewDataMapper.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
@Override
public int run(String[] args) throws Exception {
String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
if (!doCommandLine(otherArgs)) {
return 1;
}
Job job = createSubmittableJob(otherArgs);
if (!job.waitForCompletion(true)) {
LOG.info("Map-reduce job failed!");
return 1;
}
counters = job.getCounters();
return 0;
}
/**
* Tests that a global view on a multi-tenant table works as well.
*/
@Test public void testGlobalViewOnMultiTenantTable() throws Exception {
String globalViewName = generateUniqueName();
String indexNameGlobal = generateUniqueName();
connGlobal.createStatement()
.execute(String.format(createViewStr, globalViewName, multiTenantTable));
String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName);
connGlobal.createStatement().execute(idxStmtGlobal);
connGlobal.createStatement()
.execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x"));
connGlobal.commit();
String[] argValues = getArgValues("", globalViewName, indexNameGlobal, 10L,
SourceTable.INDEX_TABLE_SOURCE, false, null, null, null,
EnvironmentEdgeManager.currentTimeMillis());
List<Job> completedJobs = runScrutiny(argValues);
// Sunny case, both index and view are equal. 1 row
assertEquals(1, completedJobs.size());
for (Job job : completedJobs) {
assertTrue(job.isSuccessful());
Counters counters = job.getCounters();
assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
}
}
protected void verifyRandomWriterCounters(Job job)
throws InterruptedException, IOException {
Counters counters = job.getCounters();
Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
.getValue());
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert
.assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
&& counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
}
@Override
protected void verifyFailingMapperCounters(Job job)
throws InterruptedException, IOException {
Counters counters = job.getCounters();
super.verifyFailingMapperCounters(job);
Assert.assertEquals(2,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
.getValue());
Assert.assertEquals(2, counters
.findCounter(JobCounter.NUM_FAILED_UBERTASKS).getValue());
}
@Override
protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException {
Counters counters = job.getCounters();
super.verifySleepJobCounters(job);
Assert.assertEquals(3,
counters.findCounter(JobCounter.NUM_UBER_SUBMAPS).getValue());
Assert.assertEquals(numSleepReducers,
counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES).getValue());
Assert.assertEquals(3 + numSleepReducers,
counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
}
@Override
public int runJob() throws Exception {
final boolean job1Success = (super.runJob() == 0);
Assert.assertTrue(job1Success);
// after the first job there should be a sequence file with the
// filtered results which should match the expected results
// resources
final Job job = Job.getInstance(super.getConf());
final Configuration conf = job.getConfiguration();
MapReduceTestUtils.filterConfiguration(conf);
final ByteBuffer buf = ByteBuffer.allocate((8 * expectedResults.hashedCentroids.size()) + 4);
buf.putInt(expectedResults.hashedCentroids.size());
for (final Long hashedCentroid : expectedResults.hashedCentroids) {
buf.putLong(hashedCentroid);
}
conf.set(
MapReduceTestUtils.EXPECTED_RESULTS_KEY,
ByteArrayUtils.byteArrayToString(buf.array()));
GeoWaveInputFormat.setStoreOptions(conf, dataStoreOptions);
job.setJarByClass(this.getClass());
job.setJobName("GeoWave Test (" + dataStoreOptions.getGeoWaveNamespace() + ")");
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapperClass(VerifyExpectedResultsMapper.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
job.setSpeculativeExecution(false);
FileInputFormat.setInputPaths(job, getHdfsOutputPath());
final boolean job2success = job.waitForCompletion(true);
final Counters jobCounters = job.getCounters();
final Counter expectedCnt = jobCounters.findCounter(ResultCounterType.EXPECTED);
Assert.assertNotNull(expectedCnt);
Assert.assertEquals(expectedResults.count, expectedCnt.getValue());
final Counter errorCnt = jobCounters.findCounter(ResultCounterType.ERROR);
if (errorCnt != null) {
Assert.assertEquals(0L, errorCnt.getValue());
}
final Counter unexpectedCnt = jobCounters.findCounter(ResultCounterType.UNEXPECTED);
if (unexpectedCnt != null) {
Assert.assertEquals(0L, unexpectedCnt.getValue());
}
return job2success ? 0 : 1;
}
@Override
public int run(String[] args) throws Exception {
int c = 0;
if (args.length < 5) {
System.err.println(
"Usage: Driver <table> <mr inc working path> <output path> <zk connection> <reducer multiplier> <extra config files...>");
return 1;
}
String table = args[c++];
String mrIncWorkingPathStr = args[c++];
String outputPathStr = args[c++];
String blurZkConnection = args[c++];
int reducerMultipler = Integer.parseInt(args[c++]);
for (; c < args.length; c++) {
String externalConfigFileToAdd = args[c];
getConf().addResource(new Path(externalConfigFileToAdd));
}
Path outputPath = new Path(outputPathStr);
Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
Path newData = new Path(mrIncWorkingPath, NEW);
Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
Path completeData = new Path(mrIncWorkingPath, COMPLETE);
Path fileCache = new Path(mrIncWorkingPath, CACHE);
fileSystem.mkdirs(newData);
fileSystem.mkdirs(inprogressData);
fileSystem.mkdirs(completeData);
fileSystem.mkdirs(fileCache);
List<Path> srcPathList = new ArrayList<Path>();
for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
srcPathList.add(fileStatus.getPath());
}
if (srcPathList.isEmpty()) {
return 0;
}
List<Path> inprogressPathList = new ArrayList<Path>();
boolean success = false;
Iface client = null;
try {
inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
client.createSnapshot(table, MRUPDATE_SNAPSHOT);
TableDescriptor descriptor = client.describe(table);
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.setLocalCachePath(job, fileCache);
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingData.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewData.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultipler);
success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
} finally {
if (success) {
LOG.info("Indexing job succeeded!");
movePathList(fileSystem, completeData, inprogressPathList);
} else {
LOG.error("Indexing job failed!");
movePathList(fileSystem, newData, inprogressPathList);
}
if (client != null) {
client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
}
}
if (success) {
return 0;
} else {
return 1;
}
}
@Test
public void testBlurOutputFormatOverFlowTest() throws IOException, InterruptedException, ClassNotFoundException {
Path input = getInDir();
Path output = getOutDir();
_fileSystem.delete(input, true);
_fileSystem.delete(output, true);
// 1500 * 50 = 75,000
writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
// 100 * 50 = 5,000
writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
Job job = Job.getInstance(_conf, "blur index");
job.setJarByClass(BlurOutputFormatTest.class);
job.setMapperClass(CsvBlurMapper.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, input);
CsvBlurMapper.addColumns(job, "cf1", "col");
Path tablePath = new Path(new Path(_root, "table"), "test");
TableDescriptor tableDescriptor = new TableDescriptor();
tableDescriptor.setShardCount(1);
tableDescriptor.setTableUri(tablePath.toString());
tableDescriptor.setName("test");
createShardDirectories(tablePath, 1);
BlurOutputFormat.setupJob(job, tableDescriptor);
BlurOutputFormat.setOutputPath(job, output);
BlurOutputFormat.setIndexLocally(job, true);
BlurOutputFormat.setOptimizeInFlight(job, false);
assertTrue(job.waitForCompletion(true));
Counters ctrs = job.getCounters();
System.out.println("Counters: " + ctrs);
Path path = new Path(output, ShardUtil.getShardName(0));
Collection<Path> commitedTasks = getCommitedTasks(path);
assertEquals(1, commitedTasks.size());
DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
assertEquals(80000, reader.numDocs());
reader.close();
}
@Test
public void testBlurOutputFormatOverFlowMultipleReducersTest() throws IOException, InterruptedException,
ClassNotFoundException {
Path input = getInDir();
Path output = getOutDir();
_fileSystem.delete(input, true);
_fileSystem.delete(output, true);
// 1500 * 50 = 75,000
writeRecordsFile(new Path(input, "part1"), 1, 50, 1, 1500, "cf1");
// 100 * 50 = 5,000
writeRecordsFile(new Path(input, "part2"), 1, 50, 2000, 100, "cf1");
Job job = Job.getInstance(_conf, "blur index");
job.setJarByClass(BlurOutputFormatTest.class);
job.setMapperClass(CsvBlurMapper.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, input);
CsvBlurMapper.addColumns(job, "cf1", "col");
Path tablePath = new Path(new Path(_root, "table"), "test");
TableDescriptor tableDescriptor = new TableDescriptor();
tableDescriptor.setShardCount(2);
tableDescriptor.setTableUri(tablePath.toString());
tableDescriptor.setName("test");
createShardDirectories(output, 2);
BlurOutputFormat.setupJob(job, tableDescriptor);
BlurOutputFormat.setOutputPath(job, output);
BlurOutputFormat.setIndexLocally(job, false);
BlurOutputFormat.setDocumentBufferStrategy(job, DocumentBufferStrategyHeapSize.class);
BlurOutputFormat.setMaxDocumentBufferHeapSize(job, 128 * 1024);
assertTrue(job.waitForCompletion(true));
Counters ctrs = job.getCounters();
System.out.println("Counters: " + ctrs);
long total = 0;
for (int i = 0; i < tableDescriptor.getShardCount(); i++) {
Path path = new Path(output, ShardUtil.getShardName(i));
Collection<Path> commitedTasks = getCommitedTasks(path);
assertEquals(1, commitedTasks.size());
DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
total += reader.numDocs();
reader.close();
}
assertEquals(80000, total);
}
public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
Stopwatch scanTimer = Stopwatch.createUnstarted();
Scan scan = getScan();
String jobName = "testScanMapReduce";
Job job = new Job(conf);
job.setJobName(jobName);
job.setJarByClass(getClass());
TableMapReduceUtil.initTableMapperJob(
this.tablename,
scan,
MyMapper.class,
NullWritable.class,
NullWritable.class,
job
);
job.setNumReduceTasks(0);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(NullOutputFormat.class);
scanTimer.start();
job.waitForCompletion(true);
scanTimer.stop();
Counters counters = job.getCounters();
long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
System.out.println("HBase scan mapreduce: ");
System.out.println("total time to open scanner: " +
scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
System.out.println("total bytes: " + totalBytes + " bytes ("
+ StringUtils.humanReadableInt(totalBytes) + ")");
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
System.out.println("total rows : " + numRows);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
System.out.println("total cells : " + numCells);
System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
}
@Test
public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException {
Path input = getInDir();
Path output = getOutDir();
_fileSystem.delete(input, true);
_fileSystem.delete(output, true);
writeRecordsFile(new Path(input, "part1"), 1, 1, 1, 1, "cf1");
writeRecordsFile(new Path(input, "part2"), 1, 1, 2, 1, "cf1");
Job job = Job.getInstance(_conf, "blur index");
job.setJarByClass(BlurOutputFormatTest.class);
job.setMapperClass(CsvBlurMapper.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, input);
CsvBlurMapper.addColumns(job, "cf1", "col");
Path tablePath = new Path(new Path(_root, "table"), "test");
TableDescriptor tableDescriptor = new TableDescriptor();
tableDescriptor.setShardCount(1);
tableDescriptor.setTableUri(tablePath.toString());
tableDescriptor.setName("test");
createShardDirectories(tablePath, 1);
BlurOutputFormat.setupJob(job, tableDescriptor);
BlurOutputFormat.setOutputPath(job, output);
assertTrue(job.waitForCompletion(true));
Counters ctrs = job.getCounters();
System.out.println("Counters: " + ctrs);
Path path = new Path(output, ShardUtil.getShardName(0));
dump(path, _conf);
Collection<Path> commitedTasks = getCommitedTasks(path);
assertEquals(1, commitedTasks.size());
DirectoryReader reader = DirectoryReader.open(new HdfsDirectory(_conf, commitedTasks.iterator().next()));
assertEquals(2, reader.numDocs());
reader.close();
}
@Test
public void testBlurOutputFormat() throws IOException, InterruptedException, ClassNotFoundException, BlurException,
TException {
fileSystem.delete(inDir, true);
String tableName = "testBlurOutputFormat";
writeRecordsFile("in/part1", 1, 1, 1, 1, "cf1");
writeRecordsFile("in/part2", 1, 1, 2, 1, "cf1");
Job job = Job.getInstance(conf, "blur index");
job.setJarByClass(BlurOutputFormatMiniClusterTest.class);
job.setMapperClass(CsvBlurMapper.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
String tableUri = new Path(TEST_ROOT_DIR + "/blur/" + tableName).makeQualified(fileSystem.getUri(),
fileSystem.getWorkingDirectory()).toString();
CsvBlurMapper.addColumns(job, "cf1", "col");
TableDescriptor tableDescriptor = new TableDescriptor();
tableDescriptor.setShardCount(1);
tableDescriptor.setTableUri(tableUri);
tableDescriptor.setName(tableName);
Iface client = getClient();
client.createTable(tableDescriptor);
BlurOutputFormat.setupJob(job, tableDescriptor);
Path output = new Path(TEST_ROOT_DIR + "/out");
BlurOutputFormat.setOutputPath(job, output);
Path tablePath = new Path(tableUri);
Path shardPath = new Path(tablePath, ShardUtil.getShardName(0));
FileStatus[] listStatus = fileSystem.listStatus(shardPath);
System.out.println("======" + listStatus.length);
for (FileStatus fileStatus : listStatus) {
System.out.println(fileStatus.getPath());
}
assertEquals(3, listStatus.length);
assertTrue(job.waitForCompletion(true));
Counters ctrs = job.getCounters();
System.out.println("Counters: " + ctrs);
client.loadData(tableName, output.toString());
while (true) {
TableStats tableStats = client.tableStats(tableName);
System.out.println(tableStats);
if (tableStats.getRowCount() > 0) {
break;
}
Thread.sleep(100);
}
assertTrue(fileSystem.exists(tablePath));
assertFalse(fileSystem.isFile(tablePath));
FileStatus[] listStatusAfter = fileSystem.listStatus(shardPath);
assertEquals(12, listStatusAfter.length);
}
@Test
public void testSpeculativeExecution() throws Exception {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
return;
}
/*------------------------------------------------------------------
* Test that Map/Red does not speculate if MAP_SPECULATIVE and
* REDUCE_SPECULATIVE are both false.
* -----------------------------------------------------------------
*/
Job job = runSpecTest(false, false);
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
Counters counters = job.getCounters();
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
.getValue());
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
.getValue());
/*----------------------------------------------------------------------
* Test that Mapper speculates if MAP_SPECULATIVE is true and
* REDUCE_SPECULATIVE is false.
* ---------------------------------------------------------------------
*/
job = runSpecTest(true, false);
succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
counters = job.getCounters();
// The long-running map will be killed and a new one started.
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
.getValue());
Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
.getValue());
Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_KILLED_MAPS)
.getValue());
/*----------------------------------------------------------------------
* Test that Reducer speculates if REDUCE_SPECULATIVE is true and
* MAP_SPECULATIVE is false.
* ---------------------------------------------------------------------
*/
job = runSpecTest(false, true);
succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
counters = job.getCounters();
// The long-running reduce will be killed and a new one started.
Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
.getValue());
Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
.getValue());
}