类org.apache.hadoop.mapred.UtilsForTests源码实例Demo

下面列出了怎么用org.apache.hadoop.mapred.UtilsForTests的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hadoop-gpu   文件: TestJobTrackerRestart.java
/**
 * Builds one waiting-job configuration per requested priority.
 *
 * <p>Every configuration is a copy of {@code conf}, prepared through
 * {@code UtilsForTests.configureWaitingJobConf} and pointed at its own output
 * directory: the base output dir suffixed with the running
 * {@code numJobsSubmitted} counter.
 *
 * @param conf The base conf each job conf is copied from
 * @param priorities priorities for the jobs; its length fixes the job count
 * @param numMaps number of maps for each job
 * @param numReds number of reducers for each job
 * @param outputDir base output dir
 * @param inDir input dir
 * @param mapSignalFile filename that acts as a signal for maps
 * @param reduceSignalFile filename that acts as a signal for reducers
 * @return an array of jobconfs, one per entry of {@code priorities}
 * @throws IOException if setting up a job conf fails
 */
private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
                                 int[] numMaps, int[] numReds,
                                 Path outputDir, Path inDir,
                                 String mapSignalFile, String reduceSignalFile)
    throws IOException {
  final int jobCount = priorities.length;
  JobConf[] configured = new JobConf[jobCount];
  int idx = 0;
  while (idx < jobCount) {
    JobConf jobConf = new JobConf(conf);
    configured[idx] = jobConf;
    // each job gets a distinct output directory
    String dirSuffix = String.valueOf(numJobsSubmitted++);
    Path jobOutputDir = outputDir.suffix(dirSuffix);
    UtilsForTests.configureWaitingJobConf(jobConf, inDir, jobOutputDir,
        numMaps[idx], numReds[idx], "jt restart test job", mapSignalFile,
        reduceSignalFile);
    jobConf.setJobPriority(priorities[idx]);
    idx++;
  }
  return configured;
}
 
源代码2 项目: hadoop   文件: TestJobOutputCommitter.java
/**
 * Submits a job built by {@code MapReduceTestUtil.createKillJob}, kills it
 * once job setup has finished, and then checks the output directory.
 *
 * @param fileName file that must exist in the output dir after the kill, or
 *                 {@code null} to skip that check
 * @param output output format class to set on the job
 * @param exclude files that must not exist in the output dir
 * @throws Exception if any job or file system call fails
 */
private void testKilledJob(String fileName,
    Class<? extends OutputFormat> output, String[] exclude) throws Exception {
  final Path jobOutDir = getNewOutputDir();
  final Job killJob = MapReduceTestUtil.createKillJob(conf, jobOutDir, inDir);
  killJob.setOutputFormatClass(output);
  killJob.submit();

  // poll until the job reports that its setup is complete
  for (;;) {
    if (killJob.setupProgress() == 1.0f) {
      break;
    }
    UtilsForTests.waitFor(100);
  }

  killJob.killJob();

  boolean succeeded = killJob.waitForCompletion(true);
  assertFalse("Job did not get kill", succeeded);

  // the file that is expected to be present, if one was named
  if (fileName != null) {
    Path expected = new Path(jobOutDir, fileName);
    String missingMsg =
        "File " + expected + " missing for job " + killJob.getJobID();
    assertTrue(missingMsg, fs.exists(expected));
  }

  // none of the excluded files may be present
  for (int i = 0; i < exclude.length; i++) {
    Path unexpected = new Path(jobOutDir, exclude[i]);
    String presentMsg = "File " + unexpected
        + " should not be present for killed job " + killJob.getJobID();
    assertFalse(presentMsg, fs.exists(unexpected));
  }
}
 
源代码3 项目: hadoop   文件: TestMRCJCFileOutputCommitter.java
/**
 * Runs a complete FileOutputCommitter cycle (job and task setup, writing
 * output, task and job commit) and verifies the contents of the committed
 * part file before deleting the output directory.
 *
 * @throws Exception if any committer, writer or file operation fails
 */
@SuppressWarnings("unchecked")
public void testCommitter() throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);

  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);

  // write output
  TextOutputFormat theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
  writeOutput(theRecordWriter, tContext);

  // do commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);

  // validate output
  // NOTE(review): the expected lines presumably mirror the records emitted
  // by writeOutput — confirm against that helper.
  File expectedFile = new File(new Path(outDir, partFile).toString());
  // single-threaded use, so the unsynchronized StringBuilder is sufficient
  StringBuilder expectedOutput = new StringBuilder();
  expectedOutput.append(key1).append('\t').append(val1).append("\n");
  expectedOutput.append(val1).append("\n");
  expectedOutput.append(val2).append("\n");
  expectedOutput.append(key2).append("\n");
  expectedOutput.append(key1).append("\n");
  expectedOutput.append(key2).append('\t').append(val2).append("\n");
  String output = UtilsForTests.slurp(expectedFile);
  // assertEquals takes (expected, actual); passing them in that order keeps
  // the failure message from reporting the values the wrong way round
  assertEquals(expectedOutput.toString(), output);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
 
源代码4 项目: big-c   文件: TestJobOutputCommitter.java
/**
 * Submits a job built by {@code MapReduceTestUtil.createKillJob}, kills it
 * once job setup has finished, and then checks the output directory.
 *
 * @param fileName file that must exist in the output dir after the kill, or
 *                 {@code null} to skip that check
 * @param output output format class to set on the job
 * @param exclude files that must not exist in the output dir
 * @throws Exception if any job or file system call fails
 */
private void testKilledJob(String fileName,
    Class<? extends OutputFormat> output, String[] exclude) throws Exception {
  final Path jobOutDir = getNewOutputDir();
  final Job killJob = MapReduceTestUtil.createKillJob(conf, jobOutDir, inDir);
  killJob.setOutputFormatClass(output);
  killJob.submit();

  // poll until the job reports that its setup is complete
  for (;;) {
    if (killJob.setupProgress() == 1.0f) {
      break;
    }
    UtilsForTests.waitFor(100);
  }

  killJob.killJob();

  boolean succeeded = killJob.waitForCompletion(true);
  assertFalse("Job did not get kill", succeeded);

  // the file that is expected to be present, if one was named
  if (fileName != null) {
    Path expected = new Path(jobOutDir, fileName);
    String missingMsg =
        "File " + expected + " missing for job " + killJob.getJobID();
    assertTrue(missingMsg, fs.exists(expected));
  }

  // none of the excluded files may be present
  for (int i = 0; i < exclude.length; i++) {
    Path unexpected = new Path(jobOutDir, exclude[i]);
    String presentMsg = "File " + unexpected
        + " should not be present for killed job " + killJob.getJobID();
    assertFalse(presentMsg, fs.exists(unexpected));
  }
}
 
源代码5 项目: big-c   文件: TestMRCJCFileOutputCommitter.java
/**
 * Runs a complete FileOutputCommitter cycle (job and task setup, writing
 * output, task and job commit) and verifies the contents of the committed
 * part file before deleting the output directory.
 *
 * @throws Exception if any committer, writer or file operation fails
 */
@SuppressWarnings("unchecked")
public void testCommitter() throws Exception {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);

  // setup
  committer.setupJob(jContext);
  committer.setupTask(tContext);

  // write output
  TextOutputFormat theOutputFormat = new TextOutputFormat();
  RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
  writeOutput(theRecordWriter, tContext);

  // do commit
  committer.commitTask(tContext);
  committer.commitJob(jContext);

  // validate output
  // NOTE(review): the expected lines presumably mirror the records emitted
  // by writeOutput — confirm against that helper.
  File expectedFile = new File(new Path(outDir, partFile).toString());
  // single-threaded use, so the unsynchronized StringBuilder is sufficient
  StringBuilder expectedOutput = new StringBuilder();
  expectedOutput.append(key1).append('\t').append(val1).append("\n");
  expectedOutput.append(val1).append("\n");
  expectedOutput.append(val2).append("\n");
  expectedOutput.append(key2).append("\n");
  expectedOutput.append(key1).append("\n");
  expectedOutput.append(key2).append('\t').append(val2).append("\n");
  String output = UtilsForTests.slurp(expectedFile);
  // assertEquals takes (expected, actual); passing them in that order keeps
  // the failure message from reporting the values the wrong way round
  assertEquals(expectedOutput.toString(), output);
  FileUtil.fullyDelete(new File(outDir.toString()));
}
 
源代码6 项目: RDFS   文件: TestJobInProgress.java
/**
 * Blocks until the job in progress reports at least as many running maps
 * and then running reduces as the job conf asks for.
 *
 * @param jip the job in progress whose running counts are polled
 * @param job the job conf supplying the target map and reduce task counts
 */
private void waitTillReady(JobInProgress jip, JobConf job) {
  // first the maps ...
  for (;;) {
    if (!(jip.runningMaps() < job.getNumMapTasks())) {
      break;
    }
    UtilsForTests.waitFor(10);
  }

  // ... then the reducers
  for (;;) {
    if (!(jip.runningReduces() < job.getNumReduceTasks())) {
      break;
    }
    UtilsForTests.waitFor(10);
  }
}
 
源代码7 项目: RDFS   文件: TestJobInProgress.java
/**
 * Builds a job conf for a small job that reads a freshly written input file.
 *
 * <p>Deletes "./failjob/output", creates "./failjob/input" containing a
 * single two-line file "part-0", and wires the given mapper,
 * combiner/reducer and task counts into a conf obtained from
 * {@code mrCluster.createJobConf()}.
 *
 * @param MapClass mapper class for the job
 * @param ReduceClass class used both as combiner and as reducer
 * @param maps number of map tasks
 * @param reducers number of reduce tasks
 * @param locality if true the job uses {@code TextInputFormat}, otherwise
 *                 {@code UtilsForTests.RandomInputFormat}
 * @return the configured job conf
 * @throws Exception if the input directory cannot be created or a file
 *                   system call fails
 */
@SuppressWarnings("unchecked")
JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers,
                  boolean locality) 
throws Exception {
  JobConf jobConf = mrCluster.createJobConf();
  final Path inDir = new Path("./failjob/input");
  final Path outDir = new Path("./failjob/output");
  String input = "Test failing job.\n One more line";
  FileSystem inFs = inDir.getFileSystem(jobConf);
  FileSystem outFs = outDir.getFileSystem(jobConf);
  // start from a clean output directory
  outFs.delete(outDir, true);
  if (!inFs.mkdirs(inDir)) {
    throw new IOException("create directory failed" + inDir.toString());
  }

  DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  try {
    file.writeBytes(input);
  } finally {
    // always release the stream, even when the write fails
    file.close();
  }
  jobConf.setJobName("failmaptask");
  if (locality) {
    jobConf.setInputFormat(TextInputFormat.class);
  } else {
    jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
  }
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setMapperClass(MapClass);
  // the reduce class doubles as the combiner
  jobConf.setCombinerClass(ReduceClass);
  jobConf.setReducerClass(ReduceClass);
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);
  jobConf.setNumMapTasks(maps);
  jobConf.setNumReduceTasks(reducers);
  return jobConf; 
}
 
源代码8 项目: hadoop-gpu   文件: TestJobTrackerRestart.java
/**
 * Waits until {@code shareDir} exists on the file system obtained from the
 * job's configuration, calling {@code UtilsForTests.waitFor(100)} between
 * checks, and then delegates to the parent class.
 *
 * @param context the job context supplying the configuration
 * @throws IOException if a file system call or the parent call fails
 */
@Override
public void setupJob(JobContext context) throws IOException {
  final FileSystem jobFs = FileSystem.get(context.getConfiguration());
  while (!jobFs.exists(shareDir)) {
    UtilsForTests.waitFor(100);
  }
  // NOTE(review): this invokes the parent's cleanupJob, not setupJob —
  // confirm that is intentional and not a copy/paste slip.
  super.cleanupJob(context);
}
 
源代码9 项目: hadoop-gpu   文件: TestJobInProgress.java
/**
 * Blocks until the job in progress reports at least as many running maps
 * and then running reduces as the job conf asks for.
 *
 * @param jip the job in progress whose running counts are polled
 * @param job the job conf supplying the target map and reduce task counts
 */
private void waitTillReady(JobInProgress jip, JobConf job) {
  // first the maps ...
  for (;;) {
    if (!(jip.runningMaps() < job.getNumMapTasks())) {
      break;
    }
    UtilsForTests.waitFor(10);
  }

  // ... then the reducers
  for (;;) {
    if (!(jip.runningReduces() < job.getNumReduceTasks())) {
      break;
    }
    UtilsForTests.waitFor(10);
  }
}
 
源代码10 项目: hadoop-gpu   文件: TestJobInProgress.java
/**
 * Builds a job conf for a small job that reads a freshly written input file.
 *
 * <p>Deletes "./failjob/output", creates "./failjob/input" containing a
 * single two-line file "part-0", and wires the given mapper,
 * combiner/reducer and task counts into a conf obtained from
 * {@code mrCluster.createJobConf()}.
 *
 * @param MapClass mapper class for the job
 * @param ReduceClass class used both as combiner and as reducer
 * @param maps number of map tasks
 * @param reducers number of reduce tasks
 * @param locality if true the job uses {@code TextInputFormat}, otherwise
 *                 {@code UtilsForTests.RandomInputFormat}
 * @return the configured job conf
 * @throws Exception if the input directory cannot be created or a file
 *                   system call fails
 */
@SuppressWarnings("unchecked")
JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers,
                  boolean locality) 
throws Exception {
  JobConf jobConf = mrCluster.createJobConf();
  final Path inDir = new Path("./failjob/input");
  final Path outDir = new Path("./failjob/output");
  String input = "Test failing job.\n One more line";
  FileSystem inFs = inDir.getFileSystem(jobConf);
  FileSystem outFs = outDir.getFileSystem(jobConf);
  // start from a clean output directory
  outFs.delete(outDir, true);
  if (!inFs.mkdirs(inDir)) {
    throw new IOException("create directory failed" + inDir.toString());
  }

  DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  try {
    file.writeBytes(input);
  } finally {
    // always release the stream, even when the write fails
    file.close();
  }
  jobConf.setJobName("failmaptask");
  if (locality) {
    jobConf.setInputFormat(TextInputFormat.class);
  } else {
    jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
  }
  jobConf.setOutputKeyClass(Text.class);
  jobConf.setOutputValueClass(Text.class);
  jobConf.setMapperClass(MapClass);
  // the reduce class doubles as the combiner
  jobConf.setCombinerClass(ReduceClass);
  jobConf.setReducerClass(ReduceClass);
  FileInputFormat.setInputPaths(jobConf, inDir);
  FileOutputFormat.setOutputPath(jobConf, outDir);
  jobConf.setNumMapTasks(maps);
  jobConf.setNumReduceTasks(reducers);
  return jobConf; 
}
 
源代码11 项目: RDFS   文件: TestJobInProgress.java
/**
 * Verifies that the running-task structures of a job in progress agree with
 * its running map and reduce counters.
 *
 * @param speculation whether speculative execution is turned on for the job
 * @param locality forwarded to {@code configure} to select the input format
 * @throws Exception if job setup, submission or a file system call fails
 */
private void testRunningTaskCount(boolean speculation, boolean locality)
    throws Exception {
  LOG.info("Testing running jobs with speculation : " + speculation
           + ", locality : " + locality);
  // start from a clean test directory
  dfsCluster.getFileSystem().delete(TEST_DIR, true);

  final String mapSignal = new Path(TEST_DIR, "map-signal").toString();
  final String reduceSignal = new Path(TEST_DIR, "reduce-signal").toString();

  // a waiting job with 1 map and 1 reducer, as passed to configure here
  JobConf waitingJob = configure(UtilsForTests.WaitingMapper.class,
                                 IdentityReducer.class, 1, 1, locality);
  waitingJob.set(UtilsForTests.getTaskSignalParameter(true), mapSignal);
  waitingJob.set(UtilsForTests.getTaskSignalParameter(false), reduceSignal);

  // reduces must not wait on map completion, because the maps
  // don't complete in these test-cases
  waitingJob.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);

  // run with or without speculation, as requested
  waitingJob.setSpeculativeExecution(speculation);
  JobClient client = new JobClient(waitingJob);
  RunningJob submitted = client.submitJob(waitingJob);
  JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
  JobInProgress inProgress = tracker.getJob(submitted.getID());
  LOG.info("Running job " + inProgress.getJobID());

  LOG.info("Waiting for job " + inProgress.getJobID() + " to be ready");
  waitTillReady(inProgress, waitingJob);

  // collect every distinct running map: cached ones first ...
  Set<TaskInProgress> runningMapTips = new HashSet<TaskInProgress>();
  for (Set<TaskInProgress> cached : inProgress.getRunningMapCache().values()) {
    runningMapTips.addAll(cached);
  }
  // ... then the non local ones
  runningMapTips.addAll(inProgress.getNonLocalRunningMaps());

  final String settings = speculation + ", and locality " + locality;

  assertEquals(
      "Running map count doesnt match for jobs with speculation " + settings,
      inProgress.runningMaps(), runningMapTips.size());

  assertEquals(
      "Running reducer count doesnt match for jobs with speculation "
          + settings,
      inProgress.runningReduces(), inProgress.getRunningReduces().size());

  // let the waiting tasks proceed
  LOG.info("Signaling the tasks");
  UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
                            mapSignal, reduceSignal, numSlaves);

  LOG.info("Waiting for job " + inProgress.getJobID() + " to be complete");
  UtilsForTests.waitTillDone(client);

  // leave a clean test directory behind
  dfsCluster.getFileSystem().delete(TEST_DIR, true);
}
 
源代码12 项目: RDFS   文件: TestSessionDriver.java
/**
 * Prepares the fixture: a configuration with notifier and topology settings,
 * a cluster manager driven by a fake clock, a started cluster manager
 * server, {@code numNodes} heartbeating nodes, and a started session driver.
 *
 * @throws Exception if any component fails to start
 */
protected void setUp() throws Exception {
  conf = new Configuration();
  conf.setClass("topology.node.switch.mapping.impl",
                org.apache.hadoop.net.IPv4AddressTruncationMapping.class,
                org.apache.hadoop.net.DNSToSwitchMapping.class);
  conf.setInt(CoronaConf.NOTIFIER_RETRY_INTERVAL_START, 0);
  conf.setInt(CoronaConf.NOTIFIER_RETRY_INTERVAL_FACTOR, 1);
  conf.setInt(CoronaConf.NOTIFIER_RETRY_MAX, 3);
  conf.setInt(CoronaConf.NOTIFIER_POLL_INTERVAL, 10);
  conf.set(CoronaConf.CPU_TO_RESOURCE_PARTITIONING,
           TstUtils.std_cpu_to_resource_partitioning);

  // the cluster manager runs on a fake clock advanced to the current time
  myclock = new UtilsForTests.FakeClock();
  myclock.advance(System.currentTimeMillis());
  ClusterManager.clock = myclock;

  cm = new ClusterManager(conf);
  cms = new ClusterManagerServer(conf, cm);
  cms.start();

  // every node advertises empty resource info for MAP and REDUCE
  Map<ResourceType, String> resourceInfos =
      new EnumMap<ResourceType, String>(ResourceType.class);
  resourceInfos.put(ResourceType.MAP, "");
  resourceInfos.put(ResourceType.REDUCE, "");

  ClusterNodeInfo[] clusterNodes = new ClusterNodeInfo[numNodes];
  for (int idx = 0; idx < numNodes; idx++) {
    String host = TstUtils.getNodeHost(idx);
    InetAddress address = new InetAddress(TstUtils.getNodeHost(idx),
                                          TstUtils.getNodePort(idx));
    ClusterNodeInfo node =
        new ClusterNodeInfo(host, address, TstUtils.std_spec);
    clusterNodes[idx] = node;
    node.setFree(TstUtils.std_spec);
    node.setResourceInfos(resourceInfos);
  }

  // register the nodes only after all of them have been built
  for (int idx = 0; idx < numNodes; idx++) {
    cm.nodeHeartbeat(clusterNodes[idx]);
  }

  rd = new ResourceDriver();
  driver = new SessionDriver(conf, rd);
  driver.startSession();
}
 
源代码13 项目: hadoop-gpu   文件: TestJobTrackerRestart.java
/**
 * Tests the jobtracker with restart-recovery turned off.
 *
 * <ol>
 *   <li>Submit a job with normal priority, maps = 2, reducers = 0.</li>
 *   <li>Wait until the job's map progress reaches 50%.</li>
 *   <li>Restart the jobtracker with recovery turned off.</li>
 *   <li>Check that the job is missing afterwards.</li>
 * </ol>
 *
 * @param dfs the mini DFS cluster the job runs against
 * @param mr the mini MR cluster whose jobtracker is restarted
 * @throws IOException if a file system or job client call fails
 */
public void testRestartWithoutRecovery(MiniDFSCluster dfs,
                                       MiniMRCluster mr)
    throws IOException {
  // III. Test a job with waiting mapper and recovery turned off

  final FileSystem dfsFs = dfs.getFileSystem();
  cleanUp(dfsFs, shareDir);

  // a single waiting job: normal priority, 2 maps, no reducers
  JobConf[] waitingJobs = getJobs(mr.createJobConf(),
                                  new JobPriority[] {JobPriority.NORMAL},
                                  new int[] {2}, new int[] {0},
                                  outputDir, inDir,
                                  getMapSignalFile(shareDir),
                                  getReduceSignalFile(shareDir));
  JobConf waitingJob = waitingJobs[0];

  JobClient client = new JobClient(waitingJob);
  RunningJob submitted = client.submitJob(waitingJob);
  JobID id = submitted.getID();

  // hold on until the maps are at least half way through
  while (UtilsForTests.getJobStatus(client, id).mapProgress() < 0.5f) {
    UtilsForTests.waitFor(100);
  }

  mr.stopJobTracker();

  // recovery is switched off before the restart
  mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                    false);

  // Wait for a minute before bringing the jobtracker back
  UtilsForTests.waitFor(60 * 1000);

  mr.startJobTracker();

  // release the waiting tasks
  UtilsForTests.signalTasks(dfs, dfsFs, true, getMapSignalFile(shareDir),
                            getReduceSignalFile(shareDir));

  // the JT has to be ready before waiting on the jobs
  UtilsForTests.waitForJobTracker(client);
  UtilsForTests.waitTillDone(client);

  // with recovery disabled the submitted job must be unknown
  assertTrue("Submitted job was detected with recovery disabled",
             null == UtilsForTests.getJobStatus(client, id));
}
 
源代码14 项目: hadoop-gpu   文件: TestJobTrackerRestart.java
/** Tests a job on jobtracker with restart-recovery turned on and empty 
 *  jobhistory file.
 * Preparation :
 *    - Configure a job with
 *       - num-maps : 0 (long waiting setup)
 *       - num-reducers : 0
 *    
 * Check if the job succeeds after restart.
 * 
 * Assumption that map slots are given first for setup.
 *
 * NOTE(review): the preparation notes above say num-maps is 0, but the code
 * below passes 30 to runJob and requests 10 maps via getJobs — confirm which
 * description is current.
 *
 * @param dfs the mini DFS cluster providing the file system
 * @param mr the mini MR cluster whose jobtracker is stopped and restarted
 * @throws IOException if a file system or job call fails
 */
public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, 
                                            MiniMRCluster mr) 
throws IOException {
  // add one more task tracker to the cluster
  mr.startTaskTracker(null, null, 1, 1);
  FileSystem fileSys = dfs.getFileSystem();
  
  // start from clean share, input and output directories
  cleanUp(fileSys, shareDir);
  cleanUp(fileSys, inDir);
  cleanUp(fileSys, outputDir);
  
  // job1: no reducers, committer from TestEmptyJob (by its name it delays
  // cleanup — presumably until signalled; verify against that class)
  JobConf conf = mr.createJobConf();
  conf.setNumReduceTasks(0);
  conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
  fileSys.delete(outputDir, false);
  RunningJob job1 = 
    UtilsForTests.runJob(conf, inDir, outputDir, 30, 0);
  
  // NOTE(review): these two settings are applied to 'conf' after job1 was
  // started, while job2 below is built from a fresh mr.createJobConf() —
  // verify whether CommitterWithDelaySetup is really meant to reach job2.
  conf.setNumReduceTasks(0);
  conf.setOutputCommitter(CommitterWithDelaySetup.class);
  Path inDir2 = new Path(testDir, "input2");
  fileSys.mkdirs(inDir2);
  Path outDir2 = new Path(testDir, "output2");
  fileSys.delete(outDir2, false);
  // job2: a waiting job with normal priority, 10 maps and no reducers
  JobConf newConf = getJobs(mr.createJobConf(),
                            new JobPriority[] {JobPriority.NORMAL},
                            new int[] {10}, new int[] {0},
                            outDir2, inDir2,
                            getMapSignalFile(shareDir),
                            getReduceSignalFile(shareDir))[0];

  JobClient jobClient = new JobClient(newConf);
  RunningJob job2 = jobClient.submitJob(newConf);
  JobID id = job2.getID();

  // Earlier way of starting job2, kept for reference:
  /*RunningJob job2 = 
    UtilsForTests.runJob(mr.createJobConf(), inDir2, outDir2, 0);
  
  JobID id = job2.getID();*/
  JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
  
  // initialise job2 explicitly through the jobtracker
  mr.getJobTrackerRunner().getJobTracker().initJob(jip);
  
  // find out the history filename (of job2, since 'id' still refers to it)
  String history = 
    JobHistory.JobInfo.getJobHistoryFileName(jip.getJobConf(), id);
  Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation(history);
  
  //  make sure that setup is launched
  while (jip.runningMaps() == 0) {
    UtilsForTests.waitFor(100);
  }
  
  // from here on 'id' and 'jip' refer to job1
  id = job1.getID();
  jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
  
  mr.getJobTrackerRunner().getJobTracker().initJob(jip);
  
  //  make sure that cleanup is launched and is waiting
  while (!jip.isCleanupLaunched()) {
    UtilsForTests.waitFor(100);
  }
  
  mr.stopJobTracker();
  
  // delete the history file .. just to be safe.
  FileSystem historyFS = historyPath.getFileSystem(conf);
  historyFS.delete(historyPath, false);
  historyFS.create(historyPath).close(); // create an empty file
  
  
  // signal the waiting tasks while the jobtracker is stopped
  UtilsForTests.signalTasks(dfs, fileSys, getMapSignalFile(shareDir), getReduceSignalFile(shareDir), (short)1);

  // Turn on the recovery
  mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                    true);
  
  mr.startJobTracker();
  
  // both jobs must run to completion after the restart
  job1.waitForCompletion();
  job2.waitForCompletion();
}
 
源代码15 项目: hadoop-gpu   文件: TestJobInProgress.java
/**
 * Verifies that the running-task structures of a job in progress agree with
 * its running map and reduce counters.
 *
 * @param speculation whether speculative execution is turned on for the job
 * @param locality forwarded to {@code configure} to select the input format
 * @throws Exception if job setup, submission or a file system call fails
 */
private void testRunningTaskCount(boolean speculation, boolean locality)
    throws Exception {
  LOG.info("Testing running jobs with speculation : " + speculation
           + ", locality : " + locality);
  // start from a clean test directory
  dfsCluster.getFileSystem().delete(TEST_DIR, true);

  final String mapSignal = new Path(TEST_DIR, "map-signal").toString();
  final String reduceSignal = new Path(TEST_DIR, "reduce-signal").toString();

  // a waiting job with 1 map and 1 reducer, as passed to configure here
  JobConf waitingJob = configure(UtilsForTests.WaitingMapper.class,
                                 IdentityReducer.class, 1, 1, locality);
  waitingJob.set(UtilsForTests.getTaskSignalParameter(true), mapSignal);
  waitingJob.set(UtilsForTests.getTaskSignalParameter(false), reduceSignal);

  // reduces must not wait on map completion, because the maps
  // don't complete in these test-cases
  waitingJob.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);

  // run with or without speculation, as requested
  waitingJob.setSpeculativeExecution(speculation);
  JobClient client = new JobClient(waitingJob);
  RunningJob submitted = client.submitJob(waitingJob);
  JobTracker tracker = mrCluster.getJobTrackerRunner().getJobTracker();
  JobInProgress inProgress = tracker.getJob(submitted.getID());
  LOG.info("Running job " + inProgress.getJobID());

  LOG.info("Waiting for job " + inProgress.getJobID() + " to be ready");
  waitTillReady(inProgress, waitingJob);

  // collect every distinct running map: cached ones first ...
  Set<TaskInProgress> runningMapTips = new HashSet<TaskInProgress>();
  for (Set<TaskInProgress> cached : inProgress.getRunningMapCache().values()) {
    runningMapTips.addAll(cached);
  }
  // ... then the non local ones
  runningMapTips.addAll(inProgress.getNonLocalRunningMaps());

  final String settings = speculation + ", and locality " + locality;

  assertEquals(
      "Running map count doesnt match for jobs with speculation " + settings,
      inProgress.runningMaps(), runningMapTips.size());

  assertEquals(
      "Running reducer count doesnt match for jobs with speculation "
          + settings,
      inProgress.runningReduces(), inProgress.getRunningReduces().size());

  // let the waiting tasks proceed
  LOG.info("Signaling the tasks");
  UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
                            mapSignal, reduceSignal, numSlaves);

  LOG.info("Waiting for job " + inProgress.getJobID() + " to be complete");
  UtilsForTests.waitTillDone(client);

  // leave a clean test directory behind
  dfsCluster.getFileSystem().delete(TEST_DIR, true);
}
 
源代码16 项目: hadoop   文件: TestLineRecordReaderJobs.java
/**
 * Reads the file "part-r-00000" under {@code outputDir} from the local file
 * system into a string.
 *
 * @param conf configuration used to obtain the local file system
 * @return the file contents as returned by {@code UtilsForTests.slurpHadoop}
 * @throws IOException if the local file system or the file cannot be read
 */
public String readOutputFile(Configuration conf) throws IOException {
  final FileSystem local = FileSystem.getLocal(conf);
  return UtilsForTests.slurpHadoop(new Path(outputDir, "part-r-00000"), local);
}
 
源代码17 项目: big-c   文件: TestLineRecordReaderJobs.java
/**
 * Reads the file "part-r-00000" under {@code outputDir} from the local file
 * system into a string.
 *
 * @param conf configuration used to obtain the local file system
 * @return the file contents as returned by {@code UtilsForTests.slurpHadoop}
 * @throws IOException if the local file system or the file cannot be read
 */
public String readOutputFile(Configuration conf) throws IOException {
  final FileSystem local = FileSystem.getLocal(conf);
  return UtilsForTests.slurpHadoop(new Path(outputDir, "part-r-00000"), local);
}
 
 类所在包
 类方法
 同包方法