The following lists example code and usage patterns for the org.apache.hadoop.mapred.InputSplit API class; you can also follow the link to view the source code on GitHub.
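As a quick orientation before the project snippets below, here is a minimal, self-contained sketch (not taken from any of the quoted projects; the path /tmp/input is a placeholder) that computes splits with the old mapred TextInputFormat and inspects each InputSplit's length and preferred host locations:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class InputSplitTour {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("/tmp/input")); // placeholder input path
    TextInputFormat format = new TextInputFormat();
    format.configure(job); // TextInputFormat must be configured before getSplits()
    // The second argument is only a hint; the real split count depends on file and block sizes.
    InputSplit[] splits = format.getSplits(job, 1);
    for (InputSplit split : splits) {
      System.out.println("length=" + split.getLength()
          + " locations=" + String.join(",", split.getLocations()));
    }
  }
}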
@Test
public void testOpenClose() throws Exception {
DummyRecordReader recordReader = mock(DummyRecordReader.class);
DummyInputFormat inputFormat = mock(DummyInputFormat.class);
when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
hadoopInputFormat.open(getHadoopInputSplit());
verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
verify(recordReader, times(1)).createKey();
verify(recordReader, times(1)).createValue();
assertThat(hadoopInputFormat.fetched, is(false));
hadoopInputFormat.close();
verify(recordReader, times(1)).close();
}
@Test
public void testOpenWithConfigurableReader() throws Exception {
ConfigurableDummyRecordReader recordReader = mock(ConfigurableDummyRecordReader.class);
DummyInputFormat inputFormat = mock(DummyInputFormat.class);
when(inputFormat.getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class))).thenReturn(recordReader);
HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
hadoopInputFormat.open(getHadoopInputSplit());
verify(inputFormat, times(1)).getRecordReader(any(InputSplit.class), any(JobConf.class), any(Reporter.class));
verify(recordReader, times(1)).setConf(any(JobConf.class));
verify(recordReader, times(1)).createKey();
verify(recordReader, times(1)).createValue();
assertThat(hadoopInputFormat.fetched, is(false));
}
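These two tests exercise what appears to be Flink's mapred HadoopInputFormat wrapper. If so, a typical end-to-end usage in a batch job looks roughly like the following sketch; the ExecutionEnvironment/createInput wiring is the standard Flink DataSet API and the input path is a placeholder, so treat the details as an assumption rather than the project's own example:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class FlinkHadoopInputSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    JobConf jobConf = new JobConf();
    FileInputFormat.addInputPath(jobConf, new Path("/tmp/input")); // placeholder path
    // Wrap the mapred TextInputFormat so Flink can open() it with Hadoop input splits.
    HadoopInputFormat<LongWritable, Text> hadoopIF =
        new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);
    DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
    lines.first(10).print();
  }
}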
@Override
public RecordReader<Text, DynamoDBItemWritable> getRecordReader(InputSplit split, JobConf conf,
Reporter reporter) throws
IOException {
reporter.progress();
Map<String, String> columnMapping =
HiveDynamoDBUtil.fromJsonString(conf.get(DynamoDBConstants.DYNAMODB_COLUMN_MAPPING));
Map<String, String> hiveTypeMapping = HiveDynamoDBUtil.extractHiveTypeMapping(conf);
DynamoDBQueryFilter queryFilter = getQueryFilter(conf, columnMapping, hiveTypeMapping);
DynamoDBSplit bbSplit = (DynamoDBSplit) split;
bbSplit.setDynamoDBFilterPushdown(queryFilter);
Collection<String> attributes = (columnMapping == null ? null : columnMapping.values());
DynamoDBRecordReaderContext context = buildHiveDynamoDBRecordReaderContext(bbSplit, conf,
reporter, attributes);
return new DefaultDynamoDBRecordReader(context);
}
@Override
public RecordReader<NullWritable, OrcLazyRow>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
ReaderWriterProfiler.setProfilerOptions(conf);
FileSplit fileSplit = (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
reporter.setStatus(fileSplit.toString());
return new OrcRecordReader(
OrcFile.createReader(fs, path, conf),
conf,
fileSplit.getStart(),
fileSplit.getLength()
);
}
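The record readers produced by implementations like the two above are consumed through the classic mapred contract: the caller obtains reusable key/value objects via createKey()/createValue() and loops on next(). A minimal sketch of that loop, assuming a line-oriented InputFormat<LongWritable, Text> (the helper name countRecords is made up for illustration):
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

static long countRecords(InputFormat<LongWritable, Text> format, InputSplit split, JobConf job)
    throws IOException {
  RecordReader<LongWritable, Text> reader = format.getRecordReader(split, job, Reporter.NULL);
  LongWritable key = reader.createKey();
  Text value = reader.createValue();
  long count = 0;
  try {
    while (reader.next(key, value)) { // key/value objects are reused on every call
      count++;
    }
  } finally {
    reader.close();
  }
  return count;
}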
/**
* Allocates the first available split into the evaluator.
*
* @param evaluatorId
* the evaluator id
* @param value
* the queue of splits
* @return a numberedSplit or null if it cannot find one
*/
protected NumberedSplit<InputSplit> allocateSplit(final String evaluatorId,
final BlockingQueue<NumberedSplit<InputSplit>> value) {
if (value == null) {
LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
return null;
}
while (true) {
final NumberedSplit<InputSplit> split = value.poll();
if (split == null) {
return null;
}
if (value == unallocatedSplits || unallocatedSplits.remove(split)) {
LOG.log(Level.FINE, "Found split-" + split.getIndex() + " in the queue");
final NumberedSplit<InputSplit> old = evaluatorToSplits.putIfAbsent(evaluatorId, split);
if (old != null) {
throw new RuntimeException("Trying to assign different splits to the same evaluator is not supported");
} else {
LOG.log(Level.FINE, "Returning " + split.getIndex());
return split;
}
}
}
}
protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest,
ValueType[] schema, String[] names, long rlen, long clen)
throws IOException
{
if( fs.isDirectory(path) ) {
FileInputFormat.addInputPath(job, path);
TextInputFormat informat = new TextInputFormat();
informat.configure(job);
InputSplit[] splits = informat.getSplits(job, 1);
for(InputSplit split: splits)
readTextCellFrameFromInputSplit(split, informat, job, dest);
}
else {
readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen);
}
}
private void doTest(List<List<String>> frameSplits, int overlapBefore, int overlapAfter, Integer chunkSize) throws Exception {
AlgorithmConfig algoConfig = m_mgr.getExecutionSource().getAlgorithmConfig();
algoConfig.addDimension(new DimensionConfig("frame", true, overlapBefore, overlapAfter, null, chunkSize, false));
algoConfig.addDimension(new DimensionConfig("type", false, 0, 0, null, null, false));
Set<Set<DataKey>> expectedKeySplits = buildExpectedSplits(frameSplits);
MR4CInputFormat format = new MR4CInputFormat();
InputSplit[] splits = format.getSplits( m_mgr.getExecutionSource(), 4);
Set<Set<DataKey>> actualKeySplits= new HashSet<Set<DataKey>>();
for ( InputSplit split : splits ) {
MR4CInputSplit bbSplit = (MR4CInputSplit) split;
actualKeySplits.add(new HashSet<DataKey>(bbSplit.getKeys().getKeys()));
}
assertEquals(expectedKeySplits, actualKeySplits);
}
protected List<DataReaderFactory<ColumnarBatch>> getSplitsFactories(String query) {
List<DataReaderFactory<ColumnarBatch>> tasks = new ArrayList<>();
try {
JobConf jobConf = JobUtil.createJobConf(options, query);
LlapBaseInputFormat llapInputFormat = new LlapBaseInputFormat(false, Long.MAX_VALUE);
//numSplits arg not currently supported, use 1 as dummy arg
InputSplit[] splits = llapInputFormat.getSplits(jobConf, 1);
for (InputSplit split : splits) {
tasks.add(getDataReaderFactory(split, jobConf, getArrowAllocatorMax()));
}
} catch (IOException e) {
LOG.error("Unable to submit query to HS2");
throw new RuntimeException(e);
}
return tasks;
}
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
// CombineFileSplit indicates the new export format which includes a manifest file
if (inputSplit instanceof CombineFileSplit) {
int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
throw new IOException("Unknown version: " + job.get(DynamoDBConstants
.EXPORT_FORMAT_VERSION));
}
return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
} else if (inputSplit instanceof FileSplit) {
// FileSplit indicates the old data pipeline format which doesn't include a manifest file
Path path = ((FileSplit) inputSplit).getPath();
return new ImportRecordReader(job, path);
} else {
throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
+ " " + inputSplit.getClass());
}
}
protected List<InputSplit> getSplits(Path path) throws IOException {
PxfInputFormat pxfInputFormat = new PxfInputFormat();
PxfInputFormat.setInputPaths(jobConf, path);
InputSplit[] splits = pxfInputFormat.getSplits(jobConf, 1);
List<InputSplit> result = new ArrayList<>();
/*
* HD-2547: If the file is empty, an empty split is returned: no
* locations and no length.
*/
if (splits != null) {
for (InputSplit split : splits) {
if (split.getLength() > 0) {
result.add(split);
}
}
}
return result;
}
@Test
public void readEthereumBlockInputFormatBlock3346406() throws IOException, EthereumBlockReadException, ParseException, InterruptedException {
JobConf job = new JobConf(defaultConf);
ClassLoader classLoader = getClass().getClassLoader();
String fileName="eth3346406.bin";
String fileNameBlock=classLoader.getResource("testdata/"+fileName).getFile();
Path file = new Path(fileNameBlock);
FileInputFormat.setInputPaths(job, file);
EthereumBlockFileInputFormat format = new EthereumBlockFileInputFormat();
format.configure(job);
InputSplit[] inputSplits = format.getSplits(job,1);
assertEquals( 1, inputSplits.length,"Only one split generated for block 3346406");
RecordReader<BytesWritable, EthereumBlock> reader = format.getRecordReader(inputSplits[0], job, reporter);
assertNotNull( reader,"Format returned null RecordReader");
BytesWritable key = new BytesWritable();
EthereumBlock block = new EthereumBlock();
assertTrue( reader.next(key,block),"Input Split for block 3346406 contains at least one block");
assertEquals( 7, block.getEthereumTransactions().size(),"Block 3346406 must have 7 transactions");
assertFalse( reader.next(key,block),"No further blocks in block 3346406");
reader.close();
}
private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
throws IOException
{
ListenableFuture<?> lastResult = COMPLETED_FUTURE;
for (InputSplit inputSplit : targetSplits) {
Optional<InternalHiveSplit> internalHiveSplit = splitFactory.createInternalHiveSplit((FileSplit) inputSplit);
if (internalHiveSplit.isPresent()) {
lastResult = hiveSplitSource.addToQueue(internalHiveSplit.get());
}
if (stopped) {
return COMPLETED_FUTURE;
}
}
return lastResult;
}
@SuppressWarnings("unchecked")
@Override
public void readFields(DataInput in) throws IOException {
wrappedInputFormatName = Text.readString(in);
String inputSplitClassName = Text.readString(in);
Class<? extends InputSplit> clazz = null;
try {
clazz = (Class<? extends InputSplit>)
TezGroupedSplitsInputFormat.getClassFromName(inputSplitClassName);
} catch (TezException e) {
throw new IOException(e);
}
int numSplits = in.readInt();
wrappedSplits = new ArrayList<InputSplit>(numSplits);
for (int i=0; i<numSplits; ++i) {
addSplit(readWrappedSplit(in, clazz));
}
long recordedLength = in.readLong();
if(recordedLength != length) {
throw new TezUncheckedException("Expected length: " + recordedLength
+ " actual length: " + length);
}
int numLocs = in.readInt();
if (numLocs > 0) {
locations = new String[numLocs];
for (int i=0; i<numLocs; ++i) {
locations[i] = Text.readString(in);
}
}
}
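Because this readFields() is half of the Writable contract, the matching write() must emit fields in exactly the same order. A minimal sketch of what that mirror would look like (an illustration of the contract, not the project's actual implementation):
@Override
public void write(DataOutput out) throws IOException {
  Text.writeString(out, wrappedInputFormatName);
  // All wrapped splits are assumed to share one concrete class, as readFields() expects.
  Text.writeString(out, wrappedSplits.get(0).getClass().getName());
  out.writeInt(wrappedSplits.size());
  for (InputSplit split : wrappedSplits) {
    split.write(out); // each mapred InputSplit is itself a Writable
  }
  out.writeLong(length);
  out.writeInt(locations == null ? 0 : locations.length);
  if (locations != null) {
    for (String location : locations) {
      Text.writeString(out, location);
    }
  }
}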
@Override
public RecordReader<BytesWritable,BitcoinTransaction> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
/** Create reader **/
try {
return new BitcoinTransactionRecordReader( (FileSplit) split,job,reporter);
} catch (HadoopCryptoLedgerConfigurationException|BitcoinBlockReadException e) {
// log
LOGFI.error(e);
}
return null;
}
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// Delegate the generation of input splits to the 'original' InputFormat
return ReflectionUtils.newInstance(
job.getClass("mapred.pipes.user.inputformat",
TextInputFormat.class,
InputFormat.class), job).getSplits(job, numSplits);
}
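The delegating getSplits() above resolves the wrapped format from the mapred.pipes.user.inputformat property; a Pipes job would typically register its real input format on the JobConf like this (KeyValueTextInputFormat is only an example choice):
JobConf job = new JobConf();
// Register the user's input format under the key the Pipes wrapper reads back.
job.setClass("mapred.pipes.user.inputformat",
    org.apache.hadoop.mapred.KeyValueTextInputFormat.class,
    org.apache.hadoop.mapred.InputFormat.class);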
public ParquetRecordReaderWrapper(
final ParquetInputFormat<ArrayWritable> newInputFormat,
final InputSplit oldSplit,
final JobConf oldJobConf,
final Reporter reporter)
throws IOException, InterruptedException {
this(newInputFormat, oldSplit, oldJobConf, reporter,
(new HiveBindingFactory()).create());
}
public void runMap(InputSplit split, int numReduces,
boolean pipedInput) throws IOException {
WritableUtils.writeVInt(stream, MessageType.RUN_MAP.code);
writeObject(split);
WritableUtils.writeVInt(stream, numReduces);
WritableUtils.writeVInt(stream, pipedInput ? 1 : 0);
}
public void addSplit(InputSplit split) {
wrappedSplits.add(split);
try {
length += split.getLength();
} catch (Exception e) {
throw new TezUncheckedException(e);
}
}
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
CombineFileSplit cSplit = (CombineFileSplit) split;
AbstractGFRecordReader reader = new AbstractGFRecordReader();
reader.initialize(cSplit, job);
return reader;
}
public void _testGetFragmentsWithTestInputFormat() throws Exception {
int tableIndex = 0;
String tableName = TestDataHelper.TABLE_NAMES[tableIndex];
//String inputDataPath =
InputSplit[] fs = TestDataHelper.getSplits(tableName);
/*
InputData inputData = new InputData(TestDataHelper.populateInputDataParam(tableIndex), null);
GemFireXDFragmenter fragmenter = new GemFireXDFragmenter(inputData,
new TestGemFireXDManager(inputData));
FragmentsOutput fragments = fragmenter.GetFragments();
verifyFragments(fs, fragments);*/
}
/**
* Add an InputSplit to this collection.
* @throws IOException If capacity was not specified during construction
* or if capacity has been reached.
*/
public void add(InputSplit s) throws IOException {
if (null == splits) {
throw new IOException("Uninitialized InputSplit");
}
if (fill == splits.length) {
throw new IOException("Too many splits");
}
splits[fill++] = s;
totsize += s.getLength();
}
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit,
JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new LineRecordReader(job, (FileSplit) genericSplit);
}
@Override
public RecordReader<MRContainer, MRContainer> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
StormEvaluator.load_source_dir(); // load the parsed source parameters from a file
String path = ((FileSplit)split).getPath().toString();
ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
}
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
InputStream inputStream = null;
try {
inputStream = getInputStream(job, (FileSplit) inputSplit);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
long start = ((FileSplit) inputSplit).getStart();
long end = start + inputSplit.getLength();
return new HiveXmlRecordReader(job, inputStream, start, end);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits(
JobConf jobConf, boolean groupSplits, boolean sortSplits, int numTasks)
throws IOException {
// This is the real InputFormat
org.apache.hadoop.mapred.InputFormat inputFormat;
try {
inputFormat = jobConf.getInputFormat();
} catch (Exception e) {
throw new TezUncheckedException(e);
}
org.apache.hadoop.mapred.InputFormat finalInputFormat = inputFormat;
if (groupSplits) {
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat groupedFormat =
new org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat();
groupedFormat.setConf(jobConf);
groupedFormat.setInputFormat(inputFormat);
groupedFormat.setDesiredNumberOfSplits(numTasks);
finalInputFormat = groupedFormat;
} else {
finalInputFormat = inputFormat;
}
org.apache.hadoop.mapred.InputSplit[] splits = finalInputFormat
.getSplits(jobConf, jobConf.getNumMapTasks());
if (sortSplits) {
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new OldInputSplitComparator());
}
return splits;
}
/**
* Collect a set of hosts from all child InputSplits.
*/
public String[] getLocations() throws IOException {
HashSet<String> hosts = new HashSet<String>();
for (InputSplit s : splits) {
String[] hints = s.getLocations();
if (hints != null && hints.length > 0) {
for (String host : hints) {
hosts.add(host);
}
}
}
return hosts.toArray(new String[hosts.size()]);
}
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit ignored, JobConf conf, Reporter reporter) throws IOException {
return new RecordReader<LongWritable, Text>() {
boolean sentOneRecord = false;
public boolean next(LongWritable key, Text value)
throws IOException {
key.set(1);
value.set("dummy");
if (sentOneRecord == false) { // first call
sentOneRecord = true;
return true;
}
return false; // we have sent one record - we are done
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public long getPos() throws IOException {
return 1;
}
public void close() throws IOException {
}
public float getProgress() throws IOException {
return 1;
}
};
}
@Override
public RecordReader<GFKey, PersistedEventImpl> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
CombineFileSplit cSplit = (CombineFileSplit) split;
AbstractGFRecordReader reader = new AbstractGFRecordReader();
reader.initialize(cSplit, job);
return reader;
}
@Test
public void testEmptyFile() throws Exception {
JobConf job = new JobConf(conf);
Properties properties = new Properties();
HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
FileSinkOperator.RecordWriter writer =
outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
properties, Reporter.NULL);
writer.close(true);
properties.setProperty("columns", "x,y");
properties.setProperty("columns.types", "int:int");
SerDe serde = new OrcSerde();
serde.initialize(conf, properties);
InputFormat<?,?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, testFilePath.toString());
InputSplit[] splits = in.getSplits(conf, 1);
assertEquals(1, splits.length);
// read the whole file
conf.set("hive.io.file.readcolumn.ids", "0,1");
org.apache.hadoop.mapred.RecordReader reader =
in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
assertEquals(0.0, reader.getProgress(), 0.00001);
assertEquals(0, reader.getPos());
assertEquals(false, reader.next(key, value));
reader.close();
assertEquals(null, serde.getSerDeStats());
}
/** {@inheritDoc} */
public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
super.getSplits(Job.getInstance(job));
InputSplit[] ret = new InputSplit[newSplits.size()];
int i = 0;
for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
(org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit)s;
ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
}
return ret;
}