The following are code examples showing how to use the org.apache.hadoop.mapreduce.lib.input.LineRecordReader API class; you can also follow the links through to GitHub to view the source code.
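Before the project-specific examples, here is a minimal, self-contained sketch of the LineRecordReader lifecycle (initialize, iterate, close). The input path is a hypothetical local file, and the bare TaskAttemptID is only suitable for standalone testing outside a real MapReduce task:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class LineRecordReaderDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp/input.txt"); // hypothetical input file
    long length = FileSystem.get(conf).getFileStatus(path).getLen();

    // One split covering the whole file; no preferred hosts.
    FileSplit split = new FileSplit(path, 0, length, null);

    LineRecordReader reader = new LineRecordReader();
    try {
      reader.initialize(split, new TaskAttemptContextImpl(conf, new TaskAttemptID()));
      while (reader.nextKeyValue()) {
        // Key is the byte offset of the line; value is the line's text.
        System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
      }
    } finally {
      reader.close();
    }
  }
}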
/**
 * Actually instantiate the user's chosen RecordReader implementation.
 */
@SuppressWarnings("unchecked")
private void createChildReader() throws IOException, InterruptedException {
  LOG.debug("ChildSplit operates on: " + split.getPath(index));
  Configuration conf = context.getConfiguration();

  // Determine the file format we're reading.
  Class rrClass;
  if (ExportJobBase.isSequenceFiles(conf, split.getPath(index))) {
    rrClass = SequenceFileRecordReader.class;
  } else {
    rrClass = LineRecordReader.class;
  }

  // Create the appropriate record reader.
  this.rr = (RecordReader<LongWritable, Object>)
      ReflectionUtils.newInstance(rrClass, conf);
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException
{
  key = new Text();
  value = new MapWritable();
  jsonParser = new JSONParser();

  lineReader = new LineRecordReader();
  lineReader.initialize(inputSplit, context);

  queryString = context.getConfiguration().get("query", "?q=*");

  // Load the data schemas.
  FileSystem fs = FileSystem.get(context.getConfiguration());
  try
  {
    SystemConfiguration.setProperty("data.schemas", context.getConfiguration().get("data.schemas"));
    DataSchemaLoader.initialize(true, fs);
  } catch (Exception e)
  {
    e.printStackTrace();
  }

  String dataSchemaName = context.getConfiguration().get("dataSchemaName");
  dataSchema = DataSchemaRegistry.get(dataSchemaName);
}
protected Job jobInstance() throws IOException {
  Job job = Job.getInstance();

  // Deserialize the map into the job's Configuration.
  Configuration conf = job.getConfiguration();
  for (Map.Entry<String, String> entry : map.entrySet()) {
    conf.set(entry.getKey(), entry.getValue());
  }

  // TODO: We've explicitly listed all the schemes supported here, but the filesystem scheme could be
  // dynamically generated from the path (resolved against the default name node).
  conf.set("fs.gs.impl.disable.cache", "true");
  conf.set("fs.s3t.impl.disable.cache", "true");
  conf.set("fs.file.impl.disable.cache", "true");
  conf.set("fs.hdfs.impl.disable.cache", "true");

  conf.set(LineRecordReader.MAX_LINE_LENGTH,
      Integer.toString(conf.getInt("maxRowSize", 10 * 1024 * 1024))); // 10 MB max per line.
  return job;
}
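As used above, LineRecordReader.MAX_LINE_LENGTH is the constant for the configuration key mapreduce.input.linerecordreader.line.maxlength; lines longer than the cap are skipped rather than returned to the mapper. A minimal sketch of setting it directly on a job (the 16 MB cap is just an example value):

Configuration conf = job.getConfiguration();
// Skip any line longer than 16 MB instead of buffering it.
conf.setInt(LineRecordReader.MAX_LINE_LENGTH, 16 * 1024 * 1024);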
/**
 * Called once at initialization to initialize the RecordReader.
 *
 * @param genericSplit the split that defines the range of records to read.
 * @param context the information about the task.
 * @throws IOException on IO Error.
 */
@Override
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
    throws IOException, InterruptedException {
  if (logger.atFine().isEnabled()) {
    logger.atFine().log(
        "initialize('%s', '%s')",
        HadoopToStringUtil.toString(genericSplit), HadoopToStringUtil.toString(context));
  }
  Preconditions.checkArgument(genericSplit instanceof FileSplit,
      "InputSplit genericSplit should be an instance of FileSplit.");
  // Get FileSplit.
  FileSplit fileSplit = (FileSplit) genericSplit;
  // Create the JsonParser.
  jsonParser = new JsonParser();
  // Initialize the LineRecordReader.
  lineReader = new LineRecordReader();
  lineReader.initialize(fileSplit, context);
}
public static <KEYIN, VALUEIN> RecordReader<KEYIN, VALUEIN> buildReader(RecordReader<KEYIN, VALUEIN> reader) {
  if (reader instanceof LimitedLineRecordReader) {
    // Already wrapped.
    return reader;
  }
  if (!LineRecordReader.class.isInstance(reader) &&
      !CSVFileRecordReader.class.isInstance(reader)) {
    // Only wrap line-based file readers.
    return reader;
  }
  return (RecordReader<KEYIN, VALUEIN>) new LimitedLineRecordReader(reader);
}
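LimitedLineRecordReader itself is not shown in this listing; one plausible shape for such a delegating wrapper is sketched below. The class name, the record cap, and its constructor are assumptions for illustration, not the project's actual code:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/** Sketch: forward every call to an inner reader, but stop after a fixed record count. */
class BoundedRecordReader<K, V> extends RecordReader<K, V> {
  private final RecordReader<K, V> delegate;
  private final long limit;
  private long emitted;

  BoundedRecordReader(RecordReader<K, V> delegate, long limit) {
    this.delegate = delegate;
    this.limit = limit;
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    delegate.initialize(split, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    // Stop once the cap is reached, even if the delegate has more records.
    if (emitted >= limit || !delegate.nextKeyValue()) {
      return false;
    }
    emitted++;
    return true;
  }

  @Override
  public K getCurrentKey() throws IOException, InterruptedException {
    return delegate.getCurrentKey();
  }

  @Override
  public V getCurrentValue() throws IOException, InterruptedException {
    return delegate.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return delegate.getProgress();
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }
}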
@Override
@SuppressWarnings("squid:S2095") // recordReader is closed explicitly in the close() method
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
    InterruptedException
{
  if (split instanceof FileSplit)
  {
    FileSplit fsplit = (FileSplit) split;
    delimitedParser = getDelimitedParser(fsplit.getPath().toString(),
        context.getConfiguration());

    recordReader = new LineRecordReader();
    recordReader.initialize(fsplit, context);

    // Skip the first line if configured to do so.
    if (delimitedParser.getSkipFirstLine())
    {
      // Only skip the first line of the first split. The other
      // splits are somewhere in the middle of the original file,
      // so their first lines should not be skipped.
      if (fsplit.getStart() != 0)
      {
        nextKeyValue();
      }
    }
  }
  else
  {
    throw new IOException("input split is not a FileSplit");
  }
}
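For comparison, the more common header-skipping pattern consumes the extra line only when the split starts at byte 0, since LineRecordReader already discards the partial first line of every non-initial split. A sketch, where skipHeader and the surrounding reader fields are assumed context:

// Only the split that starts at offset 0 can contain the file's header row;
// LineRecordReader positions every other split past its first (partial) line.
if (skipHeader && fileSplit.getStart() == 0) {
  recordReader.nextKeyValue(); // consume the header line
}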
@Override
public Optional<ReaderRecordCursorWithProjections> createRecordCursor(
        Configuration configuration,
        ConnectorSession session,
        Path path,
        long start,
        long length,
        long fileSize,
        Properties schema,
        List<HiveColumnHandle> columns,
        TupleDomain<HiveColumnHandle> effectivePredicate,
        DateTimeZone hiveStorageTimeZone,
        TypeManager typeManager,
        boolean s3SelectPushdownEnabled)
{
    configuration.setInt(LineRecordReader.MAX_LINE_LENGTH, textMaxLineLengthBytes);

    // Make sure the FileSystem is created with the proper Configuration object.
    try {
        this.hdfsEnvironment.getFileSystem(session.getUser(), path, configuration);
    }
    catch (IOException e) {
        throw new PrestoException(HIVE_FILESYSTEM_ERROR, "Failed getting FileSystem: " + path, e);
    }

    Optional<ReaderProjections> projectedReaderColumns = projectBaseColumns(columns);

    RecordCursor cursor = hdfsEnvironment.doAs(session.getUser(), () -> {
        RecordReader<?, ?> recordReader = HiveUtil.createRecordReader(
                configuration,
                path,
                start,
                length,
                schema,
                projectedReaderColumns
                        .map(ReaderProjections::getReaderColumns)
                        .orElse(columns));

        return new GenericHiveRecordCursor<>(
                configuration,
                path,
                genericRecordReader(recordReader),
                length,
                schema,
                projectedReaderColumns
                        .map(ReaderProjections::getReaderColumns)
                        .orElse(columns),
                hiveStorageTimeZone);
    });

    return Optional.of(new ReaderRecordCursorWithProjections(cursor, projectedReaderColumns));
}
public ScriptRecordReader() {
    this.lineRecordReader = new LineRecordReader();
}

public GraphSONRecordReader() {
    this.lineRecordReader = new LineRecordReader();
}
@Override
public RecordReader<LongWritable, Text> createDelegateRecordReader(
    InputSplit split, Configuration configuration) throws IOException, InterruptedException {
  logger.atFine().log("createDelegateRecordReader -> new LineRecordReader");
  return new LineRecordReader();
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
    throws IOException {
  in = (LineRecordReader) reader;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
    RecordReader<LongWritable, Text> rr = super.createRecordReader(split, context);
    return new PhoenixLineRecordReader((LineRecordReader) rr);
}

private PhoenixLineRecordReader(LineRecordReader rr) {
    this.rr = rr;
}
public VectorLineProducer(LineRecordReader recordReader)
{
  lineRecordReader = recordReader;
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(
    final InputSplit split,
    final TaskAttemptContext context) throws IOException, InterruptedException {
  return new LineRecordReader();
}
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
  final String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
  return new LineRecordReader(delimiter != null ? delimiter.getBytes() : null);
}