下面列出了怎么用org.apache.hadoop.mapreduce.lib.db.DBWritable的API类实例代码及写法,或者点击链接到github查看源代码。
private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions options) {
Configuration conf = new Configuration();
DBConfiguration.configureDB(
conf,
"org.postgresql.Driver",
DatabaseTestHelper.getPostgresDBUrl(options),
options.getPostgresUsername(),
options.getPostgresPassword());
conf.set(DBConfiguration.INPUT_TABLE_NAME_PROPERTY, tableName);
conf.setStrings(DBConfiguration.INPUT_FIELD_NAMES_PROPERTY, "id", "name");
conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "id ASC");
conf.setClass(DBConfiguration.INPUT_CLASS_PROPERTY, TestRowDBWritable.class, DBWritable.class);
conf.setClass("key.class", LongWritable.class, Object.class);
conf.setClass("value.class", TestRowDBWritable.class, Object.class);
conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);
conf.set(DBConfiguration.OUTPUT_TABLE_NAME_PROPERTY, tableName);
conf.set(DBConfiguration.OUTPUT_FIELD_COUNT_PROPERTY, "2");
conf.setStrings(DBConfiguration.OUTPUT_FIELD_NAMES_PROPERTY, "id", "name");
conf.setClass(HadoopFormatIO.OUTPUT_KEY_CLASS, TestRowDBWritable.class, Object.class);
conf.setClass(HadoopFormatIO.OUTPUT_VALUE_CLASS, NullWritable.class, Object.class);
conf.setClass(
HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR, DBOutputFormat.class, OutputFormat.class);
conf.set(HadoopFormatIO.JOB_ID, String.valueOf(1));
hadoopConfiguration = new SerializableConfiguration(conf);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param tableName Input table name
* @param conditions Condition clause to be added to the WHERE clause.
* @param fieldNames fields being projected for the SELECT query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) {
job.setInputFormatClass(PhoenixInputFormat.class);
final Configuration configuration = job.getConfiguration();
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param tableName Input table name
* @param inputQuery Select query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
job.setInputFormatClass(PhoenixInputFormat.class);
final Configuration configuration = job.getConfiguration();
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param tableName Input table name
* @param conditions Condition clause to be added to the WHERE clause. Can be <tt>null</tt> if there are no conditions.
* @param fieldNames fields being projected for the SELECT query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName,
final String conditions, final String... fieldNames) {
final Configuration configuration = setInput(job, inputClass, tableName);
if(conditions != null) {
PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
}
PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param inputFormatClass InputFormat class
* @param tableName Input table name
* @param inputQuery Select query
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
final Class<? extends InputFormat> inputFormatClass,
final String tableName, final String inputQuery) {
final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName);
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param snapshotName The name of a snapshot (of a table) to read from
* @param tableName Input table name
* @param restoreDir a temporary dir to copy the snapshot files into
* @param inputQuery The select query
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
Path restoreDir, String inputQuery) throws
IOException {
final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir, SchemaType.QUERY);
if(inputQuery != null) {
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
}
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param snapshotName The name of a snapshot (of a table) to read from
* @param tableName Input table name
* @param restoreDir a temporary dir to copy the snapshot files into
*/
private static Configuration setSnapshotInput(Job job, Class<? extends DBWritable> inputClass, String snapshotName,
String tableName, Path restoreDir, SchemaType schemaType) {
job.setInputFormatClass(PhoenixInputFormat.class);
final Configuration configuration = job.getConfiguration();
PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
PhoenixConfigurationUtil.setSnapshotNameKey(configuration, snapshotName);
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setRestoreDirKey(configuration, new Path(restoreDir, UUID.randomUUID().toString()).toString());
PhoenixConfigurationUtil.setSchemaType(configuration, schemaType);
return configuration;
}
private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName){
job.setInputFormatClass(PhoenixInputFormat.class);
final Configuration configuration = job.getConfiguration();
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
return configuration;
}
private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass,
final Class<? extends InputFormat> inputFormatClass, final String tableName){
job.setInputFormatClass(inputFormatClass);
final Configuration configuration = job.getConfiguration();
PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
return configuration;
}
public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) {
Preconditions.checkNotNull(configuration);
configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class);
}
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
Path restoreDir) {
setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir, SchemaType.QUERY);
}
public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) {
Preconditions.checkNotNull(configuration);
configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class);
}
/**
*
* @param job MR job instance
* @param inputClass DBWritable class
* @param inputFormatClass InputFormat class
* @param tableName Input table name
* @param conditions Condition clause to be added to the WHERE clause.
* Can be <tt>null</tt> if there are no conditions.
* @param fieldNames fields being projected for the SELECT query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
final Class<? extends InputFormat> inputFormatClass, final String tableName,
final String conditions, final String... fieldNames) {
final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName);
if(conditions != null) {
PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
}
PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param snapshotName The name of a snapshot (of a table) to read from
* @param tableName Input table name
* @param restoreDir a temporary dir to copy the snapshot files into
* @param conditions Condition clause to be added to the WHERE clause. Can be <tt>null</tt> if there are no conditions.
* @param fieldNames fields being projected for the SELECT query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
Path restoreDir, final String conditions, final String... fieldNames) throws
IOException {
final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir, SchemaType.QUERY);
if(conditions != null) {
PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
}
PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
}
/**
*
* @param job
* @param inputClass DBWritable class
* @param tableName Input table name
* @param inputQuery Select query.
*/
public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
final Configuration configuration = setInput(job, inputClass, tableName);
PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
}