Source code examples for the org.apache.hadoop.mapreduce.InputFormat class

Listed below are example snippets showing how to use the org.apache.hadoop.mapreduce.InputFormat API, collected from open-source projects; you can also follow each project's link to view the original source code on GitHub.
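
Before the individual examples, it helps to recall the contract they all build on: every InputFormat implementation must provide getSplits(JobContext), which partitions the input, and createRecordReader(InputSplit, TaskAttemptContext), which turns one split into a stream of key/value pairs. The sketch below is not taken from any of the projects listed here; the class name is invented, and it simply delegates to TextInputFormat and LineRecordReader to show the shape of a custom implementation.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Minimal, hypothetical InputFormat sketch (not from any project below).
 * It reuses TextInputFormat's splitting and LineRecordReader's reading
 * purely to illustrate the two methods every InputFormat must implement.
 */
public class DelegatingTextInputFormat extends InputFormat<LongWritable, Text> {

    private final TextInputFormat delegate = new TextInputFormat();

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        // An InputFormat decides how the input is partitioned into InputSplits;
        // here we simply reuse TextInputFormat's file-based splitting.
        return delegate.getSplits(context);
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // A RecordReader converts one split into a sequence of (key, value) pairs.
        return new LineRecordReader();
    }
}

Run against a Job whose input paths have been set with FileInputFormat.setInputPaths(), this behaves like plain text input; the point here is only the shape of the two required methods, which the examples below either implement or call through a Job's configured InputFormat class.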

Example 1 Project: kylin-on-parquet-v2   File: AbstractHadoopJob.java
public static double getTotalMapInputMB(Job job)
        throws ClassNotFoundException, IOException, InterruptedException, JobException {
    if (job == null) {
        throw new JobException("Job is null");
    }

    long mapInputBytes = 0;
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
    for (InputSplit split : input.getSplits(job)) {
        mapInputBytes += split.getLength();
    }
    
    // 0 input bytes is possible when the segment range hits no partition on a partitioned hive table (KYLIN-2470) 
    if (mapInputBytes == 0) {
        logger.warn("Map input splits are 0 bytes, something is wrong?");
    }
    
    double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
    return totalMapInputMB;
}
 
Example 2 Project: beam   File: HadoopFormatIOReadTest.java
/**
 * This test validates behavior of {@link
 * HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
 * InputFormat's {@link InputFormat#getSplits(JobContext) getSplits(JobContext)} returns a null
 * value.
 */
@Test
public void testComputeSplitsIfGetSplitsReturnsNullValue() throws Exception {
  InputFormat<Text, Employee> mockInputFormat = Mockito.mock(EmployeeInputFormat.class);
  SerializableSplit mockInputSplit = Mockito.mock(SerializableSplit.class);
  Mockito.when(mockInputFormat.getSplits(Mockito.any(JobContext.class))).thenReturn(null);
  HadoopInputFormatBoundedSource<Text, Employee> hifSource =
      new HadoopInputFormatBoundedSource<>(
          serConf,
          WritableCoder.of(Text.class),
          AvroCoder.of(Employee.class),
          null, // No key translation required.
          null, // No value translation required.
          mockInputSplit);
  thrown.expect(IOException.class);
  thrown.expectMessage("Error in computing splits, getSplits() returns null.");
  hifSource.setInputFormatObj(mockInputFormat);
  hifSource.computeSplitsIfNecessary();
}
 
Example 3 Project: hbase   File: TestTableInputFormat.java
void testInputFormat(Class<? extends InputFormat> clazz)
    throws IOException, InterruptedException, ClassNotFoundException {
  final Job job = MapreduceTestingShim.createJob(UTIL.getConfiguration());
  job.setInputFormatClass(clazz);
  job.setOutputFormatClass(NullOutputFormat.class);
  job.setMapperClass(ExampleVerifier.class);
  job.setNumReduceTasks(0);

  LOG.debug("submitting job.");
  assertTrue("job failed!", job.waitForCompletion(true));
  assertEquals("Saw the wrong number of instances of the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "aaa").getValue());
  assertEquals("Saw any instances of the filtered out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":row", "bbb").getValue());
  assertEquals("Saw the wrong number of instances of columnA.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnA").getValue());
  assertEquals("Saw the wrong number of instances of columnB.", 1, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":family", "columnB").getValue());
  assertEquals("Saw the wrong count of values for the filtered-for row.", 2, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value aaa").getValue());
  assertEquals("Saw the wrong count of values for the filtered-out row.", 0, job.getCounters()
      .findCounter(TestTableInputFormat.class.getName() + ":value", "value bbb").getValue());
}
 
Example 4 Project: tinkerpop   File: HadoopElementIterator.java
public HadoopElementIterator(final HadoopGraph graph) {
    try {
        this.graph = graph;
        final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
        final InputFormat<NullWritable, VertexWritable> inputFormat = ConfUtil.getReaderAsInputFormat(configuration);
        if (inputFormat instanceof FileInputFormat) {
            final Storage storage = FileSystemStorage.open(configuration);
            if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
                return; // there is no input location and thus, no data (empty graph)
            if (!Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage).isPresent())
                return; // there is no data at the input location (empty graph)
            configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage).get());
        }
        final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));
        for (final InputSplit split : splits) {
            this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
        }
    } catch (final Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
 
Example 5 Project: beam   File: HadoopFormatIOReadTest.java
/**
 * This test validates functionality of {@link HadoopFormatIO.Read#validateTransform()
 * Read.validateTransform()} function when myKeyTranslate's (a simple function provided by the user
 * for key translation) input type is not the same as the Hadoop InputFormat's key class (which is
 * the property set in the configuration as "key.class").
 */
@Test
public void testReadValidationFailsWithWrongInputTypeKeyTranslationFunction() {
  SimpleFunction<LongWritable, String> myKeyTranslateWithWrongInputType =
      new SimpleFunction<LongWritable, String>() {
        @Override
        public String apply(LongWritable input) {
          return input.toString();
        }
      };
  HadoopFormatIO.Read<String, Employee> read =
      HadoopFormatIO.<String, Employee>read()
          .withConfiguration(serConf.get())
          .withKeyTranslation(myKeyTranslateWithWrongInputType);
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage(
      String.format(
          "Key translation's input type is not same as hadoop InputFormat : %s key "
              + "class : %s",
          serConf.get().getClass("mapreduce.job.inputformat.class", InputFormat.class),
          serConf.get().getClass("key.class", Object.class)));
  read.validateTransform();
}
 
Example 6 Project: beam   File: HadoopFormatIOCassandraIT.java
/**
 * Returns the Hadoop configuration for reading data from Cassandra. To read data from Cassandra using
 * HadoopFormatIO, the following properties must be set: InputFormat class, InputFormat key class,
 * InputFormat value class, Thrift address, Thrift port, partitioner class, keyspace and
 * columnfamily name.
 */
private static Configuration getConfiguration(HadoopFormatIOTestOptions options) {
  Configuration conf = new Configuration();
  conf.set(CASSANDRA_THRIFT_PORT_PROPERTY, options.getCassandraServerPort().toString());
  conf.set(CASSANDRA_THRIFT_ADDRESS_PROPERTY, options.getCassandraServerIp());
  conf.set(CASSANDRA_PARTITIONER_CLASS_PROPERTY, CASSANDRA_PARTITIONER_CLASS_VALUE);
  conf.set(CASSANDRA_KEYSPACE_PROPERTY, CASSANDRA_KEYSPACE);
  conf.set(CASSANDRA_COLUMNFAMILY_PROPERTY, CASSANDRA_TABLE);
  // Set user name and password if Cassandra instance has security configured.
  conf.set(USERNAME, options.getCassandraUserName());
  conf.set(PASSWORD, options.getCassandraPassword());
  conf.set(INPUT_KEYSPACE_USERNAME_CONFIG, options.getCassandraUserName());
  conf.set(INPUT_KEYSPACE_PASSWD_CONFIG, options.getCassandraPassword());
  conf.setClass(
      "mapreduce.job.inputformat.class",
      org.apache.cassandra.hadoop.cql3.CqlInputFormat.class,
      InputFormat.class);
  conf.setClass("key.class", Long.class, Object.class);
  conf.setClass("value.class", Row.class, Object.class);
  return conf;
}
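
For context, a configuration like the one returned above is what HadoopFormatIO.Read consumes (compare Examples 2 and 5 from the same project). The sketch below is hypothetical rather than part of HadoopFormatIOCassandraIT; it assumes an existing Beam Pipeline named p, that com.datastax.driver.core.Row is the value class registered above, and that the usual imports (Pipeline, PCollection, KV, HadoopFormatIO) are in place.

// Hypothetical usage sketch: feed the configuration built by getConfiguration(options)
// into HadoopFormatIO.Read.
Configuration conf = getConfiguration(options);
PCollection<KV<Long, Row>> cassandraRows =
    p.apply(
        "ReadFromCassandra",
        HadoopFormatIO.<Long, Row>read().withConfiguration(conf));
// In practice a value translation (withValueTranslation) is usually chained as well,
// so that the values map to a type with a registered Beam coder.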
 
Example 7 Project: titan1withtp3.1   File: HadoopScanRunner.java
public static ScanMetrics runScanJob(ScanJob scanJob, Configuration conf, String confRootField,
                                 org.apache.hadoop.conf.Configuration hadoopConf,
                                 Class<? extends InputFormat> inputFormat)
        throws IOException, InterruptedException, ClassNotFoundException {

    ModifiableHadoopConfiguration scanConf =
            ModifiableHadoopConfiguration.of(TitanHadoopConfiguration.MAPRED_NS, hadoopConf);

    tryToLoadClassByName(scanJob);

    // Set the ScanJob class
    scanConf.set(TitanHadoopConfiguration.SCAN_JOB_CLASS, scanJob.getClass().getName());

    String jobName = HadoopScanMapper.class.getSimpleName() + "[" + scanJob + "]";

    return runJob(conf, confRootField, hadoopConf, inputFormat, jobName, HadoopScanMapper.class);
}
 
Example 8 Project: sqoop-on-spark   File: MRJobTestUtil.java
@SuppressWarnings("deprecation")
public static boolean runJob(Configuration conf,
    Class<? extends InputFormat<?,?>> inputFormatClass,
    Class<? extends Mapper<?,?,?,?>> mapperClass,
    Class<? extends OutputFormat<?,?>> outputFormatClass) throws IOException,
    InterruptedException, ClassNotFoundException {
  Job job = new Job(conf);
  job.setInputFormatClass(inputFormatClass);
  job.setMapperClass(mapperClass);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setOutputFormatClass(outputFormatClass);
  
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);

  boolean ret = job.waitForCompletion(true);

  // Hadoop 1.0 (and 0.20) have a nasty bug where the job committer is not called in
  // LocalJobRunner.
  if (isHadoop1()) {
    callOutputCommitter(job, outputFormatClass);
  }

  return ret;
}
 
Example 9 Project: Flink-CEPplus   File: HadoopInputFormatTest.java
private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
																RecordReader<String, Long> recordReader) {

	HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
			String.class, Long.class, job);
	hadoopInputFormat.recordReader = recordReader;

	return hadoopInputFormat;
}
 
Example 10 Project: flink   File: HadoopInputFormatTest.java
private HadoopInputFormat<String, Long> setupHadoopInputFormat(InputFormat<String, Long> inputFormat, Job job,
																RecordReader<String, Long> recordReader) {

	HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat,
			String.class, Long.class, job);
	hadoopInputFormat.recordReader = recordReader;

	return hadoopInputFormat;
}
 
Example 11 Project: kylin-on-parquet-v2   File: AbstractHadoopJob.java
protected int getMapInputSplitCount()
        throws ClassNotFoundException, JobException, IOException, InterruptedException {
    if (job == null) {
        throw new JobException("Job is null");
    }
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
    return input.getSplits(job).size();
}
 
Example 12 Project: spork   File: IndexedStorage.java
/**
 * IndexableLoadFunc interface implementation
 */
@Override
public void initialize(Configuration conf) throws IOException {
    try {
        InputFormat inputFormat = this.getInputFormat();
        TaskAttemptID id = HadoopShims.getNewTaskAttemptID();

        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
                    conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        List<FileSplit> fileSplits = inputFormat.getSplits(HadoopShims.createJobContext(conf, null));
        this.readers = new IndexedStorageRecordReader[fileSplits.size()];

        int idx = 0;
        Iterator<FileSplit> it = fileSplits.iterator();
        while (it.hasNext()) {
            FileSplit fileSplit = it.next();
            TaskAttemptContext context = HadoopShims.createTaskAttemptContext(conf, id);
            IndexedStorageRecordReader r = (IndexedStorageRecordReader) inputFormat.createRecordReader(fileSplit, context);
            r.initialize(fileSplit, context);
            this.readers[idx] = r;
            idx++;
        }

        Arrays.sort(this.readers, this.readerComparator);
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
}
 
Example 13 Project: iceberg   File: IcebergStorage.java
@Override
public InputFormat getInputFormat() {
  LOG.info("[{}]: getInputFormat()", signature);
  String location = locations.get(signature);

  return new IcebergPigInputFormat(tables.get(location), signature);
}
 
Example 14 Project: spork   File: CSVLoader.java
@SuppressWarnings("rawtypes")
@Override
public InputFormat getInputFormat() throws IOException {
    if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
        return new Bzip2TextInputFormat();
    } else {
        return new PigTextInputFormat();
    }
}
 
Example 15 Project: twister2   File: BatchTSetEnvironment.java
public <K, V, F extends InputFormat<K, V>, I> SourceTSet<I> createHadoopSource(
    Configuration configuration, Class<F> inputFormat, int parallel,
    MapFunc<I, Tuple<K, V>> mapFunc) {
  SourceTSet<I> sourceT = new SourceTSet<>(this,
      new HadoopSourceWithMap<>(configuration, inputFormat, mapFunc), parallel);
  getGraph().addSourceTSet(sourceT);

  return sourceT;
}
 
Example 16 Project: twister2   File: BatchTSetEnvironment.java
public <K, V,
    F extends InputFormat<K, V>, K2, V2> KeyedSourceTSet<K2, V2> createKeyedHadoopSource(
    Configuration configuration, Class<F> inputFormat, int parallel,
    MapFunc<Tuple<K2, V2>, Tuple<K, V>> mapFunc) {
  KeyedSourceTSet<K2, V2> sourceT = new KeyedSourceTSet<>(this,
      new HadoopSourceWithMap<>(configuration, inputFormat, mapFunc), parallel);
  getGraph().addSourceTSet(sourceT);

  return sourceT;
}
 
@Override
protected Class<? extends InputFormat> getInputFormatClass()
    throws ClassNotFoundException {
  if (isHCatJob) {
    return SqoopHCatUtilities.getInputFormatClass();
  }
  return super.getInputFormatClass();
}
 
Example 18 Project: tinkerpop   File: InputFormatRDD.java
@Override
public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
    final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
    return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
            (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class),
            NullWritable.class,
            VertexWritable.class)
            .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())));
}
 
public JobBase(final SqoopOptions opts,
    final Class<? extends Mapper> mapperClass,
    final Class<? extends InputFormat> inputFormatClass,
    final Class<? extends OutputFormat> outputFormatClass) {

  this.options = opts;
  this.mapperClass = mapperClass;
  this.inputFormatClass = inputFormatClass;
  this.outputFormatClass = outputFormatClass;
  isHCatJob = options.getHCatTableName() != null;
}
 
Example 20 Project: beam   File: HadoopFormatIO.java
@SuppressWarnings("unchecked")
private HadoopInputFormatReader(
    HadoopInputFormatBoundedSource<K, V> source,
    @Nullable SimpleFunction keyTranslationFunction,
    @Nullable SimpleFunction valueTranslationFunction,
    SerializableSplit split,
    InputFormat inputFormatObj,
    TaskAttemptContext taskAttemptContext) {
  this.source = source;
  this.keyTranslationFunction = keyTranslationFunction;
  this.valueTranslationFunction = valueTranslationFunction;
  this.split = split;
  this.inputFormatObj = inputFormatObj;
  this.taskAttemptContext = taskAttemptContext;
}
 
/**
 * Configure the InputFormat to use for the job.
 */
protected void configureInputFormat(Job job, String tableName,
    String tableClassName, String splitByCol)
    throws ClassNotFoundException, IOException {
  //TODO: 'splitByCol' is import-job specific; lift it out of this API.
  Class<? extends InputFormat> ifClass = getInputFormatClass();
  LOG.debug("Using InputFormat: " + ifClass);
  job.setInputFormatClass(ifClass);
}
 
Example 22 Project: spork   File: HBaseStorage.java
@Override
public InputFormat getInputFormat() {
    TableInputFormat inputFormat = new HBaseTableIFBuilder()
    .withLimit(limit_)
    .withGt(gt_)
    .withGte(gte_)
    .withLt(lt_)
    .withLte(lte_)
    .withConf(m_conf)
    .build();
    inputFormat.setScan(scan);
    return inputFormat;
}
 
Example 23 Project: beam   File: HadoopFormatIO.java
/**
 * Creates an instance of the InputFormat class. The InputFormat class name is specified in the Hadoop
 * configuration.
 */
@SuppressWarnings("WeakerAccess")
protected void createInputFormatInstance() throws IOException {
  if (inputFormatObj == null) {
    try {
      taskAttemptContext = new TaskAttemptContextImpl(conf.get(), new TaskAttemptID());
      inputFormatObj =
          (InputFormat<?, ?>)
              conf.get()
                  .getClassByName(conf.get().get("mapreduce.job.inputformat.class"))
                  .getConstructor()
                  .newInstance();
      /*
       * If InputFormat explicitly implements interface {@link Configurable}, then setConf()
       * method of {@link Configurable} needs to be explicitly called to set all the
       * configuration parameters. For example: InputFormat classes which implement Configurable
       * are {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat DBInputFormat}, {@link
       * org.apache.hadoop.hbase.mapreduce.TableInputFormat TableInputFormat}, etc.
       */
      if (Configurable.class.isAssignableFrom(inputFormatObj.getClass())) {
        ((Configurable) inputFormatObj).setConf(conf.get());
      }
    } catch (InstantiationException
        | IllegalAccessException
        | ClassNotFoundException
        | NoSuchMethodException
        | InvocationTargetException e) {
      throw new IOException("Unable to create InputFormat object: ", e);
    }
  }
}
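
To make the Configurable branch above concrete, here is a hypothetical stub InputFormat; the class name and the "example.table.name" property are invented for illustration. Because createInputFormatInstance() builds the object reflectively through its no-arg constructor, a class like this only sees its settings once setConf() is invoked explicitly, which is exactly what the check above guarantees.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Hypothetical stub (not part of the Beam code above) showing why the Configurable
 * check matters: reflective instantiation only runs the no-arg constructor, so any
 * settings this class needs become available only after setConf() is called.
 */
public class ConfigurableStubInputFormat extends InputFormat<NullWritable, Text>
    implements Configurable {

  private Configuration conf;
  private String tableName; // populated from the Configuration in setConf()

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // "example.table.name" is an invented property used only for illustration.
    this.tableName = conf.get("example.table.name");
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public List<InputSplit> getSplits(JobContext context) {
    // Real split computation would depend on tableName; without the explicit
    // setConf() call in createInputFormatInstance() it would still be null here.
    return Collections.emptyList();
  }

  @Override
  public RecordReader<NullWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    throw new UnsupportedOperationException("stub for illustration only");
  }
}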
 
@Override
protected Class<? extends InputFormat> getInputFormatClass()
    throws ClassNotFoundException {
  Class<? extends InputFormat> configuredIF = super.getInputFormatClass();
  if (null == configuredIF) {
    return ExportInputFormat.class;
  } else {
    return configuredIF;
  }
}
 
Example 25 Project: mnemonic   File: MneMapreduceLongDataTest.java
@Test(enabled = true, dependsOnMethods = { "testWriteLongData" })
public void testReadLongData() throws Exception {
  long sum = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
        && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
        && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
          new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
          new MneInputFormat<MneDurableInputValue<Long>, Long>();
      RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
          inputFormat.createRecordReader(split, m_tacontext);
      MneDurableInputValue<Long> mdval = null;
      while (reader.nextKeyValue()) {
        mdval = reader.getCurrentValue();
        sum += mdval.getValue();
        ++reccnt;
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_sum, sum);
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  System.out.println(String.format("The checksum of long data is %d", sum));
}
 
public ImportJobBase(final SqoopOptions opts,
    final Class<? extends Mapper> mapperClass,
    final Class<? extends InputFormat> inputFormatClass,
    final Class<? extends OutputFormat> outputFormatClass,
    final ImportJobContext context) {
  super(opts, mapperClass, inputFormatClass, outputFormatClass, context);
}
 
Example 27 Project: big-c   File: DelegatingRecordReader.java
/**
 * Constructs the DelegatingRecordReader.
 * 
 * @param split TaggedInputSplit object
 * @param context TaskAttemptContext object
 *  
 * @throws IOException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  // Find the InputFormat and then the RecordReader from the
  // TaggedInputSplit.
  TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
      .newInstance(taggedInputSplit.getInputFormatClass(), context
          .getConfiguration());
  originalRR = inputFormat.createRecordReader(taggedInputSplit
      .getInputSplit(), context);
}
 
/**
 * Allow the user to inject custom mapper, input, and output formats
 * into the importTable() process.
 */
@Override
@SuppressWarnings("unchecked")
public void importTable(ImportJobContext context)
    throws IOException, ImportException {

  SqoopOptions options = context.getOptions();
  Configuration conf = options.getConf();

  Class<? extends Mapper> mapperClass = (Class<? extends Mapper>)
      conf.getClass(MAPPER_KEY, Mapper.class);
  Class<? extends InputFormat> ifClass = (Class<? extends InputFormat>)
      conf.getClass(INPUT_FORMAT_KEY, TextInputFormat.class);
  Class<? extends OutputFormat> ofClass = (Class<? extends OutputFormat>)
      conf.getClass(OUTPUT_FORMAT_KEY, TextOutputFormat.class);

  Class<? extends ImportJobBase> jobClass = (Class<? extends ImportJobBase>)
      conf.getClass(IMPORT_JOB_KEY, ImportJobBase.class);

  String tableName = context.getTableName();

  // Instantiate the user's chosen ImportJobBase instance.
  ImportJobBase importJob = ReflectionUtils.newInstance(jobClass, conf);

  // And configure the dependencies to inject
  importJob.setOptions(options);
  importJob.setMapperClass(mapperClass);
  importJob.setInputFormatClass(ifClass);
  importJob.setOutputFormatClass(ofClass);

  importJob.runImport(tableName, context.getJarFile(),
      getSplitColumn(options, tableName), conf);
}
 
Example 29 Project: hadoop   File: TestCombineFileInputFormat.java
@Test
public void testRecordReaderInit() throws InterruptedException, IOException {
  // Test that we properly initialize the child recordreader when
  // CombineFileInputFormat and CombineFileRecordReader are used.

  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf1 = new Configuration();
  conf1.set(DUMMY_KEY, "STATE1");
  TaskAttemptContext context1 = new TaskAttemptContextImpl(conf1, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path [] files = { new Path("file1") };
  long [] lengths = { 1 };

  CombineFileSplit split = new CombineFileSplit(files, lengths);

  RecordReader rr = inputFormat.createRecordReader(split, context1);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // Verify that the initial configuration is the one being used.
  // Right after construction the dummy key should have value "STATE1"
  assertEquals("Invalid initial dummy key value", "STATE1",
    rr.getCurrentKey().toString());

  // Switch the active context for the RecordReader...
  Configuration conf2 = new Configuration();
  conf2.set(DUMMY_KEY, "STATE2");
  TaskAttemptContext context2 = new TaskAttemptContextImpl(conf2, taskId);
  rr.initialize(split, context2);

  // And verify that the new context is updated into the child record reader.
  assertEquals("Invalid secondary dummy key value", "STATE2",
    rr.getCurrentKey().toString());
}
 
Example 30 Project: hadoop   File: TestCombineFileInputFormat.java
@Test
public void testReinit() throws Exception {
  // Test that a split containing multiple files works correctly,
  // with the child RecordReader getting its initialize() method
  // called a second time.
  TaskAttemptID taskId = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 0);
  Configuration conf = new Configuration();
  TaskAttemptContext context = new TaskAttemptContextImpl(conf, taskId);

  // This will create a CombineFileRecordReader that itself contains a
  // DummyRecordReader.
  InputFormat inputFormat = new ChildRRInputFormat();

  Path [] files = { new Path("file1"), new Path("file2") };
  long [] lengths = { 1, 1 };

  CombineFileSplit split = new CombineFileSplit(files, lengths);
  RecordReader rr = inputFormat.createRecordReader(split, context);
  assertTrue("Unexpected RR type!", rr instanceof CombineFileRecordReader);

  // first initialize() call comes from MapTask. We'll do it here.
  rr.initialize(split, context);

  // First value is first filename.
  assertTrue(rr.nextKeyValue());
  assertEquals("file1", rr.getCurrentValue().toString());

  // The inner RR will return false, because it only emits one (k, v) pair.
  // But there's another sub-split to process. This returns true to us.
  assertTrue(rr.nextKeyValue());
  
  // And the 2nd rr will have its initialize method called correctly.
  assertEquals("file2", rr.getCurrentValue().toString());
  
  // But after both child RR's have returned their singleton (k, v), this
  // should also return false.
  assertFalse(rr.nextKeyValue());
}
 