下面列出了 org.apache.hadoop.fs.AvroFSInput 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Opens the Avro data file described by {@code status} and prepares to
 * re-encode its records as JSON.
 *
 * <p>The file is opened through a {@link FileContext} built from a default
 * {@link Configuration}. The schema embedded in the data file drives both the
 * {@code GenericDatumWriter} and a JSON encoder that writes UTF-8 into an
 * in-memory {@code ByteArrayOutputStream}. Successive root JSON values are
 * separated by the platform line separator.
 *
 * @param status status of the Avro file to read; only its path is used
 * @throws IOException if opening the file, creating the Avro reader, or
 *     creating the JSON generator/encoder fails. In that case the opened
 *     input is closed before the exception propagates.
 */
public AvroFileInputStream(FileStatus status) throws IOException {
  // Start with an empty buffer positioned at offset 0.
  pos = 0;
  buffer = new byte[0];
  GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
  FileContext fc = FileContext.getFileContext(new Configuration());
  AvroFSInput input = new AvroFSInput(fc, status.getPath());
  try {
    fileReader = DataFileReader.openReader(input, reader);
    Schema schema = fileReader.getSchema();
    writer = new GenericDatumWriter<Object>(schema);
    output = new ByteArrayOutputStream();
    JsonGenerator generator =
      new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
    MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
    prettyPrinter.setRootValueSeparator(System.lineSeparator());
    generator.setPrettyPrinter(prettyPrinter);
    encoder = EncoderFactory.get().jsonEncoder(schema, generator);
  } catch (IOException | RuntimeException e) {
    // A failing constructor never hands the object to a caller, so nobody
    // else could close the stream; release it here. The reader, if it was
    // created, reads from this same input.
    try {
      input.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
 * Opens the Avro data file described by {@code status} and prepares to
 * re-encode its records as JSON.
 *
 * <p>The file is opened through a {@link FileContext} built from a default
 * {@link Configuration}. The schema embedded in the data file drives both the
 * {@code GenericDatumWriter} and a JSON encoder that writes UTF-8 into an
 * in-memory {@code ByteArrayOutputStream}. Successive root JSON values are
 * separated by the platform line separator.
 *
 * @param status status of the Avro file to read; only its path is used
 * @throws IOException if opening the file, creating the Avro reader, or
 *     creating the JSON generator/encoder fails. In that case the opened
 *     input is closed before the exception propagates.
 */
public AvroFileInputStream(FileStatus status) throws IOException {
  // Start with an empty buffer positioned at offset 0.
  pos = 0;
  buffer = new byte[0];
  GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
  FileContext fc = FileContext.getFileContext(new Configuration());
  AvroFSInput input = new AvroFSInput(fc, status.getPath());
  try {
    fileReader = DataFileReader.openReader(input, reader);
    Schema schema = fileReader.getSchema();
    writer = new GenericDatumWriter<Object>(schema);
    output = new ByteArrayOutputStream();
    JsonGenerator generator =
      new JsonFactory().createJsonGenerator(output, JsonEncoding.UTF8);
    MinimalPrettyPrinter prettyPrinter = new MinimalPrettyPrinter();
    prettyPrinter.setRootValueSeparator(System.lineSeparator());
    generator.setPrettyPrinter(prettyPrinter);
    encoder = EncoderFactory.get().jsonEncoder(schema, generator);
  } catch (IOException | RuntimeException e) {
    // A failing constructor never hands the object to a caller, so nobody
    // else could close the stream; release it here. The reader, if it was
    // created, reads from this same input.
    try {
      input.close();
    } catch (IOException closeFailure) {
      e.addSuppressed(closeFailure);
    }
    throw e;
  }
}
/**
 * Creates a reader over the Avro data file at {@code filePath}.
 *
 * <p>Records are decoded with a {@code SpecificDatumReader}. If
 * {@code this.schema} is set, it is used as the reader schema; otherwise the
 * datum reader is created without one. (Presumably {@code this.schema} is
 * populated by the superclass constructor from {@code config} — confirm.)
 *
 * @param fs file system the file lives on; its configuration is also used to
 *     build the {@link FileContext} that opens the file
 * @param filePath path of the Avro data file
 * @param config reader configuration, passed to the superclass
 * @throws IOException if the file cannot be opened or the Avro reader cannot
 *     be created. In that case the opened input is closed before the
 *     exception propagates.
 */
public AvroFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
    super(fs, filePath, new GenericRecordToStruct(), config);
    // Use the FileSystem's own configuration so settings applied to fs also
    // apply here; getFileContext(URI) alone would fall back to a fresh
    // default Configuration.
    AvroFSInput input = new AvroFSInput(FileContext.getFileContext(filePath.toUri(), fs.getConf()), filePath);
    try {
        if (this.schema == null) {
            this.reader = new DataFileReader<>(input, new SpecificDatumReader<>());
        } else {
            this.reader = new DataFileReader<>(input, new SpecificDatumReader<>(this.schema));
        }
    } catch (IOException | RuntimeException e) {
        // The reader was never created, so nothing else owns the input yet.
        try {
            input.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    this.closed = false;
}
/**
 * Opens the Avro file at {@code pathStr} and positions it at
 * {@code singleFileOffset}.
 *
 * <p>The file is opened through a {@link FileContext} resolved from the
 * path's URI and decoded with a {@code GenericDatumReader}. Positioning is
 * delegated to {@code seek(singleFileOffset)}.
 *
 * @param pathStr path of the Avro file to open
 * @param singleFileOffset offset to seek to after opening; its format is
 *     whatever {@code seek} expects
 * @throws SamzaException wrapping any {@link IOException} raised while
 *     opening the file or creating the reader (or by {@code seek}, if it
 *     throws one)
 */
@Override
public void open(String pathStr, String singleFileOffset) {
  LOG.info(String.format("%s: Open file [%s] with file offset [%s] for read", systemStreamPartition, pathStr, singleFileOffset));
  Path path = new Path(pathStr);
  try {
    AvroFSInput input = new AvroFSInput(FileContext.getFileContext(path.toUri()), path);
    try {
      fileReader = new DataFileReader<>(input, new GenericDatumReader<>());
    } catch (IOException | RuntimeException e) {
      // The reader was never created, so nothing else will close this input.
      try {
        input.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
    seek(singleFileOffset);
  } catch (IOException e) {
    // Keep the cause but say which file and offset failed.
    throw new SamzaException(String.format("%s: Failed to open file [%s] with file offset [%s] for read",
        systemStreamPartition, pathStr, singleFileOffset), e);
  }
}
/**
 * Prints the schema and the records of an Avro data file.
 *
 * <p>Arguments: {@code dataFile schemaFile [maxRecords]}.
 * <ul>
 *   <li>Fewer than two arguments prints the usage line and returns.</li>
 *   <li>The schema parsed from {@code schemaFile} is used as the datum
 *       reader's schema.</li>
 *   <li>Each record read from {@code dataFile} is printed with a 1-based
 *       counter.</li>
 *   <li>At most {@code maxRecords} records are printed; the default is
 *       {@code Integer.MAX_VALUE}.</li>
 * </ul>
 *
 * <p>Each path is resolved with {@link Path#getFileSystem(Configuration)}.
 * A scheme-less path therefore uses the default file system, while a fully
 * qualified path uses the file system named by its scheme.
 *
 * @param args command-line arguments as described above
 * @throws IOException if either file cannot be opened or read
 * @throws NumberFormatException if {@code maxRecords} is not an integer
 */
public static void main(String[] args) throws IOException {
  // Both the data file and the schema file are required. Stop after printing
  // the usage text instead of falling through to an out-of-bounds index.
  if (args.length < 2) {
    System.out.println("AvroReader {dataFile} {schemaFile} {max.lines.to.read.optional}");
    return;
  }
  String dataFile = args[0];
  String schemaFile = args[1];
  int recordsToRead = Integer.MAX_VALUE;
  if (args.length > 2) {
    recordsToRead = Integer.parseInt(args[2]);
  }
  Configuration config = new Configuration();

  Path schemaPath = new Path(schemaFile);
  FileSystem schemaFs = schemaPath.getFileSystem(config);
  Schema schema;
  try (java.io.InputStream schemaIn = schemaFs.open(schemaPath)) {
    schema = new Schema.Parser().parse(schemaIn);
  }

  Path dataFilePath = new Path(dataFile);
  FileSystem dataFs = dataFilePath.getFileSystem(config);
  FileStatus fileStatus = dataFs.getFileStatus(dataFilePath);
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
  // Close both the input and the reader even if reading fails part-way.
  try (AvroFSInput input = new AvroFSInput(dataFs.open(dataFilePath), fileStatus.getLen());
       DataFileReader<GenericRecord> dataFileReader =
           new DataFileReader<GenericRecord>(input, datumReader)) {
    System.out.println("Schema: " + dataFileReader.getSchema());
    System.out.println();
    int counter = 0;
    // counter is incremented before printing, so the printed counter is 1-based.
    while (dataFileReader.hasNext() && counter++ < recordsToRead) {
      GenericRecord r = dataFileReader.next();
      System.out.println(counter + " : " + r);
    }
  }
}