Listed below are example usages of the org.apache.hadoop.mapred.FileInputFormat API class, showing how it is typically written; you can also follow the links to GitHub to view the full source code.
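Before the longer examples, here is a minimal sketch of the three basic calls (the /data/... paths are placeholders, not taken from any example below): setInputPaths replaces the configured input list, addInputPath appends to it, and getInputPaths reads it back.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public final class FileInputFormatBasics {
  public static void main(String[] args) {
    JobConf conf = new JobConf(FileInputFormatBasics.class);
    // Replace the whole input path list.
    FileInputFormat.setInputPaths(conf, new Path("/data/in1"), new Path("/data/in2"));
    // Append one more path to whatever is already configured.
    FileInputFormat.addInputPath(conf, new Path("/data/in3"));
    // Read the configured list back (several examples below do this).
    for (Path p : FileInputFormat.getInputPaths(conf)) {
      System.out.println(p);
    }
  }
}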
/**
 * Submit this job to mapred. The state becomes RUNNING if submission
 * is successful, FAILED otherwise.
 */
protected synchronized void submit() {
  try {
    if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
      FileSystem fs = FileSystem.get(theJobConf);
      Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
      for (int i = 0; i < inputPaths.length; i++) {
        if (!fs.exists(inputPaths[i])) {
          try {
            fs.mkdirs(inputPaths[i]);
          } catch (IOException e) {
            // ignored: if mkdirs fails, the submission below will surface the problem
          }
        }
      }
    }
    RunningJob running = jc.submitJob(theJobConf);
    this.mapredJobID = running.getID();
    this.state = Job.RUNNING;
  } catch (IOException ioe) {
    this.state = Job.FAILED;
    this.message = StringUtils.stringifyException(ioe);
  }
}
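The snippet above reads the configured inputs back with FileInputFormat.getInputPaths and pre-creates any that are missing. The same guard can also be written as a standalone helper; a minimal sketch, assuming the JobConf already has its input paths set:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;

public final class EnsureInputDirs {
  /** Create every configured input path that does not exist yet. */
  static void ensureInputDirs(JobConf conf) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    for (Path in : FileInputFormat.getInputPaths(conf)) {
      if (!fs.exists(in)) {
        fs.mkdirs(in); // an empty dir yields zero splits instead of a failed job
      }
    }
  }
}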
private static void runJobPv(String inputDir, String outputDir, String jobName, Class<? extends Mapper> mapClass,
    Class<? extends Reducer> reduceClass) throws Exception {
  JobConf conf = new JobConf(PersonVersion.class);
  conf.setJobName(jobName);
  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);
  conf.setMapperClass(mapClass);
  conf.setCombinerClass(reduceClass);
  conf.setReducerClass(reduceClass);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);
  FileInputFormat.setInputPaths(conf, inputDir);
  FileOutputFormat.setOutputPath(conf, new Path(outputDir));
  JobClient.runJob(conf);
}
public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("LinkDb merge: starting at " + sdf.format(start));
  JobConf job = createMergeJob(getConf(), output, normalize, filter);
  for (int i = 0; i < dbs.length; i++) {
    FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME));
  }
  JobClient.runJob(job);
  FileSystem fs = FileSystem.get(getConf());
  fs.mkdirs(output);
  fs.rename(FileOutputFormat.getOutputPath(job), new Path(output, LinkDb.CURRENT_NAME));
  long end = System.currentTimeMillis();
  LOG.info("LinkDb merge: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
@Override
public RecordReader getRecordReader(JobConf job, CombineFileSplit split, Reporter reporter,
    Class<RecordReader<K, V>> rrClass) throws IOException {
  isRealTime = Boolean.valueOf(job.get("hudi.hive.realtime", "false"));
  if (isRealTime) {
    List<RecordReader> recordReaders = new LinkedList<>();
    ValidationUtils.checkArgument(split instanceof HoodieCombineRealtimeFileSplit, "Only "
        + HoodieCombineRealtimeFileSplit.class.getName() + " allowed, found " + split.getClass().getName());
    for (InputSplit inputSplit : ((HoodieCombineRealtimeFileSplit) split).getRealtimeFileSplits()) {
      if (split.getPaths().length == 0) {
        continue;
      }
      FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(split.getPath(0).toString(), true, job);
      recordReaders.add(inputFormat.getRecordReader(inputSplit, job, reporter));
    }
    return new HoodieCombineRealtimeRecordReader(job, split, recordReaders);
  }
  return new HadoopShimsSecure.CombineFileRecordReader(job, split, reporter, rrClass);
}
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
    ValueType[] schema, String[] names, long rlen, long clen)
  throws IOException
{
  if( fs.isDirectory(path) ) {
    FileInputFormat.addInputPath(job, path);
    TextInputFormat informat = new TextInputFormat();
    informat.configure(job);
    InputSplit[] splits = informat.getSplits(job, 1);
    for(InputSplit split: splits)
      readTextCellFrameFromInputSplit(split, informat, job, dest);
  }
  else {
    readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
  }
}
/**
 * Creates a simple copy job.
 *
 * @param indirs List of input directories.
 * @param outdir Output directory.
 * @return JobConf initialised for a simple copy job.
 * @throws Exception If an error occurs creating job configuration.
 */
static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
  Configuration defaults = new Configuration();
  JobConf theJob = new JobConf(defaults, TestJobControl.class);
  theJob.setJobName("DataMoveJob");
  FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
  theJob.setMapperClass(DataCopy.class);
  FileOutputFormat.setOutputPath(theJob, outdir);
  theJob.setOutputKeyClass(Text.class);
  theJob.setOutputValueClass(Text.class);
  theJob.setReducerClass(DataCopy.class);
  theJob.setNumMapTasks(12);
  theJob.setNumReduceTasks(4);
  return theJob;
}
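A caller of createCopyJob might look like the following; a hedged sketch with made-up test paths, assuming it lives in the same test class (java.util.Arrays/List and the mapred classes imported as usual):

static void runCopyJob() throws Exception {
  List<Path> indirs = Arrays.asList(new Path("/test/copy/in1"), new Path("/test/copy/in2"));
  JobConf copyJob = createCopyJob(indirs, new Path("/test/copy/out"));
  JobClient.runJob(copyJob); // blocks until the copy finishes, throws on failure
}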
public static void readTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(READ_DIR, true);
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);
  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(ReadMapper.class);
  job.setReducerClass(LongSumReducer.class);
  FileOutputFormat.setOutputPath(job, READ_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
public static void writeTest(FileSystem fs, boolean fastCheck)
    throws Exception {
  fs.delete(DATA_DIR, true);
  fs.delete(WRITE_DIR, true);
  JobConf job = new JobConf(conf, TestFileSystem.class);
  job.setBoolean("fs.test.fastCheck", fastCheck);
  FileInputFormat.setInputPaths(job, CONTROL_DIR);
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(WriteMapper.class);
  job.setReducerClass(LongSumReducer.class);
  FileOutputFormat.setOutputPath(job, WRITE_DIR);
  job.setOutputKeyClass(UTF8.class);
  job.setOutputValueClass(LongWritable.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
@Test
public void getNone() {
  FileInputFormat.addInputPath(conf, new Path("unused"));
  conf.set(AccumuloSerde.COLUMN_MAPPINGS, "cf|f1");
  try {
    InputSplit[] splits = inputformat.getSplits(conf, 0);
    assertEquals(splits.length, 1);
    RecordReader<Text,AccumuloHiveRow> reader = inputformat.getRecordReader(splits[0], conf, null);
    Text rowId = new Text("r1");
    AccumuloHiveRow row = new AccumuloHiveRow();
    row.setRowId("r1");
    assertFalse(reader.next(rowId, row));
  } catch (IOException e) {
    log.error(e);
    fail();
  }
  // for whatever reason, this errors unless I manually reset. mockKeyValues() ignored @BeforeTest
  conf.set(AccumuloSerde.COLUMN_MAPPINGS, "cf|name,cf|sid,cf|dgrs,cf|mills");
}
@Override
public InputSplit[] allocateWork(List<CamusRequest> requests,
    JobConf conf) throws IOException {
  int numTasks = conf.getInt("mapred.map.tasks", 30);
  reverseSortRequests(requests);
  List<InputSplit> kafkaETLSplits = new ArrayList<InputSplit>();
  Path[] tablePaths = FileInputFormat.getInputPaths(conf);
  for (int i = 0; i < numTasks; i++) {
    if (requests.size() > 0) {
      kafkaETLSplits.add(new KafkaSplit(tablePaths[0]));
    }
  }
  for (CamusRequest r : requests) {
    getSmallestMultiSplit(kafkaETLSplits).addRequest(r);
  }
  InputSplit[] inputSplits = new InputSplit[kafkaETLSplits.size()];
  return kafkaETLSplits.toArray(inputSplits);
}
/**
 * Run the test
 *
 * @throws IOException on error
 */
public static void runTests(Configuration config) throws IOException {
  config.setLong("io.bytes.per.checksum", bytesPerChecksum);
  JobConf job = new JobConf(config, NNBench.class);
  job.setJobName("NNBench-" + operation);
  FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  // Explicitly set number of max map attempts to 1.
  job.setMaxMapAttempts(1);
  // Explicitly turn off speculative execution
  job.setSpeculativeExecution(false);
  job.setMapperClass(NNBenchMapper.class);
  job.setReducerClass(NNBenchReducer.class);
  FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks((int) numberOfReduces);
  JobClient.runJob(job);
}
@Test
public void readEthereumBlockInputFormatBlock1() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 1");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1 contains at least one block");
  assertEquals(0, block.getEthereumTransactions().size(), "Block 1 must have 0 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1");
  reader.close();
}
@Test
public void readEthereumBlockInputFormatBlock1346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth1346406.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for block 1346406");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  assertTrue(reader.next(key, block), "Input Split for block 1346406 contains at least one block");
  assertEquals(6, block.getEthereumTransactions().size(), "Block 1346406 must have 6 transactions");
  assertFalse(reader.next(key, block), "No further blocks in block 1346406");
  reader.close();
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
  // only gather base statistics for FileInputFormats
  if (!(mapredInputFormat instanceof FileInputFormat)) {
    return null;
  }
  final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
      (FileBaseStatistics) cachedStats : null;
  try {
    final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
    return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
  } catch (IOException ioex) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Could not determine statistics due to an io error: "
          + ioex.getMessage());
    }
  } catch (Throwable t) {
    if (LOG.isErrorEnabled()) {
      LOG.error("Unexpected problem while getting the file statistics: "
          + t.getMessage(), t);
    }
  }
  // no statistics available
  return null;
}
/**
 * This method constructs the JobConf to be used to run the map reduce job to
 * download the files from S3. This is a potentially expensive method since it
 * makes multiple calls to S3 to get a listing of all the input data. Clients
 * are encouraged to cache the returned JobConf reference and not call this
 * method multiple times unless necessary.
 *
 * @return the JobConf to be used to run the map reduce job to download the
 *         files from S3.
 */
public JobConf getJobConf() throws IOException, ParseException {
  JobConf conf = new JobConf(CopyFromS3.class);
  conf.setJobName("CopyFromS3");
  conf.setOutputKeyClass(NullWritable.class);
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(S3CopyMapper.class);
  // We configure a reducer, even though we don't use it right now.
  // The idea is that, in the future we may.
  conf.setReducerClass(HDFSWriterReducer.class);
  conf.setNumReduceTasks(0);
  FileInputFormat.setInputPaths(conf, new Path(tempFile));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));
  conf.setOutputFormat(TextOutputFormat.class);
  conf.setCompressMapOutput(true);
  JobClient jobClient = new JobClient(conf);
  FileSystem inputFS = FileSystem.get(URI.create(inputPathPrefix), conf);
  DatePathFilter datePathFilter = new DatePathFilter(startDate, endDate);
  List<Path> filePaths = getFilePaths(inputFS, new Path(inputPathPrefix), datePathFilter, jobClient.getDefaultMaps());
  // Write the file names to a temporary index file to be used
  // as input to the map tasks.
  FileSystem outputFS = FileSystem.get(URI.create(tempFile), conf);
  FSDataOutputStream outputStream = outputFS.create(new Path(tempFile), true);
  try {
    for (Path path : filePaths) {
      outputStream.writeBytes(path.toString() + "\n");
    }
  }
  finally {
    outputStream.close();
  }
  conf.setNumMapTasks(Math.min(filePaths.size(), jobClient.getDefaultMaps()));
  return conf;
}
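Per the javadoc above, getJobConf() is expensive because it lists all the S3 input data, so callers should hold on to the result. A hedged sketch of a caller honoring that advice, assuming an already-constructed CopyFromS3 instance named copier:

static void runCopy(CopyFromS3 copier) throws Exception {
  JobConf cached = copier.getJobConf(); // one expensive S3 listing pass; keep this reference
  JobClient.runJob(cached);             // reuse the cached conf instead of calling getJobConf() again
}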
@Test(timeout = 5000)
public void testSingleSplit() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testSingleSplit");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
  MultiMRInput input = new MultiMRInput(inputContext, 1);
  input.initialize();
  AtomicLong inputLength = new AtomicLong();
  LinkedHashMap<LongWritable, Text> data = createSplits(1, workDir, jobConf, inputLength);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 1);
  assertEquals(1, splits.length);
  MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
  InputDataInformationEvent event =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto.toByteString().asReadOnlyByteBuffer());
  List<Event> eventList = new ArrayList<Event>();
  eventList.add(event);
  input.handleEvents(eventList);
  assertReaders(input, data, 1, inputLength.get());
}
@Test
public void testInputFormatWithCompaction() throws IOException {
  // initial commit
  File partitionDir = InputFormatTestUtil.prepareTable(basePath, baseFileFormat, 10, "100");
  InputFormatTestUtil.commit(basePath, "100");
  // Add the paths
  FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
  InputSplit[] inputSplits = inputFormat.getSplits(jobConf, 10);
  assertEquals(10, inputSplits.length);
  FileStatus[] files = inputFormat.listStatus(jobConf);
  assertEquals(10, files.length);
  // simulate compaction requested
  createCompactionFile(basePath, "125");
  // add inserts after compaction timestamp
  InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, "fileId2", 5, "200");
  InputFormatTestUtil.commit(basePath, "200");
  // verify snapshot reads show all new inserts even though there is pending compaction
  files = inputFormat.listStatus(jobConf);
  assertEquals(15, files.length);
  // verify that incremental reads do NOT show inserts after compaction timestamp
  InputFormatTestUtil.setupIncremental(jobConf, "100", 10);
  files = inputFormat.listStatus(jobConf);
  assertEquals(0, files.length,
      "We should exclude commit 200 when there is a pending compaction at 125");
}
protected TensorBlock readTextCellTensorFromHDFS(Path path, JobConf job, long[] dims, ValueType[] schema) throws IOException {
  FileInputFormat.addInputPath(job, path);
  TextInputFormat informat = new TextInputFormat();
  informat.configure(job);
  InputSplit[] splits = informat.getSplits(job, 1);
  LongWritable key = new LongWritable();
  Text value = new Text();
  int[] idims = Arrays.stream(dims).mapToInt(i -> (int) i).toArray();
  TensorBlock ret;
  if (schema.length == 1)
    ret = new TensorBlock(schema[0], idims).allocateBlock();
  else
    ret = new TensorBlock(schema, idims).allocateBlock();
  try {
    int[] ix = new int[dims.length];
    for (InputSplit split : splits) {
      RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
      try {
        while (reader.next(key, value)) {
          String[] parts = Arrays.stream(IOUtilFunctions.splitCSV(value.toString(), " "))
              .filter(s -> !s.isEmpty()).toArray(String[]::new);
          for (int i = 0; i < ix.length; i++) {
            ix[i] = Integer.parseInt(parts[i]) - 1;
          }
          ret.set(ix, parts[ix.length]);
        }
      }
      finally {
        IOUtilFunctions.closeSilently(reader);
      }
    }
  }
  catch (Exception ex) {
    throw new IOException("Unable to read tensor in text cell format.", ex);
  }
  return ret;
}
@Override
public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names,
    long rlen, long clen)
  throws IOException, DMLRuntimeException
{
  // prepare file access
  JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
  Path path = new Path( fname );
  FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
  FileInputFormat.addInputPath(job, path);
  // check existence and non-empty file
  checkValidInputFile(fs, path);
  // compute size if necessary
  if( rlen <= 0 || clen <= 0 ) {
    Pair<Integer,Integer> size = computeCSVSize(path, job, fs);
    rlen = size.getKey();
    clen = size.getValue();
  }
  // allocate output frame block
  ValueType[] lschema = createOutputSchema(schema, clen);
  String[] lnames = createOutputNames(names, clen);
  FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
  // core read (sequential/parallel)
  readCSVFrameFromHDFS(path, job, fs, ret, lschema, lnames, rlen, clen);
  return ret;
}
@Override
public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen,
    int blen, long estnnz)
  throws IOException, DMLRuntimeException
{
  // prepare file access
  JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
  Path path = new Path(fname);
  FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
  FileInputFormat.addInputPath(job, path);
  TextInputFormat informat = new TextInputFormat();
  informat.configure(job);
  InputSplit[] splits = informat.getSplits(job, _numThreads);
  splits = IOUtilFunctions.sortInputSplits(splits);
  // check existence and non-empty file
  checkValidInputFile(fs, path);
  // allocate output matrix block
  // First Read Pass (count rows/cols, determine offsets, allocate matrix block)
  MatrixBlock ret = computeLIBSVMSizeAndCreateOutputMatrixBlock(splits, path, job, rlen, clen, estnnz);
  rlen = ret.getNumRows();
  clen = ret.getNumColumns();
  // Second Read Pass (read, parse strings, append to matrix block)
  readLIBSVMMatrixFromHDFS(splits, path, job, ret, rlen, clen, blen);
  // post-processing (representation-specific, change of sparse/dense block representation)
  // - nnz explicitly maintained in parallel for the individual splits
  ret.examSparsity();
  // sanity check for parallel row count (since determined internally)
  if (rlen >= 0 && rlen != ret.getNumRows())
    throw new DMLRuntimeException("Read matrix inconsistent with given meta data: "
        + "expected nrow=" + rlen + ", real nrow=" + ret.getNumRows());
  return ret;
}
public int run(String[] args) throws Exception
{
  // Create a configuration
  Configuration conf = getConf();
  // Create a job from the default configuration that will use the LogCountsPerHour class
  JobConf job = new JobConf(conf, LogCountsPerHour.class);
  // Define our input path as the first command line argument and our output path as the second
  Path in = new Path(args[0]);
  Path out = new Path(args[1]);
  // Create File Input/Output formats for these paths (in the job)
  FileInputFormat.setInputPaths(job, in);
  FileOutputFormat.setOutputPath(job, out);
  // Configure the job: name, mapper, reducer, and combiner
  job.setJobName("LogAveragePerHour");
  job.setMapperClass(LogMapClass.class);
  job.setReducerClass(LogReduce.class);
  job.setCombinerClass(LogReduce.class);
  // Configure the output
  job.setOutputFormat(TextOutputFormat.class);
  job.setOutputKeyClass(DateWritable.class);
  job.setOutputValueClass(IntWritable.class);
  // Run the job
  JobClient.runJob(job);
  return 0;
}
public int run(String[] argv) throws IOException {
  if (argv.length < 2) {
    System.out.println("ExternalMapReduce <input> <output>");
    return -1;
  }
  Path outDir = new Path(argv[1]);
  Path input = new Path(argv[0]);
  JobConf testConf = new JobConf(getConf(), ExternalMapReduce.class);
  // try to load a class from libjar
  try {
    testConf.getClassByName("testjar.ClassWordCount");
  } catch (ClassNotFoundException e) {
    System.out.println("Could not find class from libjar");
    return -1;
  }
  testConf.setJobName("external job");
  FileInputFormat.setInputPaths(testConf, input);
  FileOutputFormat.setOutputPath(testConf, outDir);
  testConf.setMapperClass(MapClass.class);
  testConf.setReducerClass(Reduce.class);
  testConf.setNumReduceTasks(1);
  JobClient.runJob(testConf);
  return 0;
}
@Test(timeout = 5000)
public void testMultipleSplits() throws Exception {
  Path workDir = new Path(TEST_ROOT_DIR, "testMultipleSplits");
  JobConf jobConf = new JobConf(defaultConf);
  jobConf.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class);
  FileInputFormat.setInputPaths(jobConf, workDir);
  InputContext inputContext = createTezInputContext(jobConf, new Configuration(false));
  MultiMRInput input = new MultiMRInput(inputContext, 2);
  input.initialize();
  AtomicLong inputLength = new AtomicLong();
  LinkedHashMap<LongWritable, Text> data = createSplits(2, workDir, jobConf, inputLength);
  SequenceFileInputFormat<LongWritable, Text> format =
      new SequenceFileInputFormat<LongWritable, Text>();
  InputSplit[] splits = format.getSplits(jobConf, 2);
  assertEquals(2, splits.length);
  MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]);
  InputDataInformationEvent event1 =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto1.toByteString().asReadOnlyByteBuffer());
  MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]);
  InputDataInformationEvent event2 =
      InputDataInformationEvent.createWithSerializedPayload(0,
          splitProto2.toByteString().asReadOnlyByteBuffer());
  List<Event> eventList = new ArrayList<Event>();
  eventList.add(event1);
  eventList.add(event2);
  input.handleEvents(eventList);
  assertReaders(input, data, 2, inputLength.get());
}
@Test
public void readEthereumBlockInputFormatBlock3510000to3510010() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
  JobConf job = new JobConf(defaultConf);
  ClassLoader classLoader = getClass().getClassLoader();
  String fileName = "eth351000to3510010.bin";
  String fileNameBlock = classLoader.getResource("testdata/" + fileName).getFile();
  Path file = new Path(fileNameBlock);
  FileInputFormat.setInputPaths(job, file);
  EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
  format.configure(job);
  InputSplit[] inputSplits = format.getSplits(job, 1);
  assertEquals(1, inputSplits.length, "Only one split generated for blocks 3510000 .. 3510010");
  RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
  assertNotNull(reader, "Format returned null RecordReader");
  BytesWritable key = new BytesWritable();
  EthereumBlock block = new EthereumBlock();
  int count = 0;
  while (count < 11) {
    if (reader.next(key, block)) {
      count++;
    }
  }
  assertEquals(11, count, "Block 3510000 .. 3510010 contains 11 blocks");
  assertFalse(reader.next(key, block), "No further blocks in block 3510000 .. 3510010");
  reader.close();
}
static boolean runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
    int numReds) throws IOException, InterruptedException {
  FileSystem fs = FileSystem.get(conf);
  if (fs.exists(outDir)) {
    fs.delete(outDir, true);
  }
  if (!fs.exists(inDir)) {
    fs.mkdirs(inDir);
  }
  String input = "The quick brown fox\n" + "has many silly\n"
      + "red fox sox\n";
  for (int i = 0; i < numMaps; ++i) {
    DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
    file.writeBytes(input);
    file.close();
  }
  DistributedCache.addFileToClassPath(TestMRJobs.APP_JAR, conf, fs);
  conf.setOutputCommitter(CustomOutputCommitter.class);
  conf.setInputFormat(TextInputFormat.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);
  FileInputFormat.setInputPaths(conf, inDir);
  FileOutputFormat.setOutputPath(conf, outDir);
  conf.setNumMapTasks(numMaps);
  conf.setNumReduceTasks(numReds);
  JobClient jobClient = new JobClient(conf);
  RunningJob job = jobClient.submitJob(conf);
  return jobClient.monitorAndPrintJob(conf, job);
}
@Override
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
    ValueType[] schema, String[] names, long rlen, long clen)
  throws IOException
{
  int numThreads = OptimizerUtils.getParallelTextReadParallelism();
  FileInputFormat.addInputPath(job, path);
  TextInputFormat informat = new TextInputFormat();
  informat.configure(job);
  try
  {
    // create read tasks for all splits
    ExecutorService pool = CommonThreadPool.get(numThreads);
    InputSplit[] splits = informat.getSplits(job, numThreads);
    ArrayList<ReadTask> tasks = new ArrayList<>();
    for( InputSplit split : splits )
      tasks.add(new ReadTask(split, informat, job, dest));
    // wait until all tasks have been executed
    List<Future<Object>> rt = pool.invokeAll(tasks);
    pool.shutdown();
    // check for exceptions
    for( Future<Object> task : rt )
      task.get();
  }
  catch (Exception e) {
    throw new IOException("Failed parallel read of text cell input.", e);
  }
}
public int run(final String[] args) throws Exception {
  log.info("run starting");
  final Configuration conf = getConf();
  JobConf job = new JobConf(conf, ExternalJoin.class);
  job.setJobName("AerospikeExternalJoin");
  job.setMapperClass(Map.class);
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(LongWritable.class);
  // job.setCombinerClass(Reduce.class); // Reduce changes format.
  job.setReducerClass(Reduce.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Session.class);
  job.setOutputFormat(SessionOutputFormat.class);
  for (int ii = 0; ii < args.length; ++ii)
    FileInputFormat.addInputPath(job, new Path(args[ii]));
  JobClient.runJob(job);
  log.info("finished");
  return 0;
}
protected CombineFileRecordReaderWrapper(FileInputFormat<K,V> inputFormat,
    CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx)
    throws IOException {
  FileSplit fileSplit = new FileSplit(split.getPath(idx),
      split.getOffset(idx),
      split.getLength(idx),
      split.getLocations());
  delegate = inputFormat.getRecordReader(fileSplit, (JobConf)conf, reporter);
}
/**
 * Obtain the Avro input schema from the data.
 *
 * @param conf the job configuration whose input paths point at the Avro data
 * @return the Avro schema of the input
 * @throws IOException if the schema cannot be read from the first input path
 */
public static Schema getAvroInputSchema(JobConf conf) throws IOException
{
  Path[] paths = FileInputFormat.getInputPaths(conf);
  if (paths == null)
  {
    throw new IllegalStateException("input paths do not exist in jobConf!");
  }
  Schema inputSchema = AvroUtils.getSchemaFromFile(conf, paths[0]);
  if (inputSchema == null)
  {
    throw new IllegalStateException("Input does not have schema info and/or input is missing.");
  }
  return inputSchema;
}
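A hedged usage sketch for getAvroInputSchema, with a placeholder input path: since the schema is read from paths[0], at least one input path must be configured first.

static void printInputSchema() throws IOException {
  JobConf conf = new JobConf();
  FileInputFormat.setInputPaths(conf, new Path("/data/events.avro")); // schema comes from paths[0]
  Schema schema = getAvroInputSchema(conf);
  System.out.println(schema.toString(true)); // pretty-printed Avro schema JSON
}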