Usage examples for the org.apache.hadoop.mapred.lib.CombineFileSplit class

The following examples show how to use the org.apache.hadoop.mapred.lib.CombineFileSplit API. Each snippet is taken from an open-source project; the full source can be found in the corresponding project on GitHub.
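
Before the individual project snippets, here is a minimal, self-contained sketch of the core API; the paths, offsets, lengths and host names below are made up purely for illustration. A CombineFileSplit packs several files (or file chunks) into a single split, each chunk described by a path, a start offset and a length.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombineFileSplitSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    // Hypothetical input chunks: two files, each read from offset 0 with the given length.
    Path[] paths = { new Path("/data/part-00000"), new Path("/data/part-00001") };
    long[] starts = { 0L, 0L };
    long[] lengths = { 1024L, 2048L };
    String[] hosts = { "host1", "host2" };   // hypothetical preferred locations

    CombineFileSplit split = new CombineFileSplit(job, paths, starts, lengths, hosts);

    // getLength() is the sum of all chunk lengths; per-chunk accessors take an index.
    System.out.println("total length = " + split.getLength());
    for (int i = 0; i < split.getNumPaths(); i++) {
      System.out.println(split.getPath(i) + " offset=" + split.getOffset(i)
          + " length=" + split.getLength(i));
    }
  }
}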

Example 1  Project: gemfirexd-oss   File: FragmenterJUnitTest.java
private void verifyFragments(InputSplit[] fs, List<Fragment> fragments) throws Exception {
  log("Total fragments [expected, actual]: " + fs.length + ", " + fragments.size());
  assertEquals(fs.length, fragments.size());

  for (int i = 0; i < fs.length; i++) {
    CombineFileSplit split = (CombineFileSplit) fs[i];
    Fragment frag = fragments.get(i);

    log("Number of hosts hosting the fragment [expected, actual]: " + fs[i].getLocations().length + ",  " +  frag.getReplicas().length);
    assertEquals(fs[i].getLocations().length, frag.getReplicas().length);

    log("Fragment source name [expected, actual]: " + split.getPath(0).toString() +  ",  " + frag.getSourceName());
    assertEquals(split.getPath(0).toString(), "/" + frag.getSourceName());

    for (int j = 0; j < frag.getReplicas().length; j++) {
      log("Fragment host [expected, actual]: " + fs[i].getLocations()[j] + ",  " + frag.getReplicas()[j]);
      assertEquals(fs[i].getLocations()[j], frag.getReplicas()[j]);

      log(" User data [expected, actual]: " + null + ",  " + frag.getUserData());
      assertEquals(null, frag.getUserData());
    }
  }
}
 
Example 2  Project: gemfirexd-oss   File: RowInputFormat.java
private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
    throws IOException {
  if (hoplogs == null || hoplogs.isEmpty()) {
    return new InputSplit[0];
  }
  
  HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
  List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
  InputSplit[] splits = new InputSplit[mr2Splits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit;
    mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
    
    CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(),
        mr2Spit.getStartOffsets(), mr2Spit.getLengths(),
        mr2Spit.getLocations());
    splits[i] = getSplit(split);
    i++;
  }

  return splits;
}
 
Example 3  Project: gemfirexd-oss   File: GemFireXDFragmenter.java
@Override
  public List<Fragment> getFragments() throws IOException {
    InputSplit[] splits;
//    try {
      splits = getSplits();
//    } finally {
//      this.gfxdManager.resetLonerSystemInUse();
//    }

    for (InputSplit split : splits) {
      CombineFileSplit cSplit = (CombineFileSplit)split;
      
      if (cSplit.getLength() > 0L) {
        String filepath = cSplit.getPath(0).toUri().getPath();
        filepath = filepath.substring(1);
        if (this.gfxdManager.getLogger().isDebugEnabled()) {
          this.gfxdManager.getLogger().debug("fragment-filepath " + filepath);
        }
        byte[] data = this.gfxdManager.populateUserData(cSplit);
        this.fragments.add(new Fragment(filepath, cSplit.getLocations(), data));
      }
    }
    return this.fragments;
  }
 
Example 4  Project: gemfirexd-oss   File: GFInputFormat.java
/**
 * Creates an input split for every block occupied by hoplogs of the input
 * regions
 * 
 * @param job 
 * @param hoplogs
 * @return array of input splits of type file input split
 * @throws IOException
 */
private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs)
    throws IOException {
  if (hoplogs == null || hoplogs.isEmpty()) {
    return new InputSplit[0];
  }

  HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs);
  List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf);
  InputSplit[] splits = new InputSplit[mr2Splits.size()];
  int i = 0;
  for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) {
    org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit;
    mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit;
    
    CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(),
        mr2Spit.getStartOffsets(), mr2Spit.getLengths(),
        mr2Spit.getLocations());
    splits[i] = split;
    i++;
  }

  return splits;
}
 
Example 5
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
    InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
  // CombineFileSplit indicates the new export format which includes a manifest file
  if (inputSplit instanceof CombineFileSplit) {
    int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
    if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
      throw new IOException("Unknown version: " + job.get(DynamoDBConstants
          .EXPORT_FORMAT_VERSION));
    }
    return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
  } else if (inputSplit instanceof FileSplit) {
    // FileSplit indicates the old data pipeline format which doesn't include a manifest file
    Path path = ((FileSplit) inputSplit).getPath();
    return new ImportRecordReader(job, path);
  } else {
    throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
        + " " + inputSplit.getClass());
  }
}
 
Example 6  Project: hudi   File: HoodieCombineHiveInputFormat.java
public CombineHiveInputSplit(JobConf job, CombineFileSplit inputSplitShim,
    Map<Path, PartitionDesc> pathToPartitionInfo) throws IOException {
  this.inputSplitShim = inputSplitShim;
  this.pathToPartitionInfo = pathToPartitionInfo;
  if (job != null) {
    if (this.pathToPartitionInfo == null) {
      this.pathToPartitionInfo = Utilities.getMapWork(job).getPathToPartitionInfo();
    }

    // extract all the inputFormatClass names for each chunk in the
    // CombinedSplit.
    Path[] ipaths = inputSplitShim.getPaths();
    if (ipaths.length > 0) {
      PartitionDesc part = getPartitionFromPath(this.pathToPartitionInfo, ipaths[0],
          IOPrepareCache.get().getPartitionDescMap());
      inputFormatClassName = part.getInputFileFormatClass().getName();
    }
  }
}
 
Example 7  Project: hudi   File: HoodieCombineHiveInputFormat.java
@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
    Class<RecordReader<K, V>> rrClass) throws IOException {
  isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
  if (isRealTime) {
    List<RecordReader> recordReaders = new LinkedList<>();
    ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
        + HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
    for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
      if (split.getPaths().length == 0) {
        continue;
      }
      FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(split.getPath(0).toString(), true, job);
      recordReaders.add(inputFormat.getRecordReader(inputSplit, job, reporter));
    }
    return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
  }
  return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass);
}
 
Example 8  Project: gemfirexd-oss   File: GemFireXDManager.java
/**
 * Make sure we do not generate a lot of data here as this will be duplicated
 * per split and sent to HAWQ master and later to datanodes.
 * 
 * The sequence in which data is written to out must match the sequence it is
 * read in {@link #readUserData()}
 * 
 * <p>
 * Only called from Fragmenter.
 * 
 * @param cSplit
 * @return
 */
public byte[] populateUserData(CombineFileSplit cSplit) throws IOException {
  // Construct user data
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  DataOutput out = new DataOutputStream(baos);

  // TODO Uncomment below statement (and its corresponding code in
  // readUserData()) when loner system is started from fragmenter as well as
  // from accessor.
  // 1. restart loner
  // out.write(RESTART_LONER_SYSTEM_CODE);
  // out.writeBoolean(this.restartLoner);

  // 2. home dir
  out.write(HOME_DIR_CODE);
  out.writeUTF(this.homeDir);

  // 3. schema.table
  out.write(SCHEMA_TABLE_NAME_CODE);
  out.writeUTF(this.schemaTableName);

  out.write(SPLIT_CODE);
  cSplit.write(out);

  // Serialize it and return
  return baos.toByteArray();
}
 
Example 9  Project: gemfirexd-oss   File: GemFireXDManager.java
/**
 * This is only called from Accessor. The sequence in which switch cases
 * appear must match to the sequence followed in writing data to out in
 * {@link #populateUserData(FileSplit)}
 * 
 * @param data
 * @throws IOException
 */
public void readUserData() throws IOException {
  byte[] data = this.inputData.getFragmentMetadata();
  if (data != null && data.length > 0) {
    boolean done = false;
    ByteArrayDataInput in = new ByteArrayDataInput();
    in.initialize(data, null);
    while (!done) {
      try {
        switch (in.readByte()) {
        case HOME_DIR_CODE:
          this.homeDir = in.readUTF();
          this.logger.debug("Accessor received home dir: " + this.homeDir);
          break;
        case SCHEMA_TABLE_NAME_CODE:
          this.schemaTableName = in.readUTF();
          this.logger.debug("Accessor received schemaTable name: "
              + this.schemaTableName);
          break;
        case SPLIT_CODE:
          this.split = new CombineFileSplit();
          this.split.readFields(in);
          this.logger.debug("Accessor split read, total length: " + this.split.getLength());
          done = true;
          break;
        default:
          this.logger.error("Internal error: Invalid data from fragmenter.");
          done = true;
          break;
        }
      } catch (EOFException eofe) {
        this.logger.error("Internal error: Invalid data from fragmenter.");
        break; // from while().
      }
    }
  }
}
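
populateUserData() and readUserData() above rely on CombineFileSplit being a Writable: the fragmenter serializes the split into the fragment's user data and the accessor rebuilds it with readFields(). A minimal sketch of that round trip, with a made-up path and host, using only standard java.io streams:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class SplitRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical single-chunk split.
    CombineFileSplit original = new CombineFileSplit(new JobConf(),
        new Path[] { new Path("/data/hoplog-0001") },
        new long[] { 0L }, new long[] { 4096L },
        new String[] { "host1" });

    // Fragmenter side: write the split into a byte[] (compare populateUserData above).
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));
    byte[] userData = baos.toByteArray();

    // Accessor side: rebuild the split from the bytes (compare readUserData above).
    CombineFileSplit copy = new CombineFileSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(userData)));

    System.out.println("restored total length = " + copy.getLength());   // 4096
  }
}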
 
Example 10  Project: gemfirexd-oss   File: AbstractGFRecordReader.java
/**
 * Initializes instance of record reader using file split and job
 * configuration
 * 
 * @param split
 * @param conf
 * @throws IOException
 */
public void initialize(CombineFileSplit split, JobConf conf) throws IOException {
  CombineFileSplit cSplit = (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();

  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
}
 
Example 11  Project: gemfirexd-oss   File: GFInputFormat.java
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
    InputSplit split, JobConf job, Reporter reporter) throws IOException {

  CombineFileSplit cSplit = (CombineFileSplit) split;
  AbstractGFRecordReader reader = new AbstractGFRecordReader();
  reader.initialize(cSplit, job);
  return reader;
}
 
Example 12
public ImportCombineFileRecordReader(CombineFileSplit combineFileSplit, JobConf job, Reporter
    reporter) throws IOException {
  this.combineFileSplit = combineFileSplit;
  this.job = job;
  this.reporter = reporter;

  processedPathCount = 0;
  currentRecordReader = getRecordReader(combineFileSplit.getPath(processedPathCount));
}
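
This constructor only opens a reader for the first path of the combined split; the remaining paths are consumed as records are read. Below is a simplified, hypothetical sketch of that pattern (it is not the real ImportCombineFileRecordReader and it uses a plain LineRecordReader per chunk) showing how a counter like processedPathCount advances through all paths of a CombineFileSplit:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

class CombineSplitLineReaderSketch {
  private final CombineFileSplit split;
  private final JobConf job;
  private int processedPathCount = 0;
  private RecordReader<LongWritable, Text> current;

  CombineSplitLineReaderSketch(CombineFileSplit split, JobConf job) throws IOException {
    this.split = split;
    this.job = job;
    this.current = openReader(processedPathCount);
  }

  // Open a plain line reader over the i-th chunk of the combined split.
  private RecordReader<LongWritable, Text> openReader(int i) throws IOException {
    FileSplit fileSplit = new FileSplit(split.getPath(i), split.getOffset(i),
        split.getLength(i), split.getLocations());
    return new LineRecordReader(job, fileSplit);
  }

  boolean next(LongWritable key, Text value) throws IOException {
    while (!current.next(key, value)) {
      current.close();
      if (++processedPathCount >= split.getNumPaths()) {
        return false;   // all chunks consumed
      }
      current = openReader(processedPathCount);
    }
    return true;
  }
}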
 
Example 13  Project: hudi   File: HoodieCombineHiveInputFormat.java
/**
 * Create a generic Hive RecordReader than can iterate over all chunks in a CombinedFileSplit.
 */
@Override
public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
  if (!(split instanceof CombineHiveInputSplit)) {
    return super.getRecordReader(split, job, reporter);
  }

  CombineHiveInputSplit hsplit = (CombineHiveInputSplit) split;

  String inputFormatClassName = null;
  Class<?> inputFormatClass;
  try {
    inputFormatClassName = hsplit.inputFormatClassName();
    inputFormatClass = job.getClassByName(inputFormatClassName);
  } catch (Exception e) {
    throw new IOException("cannot find class " + inputFormatClassName);
  }

  pushProjectionsAndFilters(job, inputFormatClass, hsplit.getPath(0));

  if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
    return ShimLoader.getHadoopShims().getCombineFileInputFormat().getRecordReader(job, (CombineFileSplit) split,
        reporter, CombineHiveRecordReader.class);
  } else if (inputFormatClass.getName().equals(HoodieParquetRealtimeInputFormat.class.getName())) {
    HoodieCombineFileInputFormatShim shims = new HoodieCombineFileInputFormatShim();
    IOContextMap.get(job).setInputPath(((CombineHiveInputSplit) split).getPath(0));
    return shims.getRecordReader(job, ((CombineHiveInputSplit) split).getInputSplitShim(),
        reporter, CombineHiveRecordReader.class);
  } else {
    throw new HoodieException("Unexpected input format : " + inputFormatClassName);
  }
}
 
Example 14  Project: hudi   File: HoodieCombineRealtimeRecordReader.java
public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit split,
    List<RecordReader> readers) {
  try {
    ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits().size() == readers
        .size(), "Num Splits does not match number of unique RecordReaders!");
    for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
      LOG.info("Creating new RealtimeRecordReader for split");
      recordReaders.add(
          new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, jobConf, readers.remove(0)));
    }
    currentRecordReader = recordReaders.remove(0);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
 
Example 15
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(conf);

  Reporter reporter = Reporter.NULL;

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = "+seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);

  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  // create a file with various lengths
  createFiles(length, numFiles, random);

  // create a combine split for the files
  InputFormat<IntWritable, BytesWritable> format =
    new CombineSequenceFileInputFormat<IntWritable, BytesWritable>();
  IntWritable key = new IntWritable();
  BytesWritable value = new BytesWritable();
  for (int i = 0; i < 3; i++) {
    int numSplits =
      random.nextInt(length/(SequenceFile.SYNC_INTERVAL/20))+1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got =        " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one splits!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be CombineFileSplit",
      CombineFileSplit.class, split.getClass());

    // check each split
    BitSet bits = new BitSet(length);
    RecordReader<IntWritable, BytesWritable> reader =
      format.getRecordReader(split, job, reporter);
    try {
      while (reader.next(key, value)) {
        assertFalse("Key in multiple partitions.", bits.get(key.get()));
        bits.set(key.get());
      }
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
 
Example 16  Project: hadoop   File: TestCombineTextInputFormat.java
@Test(timeout=10000)
public void testFormat() throws Exception {
  JobConf job = new JobConf(defaultConf);

  Random random = new Random();
  long seed = random.nextLong();
  LOG.info("seed = "+seed);
  random.setSeed(seed);

  localFs.delete(workDir, true);
  FileInputFormat.setInputPaths(job, workDir);

  final int length = 10000;
  final int numFiles = 10;

  createFiles(length, numFiles, random);

  // create a combined split for the files
  CombineTextInputFormat format = new CombineTextInputFormat();
  LongWritable key = new LongWritable();
  Text value = new Text();
  for (int i = 0; i < 3; i++) {
    int numSplits = random.nextInt(length/20)+1;
    LOG.info("splitting: requesting = " + numSplits);
    InputSplit[] splits = format.getSplits(job, numSplits);
    LOG.info("splitting: got =        " + splits.length);

    // we should have a single split as the length is comfortably smaller than
    // the block size
    assertEquals("We got more than one splits!", 1, splits.length);
    InputSplit split = splits[0];
    assertEquals("It should be CombineFileSplit",
      CombineFileSplit.class, split.getClass());

    // check the split
    BitSet bits = new BitSet(length);
    LOG.debug("split= " + split);
    RecordReader<LongWritable, Text> reader =
      format.getRecordReader(split, job, voidReporter);
    try {
      int count = 0;
      while (reader.next(key, value)) {
        int v = Integer.parseInt(value.toString());
        LOG.debug("read " + v);
        if (bits.get(v)) {
          LOG.warn("conflict with " + v +
                   " at position "+reader.getPos());
        }
        assertFalse("Key in multiple partitions.", bits.get(v));
        bits.set(v);
        count++;
      }
      LOG.info("splits="+split+" count=" + count);
    } finally {
      reader.close();
    }
    assertEquals("Some keys in no partition.", length, bits.cardinality());
  }
}
 
Example 17  Project: gemfirexd-oss   File: EventInputFormatTest.java
public void testEventInputFormat() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");

  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));

  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();

  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  
  assertEquals(20, count);
  
  TestUtil.shutDown();
}
 
Example 18  Project: gemfirexd-oss   File: EventInputFormatTest.java
public void testNoSecureHdfsCheck() throws Exception {
  getConnection();
  Connection conn = startNetserverAndGetLocalNetConnection();
  
  
  Statement st = conn.createStatement();
  st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "'  batchtimeinterval 5000 milliseconds");
  st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");
  
  PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
  int NUM_ENTRIES = 20;
  for(int i = 0; i < NUM_ENTRIES; i++) {
    ps.setInt(1, i);
    ps.setString(2, "Value-" + System.nanoTime());
    ps.execute();
  }
  //Wait for data to get to HDFS...
  String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
  st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
  
  stopNetServer();
  FabricServiceManager.currentFabricServiceInstance().stop(new Properties());
  
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
  assertEquals(1, list.length);
  
  conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
  conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
  conf.set("hadoop.security.authentication", "kerberos");
  
  JobConf job = new JobConf(conf);
  job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
  RowInputFormat ipformat = new RowInputFormat();
  InputSplit[] splits = ipformat.getSplits(job, 2);
  assertEquals(1, splits.length);
  CombineFileSplit split = (CombineFileSplit) splits[0];
  assertEquals(1, split.getPaths().length);
  assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
  assertEquals(0, split.getOffset(0));
  assertEquals(list[0].getLen(), split.getLength(0));
  
  RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
  Key key = rr.createKey();
  Row value = rr.createValue();
  
  int count = 0;
  while (rr.next(key, value)) {
    assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
  }
  
  assertEquals(20, count);
  
  TestUtil.shutDown();
}
 
Example 19  Project: gemfirexd-oss   File: TestDataHelper.java
private static InputSplit getCombineSplit(Path path, long l, long m, String[] strings) {
  Path[] paths = {path};
  long[] offsets = {l};
  long[] lengths = {m};
  return new CombineFileSplit(null, paths, offsets, lengths, strings);
}
 
Example 20  Project: gemfirexd-oss   File: TestGemFireXDManager.java
@Override
public byte[] populateUserData(CombineFileSplit fileSplit) throws IOException {
  return null;
}
 
Example 21  Project: gemfirexd-oss   File: GFXDHiveInputFormat.java
@Override
protected InputSplit getSplit(CombineFileSplit split) {
  return new GFXDHiveSplit(split);
}
 
Example 22  Project: gemfirexd-oss   File: GFXDHiveSplit.java
public GFXDHiveSplit(CombineFileSplit fileSplit) {
  this.combineFileSplit = fileSplit;
}
 