The following lists example code for org.apache.hadoop.mapreduce.lib.db.DBConfiguration (used together with org.apache.hadoop.mapreduce.lib.db.DBWritable). You can also follow the link to view the source code on GitHub, or post a comment on the right-hand side.
/**
 * Validates that the mandatory configuration properties such as InputFormat class, InputFormat
 * key and value classes are provided in the Hadoop configuration. In case of using {@code
 * DBInputFormat} you need to order results by one or more keys. It can be done by setting
 * configuration option "mapreduce.jdbc.input.orderby".
 *
 * @param configuration Hadoop configuration to validate; must not be {@code null}
 * @throws IllegalArgumentException if the configuration is {@code null} or any mandatory
 *     property is missing
 */
private void validateConfiguration(Configuration configuration) {
  checkArgument(configuration != null, "configuration can not be null");
  // Look the InputFormat class up once; it is needed both for the null check and for the
  // DBInputFormat-specific check below.
  String inputFormatClass = configuration.get("mapreduce.job.inputformat.class");
  checkArgument(
      inputFormatClass != null,
      "Configuration must contain \"mapreduce.job.inputformat.class\"");
  checkArgument(
      configuration.get("key.class") != null, "configuration must contain \"key.class\"");
  checkArgument(
      configuration.get("value.class") != null, "configuration must contain \"value.class\"");
  // Per the contract described in the Javadoc above, DBInputFormat requires an explicit
  // ordering of the query results.
  if (inputFormatClass.endsWith("DBInputFormat")) {
    checkArgument(
        configuration.get(DBConfiguration.INPUT_ORDER_BY_PROPERTY) != null,
        "Configuration must contain \""
            + DBConfiguration.INPUT_ORDER_BY_PROPERTY
            + "\" when using DBInputFormat");
  }
}
/**
 * Builds the serializable Hadoop configuration used by the test pipeline: JDBC connection
 * details plus the input-side (DBInputFormat) and output-side (DBOutputFormat) settings
 * expected by HadoopFormatIO, and stores it in {@code hadoopConfiguration}.
 */
private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions options) {
  Configuration hadoopConf = new Configuration();

  // JDBC driver and connection credentials taken from the pipeline options.
  DBConfiguration.configureDB(
      hadoopConf,
      "org.postgresql.Driver",
      DatabaseTestHelper.getPostgresDBUrl(options),
      options.getPostgresUsername(),
      options.getPostgresPassword());

  // Read side: source table, columns, deterministic ordering, and record/key/value types.
  hadoopConf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
  hadoopConf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
  hadoopConf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
  hadoopConf.setClass(
      DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class, DBWritable.class);
  hadoopConf.setClass("key.class", LongWritable.class, Object.class);
  hadoopConf.setClass("value.class", TestRowDBWritable.class, Object.class);
  hadoopConf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);

  // Write side: target table, column layout, and output format/key/value types.
  hadoopConf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
  hadoopConf.set(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, "2");
  hadoopConf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "id", "name");
  hadoopConf.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, TestRowDBWritable.class, Object.class);
  hadoopConf.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, NullWritable.class, Object.class);
  hadoopConf.setClass(
      HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, DBOutputFormat.class, OutputFormat.class);
  hadoopConf.set(HadoopFormatIO.JOB_ID, String.valueOf(1));

  hadoopConfiguration = new SerializableConfiguration(hadoopConf);
}
@Override
/**
 * Configures and runs the pageview-counting MapReduce job.
 *
 * <p>Usage: {@code DBCountPageView [driverClass dburl]}
 *
 * @param args optional overrides: {@code args[0]} = JDBC driver class, {@code args[1]} =
 *     database URL; both defaults are used unless at least two arguments are given
 * @return 0 if the job succeeds, 1 if it fails
 * @throws Exception if job setup, execution, or database access fails
 */
public int run(String[] args) throws Exception {
  String driverClassName = DRIVER_CLASS;
  String url = DB_URL;
  if (args.length > 1) {
    driverClassName = args[0];
    url = args[1];
  }
  // Sets up the database (schema and seed data) before the job runs.
  initialize(driverClassName, url);
  Configuration conf = getConf();
  DBConfiguration.configureDB(conf, driverClassName, url);

  // Job.getInstance(...) replaces the deprecated new Job(Configuration) constructor.
  Job job = Job.getInstance(conf);
  job.setJobName("Count Pageviews of URLs");
  job.setJarByClass(DBCountPageView.class);
  job.setMapperClass(PageviewMapper.class);
  job.setCombinerClass(LongSumReducer.class);
  job.setReducerClass(PageviewReducer.class);

  DBInputFormat.setInput(job, AccessRecord.class, "Access", null, "url", AccessFieldNames);
  DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);
  job.setOutputKeyClass(PageviewRecord.class);
  job.setOutputValueClass(NullWritable.class);

  int ret;
  try {
    ret = job.waitForCompletion(true) ? 0 : 1;
    // Cross-checks the job's output against the database contents.
    boolean correct = verify();
    if (!correct) {
      throw new RuntimeException("Evaluation was not correct!");
    }
  } finally {
    // Always release database resources, even when the job or verification fails.
    shutdown();
  }
  return ret;
}
@Override
// Usage: DBCountPageView [driverClass dburl]
public int run(String[] args) throws Exception {
  // Fall back to the built-in driver/URL unless both were supplied on the command line.
  String driver = DRIVER_CLASS;
  String connectionUrl = DB_URL;
  if (args.length > 1) {
    driver = args[0];
    connectionUrl = args[1];
  }

  // Prepare the database before launching the job.
  initialize(driver, connectionUrl);

  Configuration conf = getConf();
  DBConfiguration.configureDB(conf, driver, connectionUrl);

  Job job = new Job(conf);
  job.setJobName("Count Pageviews of URLs");
  job.setJarByClass(DBCountPageView.class);

  // Combiner and reducer both sum the per-key counts emitted by the mapper.
  job.setMapperClass(PageviewMapper.class);
  job.setCombinerClass(LongSumReducer.class);
  job.setReducerClass(PageviewReducer.class);

  DBInputFormat.setInput(job, AccessRecord.class, "Access", null, "url", AccessFieldNames);
  DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);
  job.setOutputKeyClass(PageviewRecord.class);
  job.setOutputValueClass(NullWritable.class);

  int exitCode;
  try {
    exitCode = job.waitForCompletion(true) ? 0 : 1;
    if (!verify()) {
      throw new RuntimeException("Evaluation was not correct!");
    }
  } finally {
    // Release database resources regardless of job outcome.
    shutdown();
  }
  return exitCode;
}