下面列出了 org.apache.avro.specific.SpecificRecord 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Creates a key extractor that locates a STRING-typed key field among the given row fields.
 *
 * @param keyFieldName name of the key field; must be present both in {@code fieldNames} and in
 *     the Avro schema of {@code recordClazz}
 * @param fieldNames names of the row fields, in row order
 * @param fieldTypes types of the row fields; must have the same length as {@code fieldNames}
 * @param recordClazz Avro specific record class whose schema declares the key field
 * @throws IllegalArgumentException if the field arrays differ in length, the key field is missing
 *     from the Avro schema or from {@code fieldNames}, or the key field is not of Avro type STRING
 */
public AvroKeyExtractor(
        String keyFieldName,
        String[] fieldNames,
        TypeInformation<?>[] fieldTypes,
        Class<? extends SpecificRecord> recordClazz) {
    checkArgument(fieldNames.length == fieldTypes.length,
            "Number of provided field names and types does not match.");
    Schema schema = SpecificData.get().getSchema(recordClazz);
    Schema.Field keyField = schema.getField(keyFieldName);
    // Schema#getField returns null for unknown names; report that clearly instead of failing
    // with a NullPointerException on the next line.
    checkArgument(keyField != null,
            "Key field '" + keyFieldName + "' not found in Avro schema " + schema.getFullName());
    Schema.Type keyType = keyField.schema().getType();
    int keyIndex = Arrays.asList(fieldNames).indexOf(keyFieldName);
    checkArgument(keyIndex >= 0,
            "Key field '" + keyFieldName + "' not found");
    checkArgument(Schema.Type.STRING.equals(keyType),
            "Key field must be of type 'STRING'");
    this.keyIndex = keyIndex;
}
/**
 * Maps a collection of Avro specific records to {@code ThriftRecord}s, using the named fields
 * of each record as the row key, TTL and timestamp.
 *
 * <p>The Avro schema is obtained from a throwaway instance of the collection's record class,
 * created through its public no-argument constructor.
 *
 * @param collection records to convert
 * @param rowKeyFieldName name of the field holding the row key
 * @param ttlFieldName name of the field holding the TTL
 * @param timestampFieldName name of the field holding the timestamp
 * @return the converted collection
 * @throws RuntimeException if the record class cannot be instantiated
 */
public static <T extends SpecificRecord> PCollection<ThriftRecord> byFieldNames(
        final PCollection<T> collection,
        final String rowKeyFieldName,
        final String ttlFieldName,
        final String timestampFieldName
) {
    final Class<T> recordClass = collection.getPType().getTypeClass();

    final T prototype;
    try {
        prototype = recordClass.getConstructor().newInstance();
    } catch (Exception e) {
        throw new RuntimeException("Could not create an instance of the record to determine it's schema", e);
    }

    final ThriftByFieldNamesFn<T> conversionFn = new ThriftByFieldNamesFn<T>(
            prototype.getSchema(), rowKeyFieldName, ttlFieldName, timestampFieldName);
    return collection.parallelDo(conversionFn, ThriftRecord.PTYPE);
}
/**
* Create PulsarAvroTableSink.
*
* @param serviceUrl pulsar service url
* @param topic topic in pulsar to which table is written
* @param routingKeyFieldName routing key field name
*/
public PulsarAvroTableSink(
String serviceUrl,
String topic,
Authentication authentication,
String routingKeyFieldName,
Class<? extends SpecificRecord> recordClazz) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url not set");
checkArgument(StringUtils.isNotBlank(topic), "Topic is null");
checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
clientConfigurationData = new ClientConfigurationData();
producerConfigurationData = new ProducerConfigurationData();
clientConfigurationData.setServiceUrl(serviceUrl);
clientConfigurationData.setAuthentication(authentication);
producerConfigurationData.setTopicName(topic);
this.routingKeyFieldName = routingKeyFieldName;
this.recordClazz = recordClazz;
}
/**
 * Lazily builds the Avro reading state (datum reader, reader schema, input stream, decoder).
 * Does nothing if the datum reader already exists.
 *
 * <p>For a {@code SpecificRecord} class, a specific datum reader backed by the thread's
 * context class loader is used and the reader schema comes from the class. Otherwise the
 * reader schema is parsed from {@code schemaString} and a generic datum reader is used.
 */
void checkAvroInitialized() {
    if (datumReader == null) {
        final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (!SpecificRecord.class.isAssignableFrom(recordClazz)) {
            this.reader = new Schema.Parser().parse(schemaString);
            this.datumReader = new GenericDatumReader<>(null, this.reader, new GenericData(contextLoader));
        } else {
            final SpecificData specificData = new SpecificData(contextLoader);
            this.datumReader = new SpecificDatumReader<>(specificData);
            this.reader = specificData.getSchema(recordClazz);
        }
        this.inputStream = new MutableByteArrayInputStream();
        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    }
}
/**
 * Serializes an Avro record to Avro binary using the schema carried by the record itself.
 *
 * <p>Records implementing {@code SpecificRecord} are written with a {@code SpecificDatumWriter};
 * all others with a {@code GenericDatumWriter}.
 *
 * @param record record to encode
 * @return the Avro binary encoding of {@code record}
 * @throws SamzaException wrapping any exception raised while encoding
 */
@VisibleForTesting
byte[] encodeRecord(IndexedRecord record) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Schema schema = record.getSchema();
    try {
        // Reuse the shared default factory instead of allocating a new EncoderFactory per call.
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        DatumWriter<IndexedRecord> writer;
        if (record instanceof SpecificRecord) {
            writer = new SpecificDatumWriter<>(schema);
        } else {
            writer = new GenericDatumWriter<>(schema);
        }
        writer.write(record, encoder);
        encoder.flush(); // the binary encoder buffers; push everything into 'out'
    } catch (Exception e) {
        throw new SamzaException("Unable to serialize Avro record using schema within the record: " + schema.toString(), e);
    }
    return out.toByteArray();
}
/** Writes the simple test record twice and checks that both rows read back match it. */
@Test
public void testReadRowFromSimpleRecord() throws IOException {
    final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> simple = TestUtil.getSimpleRecordTestData();
    final Path parquetFile = TestUtil.createTempParquetFile(
            tempRoot.getRoot(), TestUtil.SIMPLE_SCHEMA, Arrays.asList(simple.f1, simple.f1));
    final ParquetRowInputFormat format =
            new ParquetRowInputFormat(parquetFile, SCHEMA_CONVERTER.convert(TestUtil.SIMPLE_SCHEMA));
    format.setRuntimeContext(TestUtil.getMockRuntimeContext());

    final FileInputSplit[] inputSplits = format.createInputSplits(1);
    assertEquals(1, inputSplits.length);
    format.open(inputSplits[0]);

    // One row per written record.
    for (int i = 0; i < 2; i++) {
        final Row current = format.nextRecord(null);
        assertNotNull(current);
        assertEquals(simple.f2, current);
    }
    assertTrue(format.reachedEnd());
}
/**
 * Loads the static method {@code clazz.methodName(argType)} and wraps it as a {@link Function}
 * converting a {@code W} into an Avro specific record {@code R}.
 *
 * @param clazz fully-qualified name of the class declaring the method
 * @param methodName name of the static conversion method
 * @param argType the single parameter type of the method
 * @return a function that invokes the method; on failure it logs the error and throws a
 *     {@link RuntimeException} whose cause is the method's own exception
 * @throws ClassNotFoundException if {@code clazz} cannot be loaded
 * @throws NoSuchMethodException if the class declares no such method, or the method is not static
 */
public static <W, R extends SpecificRecord> Function<W, R> loadToAvroStaticMethod(String clazz, String methodName, Class<W> argType) throws ClassNotFoundException, NoSuchMethodException {
    final java.lang.reflect.Method method = Class.forName(clazz)
            .getDeclaredMethod(methodName, argType);
    // The function invokes with a null receiver, which only works for static methods. Fail here
    // rather than with an opaque NullPointerException on every later call.
    if (!java.lang.reflect.Modifier.isStatic(method.getModifiers())) {
        throw new NoSuchMethodException("Method " + clazz + "." + methodName + " is not static");
    }
    return obj -> {
        try {
            @SuppressWarnings("unchecked")
            final R result = (R) method.invoke(null, obj);
            return result;
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Unwrap so the log and the rethrown cause show the conversion method's own failure;
            // InvocationTargetException itself usually carries no message.
            final Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error("There was an error in {}.{} (toAvro) method: {}", clazz, methodName, cause.getMessage(), cause);
            throw new RuntimeException(cause);
        } catch (Exception e) {
            log.error("There was an error in {}.{} (toAvro) method: {}", clazz, methodName, e.getMessage(), e);
            throw new RuntimeException(e);
        }
    };
}
/** Writes one nested test record and compares the row read back field by field. */
@Test
public void testReadRowFromNestedRecord() throws IOException {
    final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = TestUtil.getNestedRecordTestData();
    final Path parquetFile = TestUtil.createTempParquetFile(
            tempRoot.newFolder(), TestUtil.NESTED_SCHEMA, Collections.singletonList(nested.f1));
    final ParquetRowInputFormat format =
            new ParquetRowInputFormat(parquetFile, SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA));
    format.setRuntimeContext(TestUtil.getMockRuntimeContext());

    final FileInputSplit[] inputSplits = format.createInputSplits(1);
    assertEquals(1, inputSplits.length);
    format.open(inputSplits[0]);

    final Row actual = format.nextRecord(null);
    final Row expected = nested.f2;
    assertNotNull(actual);
    assertEquals(7, actual.getArity());
    // Array-valued fields need element-wise comparison; the rest use equals().
    // Field 2 is not compared by this test.
    assertEquals(expected.getField(0), actual.getField(0));
    assertEquals(expected.getField(1), actual.getField(1));
    assertArrayEquals((Long[]) expected.getField(3), (Long[]) actual.getField(3));
    assertArrayEquals((String[]) expected.getField(4), (String[]) actual.getField(4));
    assertEquals(expected.getField(5), actual.getField(5));
    assertArrayEquals((Row[]) expected.getField(6), (Row[]) actual.getField(6));
}
/**
 * Encodes an Avro record into the payload of a BrooklinEnvelope, framed with its schema id.
 *
 * <p>The returned bytes are, in order:
 * <ul>
 *   <li>{@code MAGIC_BYTE};</li>
 *   <li>the bytes returned by {@code hexToMd5(schemaId)};</li>
 *   <li>the Avro binary encoding of {@code record}, using the record's own schema.</li>
 * </ul>
 * Because the header precedes the record, a client decoding the payload can recover the schema
 * id and use it to fetch the schema from the Schema Registry.
 *
 * @param schemaId schema identifier, converted by {@code hexToMd5}
 * @param record record to encode; must not be null
 * @return header bytes followed by the encoded record
 * @throws AvroEncodingException if writing the header or encoding the record throws an IOException
 */
public static byte[] encode(String schemaId, IndexedRecord record) throws AvroEncodingException {
    Validate.notNull(record, "cannot encode null Record, schemaId: " + schemaId);
    final ByteArrayOutputStream payload = new ByteArrayOutputStream();
    payload.write(MAGIC_BYTE);
    final byte[] schemaHash = hexToMd5(schemaId);
    try {
        payload.write(schemaHash);
        final BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(payload, null);
        final DatumWriter<org.apache.avro.generic.IndexedRecord> recordWriter = (record instanceof SpecificRecord)
                ? new SpecificDatumWriter<IndexedRecord>(record.getSchema())
                : new GenericDatumWriter<IndexedRecord>(record.getSchema());
        recordWriter.write(record, binaryEncoder);
        binaryEncoder.flush(); // the encoder buffers internally
    } catch (IOException e) {
        throw new AvroEncodingException(e);
    }
    return payload.toByteArray();
}
/**
 * Converts a generic Avro record into a specific record by round-tripping it through Avro
 * binary encoding. The record's own schema is used for both writing and reading.
 *
 * @param record generic record to convert
 * @return the record decoded with a {@code SpecificDatumReader}
 * @throws IOException if encoding or decoding fails
 */
public static <T extends SpecificRecord> T toSpecificRecord(GenericData.Record record) throws IOException {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    final Encoder encoder = EncoderFactory.get().binaryEncoder(buffer, null);
    new GenericDatumWriter<GenericData.Record>(record.getSchema()).write(record, encoder);
    encoder.flush(); // push buffered bytes into 'buffer' before reading them back
    final byte[] encoded = buffer.toByteArray();

    final SpecificDatumReader<T> specificReader = new SpecificDatumReader<>(record.getSchema());
    return specificReader.read(null, DecoderFactory.get().binaryDecoder(encoded, null));
}
/**
 * Picks the datum writer for {@code data}. A {@code SpecificRecord} gets a
 * {@code SpecificDatumWriter}; anything else gets a {@code GenericDatumWriter}.
 */
@Override
public DatumWriter<T> createDatumWriter(T data, Schema schema) {
    if (!(data instanceof SpecificRecord)) {
        return new GenericDatumWriter<>(schema);
    }
    return new SpecificDatumWriter<>(schema);
}
/**
 * Selects the Avro datum writer implementation for {@code type} and logs the choice at debug
 * level.
 *
 * <ul>
 *   <li>{@code SpecificRecord} subtypes get a {@code SpecificDatumWriter};</li>
 *   <li>other {@code GenericRecord} implementations get a {@code GenericDatumWriter};</li>
 *   <li>everything else gets a {@code ReflectDatumWriter}.</li>
 * </ul>
 */
private DatumWriter getDatumWriter(Class<?> type, Schema schema){
    logger.debug("Finding correct DatumWriter for type {}",type.getName());
    final DatumWriter selected;
    // The SpecificRecord check must come first so specific classes are not treated as generic.
    if (SpecificRecord.class.isAssignableFrom(type)) {
        selected = new SpecificDatumWriter<>(schema);
    } else if (GenericRecord.class.isAssignableFrom(type)) {
        selected = new GenericDatumWriter<>(schema);
    } else {
        selected = new ReflectDatumWriter<>(schema);
    }
    logger.debug("DatumWriter of type {} selected", selected.getClass().getName());
    return selected;
}
/**
 * Builds a datum reader for the given Avro specific record class, using the schema reported by
 * a freshly created instance of that class.
 *
 * @param clazz generated Avro record class; must have an accessible no-argument constructor
 * @return a {@code SpecificDatumReader} for the class's schema
 * @throws RuntimeException wrapping any failure to instantiate the class
 */
@InterfaceAudience.Private
@Override
public DatumReader getReader(Class<SpecificRecord> clazz) {
    try {
        // Class#newInstance is deprecated (Java 9+): it rethrows checked constructor exceptions
        // undeclared. The Constructor API wraps them in InvocationTargetException instead.
        SpecificRecord prototype = clazz.getDeclaredConstructor().newInstance();
        return new SpecificDatumReader<SpecificRecord>(prototype.getSchema());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
/**
 * Returns the Flink type of the produced records.
 *
 * <p>A {@code SpecificRecord} class yields an {@code AvroTypeInfo} for that class. Any other
 * class yields a {@code GenericRecordAvroTypeInfo} built from the reader schema.
 */
@Override
@SuppressWarnings("unchecked")
public TypeInformation<T> getProducedType() {
    if (!SpecificRecord.class.isAssignableFrom(recordClazz)) {
        return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.reader);
    }
    return new AvroTypeInfo(recordClazz);
}
/**
 * Java deserialization hook. Reads the record class (may be null) and the schema string, then
 * rebuilds all derived members: type info, parsed schema, reusable record, datum reader, input
 * stream and decoder.
 *
 * <p>The two reads must stay in the same order as the matching {@code writeObject}.
 * NOTE(review): the schema string is read with {@code readUTF()}, which only handles strings up
 * to 65535 encoded bytes. Confirm the writer side uses {@code writeUTF} and that large schemas
 * are not expected.
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
    // Serialized state, in stream order.
    recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
    schemaString = inputStream.readUTF();

    // Derived state rebuilt from the schema string.
    typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(schemaString);
    schema = new Schema.Parser().parse(schemaString);
    if (recordClazz == null) {
        record = new GenericData.Record(schema);
    } else {
        record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
    }
    datumReader = new SpecificDatumReader<>(schema);
    // 'this.' is required: the parameter shadows the inputStream field.
    this.inputStream = new MutableByteArrayInputStream();
    decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
}
/**
* Creates an Avro serialization schema for the given specific record class.
*
* @param recordClazz Avro record class used to serialize Flink's row to Avro's record
*/
public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
this.schema = SpecificData.get().getSchema(recordClazz);
this.schemaString = schema.toString();
this.datumWriter = new SpecificDatumWriter<>(schema);
this.arrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
}
/**
 * Java deserialization hook. Reads the record class (may be null) and the schema string, then
 * rebuilds the schema, datum writer, output buffer and encoder.
 *
 * <p>If a record class is present, the schema comes from the class; otherwise the schema string
 * is parsed. The reads must stay in the same order as the matching {@code writeObject}.
 */
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
    recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
    schemaString = (String) inputStream.readObject();

    schema = (recordClazz == null)
            ? new Schema.Parser().parse(schemaString)
            : SpecificData.get().getSchema(recordClazz);
    datumWriter = new SpecificDatumWriter<>(schema);
    arrayOutputStream = new ByteArrayOutputStream();
    encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
}
/**
 * Get generated getters for an AVRO-generated SpecificRecord or a POJO.
 *
 * <p>{@code SpecificRecord} subtypes are delegated to {@code JavaBeanUtils}; any other class to
 * {@code POJOUtils}. Both use {@code AvroTypeConversionFactory}.
 */
public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema schema) {
    final boolean isSpecificRecord =
        TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class));
    if (!isSpecificRecord) {
        return POJOUtils.getGetters(
            clazz, schema, new AvroPojoFieldValueTypeSupplier(), new AvroTypeConversionFactory());
    }
    return JavaBeanUtils.getGetters(
        clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier(), new AvroTypeConversionFactory());
}
/**
 * Get field types for an AVRO-generated SpecificRecord or a POJO.
 *
 * <p>{@code SpecificRecord} subtypes are delegated to {@code JavaBeanUtils}; any other class to
 * {@code POJOUtils}.
 */
public static <T> List<FieldValueTypeInformation> getFieldTypes(Class<T> clazz, Schema schema) {
    final boolean isSpecificRecord =
        TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class));
    if (!isSpecificRecord) {
        return POJOUtils.getFieldTypes(clazz, schema, new AvroPojoFieldValueTypeSupplier());
    }
    return JavaBeanUtils.getFieldTypes(clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier());
}
/**
 * Serializes the same row three times with one schema-string-based serializer. Only the last
 * output is deserialized and compared.
 */
@Test
public void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
    final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
    final String schemaString = testData.f1.getSchema().toString();
    final AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(schemaString);
    final AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(schemaString);

    byte[] bytes = null;
    for (int round = 0; round < 3; round++) {
        bytes = serializer.serialize(testData.f2);
    }
    assertEquals(testData.f2, deserializer.deserialize(bytes));
}
/**
 * Deserializes the same bytes three times with one class-based deserializer. Only the last
 * result is compared.
 */
@Test
public void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
    final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
    final Class<? extends SpecificRecord> recordClass = testData.f0;
    final AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(recordClass);
    final AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(recordClass);
    final byte[] bytes = serializer.serialize(testData.f2);

    Row actual = null;
    for (int round = 0; round < 3; round++) {
        actual = deserializer.deserialize(bytes);
    }
    assertEquals(testData.f2, actual);
}
/**
 * Deserializes the same bytes three times with one schema-string-based deserializer. Only the
 * last result is compared.
 */
@Test
public void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
    final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
    final String schemaString = testData.f1.getSchema().toString();
    final AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(schemaString);
    final AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(schemaString);
    final byte[] bytes = serializer.serialize(testData.f2);

    Row actual = null;
    for (int round = 0; round < 3; round++) {
        actual = deserializer.deserialize(bytes);
    }
    assertEquals(testData.f2, actual);
}
/**
 * Renders an Avro specific record as a string using Avro's JSON encoding.
 *
 * @param record the record to render
 * @return the JSON encoding of {@code record}, decoded as UTF-8
 * @throws RuntimeException wrapping any {@link IOException} raised while encoding
 */
public static String toString(final SpecificRecord record) {
    try {
        final Schema schema = record.getSchema();
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        final Encoder encoder = EncoderFactory.get().jsonEncoder(schema, bos);
        // Typed rather than raw; the writer is still built from the record's concrete class.
        @SuppressWarnings("unchecked")
        final Class<SpecificRecord> recordClass = (Class<SpecificRecord>) record.getClass();
        final SpecificDatumWriter<SpecificRecord> datumWriter = new SpecificDatumWriter<>(recordClass);
        datumWriter.write(record, encoder);
        encoder.flush();
        // StandardCharsets.UTF_8 avoids a by-name charset lookup on every call.
        return new String(bos.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
    } catch (final IOException e) {
        throw new RuntimeException(e);
    }
}
/**
* Creates a Avro deserialization schema for the given specific record class. Having the
* concrete Avro record class might improve performance.
*
* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
*/
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
schema = SpecificData.get().getSchema(recordClazz);
typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
schemaString = schema.toString();
record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
datumReader = new SpecificDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
/**
 * Creates a {@code SpecificAvroSerde} configured with the schema registry URL from the
 * {@code schema.registry.url} property. The serde is configured with {@code isKey = false}.
 *
 * <p>NOTE(review): if the property is absent, a null URL is put into the config unchanged.
 *
 * @param envProps properties providing {@code schema.registry.url}
 * @return the configured serde
 */
static <T extends SpecificRecord> SpecificAvroSerde<T> getSpecificAvroSerde(final Properties envProps) {
    final SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
    final String registryUrl = envProps.getProperty("schema.registry.url");
    final HashMap<String, String> config = new HashMap<>();
    config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    serde.configure(config, false);
    return serde;
}
/** Round-trips one row through schema-string-based serialization and deserialization. */
@Test
public void testSpecificSerializeDeserializeFromSchema() throws IOException {
    final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSpecificTestData();
    final String schemaString = testData.f1.getSchema().toString();
    final AvroRowSerializationSchema serializer = new AvroRowSerializationSchema(schemaString);
    final AvroRowDeserializationSchema deserializer = new AvroRowDeserializationSchema(schemaString);

    final Row roundTripped = deserializer.deserialize(serializer.serialize(testData.f2));
    assertEquals(testData.f2, roundTripped);
}
/**
* Creates an Avro serialization schema for the given specific record class.
*
* @param recordClazz Avro record class used to serialize Flink's row to Avro's record
*/
public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
this.schema = SpecificData.get().getSchema(recordClazz);
this.schemaString = schema.toString();
this.datumWriter = new SpecificDatumWriter<>(schema);
this.arrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
}
/**
 * Creates an Avro row deserialization schema from validated format properties. A configured
 * record class is preferred; otherwise the configured Avro schema string is used.
 */
@Override
public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
    final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
    if (!descriptorProperties.containsKey(AvroValidator.FORMAT_RECORD_CLASS)) {
        return new AvroRowDeserializationSchema(descriptorProperties.getString(AvroValidator.FORMAT_AVRO_SCHEMA));
    }
    return new AvroRowDeserializationSchema(
            descriptorProperties.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecord.class));
}
/**
 * Creates an Avro row serialization schema from validated format properties. A configured
 * record class is preferred; otherwise the configured Avro schema string is used.
 */
@Override
public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) {
    final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
    if (!descriptorProperties.containsKey(AvroValidator.FORMAT_RECORD_CLASS)) {
        return new AvroRowSerializationSchema(descriptorProperties.getString(AvroValidator.FORMAT_AVRO_SCHEMA));
    }
    return new AvroRowSerializationSchema(
            descriptorProperties.getClass(AvroValidator.FORMAT_RECORD_CLASS, SpecificRecord.class));
}
/**
* Creates a Avro deserialization schema for the given specific record class. Having the
* concrete Avro record class might improve performance.
*
* @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
*/
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
schema = SpecificData.get().getSchema(recordClazz);
typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
schemaString = schema.toString();
record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
datumReader = new SpecificDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}