Usage examples for the org.apache.hadoop.mapreduce.MRJobConfig class

The following examples show how to use the org.apache.hadoop.mapreduce.MRJobConfig API, or you can follow the project links to browse the full source on GitHub.
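
As a quick orientation before the examples, here is a minimal standalone sketch (not taken from any of the projects below) of the usual pattern: MRJobConfig defines String constants that name configuration keys, and jobs read and write them through org.apache.hadoop.conf.Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class MRJobConfigDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Write: enable reduce-side speculative execution.
    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true);
    // Read: fall back to a default when the key is unset.
    int numMaps = conf.getInt(MRJobConfig.NUM_MAPS, 1);
    System.out.println("maps=" + numMaps + ", reduce speculation="
        + conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false));
  }
}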

Example 1  Project: big-c  File: TestSpeculativeExecution.java
public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context) throws IOException, InterruptedException {
  // Make one reducer slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_reduce =
            conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

  // IF TESTING REDUCE SPECULATIVE EXECUTION:
  //   Make the "*_r_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_r_000000_1", so sleepTime will still remain 100ms.
  if ((taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
      && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try {
    Thread.sleep(sleepTime);
  } catch (InterruptedException ie) {
    // Ignore
  }
  context.write(key, new IntWritable(0));
}
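
For the slow path above to be exercised, reduce-side speculation must be enabled in the job configuration. A minimal driver-side sketch (the driver is not part of this excerpt):

  // Sketch: enable reduce-side speculation so the slow attempt gets a backup.
  conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true);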
 
Example 2  Project: hadoop  File: PipeReducer.java
public void configure(JobConf job) {
  super.configure(job);
  // Disable auto-increment of the processed-record counter: for streaming,
  // the number of processed records can be equal to or less than the number
  // of input records.
  SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);

  // job_ is the JobConf field initialized by the PipeMapRed superclass.
  try {
    reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
    reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
    this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 3  Project: big-c  File: CombineFileRecordReaderWrapper.java
private boolean fileSplitIsValid(TaskAttemptContext context) {
  Configuration conf = context.getConfiguration();
  long offset = conf.getLong(MRJobConfig.MAP_INPUT_START, 0L);
  if (fileSplit.getStart() != offset) {
    return false;
  }
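  // Note: despite its name, MRJobConfig.MAP_INPUT_PATH resolves to the split
  // length key ("mapreduce.map.input.length"), hence the comparison below.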
  long length = conf.getLong(MRJobConfig.MAP_INPUT_PATH, 0L);
  if (fileSplit.getLength() != length) {
    return false;
  }
  String path = conf.get(MRJobConfig.MAP_INPUT_FILE);
  if (!fileSplit.getPath().toString().equals(path)) {
    return false;
  }
  return true;
}
 
Example 4  Project: hadoop  File: ClientServiceDelegate.java
public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
    JobID jobId, MRClientProtocol historyServerProxy) {
  this.conf = new Configuration(conf); // Cloning for modifying.
  // For faster redirects from AM to HS.
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
      this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
          MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
  this.conf.setInt(
      CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
      this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS,
          MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS));
  this.rm = rm;
  this.jobId = jobId;
  this.historyServerProxy = historyServerProxy;
  this.appId = TypeConverter.toYarn(jobId).getAppId();
  notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
}
 
Example 5  Project: hadoop  File: TestBinaryTokenFile.java
/**
 * Run a distributed job with the -tokenCacheFile option and
 * verify that no exception occurs.
 * @throws IOException
 */
@Test
public void testTokenCacheFile() throws IOException {
  Configuration conf = mrCluster.getConfig();
  createBinaryTokenFile(conf);
  // provide namenodes names for the job to get the delegation tokens for
  final String nnUri = dfsCluster.getURI(0).toString();
  conf.set(MRJobConfig.JOB_NAMENODES, nnUri + "," + nnUri);

  // using argument to pass the file name
  final String[] args = {
      "-tokenCacheFile", binaryTokenFileName.toString(),
      "-m", "1", "-r", "1", "-mt", "1", "-rt", "1"
      };
  int res = -1;
  try {
    res = ToolRunner.run(conf, new SleepJob(), args);
  } catch (Exception e) {
    System.out.println("Job failed with " + e.getLocalizedMessage());
    e.printStackTrace(System.out);
    fail("Job failed");
  }
  assertEquals("dist job res is not 0:", 0, res);
}
 
Example 6  Project: hadoop  File: TestCompressionEmulationUtils.java
/**
 * Runs a GridMix data-generation job.
 */
private static void runDataGenJob(Configuration conf, Path tempDir) 
throws IOException, ClassNotFoundException, InterruptedException {
  JobClient client = new JobClient(conf);
  
  // get the local job runner
  conf.setInt(MRJobConfig.NUM_MAPS, 1);
  
  Job job = Job.getInstance(conf);
  
  CompressionEmulationUtil.configure(job);
  job.setInputFormatClass(CustomInputFormat.class);
  
  // set the output path
  FileOutputFormat.setOutputPath(job, tempDir);
  
  // submit and wait for completion
  job.submit();
  int ret = job.waitForCompletion(true) ? 0 : 1;

  assertEquals("Job Failed", 0, ret);
}
 
Example 7  Project: big-c  File: TestFail.java
// All task attempts time out, leading to job failure.
@Test
public void testTimedOutTask() throws Exception {
  MRApp app = new TimeOutTaskMRApp(1, 0);
  Configuration conf = new Configuration();
  int maxAttempts = 2;
  conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
  // disable uberization (requires entire job to be reattempted, so max for
  // subtask attempts is overridden to 1)
  conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
  Job job = app.submit(conf);
  app.waitForState(job, JobState.FAILED);
  Map<TaskId,Task> tasks = job.getTasks();
  Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
  Task task = tasks.values().iterator().next();
  Assert.assertEquals("Task state not correct", TaskState.FAILED,
      task.getReport().getTaskState());
  Map<TaskAttemptId, TaskAttempt> attempts =
      tasks.values().iterator().next().getAttempts();
  Assert.assertEquals("Num attempts is not correct", maxAttempts,
      attempts.size());
  for (TaskAttempt attempt : attempts.values()) {
    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
        attempt.getReport().getTaskAttemptState());
  }
}
 
Example 8  Project: big-c  File: MRApps.java
/**
 * Creates an {@link ApplicationClassLoader} if
 * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true and
 * the APP_CLASSPATH environment variable is set.
 * @param conf the job configuration
 * @return the created job classloader, or null if the job classloader is not
 * enabled or the APP_CLASSPATH environment variable is not set
 * @throws IOException
 */
public static ClassLoader createJobClassLoader(Configuration conf)
    throws IOException {
  ClassLoader jobClassLoader = null;
  if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
    String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
    if (appClasspath == null) {
      LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
    } else {
      LOG.info("Creating job classloader");
      if (LOG.isDebugEnabled()) {
        LOG.debug("APP_CLASSPATH=" + appClasspath);
      }
      String[] systemClasses = getSystemClasses(conf);
      jobClassLoader = createJobClassLoader(appClasspath,
          systemClasses);
    }
  }
  return jobClassLoader;
}
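
A job opts into this classloader by setting the flag checked above. A minimal submission-side sketch:

  // Sketch: request an isolated job classloader at submission time.
  Job job = Job.getInstance(new Configuration());
  job.getConfiguration().setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);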
 
Example 9  Project: hadoop  File: PipeMapper.java
public void configure(JobConf job) {
  super.configure(job);
  // Disable auto-increment of the processed-record counter: for streaming,
  // the number of processed records can be equal to or less than the number
  // of input records.
  SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
  if (mapInputWriterClass_.getCanonicalName().equals(TextInputWriter.class.getCanonicalName())) {
    String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
    ignoreKey = job.getBoolean("stream.map.input.ignoreKey", 
      inputFormatClassName.equals(TextInputFormat.class.getCanonicalName()));
  }
  
  try {
    mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
    mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
    numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  }
}
 
Example 10  Project: hadoop  File: TestJobImpl.java
@Test(timeout=20000)
public void testKilledDuringCommit() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  CyclicBarrier syncBarrier = new CyclicBarrier(2);
  OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
  completeJobTasks(job);
  assertJobState(job, JobStateInternal.COMMITTING);

  syncBarrier.await();
  job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILLED);
  dispatcher.stop();
  commitHandler.stop();
}
 
Example 11  Project: hadoop  File: TestSpeculativeExecution.java
public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context) throws IOException, InterruptedException {
  // Make one reducer slower for speculative execution
  TaskAttemptID taid = context.getTaskAttemptID();
  long sleepTime = 100;
  Configuration conf = context.getConfiguration();
  boolean test_speculate_reduce =
            conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

  // IF TESTING REDUCE SPECULATIVE EXECUTION:
  //   Make the "*_r_000000_0" attempt take much longer than the others.
  //   When speculative execution is enabled, this should cause the attempt
  //   to be killed and restarted. At that point, the attempt ID will be
  //   "*_r_000000_1", so sleepTime will still remain 100ms.
  if ((taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
      && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
    sleepTime = 10000;
  }
  try {
    Thread.sleep(sleepTime);
  } catch (InterruptedException ie) {
    // Ignore
  }
  context.write(key, new IntWritable(0));
}
 
Example 12  Project: pravega-samples  File: TeraGen.java
/**
 * Create the desired number of splits, dividing the number of rows
 * between the mappers.
 */
public List<InputSplit> getSplits(JobContext job) {
  long totalRows = getNumberOfRows(job);
  int numSplits = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
  LOG.info("Generating " + totalRows + " using " + numSplits);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  long currentRow = 0;
  for(int split = 0; split < numSplits; ++split) {
    long goal = 
      (long) Math.ceil(totalRows * (double)(split + 1) / numSplits);
    splits.add(new RangeInputSplit(currentRow, goal - currentRow));
    currentRow = goal;
  }
  return splits;
}
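
The ceil-based goal computation spreads any remainder over the earliest splits. A standalone sketch of the same arithmetic: with totalRows = 10 and numSplits = 3, the goals are 4, 7, 10, giving split sizes of 4, 3, and 3.

  long totalRows = 10;
  int numSplits = 3;
  long currentRow = 0;
  for (int split = 0; split < numSplits; ++split) {
    long goal = (long) Math.ceil(totalRows * (double) (split + 1) / numSplits);
    System.out.println("split " + split + ": " + (goal - currentRow) + " rows");
    currentRow = goal; // prints 4, 3, 3
  }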
 
Example 13  Project: hadoop  File: SplitMetaInfoReader.java
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
    JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
throws IOException {
  long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
      MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
  Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
  String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
  FileStatus fStatus = fs.getFileStatus(metaSplitFile);
  if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
    throw new IOException("Split metadata size exceeded " +
        maxMetaInfoSize +". Aborting job " + jobId);
  }
  FSDataInputStream in = fs.open(metaSplitFile);
  byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
  in.readFully(header);
  if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
    throw new IOException("Invalid header on split file");
  }
  int vers = WritableUtils.readVInt(in);
  if (vers != JobSplit.META_SPLIT_VERSION) {
    in.close();
    throw new IOException("Unsupported split version " + vers);
  }
  int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
  JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo = 
    new JobSplit.TaskSplitMetaInfo[numSplits];
  for (int i = 0; i < numSplits; i++) {
    JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
    splitMetaInfo.readFields(in);
    JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
        jobSplitFile, 
        splitMetaInfo.getStartOffset());
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, 
        splitMetaInfo.getLocations(), 
        splitMetaInfo.getInputDataLength());
  }
  in.close();
  return allSplitMetaInfo;
}
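
As the guard above shows, any non-positive MRJobConfig.SPLIT_METAINFO_MAXSIZE disables the size check, so a job with unusually large split metadata can opt out:

  // Disable the split meta-info size limit (any value <= 0 skips the check above).
  conf.setLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, -1L);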
 
Example 14  Project: big-c  File: RMCommunicator.java
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  rmPollInterval =
      conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
          MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS);
}
 
Example 15  Project: ignite  File: HadoopFileSystemCacheUtils.java
/**
 * Gets non-null user name as per the Hadoop viewpoint.
 * @param cfg the Hadoop job configuration, may be null.
 * @return the user name, never null.
 */
private static String getMrHadoopUser(Configuration cfg) throws IOException {
    String user = cfg != null ? cfg.get(MRJobConfig.USER_NAME) : null;

    if (user == null)
        user = IgniteHadoopFileSystem.getFsHadoopUser();

    return user;
}
 
Example 16  Project: big-c  File: MRAppMaster.java
private void processRecovery() throws IOException {
  if (appAttemptID.getAttemptId() == 1) {
    return;  // no need to recover on the first attempt
  }

  boolean recoveryEnabled = getConfig().getBoolean(
      MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
      MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);

  boolean recoverySupportedByCommitter = isRecoverySupported();

  // If a shuffle secret was not provided by the job client then this app
  // attempt will generate one.  However that disables recovery if there
  // are reducers as the shuffle secret would be app attempt specific.
  int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
  boolean shuffleKeyValidForRecovery =
      TokenCache.getShuffleSecretKey(jobCredentials) != null;

  if (recoveryEnabled && recoverySupportedByCommitter
      && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
    LOG.info("Recovery is enabled. "
        + "Will try to recover from previous life on best effort basis.");
    try {
      parsePreviousJobHistory();
    } catch (IOException e) {
      LOG.warn("Unable to parse prior job history, aborting recovery", e);
      // try to get just the AMInfos
      amInfos.addAll(readJustAMInfos());
    }
  } else {
    LOG.info("Will not try to recover. recoveryEnabled: "
          + recoveryEnabled + " recoverySupportedByCommitter: "
          + recoverySupportedByCommitter + " numReduceTasks: "
          + numReduceTasks + " shuffleKeyValidForRecovery: "
          + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
          + appAttemptID.getAttemptId());
    // Get the amInfos anyways whether recovery is enabled or not
    amInfos.addAll(readJustAMInfos());
  }
}
 
Example 17  Project: beam  File: HadoopFormats.java
/**
 * Creates a new instance of {@link Partitioner} from the class specified in the
 * Hadoop {@link Configuration}.
 *
 * @param conf Hadoop Configuration
 * @param <KeyT> key type of the {@link Partitioner}
 * @param <ValueT> value type of the {@link Partitioner}
 * @return new {@link Partitioner}
 */
@SuppressWarnings("unchecked")
static <KeyT, ValueT> Partitioner<KeyT, ValueT> getPartitioner(Configuration conf) {
  return (Partitioner<KeyT, ValueT>)
      createInstanceFromConfig(
          conf,
          MRJobConfig.PARTITIONER_CLASS_ATTR,
          DEFAULT_PARTITIONER_CLASS_ATTR,
          Partitioner.class);
}
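
The createInstanceFromConfig helper is not part of this excerpt. A plausible sketch of such a helper, assuming Hadoop's ReflectionUtils (Beam's actual implementation may differ):

  // Hypothetical helper, shown for illustration only; not Beam's actual code.
  private static <T> T createInstanceFromConfig(
      Configuration conf, String configKey, Class<?> defaultClass, Class<T> superType) {
    Class<?> cls = conf.getClass(configKey, defaultClass);
    return superType.cast(org.apache.hadoop.util.ReflectionUtils.newInstance(cls, conf));
  }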
 
Example 18  Project: hadoop  File: MapTaskAttemptImpl.java
@Override
public Task createRemoteTask() {
  // The job file name is set in TaskAttempt; pass an empty string here.
  MapTask mapTask =
    new MapTask("", TypeConverter.fromYarn(getID()), partition,
        splitInfo.getSplitIndex(), 1); // YARN doesn't have the concept of slots per task, set it as 1.
  mapTask.setUser(conf.get(MRJobConfig.USER_NAME));
  mapTask.setConf(conf);
  return mapTask;
}
 
Example 19  Project: hadoop  File: TestJobConf.java
@Test
public void testProfileParamsSetter() {
  JobConf configuration = new JobConf();

  configuration.setProfileParams("test");
  Assert.assertEquals("test", configuration.get(MRJobConfig.TASK_PROFILE_PARAMS));
}
 
Example 20  Project: big-c  File: TestYARNRunner.java
@Test
public void testAMStandardEnv() throws Exception {
  final String ADMIN_LIB_PATH = "foo";
  final String USER_LIB_PATH = "bar";
  final String USER_SHELL = "shell";
  JobConf jobConf = new JobConf();

  jobConf.set(MRJobConfig.MR_AM_ADMIN_USER_ENV, "LD_LIBRARY_PATH=" +
      ADMIN_LIB_PATH);
  jobConf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH="
      + USER_LIB_PATH);
  jobConf.set(MRJobConfig.MAPRED_ADMIN_USER_SHELL, USER_SHELL);

  YARNRunner yarnRunner = new YARNRunner(jobConf);
  ApplicationSubmissionContext appSubCtx =
      buildSubmitContext(yarnRunner, jobConf);

  // make sure PWD is first in the lib path
  ContainerLaunchContext clc = appSubCtx.getAMContainerSpec();
  Map<String, String> env = clc.getEnvironment();
  String libPath = env.get(Environment.LD_LIBRARY_PATH.name());
  assertNotNull("LD_LIBRARY_PATH not set", libPath);
  String cps = jobConf.getBoolean(
      MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
      MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM)
      ? ApplicationConstants.CLASS_PATH_SEPARATOR : File.pathSeparator;
  assertEquals("Bad AM LD_LIBRARY_PATH setting",
      MRApps.crossPlatformifyMREnv(conf, Environment.PWD)
      + cps + ADMIN_LIB_PATH + cps + USER_LIB_PATH, libPath);

  // make sure SHELL is set
  String shell = env.get(Environment.SHELL.name());
  assertNotNull("SHELL not set", shell);
  assertEquals("Bad SHELL setting", USER_SHELL, shell);
}
 
Example 21  File: ExponentiallySmoothedTaskRuntimeEstimator.java
@Override
public void contextualize(Configuration conf, AppContext context) {
  super.contextualize(conf, context);

  lambda
      = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
          MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
  smoothedValue
      = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
          ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
}
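
This estimator only runs if it is selected as the AM's task runtime estimator. A hedged sketch, assuming MRJobConfig.MR_AM_TASK_ESTIMATOR is the key naming the estimator class:

  // Sketch: select the exponentially smoothed estimator for the AM.
  conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
      ExponentiallySmoothedTaskRuntimeEstimator.class, TaskRuntimeEstimator.class);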
 
Example 22  Project: hadoop  File: LocalContainerAllocator.java
@Override
protected void serviceInit(Configuration conf) throws Exception {
  super.serviceInit(conf);
  retryInterval =
      getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
          MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
  // Init startTime to current time. If all goes well, it will be reset after
  // first attempt to contact RM.
  retrystartTime = System.currentTimeMillis();
}
 
Example 23  Project: hadoop  File: TaskAttemptImpl.java
private int getCpuRequired(Configuration conf, TaskType taskType) {
  int vcores = 1;
  if (taskType == TaskType.MAP)  {
    vcores =
        conf.getInt(MRJobConfig.MAP_CPU_VCORES,
            MRJobConfig.DEFAULT_MAP_CPU_VCORES);
  } else if (taskType == TaskType.REDUCE) {
    vcores =
        conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
            MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
  }
  
  return vcores;
}
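
The matching job-side settings use the same constants. A minimal sketch:

  // Sketch: request 2 vcores for map tasks and 4 for reduce tasks.
  conf.setInt(MRJobConfig.MAP_CPU_VCORES, 2);
  conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 4);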
 
Example 24  Project: hadoop  File: TaskAttemptImpl.java
private int getGpuRequired(Configuration conf, TaskType taskType) {
  int gcores = 0;
  if (taskType == TaskType.MAP)  {
    gcores =
        conf.getInt(MRJobConfig.MAP_GPU_CORES,
            MRJobConfig.DEFAULT_MAP_GPU_CORES);
  } else if (taskType == TaskType.REDUCE) {
    gcores =
        conf.getInt(MRJobConfig.REDUCE_GPU_CORES,
            MRJobConfig.DEFAULT_REDUCE_GPU_CORES);
  }

  return gcores;
}
 
Example 25  Project: hadoop  File: TaskAttemptImpl.java
private WrappedProgressSplitsBlock getProgressSplitBlock() {
  readLock.lock();
  try {
    if (progressSplitBlock == null) {
      progressSplitBlock = new WrappedProgressSplitsBlock(conf.getInt(
          MRJobConfig.MR_AM_NUM_PROGRESS_SPLITS,
          MRJobConfig.DEFAULT_MR_AM_NUM_PROGRESS_SPLITS));
    }
    return progressSplitBlock;
  } finally {
    readLock.unlock();
  }
}
 
Example 26  Project: hadoop  File: TestJobImpl.java
@Test(timeout=20000)
public void testKilledDuringKillAbort() throws Exception {
  Configuration conf = new Configuration();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.init(conf);
  dispatcher.start();
  OutputCommitter committer = new StubbedOutputCommitter() {
    @Override
    public synchronized void abortJob(JobContext jobContext, State state)
        throws IOException {
      while (!Thread.interrupted()) {
        try {
          wait();
        } catch (InterruptedException e) {
        }
      }
    }
  };
  CommitterEventHandler commitHandler =
      createCommitterEventHandler(dispatcher, committer);
  commitHandler.init(conf);
  commitHandler.start();

  JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
  JobId jobId = job.getID();
  job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
  assertJobState(job, JobStateInternal.INITED);
  job.handle(new JobStartEvent(jobId));
  assertJobState(job, JobStateInternal.SETUP);

  job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILL_ABORT);

  job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
  assertJobState(job, JobStateInternal.KILLED);
  dispatcher.stop();
  commitHandler.stop();
}
 
Example 27  Project: big-c  File: TestFileOutputCommitter.java
public void testInvalidVersionNumber() throws IOException {
  Job job = Job.getInstance();
  FileOutputFormat.setOutputPath(job, outDir);
  Configuration conf = job.getConfiguration();
  conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
  conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
  TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
  try {
    new FileOutputCommitter(outDir, tContext);
    fail("should've thrown an exception!");
  } catch (IOException e) {
    //test passed
  }
}
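
Only algorithm versions 1 and 2 are accepted; anything else fails at construction time, which is what the test verifies. A valid configuration, for contrast:

  // Valid setting: select the v2 commit algorithm (version 1 is the default).
  conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);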
 
Example 28  Project: hadoop  File: TestTaskHeartbeatHandler.java
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testTimeout() throws InterruptedException {
  EventHandler mockHandler = mock(EventHandler.class);
  Clock clock = new SystemClock();
  TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);

  Configuration conf = new Configuration();
  conf.setInt(MRJobConfig.TASK_TIMEOUT, 10); //10 ms
  conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 10); //10 ms
  
  hb.init(conf);
  hb.start();
  try {
    ApplicationId appId = ApplicationId.newInstance(0L, 5);
    JobId jobId = MRBuilderUtils.newJobId(appId, 4);
    TaskId tid = MRBuilderUtils.newTaskId(jobId, 3, TaskType.MAP);
    TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 2);
    hb.register(taid);
    Thread.sleep(100);
    //Events only happen when the task is canceled
    verify(mockHandler, times(2)).handle(any(Event.class));
  } finally {
    hb.stop();
  }
}
 
Example 29  Project: rya  File: JoinSelectStatisticsTest.java
@Override
public int run(String[] args) throws Exception {

    Configuration conf = getConf();
    String outpath = conf.get(OUTPUTPATH);
    
    // Set the classpath-precedence flag before constructing the Job, since
    // the Job constructor copies the Configuration.
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    Job job = new Job(conf, this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
    job.setJarByClass(this.getClass());
    
    MultipleInputs.addInputPath(job, new Path(PROSPECTSOUT.getAbsolutePath()), 
            SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
    MultipleInputs.addInputPath(job,new Path(SPOOUT.getAbsolutePath()) , 
            SequenceFileInputFormat.class, JoinSelectAggregateMapper.class);
    job.setMapOutputKeyClass(CompositeType.class);
    job.setMapOutputValueClass(TripleCard.class);

    tempDir = new File(File.createTempFile(outpath, "txt").getParentFile(), System.currentTimeMillis() + "");
    SequenceFileOutputFormat.setOutputPath(job, new Path(tempDir.getAbsolutePath()));
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(TripleEntry.class);
    job.setOutputValueClass(CardList.class);

    job.setSortComparatorClass(JoinSelectSortComparator.class);
    job.setGroupingComparatorClass(JoinSelectGroupComparator.class);
    job.setPartitionerClass(JoinSelectPartitioner.class);
    job.setReducerClass(JoinReducer.class);
    job.setNumReduceTasks(32);
    job.waitForCompletion(true);
    
    return job.isSuccessful() ? 0 : 1;          
}
 
Example 30  Project: hadoop  File: TestStreamAggregate.java
protected String[] genArgs() {
  return new String[] {
    "-input", INPUT_FILE.getAbsolutePath(),
    "-output", OUTPUT_DIR.getAbsolutePath(),
    "-mapper", map,
    "-reducer", "aggregate",
    "-jobconf", MRJobConfig.PRESERVE_FAILED_TASK_FILES + "=true",
    "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  };
}
 