Usage examples for org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

The following examples show how the org.apache.hadoop.mapreduce.lib.input.CombineFileSplit API is used in real projects; you can also follow each project link to view the full source code on GitHub.
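Before the project examples, here is a minimal self-contained sketch of how a CombineFileSplit is constructed and read back. The file paths, offsets, lengths, and host names are made up purely for illustration; the constructor and accessors are the same ones the examples below rely on.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineFileSplitSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical paths, offsets, lengths, and hosts, chosen only for illustration.
    Path[] files = {new Path("/data/a.txt"), new Path("/data/b.txt")};
    long[] starts = {0L, 0L};
    long[] lengths = {128L, 256L};
    String[] locations = {"host1", "host2"};

    CombineFileSplit split = new CombineFileSplit(files, starts, lengths, locations);

    // Per-file accessors: one (path, offset, length) triple per combined file.
    for (int i = 0; i < split.getNumPaths(); i++) {
      System.out.printf("%s offset=%d length=%d%n",
          split.getPath(i), split.getOffset(i), split.getLength(i));
    }

    // Aggregate accessors: total length and the bulk arrays.
    System.out.println("total=" + split.getLength());
    System.out.println("files=" + split.getPaths().length
        + " offsets=" + split.getStartOffsets().length
        + " lengths=" + split.getLengths().length);
  }
}

The per-index accessors (getPath, getOffset, getLength) are what per-file record readers use, while the bulk accessors (getPaths, getStartOffsets, getLengths, getLocations) are convenient when forwarding the whole split, as in the FileQueue and HDFSSplitIterator examples below.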

Example 1   Project: hadoop   File: MultiFileWordCount.java
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;
  
  //open the file
  fileIn = fs.open(path);
  if (startOffset != 0) {
    skipFirstLine = true;
    --startOffset;
    fileIn.seek(startOffset);
  }
  reader = new LineReader(fileIn);
  if (skipFirstLine) {  // skip first line and re-establish "startOffset".
    startOffset += reader.readLine(new Text(), 0,
                (int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
  }
  this.pos = startOffset;
}
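In MultiFileWordCount this constructor is not called directly: Hadoop's CombineFileRecordReader instantiates one per-file reader for each path in the CombineFileSplit, using exactly this (CombineFileSplit, TaskAttemptContext, Integer) constructor shape. Below is a small self-contained sketch of that wiring; PathPerFileInputFormat and PathPerFileReader are illustrative names, not classes from the example.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class PathPerFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader constructs one PathPerFileReader per file in the split,
    // calling its (CombineFileSplit, TaskAttemptContext, Integer) constructor.
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, PathPerFileReader.class);
  }

  // Minimal per-file reader: emits a single (offset, path) record for its assigned file.
  public static class PathPerFileReader extends RecordReader<LongWritable, Text> {
    private final LongWritable key = new LongWritable();
    private final Text value = new Text();
    private boolean done;

    public PathPerFileReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
      key.set(split.getOffset(index));
      value.set(split.getPath(index).toString());
    }

    @Override public void initialize(InputSplit split, TaskAttemptContext context) { }
    @Override public boolean nextKeyValue() { if (done) return false; done = true; return true; }
    @Override public LongWritable getCurrentKey() { return key; }
    @Override public Text getCurrentValue() { return value; }
    @Override public float getProgress() { return done ? 1.0f : 0.0f; }
    @Override public void close() { }
  }
}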
 
Example 2   Project: hadoop   File: GridmixSplit.java
public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
    long inputBytes, long inputRecords, long outputBytes,
    long outputRecords, double[] reduceBytes, double[] reduceRecords,
    long[] reduceOutputBytes, long[] reduceOutputRecords)
    throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
}
 
Example 3   Project: hadoop   File: LoadSplit.java
public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes, 
                 long inputRecords, long outputBytes, long outputRecords, 
                 double[] reduceBytes, double[] reduceRecords, 
                 long[] reduceOutputBytes, long[] reduceOutputRecords,
                 ResourceUsageMetrics metrics,
                 ResourceUsageMetrics[] rMetrics)
throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
  this.mapMetrics = metrics;
  this.reduceMetrics = rMetrics;
}
 
Example 4   Project: hadoop   File: TestFileQueue.java
@Test
public void testRepeat() throws Exception {
  final Configuration conf = new Configuration();
  Arrays.fill(loc, "");
  Arrays.fill(start, 0L);
  Arrays.fill(len, BLOCK);

  final ByteArrayOutputStream out = fillVerif();
  final FileQueue q =
    new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
  final byte[] verif = out.toByteArray();
  final byte[] check = new byte[2 * NFILES * BLOCK];
  q.read(check, 0, NFILES * BLOCK);
  assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));

  final byte[] verif2 = new byte[2 * NFILES * BLOCK];
  System.arraycopy(verif, 0, verif2, 0, verif.length);
  System.arraycopy(verif, 0, verif2, verif.length, verif.length);
  q.read(check, 0, 2 * NFILES * BLOCK);
  assertArrayEquals(verif2, check);

}
 
Example 5   Project: hadoop   File: TestGridMixClasses.java
private LoadSplit getLoadSplit() throws Exception {

    Path[] files = {new Path("one"), new Path("two")};
    long[] start = {1, 2};
    long[] lengths = {100, 200};
    String[] locations = {"locOne", "loctwo"};

    CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
            locations);
    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
    metrics.setCumulativeCpuUsage(200);
    ResourceUsageMetrics[] rMetrics = {metrics};

    double[] reduceBytes = {8.1d, 8.2d};
    double[] reduceRecords = {9.1d, 9.2d};
    long[] reduceOutputBytes = {101L, 102L};
    long[] reduceOutputRecords = {111L, 112L};

    return new LoadSplit(cfSplit, 2, 1, 4L, 5L, 6L, 7L,
            reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
            metrics, rMetrics);
  }
 
Example 6   Project: gemfirexd-oss   File: RowRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException {
  Configuration conf = context.getConfiguration();
  CombineFileSplit cSplit =  (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();
  
  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  
  long startTS = conf.getLong(RowInputFormat.START_TIME_MILLIS, 0l);
  long endTS = conf.getLong(RowInputFormat.END_TIME_MILLIS, 0l);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, startTS, endTS);

  instantiateGfxdLoner(conf);
}
 
Example 7   Project: gemfirexd-oss   File: HDFSSplitIterator.java
public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets, long[] lengths, long startTime, long endTime) throws IOException {
  this.fs = fs;
  this.split = new CombineFileSplit(paths, offsets, lengths, null);
  while(currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))){
    logger.warning(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex));
    currentHopIndex++;
  }
  if(currentHopIndex == split.getNumPaths()){
    this.hoplog = null;
    iterator = null;
  } else {
    this.hoplog = getHoplog(fs,split.getPath(currentHopIndex));
    iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
  }
  this.startTime = startTime;
  this.endTime = endTime;
}
 
Example 8   Project: kite   File: AbstractCombineFileRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  if (delegate != null) {
    delegate.close();
  }
  if (split instanceof CombineFileSplit) {
    CombineFileSplit combineSplit = (CombineFileSplit) split;
    FileSplit fileSplit = new FileSplit(combineSplit.getPath(idx), combineSplit.getOffset(idx),
        combineSplit.getLength(idx), combineSplit.getLocations());
    delegate = getInputFormat().createRecordReader(fileSplit, context);
    delegate.initialize(fileSplit, context);
  } else {
    throw new DatasetOperationException(
        "Split is not a CombineFileSplit: %s:%s",
        split.getClass().getCanonicalName(), split);
  }
}
 
Example 9   Project: big-c   File: MultiFileWordCount.java
public CombineFileLineRecordReader(CombineFileSplit split,
    TaskAttemptContext context, Integer index) throws IOException {
  
  this.path = split.getPath(index);
  fs = this.path.getFileSystem(context.getConfiguration());
  this.startOffset = split.getOffset(index);
  this.end = startOffset + split.getLength(index);
  boolean skipFirstLine = false;
  
  //open the file
  fileIn = fs.open(path);
  if (startOffset != 0) {
    skipFirstLine = true;
    --startOffset;
    fileIn.seek(startOffset);
  }
  reader = new LineReader(fileIn);
  if (skipFirstLine) {  // skip first line and re-establish "startOffset".
    startOffset += reader.readLine(new Text(), 0,
                (int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
  }
  this.pos = startOffset;
}
 
Example 10   Project: big-c   File: GridmixSplit.java
public GridmixSplit(CombineFileSplit cfsplit, int maps, int id,
    long inputBytes, long inputRecords, long outputBytes,
    long outputRecords, double[] reduceBytes, double[] reduceRecords,
    long[] reduceOutputBytes, long[] reduceOutputRecords)
    throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
}
 
Example 11   Project: big-c   File: LoadSplit.java
public LoadSplit(CombineFileSplit cfsplit, int maps, int id, long inputBytes, 
                 long inputRecords, long outputBytes, long outputRecords, 
                 double[] reduceBytes, double[] reduceRecords, 
                 long[] reduceOutputBytes, long[] reduceOutputRecords,
                 ResourceUsageMetrics metrics,
                 ResourceUsageMetrics[] rMetrics)
throws IOException {
  super(cfsplit);
  this.id = id;
  this.maps = maps;
  reduces = reduceBytes.length;
  this.inputRecords = inputRecords;
  this.outputBytes = outputBytes;
  this.outputRecords = outputRecords;
  this.reduceBytes = reduceBytes;
  this.reduceRecords = reduceRecords;
  nSpec = reduceOutputBytes.length;
  this.reduceOutputBytes = reduceOutputBytes;
  this.reduceOutputRecords = reduceOutputRecords;
  this.mapMetrics = metrics;
  this.reduceMetrics = rMetrics;
}
 
Example 12   Project: big-c   File: TestFileQueue.java
@Test
public void testRepeat() throws Exception {
  final Configuration conf = new Configuration();
  Arrays.fill(loc, "");
  Arrays.fill(start, 0L);
  Arrays.fill(len, BLOCK);

  final ByteArrayOutputStream out = fillVerif();
  final FileQueue q =
    new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
  final byte[] verif = out.toByteArray();
  final byte[] check = new byte[2 * NFILES * BLOCK];
  q.read(check, 0, NFILES * BLOCK);
  assertArrayEquals(verif, Arrays.copyOf(check, NFILES * BLOCK));

  final byte[] verif2 = new byte[2 * NFILES * BLOCK];
  System.arraycopy(verif, 0, verif2, 0, verif.length);
  System.arraycopy(verif, 0, verif2, verif.length, verif.length);
  q.read(check, 0, 2 * NFILES * BLOCK);
  assertArrayEquals(verif2, check);

}
 
Example 13   Project: big-c   File: TestFileQueue.java
@Test
public void testUneven() throws Exception {
  final Configuration conf = new Configuration();
  Arrays.fill(loc, "");
  Arrays.fill(start, 0L);
  Arrays.fill(len, BLOCK);

  final int B2 = BLOCK / 2;
  for (int i = 0; i < NFILES; i += 2) {
    start[i] += B2;
    len[i] -= B2;
  }
  final FileQueue q =
    new FileQueue(new CombineFileSplit(paths, start, len, loc), conf);
  final ByteArrayOutputStream out = fillVerif();
  final byte[] verif = out.toByteArray();
  final byte[] check = new byte[NFILES / 2 * BLOCK + NFILES / 2 * B2];
  q.read(check, 0, verif.length);
  assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
  q.read(check, 0, verif.length);
  assertArrayEquals(verif, Arrays.copyOf(check, verif.length));
}
 
Example 14   Project: big-c   File: TestGridMixClasses.java
private LoadSplit getLoadSplit() throws Exception {

    Path[] files = {new Path("one"), new Path("two")};
    long[] start = {1, 2};
    long[] lengths = {100, 200};
    String[] locations = {"locOne", "loctwo"};

    CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
            locations);
    ResourceUsageMetrics metrics = new ResourceUsageMetrics();
    metrics.setCumulativeCpuUsage(200);
    ResourceUsageMetrics[] rMetrics = {metrics};

    double[] reduceBytes = {8.1d, 8.2d};
    double[] reduceRecords = {9.1d, 9.2d};
    long[] reduceOutputBytes = {101L, 102L};
    long[] reduceOutputRecords = {111L, 112L};

    return new LoadSplit(cfSplit, 2, 1, 4L, 5L, 6L, 7L,
            reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords,
            metrics, rMetrics);
  }
 
Example 15   Project: Halyard   File: HalyardBulkLoad.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = super.getSplits(job);
    long maxSize = MAX_SINGLE_FILE_MULTIPLIER * job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
    if (maxSize > 0) {
        List<InputSplit> newSplits = new ArrayList<>();
        for (InputSplit spl : splits) {
            CombineFileSplit cfs = (CombineFileSplit)spl;
            for (int i=0; i<cfs.getNumPaths(); i++) {
                long length = cfs.getLength();
                if (length > maxSize) {
                    int replicas = (int)Math.ceil((double)length / (double)maxSize);
                    Path path = cfs.getPath(i);
                    for (int r=1; r<replicas; r++) {
                        newSplits.add(new CombineFileSplit(new Path[]{path}, new long[]{r}, new long[]{length}, cfs.getLocations()));
                    }
                }
            }
        }
        splits.addAll(newSplits);
    }
    return splits;
}
 
Example 16   Project: gemfirexd-oss   File: RowRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException {
  Configuration conf = context.getConfiguration();
  CombineFileSplit cSplit =  (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();
  
  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  
  long startTS = conf.getLong(RowInputFormat.START_TIME_MILLIS, 0l);
  long endTS = conf.getLong(RowInputFormat.END_TIME_MILLIS, 0l);
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, startTS, endTS);

  instantiateGfxdLoner(conf);
}
 
Example 17   Project: gemfirexd-oss   File: HDFSSplitIterator.java
public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets, long[] lengths, long startTime, long endTime) throws IOException {
  this.fs = fs;
  this.split = new CombineFileSplit(paths, offsets, lengths, null);
  while(currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))){
    logger.warning(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex));
    currentHopIndex++;
  }
  if(currentHopIndex == split.getNumPaths()){
    this.hoplog = null;
    iterator = null;
  } else {
    this.hoplog = getHoplog(fs,split.getPath(currentHopIndex));
    iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
  }
  this.startTime = startTime;
  this.endTime = endTime;
}
 
Example 18   Project: Cubert   File: CombinedFileRecordReader.java
public CombinedFileRecordReader(InputFormat<K, V> inputFormat,
                                CombineFileSplit combineFileSplit,
                                TaskAttemptContext context)
{
    this.inputFormat = inputFormat;
    this.combineFileSplit = combineFileSplit;
    this.context = context;

    long[] lengths = combineFileSplit.getLengths();
    long totalLength = 0;
    for (long length : lengths)
        totalLength += length;
    fractionLength = new float[lengths.length];
    for (int i = 0; i < lengths.length; i++)
        fractionLength[i] = ((float) lengths[i]) / totalLength;
}
 
Example 19   Project: incubator-gobblin   File: AvroKeyMapper.java
@Override
protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
    throws IOException, InterruptedException {
  if (context.getNumReduceTasks() == 0) {
    context.write(key, NullWritable.get());
  } else {
    populateComparableKeyRecord(key.datum(), this.outKey.datum());
    this.outValue.datum(key.datum());
    try {
      context.write(this.outKey, this.outValue);
    } catch (AvroRuntimeException e) {
      final Path[] paths = ((CombineFileSplit) context.getInputSplit()).getPaths();
      throw new IOException("Unable to process paths " + StringUtils.join(paths, ','), e);
    }
  }
  context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
}
 
Example 20

/**
 * Set the number of locations in the split to SPLIT_MAX_NUM_LOCATIONS if it is larger than
 * SPLIT_MAX_NUM_LOCATIONS (MAPREDUCE-5186).
 */
private static List<InputSplit> cleanSplits(List<InputSplit> splits) throws IOException {
  if (VersionInfo.getVersion().compareTo("2.3.0") >= 0) {
    // This issue was fixed in 2.3.0, if newer version, no need to clean up splits
    return splits;
  }

  List<InputSplit> cleanedSplits = Lists.newArrayList();

  for (int i = 0; i < splits.size(); i++) {
    CombineFileSplit oldSplit = (CombineFileSplit) splits.get(i);
    String[] locations = oldSplit.getLocations();

    Preconditions.checkNotNull(locations, "CombineFileSplit.getLocations() returned null");

    if (locations.length > SPLIT_MAX_NUM_LOCATIONS) {
      locations = Arrays.copyOf(locations, SPLIT_MAX_NUM_LOCATIONS);
    }

    cleanedSplits.add(new CombineFileSplit(oldSplit.getPaths(), oldSplit.getStartOffsets(), oldSplit.getLengths(),
        locations));
  }
  return cleanedSplits;
}
 
Example 21   Project: hraven   File: CombineFileInputFormat.java
/**
 * Create a single split from the list of blocks specified in validBlocks
 * Add this new split into splitList.
 */
private void addCreatedSplit(List<InputSplit> splitList, 
                             List<String> locations, 
                             ArrayList<OneBlockInfo> validBlocks) {
  // create an input split
  Path[] fl = new Path[validBlocks.size()];
  long[] offset = new long[validBlocks.size()];
  long[] length = new long[validBlocks.size()];
  for (int i = 0; i < validBlocks.size(); i++) {
    fl[i] = validBlocks.get(i).onepath; 
    offset[i] = validBlocks.get(i).offset;
    length[i] = validBlocks.get(i).length;
  }

   // add this split to the list that is returned
  CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                 length, locations.toArray(new String[0]));
  splitList.add(thissplit); 
}
 
Example 22   Project: kite   File: AbstractKiteCombineFileInputFormat.java
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
  List<InputSplit> kiteCombineSplits = Lists.newArrayList();
  for (InputSplit inputSplit : super.getSplits(job)) {
    kiteCombineSplits.add(new KiteCombineFileSplit((CombineFileSplit) inputSplit));
  }
  return kiteCombineSplits;
}
 
Example 23

public MDSCombineSpreadReader( final CombineFileSplit split , final TaskAttemptContext context , final Integer index ) throws IOException{
  Configuration config = context.getConfiguration();
  Path path = split.getPath( index );
  FileSystem fs = path.getFileSystem( config );
  long fileLength = fs.getLength( path );
  InputStream in = fs.open( path );

  innerReader = new MDSSpreadReader();
  innerReader.setStream( in , fileLength , 0 , fileLength );
}
 
Example 24   Project: hadoop   File: FileQueue.java
/**
 * @param split Description of input sources.
 * @param conf Used to resolve FileSystem instances.
 */
public FileQueue(CombineFileSplit split, Configuration conf)
    throws IOException {
  this.conf = conf;
  paths = split.getPaths();
  startoffset = split.getStartOffsets();
  lengths = split.getLengths();
  nextSource();
}
 
Example 25   Project: hadoop   File: TestFileQueue.java
@Test
public void testEmpty() throws Exception {
  final Configuration conf = new Configuration();
  // verify OK if unused
  final FileQueue q = new FileQueue(new CombineFileSplit(
        new Path[0], new long[0], new long[0], new String[0]), conf);
}
 
Example 26   Project: hadoop   File: TestGridMixClasses.java
@Test (timeout=1000)
public void testGridmixSplit() throws Exception {
  Path[] files = {new Path("one"), new Path("two")};
  long[] start = {1, 2};
  long[] lengths = {100, 200};
  String[] locations = {"locOne", "loctwo"};

  CombineFileSplit cfSplit = new CombineFileSplit(files, start, lengths,
          locations);
  ResourceUsageMetrics metrics = new ResourceUsageMetrics();
  metrics.setCumulativeCpuUsage(200);

  double[] reduceBytes = {8.1d, 8.2d};
  double[] reduceRecords = {9.1d, 9.2d};
  long[] reduceOutputBytes = {101L, 102L};
  long[] reduceOutputRecords = {111L, 112L};

  GridmixSplit test = new GridmixSplit(cfSplit, 2, 3, 4L, 5L, 6L, 7L,
          reduceBytes, reduceRecords, reduceOutputBytes, reduceOutputRecords);

  ByteArrayOutputStream data = new ByteArrayOutputStream();
  DataOutputStream out = new DataOutputStream(data);
  test.write(out);
  GridmixSplit copy = new GridmixSplit();
  copy.readFields(new DataInputStream(new ByteArrayInputStream(data
          .toByteArray())));

  // data should be the same
  assertEquals(test.getId(), copy.getId());
  assertEquals(test.getMapCount(), copy.getMapCount());
  assertEquals(test.getInputRecords(), copy.getInputRecords());

  assertEquals(test.getOutputBytes()[0], copy.getOutputBytes()[0]);
  assertEquals(test.getOutputRecords()[0], copy.getOutputRecords()[0]);
  assertEquals(test.getReduceBytes(0), copy.getReduceBytes(0));
  assertEquals(test.getReduceRecords(0), copy.getReduceRecords(0));

}
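CombineFileSplit itself is a Writable, so the write()/readFields() round trip exercised above for GridmixSplit applies to it directly. A minimal sketch, reusing the toy values from the test above:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineFileSplitRoundTrip {
  public static void main(String[] args) throws Exception {
    // Same toy paths, offsets, lengths, and locations as the test above.
    CombineFileSplit original = new CombineFileSplit(
        new Path[]{new Path("one"), new Path("two")},
        new long[]{1, 2}, new long[]{100, 200},
        new String[]{"locOne", "loctwo"});

    // Serialize with Writable.write(...).
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    original.write(new DataOutputStream(data));

    // Deserialize into an empty split via readFields(...).
    CombineFileSplit copy = new CombineFileSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(data.toByteArray())));

    // Paths, offsets, and lengths survive the round trip.
    System.out.println(copy.getNumPaths() + " paths, total length " + copy.getLength());
  }
}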
 
Example 27   Project: hadoop   File: TestFilePool.java
void checkSplitEq(FileSystem fs, CombineFileSplit split, long bytes)
    throws Exception {
  long splitBytes = 0L;
  HashSet<Path> uniq = new HashSet<Path>();
  for (int i = 0; i < split.getNumPaths(); ++i) {
    splitBytes += split.getLength(i);
    assertTrue(
        split.getLength(i) <= fs.getFileStatus(split.getPath(i)).getLen());
    assertFalse(uniq.contains(split.getPath(i)));
    uniq.add(split.getPath(i));
  }
  assertEquals(bytes, splitBytes);
}
 
Example 28   Project: gemfirexd-oss   File: AbstractGFRecordReader.java
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
  CombineFileSplit cSplit = (CombineFileSplit) split;
  Path[] path = cSplit.getPaths();
  long[] start = cSplit.getStartOffsets();
  long[] len = cSplit.getLengths();

  Configuration conf = context.getConfiguration();
  FileSystem fs = cSplit.getPath(0).getFileSystem(conf);
  
  this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l);
}
 
Example 29   Project: gemfirexd-oss   File: GFInputFormatJUnitTest.java
public void testHfileSplitCompleteness() throws Exception {
  cluster = initMiniCluster(CLUSTER_PORT, 1);

  int count = 40;
  HdfsSortedOplogOrganizer bucket1 = new HdfsSortedOplogOrganizer(
      regionManager, 1);
  ArrayList<TestEvent> items = new ArrayList<TestEvent>();
  for (int i = 0; i < count; i++) {
    items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
  }
  bucket1.flush(items.iterator(), count);

  Configuration conf = hdfsStore.getFileSystem().getConf();
  GFInputFormat gfInputFormat = new GFInputFormat();
  Job job = Job.getInstance(conf, "test");

  conf = job.getConfiguration();
  conf.set(GFInputFormat.INPUT_REGION, getName());
  conf.set(GFInputFormat.HOME_DIR, testDataDir.getName());
  conf.setBoolean(GFInputFormat.CHECKPOINT, false);

  List<InputSplit> splits = gfInputFormat.getSplits(job);
  assertTrue(1 < splits.size());

  long lastBytePositionOfPrevious = 0;
  for (InputSplit inputSplit : splits) {
    CombineFileSplit split = (CombineFileSplit) inputSplit;
    assertEquals(1, split.getPaths().length);
    assertEquals(lastBytePositionOfPrevious, split.getOffset(0));
    lastBytePositionOfPrevious += split.getLength();
    assertEquals(1, split.getLocations().length);
  }

  Path bucketPath = new Path(regionPath, "1");
  Path hopPath = new Path(bucketPath, bucket1.getSortedOplogs().iterator()
      .next().get().getFileName());
  FileStatus status = hdfsStore.getFileSystem().getFileStatus(hopPath);
  assertEquals(status.getLen(), lastBytePositionOfPrevious);
}
 
Example 30   Project: big-c   File: FileQueue.java
/**
 * @param split Description of input sources.
 * @param conf Used to resolve FileSystem instances.
 */
public FileQueue(CombineFileSplit split, Configuration conf)
    throws IOException {
  this.conf = conf;
  paths = split.getPaths();
  startoffset = split.getStartOffsets();
  lengths = split.getLengths();
  nextSource();
}
 