Listed below are Java code examples of org.apache.avro.io.DecoderFactory, extracted from open-source projects.
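DecoderFactory is Avro's entry point for obtaining Decoder instances. As a quick orientation before the examples below, here is a minimal, self-contained sketch (the class name and schema literal are invented for illustration) that encodes one record to Avro binary and then uses DecoderFactory.get().binaryDecoder to read it back:
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
public class DecoderFactoryDemo {
public static void main(String[] args) throws Exception {
// An invented schema, purely for illustration.
Schema schema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
// Encode one record to Avro binary.
GenericRecord user = new GenericData.Record(schema);
user.put("name", "alice");
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(schema).write(user, encoder);
encoder.flush();
// Decode it back; DecoderFactory.get() returns a shared, immutable factory.
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
System.out.println(decoded.get("name")); // prints: alice
}
}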
/**
* Reads all records from a JSON file as {@link GenericRecord}s.
*/
public static List<GenericRecord> readAllRecords(String jsonDataPath, String schemaPath)
throws Exception {
List<GenericRecord> records = new ArrayList<>();
File jsonDataFile = new File(jsonDataPath);
File schemaFile = new File(schemaPath);
Schema schema = new Schema.Parser().parse(schemaFile);
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
try (InputStream is = new FileInputStream(jsonDataFile)) {
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, is);
while (true) {
records.add(datumReader.read(null, decoder));
}
} catch (EOFException eof) {
// Expected: JsonDecoder signals end of input with EOFException once all records have been read.
}
return records;
}
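A hypothetical call site for the helper above (both paths are placeholders):
// Hypothetical usage of readAllRecords; the paths are placeholders.
List<GenericRecord> records = readAllRecords("/tmp/users.json", "/tmp/user.avsc");
for (GenericRecord r : records) {
System.out.println(r);
}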
static YarnClusterSubmissionFromCS readYarnClusterSubmissionFromCSFromInputStream(
final InputStream appInputStream, final InputStream jobInputStream) throws IOException {
final JsonDecoder appDecoder = DecoderFactory.get().jsonDecoder(
AvroYarnAppSubmissionParameters.getClassSchema(), appInputStream);
final SpecificDatumReader<AvroYarnAppSubmissionParameters> appReader = new SpecificDatumReader<>(
AvroYarnAppSubmissionParameters.class);
final AvroYarnAppSubmissionParameters yarnClusterAppSubmissionParameters = appReader.read(null, appDecoder);
final JsonDecoder jobDecoder = DecoderFactory.get().jsonDecoder(
AvroYarnClusterJobSubmissionParameters.getClassSchema(), jobInputStream);
final SpecificDatumReader<AvroYarnClusterJobSubmissionParameters> jobReader = new SpecificDatumReader<>(
AvroYarnClusterJobSubmissionParameters.class);
final AvroYarnClusterJobSubmissionParameters yarnClusterJobSubmissionParameters = jobReader.read(null, jobDecoder);
return new YarnClusterSubmissionFromCS(yarnClusterAppSubmissionParameters, yarnClusterJobSubmissionParameters);
}
AvroBlock(byte[] data, long numRecords, Mode<T> mode, String writerSchemaString, String codec)
throws IOException {
this.mode = mode;
this.numRecords = numRecords;
checkNotNull(writerSchemaString, "writerSchemaString");
Schema writerSchema = internOrParseSchemaString(writerSchemaString);
Schema readerSchema =
internOrParseSchemaString(
MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));
this.reader = mode.createReader(writerSchema, readerSchema);
if (codec.equals(DataFileConstants.NULL_CODEC)) {
// Avro can read from a byte[] using a more efficient implementation. If the input is not
// compressed, pass the data in directly.
this.decoder = DecoderFactory.get().binaryDecoder(data, null);
} else {
this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);
}
}
/**
* Change the schema of an Avro record.
* @param record The Avro record whose schema is to be changed.
* @param newSchema The target schema; it must be a valid reader schema for record.getSchema() as the writer schema.
* @return a new Avro record with the new schema.
* @throws IOException if conversion failed.
*/
public static GenericRecord convertRecordSchema(GenericRecord record, Schema newSchema) throws IOException {
if (record.getSchema().equals(newSchema)) {
return record;
}
try {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordToByteArray(record), null);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(record.getSchema(), newSchema);
return reader.read(null, decoder);
} catch (IOException e) {
throw new IOException(
String.format("Cannot convert avro record to new schema. Original schema = %s, new schema = %s",
record.getSchema(), newSchema),
e);
}
}
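For example, assuming a new reader schema that adds one field with a default value (both schema literals are invented for illustration; the default is what makes the new schema a valid reader schema for the old one), the helper above resolves the old record against the new schema:
// Invented schemas: the new schema adds an "age" field with a default,
// so Avro schema resolution can fill it in during conversion.
Schema oldSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
Schema newSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}");
GenericRecord old = new GenericData.Record(oldSchema);
old.put("name", "alice");
GenericRecord converted = convertRecordSchema(old, newSchema); // converted.get("age") == 0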
public <T> Decoder dataAsDecoder(T data, Schema schema) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Encoder binaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(baos, true, null);
try {
FastSpecificSerializerGenerator<T> fastSpecificSerializerGenerator =
new FastSpecificSerializerGenerator<>(schema, tempDir, classLoader, null);
FastSerializer<T> fastSerializer = fastSpecificSerializerGenerator.generateSerializer();
fastSerializer.serialize(data, binaryEncoder);
binaryEncoder.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
return DecoderFactory.get().binaryDecoder(baos.toByteArray(), null);
}
@Override
public org.apache.gobblin.configuration.State parseJobStatus(byte[] message)
throws IOException {
InputStream is = new ByteArrayInputStream(message);
schemaVersionWriter.readSchemaVersioningInformation(new DataInputStream(is));
Decoder decoder = DecoderFactory.get().binaryDecoder(is, this.decoder.get());
try {
GobblinTrackingEvent decodedMessage = this.reader.get().read(null, decoder);
return parseJobStatus(decodedMessage);
} catch (AvroRuntimeException | IOException exc) {
this.messageParseFailures.mark();
if (this.messageParseFailures.getFiveMinuteRate() < 1) {
log.warn("Unable to decode input message.", exc);
} else {
log.warn("Unable to decode input message.");
}
return null;
}
}
/**
* Converts Avro binary data to JSON format.
*/
@Override
public XContentBuilder serialize(Event event) {
XContentBuilder builder = null;
try {
if (datumReader != null) {
Decoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
GenericRecord data = datumReader.read(null, decoder);
logger.trace("Record in event " + data);
XContentParser parser = XContentFactory
.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
data.toString());
builder = jsonBuilder().copyCurrentStructure(parser);
parser.close();
} else {
logger.error("Schema File is not configured");
}
} catch (IOException e) {
logger.error("Exception in parsing avro format data but continuing serialization to process further records",
e.getMessage(), e);
}
return builder;
}
private <T> Decoder serializeSpecificFast(T data, Schema schema) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(baos, null);
try {
FastSpecificSerializerGenerator<T> fastSpecificSerializerGenerator = new FastSpecificSerializerGenerator<>(
schema, tempDir, classLoader, null);
FastSerializer<T> fastSerializer = fastSpecificSerializerGenerator.generateSerializer();
fastSerializer.serialize(data, binaryEncoder);
binaryEncoder.flush();
} catch (Exception e) {
throw new RuntimeException(e);
}
return DecoderFactory.get().binaryDecoder(baos.toByteArray(), null);
}
public static void testReflect(Object value, Type type, String schema)
throws Exception {
// check that schema matches expected
Schema s = ReflectData.get().getSchema(type);
assertEquals(new Schema.Parser().parse(schema), s);
// check that value is serialized correctly
ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(s);
ByteArrayOutputStream out = new ByteArrayOutputStream();
writer.write(value, EncoderFactory.get().directBinaryEncoder(out, null));
ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(s);
Object after =
reader.read(null,
DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
assertEquals(value, after);
}
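A hypothetical invocation of this helper: under Avro's reflect mapping, java.lang.String corresponds to the Avro string type, so both the schema check and the round-trip assertion hold:
// Hypothetical call: String maps to the Avro "string" schema under ReflectData.
testReflect("hello", String.class, "\"string\"");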
void checkAvroInitialized() {
if (datumReader != null) {
return;
}
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
SpecificData specificData = new SpecificData(cl);
this.datumReader = new SpecificDatumReader<>(specificData);
this.reader = specificData.getSchema(recordClazz);
} else {
this.reader = new Schema.Parser().parse(schemaString);
GenericData genericData = new GenericData(cl);
this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
}
this.inputStream = new MutableByteArrayInputStream();
this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
@Override
public Record deserialize(String topic, byte[] data) {
ByteBuffer buffer = ByteBuffer.wrap(data);
if (buffer.get() != MAGIC_BYTE) {
throw new RuntimeException("Unknown magic byte!");
}
int version = buffer.getInt();
Schema schema = schemaLookup.getSchema(version);
int offset = buffer.position();
int length = buffer.remaining();
DatumReader<Record> reader = new GenericDatumReader<>(schema, schema, genericData);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, offset, length, null);
try {
return reader.read(null, decoder);
} catch (IOException e) {
throw new RuntimeException("Unable to decode record.", e);
}
}
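This reader assumes a simple wire format: one magic byte, a four-byte big-endian schema version, then the Avro binary body. For clarity, here is a hedged sketch of the matching producer side; MAGIC_BYTE and the version argument mirror the reader, and the method name is an assumption, not part of the original:
// Sketch of a serializer producing the framing that deserialize() expects.
byte[] serialize(Record record, Schema schema, int version) throws IOException {
ByteArrayOutputStream body = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(body, null);
new GenericDatumWriter<Record>(schema).write(record, encoder);
encoder.flush();
ByteBuffer out = ByteBuffer.allocate(1 + 4 + body.size());
out.put(MAGIC_BYTE); // 1-byte marker
out.putInt(version); // 4-byte schema version (big-endian, ByteBuffer default)
out.put(body.toByteArray()); // Avro binary payload
return out.array();
}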
@Test
public void testWriteAvroRecordsDropSchema() throws InterruptedException, StageException, IOException {
DataGeneratorFormatConfig dataGeneratorFormatConfig = new DataGeneratorFormatConfig();
dataGeneratorFormatConfig.avroSchema = SdcAvroTestUtil.AVRO_SCHEMA1;
dataGeneratorFormatConfig.avroSchemaSource = INLINE;
dataGeneratorFormatConfig.includeSchema = false;
dataGeneratorFormatConfig.avroCompression = AvroCompression.NULL;
FlumeTarget flumeTarget = FlumeTestUtil.createFlumeTarget(
FlumeTestUtil.createDefaultFlumeConfig(port, false),
DataFormat.AVRO,
dataGeneratorFormatConfig
);
TargetRunner targetRunner = new TargetRunner.Builder(FlumeDTarget.class, flumeTarget).build();
targetRunner.runInit();
List<Record> records = SdcAvroTestUtil.getRecords1();
targetRunner.runWrite(records);
targetRunner.runDestroy();
List<GenericRecord> genericRecords = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); //Reader schema argument is optional
datumReader.setSchema(new Schema.Parser().parse(SdcAvroTestUtil.AVRO_SCHEMA1));
Transaction transaction = ch.getTransaction();
transaction.begin();
Event event = ch.take();
while(event != null) {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
GenericRecord read = datumReader.read(null, decoder);
genericRecords.add(read);
event = ch.take();
}
transaction.commit();
transaction.close();
Assert.assertEquals(3, genericRecords.size());
SdcAvroTestUtil.compare1(genericRecords);
}
@Override
public AvroJobSpec deserialize(String topic, byte[] data) {
try (InputStream is = new ByteArrayInputStream(data)) {
_versionWriter.readSchemaVersioningInformation(new DataInputStream(is));
Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder);
return _reader.read(null, decoder);
} catch (IOException e) {
throw new RuntimeException("Could not decode message");
}
}
/**
* An iterator for scanning over rows serialized in Avro format
*
* @param bqSchema Schema of underlying BigQuery source
* @param columnsInOrder Sequence of columns in the schema
* @param schema Schema in avro format
* @param rowsInBytes Rows serialized in binary format for Avro
*/
public AvroBinaryIterator(Schema bqSchema,
List<String> columnsInOrder,
org.apache.avro.Schema schema,
ByteString rowsInBytes) {
reader = new GenericDatumReader<GenericRecord>(schema);
this.bqSchema = bqSchema;
this.columnsInOrder = columnsInOrder;
in = DecoderFactory.get().binaryDecoder(rowsInBytes.toByteArray(), null);
}
public static Person desFromAvroBytes(byte[] record) throws IOException {
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record, null);
GenericRecord avroValue = datumReader.read(null, decoder);
return fromAvroRecord(avroValue);
}
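The second argument of binaryDecoder accepts a previously used decoder; the helper above passes null, but in a tight loop reusing both the decoder and the record avoids repeated allocation. A hedged sketch of that reuse pattern (payloads, schema, and process are assumptions):
// Hedged sketch: reuse one decoder and one record across many payloads.
BinaryDecoder decoder = null;
GenericRecord reuse = null;
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
for (byte[] payload : payloads) {
decoder = DecoderFactory.get().binaryDecoder(payload, decoder); // reuses the internal buffer
reuse = reader.read(reuse, decoder); // reuses the record instance
process(reuse); // hypothetical consumer
}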
/**
* Creates an Avro deserialization schema for the given specific record class. Having the
* concrete Avro record class might improve performance.
*
* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
*/
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
schema = SpecificData.get().getSchema(recordClazz);
typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
schemaString = schema.toString();
record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
datumReader = new SpecificDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
/**
* Creates an Avro deserialization schema for the given Avro schema string.
*
* @param avroSchemaString Avro schema string to deserialize Avro's record to Flink's row
*/
public AvroRowDeserializationSchema(String avroSchemaString) {
Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
recordClazz = null;
final TypeInformation<?> typeInfo = AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
Preconditions.checkArgument(typeInfo instanceof RowTypeInfo, "Row type information expected.");
this.typeInfo = (RowTypeInfo) typeInfo;
schemaString = avroSchemaString;
schema = new Schema.Parser().parse(avroSchemaString);
record = new GenericData.Record(schema);
datumReader = new GenericDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) {
this.in = in;
this.recordSchema = recordSchema;
datumReader = new NonCachingDatumReader<>(avroSchema);
decoder = DecoderFactory.get().binaryDecoder(in, null);
}
/**
* Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from a byte array Avro serialization.
* @param reuse GobblinTrackingEvent to reuse.
* @param bytes Input bytes.
* @param schemaId Expected schemaId.
* @return GobblinTrackingEvent.
* @throws java.io.IOException
*/
public static synchronized GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes, @Nullable String schemaId)
throws IOException {
if (!reader.isPresent()) {
reader = Optional.of(new SpecificDatumReader<>(GobblinTrackingEvent.class));
}
Closer closer = Closer.create();
try {
DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));
if (schemaId != null) {
MetricReportUtils.readAndVerifySchemaId(inputStream, schemaId);
} else {
MetricReportUtils.readAndVerifySchemaVersion(inputStream);
}
// Decode the rest
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
return reader.get().read(reuse, decoder);
} catch(Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
/**
* Reads an Avro object from an input stream.
*
* @param inputStream The input stream to read from
* @return Avro object
* @throws IOException
*/
AvroYarnJobSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroYarnJobSubmissionParameters.getClassSchema(), inputStream);
final SpecificDatumReader<AvroYarnJobSubmissionParameters> reader = new SpecificDatumReader<>(
AvroYarnJobSubmissionParameters.class);
return reader.read(null, decoder);
}
/**
* Parses an Avro record against the given schema.
*
* @param bytes avro data
* @param schema avro schema
* @return JsonNode array
*/
private JsonNode parseAvroWithSchema(final byte[] bytes, Schema schema)
{
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
InputStream input = new ByteArrayInputStream(bytes);
Decoder decoder = DecoderFactory.get().binaryDecoder(input, null);
try
{
GenericRecord datum = reader.read(null, decoder);
return mapper.readTree(datum.toString());
} catch (IOException e)
{
throw SnowflakeErrors.ERROR_0010.getException("Failed to parse AVRO " +
"record\n" + e.toString());
}
}
private ResolvingDecoder newResolver() {
try {
return DecoderFactory.get().resolvingDecoder(fileSchema, readSchema, null);
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}
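resolvingDecoder is the low-level mechanism behind Avro schema resolution: it adapts data written with fileSchema (the writer schema) so it can be read with readSchema. The same resolution is more commonly reached through the higher-level datum reader API; a hedged sketch, with bytes assumed to hold a record written under fileSchema:
// Equivalent high-level route: GenericDatumReader performs the same
// writer-to-reader resolution internally via a ResolvingDecoder.
DatumReader<GenericRecord> reader = new GenericDatumReader<>(fileSchema, readSchema);
Decoder d = DecoderFactory.get().binaryDecoder(bytes, null);
GenericRecord resolved = reader.read(null, d);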
/**
* To remove certain fields from the Avro schema or records of a topic/table, set property
* {topic/table name}.remove.fields={comma-separated, fully qualified field names} in workUnit.
*/
@Override
public EnvelopeSchemaConverter init(WorkUnitState workUnit) {
if (workUnit.contains(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY)) {
String removeFieldsPropName = workUnit.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY) + AvroProjectionConverter.REMOVE_FIELDS;
if (workUnit.contains(removeFieldsPropName)) {
this.fieldRemover = Optional.of(new AvroSchemaFieldRemover(workUnit.getProp(removeFieldsPropName)));
} else {
this.fieldRemover = Optional.absent();
}
}
String registryFactoryField = workUnit.contains(KafkaSchemaRegistryFactory.KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS) ?
workUnit.getProp(KafkaSchemaRegistryFactory.KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS) : DEFAULT_KAFKA_SCHEMA_REGISTRY_FACTORY_CLASS;
try {
KafkaSchemaRegistryFactory registryFactory = ((Class<? extends KafkaSchemaRegistryFactory>) Class.forName(registryFactoryField)).newInstance();
this.registry = registryFactory.create(workUnit.getProperties());
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
return null;
}
this.decoderFactory = DecoderFactory.get();
this.readers = CacheBuilder.newBuilder().build(new CacheLoader<Schema, GenericDatumReader<GenericRecord>>() {
@Override
public GenericDatumReader<GenericRecord> load(final Schema key) throws Exception {
return new GenericDatumReader<>(key);
}
});
return this;
}
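For instance, following the property pattern documented above, dropping two fields from a table named user_events could be configured like this (table and field names are invented, and setProp is assumed to be the standard Gobblin State setter):
// Hypothetical job setup: remove two fully qualified fields from table "user_events".
workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "user_events");
workUnit.setProp("user_events" + AvroProjectionConverter.REMOVE_FIELDS, "payload.ssn,metadata.debugInfo");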
public AvroHttpRequest deserializeAvroHttpRequestJSON(byte[] data) {
DatumReader<AvroHttpRequest> reader = new SpecificDatumReader<>(AvroHttpRequest.class);
Decoder decoder = null;
try {
decoder = DecoderFactory.get()
.jsonDecoder(AvroHttpRequest.getClassSchema(), new String(data, StandardCharsets.UTF_8));
return reader.read(null, decoder);
} catch (IOException e) {
logger.error("Deserialization error: " + e.getMessage(), e);
}
return null;
}
/**
* Reads the provided {@link InputStream} into an Avro {@link GenericRecord},
* applying the provided {@link Schema}, and returns the resulting record.
*/
public static GenericRecord read(InputStream in, Schema schema) {
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
try {
return datumReader.read(null, DecoderFactory.get().binaryDecoder(in, null));
} catch (Exception e) {
throw new IllegalStateException("Failed to read AVRO record", e);
}
}
@Override
public Iterable<Row> translate(Row message) throws Exception {
byte[] value = message.getAs(Translator.VALUE_FIELD_NAME);
Decoder decoder = DecoderFactory.get().binaryDecoder(value, null);
GenericRecord record = reader.read(null, decoder);
Row row = rowForRecord(record);
return Collections.singleton(row);
}
static AvroYarnJobSubmissionParameters readYarnJobSubmissionParametersFromInputStream(
final InputStream inputStream) throws IOException {
final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
AvroYarnJobSubmissionParameters.getClassSchema(), inputStream);
final SpecificDatumReader<AvroYarnJobSubmissionParameters> reader = new SpecificDatumReader<>(
AvroYarnJobSubmissionParameters.class);
return reader.read(null, decoder);
}