Listed below are example usages of org.apache.hadoop.mapred.RecordReader#close(), collected from several open-source projects.
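All of the examples follow the same contract: a RecordReader obtained from an InputFormat must be closed once iteration ends, normally in a finally block so the underlying stream is released even when next() throws. A minimal sketch of that pattern, assuming a configured InputFormat, split, and JobConf (the names format, split, and job are illustrative, not taken from the examples below):

// Minimal old-API read loop; `format`, `split`, and `job` are assumed
// to be configured elsewhere. Reporter.NULL is the no-op reporter.
RecordReader<LongWritable, Text> reader =
    format.getRecordReader(split, job, Reporter.NULL);
try {
  LongWritable key = reader.createKey();
  Text value = reader.createValue();
  while (reader.next(key, value)) {
    // process (key, value) here
  }
} finally {
  reader.close(); // release the underlying stream unconditionally
}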
/**
 * From each split sampled, take the first numSamples / numSplits records.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  int samplesPerSplit = numSamples / splitsToSample;
  long records = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      samples.add(key);
      key = reader.createKey();
      ++records;
      if ((i+1) * samplesPerSplit <= records) {
        break;
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
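Samplers like this one are rarely called directly; in Hadoop they are typically handed to InputSampler.writePartitionFile, which sorts the sampled keys and writes split points for TotalOrderPartitioner. A hedged sketch of that wiring, using class names from Hadoop's org.apache.hadoop.mapred.lib package (the sampler parameters are example values, not defaults):

// Illustrative wiring of a sampler into total-order partitioning.
// numSamples=10000 and maxSplitsSampled=10 are example values.
JobConf job = new JobConf(conf);
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.SplitSampler<Text, Text>(10000, 10);
InputSampler.writePartitionFile(job, sampler); // sorts samples, writes split points
job.setPartitionerClass(TotalOrderPartitioner.class);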
/**
 * For each split sampled, emit when the ratio of the number of records
 * retained to the total record count is less than the specified
 * frequency.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>();
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  int splitStep = splits.length / splitsToSample;
  long records = 0;
  long kept = 0;
  for (int i = 0; i < splitsToSample; ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
        job, Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      ++records;
      if ((double) kept / records < freq) {
        ++kept;
        samples.add(key);
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
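The guard (double) kept / records < freq holds the retained fraction just under freq: with freq = 0.01, roughly every 100th record is kept, so about 10 keys survive from the first 1,000 records of each sampled split.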
/**
 * Test the DBInputFormat class. The class should split the result into chunks.
 * @throws Exception
 */
@Test(timeout = 10000)
public void testDBInputFormat() throws Exception {
  JobConf configuration = new JobConf();
  setupDriver(configuration);
  DBInputFormat<NullDBWritable> format = new DBInputFormat<NullDBWritable>();
  format.setConf(configuration);
  DBInputFormat.DBInputSplit splitter = new DBInputFormat.DBInputSplit(1, 10);
  Reporter reporter = mock(Reporter.class);
  RecordReader<LongWritable, NullDBWritable> reader = format.getRecordReader(
      splitter, configuration, reporter);
  configuration.setInt(MRJobConfig.NUM_MAPS, 3);
  InputSplit[] lSplits = format.getSplits(configuration, 3);
  assertEquals(5, lSplits[0].getLength());
  assertEquals(3, lSplits.length);
  // test the reader: some simple checks
  assertEquals(LongWritable.class, reader.createKey().getClass());
  assertEquals(0, reader.getPos());
  assertEquals(0, reader.getProgress(), 0.001);
  reader.close();
}
@Test
public void readEthereumBlockInputFormatBlock1346406Bzip2Compressed() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin.bz2";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
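This variant reads the same block 1346406 from a bzip2-compressed file. Since EthereumBlockFileInputFormat is a FileInputFormat, decompression is presumably handled transparently by Hadoop's codec machinery, so the assertions are identical to the uncompressed cases and a single split is produced for the whole file.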
@Test
public void readEthereumBlockInputFormatGenesisBlock() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "ethgenesis.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for genesis block");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for genesis block contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Genesis block must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in genesis block");
  reader.close();
}
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 1");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1 contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Block 1 must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1");
  reader.close();
}
@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth3346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 3346406");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 3346406 contains at least one block");
  assertEquals(7, block.getEthereumTransactions().size(), "Block 3346406 must have 7 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 3346406");
  reader.close();
}
public static void closeSilently(RecordReader<?,?> rr) {
  try {
    if (rr != null) {
      rr.close();
    }
  } catch (Exception ex) {
    LOG.error("Failed to close record reader.", ex);
  }
}
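A helper like this belongs in a finally block, where logging a close failure is preferable to letting it mask an exception thrown by the read loop itself. A hypothetical caller (format, split, and job are assumed to be configured elsewhere):

// Hypothetical caller of closeSilently: a failed close() is logged,
// not rethrown, so it cannot hide an exception from the loop body.
RecordReader<LongWritable, Text> rr = null;
try {
  rr = format.getRecordReader(split, job, Reporter.NULL);
  LongWritable k = rr.createKey();
  Text v = rr.createValue();
  while (rr.next(k, v)) {
    // process records
  }
} finally {
  closeSilently(rr);
}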
/**
 * Close all child RRs.
 */
public void close() throws IOException {
  if (kids != null) {
    for (RecordReader<K,? extends Writable> rr : kids) {
      rr.close();
    }
  }
  if (jc != null) {
    jc.close();
  }
}
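Note that this composite close() stops at the first child whose close() throws, leaving later children (and jc) open; a caller that needs best-effort cleanup of every child would have to catch per child, in the style of the closeSilently helper above.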
/**
 * Dump given list of files to standard output as typed bytes.
 */
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
  JobConf job = new JobConf(getConf());
  DataOutputStream dout = new DataOutputStream(System.out);
  AutoInputFormat autoInputFormat = new AutoInputFormat();
  for (FileStatus fileStatus : files) {
    FileSplit split = new FileSplit(fileStatus.getPath(), 0,
        fileStatus.getLen() * fileStatus.getBlockSize(),
        (String[]) null);
    RecordReader recReader = null;
    try {
      recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
      Object key = recReader.createKey();
      Object value = recReader.createValue();
      while (recReader.next(key, value)) {
        if (key instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) key);
        } else {
          TypedBytesOutput.get(dout).write(key);
        }
        if (value instanceof Writable) {
          TypedBytesWritableOutput.get(dout).write((Writable) value);
        } else {
          TypedBytesOutput.get(dout).write(value);
        }
      }
    } finally {
      if (recReader != null) {
        recReader.close();
      }
    }
  }
  dout.flush();
  return 0;
}
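Two details worth noting here: the split length is fileStatus.getLen() * fileStatus.getBlockSize(), an oversized value that in effect guarantees the whole file lands in one split, and the reader is closed in a finally block so each file's stream is released even if writing the typed bytes fails mid-file.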
@Test
public void readEthereumBlockInputFormatBlock447533() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "block447533.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 447533");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 447533 contains at least one block");
  assertEquals(2, block.getEthereumTransactions().size(), "Block 447533 must have 2 transactions");
  EthereumBlockHeader ethereumBlockHeader = block.getEthereumBlockHeader();
  assertEquals(
      "a027231f42c80ca4125b5cb962a21cd4f812e88f",
      bytesToHex(ethereumBlockHeader.getCoinBase()).toLowerCase(),
      "Block 447533 was mined by a027231f42c80ca4125b5cb962a21cd4f812e88f");
  assertEquals(
      "043559b70c54f0eea6a90b384286d7ab312129603e750075d09fd35e66f8068a",
      bytesToHex(ethereumBlockHeader.getParentHash()).toLowerCase(),
      "The parent of block 447533 has hash 043559b70c54f0eea6a90b384286d7ab312129603e750075d09fd35e66f8068a");
  assertFalse(reader.next(key, block), "No further blocks in block 447533");
  reader.close();
}
@Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
public void testReadLongData() throws Exception {
  long sum = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
        && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
        && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
          new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
          new MneInputFormat<MneDurableInputValue<Long>, Long>();
      RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
          inputFormat.getRecordReader(split, m_conf, null);
      MneDurableInputValue<Long> mdval = null;
      NullWritable mdkey = reader.createKey();
      while (true) {
        mdval = reader.createValue();
        if (reader.next(mdkey, mdval)) {
          sum += mdval.getValue();
          ++reccnt;
        } else {
          break;
        }
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_sum, sum);
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  System.out.println(String.format("The checksum of long data is %d", sum));
}
@Test(enabled = true, dependsOnMethods = {"testWritePersonData"})
public void testReadPersonData() throws Exception {
  long sumage = 0L;
  long reccnt = 0L;
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
        && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
        && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
      FileSplit split = new FileSplit(
          new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
      InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
          new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
      RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
          inputFormat.getRecordReader(split, m_conf, null);
      MneDurableInputValue<Person<Long>> personval = null;
      NullWritable personkey = reader.createKey();
      while (true) {
        personval = reader.createValue();
        if (reader.next(personkey, personval)) {
          AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
          sumage += personval.getValue().getAge();
          ++reccnt;
        } else {
          break;
        }
      }
      reader.close();
    }
  }
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  AssertJUnit.assertEquals(m_sumage, sumage);
  System.out.println(String.format("The checksum of ages is %d", sumage));
}
/**
 * Randomize the split order, then take the specified number of keys from
 * each split sampled, where each key is selected with the specified
 * probability and possibly replaced by a subsequently selected key when
 * the quota of keys from that split is satisfied.
 */
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  ArrayList<K> samples = new ArrayList<K>(numSamples);
  int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  Random r = new Random();
  long seed = r.nextLong();
  r.setSeed(seed);
  LOG.debug("seed: " + seed);
  // shuffle splits
  for (int i = 0; i < splits.length; ++i) {
    InputSplit tmp = splits[i];
    int j = r.nextInt(splits.length);
    splits[i] = splits[j];
    splits[j] = tmp;
  }
  // our target rate is in terms of the maximum number of sample splits,
  // but we accept the possibility of sampling additional splits to hit
  // the target sample keyset
  for (int i = 0; i < splitsToSample ||
      (i < splits.length && samples.size() < numSamples); ++i) {
    RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
        Reporter.NULL);
    K key = reader.createKey();
    V value = reader.createValue();
    while (reader.next(key, value)) {
      if (r.nextDouble() <= freq) {
        if (samples.size() < numSamples) {
          samples.add(key);
        } else {
          // When exceeding the maximum number of samples, replace a
          // random element with this one, then adjust the frequency
          // to reflect the possibility of existing elements being
          // pushed out
          int ind = r.nextInt(numSamples);
          if (ind != numSamples) {
            samples.set(ind, key);
          }
          freq *= (numSamples - 1) / (double) numSamples;
        }
        key = reader.createKey();
      }
    }
    reader.close();
  }
  return (K[])samples.toArray();
}
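The rescaling freq *= (numSamples - 1) / (double) numSamples compensates for the fact that, once the reservoir is full, each accepted key evicts a random earlier one; shrinking the acceptance rate keeps later records from dominating the sample. The decay is gentle: with numSamples = 1000, each acceptance multiplies freq by 0.999, so it takes roughly 693 further acceptances to halve the rate (0.999^693 ≈ 0.5). Note also that ind != numSamples is always true, since r.nextInt(numSamples) returns values in [0, numSamples); the guard is vestigial.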
@Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
public void testReadChunkData() throws Exception {
  List<String> partfns = new ArrayList<String>();
  long reccnt = 0L;
  long tsize = 0L;
  Checksum cs = new CRC32();
  cs.reset();
  File folder = new File(m_workdir.toString());
  File[] listfiles = folder.listFiles();
  for (int idx = 0; idx < listfiles.length; ++idx) {
    if (listfiles[idx].isFile()
        && listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
        && listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
      partfns.add(listfiles[idx].getName());
    }
  }
  Collections.sort(partfns); // keep the order for checksum
  for (int idx = 0; idx < partfns.size(); ++idx) {
    System.out.println(String.format("Verifying : %s", partfns.get(idx)));
    FileSplit split = new FileSplit(
        new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
    InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
        new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
    RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
        inputFormat.getRecordReader(split, m_conf, null);
    MneDurableInputValue<DurableChunk<?>> dchkval = null;
    NullWritable dchkkey = reader.createKey();
    while (true) {
      dchkval = reader.createValue();
      if (reader.next(dchkkey, dchkval)) {
        byte b;
        for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
          b = unsafe.getByte(dchkval.getValue().get() + j);
          cs.update(b);
        }
        tsize += dchkval.getValue().getSize();
        ++reccnt;
      } else {
        break;
      }
    }
    reader.close();
  }
  AssertJUnit.assertEquals(m_reccnt, reccnt);
  AssertJUnit.assertEquals(m_totalsize, tsize);
  AssertJUnit.assertEquals(m_checksum, cs.getValue());
  System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}