The following are example usages of org.apache.avro.mapred.AvroOutputFormat, collected from open-source projects.
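Before the project examples, here is a minimal sketch of the wiring most of them share. This skeleton is illustrative only, not taken from any one project: MinimalAvroOutputJob, MyRecord, and the "in"/"out" paths are placeholder names.

import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MinimalAvroOutputJob {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    job.setJarByClass(MinimalAvroOutputJob.class);
    // AvroOutputFormat writes Avro container files (*.avro); the schema
    // is carried in the job configuration, here set via AvroJob.
    job.setOutputFormat(AvroOutputFormat.class);
    AvroJob.setOutputSchema(job, MyRecord.SCHEMA$); // MyRecord: placeholder generated class
    FileInputFormat.setInputPaths(job, new Path("in"));
    FileOutputFormat.setOutputPath(job, new Path("out"));
    JobClient.runJob(job);
  }
}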
@Override
public OutputFormat<NullWritable, Object> getOutputFormat()
throws IOException {
/**
* Hadoop output format for AvroStorage.
*/
class AvroStorageOutputFormat extends
FileOutputFormat<NullWritable, Object> {
@Override
public RecordWriter<NullWritable, Object> getRecordWriter(
final TaskAttemptContext tc) throws IOException,
InterruptedException {
return new AvroRecordWriter(
// avroStorageOutputFormatSchema,
getDefaultWorkFile(tc, AvroOutputFormat.EXT),
tc.getConfiguration());
}
}
return new AvroStorageOutputFormat();
}
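The snippet above is the getOutputFormat hook of AvroStorage (Pig's Avro loader/storer): it declares a local FileOutputFormat subclass whose getRecordWriter returns an AvroRecordWriter pointed at the task's default work file, using AvroOutputFormat.EXT (".avro") as the extension. The commented-out schema argument suggests the writer picks its schema up from the task configuration rather than taking it as a constructor parameter.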
@Test
public void testAvroAsTextFmt() throws IOException {
  // Write a single tab-separated line through AvroAsTextOutputFormat.
  AvroAsTextOutputFormat outfmt = new AvroAsTextOutputFormat();
  FileOutputFormat.setOutputPath(defaultConf, file);
  RecordWriter<Text, NullWritable> writer = outfmt.getRecordWriter(file.getFileSystem(defaultConf),
      defaultConf, fname, new dummyReporter());
  writer.write(new Text(tsv), NullWritable.get());
  writer.close(null);

  // Read the task output back through AvroAsTextInputFormat.
  FileInputFormat.setInputPaths(defaultConf, FileOutputFormat.getTaskOutputPath(defaultConf, fname +
      AvroOutputFormat.EXT));
  AvroAsTextInputFormat informat = new AvroAsTextInputFormat();
  RecordReader<Text, Text> reader = informat.getRecordReader(informat.getSplits(defaultConf, 1)[0],
      defaultConf, new dummyReporter());
  Text k = new Text();
  Text v = new Text();
  reader.next(k, v);
  // Reassemble the line and check the round trip.
  Assert.assertEquals("read back tsv", tsv, k.toString() + "\t" + v.toString());
}
@Test
public void testAvroAsJsonFmt() throws IOException {
  // Write a single JSON record through AvroAsJsonOutputFormat.
  AvroAsJsonOutputFormat outfmt = new AvroAsJsonOutputFormat();
  FileOutputFormat.setOutputPath(defaultConf, file2);
  RecordWriter<Text, NullWritable> writer = outfmt.getRecordWriter(file2.getFileSystem(defaultConf),
      defaultConf, fname2, new dummyReporter());
  writer.write(new Text(json), NullWritable.get());
  writer.close(null);

  // Read the task output back through AvroAsJsonInputFormat.
  FileInputFormat.setInputPaths(defaultConf, FileOutputFormat.getTaskOutputPath(defaultConf, fname2 +
      AvroOutputFormat.EXT));
  AvroAsJsonInputFormat informat = new AvroAsJsonInputFormat();
  RecordReader<Text, Text> reader = informat.getRecordReader(informat.getSplits(defaultConf, 1)[0],
      defaultConf, new dummyReporter());
  Text k = new Text();
  Text v = new Text();
  reader.next(k, v);
  // Compare parsed JSON trees so the test is insensitive to formatting.
  ObjectMapper mapper = new ObjectMapper();
  JsonNode n0 = mapper.readTree(k.toString());
  JsonNode n1 = mapper.readTree(json);
  Assert.assertEquals("read back json", n1, n0);
}
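The two tests above exercise the same round trip: write one record through the Avro-backed output format, point the matching input format at the task output file (note the AvroOutputFormat.EXT suffix), and assert that what comes back equals what went in. The JSON variant compares parsed trees rather than raw strings, so formatting differences do not break the test.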
/**
 * Runs an Avro Hadoop job with the given configuration, logging the
 * input and output paths before launching it.
 * @param conf the job configuration
 * @throws Exception if the job fails
 */
public static void runAvroJob(JobConf conf) throws Exception
{
  Path[] inputPaths = AvroInputFormat.getInputPaths(conf);
  _log.info("Running hadoop job with input paths:");
  for (Path inputPath : inputPaths)
  {
    _log.info(inputPath);
  }
  _log.info("Output path=" + AvroOutputFormat.getOutputPath(conf));
  Job job = new Job(conf);
  job.setJarByClass(AvroUtils.class);
  job.waitForCompletion(true);
}
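A caller is expected to populate the JobConf before handing it to runAvroJob. The sketch below is hypothetical; it assumes runAvroJob lives in AvroUtils (as the setJarByClass call above suggests) and uses a placeholder MyRecord schema and placeholder paths.

JobConf conf = new JobConf();
conf.setInputFormat(AvroInputFormat.class);
conf.setOutputFormat(AvroOutputFormat.class);
AvroJob.setInputSchema(conf, MyRecord.SCHEMA$);  // placeholder generated class
AvroJob.setOutputSchema(conf, MyRecord.SCHEMA$);
FileInputFormat.setInputPaths(conf, new Path("in"));
FileOutputFormat.setOutputPath(conf, new Path("out"));
AvroUtils.runAvroJob(conf); // logs the paths, then blocks until the job finishes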
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
JobConf job = new JobConf(conf);
job.setJarByClass(BloomFilterCreator.class);
job.set(AvroJob.OUTPUT_SCHEMA, AvroBytesRecord.SCHEMA.toString());
job.set(AvroJob.OUTPUT_CODEC, SnappyCodec.class.getName());
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(BloomFilter.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(BloomFilter.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
return JobClient.runJob(job).isSuccessful() ? 0 : 1;
}
/**
* Sets up various standard settings in the JobConf. You probably don't want to mess with this.
*
* @return A configured JobConf.
* @throws IOException
* @throws URISyntaxException
*/
protected JobConf createJobConf() throws IOException, URISyntaxException
{
JobConf conf = new JobConf();
conf.setJobName(getJobId());
conf.setInputFormat(AvroInputFormat.class);
conf.setOutputFormat(AvroOutputFormat.class);
AvroOutputFormat.setDeflateLevel(conf, 9);
String hadoop_ugi = _config.getString("hadoop.job.ugi", null);
if (hadoop_ugi != null)
{
conf.set("hadoop.job.ugi", hadoop_ugi);
}
if (_config.getBoolean("is.local", false))
{
conf.set("mapred.job.tracker", "local");
conf.set("fs.default.name", "file:///");
conf.set("mapred.local.dir", "/tmp/map-red");
_log.info("Running locally, no hadoop jar set.");
}
// set JVM options if present
if (_config.containsKey("mapred.child.java.opts"))
{
conf.set("mapred.child.java.opts", _config.getString("mapred.child.java.opts"));
_log.info("mapred.child.java.opts set to " + _config.getString("mapred.child.java.opts"));
}
if (_config.containsKey(INPUT_PATHS))
{
List<String> inputPathnames = _config.getStringList(INPUT_PATHS);
for (String pathname : inputPathnames)
{
AvroUtils.addAllSubPaths(conf, new Path(pathname));
}
AvroJob.setInputSchema(conf, AvroUtils.getAvroInputSchema(conf));
}
if (_config.containsKey(OUTPUT_PATH))
{
Path path = new Path(_config.get(OUTPUT_PATH));
AvroOutputFormat.setOutputPath(conf, path);
if (_config.getBoolean("force.output.overwrite", false))
{
FileSystem fs = FileOutputFormat.getOutputPath(conf).getFileSystem(conf);
fs.delete(FileOutputFormat.getOutputPath(conf), true);
}
}
// set all hadoop configs
for (String key : _config.keySet())
{
String lowerCase = key.toLowerCase();
if (lowerCase.startsWith(HADOOP_PREFIX))
{
String newKey = key.substring(HADOOP_PREFIX.length());
conf.set(newKey, _config.get(key));
}
}
return conf;
}
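Note the two different compression knobs used across these examples: the other drivers set AvroJob.OUTPUT_CODEC directly, while createJobConf calls AvroOutputFormat.setDeflateLevel(conf, 9), which sets the level used by the default deflate codec. Either way, the setting controls how AvroOutputFormat compresses the data blocks of the container file.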
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.OutputFileOption.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path output = new Path(cli.getArgValueAsString(CliCommonOpts.OutputFileOption.OUTPUT));
Configuration conf = super.getConf();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/sqoop_test" +
"?user=hip_sqoop_user&password=password");
JobConf job = new JobConf(conf);
job.setJarByClass(DBImportMapReduce.class);
job.setInputFormat(DBInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
AvroJob.setOutputSchema(job, Stock.SCHEMA$);
job.set(AvroJob.OUTPUT_CODEC, SnappyCodec.class.getName());
job.setMapperClass(Map.class);
job.setNumMapTasks(4);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(AvroWrapper.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(AvroWrapper.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, output);
DBInputFormat.setInput(
job,
StockDbWritable.class,
"select * from stocks",
"SELECT COUNT(id) FROM stocks");
RunningJob runningJob = JobClient.runJob(job);
return runningJob.isSuccessful() ? 0 : 1;
}
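This driver is a map-only import: setNumReduceTasks(0) disables the reduce phase, each mapper pulls rows through DBInputFormat using the two SQL statements, and the resulting AvroWrapper/NullWritable pairs flow straight to AvroOutputFormat using the Stock schema and the configured Snappy codec.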
/**
* The MapReduce driver - setup and launch the job.
*
* @param args the command-line arguments
* @return the process exit code
* @throws Exception if something goes wrong
*/
public int run(final String[] args) throws Exception {
Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
int result = cli.runCmd();
if (result != 0) {
return result;
}
Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));
Configuration conf = super.getConf();
JobConf job = new JobConf(conf);
job.setJarByClass(AvroMixedMapReduce.class);
job.set(AvroJob.INPUT_SCHEMA, Stock.SCHEMA$.toString());
job.set(AvroJob.OUTPUT_SCHEMA, StockAvg.SCHEMA$.toString());
job.set(AvroJob.OUTPUT_CODEC, SnappyCodec.class.getName());
job.setInputFormat(AvroInputFormat.class);
job.setOutputFormat(AvroOutputFormat.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
return JobClient.runJob(job).isSuccessful() ? 0 : 1;
}
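The last driver mixes the two worlds: the input and output files are Avro (Stock records in, StockAvg records out), while the intermediate map output uses plain Writable types (Text keys, DoubleWritable values), so the shuffle runs on ordinary Hadoop serialization rather than Avro's.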