The following are example usages of org.apache.hadoop.mapred.FileSplit, collected from open source projects.
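As a quick orientation before the examples (this sketch is not taken from any of the projects below), a FileSplit simply describes a byte range of a single file plus optional host hints; the path and sizes here are made up.
// org.apache.hadoop.fs.Path and org.apache.hadoop.mapred.FileSplit (hypothetical values)
Path path = new Path("/tmp/example.txt");
FileSplit split = new FileSplit(path, 0L, 1024L, (String[]) null);
System.out.println(split.getPath() + " start=" + split.getStart() + " length=" + split.getLength());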
@Test(dataProvider = "rowCount")
public void testAvroFileInSymlinkTable(int rowCount)
throws Exception
{
File file = File.createTempFile("presto_test", AVRO.name());
//noinspection ResultOfMethodCallIgnored
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), AVRO, HiveCompressionCodec.NONE, getTestColumnsSupportedByAvro(), rowCount);
Properties splitProperties = new Properties();
splitProperties.setProperty(FILE_INPUT_FORMAT, SymlinkTextInputFormat.class.getName());
splitProperties.setProperty(SERIALIZATION_LIB, AVRO.getSerDe());
testCursorProvider(createGenericHiveRecordCursorProvider(HDFS_ENVIRONMENT), split, splitProperties, getTestColumnsSupportedByAvro(), SESSION, rowCount);
}
finally {
//noinspection ResultOfMethodCallIgnored
file.delete();
}
}
@Override
public FileSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// first, merge input table properties (since there's no access to them ...)
Settings settings = HadoopSettingsManager.loadFrom(job);
//settings.merge(IOUtils.propsFromString(settings.getProperty(HiveConstants.INPUT_TBL_PROPERTIES)));
Log log = LogFactory.getLog(getClass());
// move on to initialization
InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
InitializationUtils.setUserProviderIfNotSet(settings, HadoopUserProvider.class, log);
if (settings.getOutputAsJson() == false) {
// Only set the fields if we aren't asking for raw JSON
settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
}
HiveUtils.init(settings, log);
// decorate original splits as FileSplit
InputSplit[] shardSplits = super.getSplits(job, numSplits);
FileSplit[] wrappers = new FileSplit[shardSplits.length];
Path path = new Path(job.get(HiveConstants.TABLE_LOCATION));
for (int i = 0; i < wrappers.length; i++) {
wrappers[i] = new EsHiveSplit(shardSplits[i], path);
}
return wrappers;
}
public ParsedRecordReader ( FileSplit split,
Configuration conf,
Class<? extends Parser> parser_class,
Trees args ) throws IOException {
start = split.getStart();
end = start + split.getLength();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(conf);
fsin = fs.open(split.getPath());
try {
parser = parser_class.newInstance();
} catch (Exception ex) {
throw new Error("Unrecognized parser:"+parser_class);
};
parser.initialize(args);
parser.open(fsin,start,end);
result = null;
}
public static InputSplit[] sortInputSplits(InputSplit[] splits) {
if (splits[0] instanceof FileSplit) {
// The splits do not always arrive in order by file name.
// Sort the splits lexicographically by path so that the header will
// be in the first split.
// Note that we're assuming that the splits come in order by offset
Arrays.sort(splits, new Comparator<InputSplit>() {
@Override
public int compare(InputSplit o1, InputSplit o2) {
Path p1 = ((FileSplit) o1).getPath();
Path p2 = ((FileSplit) o2).getPath();
return p1.toString().compareTo(p2.toString());
}
});
}
return splits;
}
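A hypothetical call site for the helper above (the underlyingFormat, job, and numSplits names are assumed): sort the raw splits before handing them to record readers so that the split containing the header is read first.
InputSplit[] ordered = sortInputSplits(underlyingFormat.getSplits(job, numSplits)); // hypothetical usage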
@Override
public RecordReader<Text, SpreadSheetCellDAO> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
throws IOException {
/** Create reader **/
try {
// set the MIME type configuration option for MS Excel; the Excel format (old vs. new) is detected automatically
job.set(HadoopOfficeReadConfiguration.CONF_MIMETYPE,"ms-excel");
return new ExcelCellRecordReader( (FileSplit) split,job,reporter);
} catch (FormatNotUnderstoodException e) {
// log
LOGIF.error(e);
} catch (GeneralSecurityException gse) {
LOGIF.error(gse);
}
return null;
}
static RecordReader<NullWritable, DynamoDBItemWritable> getRecordReader(
InputSplit inputSplit, JobConf job, Reporter reporter) throws IOException {
// CombineFileSplit indicates the new export format which includes a manifest file
if (inputSplit instanceof CombineFileSplit) {
int version = job.getInt(DynamoDBConstants.EXPORT_FORMAT_VERSION, -1);
if (version != ExportManifestRecordWriter.FORMAT_VERSION) {
throw new IOException("Unknown version: " + job.get(DynamoDBConstants
.EXPORT_FORMAT_VERSION));
}
return new ImportCombineFileRecordReader((CombineFileSplit) inputSplit, job, reporter);
} else if (inputSplit instanceof FileSplit) {
// FileSplit indicates the old data pipeline format which doesn't include a manifest file
Path path = ((FileSplit) inputSplit).getPath();
return new ImportRecordReader(job, path);
} else {
throw new IOException("Expecting CombineFileSplit or FileSplit but the input split type is:"
+ " " + inputSplit.getClass());
}
}
@Override
public RecordReader<NullWritable,ColumnAndIndex> getRecordReader( final InputSplit split, final JobConf job, final Reporter reporter ) throws IOException {
FileSplit fileSplit = (FileSplit)split;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem( job );
long fileLength = fs.getLength( path );
long start = fileSplit.getStart();
long length = fileSplit.getLength();
InputStream in = fs.open( path );
IJobReporter jobReporter = new HadoopJobReporter( reporter );
jobReporter.setStatus( String.format( "Read file : %s" , path.toString() ) );
HiveReaderSetting hiveConfig = new HiveReaderSetting( fileSplit , job );
if ( hiveConfig.isVectorMode() ){
IVectorizedReaderSetting vectorizedSetting = new HiveVectorizedReaderSetting( fileSplit , job , hiveConfig );
return (RecordReader)new MDSHiveDirectVectorizedReader( in , fileLength , start , length , vectorizedSetting , jobReporter );
}
else{
return new MDSHiveLineReader( in , fileLength , start , length , hiveConfig , jobReporter , spreadCounter );
}
}
/**
* Constructor
* @param job the job configuration used to open the file
* @param split the file split to read
* @throws IOException if the split cannot be opened
*/
public LineDocRecordReader(Configuration job, FileSplit split)
throws IOException {
long start = split.getStart();
long end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
InputStream in = fileIn;
boolean skipFirstLine = false;
if (start != 0) {
skipFirstLine = true; // wait till BufferedInputStream to skip
--start;
fileIn.seek(start);
}
this.in = new BufferedInputStream(in);
if (skipFirstLine) { // skip first line and re-establish "start".
start += LineDocRecordReader.readData(this.in, null, EOL);
}
this.start = start;
this.pos = start;
this.end = end;
}
@Override
public RecordReader<NullWritable, OrcLazyRow>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
ReaderWriterProfiler.setProfilerOptions(conf);
FileSplit fileSplit = (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
reporter.setStatus(fileSplit.toString());
return new OrcRecordReader(
OrcFile.createReader(fs, path, conf),
conf,
fileSplit.getStart(),
fileSplit.getLength()
);
}
public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
beginMark_ = checkJobGet(CONF_NS + "begin");
endMark_ = checkJobGet(CONF_NS + "end");
maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
synched_ = false;
slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
if (slowMatch_) {
beginPat_ = makePatternCDataOrMark(beginMark_);
endPat_ = makePatternCDataOrMark(endMark_);
}
init();
}
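For reference, a sketch of the job configuration the constructor above expects, using the same CONF_NS prefix; the marker values are assumptions, not taken from a real job.
job.set(CONF_NS + "begin", "<page>");          // start-of-record marker (example value)
job.set(CONF_NS + "end", "</page>");           // end-of-record marker (example value)
job.setInt(CONF_NS + "maxrec", 50 * 1000);     // maximum record size, read as maxRecSize_ above
job.setBoolean(CONF_NS + "slowmatch", false);  // whether to use the CDATA-aware pattern matching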
@Override
public ManagedReader<? extends SchemaNegotiator> next() {
FileSplit split = fileFramework.nextSplit();
if (split == null) {
return null;
}
return newReader(split);
}
@Override
protected void configure() {
super.configure();
FileScanBuilder options = options();
// Create the Drill file system.
try {
dfs = context.newFileSystem(options.fsConf);
} catch (IOException e) {
throw UserException.dataReadError(e)
.addContext("Failed to create FileSystem")
.build(logger);
}
// Prepare the list of files. We need the list of paths up
// front to compute the maximum partition. Then, we need to
// iterate over the splits to create readers on demand.
List<Path> paths = new ArrayList<>();
for (FileWork work : options.files) {
Path path = dfs.makeQualified(work.getPath());
paths.add(path);
FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
spilts.add(split);
}
splitIter = spilts.iterator();
// Create the metadata manager to handle file metadata columns
// (so-called implicit columns and partition columns.)
options.metadataOptions().setFiles(paths);
metadataManager = new FileMetadataManager(
context.getFragmentContext().getOptions(),
options.metadataOptions());
builder.withMetadata(metadataManager);
}
public void map(LongWritable key, Text val,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();
location.set(fileName);
String line = val.toString();
StringTokenizer itr = new StringTokenizer(line.toLowerCase());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, location);
}
}
private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
throws IOException
{
ListenableFuture<?> lastResult = COMPLETED_FUTURE;
for (InputSplit inputSplit : targetSplits) {
Optional<InternalHiveSplit> internalHiveSplit = splitFactory.createInternalHiveSplit((FileSplit) inputSplit);
if (internalHiveSplit.isPresent()) {
lastResult = hiveSplitSource.addToQueue(internalHiveSplit.get());
}
if (stopped) {
return COMPLETED_FUTURE;
}
}
return lastResult;
}
@Override
public RecordReader<LongWritable, BytesWritable> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new MainframeVBRecordReader(job, (FileSplit) genericSplit);
}
@Test
public void testCreateInputSplits() throws Exception {
FileSplit[] result = new FileSplit[1];
result[0] = getFileSplit();
DummyInputFormat inputFormat = mock(DummyInputFormat.class);
when(inputFormat.getSplits(any(JobConf.class), anyInt())).thenReturn(result);
HadoopInputFormat<String, Long> hadoopInputFormat = new HadoopInputFormat<>(inputFormat, String.class, Long.class, new JobConf());
hadoopInputFormat.createInputSplits(2);
verify(inputFormat, times(1)).getSplits(any(JobConf.class), anyInt());
}
public WarcFileRecordReader(Configuration conf, InputSplit split) throws IOException {
if (split instanceof FileSplit) {
this.filePathList=new Path[1];
this.filePathList[0]=((FileSplit)split).getPath();
} else if (split instanceof MultiFileSplit) {
this.filePathList=((MultiFileSplit)split).getPaths();
} else {
throw new IOException("InputSplit is not a file split or a multi-file split - aborting");
}
// Use FileSystem.get to open Common Crawl URIs using the S3 protocol.
URI uri = filePathList[0].toUri();
this.fs = FileSystem.get(uri, conf);
// get the total file sizes
for (int i=0; i < filePathList.length; i++) {
totalFileSize += fs.getFileStatus(filePathList[i]).getLen();
}
Class<? extends CompressionCodec> codecClass=null;
try {
codecClass=conf.getClassByName("org.apache.hadoop.io.compress.GzipCodec").asSubclass(CompressionCodec.class);
compressionCodec=(CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
} catch (ClassNotFoundException cnfEx) {
compressionCodec=null;
LOG.info("!!! ClassNotFoun Exception thrown setting Gzip codec");
}
openNextFile();
}
@BeforeEach
public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception {
basePath = tempDir.toAbsolutePath().toString();
deltaLogPaths = Collections.singletonList(basePath + "/1.log");
fileSplitName = basePath + "/test.file";
baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});
maxCommitTime = "10001";
split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime);
}
@Override
public LWDocument[] toDocuments(Writable key, Writable value, Reporter reporter,
Configuration conf) throws IOException {
if (key != null && value != null) {
LWDocument doc = createDocument(key.toString() + "-" + System.currentTimeMillis(), null);
Matcher matcher = regex.matcher(value.toString());
if (matcher != null) {
if (match) {
if (matcher.matches()) {
processMatch(doc, matcher);
}
} else {//
while (matcher.find()) {
processMatch(doc, matcher);
reporter.progress();//do we really even need this?
}
}
}
// Adding the file path where this record was taken
FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
String originalLogFilePath = fileSplit.getPath().toUri().getPath();
doc.addField(FIELD_PATH, originalLogFilePath);
String docId = originalLogFilePath + "-" + doc.getId();
doc.setId(docId);
return new LWDocument[] {doc};
}
return null;
}
/**
* Produce splits such that each is no greater than the quotient of the
* total size and the number of splits requested.
* @param job The handle to the JobConf object
* @param numSplits Number of splits requested
*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
String srcFileList = job.get(SRC_LIST_LABEL, "");
Path srcFileListPath = new Path(srcFileList);
if (cnfiles < 0 || cbsize < 0 || "".equals(srcFileList)) {
throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
") total_size(" + cbsize + ") src_chunk_file_list_uri(" +
srcFileList + ")");
}
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
SequenceFile.Reader sl = null;
String splitList = job.get(SPLIT_LIST_LABEL, "");
if("".equals(splitList)) {
throw new RuntimeException("Invalid metadata: split_list_uri(" +
srcFileList + ")");
}
//split file list which contains start pos and split length pairs
//they are used to split srcChunkFileList
Path splitListPath = new Path(splitList);
FileSystem splitListFs = splitListPath.getFileSystem(job);
try{
sl = new SequenceFile.Reader(splitListFs, splitListPath, job);
LongWritable startpos = new LongWritable();
LongWritable length = new LongWritable();
while (sl.next(startpos, length)) {
splits.add(new FileSplit(srcFileListPath, startpos.get(),
length.get(), (String[])null));
}
}
finally{
checkAndClose(sl);
}
return splits.toArray(new FileSplit[splits.size()]);
}
private void testCursorProvider(
HiveRecordCursorProvider cursorProvider,
FileSplit split,
Properties splitProperties,
List<TestColumn> testReadColumns,
ConnectorSession session,
int rowCount)
{
ConnectorPageSource pageSource = createPageSourceFromCursorProvider(cursorProvider, split, splitProperties, testReadColumns, session);
RecordCursor cursor = ((RecordPageSource) pageSource).getCursor();
checkCursor(cursor, testReadColumns, rowCount);
}
@Override
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
MyDemoRecordReader reader = new MyDemoRecordReader(
new LineRecordReader(job, (FileSplit) genericSplit));
return reader;
}
/**
* Generate the requested number of file splits, each pointing at a
* dummy file under the job's output directory.
*/
public InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
InputSplit[] result = new InputSplit[numSplits];
Path outDir = FileOutputFormat.getOutputPath(job);
for(int i=0; i < result.length; ++i) {
result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1,
(String[])null);
}
return result;
}
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit,
JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new LineRecordReader(job, (FileSplit) genericSplit);
}
@Override
public RecordReader<MRContainer, MRContainer> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
StormEvaluator.load_source_dir(); // load the parsed source parameters from a file
String path = ((FileSplit)split).getPath().toString();
ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
}
@Override
public RecordReader<Text,ArrayWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
/** Create reader **/
try {
// set the MIME type configuration option for MS Excel; the Excel format (old vs. new) is detected automatically
job.set(HadoopOfficeReadConfiguration.CONF_MIMETYPE,"ms-excel");
return new ExcelRecordReader( (FileSplit) split,job,reporter);
} catch (FormatNotUnderstoodException e) {
// log
LOGIF.error(e);
} catch (GeneralSecurityException gse) {
LOGIF.error(gse);
}
return null;
}
@Override
protected Object getReader(JobConf conf, InputSplit split) throws IOException {
if (!isEmpty(identifier)) {
conf.set(JsonRecordReader.RECORD_MEMBER_IDENTIFIER, identifier);
conf.setInt(JsonRecordReader.RECORD_MAX_LENGTH, maxRecordLength);
return new JsonRecordReader(conf, (FileSplit) split);
} else {
return new LineRecordReader(conf, (FileSplit) split);
}
}
@VisibleForTesting
static FileSplit toHadoopFileSplit(FileInputSplit fileSplit) throws IOException {
URI uri = fileSplit.getPath().toUri();
long length = fileSplit.getLength();
// Hadoop FileSplit should not have -1 length.
if (length == -1) {
length = fileSplit.getPath().getFileSystem().getFileStatus(fileSplit.getPath()).getLen() -
fileSplit.getStart();
}
return new FileSplit(new Path(uri), fileSplit.getStart(), length, (String[]) null);
}
public void map(LongWritable key, Text val,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();
location.set(fileName);
String line = val.toString();
StringTokenizer itr = new StringTokenizer(line.toLowerCase());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, location);
}
}