Listed below are example snippets showing how the org.apache.hadoop.hbase.mapreduce.ResultSerialization API class is used and wired into jobs; you can also follow the links through to GitHub to view the source code.
/**
* Use this before submitting a TableReduce job. It will
* appropriately set up the JobConf.
*
* @param table The output table.
* @param reducer The reducer class to use.
* @param job The current job configuration to adjust.
* @param partitioner Partitioner to use. Pass <code>null</code> to use
* default partitioner.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
* @throws IOException When determining the region count fails.
*/
public static void initTableReduceJob(String table,
Class<? extends TableReduce> reducer, JobConf job, Class partitioner,
boolean addDependencyJars) throws IOException {
job.setOutputFormat(TableOutputFormat.class);
job.setReducerClass(reducer);
job.set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
int regions =
MetaTableAccessor.getRegionCount(HBaseConfiguration.create(job), TableName.valueOf(table));
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
if (addDependencyJars) {
addDependencyJars(job);
}
initCredentials(job);
}
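For context, a job driver normally reaches this helper through the shorter public overloads of org.apache.hadoop.hbase.mapred.TableMapReduceUtil rather than calling the five-argument version directly. Below is a minimal driver sketch; the class name TableReduceDriver, the job name, and the idea of taking the table and reducer as parameters are illustrative assumptions, not part of HBase.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TableReduceDriver {
  // Illustrative sketch: the output table and reducer class are supplied by the caller.
  public static void run(String table, Class<? extends TableReduce> reducer) throws IOException {
    JobConf job = new JobConf(HBaseConfiguration.create(), TableReduceDriver.class);
    job.setJobName("write-to-" + table);
    // The three-argument overload delegates to the method shown above: it sets
    // TableOutputFormat, registers MutationSerialization/ResultSerialization on
    // io.serializations, ships dependency jars and initializes credentials.
    TableMapReduceUtil.initTableReduceJob(table, reducer, job);
    JobClient.runJob(job);
  }
}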
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(TextSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
// Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
job.setNumReduceTasks(startKeys.size());
configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(conf, tableDescriptor);
configureBloomType(tableDescriptor, conf);
configureBlockSize(tableDescriptor, conf);
configureDataBlockEncoding(tableDescriptor, conf);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
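User code does not usually call this package-private helper directly; a bulk-load driver typically goes through the public HFileOutputFormat2.configureIncrementalLoad(Job, Table, RegionLocator) overload, which ends up in the logic shown above. A minimal sketch follows, assuming a placeholder table name ("demo_table") and output path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver {
  // Illustrative sketch: "demo_table" and the HFile output path are placeholders.
  public static Job createJob(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "hfile-prepare");
    TableName name = TableName.valueOf("demo_table");
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Table table = conn.getTable(name);
         RegionLocator locator = conn.getRegionLocator(name)) {
      // Public entry point; it wires the sort reducer, the per-region reduce partitions
      // and the io.serializations list exactly as in the helper above.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
    return job;
  }
}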
/**
* Use this before submitting a TableMap job. It will
* appropriately set up the JobConf.
*
* @param table The table name to read from.
* @param columns The columns to scan.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job configuration to adjust.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*/
public static void initTableMapJob(String table, String columns,
Class<? extends TableMap> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, JobConf job, boolean addDependencyJars,
Class<? extends InputFormat> inputFormat) {
job.setInputFormat(inputFormat);
job.setMapOutputValueClass(outputValueClass);
job.setMapOutputKeyClass(outputKeyClass);
job.setMapperClass(mapper);
job.setStrings("io.serializations", job.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
FileInputFormat.addInputPaths(job, table);
job.set(TableInputFormat.COLUMN_LIST, columns);
if (addDependencyJars) {
try {
addDependencyJars(job);
} catch (IOException e) {
LOG.error("IOException encountered while adding dependency jars", e);
}
}
try {
initCredentials(job);
} catch (IOException ioe) {
// just spit out the stack trace? really?
LOG.error("IOException encountered while initializing credentials", ioe);
}
}
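A matching driver sketch for the map side follows; as with the reduce example earlier, the class name TableMapDriver and the job name are illustrative assumptions. The map output value class here is Result, which is exactly the case ResultSerialization exists to cover.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TableMapDriver {
  // Illustrative sketch: table name, scanned columns and mapper class come from the caller.
  public static void run(String table, String columns, Class<? extends TableMap> mapper)
      throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), TableMapDriver.class);
    job.setJobName("scan-" + table);
    // The six-argument overload delegates to the method shown above and defaults
    // addDependencyJars to true; Result map output values rely on ResultSerialization.
    TableMapReduceUtil.initTableMapJob(table, columns, mapper, ImmutableBytesWritable.class,
        Result.class, job);
    JobClient.runJob(job);
  }
}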
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
Configuration conf = job.getConfiguration();
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(cls);
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
} else if (Text.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(TextSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
// record this table name for creating writer by favored nodes
LOG.info("bulkload locality sensitive enabled");
conf.set(OUTPUT_TABLE_NAME_CONF_KEY, regionLocator.getName().getNameAsString());
}
// Use table's region boundaries for TOP split points.
LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
job.setNumReduceTasks(startKeys.size());
configurePartitioner(job, startKeys);
// Set compression algorithms based on column families
configureCompression(conf, tableDescriptor);
configureBloomType(tableDescriptor, conf);
configureBlockSize(tableDescriptor, conf);
configureDataBlockEncoding(tableDescriptor, conf);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
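Common to all of these helpers is the io.serializations wiring, which is what lets non-Writable HBase types (Result, Put/Delete mutations, KeyValue) cross the map/reduce boundary. When a job is assembled by hand instead of through these utilities, the same registration can be done directly on the Configuration; a minimal sketch (the class and method names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
import org.apache.hadoop.hbase.mapreduce.ResultSerialization;

public class SerializationConfig {
  // Appends the HBase serializations while keeping whatever is already configured
  // (typically Hadoop's WritableSerialization), mirroring the helpers above.
  public static void addHBaseSerializations(Configuration conf) {
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(),
        ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
  }
}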
/**
* Configures the job for MultiHfileOutputFormat.
* @param job the job to configure.
* @param tablesToBeLoaded the target tables whose region boundaries drive the reduce partitioning.
* @throws IOException if region or table metadata cannot be retrieved.
*/
@SuppressWarnings("deprecation")
public static void configureIncrementalLoad(Job job, List<TargetTableRef> tablesToBeLoaded) throws IOException {
Configuration conf = job.getConfiguration();
job.setOutputFormatClass(MultiHfileOutputFormat.class);
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
KeyValueSerialization.class.getName());
// tableStartKeys for all tables.
Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
for(TargetTableRef table : tablesToBeLoaded) {
final String tableName = table.getPhysicalName();
try (Connection hbaseConn = ConnectionFactory.createConnection(conf)) {
Set<TableRowkeyPair> startKeys =
getRegionStartKeys(tableName,
hbaseConn.getRegionLocator(TableName.valueOf(tableName)));
tablesStartKeys.addAll(startKeys);
TableDescriptor tableDescriptor = hbaseConn.getTable(TableName.valueOf(tableName)).getDescriptor();
String compressionConfig = configureCompression(tableDescriptor);
String bloomTypeConfig = configureBloomType(tableDescriptor);
String blockSizeConfig = configureBlockSize(tableDescriptor);
String blockEncodingConfig = configureDataBlockEncoding(tableDescriptor);
Map<String,String> tableConfigs = Maps.newHashMap();
if(StringUtils.isNotBlank(compressionConfig)) {
tableConfigs.put(COMPRESSION_FAMILIES_CONF_KEY, compressionConfig);
}
if(StringUtils.isNotBlank(bloomTypeConfig)) {
tableConfigs.put(BLOOM_TYPE_FAMILIES_CONF_KEY,bloomTypeConfig);
}
if(StringUtils.isNotBlank(blockSizeConfig)) {
tableConfigs.put(BLOCK_SIZE_FAMILIES_CONF_KEY,blockSizeConfig);
}
if(StringUtils.isNotBlank(blockEncodingConfig)) {
tableConfigs.put(DATABLOCK_ENCODING_FAMILIES_CONF_KEY,blockEncodingConfig);
}
table.setConfiguration(tableConfigs);
final String tableDefns = TargetTableRefFunctions.TO_JSON.apply(table);
// set the table definition in the config to be used during the RecordWriter..
conf.set(tableName, tableDefns);
TargetTableRef tbl = TargetTableRefFunctions.FROM_JSON.apply(tableDefns);
LOGGER.info(" the table logical name is "+ tbl.getLogicalName());
}
}
LOGGER.info("Configuring " + tablesStartKeys.size() + " reduce partitions to match current region count");
job.setNumReduceTasks(tablesStartKeys.size());
configurePartitioner(job, tablesStartKeys);
TableMapReduceUtil.addDependencyJars(job);
TableMapReduceUtil.initCredentials(job);
}
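For completeness, a call site for this helper could look like the sketch below. Building the List<TargetTableRef> is left to the caller, since Phoenix's bulk-load tooling normally assembles it; the class name, job name and output path are illustrative assumptions, and the Phoenix package names are as in recent releases.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.phoenix.mapreduce.MultiHfileOutputFormat;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;

public class MultiTableBulkLoadDriver {
  // Illustrative sketch: the TargetTableRef list is assumed to be built elsewhere
  // and the HFile output path is a placeholder.
  public static Job createJob(Configuration conf, List<TargetTableRef> tables) throws Exception {
    Job job = Job.getInstance(conf, "phoenix-multi-table-hfiles");
    FileOutputFormat.setOutputPath(job, new Path("/tmp/multi-hfiles"));
    // The helper shown above sets MultiHfileOutputFormat, registers the HBase
    // serializations, and configures one reduce partition per region across all tables.
    MultiHfileOutputFormat.configureIncrementalLoad(job, tables);
    return job;
  }
}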