The following lists example code showing how to use the org.apache.avro.mapred.AvroWrapper API class; you can also follow the links to view the full source on GitHub.
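Before the examples, a minimal, self-contained sketch of what the class does may help: AvroWrapper simply wraps an Avro datum so it can travel through Hadoop's key/value interfaces, with datum() acting as both getter and setter. The Stock schema and field names below are made up purely for illustration.
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroWrapper;

public class AvroWrapperSketch {
  public static void main(String[] args) {
    // Hypothetical record schema, used only for this sketch.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Stock\",\"fields\":["
        + "{\"name\":\"symbol\",\"type\":\"string\"}]}");

    GenericRecord stock = new GenericData.Record(schema);
    stock.put("symbol", "AAPL");

    // Wrap the datum so it can be emitted as a MapReduce key (or value).
    AvroWrapper<GenericRecord> wrapper = new AvroWrapper<GenericRecord>(stock);

    // datum() with no argument reads the wrapped object ...
    System.out.println(wrapper.datum().get("symbol"));

    // ... and datum(T) replaces it, which is how record readers reuse wrappers.
    wrapper.datum(stock);
  }
}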
// RecordReader: advance to the next record, reusing the AvroWrapper key and
// the singleton NullWritable value between calls.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!reader.hasNext() || reader.pastSync(end)) {
    key = null;
    value = null;
    return false;
  }
  if (key == null) {
    key = new AvroWrapper<T>();
  }
  if (value == null) {
    value = NullWritable.get();
  }
  key.datum(reader.next(key.datum()));
  return true;
}
// Spark function: unwrap the Avro record from the (AvroWrapper, NullWritable)
// tuple and project it onto the configured column headers.
@Override
public List<Object> call(Tuple2<AvroWrapper, NullWritable> avroTuple)
{
  final GenericData.Record datum = (GenericData.Record) avroTuple._1().datum();
  List<Object> row = new ArrayList<>(this.headers.size());
  for (String header : this.headers)
  {
    Object value = datum.get(header);
    if (value instanceof CharSequence) // Avro Utf8 type
    {
      value = value.toString();
    }
    row.add(value);
  }
  return row;
}
// Old-API reducer: average the double values for each stock symbol and emit
// a StockAvg record wrapped in an AvroWrapper, with a NullWritable value.
public void reduce(Text key,
                   Iterator<DoubleWritable> values,
                   OutputCollector<AvroWrapper<StockAvg>, NullWritable> output,
                   Reporter reporter) throws IOException {
  Mean mean = new Mean();
  while (values.hasNext()) {
    mean.increment(values.next().get());
  }
  StockAvg avg = new StockAvg();
  avg.setSymbol(key.toString());
  avg.setAvg(mean.getResult());
  output.collect(new AvroWrapper<StockAvg>(avg), NullWritable.get());
}
// Register the expected Avro schema on the JobConf and prepare the
// AvroWrapper that will be reused while iterating over records.
@Override
public boolean openForRead() throws Exception {
  // Pass the schema to the AvroInputFormat
  AvroJob.setInputSchema(jobConf, schema);
  // The avroWrapper required for the iteration
  avroWrapper = new AvroWrapper<>();
  return super.openForRead();
}
// Mapper setup: create the reusable wrapper and pull the output schema and
// the BigDecimal formatting flag from the job configuration.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
  wrapper = new AvroWrapper<GenericRecord>();
  schema = AvroJob.getOutputSchema(context.getConfiguration());
  bigDecimalFormatString = context.getConfiguration().getBoolean(
      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
}
// Build a RecordWriter that appends each wrapped datum to an Avro container
// file; map-only jobs use the map output schema, otherwise the reduce schema.
@Override
public RecordWriter<AvroWrapper<T>, NullWritable> getRecordWriter(
    TaskAttemptContext context) throws IOException, InterruptedException {
  boolean isMapOnly = context.getNumReduceTasks() == 0;
  Schema schema = isMapOnly
      ? AvroJob.getMapOutputSchema(context.getConfiguration())
      : AvroJob.getOutputSchema(context.getConfiguration());
  final DataFileWriter<T> writer =
      new DataFileWriter<T>(new ReflectDatumWriter<T>());
  configureDataFileWriter(writer, context);
  Path path = getDefaultWorkFile(context, EXT);
  writer.create(schema,
      path.getFileSystem(context.getConfiguration()).create(path));
  return new RecordWriter<AvroWrapper<T>, NullWritable>() {
    @Override
    public void write(AvroWrapper<T> wrapper, NullWritable ignore)
        throws IOException {
      writer.append(wrapper.datum());
    }
    @Override
    public void close(TaskAttemptContext taskAttemptContext)
        throws IOException, InterruptedException {
      writer.close();
    }
  };
}
// InputFormat hook: report the split being processed and hand back an
// AvroRecordReader that produces AvroWrapper keys.
@Override
public RecordReader<AvroWrapper<T>, NullWritable> createRecordReader(
    InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  context.setStatus(split.toString());
  return new AvroRecordReader<T>();
}
// Load an Avro file into Spark as (AvroWrapper, NullWritable) pairs, derive
// the column headers from the first record, then map every record to a row.
@Override
public DistributedTable get()
{
  JavaPairRDD<AvroWrapper, NullWritable> avroRdd = this.sparkContext.hadoopFile(
      this.dataPath.toString(), AvroInputFormat.class, AvroWrapper.class, NullWritable.class);
  LOGGER.info("data location: {}", this.dataPath);
  List<String> headers = avroRdd.keys().map(new AvroHeadersFunction()).first();
  LOGGER.info("data headers: {}", headers);
  JavaRDD<List<Object>> rows = avroRdd.map(new AvroRowsFunction(headers));
  return new DistributedTable(headers, rows);
}
// Old-API mapper: convert each StockDbWritable to an Avro Stock record and
// emit it wrapped as the key, with a NullWritable value.
public void map(LongWritable key,
                StockDbWritable value,
                OutputCollector<AvroWrapper<Stock>, NullWritable> output,
                Reporter reporter) throws IOException {
  output.collect(
      new AvroWrapper<Stock>(writableToAvro(value)),
      NullWritable.get());
}
// Old-API mapper: unwrap the Stock record and emit (symbol, opening price).
public void map(AvroWrapper<Stock> key,
                NullWritable value,
                OutputCollector<Text, DoubleWritable> output,
                Reporter reporter) throws IOException {
  output.collect(new Text(key.datum().getSymbol().toString()),
      new DoubleWritable(key.datum().getOpen()));
}
// Old-API mapper: emit each embedded file's name together with the MD5 hash
// of its contents, which the Avro record stores as a ByteBuffer.
public void map(AvroWrapper<GenericRecord> key,
                NullWritable value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
  outKey.set(
      key.datum().get(SmallFilesWrite.FIELD_FILENAME).toString());
  outValue.set(DigestUtils.md5Hex(
      ((ByteBuffer) key.datum().get(SmallFilesWrite.FIELD_CONTENTS)).array()));
  output.collect(outKey, outValue);
}
// Old-API reducer: OR the partial Bloom filters together; the merged filter
// is emitted from close(), once all values have been seen.
@Override
public void reduce(NullWritable key, Iterator<BloomFilter> values,
                   OutputCollector<AvroWrapper<GenericRecord>, NullWritable> output,
                   Reporter reporter) throws IOException {
  while (values.hasNext()) {
    BloomFilter bf = values.next();
    filter.or(bf);
    System.out.println(filter);
  }
  collector = output;
}

@Override
public void close() throws IOException {
  System.out.println(filter);
  if (collector != null) {
    collector.collect(
        new AvroWrapper<GenericRecord>(
            AvroBytesRecord.toGenericRecord(filter)),
        NullWritable.get());
  }
}
// Convert the key/value pair to a tab-separated line, parse it into a
// GenericRecord against the schema, and append the wrapped datum.
@Override
public void write(K2 k, V2 v) throws IOException {
  GenericRecord record = fromText(k.toString() + "\t" + v.toString(), schema);
  AvroWrapper<GenericRecord> wrapper = new AvroWrapper<GenericRecord>(record);
  writer.append(wrapper.datum());
}
// Round-trip test: serialize Parquet data to Avro bytes, read it back through
// an AvroKeyDeserializer, and check each deserialized record's fields.
@Test
public void testDataEntries() throws IOException, InjectionException {
  final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder();
  builder.bindNamedParameter(PathString.class, file.getAbsolutePath());
  final Configuration conf = builder.build();
  final Injector injector = Tang.Factory.getTang().newInjector(conf);
  final ParquetReader reader = injector.getInstance(ParquetReader.class);
  final byte[] byteArr = reader.serializeToByteBuffer().array();
  final ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArr);
  final DatumReader datumReader = new GenericDatumReader<GenericRecord>();
  datumReader.setSchema(reader.createAvroSchema());
  final AvroKeyDeserializer deserializer = new AvroKeyDeserializer<GenericRecord>(
      reader.createAvroSchema(), reader.createAvroSchema(), datumReader);
  deserializer.open(inputStream);
  AvroWrapper<GenericRecord> record = null;
  for (int i = 0; i < 10; i = i + 1) {
    record = deserializer.deserialize(record);
    Assert.assertEquals("User_" + i, record.datum().get("name").toString());
    Assert.assertEquals(i, record.datum().get("age"));
    Assert.assertEquals("blue", record.datum().get("favorite_color").toString());
  }
}
// New-API mapper: unwrap the GenericRecord, convert it to a SqoopRecord and
// write it out with a NullWritable value.
@Override
protected void map(AvroWrapper<GenericRecord> key, NullWritable value,
    Context context) throws IOException, InterruptedException {
  context.write(toSqoopRecord(key.datum()), NullWritable.get());
}
// New-API mapper: unwrap the GenericRecord and hand the converted
// SqoopRecord to processRecord().
@Override
public void map(AvroWrapper<GenericRecord> key, NullWritable val, Context c)
    throws IOException, InterruptedException {
  processRecord(toSqoopRecord(key.datum()), c);
}
// RecordReader accessor: return the AvroWrapper populated by nextKeyValue().
@Override
public AvroWrapper<T> getCurrentKey() throws IOException, InterruptedException {
  return key;
}
// Spark function: read the column names straight from the wrapped record's schema.
@Override
public List<String> call(AvroWrapper avroWrapper)
{
  return getColumns(((GenericData.Record) avroWrapper.datum()).getSchema().getFields());
}
/**
* The MapReduce driver: set up and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
  Cli cli = Cli.builder().setArgs(args)
      .addOptions(CliCommonOpts.OutputFileOption.values()).build();
  int result = cli.runCmd();
  if (result != 0) {
    return result;
  }

  Path output = new Path(cli.getArgValueAsString(CliCommonOpts.OutputFileOption.OUTPUT));
  Configuration conf = super.getConf();

  DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
      "jdbc:mysql://localhost/sqoop_test" +
      "?user=hip_sqoop_user&password=password");

  JobConf job = new JobConf(conf);
  job.setJarByClass(DBImportMapReduce.class);

  job.setInputFormat(DBInputFormat.class);
  job.setOutputFormat(AvroOutputFormat.class);
  AvroJob.setOutputSchema(job, Stock.SCHEMA$);
  // The Avro output codec must be the short codec name (e.g. "snappy"),
  // not a compression codec class name.
  job.set(AvroJob.OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC);

  job.setMapperClass(Map.class);
  job.setNumMapTasks(4);
  job.setNumReduceTasks(0);

  job.setMapOutputKeyClass(AvroWrapper.class);
  job.setMapOutputValueClass(NullWritable.class);
  job.setOutputKeyClass(AvroWrapper.class);
  job.setOutputValueClass(NullWritable.class);

  FileOutputFormat.setOutputPath(job, output);

  DBInputFormat.setInput(
      job,
      StockDbWritable.class,
      "select * from stocks",
      "SELECT COUNT(id) FROM stocks");

  RunningJob runningJob = JobClient.runJob(job);
  return runningJob.isSuccessful() ? 0 : 1;
}