Source Code Examples for the Class org.apache.hadoop.mapred.SkipBadRecords

The examples below show how the org.apache.hadoop.mapred.SkipBadRecords API class is used in real projects; you can also click through to GitHub to view the original source files.
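Before the project excerpts, here is a minimal driver-side sketch of enabling the skip-bad-records feature with this class. It is an illustrative assumption rather than code from any project below: the driver class name, the skip output path, and the concrete limits are hypothetical placeholders; only the SkipBadRecords calls themselves are the real API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkipBadRecordsDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SkipBadRecordsDriver.class);

    // Start skipping mode after this many failed attempts of a task.
    SkipBadRecords.setAttemptsToStartSkipping(conf, 2);
    // A value greater than 0 enables skipping and bounds how many records
    // (map side) or groups (reduce side) may be skipped around a bad record.
    SkipBadRecords.setMapperMaxSkipRecords(conf, 1L);
    SkipBadRecords.setReducerMaxSkipGroups(conf, 1L);
    // Hypothetical side directory where the skipped records are written.
    SkipBadRecords.setSkipOutputPath(conf, new Path("/tmp/skipped-records"));

    // Mapper, reducer, and input/output path setup omitted for brevity.
    JobClient.runJob(conf);
  }
}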

Example 1   Project: hadoop   File: MultithreadedMapRunner.java
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
  int numberOfThreads =
    jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Configuring jobConf " + jobConf.getJobName() +
              " to use " + numberOfThreads + " threads");
  }

  this.job = jobConf;
  //increment processed counter only if skipping feature is enabled
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
    SkipBadRecords.getAutoIncrMapperProcCount(job);
  this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
      jobConf);

  // Creating a threadpool of the configured size to execute the Mapper
  // map method in parallel.
  executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
                                           0L, TimeUnit.MILLISECONDS,
                                           new BlockingArrayQueue
                                             (numberOfThreads));
}
 
Example 2   Project: hadoop   File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);

  try {
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 3   Project: hadoop   File: PipeMapper.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    ignoreKey = job.getBoolean("stream.map.input.ignoreKey", 
      inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()));
  }
  
  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 4   Project: hadoop   File: TestStreamingBadRecords.java
public App(String[] args) throws Exception{
  if(args.length>0) {
    isReducer = Boolean.parseBoolean(args[0]);
  }
  String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
  if(isReducer) {
    counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
  }
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  String line;
  int count = 0;
  while ((line = in.readLine()) != null) {
    processLine(line);
    count++;
    if(count>=10) {
      System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
          ","+counter+","+count);
      count = 0;
    }
  }
}
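The streaming App above reports its progress by printing the processed-records counter on stderr in the reporter:counter: protocol every ten records. For comparison, below is a hedged sketch of the Java-side equivalent: a mapper that increments the same counter through the Reporter, which is what a task must do itself once auto-increment has been disabled as in the PipeMapper and Pipes examples. The class name, key/value types, and the map logic are hypothetical; only the Reporter and SkipBadRecords calls are the real API.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SkipBadRecords;

public class ManualCounterMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> output,
                  Reporter reporter) throws IOException {
    // Placeholder record processing.
    output.collect(value, new LongWritable(1));

    // Tell the framework this record was fully processed, so a retry knows
    // which record range to skip. This mirrors the "reporter:counter:"
    // stderr line emitted by the streaming App above.
    reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
        SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
  }
}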
 
Example 5   Project: big-c   File: MultithreadedMapRunner.java
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
  int numberOfThreads =
    jobConf.getInt(MultithreadedMapper.NUM_THREADS, 10);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Configuring jobConf " + jobConf.getJobName() +
              " to use " + numberOfThreads + " threads");
  }

  this.job = jobConf;
  //increment processed counter only if skipping feature is enabled
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
    SkipBadRecords.getAutoIncrMapperProcCount(job);
  this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
      jobConf);

  // Creating a threadpool of the configured size to execute the Mapper
  // map method in parallel.
  executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
                                           0L, TimeUnit.MILLISECONDS,
                                           new BlockingArrayQueue
                                             (numberOfThreads));
}
 
Example 6   Project: big-c   File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);

  try {
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 7   Project: big-c   File: PipeMapper.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    ignoreKey = job.getBoolean("stream.map.input.ignoreKey", 
      inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()));
  }
  
  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 8   Project: big-c   File: TestStreamingBadRecords.java
public App(String[] args) throws Exception{
  if(args.length>0) {
    isReducer = Boolean.parseBoolean(args[0]);
  }
  String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
  if(isReducer) {
    counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
  }
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  String line;
  int count = 0;
  while ((line = in.readLine()) != null) {
    processLine(line);
    count++;
    if(count>=10) {
      System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
          ","+counter+","+count);
      count = 0;
    }
  }
}
 
Example 9   Project: RDFS   File: MultithreadedMapRunner.java
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
  int numberOfThreads =
    jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Configuring jobConf " + jobConf.getJobName() +
              " to use " + numberOfThreads + " threads");
  }

  this.job = jobConf;
  //increment processed counter only if skipping feature is enabled
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
    SkipBadRecords.getAutoIncrMapperProcCount(job);
  this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
      jobConf);

  // Creating a threadpool of the configured size to execute the Mapper
  // map method in parallel.
  executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
                                           0L, TimeUnit.MILLISECONDS,
                                           new BlockingArrayQueue
                                             (numberOfThreads));
}
 
Example 10   Project: RDFS   File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);

  try {
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 11   Project: RDFS   File: PipeMapper.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);
  String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
  ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());

  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 12   Project: RDFS   File: TestStreamingBadRecords.java
public App(String[] args) throws Exception{
  if(args.length>0) {
    isReducer = Boolean.parseBoolean(args[0]);
  }
  String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
  if(isReducer) {
    counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
  }
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  String line;
  int count = 0;
  while ((line = in.readLine()) != null) {
    processLine(line);
    count++;
    if(count>=10) {
      System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
          ","+counter+","+count);
      count = 0;
    }
  }
}
 
Example 13   Project: hadoop-gpu   File: MultithreadedMapRunner.java
@SuppressWarnings("unchecked")
public void configure(JobConf jobConf) {
  int numberOfThreads =
    jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Configuring jobConf " + jobConf.getJobName() +
              " to use " + numberOfThreads + " threads");
  }

  this.job = jobConf;
  //increment processed counter only if skipping feature is enabled
  this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
    SkipBadRecords.getAutoIncrMapperProcCount(job);
  this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
      jobConf);

  // Creating a threadpool of the configured size to execute the Mapper
  // map method in parallel.
  executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
                                           0L, TimeUnit.MILLISECONDS,
                                           new BlockingArrayQueue
                                             (numberOfThreads));
}
 
Example 14   Project: hadoop-gpu   File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);

  try {
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 15   Project: hadoop-gpu   File: PipeMapper.java
public void configure(JobConf job) {
  super.configure(job);
  //disable the auto increment of the counter. For streaming, no of 
  //processed records could be different(equal or less) than the no of 
  //records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);
  String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
  ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());

  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 16   Project: hadoop-gpu   File: TestStreamingBadRecords.java
public App(String[] args) throws Exception{
  if(args.length>0) {
    isReducer = Boolean.parseBoolean(args[0]);
  }
  String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
  if(isReducer) {
    counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
  }
  BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
  String line;
  int count = 0;
  while ((line = in.readLine()) != null) {
    processLine(line);
    count++;
    if(count>=10) {
      System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
          ","+counter+","+count);
      count = 0;
    }
  }
}
 
Example 17   Project: hadoop   File: PipesReducer.java
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
}
 
Example 18   Project: hadoop   File: PipesMapRunner.java
/**
 * Get the new configuration.
 * @param job the job's configuration
 */
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
 
Example 19   Project: big-c   File: PipesReducer.java
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
}
 
Example 20   Project: big-c   File: PipesMapRunner.java
/**
 * Get the new configuration.
 * @param job the job's configuration
 */
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
 
Example 21   Project: RDFS   File: PipesReducer.java
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);
}
 
Example 22   Project: RDFS   File: PipesMapRunner.java
/**
 * Get the new configuration.
 * @param job the job's configuration
 */
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
 
Example 23   Project: RDFS   File: TestStreamingBadRecords.java
public void testSkip() throws Exception {
  JobConf clusterConf = createJobConf();
  createInput();
  int attSkip =0;
  SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
  //the no of attempts to successfully complete the task depends 
  //on the no of bad records.
  int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
  int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
  
  String[] args =  new String[] {
    "-input", (new Path(getInputDir(), "text.txt")).toString(),
    "-output", getOutputDir().toString(),
    "-mapper", badMapper,
    "-reducer", badReducer,
    "-verbose",
    "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
    "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
    "-jobconf", "mapred.skip.out.dir=none",
    "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
    "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
    "-jobconf", "mapred.skip.map.max.skip.records="+Long.MAX_VALUE,
    "-jobconf", "mapred.skip.reduce.max.skip.groups="+Long.MAX_VALUE,
    "-jobconf", "mapred.map.tasks=1",
    "-jobconf", "mapred.reduce.tasks=1",
    "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
    "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
    "-jobconf", "mapred.job.tracker.http.address="
                  +clusterConf.get("mapred.job.tracker.http.address"),
    "-jobconf", "stream.debug=set",
    "-jobconf", "keep.failed.task.files=true",
    "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  };
  StreamJob job = new StreamJob(args, false);      
  job.go();
  validateOutput(job.running_, false);
  //validate that there is no skip directory as it has been set to "none"
  assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)==null);
}
 
Example 24   Project: RDFS   File: TestStreamingBadRecords.java
public void testNarrowDown() throws Exception {
  createInput();
  JobConf clusterConf = createJobConf();
  String[] args =  new String[] {
    "-input", (new Path(getInputDir(), "text.txt")).toString(),
    "-output", getOutputDir().toString(),
    "-mapper", badMapper,
    "-reducer", badReducer,
    "-verbose",
    "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
    "-jobconf", "mapred.skip.attempts.to.start.skipping=1",
    //actually fewer attempts are required than specified
    //but to cater to the case of slow processed counter update, need to 
    //have more attempts
    "-jobconf", "mapred.map.max.attempts=20",
    "-jobconf", "mapred.reduce.max.attempts=15",
    "-jobconf", "mapred.skip.map.max.skip.records=1",
    "-jobconf", "mapred.skip.reduce.max.skip.groups=1",
    "-jobconf", "mapred.map.tasks=1",
    "-jobconf", "mapred.reduce.tasks=1",
    "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
    "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
    "-jobconf", "mapred.job.tracker.http.address="
                  +clusterConf.get("mapred.job.tracker.http.address"),
    "-jobconf", "stream.debug=set",
    "-jobconf", "keep.failed.task.files=true",
    "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  };
  StreamJob job = new StreamJob(args, false);      
  job.go();
  
  validateOutput(job.running_, true);
  assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)!=null);
}
 
Example 25   Project: hadoop-gpu   File: PipesGPUMapRunner.java
/**
 * Get the new configuration.
 * @param job the job's configuration
 */
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
 
Example 26   Project: hadoop-gpu   File: PipesReducer.java
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean("mapred.skip.on", false);
}
 
Example 27   Project: hadoop-gpu   File: PipesMapRunner.java
/**
 * Get the new configuration.
 * @param job the job's configuration
 */
public void configure(JobConf job) {
  this.job = job;
  //disable the auto increment of the counter. For pipes, no of processed 
  //records could be different(equal or less) than the no of records input.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
}
 
Example 28   Project: hadoop-gpu   File: TestStreamingBadRecords.java
public void testSkip() throws Exception {
  JobConf clusterConf = createJobConf();
  createInput();
  int attSkip =0;
  SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
  //the no of attempts to successfully complete the task depends 
  //on the no of bad records.
  int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
  int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
  
  String[] args =  new String[] {
    "-input", (new Path(getInputDir(), "text.txt")).toString(),
    "-output", getOutputDir().toString(),
    "-mapper", badMapper,
    "-reducer", badReducer,
    "-verbose",
    "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
    "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
    "-jobconf", "mapred.skip.out.dir=none",
    "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
    "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
    "-jobconf", "mapred.skip.map.max.skip.records="+Long.MAX_VALUE,
    "-jobconf", "mapred.skip.reduce.max.skip.groups="+Long.MAX_VALUE,
    "-jobconf", "mapred.map.tasks=1",
    "-jobconf", "mapred.reduce.tasks=1",
    "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
    "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
    "-jobconf", "mapred.job.tracker.http.address="
                  +clusterConf.get("mapred.job.tracker.http.address"),
    "-jobconf", "stream.debug=set",
    "-jobconf", "keep.failed.task.files=true",
    "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  };
  StreamJob job = new StreamJob(args, false);      
  job.go();
  validateOutput(job.running_, false);
  //validate that there is no skip directory as it has been set to "none"
  assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)==null);
}
 
Example 29   Project: hadoop-gpu   File: TestStreamingBadRecords.java
public void testNarrowDown() throws Exception {
  createInput();
  JobConf clusterConf = createJobConf();
  String[] args =  new String[] {
    "-input", (new Path(getInputDir(), "text.txt")).toString(),
    "-output", getOutputDir().toString(),
    "-mapper", badMapper,
    "-reducer", badReducer,
    "-verbose",
    "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
    "-jobconf", "mapred.skip.attempts.to.start.skipping=1",
    //actually fewer attempts are required than specified
    //but to cater to the case of slow processed counter update, need to 
    //have more attempts
    "-jobconf", "mapred.map.max.attempts=20",
    "-jobconf", "mapred.reduce.max.attempts=15",
    "-jobconf", "mapred.skip.map.max.skip.records=1",
    "-jobconf", "mapred.skip.reduce.max.skip.groups=1",
    "-jobconf", "mapred.map.tasks=1",
    "-jobconf", "mapred.reduce.tasks=1",
    "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
    "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
    "-jobconf", "mapred.job.tracker.http.address="
                  +clusterConf.get("mapred.job.tracker.http.address"),
    "-jobconf", "stream.debug=set",
    "-jobconf", "keep.failed.task.files=true",
    "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  };
  StreamJob job = new StreamJob(args, false);      
  job.go();
  
  validateOutput(job.running_, true);
  assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)!=null);
}
 