org.apache.hadoop.mapred.RecordReader#close() source code examples

The following examples show uses of org.apache.hadoop.mapred.RecordReader#close(); each is taken from the open-source project and file indicated, where the full source can be found on GitHub.
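Before the individual examples, here is a minimal sketch of the lifecycle they all follow (the class and method names are my own, and TextInputFormat is used only as a convenient concrete InputFormat): obtain a reader from the format, consume records with next(), and call close() in a finally block. The old-API org.apache.hadoop.mapred.RecordReader does not, to my knowledge, extend java.io.Closeable, so try-with-resources is not available and an explicit close() is needed.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class RecordReaderCloseSketch {
  /** Count the records in one split, closing the reader even if next() throws. */
  public static long countRecords(TextInputFormat format, InputSplit split, JobConf job)
      throws IOException {
    RecordReader<LongWritable, Text> reader =
        format.getRecordReader(split, job, Reporter.NULL);
    long count = 0;
    try {
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      while (reader.next(key, value)) {
        ++count;
      }
    } finally {
      reader.close(); // releases the underlying stream/decompressor
    }
    return count;
  }
}

Several of the sampler examples below call close() only at the end of the loop body rather than in a finally block; that is the upstream code as written, but it leaks the reader if next() throws.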

Example 1, project: hadoop, file: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
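A quick arithmetic check of the cutoff above (my own example, not from the source): with numSamples = 10 and splitsToSample = 5, samplesPerSplit is 2, and records is a running total across splits, so the loop over split i breaks once records reaches (i + 1) * 2; after the last split the sample holds at most about numSamples keys. Example 5 below is the identical code from the big-c project.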
 
Example 2, project: hadoop, file: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 3, project: hadoop, file: TestDBInputFormat.java
/**
 * Test the DBInputFormat class; it should split the result into chunks.
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);
  
  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
      splitter, configuration, reporter);

  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);

  // test the reader: a few simple checks
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
 
Example 4
@Test
public void readEthereumBlockInputFormatBlock1346406Bzip2Compressed()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  // 'defaultConf' and 'reporter' are fields of the test class (not shown here)
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin.bz2";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input split for block 1346406 contains at least one block");

  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
 
Example 5, project: big-c, file: InputSampler.java
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 6, project: big-c, file: InputSampler.java
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 7, project: big-c, file: TestDBInputFormat.java
/**
 * Test the DBInputFormat class; it should split the result into chunks.
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);
  
  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
      splitter, configuration, reporter);

  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);

  // test the reader: a few simple checks
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
 
Example 8
@Test
public void readEthereumBlockInputFormatGenesisBlock()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "ethgenesis.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input split for genesis block contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Genesis block must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in genesis block");
  reader.close();
}
 
Example 9
@Test
public void readEthereumBlockInputFormatBlock1()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 1");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input split for block 1 contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Block 1 must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1");
  reader.close();
}
 
Example 10
@Test
public void readEthereumBlockInputFormatBlock3346406()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 3346406");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");

  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
 
Example 11, project: systemds, file: IOUtilFunctions.java
public static void closeSilently( RecordReader<?,?> rr ) 
{
	try {
		if( rr != null )
			rr.close();
	}
	catch (Exception ex) {
		LOG.error("Failed to close record reader.", ex);
	}
}
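A usage sketch of mine (the call site and the import path are assumptions, not project code): because closeSilently() logs failures instead of throwing, calling it from a finally block cannot mask an exception already propagating from the read loop.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.runtime.io.IOUtilFunctions; // package path assumed

public class CloseSilentlyUsage {
  public static long countRecords(TextInputFormat format, InputSplit split, JobConf job)
      throws IOException {
    RecordReader<LongWritable, Text> reader = null;
    long count = 0;
    try {
      reader = format.getRecordReader(split, job, Reporter.NULL);
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      while (reader.next(key, value)) {
        ++count;
      }
    } finally {
      IOUtilFunctions.closeSilently(reader); // logs on failure, never throws
    }
    return count;
  }
}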
 
Example 12, project: hadoop, file: CompositeRecordReader.java
/**
 * Close all child RRs.
 */
public void close() throws IOException {
  if (kids != null) {
    for (RecordReader<K,? extends Writable> rr : kids) {
      rr.close();
    }
  }
  if (jc != null) {
    jc.close();
  }
}
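One design note on the loop above: if close() throws for one child, the remaining children are never closed. A defensive variant (a sketch of mine, not CompositeRecordReader's actual code) attempts every child and rethrows the first failure:

import java.io.IOException;

import org.apache.hadoop.mapred.RecordReader;

public class CloseAllSketch {
  /** Close every child, even if an earlier close() throws; rethrow the first failure. */
  public static void closeAll(Iterable<? extends RecordReader<?, ?>> kids)
      throws IOException {
    IOException first = null;
    for (RecordReader<?, ?> rr : kids) {
      try {
        rr.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;              // remember the first failure
        } else {
          first.addSuppressed(e); // attach later failures to it
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}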
 
Example 13, project: hadoop, file: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf()); 
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen() * fileStatus.getBlockSize(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
 
Example 14, project: big-c, file: DumpTypedBytes.java
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf()); 
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
      fileStatus.getLen() * fileStatus.getBlockSize(),
      (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
 
Example 15
@Test
public void readEthereumBlockInputFormatBlock447533()
    throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "block447533.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);

  assertEquals(1, inputSplits.length, "Only one split generated for block 447533");
  RecordReader<BytesWritable, EthereumBlock> reader =
      format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input split for block 447533 contains at least one block");
  assertEquals(2, block.getEthereumTransactions().size(), "Block 447533 must have 2 transactions");
  EthereumBlockHeader ethereumBlockHeader = block.getEthereumBlockHeader();
  assertEquals(
      "a027231f42c80ca4125b5cb962a21cd4f812e88f",
      bytesToHex(ethereumBlockHeader.getCoinBase()).toLowerCase(),
      "Block 447533 was mined by a027231f42c80ca4125b5cb962a21cd4f812e88f");
  assertEquals(
      "043559b70c54f0eea6a90b384286d7ab312129603e750075d09fd35e66f8068a",
      bytesToHex(ethereumBlockHeader.getParentHash()).toLowerCase(),
      "The parent of block 447533 has hash 043559b70c54f0eea6a90b384286d7ab312129603e750075d09fd35e66f8068a");
  assertFalse(reader.next(key, block), "No further blocks in block 447533");

  reader.close();
}
 
Example 16, project: mnemonic, file: MneMapredLongDataTest.java
@Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
public void testReadLongData() throws Exception {
  long sum = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
            && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
            && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
              new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
              new MneInputFormat<MneDurableInputValue<Long>, Long>();
      RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
              inputFormat.getRecordReader(split, m_conf, null);
      MneDurableInputValue<Long> mdval = null;
      NullWritable mdkey = reader.createKey();
      while (true) {
        mdval = reader.createValue();
        if (reader.next(mdkey, mdval)) {
          sum += mdval.getValue();
          ++reccnt;
        } else {
          break;
        }
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_sum, sum);
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  System.out.println(String.format("The checksum of long data is %d", sum));
}
 
Example 17, project: mnemonic, file: MneMapredPersonDataTest.java
@Test(enabled = true, dependsOnMethods = {"testWritePersonData"})
public void testReadPersonData() throws Exception {
  long sumage = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
            && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
            && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
              new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
              new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
      RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
              inputFormat.getRecordReader(split, m_conf, null);
      MneDurableInputValue<Person<Long>> personval = null;
      NullWritable personkey = reader.createKey();
      while (true) {
        personval = reader.createValue();
        if (reader.next(personkey, personval)) {
          AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
          sumage += personval.getValue().getAge();
          ++reccnt;
        } else {
          break;
        }
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  AssertJUnit.assertEquals(m_sumage, sumage);
  System.out.println(String.format("The checksum of ages is %d", sumage));
}
 
Example 18, project: hadoop, file: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
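For intuition (a note of mine, not from the Hadoop source): whenever the reservoir is already full and a new key is accepted, freq is scaled by (numSamples - 1) / numSamples, so after k such replacements the effective acceptance probability is freq * ((numSamples - 1) / numSamples)^k. This geometric decay compensates for the fact that each accepted key can evict an earlier one, so records seen late in the scan do not crowd out earlier picks. Note also that r.nextInt(numSamples) always returns a value strictly below numSamples, so the ind != numSamples guard can never be false; the check appears this way in the original source. Example 19 below is the identical sampler from the big-c project.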
 
Example 19, project: big-c, file: InputSampler.java
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);

  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
                 (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
 
Example 20, project: mnemonic, file: MneMapredChunkDataTest.java
@Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
public void testReadChunkData() throws Exception {
  List<String> partfns = new ArrayList<String>();
  long reccnt = 0L;
  long tsize = 0L;
  Checksum cs = new CRC32();
  cs.reset();
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
            && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
            && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      partfns.add(listfiles[idx].getName());
    }
  }
  Collections.sort(partfns); // keep the order for checksum
  for (int idx = 0; idx < partfns.size(); ++idx) {
    System.out.println(String.format("Verifying : %s", partfns.get(idx)));
    FileSplit split = new FileSplit(
            new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
    InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
            new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
    RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
            inputFormat.getRecordReader(split, m_conf, null);
    MneDurableInputValue<DurableChunk<?>> dchkval = null;
    NullWritable dchkkey = reader.createKey();
    while (true) {
      dchkval = reader.createValue();
      if (reader.next(dchkkey, dchkval)) {
        byte b;
        for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
          b = unsafe.getByte(dchkval.getValue().get() + j);
          cs.update(b);
        }
        tsize += dchkval.getValue().getSize();
        ++reccnt;
      } else {
        break;
      }
    }
    reader.close();
  }
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  AssertJUnit.assertEquals(m_totalsize, tsize);
  AssertJUnit.assertEquals(m_checksum, cs.getValue());
  System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}