Code examples for org.apache.avro.specific.SpecificDatumReader

The following examples show how to use the org.apache.avro.specific.SpecificDatumReader API in real open-source projects; you can also follow each project link to view the full source on GitHub.
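
Before the project examples, here is a minimal standalone sketch of the typical read path: a SpecificDatumReader paired with a DataFileReader to decode an Avro container file into a generated record class. The record class User and the input file are hypothetical placeholders, not taken from the projects below.

public static void printUsers(File avroFile) throws IOException {
    // SpecificDatumReader deserializes into the generated SpecificRecord class.
    DatumReader<User> datumReader = new SpecificDatumReader<>(User.class);
    try (DataFileReader<User> fileReader = new DataFileReader<>(avroFile, datumReader)) {
        User user = null;
        while (fileReader.hasNext()) {
            user = fileReader.next(user); // reuse the record instance to avoid per-record allocation
            System.out.println(user);
        }
    }
}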

Example 1  Project: avro-fastserde   File: FastSerdeCache.java
/**
 * Returns a specific-class aware Avro {@link FastDeserializer}, generating one if
 * it is not already cached.
 *
 * @param writerSchema
 *            {@link Schema} of the written data
 * @param readerSchema
 *            {@link Schema} intended to be used during deserialization
 * @return specific-class aware Avro {@link FastDeserializer}
 */
public FastDeserializer<?> getFastSpecificDeserializer(Schema writerSchema,
        Schema readerSchema) {
    String schemaKey = getSchemaKey(writerSchema, readerSchema);
    FastDeserializer<?> deserializer = fastSpecificRecordDeserializersCache.get(schemaKey);

    if (deserializer == null) {
        SpecificDatumReader<?> fallbackReader = new SpecificDatumReader<>(writerSchema, readerSchema);
        deserializer = fastSpecificRecordDeserializersCache.putIfAbsent(schemaKey,
                d -> fallbackReader.read(null, d));
        if (deserializer == null) {
            deserializer = fastSpecificRecordDeserializersCache.get(schemaKey);
            CompletableFuture
                    .supplyAsync(() -> buildSpecificDeserializer(writerSchema, readerSchema), executor).thenAccept(
                            d -> {
                                fastSpecificRecordDeserializersCache.put(schemaKey, d);
                            });
        }
    }

    return deserializer;
}
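
Example 1 installs a fallback that delegates to a SpecificDatumReader, so callers get a working deserializer immediately while the generated fast deserializer compiles in the background. A hypothetical caller, assuming avro-fastserde's FastSerdeCache.getDefaultInstance() factory and a FastDeserializer.deserialize(Decoder) method; writerSchema, readerSchema and payload come from the application:

FastSerdeCache cache = FastSerdeCache.getDefaultInstance();
FastDeserializer<?> deserializer = cache.getFastSpecificDeserializer(writerSchema, readerSchema);
// Early calls run through the SpecificDatumReader fallback; once background
// compilation finishes, the same cache key resolves to the generated deserializer.
Object record = deserializer.deserialize(DecoderFactory.get().binaryDecoder(payload, null));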
 
Example 2  Project: Flink-CEPplus   File: AvroDeserializationSchema.java
void checkAvroInitialized() {
	if (datumReader != null) {
		return;
	}

	ClassLoader cl = Thread.currentThread().getContextClassLoader();
	if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
		SpecificData specificData = new SpecificData(cl);
		this.datumReader = new SpecificDatumReader<>(specificData);
		this.reader = specificData.getSchema(recordClazz);
	} else {
		this.reader = new Schema.Parser().parse(schemaString);
		GenericData genericData = new GenericData(cl);
		this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
	}

	this.inputStream = new MutableByteArrayInputStream();
	this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
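
This initialization is usually reached through the class's static factories. A sketch, assuming Flink's AvroDeserializationSchema.forSpecific and forGeneric methods, a generated User record, and messageBytes/avroSchema supplied by the caller:

// Specific-record path: checkAvroInitialized() takes the SpecificDatumReader branch.
AvroDeserializationSchema<User> specificSchema = AvroDeserializationSchema.forSpecific(User.class);
User user = specificSchema.deserialize(messageBytes);

// Generic-record path: the GenericDatumReader branch is taken instead.
AvroDeserializationSchema<GenericRecord> genericSchema = AvroDeserializationSchema.forGeneric(avroSchema);
GenericRecord genericRecord = genericSchema.deserialize(messageBytes);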
 
Example 3  Project: Flink-CEPplus   File: AvroInputFormat.java
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
	DatumReader<E> datumReader;

	if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
		datumReader = new GenericDatumReader<E>();
	} else {
		datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
			? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
	}
	if (LOG.isInfoEnabled()) {
		LOG.info("Opening split {}", split);
	}

	SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
	DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

	if (LOG.isDebugEnabled()) {
		LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
	}

	end = split.getStart() + split.getLength();
	recordsReadSinceLastSync = 0;
	return dataFileReader;
}
 
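Example 4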
@Test
public void testAvroParsing() throws IOException, SAXException {
  File input = new File("src/test/resources/en/mercedes.xml");
  final File output = File.createTempFile("jsonwikipedia-mercedes", ".avro");
  output.deleteOnExit();

  WikipediaArticleReader wap = new WikipediaArticleReader(input, output, "en");
  wap.start();

  // reading the encoded avro and checking that it is correct
  DatumReader<Article> userDatumReader = new SpecificDatumReader<>(Article.getClassSchema());
  DataFileReader<Article> dataFileReader = new DataFileReader<>(output, userDatumReader);
  assertTrue(dataFileReader.hasNext());
  Article article = new Article();
  dataFileReader.next(article);
  assertEquals("Mercedes-Benz", article.getTitle());
  assertEquals("Mercedes-Benz", article.getWikiTitle());
}
 
Example 5  Project: flink   File: AvroInputFormat.java
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
	DatumReader<E> datumReader;

	if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
		datumReader = new GenericDatumReader<E>();
	} else {
		datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
			? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
	}
	if (LOG.isInfoEnabled()) {
		LOG.info("Opening split {}", split);
	}

	SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
	DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

	if (LOG.isDebugEnabled()) {
		LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
	}

	end = split.getStart() + split.getLength();
	recordsReadSinceLastSync = 0;
	return dataFileReader;
}
 
Example 6  Project: laser   File: LaserFeatureListenser.java
public synchronized void recieveMessages(Message message) {
	final DatumReader<B5MEvent> reader = new SpecificDatumReader<B5MEvent>(
			B5MEvent.SCHEMA$);

	final B5MEvent b5mEvent = new B5MEvent();

	byte[] data = message.getData();

	BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
	try {
		reader.read(b5mEvent, decoder);
		for (LaserMessageConsumer consumer : this.consumer) {
			consumer.write(b5mEvent);
		}
	} catch (Exception e) {
		e.printStackTrace();
	}
}
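
Note that binaryDecoder(data, null) allocates a fresh decoder for every message. The same factory method accepts a previous decoder whose buffer can be reused; a sketch of that variant for the method above, with reusableDecoder as a hypothetical instance field (safe here because the method is synchronized):

reusableDecoder = DecoderFactory.get().binaryDecoder(message.getData(), reusableDecoder);
reader.read(b5mEvent, reusableDecoder);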
 
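Example 7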
public KafkaAvroJobStatusMonitor(String topic, Config config, int numThreads)
    throws IOException, ReflectiveOperationException {
  super(topic, config, numThreads);
  if (ConfigUtils.getBoolean(config, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false)) {
    KafkaAvroSchemaRegistry schemaRegistry = (KafkaAvroSchemaRegistry) new KafkaAvroSchemaRegistryFactory().
        create(ConfigUtils.configToProperties(config));
    this.schemaVersionWriter = new SchemaRegistryVersionWriter(schemaRegistry, topic, GobblinTrackingEvent.SCHEMA$);
  } else {
    this.schemaVersionWriter = new FixedSchemaVersionWriter();
  }
  this.decoder = ThreadLocal.withInitial(() -> {
    InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
    return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
  });
  this.reader = ThreadLocal.withInitial(() -> new SpecificDatumReader<>(GobblinTrackingEvent.SCHEMA$));
}
 
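Example 8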
@SuppressWarnings("unchecked")
private <T> T decodeRecordFast(Schema writerSchema, Decoder decoder) {
  SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema);
  try {
    return datumReader.read(null, decoder);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
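
Example 8 builds a new SpecificDatumReader for every record, which is simple but repeats schema-resolution work; examples 7 and 11 amortize that cost with a ThreadLocal. Another option is a per-schema cache, sketched here for the same hypothetical context as example 8:

private final ConcurrentMap<Schema, SpecificDatumReader<?>> readerCache = new ConcurrentHashMap<>();

@SuppressWarnings("unchecked")
private <T> T decodeRecordCached(Schema writerSchema, Decoder decoder) {
  // Assumes cached readers are only used for read(), which leaves them unmodified.
  SpecificDatumReader<T> datumReader =
      (SpecificDatumReader<T>) readerCache.computeIfAbsent(writerSchema, SpecificDatumReader::new);
  try {
    return datumReader.read(null, decoder);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}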
 
Example 9  Project: doctorkafka   File: DoctorKafkaActionsServlet.java
@Override
public void renderJSON(PrintWriter writer, Map<String, String> params) {
  JsonArray json = new JsonArray();

  for (ConsumerRecord<byte[], byte[]> record : Lists.reverse(retrieveActionReportMessages())) {
    try {
      JsonObject jsonRecord = new JsonObject();
      BinaryDecoder binaryDecoder = avroDecoderFactory.binaryDecoder(record.value(), null);
      SpecificDatumReader<OperatorAction> reader =
          new SpecificDatumReader<>(operatorActionSchema);

      OperatorAction result = new OperatorAction();
      reader.read(result, binaryDecoder);

      jsonRecord.add("date", gson.toJsonTree(new Date(result.getTimestamp())));
      jsonRecord.add("clusterName", gson.toJsonTree(result.getClusterName()));
      jsonRecord.add("description", gson.toJsonTree(result.getDescription()));
      json.add(jsonRecord);
    } catch (Exception e) {
      LOG.info("Failed to decode a message", e);
    }
  }
  writer.print(json);
}
 
Example 10  Project: stratosphere   File: AvroInputFormat.java
@Override
public void open(FileInputSplit split) throws IOException {
	super.open(split);
	
	this.wrapper = InstantiationUtil.instantiate(avroWrapperTypeClass, AvroBaseValue.class);
	
	DatumReader<E> datumReader;
	if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
		datumReader = new SpecificDatumReader<E>(avroValueType);
	} else {
		datumReader = new ReflectDatumReader<E>(avroValueType);
	}
	
	LOG.info("Opening split " + split);
	
	SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());
	
	dataFileReader = DataFileReader.openReader(in, datumReader);
	dataFileReader.sync(split.getStart());
	
	reuseAvroValue = null;
}
 
Example 11  Project: incubator-gobblin   File: KafkaAvroJobMonitor.java
public KafkaAvroJobMonitor(String topic, MutableJobCatalog catalog, Config config, Schema schema,
    SchemaVersionWriter<?> versionWriter) {
  super(topic, catalog, config);

  this.schema = schema;
  this.decoder = new ThreadLocal<BinaryDecoder>() {
    @Override
    protected BinaryDecoder initialValue() {
      InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
      return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
    }
  };
  this.reader = new ThreadLocal<SpecificDatumReader<T>>() {
    @Override
    protected SpecificDatumReader<T> initialValue() {
      return new SpecificDatumReader<>(KafkaAvroJobMonitor.this.schema);
    }
  };
  this.versionWriter = versionWriter;
}
 
Example 12  Project: Decision   File: StreamToActionBusCallbackTest.java
@Before
public void setUp() throws Exception {
    siddhiManager = new StreamingSiddhiConfiguration().siddhiManager();
    siddhiManager.defineStream(StreamsHelper.STREAM_DEFINITION);
    metadataService = new StreamMetadataService(siddhiManager);
    javaToSiddhiSerializer = new JavaToSiddhiSerializer(metadataService);
    javaToAvroSerializer = new JavaToAvroSerializer(new SpecificDatumReader(InsertMessage.getClassSchema()));

    Set<StreamAction> activeActions = new ListOrderedSet();
    activeActions.add(StreamAction.LISTEN);

    producer = Mockito.mock(Producer.class);
    avroProducer = Mockito.mock(Producer.class);
    doNothing().when(producer).send(Matchers.<List<KeyedMessage<String, String>>>any());

    cbk = new StreamToActionBusCallback(activeActions, streamName, avroProducer,
            javaToSiddhiSerializer, javaToAvroSerializer);
}
 
Example 13  Project: flink   File: AvroInputFormat.java
private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
	DatumReader<E> datumReader;

	if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
		datumReader = new GenericDatumReader<E>();
	} else {
		datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
			? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
	}
	if (LOG.isInfoEnabled()) {
		LOG.info("Opening split {}", split);
	}

	SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
	DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);

	if (LOG.isDebugEnabled()) {
		LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
	}

	end = split.getStart() + split.getLength();
	recordsReadSinceLastSync = 0;
	return dataFileReader;
}
 
Example 14  Project: hiped2   File: AvroStockFileRead.java
public static void dumpStream(InputStream is) throws IOException {
  DataFileStream<Stock> reader =
      new DataFileStream<Stock>(
          is,
          new SpecificDatumReader<Stock>(Stock.class));

  for (Stock a : reader) {
    System.out.println(ToStringBuilder.reflectionToString(a,
        ToStringStyle.SIMPLE_STYLE
    ));
  }

  IOUtils.closeStream(is);
  IOUtils.closeStream(reader);
}
 
Example 15  Project: kite   File: AvroEntitySerDe.java
private DatumReader<Object> buildDatumReader(Schema schema,
    Schema writtenSchema) {
  if (specific) {
    return new SpecificDatumReader<Object>(writtenSchema, schema);
  } else {
    return new GenericDatumReader<Object>(writtenSchema, schema);
  }
}
 
Example 16  Project: reef   File: AvroEvaluatorRequestSerializer.java
/**
 * Deserialize EvaluatorRequest.
 */
public static EvaluatorRequest fromString(final String serializedRequest) {
  try {
    final Decoder decoder =
        DecoderFactory.get().jsonDecoder(AvroEvaluatorRequest.getClassSchema(), serializedRequest);
    final SpecificDatumReader<AvroEvaluatorRequest> reader = new SpecificDatumReader<>(AvroEvaluatorRequest.class);
    return fromAvro(reader.read(null, decoder));
  } catch (final IOException ex) {
    throw new RuntimeException("Unable to deserialize compute request", ex);
  }
}
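
The excerpt shows only the decode direction; encoding is symmetric. A sketch of what the matching serializer could look like with the standard Avro JSON encoder; the method name and the toAvro helper are assumptions, not REEF's verified API:

public static String toString(final EvaluatorRequest request) {
  try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
    final AvroEvaluatorRequest avroRequest = toAvro(request); // hypothetical inverse of fromAvro
    final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroRequest.getSchema(), out);
    final DatumWriter<AvroEvaluatorRequest> writer = new SpecificDatumWriter<>(AvroEvaluatorRequest.class);
    writer.write(avroRequest, encoder);
    encoder.flush();
    return out.toString(StandardCharsets.UTF_8.name());
  } catch (final IOException ex) {
    throw new RuntimeException("Unable to serialize compute request", ex);
  }
}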
 
Example 17  Project: reef   File: FailedTaskBridge.java
private byte[] generateFailedTaskSerializedAvro() throws IOException {
  AvroFailedTask avroFailedTask = null;

  if (jfailedTask.getData() != null && jfailedTask.getData().isPresent()) {
    // Deserialize what was passed in from C#.
    try (ByteArrayInputStream fileInputStream = new ByteArrayInputStream(jfailedTask.getData().get())) {
      final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
          AvroFailedTask.getClassSchema(), fileInputStream);
      final SpecificDatumReader<AvroFailedTask> reader =
          new SpecificDatumReader<>(AvroFailedTask.class);
      avroFailedTask = reader.read(null, decoder);
    }
  } else {
    // This may result from a failed Evaluator.
    avroFailedTask = AvroFailedTask.newBuilder()
        .setIdentifier(jfailedTask.getId())
        .setCause(ByteBuffer.wrap(new byte[0]))
        .setData(ByteBuffer.wrap(new byte[0]))
        .setMessage("")
        .build();
  }

  // Overwrite the message if Java provides a message and C# does not.
  // Typically the case for failed Evaluators.
  if (StringUtils.isNoneBlank(jfailedTask.getMessage()) &&
      StringUtils.isBlank(avroFailedTask.getMessage().toString())) {
    avroFailedTask.setMessage(jfailedTask.getMessage());
  }

  final DatumWriter<AvroFailedTask> datumWriter = new SpecificDatumWriter<>(AvroFailedTask.class);

  try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
    final JsonEncoder encoder = EncoderFactory.get().jsonEncoder(avroFailedTask.getSchema(), outputStream);
    datumWriter.write(avroFailedTask, encoder);
    encoder.flush();
    outputStream.flush();
    return outputStream.toByteArray();
  }
}
 
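Example 18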
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
	recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
	schemaString = inputStream.readUTF();
	typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(schemaString);
	schema = new Schema.Parser().parse(schemaString);
	if (recordClazz != null) {
		record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
	} else {
		record = new GenericData.Record(schema);
	}
	datumReader = new SpecificDatumReader<>(schema);
	this.inputStream = new MutableByteArrayInputStream();
	decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
}
 
Example 19  Project: Flink-CEPplus   File: AvroRecordInputFormatTest.java
/**
 * This test validates proper serialization with specific (generated POJO) types.
 */
@Test
public void testDeserializeToSpecificType() throws IOException {

	DatumReader<User> datumReader = new SpecificDatumReader<>(userSchema);

	try (FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader)) {
		User rec = dataFileReader.next();

		// check if record has been read correctly
		assertNotNull(rec);
		assertEquals("name not equal", TEST_NAME, rec.get("name").toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());

		// now serialize it with our framework:
		ExecutionConfig ec = new ExecutionConfig();
		TypeInformation<User> te = TypeExtractor.createTypeInfo(User.class);

		assertEquals(AvroTypeInfo.class, te.getClass());
		TypeSerializer<User> tser = te.createSerializer(ec);

		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try (DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out)) {
			tser.serialize(rec, outView);
		}

		User newRec;
		try (DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(
				new ByteArrayInputStream(out.toByteArray()))) {
			newRec = tser.deserialize(inView);
		}

		// check if it is still the same
		assertNotNull(newRec);
		assertEquals("name not equal", TEST_NAME, newRec.getName().toString());
		assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString());
	}
}
 
Example 20  Project: hiped2   File: AvroStockAvgFileRead.java
public static void readFromAvro(InputStream is) throws IOException {
  DataFileStream<StockAvg> reader =     //<co id="ch03_smallfileread_comment1"/>
      new DataFileStream<StockAvg>(
          is,
          new SpecificDatumReader<StockAvg>(StockAvg.class));

  for (StockAvg a : reader) {          //<co id="ch03_smallfileread_comment2"/>
    System.out.println(ToStringBuilder.reflectionToString(a,
        ToStringStyle.SHORT_PREFIX_STYLE
    ));
  }

  IOUtils.closeStream(is);
  IOUtils.closeStream(reader);
}
 
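Example 21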
/**
 * Reads an Avro object from the given input stream.
 *
 * @param inputStream The input stream to read from
 * @return the deserialized Avro object
 * @throws IOException if the stream cannot be read or decoded
 */
AvroMultiRuntimeAppSubmissionParameters fromInputStream(final InputStream inputStream) throws IOException {
  final JsonDecoder decoder = DecoderFactory.get().jsonDecoder(
          AvroMultiRuntimeAppSubmissionParameters.getClassSchema(), inputStream);
  final SpecificDatumReader<AvroMultiRuntimeAppSubmissionParameters> reader = new SpecificDatumReader<>(
          AvroMultiRuntimeAppSubmissionParameters.class);
  return reader.read(null, decoder);
}
 
Example 22  Project: flink   File: AvroFactory.java
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
	SpecificData specificData = new SpecificData(cl);
	Schema newSchema = specificData.getSchema(type);

	return new AvroFactory<>(
		specificData,
		newSchema,
		new SpecificDatumReader<>(previousSchema.orElse(newSchema), newSchema, specificData),
		new SpecificDatumWriter<>(newSchema, specificData)
	);
}
 
Example 23  Project: flink   File: AvroRowDeserializationSchema.java
/**
 * Creates an Avro deserialization schema for the given specific record class. Having the
 * concrete Avro record class might improve performance.
 *
 * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
 */
public AvroRowDeserializationSchema(Class<? extends SpecificRecord> recordClazz) {
	Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
	this.recordClazz = recordClazz;
	schema = SpecificData.get().getSchema(recordClazz);
	typeInfo = (RowTypeInfo) AvroSchemaConverter.convertToTypeInfo(recordClazz);
	schemaString = schema.toString();
	record = (IndexedRecord) SpecificData.newInstance(recordClazz, schema);
	datumReader = new SpecificDatumReader<>(schema);
	inputStream = new MutableByteArrayInputStream();
	decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}
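
Typical use of this constructor, assuming a generated User record class and messageBytes produced by the matching AvroRowSerializationSchema:

AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(User.class);
Row row = deserializationSchema.deserialize(messageBytes); // one Flink Row per Avro record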
 
Example 24  Project: flink   File: AvroRowDeserializationSchema.java
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream inputStream) throws ClassNotFoundException, IOException {
	recordClazz = (Class<? extends SpecificRecord>) inputStream.readObject();
	schemaString = inputStream.readUTF();
	typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(schemaString);
	schema = new Schema.Parser().parse(schemaString);
	if (recordClazz != null) {
		record = (SpecificRecord) SpecificData.newInstance(recordClazz, schema);
	} else {
		record = new GenericData.Record(schema);
	}
	datumReader = new SpecificDatumReader<>(schema);
	this.inputStream = new MutableByteArrayInputStream();
	decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
}
 
Example 25  Project: clickstream-tutorial   File: JavaSessionize.java
private void readObject(java.io.ObjectInputStream in)
        throws IOException, ClassNotFoundException {
    DatumReader<LogLine> reader =
            new SpecificDatumReader<LogLine>(LogLine.class);
    Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    setValues(reader.read(null, decoder));
}
 
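Example 26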
private void roundtrip(Object thingie) throws Exception {
  Schema schema = SpecificData.get().getSchema(thingie.getClass());
  ByteArrayOutputStream os = new ByteArrayOutputStream();
  BinaryEncoder binaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(os, false, null);
  SpecificDatumWriter<Object> writer = new SpecificDatumWriter<>(schema);
  writer.write(thingie, binaryEncoder);
  binaryEncoder.flush();
  byte[] serialized = os.toByteArray();
  ByteArrayInputStream is = new ByteArrayInputStream(serialized);
  BinaryDecoder binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(is, false, null);
  SpecificDatumReader<Object> reader = new SpecificDatumReader<>(schema);
  Object deserialize = reader.read(null, binaryDecoder);
  Assert.assertEquals(deserialize, thingie);
}
 
Example 28  Project: incubator-gobblin   File: EventUtils.java
/**
 * Parses a {@link org.apache.gobblin.metrics.GobblinTrackingEvent} from its Avro byte-array serialization.
 * @param reuse GobblinTrackingEvent to reuse.
 * @param bytes Input bytes.
 * @param schemaId Expected schemaId.
 * @return GobblinTrackingEvent.
 * @throws java.io.IOException if the bytes cannot be read or decoded.
 */
public synchronized static GobblinTrackingEvent deserializeEventFromAvroSerialization(GobblinTrackingEvent reuse, byte[] bytes, @Nullable String schemaId)
    throws IOException {
  if (!reader.isPresent()) {
    reader = Optional.of(new SpecificDatumReader<>(GobblinTrackingEvent.class));
  }

  Closer closer = Closer.create();

  try {
    DataInputStream inputStream = closer.register(new DataInputStream(new ByteArrayInputStream(bytes)));

    if (schemaId != null) {
      MetricReportUtils.readAndVerifySchemaId(inputStream, schemaId);
    } else {
      MetricReportUtils.readAndVerifySchemaVersion(inputStream);
    }

    // Decode the rest
    Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    return reader.get().read(reuse, decoder);
  } catch(Throwable t) {
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }
}
 
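Example 29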
@SuppressWarnings("unchecked")
private <T> T decodeRecordSlow(Schema readerSchema, Schema writerSchema, Decoder decoder) {
  org.apache.avro.io.DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  try {
    return datumReader.read(null, decoder);
  } catch (IOException e) {
    e.printStackTrace();
    return null;
  }
}
 
Example 30  Project: avro-util   File: SerDeMicroBenchmark.java
@Test(invocationCount = 5, groups = {"perfTest"})
public void testAvroDeserialization() throws Exception {
  byte[] serializedBytes = serializedTestObjects(1000);
  long startInMs = System.currentTimeMillis();
  AvroGenericDeserializer<BenchmarkTestObject> deserializer =
      new AvroGenericDeserializer<>(new SpecificDatumReader<>(BenchmarkTestObject.class));
  for (int i = 0; i <= 10000; ++i) {
    deserializer.deserializeObjects(serializedBytes);
  }
  System.out.println("Regular avro deserialization latency: " + (System.currentTimeMillis() - startInMs) + " ms");
}
 