The following examples show how to use org.apache.hadoop.mapred.RecordReader#createValue(). Each snippet is taken from an open-source project; follow the source link to view the full file on GitHub.
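Before the collected examples, here is a minimal, self-contained sketch of the usual createKey()/createValue()/next() loop with the classic org.apache.hadoop.mapred API. The choice of TextInputFormat and the input path are illustrative assumptions, not taken from any project below.

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public static void printLines(String inputPath) throws IOException {
  JobConf job = new JobConf();
  FileInputFormat.setInputPaths(job, new Path(inputPath));
  TextInputFormat inputFormat = new TextInputFormat();
  inputFormat.configure(job);
  for (InputSplit split : inputFormat.getSplits(job, 1)) {
    RecordReader<LongWritable, Text> reader =
        inputFormat.getRecordReader(split, job, Reporter.NULL);
    LongWritable key = reader.createKey();    // reusable key instance
    Text value = reader.createValue();        // reusable value instance
    try {
      while (reader.next(key, value)) {       // next() refills the same key/value objects
        System.out.println(value);
      }
    } finally {
      reader.close();
    }
  }
}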
private void ensureRecordsInCommit(String msg, String commit, int expectedNumberOfRecordsInCommit,
int totalExpected) throws IOException {
int actualCount = 0;
int totalCount = 0;
InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
for (InputSplit split : splits) {
RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(split, jobConf, null);
NullWritable key = recordReader.createKey();
ArrayWritable writable = recordReader.createValue();
while (recordReader.next(key, writable)) {
// The writable holds an array laid out as [field1, field2, _hoodie_commit_time, _hoodie_commit_seqno];
// take the commit time and compare it with the one we are interested in.
if (commit.equals((writable.get()[2]).toString())) {
actualCount++;
}
totalCount++;
}
}
assertEquals(expectedNumberOfRecordsInCommit, actualCount, msg);
assertEquals(totalExpected, totalCount, msg);
}
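A hypothetical call site for the helper above; the message, commit time and record counts are made-up values for illustration only.

// e.g. after writing 100 records in commit "100" to an otherwise empty table (hypothetical numbers):
ensureRecordsInCommit("should contain all 100 records of commit 100", "100", 100, 100);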
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);
key = reader.createKey();
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
++records;
if ((double) kept / records < freq) {
++kept;
samples.add(key);
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
@Test(enabled = true, dependsOnMethods = {"testWritePersonData"})
public void testReadPersonData() throws Exception {
long sumage = 0L;
long reccnt = 0L;
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<Person<Long>>> inputFormat =
new MneInputFormat<MneDurableInputValue<Person<Long>>, Person<Long>>();
RecordReader<NullWritable, MneDurableInputValue<Person<Long>>> reader =
inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<Person<Long>> personval = null;
NullWritable personkey = reader.createKey();
while (true) {
personval = reader.createValue();
if (reader.next(personkey, personval)) {
AssertJUnit.assertTrue(personval.getValue().getAge() < 51);
sumage += personval.getValue().getAge();
++reccnt;
} else {
break;
}
}
reader.close();
}
}
AssertJUnit.assertEquals(m_reccnt, reccnt);
AssertJUnit.assertEquals(m_sumage, sumage);
System.out.println(String.format("The checksum of ages is %d", sumage));
}
/**
* Dump given list of files to standard output as typed bytes.
*/
@SuppressWarnings("unchecked")
private int dumpTypedBytes(List<FileStatus> files) throws IOException {
JobConf job = new JobConf(getConf());
DataOutputStream dout = new DataOutputStream(System.out);
AutoInputFormat autoInputFormat = new AutoInputFormat();
for (FileStatus fileStatus : files) {
FileSplit split = new FileSplit(fileStatus.getPath(), 0,
fileStatus.getLen() * fileStatus.getBlockSize(),
(String[]) null);
RecordReader recReader = null;
try {
recReader = autoInputFormat.getRecordReader(split, job, Reporter.NULL);
Object key = recReader.createKey();
Object value = recReader.createValue();
while (recReader.next(key, value)) {
if (key instanceof Writable) {
TypedBytesWritableOutput.get(dout).write((Writable) key);
} else {
TypedBytesOutput.get(dout).write(key);
}
if (value instanceof Writable) {
TypedBytesWritableOutput.get(dout).write((Writable) value);
} else {
TypedBytesOutput.get(dout).write(value);
}
}
} finally {
if (recReader != null) {
recReader.close();
}
}
}
dout.flush();
return 0;
}
@Test(enabled = true, dependsOnMethods = {"testWriteLongData"})
public void testReadLongData() throws Exception {
long sum = 0L;
long reccnt = 0L;
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
System.out.println(String.format("Verifying : %s", listfiles[idx].getName()));
FileSplit split = new FileSplit(
new Path(m_workdir, listfiles[idx].getName()), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<Long>> inputFormat =
new MneInputFormat<MneDurableInputValue<Long>, Long>();
RecordReader<NullWritable, MneDurableInputValue<Long>> reader =
inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<Long> mdval = null;
NullWritable mdkey = reader.createKey();
while (true) {
mdval = reader.createValue();
if (reader.next(mdkey, mdval)) {
sum += mdval.getValue();
++reccnt;
} else {
break;
}
}
reader.close();
}
}
AssertJUnit.assertEquals(m_sum, sum);
AssertJUnit.assertEquals(m_reccnt, reccnt);
System.out.println(String.format("The checksum of long data is %d", sum));
}
/**
* Run the map task.
* @param input the set of inputs
* @param output the object to collect the outputs of the map
* @param reporter the object to update with status
*/
@SuppressWarnings("unchecked")
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter) throws IOException {
Application<K1, V1, K2, V2> application = null;
try {
RecordReader<FloatWritable, NullWritable> fakeInput =
(!Submitter.getIsJavaRecordReader(job) &&
!Submitter.getIsJavaMapper(job)) ?
(RecordReader<FloatWritable, NullWritable>) input : null;
application = new Application<K1, V1, K2, V2>(job, fakeInput, output,
reporter,
(Class<? extends K2>) job.getOutputKeyClass(),
(Class<? extends V2>) job.getOutputValueClass());
} catch (InterruptedException ie) {
throw new RuntimeException("interrupted", ie);
}
DownwardProtocol<K1, V1> downlink = application.getDownlink();
boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
downlink.runMap(reporter.getInputSplit(),
job.getNumReduceTasks(), isJavaInput);
boolean skipping = job.getBoolean(MRJobConfig.SKIP_RECORDS, false);
try {
if (isJavaInput) {
// allocate key & value instances that are re-used for all entries
K1 key = input.createKey();
V1 value = input.createValue();
downlink.setInputTypes(key.getClass().getName(),
value.getClass().getName());
while (input.next(key, value)) {
// map pair to output
downlink.mapItem(key, value);
if(skipping) {
//flush the streams on every record input if running in skip mode
//so that we don't buffer other records surrounding a bad record.
downlink.flush();
}
}
downlink.endOfInput();
}
application.waitForFinish();
} catch (Throwable t) {
application.abort(t);
} finally {
application.cleanup();
}
}
/**
* Randomize the split order, then take the specified number of keys from
* each split sampled, where each key is selected with the specified
* probability and possibly replaced by a subsequently selected key when
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
for (int i = 0; i < splits.length; ++i) {
InputSplit tmp = splits[i];
int j = r.nextInt(splits.length);
splits[i] = splits[j];
splits[j] = tmp;
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for (int i = 0; i < splitsToSample ||
(i < splits.length && samples.size() < numSamples); ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
samples.add(key);
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
samples.set(ind, key);
}
freq *= (numSamples - 1) / (double) numSamples;
}
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
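For context, samplers like the ones above are usually passed to InputSampler.writePartitionFile together with TotalOrderPartitioner. A hedged sketch using the classic org.apache.hadoop.mapred.lib classes; the reducer count, partition-file path and sampler parameters are assumptions, not values from the snippets here.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public static void writeTotalOrderPartitions(JobConf job) throws Exception {
  // Assumes the job's input format, input paths and Text key type are already configured.
  job.setNumReduceTasks(4);                                                  // hypothetical reducer count
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/_partitions")); // hypothetical path
  // freq = 0.1, numSamples = 10000, maxSplitsSampled = 10 -- all hypothetical values
  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
  InputSampler.writePartitionFile(job, sampler);
}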
public void testHiveInputFormat() throws Exception {
getConnection();
Connection conn = startNetserverAndGetLocalNetConnection();
Statement st = conn.createStatement();
st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");
PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
int NUM_ENTRIES = 20;
for(int i = 0; i < NUM_ENTRIES; i++) {
ps.setInt(1, i);
ps.setString(2, "Value-" + System.nanoTime());
ps.execute();
}
//Wait for data to get to HDFS...
String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
assertEquals(1, list.length);
conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
JobConf job = new JobConf(conf);
job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
GFXDHiveInputFormat ipformat = new GFXDHiveInputFormat();
InputSplit[] splits = ipformat.getSplits(job, 2);
assertEquals(1, splits.length);
GFXDHiveSplit split = (GFXDHiveSplit) splits[0];
assertEquals(1, split.getPaths().length);
assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
assertEquals(0, split.getOffset(0));
assertEquals(list[0].getLen(), split.getLength(0));
RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
assertTrue("Row record reader should be an instace of GFXDHiveRowRecordReader " +
"but it is an instance of " + rr.getClass(), (rr instanceof GFXDHiveRowRecordReader));
Key key = rr.createKey();
Row value = rr.createValue();
int count = 0;
while (rr.next(key, value)) {
assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
}
assertEquals(20, count);
TestUtil.shutDown();
}
public void testEventInputFormat() throws Exception {
getConnection();
Connection conn = startNetserverAndGetLocalNetConnection();
Statement st = conn.createStatement();
st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");
PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
int NUM_ENTRIES = 20;
for(int i = 0; i < NUM_ENTRIES; i++) {
ps.setInt(1, i);
ps.setString(2, "Value-" + System.nanoTime());
ps.execute();
}
//Wait for data to get to HDFS...
String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
assertEquals(1, list.length);
conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
JobConf job = new JobConf(conf);
job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
RowInputFormat ipformat = new RowInputFormat();
InputSplit[] splits = ipformat.getSplits(job, 2);
assertEquals(1, splits.length);
CombineFileSplit split = (CombineFileSplit) splits[0];
assertEquals(1, split.getPaths().length);
assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
assertEquals(0, split.getOffset(0));
assertEquals(list[0].getLen(), split.getLength(0));
RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
Key key = rr.createKey();
Row value = rr.createValue();
int count = 0;
while (rr.next(key, value)) {
assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
}
assertEquals(20, count);
TestUtil.shutDown();
}
public void testNoSecureHdfsCheck() throws Exception {
getConnection();
Connection conn = startNetserverAndGetLocalNetConnection();
Statement st = conn.createStatement();
st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) persistent hdfsstore (myhdfs) BUCKETS 1");
PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
int NUM_ENTRIES = 20;
for(int i = 0; i < NUM_ENTRIES; i++) {
ps.setInt(1, i);
ps.setString(2, "Value-" + System.nanoTime());
ps.execute();
}
//Wait for data to get to HDFS...
String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
stopNetServer();
FabricServiceManager.currentFabricServiceInstance().stop(new Properties());
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
assertEquals(1, list.length);
conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
conf.set("hadoop.security.authentication", "kerberos");
JobConf job = new JobConf(conf);
job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
RowInputFormat ipformat = new RowInputFormat();
InputSplit[] splits = ipformat.getSplits(job, 2);
assertEquals(1, splits.length);
CombineFileSplit split = (CombineFileSplit) splits[0];
assertEquals(1, split.getPaths().length);
assertEquals(list[0].getPath().toString(), split.getPath(0).toString());
assertEquals(0, split.getOffset(0));
assertEquals(list[0].getLen(), split.getLength(0));
RecordReader<Key, Row> rr = ipformat.getRecordReader(split, job, null);
Key key = rr.createKey();
Row value = rr.createValue();
int count = 0;
while (rr.next(key, value)) {
assertEquals(count++, value.getRowAsResultSet().getInt("col1"));
}
assertEquals(20, count);
TestUtil.shutDown();
}
private void doTestRowSerDe(boolean concurrencyChecks) throws Exception {
getConnection();
Connection conn = startNetserverAndGetLocalNetConnection();
final long statTS = System.currentTimeMillis();
Statement st = conn.createStatement();
st.execute("create hdfsstore myhdfs namenode 'localhost' homedir '" + HDFS_DIR + "' batchtimeinterval 5000 milliseconds");
String concurrency = "persistent ENABLE CONCURRENCY CHECKS";
st.execute("create table app.mytab1 (col1 int primary key, col2 varchar(100)) partition by primary key buckets 1 hdfsstore (myhdfs) "
+(concurrencyChecks ? concurrency : ""));
PreparedStatement ps = conn.prepareStatement("insert into mytab1 values (?, ?)");
ps.setInt(1, 1);
ps.setString(2, "Value-1");
ps.execute();
//Wait for data to get to HDFS...
String qname = HDFSStoreFactoryImpl.getEventQueueName("/APP/MYTAB1");
st.execute("CALL SYS.WAIT_FOR_SENDER_QUEUE_FLUSH('" + qname + "', 1, 0)");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus[] list = fs.listStatus(new Path(HDFS_DIR + "/APP_MYTAB1/0/"));
assertEquals(1, list.length);
conf.set(RowInputFormat.INPUT_TABLE, "MYTAB1");
conf.set(RowInputFormat.HOME_DIR, HDFS_DIR);
JobConf job = new JobConf(conf);
job.setBoolean(RowInputFormat.CHECKPOINT_MODE, false);
RowInputFormat ipformat = new RowInputFormat();
InputSplit[] splits = ipformat.getSplits(job, 2);
assertEquals(1, splits.length);
RecordReader<Key, Row> rr = ipformat.getRecordReader(splits[0], job, null);
Key key = rr.createKey();
Row value = rr.createValue();
assertTrue(rr.next(key, value));
assertEquals(1, value.getRowAsResultSet().getInt(1));
assertEquals("Value-1", value.getRowAsResultSet().getString(2));
assertTrue(value.getTimestamp() > statTS);
assertFalse(value.getRowAsResultSet().next());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
value.write(dos);
dos.close();
byte[] buf = baos.toByteArray();
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
Row row = new Row();
row.readFields(dis);
dis.close();
assertEquals(1, row.getRowAsResultSet().getInt(1));
assertEquals("Value-1", row.getRowAsResultSet().getString(2));
assertFalse(value.getRowAsResultSet().next());
TestUtil.shutDown();
}
@Test
@Disabled
public void testHoodieRealtimeCombineHoodieInputFormat() throws Exception {
Configuration conf = new Configuration();
// initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, tempDir.toAbsolutePath().toString(), HoodieTableType.MERGE_ON_READ);
String commitTime = "100";
final int numRecords = 1000;
// Create 3 parquet files with 1000 records each
File partitionDir = InputFormatTestUtil.prepareParquetTable(tempDir, schema, 3, numRecords, commitTime);
InputFormatTestUtil.commit(tempDir, commitTime);
// insert 1000 update records to log file 0
String newCommitTime = "101";
HoodieLogFormat.Writer writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid0", commitTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();
// insert 1000 update records to log file 1
writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid1", commitTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();
// insert 1000 update records to log file 2
writer =
InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, schema, "fileid2", commitTime, newCommitTime,
numRecords, numRecords, 0);
writer.close();
TableDesc tblDesc = Utilities.defaultTd;
// Set the input format
tblDesc.setInputFileFormatClass(HoodieCombineHiveInputFormat.class);
PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
pt.put(new Path(tempDir.toAbsolutePath().toString()), partDesc);
MapredWork mrwork = new MapredWork();
mrwork.getMapWork().setPathToPartitionInfo(pt);
Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
Utilities.setMapRedWork(conf, mrwork, mapWorkPath);
jobConf = new JobConf(conf);
// Add the paths
FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
jobConf.set(HAS_MAP_WORK, "true");
// The following config tells Hive to choose ExecMapper to read the MAP_WORK
jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
// setting the split size to be 3 to create one split for 3 file groups
jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "3");
HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
String tripsHiveColumnTypes = "double,string,string,string,double,double,double,double,double";
InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, tripsHiveColumnTypes);
InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
// Since the SPLIT_SIZE is 3, we should create only 1 split with all 3 file groups
assertEquals(1, splits.length);
RecordReader<NullWritable, ArrayWritable> recordReader =
combineHiveInputFormat.getRecordReader(splits[0], jobConf, null);
NullWritable nullWritable = recordReader.createKey();
ArrayWritable arrayWritable = recordReader.createValue();
int counter = 0;
while (recordReader.next(nullWritable, arrayWritable)) {
// read over all the splits
counter++;
}
// should read all 3 file groups (file0, file1, file2) of 1000 records each, i.e. 3000 records from the single split
assertEquals(3000, counter);
}
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
throws IOException {
try {
// allocate key & value instances these objects will not be reused
// because execution of Mapper.map is not serialized.
K1 key = input.createKey();
V1 value = input.createValue();
while (input.next(key, value)) {
executorService.execute(new MapperInvokeRunable(key, value, output,
reporter));
checkForExceptionsFromProcessingThreads();
// Allocate new key & value instances as mapper is running in parallel
key = input.createKey();
value = input.createValue();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Finished dispatching all Mappper.map calls, job "
+ job.getJobName());
}
// Gracefully shut down the thread pool; it lets all scheduled
// Runnables run to completion.
executorService.shutdown();
try {
// Now waiting for all Runnables to end.
while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Awaiting all running Mappper.map calls to finish, job "
+ job.getJobName());
}
// NOTE: although Mapper.map dispatching has concluded, there may still be
// map calls in progress, and they may throw exceptions.
checkForExceptionsFromProcessingThreads();
}
// NOTE: a map call could still have thrown an exception after
// awaitTermination() returned true. An edge case, but it
// could happen.
checkForExceptionsFromProcessingThreads();
} catch (IOException ioEx) {
// Force a shutdown of all threads in the thread pool and rethrow
// the IOException
executorService.shutdownNow();
throw ioEx;
} catch (InterruptedException iEx) {
throw new RuntimeException(iEx);
}
} finally {
mapper.close();
}
}
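A brief, hedged configuration sketch for wiring this runner into a job; the thread-count property name is the classic one and may differ by Hadoop version, so treat it as an assumption to verify.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultithreadedMapRunner;

public static JobConf configureMultithreadedRunner() {
  JobConf job = new JobConf();
  job.setMapRunnerClass(MultithreadedMapRunner.class);
  // Pool size used by the runner above (assumed property name; newer releases also
  // accept mapreduce.mapper.multithreadedmapper.threads).
  job.setInt("mapred.map.multithreadedrunner.threads", 10);
  return job;
}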
@Test(enabled = true, dependsOnMethods = {"testWriteChunkData"})
public void testReadChunkData() throws Exception {
List<String> partfns = new ArrayList<String>();
long reccnt = 0L;
long tsize = 0L;
Checksum cs = new CRC32();
cs.reset();
File folder = new File(m_workdir.toString());
File[] listfiles = folder.listFiles();
for (int idx = 0; idx < listfiles.length; ++idx) {
if (listfiles[idx].isFile()
&& listfiles[idx].getName().startsWith(MneConfigHelper.getBaseOutputName(m_conf, null))
&& listfiles[idx].getName().endsWith(MneConfigHelper.DEFAULT_FILE_EXTENSION)) {
partfns.add(listfiles[idx].getName());
}
}
Collections.sort(partfns); // keep the order for checksum
for (int idx = 0; idx < partfns.size(); ++idx) {
System.out.println(String.format("Verifying : %s", partfns.get(idx)));
FileSplit split = new FileSplit(
new Path(m_workdir, partfns.get(idx)), 0, 0L, new String[0]);
InputFormat<NullWritable, MneDurableInputValue<DurableChunk<?>>> inputFormat =
new MneInputFormat<MneDurableInputValue<DurableChunk<?>>, DurableChunk<?>>();
RecordReader<NullWritable, MneDurableInputValue<DurableChunk<?>>> reader =
inputFormat.getRecordReader(split, m_conf, null);
MneDurableInputValue<DurableChunk<?>> dchkval = null;
NullWritable dchkkey = reader.createKey();
while (true) {
dchkval = reader.createValue();
if (reader.next(dchkkey, dchkval)) {
byte b;
for (int j = 0; j < dchkval.getValue().getSize(); ++j) {
b = unsafe.getByte(dchkval.getValue().get() + j);
cs.update(b);
}
tsize += dchkval.getValue().getSize();
++reccnt;
} else {
break;
}
}
reader.close();
}
AssertJUnit.assertEquals(m_reccnt, reccnt);
AssertJUnit.assertEquals(m_totalsize, tsize);
AssertJUnit.assertEquals(m_checksum, cs.getValue());
System.out.println(String.format("The checksum of chunk is %d", m_checksum));
}