The following code examples show how to use the org.apache.avro.specific.SpecificDatumReader API class. Each example is taken from an open-source project, and the original source files can be viewed on GitHub.
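Before the project examples, here is a minimal sketch of the core pattern most of them build on: constructing a SpecificDatumReader for a generated class and decoding one record from binary. User is a placeholder for any Avro-generated SpecificRecord class.
// Minimal usage sketch; User stands in for any Avro-generated SpecificRecord class.
public static User deserializeUser(byte[] bytes) throws IOException {
    SpecificDatumReader<User> reader = new SpecificDatumReader<>(User.class);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    // A null reuse argument tells the reader to allocate a fresh record;
    // pass a previously read instance instead to avoid the allocation.
    return reader.read(null, decoder);
}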
/**
* Generates (if needed) and returns a specific-class-aware Avro {@link FastDeserializer}.
*
* @param writerSchema
* {@link Schema} of written data
* @param readerSchema
* {@link Schema} intended to be used during deserialization
* @return a specific-class-aware Avro {@link FastDeserializer}
*/
public FastDeserializer<?> getFastSpecificDeserializer(Schema writerSchema,
Schema readerSchema) {
String schemaKey = getSchemaKey(writerSchema, readerSchema);
FastDeserializer<?> deserializer = fastSpecificRecordDeserializersCache.get(schemaKey);
if (deserializer == null) {
SpecificDatumReader<?> fallbackReader = new SpecificDatumReader<>(writerSchema, readerSchema);
deserializer = fastSpecificRecordDeserializersCache.putIfAbsent(schemaKey,
d -> fallbackReader.read(null, d));
if (deserializer == null) {
deserializer = fastSpecificRecordDeserializersCache.get(schemaKey);
CompletableFuture
    .supplyAsync(() -> buildSpecificDeserializer(writerSchema, readerSchema), executor)
    .thenAccept(d -> fastSpecificRecordDeserializersCache.put(schemaKey, d));
}
}
return deserializer;
}
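A call site for the method above might look like the following sketch. It assumes FastDeserializer exposes a deserialize(Decoder) method, which is the shape implied by the fallback lambda registered in the cache; the cache and payload names are placeholders.
// Hypothetical caller: the returned deserializer is usable immediately,
// even while the optimized variant is still compiling in the background.
FastDeserializer<?> deserializer = cache.getFastSpecificDeserializer(writerSchema, readerSchema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
Object record = deserializer.deserialize(decoder);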
void checkAvroInitialized() {
if (datumReader != null) {
return;
}
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
SpecificData specificData = new SpecificData(cl);
this.datumReader = new SpecificDatumReader<>(specificData);
this.reader = specificData.getSchema(recordClazz);
} else {
this.reader = new Schema.Parser().parse(schemaString);
GenericData genericData = new GenericData(cl);
this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
}
this.inputStream = new MutableByteArrayInputStream();
this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
DatumReader<E> datumReader;
if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
datumReader = new GenericDatumReader<E>();
} else {
datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
}
if (LOG.isInfoEnabled()) {
LOG.info("Opening split {}", split);
}
SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
}
end = split.getStart() + split.getLength();
recordsReadSinceLastSync = 0;
return dataFileReader;
}
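initReader only opens and positions the file reader; the enclosing input format is expected to drive the actual iteration. A hedged sketch of such a read loop over one split, using the end and recordsReadSinceLastSync fields set above (process is a placeholder):
// Hypothetical read loop for the split opened by initReader().
dataFileReader.sync(split.getStart()); // skip to the first sync point inside the split
while (dataFileReader.hasNext() && !dataFileReader.pastSync(end)) {
    E record = dataFileReader.next();
    recordsReadSinceLastSync++;
    process(record); // placeholder for whatever consumes the records
}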
@Test
public void testAvroParsing() throws IOException, SAXException {
File input = new File("src/test/resources/en/mercedes.xml");
final File output = File.createTempFile("jsonwikipedia-mercedes", ".avro");
output.deleteOnExit();
WikipediaArticleReader wap = new WikipediaArticleReader(input, output, "en");
wap.start();
// reading the encoded avro and checking that it is correct
DatumReader<Article> userDatumReader = new SpecificDatumReader<>(Article.getClassSchema());
DataFileReader<Article> dataFileReader = new DataFileReader<>(output, userDatumReader);
assertTrue(dataFileReader.hasNext());
Article article = new Article();
dataFileReader.next(article);
assertEquals("Mercedes-Benz", article.getTitle());
assertEquals("Mercedes-Benz", article.getWikiTitle());
}
public synchronized void receiveMessages(Message message) {
final DatumReader<B5MEvent> reader = new SpecificDatumReader<B5MEvent>(
B5MEvent.SCHEMA$);
final B5MEvent b5mEvent = new B5MEvent();
byte[] data = message.getData();
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
try {
reader.read(b5mEvent, decoder);
for (LaserMessageConsumer consumer : this.consumer) {
consumer.write(b5mEvent);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
throws IOException, ReflectiveOperationException {
super(topic, config, numThreads);
if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
create(ConfigUtils.configToProperties(config));
this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, GobblinTrackingEvent.SCHEMA$);
} else {
this.schemaVersionWriter = new FixedSchemaVersionWriter();
}
this.decoder = ThreadLocal.withInitial(() -> {
InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
});
this.reader = ThreadLocal.withInitial(() -> new SpecificDatumReader<>(GobblinTrackingEvent.SCHEMA$));
}
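The thread-local decoder and reader initialized above are meant to be reused on every message; a sketch of what the per-message decode might look like (the method itself is an assumption, not part of the original class):
// Hypothetical per-message decode. Passing the previous BinaryDecoder as the
// reuse argument lets Avro recycle its buffer instead of allocating a new
// decoder for every message.
private GobblinTrackingEvent decode(InputStream in) throws IOException {
    BinaryDecoder reused = DecoderFactory.get().binaryDecoder(in, this.decoder.get());
    return this.reader.get().read(null, reused);
}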
@SuppressWarnings("unchecked")
private <T> T decodeRecordFast(Schema writerSchema, Decoder decoder) {
SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema);
try {
return datumReader.read(null, decoder);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void renderJSON(PrintWriter writer, Map<String, String> params) {
JsonArray json = new JsonArray();
for (ConsumerRecord<byte[], byte[]> record : Lists.reverse(retrieveActionReportMessages())) {
try {
JsonObject jsonRecord = new JsonObject();
BinaryDecoder binaryDecoder = avroDecoderFactory.binaryDecoder(record.value(), null);
SpecificDatumReader<OperatorAction> reader =
new SpecificDatumReader<>(operatorActionSchema);
OperatorAction result = new OperatorAction();
reader.read(result, binaryDecoder);
jsonRecord.add("date",gson.toJsonTree(new Date(result.getTimestamp())));
jsonRecord.add("clusterName",gson.toJsonTree(result.getClusterName()));
jsonRecord.add("description",gson.toJsonTree(result.getDescription()));
json.add(jsonRecord);
} catch (Exception e) {
LOG.info("Fail to decode an message", e);
}
}
writer.print(json);
}
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
DatumReader<E> datumReader;
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
datumReader = new SpecificDatumReader<E>(avroValueType);
} else {
datumReader = new ReflectDatumReader<E>(avroValueType);
}
LOG.info("Opening split " + split);
SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
dataFileReader = DataFileReader.openReader(in, datumReader);
dataFileReader.sync(split.getStart());
reuseAvroValue = null;
}
public KafkaAvroJobMonitor(String topic, MutableJobCatalog catalog, Config config, Schema schema,
SchemaVersionWriter<?> versionWriter) {
super(topic, catalog, config);
this.schema = schema;
this.decoder = new ThreadLocal<BinaryDecoder>() {
@Override
protected BinaryDecoder initialValue() {
InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
}
};
this.reader = new ThreadLocal<SpecificDatumReader<T>>() {
@Override
protected SpecificDatumReader<T> initialValue() {
return new SpecificDatumReader<>(KafkaAvroJobMonitor.this.schema);
}
};
this.versionWriter = versionWriter;
}
@Before
public void setUp() throws Exception {
siddhiManager = new StreamingSiddhiConfiguration().siddhiManager();
siddhiManager.defineStream(StreamsHelper.STREAM_DEFINITION);
metadataService = new StreamMetadataService(siddhiManager);
javaToSiddhiSerializer = new JavaToSiddhiSerializer(metadataService);
javaToAvroSerializer = new JavaToAvroSerializer(new SpecificDatumReader<InsertMessage>(InsertMessage.getClassSchema()));
Set<StreamAction> activeActions = new ListOrderedSet();
activeActions.add(StreamAction.LISTEN);
producer = Mockito.mock(Producer.class);
avroProducer = Mockito.mock(Producer.class);
doNothing().when(producer).send(Matchers.<List<KeyedMessage<String, String>>>any());
cbk = new StreamToActionBusCallback(activeActions, streamName, avroProducer,
javaToSiddhiSerializer, javaToAvroSerializer);
}
public static void dumpStream(InputStream is) throws IOException {
DataFileStream<Stock> reader =
new DataFileStream<Stock>(
is,
new SpecificDatumReader<Stock>(Stock.class));
for (Stock a : reader) {
System.out.println(ToStringBuilder.reflectionToString(a,
ToStringStyle.SIMPLE_STYLE
));
}
IOUtils.closeStream(is);
IOUtils.closeStream(reader);
}
private DatumReader<Object> buildDatumReader(Schema schema,
Schema writtenSchema) {
if (specific) {
return new SpecificDatumReader<Object>(writtenSchema, schema);
} else {
return new GenericDatumReader<Object>(writtenSchema, schema);
}
}
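Both branches can decode the same bytes; the flag only changes the in-memory representation that comes back. A short sketch, where payload is a hypothetical byte array written with writtenSchema:
// specific = true: records come back as instances of the generated class.
Decoder d1 = DecoderFactory.get().binaryDecoder(payload, null);
Object specificRecord = new SpecificDatumReader<Object>(writtenSchema, schema).read(null, d1);

// specific = false: records come back as GenericData.Record instances,
// with fields accessed by name, e.g. ((GenericRecord) genericRecord).get("someField").
Decoder d2 = DecoderFactory.get().binaryDecoder(payload, null);
Object genericRecord = new GenericDatumReader<Object>(writtenSchema, schema).read(null, d2);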
/**
* Deserialize EvaluatorRequest.
*/
public static EvaluatorRequest fromString(final String serializedRequest) {
try {
final Decoder decoder =
DecoderFactory.get().jsonDecoder(AvroEvaluatorRequest.getClassSchema(), serializedRequest);
final SpecificDatumReader<AvroEvaluatorRequest> reader = new SpecificDatumReader<>(AvroEvaluatorRequest.class);
return fromAvro(reader.read(null, decoder));
} catch (final IOException ex) {
throw new RuntimeException("Unable to deserialize compute request", ex);
}
}
private byte[] generateFailedTaskSerializedAvro() throws IOException {
AvroFailedTask avroFailedTask = null;
if (jfailedTask.getData() != null && jfailedTask.getData().isPresent()) {
// Deserialize what was passed in from C#.
try (ByteArrayInputStream fileInputStream = new ByteArrayInputStream(jfailedTask.getData().get())) {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroFailedTask.getClassSchema(), fileInputStream);
final SpecificDatumReader<AvroFailedTask> reader =
new SpecificDatumReader<>(AvroFailedTask.class);
avroFailedTask = reader.read(null, decoder);
}
} else {
// This may result from a failed Evaluator.
avroFailedTask = AvroFailedTask.newBuilder()
.setIdentifier(jfailedTask.getId())
.setCause(ByteBuffer.wrap(new byte[0]))
.setData(ByteBuffer.wrap(new byte[0]))
.setMessage("")
.build();
}
// Overwrite the message if Java provides a message and C# does not.
// Typically the case for failed Evaluators.
if (StringUtils.isNoneBlank(jfailedTask.getMessage()) &&
StringUtils.isBlank(avroFailedTask.getMessage().toString())) {
avroFailedTask.setMessage(jfailedTask.getMessage());
}
final DatumWriter<AvroFailedTask> datumWriter = new SpecificDatumWriter<>(AvroFailedTask.class);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroFailedTask.getSchema(), outputStream);
datumWriter.write(avroFailedTask, encoder);
encoder.flush();
outputStream.flush();
return outputStream.toByteArray();
}
}
/**
* This test validates proper serialization with specific (generated POJO) types.
*/
@Test
public void testDeserializeToSpecificType() throws IOException {
DatumReader<User> datumReader = new SpecificDatumReader<>(userSchema);
try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
User rec = dataFileReader.next();
// check if record has been read correctly
assertNotNull(rec);
assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
// now serialize it with our framework:
ExecutionConfig ec = new ExecutionConfig();
TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);
assertEquals(AvroTypeInfo.class, te.getClass());
TypeSerializer<User> tser = te.createSerializer(ec);
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
tser.serialize(rec, outView);
}
User newRec;
try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
new ByteArrayInputStream(out.toByteArray()))) {
newRec = tser.deserialize(inView);
}
// check if it is still the same
assertNotNull(newRec);
assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
}
}
public static void readFromAvro(InputStream is) throws IOException {
DataFileStream<StockAvg> reader =
new DataFileStream<StockAvg>(
is,
new SpecificDatumReader<StockAvg>(StockAvg.class));
for (StockAvg a : reader) {
System.out.println(ToStringBuilder.reflectionToString(a,
ToStringStyle.SHORT_PREFIX_STYLE
));
}
IOUtils.closeStream(is);
IOUtils.closeStream(reader);
}
/**
* Reads an Avro object from the input stream.
*
* @param inputStream The input stream to read from
* @return Avro object
* @throws IOException if the input cannot be read or decoded
*/
AvroMultiRuntimeAppSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroMultiRuntimeAppSubmissionParameters.getClassSchema(), inputStream);
final SpecificDatumReader<AvroMultiRuntimeAppSubmissionParameters> reader = new SpecificDatumReader<>(
AvroMultiRuntimeAppSubmissionParameters.class);
return reader.read(null, decoder);
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
SpecificData specificData = new SpecificData(cl);
Schema newSchema = specificData.getSchema(type);
return new AvroFactory<>(
specificData,
newSchema,
new SpecificDatumReader<>(previousSchema.orElse(newSchema), newSchema, specificData),
new SpecificDatumWriter<>(newSchema, specificData)
);
}
/**
* Creates an Avro deserialization schema for the given specific record class. Having the
* concrete Avro record class might improve performance.
*
* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
*/
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
schema = SpecificData.get().getSchema(recordClazz);
typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
schemaString = schema.toString();
record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
datumReader = new SpecificDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
schemaString = inputStream.readUTF();
typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(schemaString);
schema = new Schema.Parser().parse(schemaString);
if (recordClazz != null) {
record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
} else {
record = new GenericData.Record(schema);
}
datumReader = new SpecificDatumReader<>(schema);
this.inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
}
private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
DatumReader<LogLine> reader =
new SpecificDatumReader<LogLine>(LogLine.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
setValues(reader.read(null, decoder));
}
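A readObject like this only works if the matching writeObject serialized the record as Avro binary; the original example does not show that side, so here is a hedged sketch of what it might look like (getValues is assumed to return the wrapped LogLine):
// Hypothetical counterpart to the readObject above.
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
    DatumWriter<LogLine> writer = new SpecificDatumWriter<LogLine>(LogLine.class);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(getValues(), encoder); // getValues() is an assumed accessor
    encoder.flush();
}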
private void roundtrip(Object thingie) throws Exception {
Schema schema = SpecificData.get().getSchema(thingie.getClass());
ByteArrayOutputStream os = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(os, false, null);
SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(schema);
writer.write(thingie, binaryEncoder);
binaryEncoder.flush();
byte[] serialized = os.toByteArray();
ByteArrayInputStream is = new ByteArrayInputStream(serialized);
BinaryDecoder binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(is, false, null);
SpecificDatumReader<Object> reader = new SpecificDatumReader<>(schema);
Object deserialize = reader.read(null, binaryDecoder);
Assert.assertEquals(deserialize, thingie);
}
/**
* Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from a byte array Avro serialization.
* @param reuse GobblinTrackingEvent to reuse.
* @param bytes Input bytes.
* @param schemaId Expected schemaId.
* @return GobblinTrackingEvent.
* @throws java.io.IOException
*/
public static synchronized GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes, @Nullable String schemaId)
throws IOException {
if (!reader.isPresent()) {
reader = Optional.of(new SpecificDatumReader<>(GobblinTrackingEvent.class));
}
Closer closer = Closer.create();
try {
DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
if (schemaId != null) {
MetricReportUtils.readAndVerifySchemaId(inputStream, schemaId);
} else {
MetricReportUtils.readAndVerifySchemaVersion(inputStream);
}
// Decode the rest
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
return reader.get().read(reuse, decoder);
} catch(Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
@SuppressWarnings("unchecked")
private <T> T decodeRecordSlow(Schema readerSchema, Schema writerSchema, Decoder decoder) {
org.apache.avro.io.DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
try {
return datumReader.read(null, decoder);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
@Test(invocationCount = 5, groups = {"perfTest"})
public void testAvroDeserialization() throws Exception {
byte[] serializedBytes = serializedTestObjects(1000);
long startInMs = System.currentTimeMillis();
AvroGenericDeserializer<BenchmarkTestObject> deserializer =
new AvroGenericDeserializer<>(new SpecificDatumReader<>(BenchmarkTestObject.class));
for (int i = 0; i <= 10000; ++i) {
deserializer.deserializeObjects(serializedBytes);
}
System.out.println("Regular avro deserialization latency: " + (System.currentTimeMillis() - startInMs) + " ms");
}