下面列出了如何使用 org.apache.hadoop.hbase.client.HTablePool 这个 API 类的示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds the HBase client configuration for this cluster from {@code clusterConfig}.
 * <p>
 * The cluster name is validated and stored first. The ZooKeeper quorum and client
 * port are then copied over, the znode parent is copied only when present, and a
 * fixed set of client tuning properties is applied. The configuration and a table
 * pool of size 100 are stored only if {@code checkConfiguration} accepts the result;
 * otherwise nothing further is initialized.
 *
 * @throws IllegalArgumentException if the configured cluster name is null or empty
 */
private void initConfiguration() {
    final String name = clusterConfig.get(HbaseConf.cluster_name);
    if (name == null || name.isEmpty()) {
        throw new IllegalArgumentException("cluster name can not be null or ''!");
    }
    clusterName = name;

    final Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(HbaseConf.hbase_quorum, clusterConfig.get(HbaseConf.hbase_quorum));
    hbaseConf.set(HbaseConf.hbase_clientPort, clusterConfig.get(HbaseConf.hbase_clientPort));

    // The znode parent is optional; leave the default in place when it is not configured.
    final String znodeParent = clusterConfig.get(HbaseConf.hbase_znode_parent);
    if (znodeParent != null) {
        hbaseConf.set(HbaseConf.hbase_znode_parent, znodeParent);
    }

    // Fixed client-side tuning applied to every cluster.
    hbaseConf.set("hbase.client.retries.number", "5");
    hbaseConf.set("hbase.client.pause", "200");
    hbaseConf.set("ipc.ping.interval", "3000");
    hbaseConf.setBoolean("hbase.ipc.client.tcpnodelay", true);

    if (!this.checkConfiguration(name, hbaseConf)) {
        return;
    }
    conficuration = hbaseConf;
    tablePool = new HTablePool(hbaseConf, 100);
}
/**
 * Obtains a handle for {@code tableName} from a freshly created {@link HTablePool}
 * of size 1 that is built on the coprocessor environment's configuration.
 *
 * @param env       coprocessor environment whose configuration is used for the pool
 * @param tableName name of the table to fetch
 * @return the table handle returned by the pool
 * @throws IOException if the pool fails with a {@link RuntimeException} whose cause
 *                     is an {@link IOException}; that cause is rethrown directly
 */
private static HTableInterface getTableFromSingletonPool(RegionCoprocessorEnvironment env, byte[] tableName) throws IOException {
    // Never calling pool.close() is fine here: the pool stores a single table only
    // and holds no resources other than that table, which is closed by itself once
    // it is no longer needed.
    @SuppressWarnings("resource")
    HTablePool singleTablePool = new HTablePool(env.getConfiguration(), 1);
    try {
        return singleTablePool.getTable(tableName);
    } catch (RuntimeException wrapper) {
        // An IOException can arrive wrapped inside a RuntimeException
        // (e.g. from HTableInterface#createHTableInterface); surface the real cause.
        final Throwable cause = wrapper.getCause();
        if (cause instanceof IOException) {
            throw (IOException) cause;
        }
        throw wrapper;
    }
}
/**
 * Sets up the example. Creates an HBase configuration and a table pool of size 10,
 * passes the configuration and a {@link DefaultSchemaManager} to
 * {@code registerSchemas}, and then builds the three DAOs, all of which operate on
 * the {@code kite_example_user_profiles} table.
 * <p>
 * Per the original documentation, schema registration stores the schemas in the
 * meta store table in HBase and creates the tables required to run.
 *
 * @throws InterruptedException declared by this constructor; presumably propagated
 *                              from {@code registerSchemas} (confirm against it)
 */
public UserProfileExample() throws InterruptedException {
  final Configuration hbaseConf = HBaseConfiguration.create();
  final HTablePool pool = new HTablePool(hbaseConf, 10);
  final SchemaManager manager = new DefaultSchemaManager(pool);
  registerSchemas(hbaseConf, manager);

  // All three DAOs share the same table.
  final String profilesTable = "kite_example_user_profiles";
  userProfileDao = new SpecificAvroDao<UserProfileModel>(
      pool, profilesTable, "UserProfileModel", manager);
  userActionsDao = new SpecificAvroDao<UserActionsModel>(
      pool, profilesTable, "UserActionsModel", manager);
  userProfileActionsDao = SpecificAvroDao.buildCompositeDaoWithEntityManager(
      pool, profilesTable, UserProfileActionsModel.class, manager);
}
/**
 * Creates a composite DAO whose entities are returned as a {@code Map} of
 * SpecificRecord instances.
 * <p>
 * Each schema string is parsed, the SpecificRecord class named by the parsed Avro
 * schema's full name is loaded reflectively, and one entity mapper is built per
 * sub-entity. The same schema string is handed to {@code buildEntityMapper} twice.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase
 * @param tableName
 *          The table name this dao will read from and write to
 * @param subEntitySchemaStrings
 *          The schema strings of the entities that make up the composite
 * @return The CompositeDao instance.
 * @throws RuntimeException
 *           wrapping the ClassNotFoundException when a sub-entity class cannot be
 *           loaded
 * @throws SchemaNotFoundException
 *           listed by the original documentation; not visible in this method
 * @throws ValidationException
 *           listed by the original documentation; not visible in this method
 */
@SuppressWarnings("unchecked")
public static <K extends SpecificRecord, S extends SpecificRecord> Dao<
    Map<String, S>> buildCompositeDao(
    HTablePool tablePool, String tableName,
    List<String> subEntitySchemaStrings) {
  final List<EntityMapper<S>> mappers = new ArrayList<EntityMapper<S>>();
  for (final String schemaString : subEntitySchemaStrings) {
    final AvroEntitySchema parsedSchema = parser.parseEntitySchema(schemaString);

    Class<S> recordClass;
    try {
      // The Avro full name doubles as the generated SpecificRecord class name.
      recordClass = (Class<S>) Class.forName(
          parsedSchema.getAvroSchema().getFullName());
    } catch (ClassNotFoundException notFound) {
      throw new RuntimeException(notFound);
    }

    final EntityMapper<S> mapper = SpecificAvroDao.<S> buildEntityMapper(
        schemaString, schemaString, recordClass);
    mappers.add(mapper);
  }
  return new SpecificMapCompositeAvroDao<S>(tablePool, tableName, mappers);
}
/**
 * Creates a composite DAO, backed by a SchemaManager, whose entities are returned
 * as a {@code Map} of SpecificRecord instances.
 * <p>
 * For every sub-entity class, the entity name is taken from the schema returned by
 * {@code getSchemaFromEntityClass}, and a versioned, specific entity mapper is built
 * for that name on the given table.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The table name of the managed schema.
 * @param subEntityClasses
 *          The classes that make up the subentities.
 * @param schemaManager
 *          The SchemaManager used to create the entity mappers that power this
 *          dao.
 * @return The CompositeDao instance.
 * @throws SchemaNotFoundException
 *           listed by the original documentation; not visible in this method
 */
public static <K extends SpecificRecord, S extends SpecificRecord> Dao<Map<String, S>> buildCompositeDaoWithEntityManager(
    HTablePool tablePool, String tableName, List<Class<S>> subEntityClasses,
    SchemaManager schemaManager) {
  final List<EntityMapper<S>> mappers = new ArrayList<EntityMapper<S>>();
  for (final Class<S> recordClass : subEntityClasses) {
    final String managedEntityName = getSchemaFromEntityClass(recordClass).getName();
    final EntityMapper<S> mapper = new VersionedAvroEntityMapper.Builder()
        .setSchemaManager(schemaManager)
        .setTableName(tableName)
        .setEntityName(managedEntityName)
        .setSpecific(true)
        .<S> build();
    mappers.add(mapper);
  }
  return new SpecificMapCompositeAvroDao<S>(tablePool, tableName, mappers);
}
public SpecificCompositeAvroDao(HTablePool tablePool, String tableName,
List<EntityMapper<S>> entityMappers, Class<E> entityClass) {
super(tablePool, tableName, entityMappers);
this.entityClass = entityClass;
try {
entityConstructor = entityClass.getConstructor();
entitySchema = (Schema) entityClass.getDeclaredField("SCHEMA$").get(
null);
} catch (Throwable e) {
LOG.error(
"Error getting constructor or schema field for entity of type: "
+ entityClass.getName(), e);
throw new DatasetException(e);
}
}
/**
* Checks an HTable out of the HTablePool and modifies it to take advantage of
* batch puts. This is very useful when performing many consecutive puts.
*
* @param clientTemplate
* The client template to use
* @param entityMapper
* The EntityMapper to use for mapping
* @param pool
* The HBase table pool
* @param tableName
* The name of the HBase table
* @param writeBufferSize
* The batch buffer size in bytes.
*/
public BaseEntityBatch(HBaseClientTemplate clientTemplate,
EntityMapper<E> entityMapper, HTablePool pool, String tableName,
long writeBufferSize) {
this.table = pool.getTable(tableName);
this.table.setAutoFlush(false);
this.clientTemplate = clientTemplate;
this.entityMapper = entityMapper;
this.state = ReaderWriterState.NEW;
/**
* If the writeBufferSize is less than the currentBufferSize, then the
* buffer will get flushed automatically by HBase. This should never happen,
* since we're getting a fresh table out of the pool, and the writeBuffer
* should be empty.
*/
try {
table.setWriteBufferSize(writeBufferSize);
} catch (IOException e) {
throw new DatasetIOException("Error flushing commits for table ["
+ table + "]", e);
}
}
public SpecificMapCompositeAvroDao(HTablePool tablePool, String tableName,
List<EntityMapper<S>> entityMappers) {
super(tablePool, tableName, entityMappers);
subEntitySchemas = Lists.newArrayList();
for (EntityMapper<S> entityMapper : entityMappers) {
subEntitySchemas.add(parser.parseEntitySchema(entityMapper.getEntitySchema().getRawSchema()).getAvroSchema());
}
}
/**
 * One-time fixture: starts HBase via the test utilities, deletes the managed
 * schema table, and creates the metadata provider under test from a fresh
 * HBaseAdmin and schema manager.
 */
@BeforeClass
public static void beforeClass() throws Exception {
  final HTablePool pool = HBaseTestUtils.startHBaseAndGetPool();

  // managed table should be created by HBaseDatasetRepository
  final byte[] managedTable = Bytes.toBytes(managedTableName);
  HBaseTestUtils.util.deleteTable(managedTable);

  final SchemaManager manager = new DefaultSchemaManager(pool);
  final HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseTestUtils.getConf());
  provider = new HBaseMetadataProvider(hbaseAdmin, manager);
}
/**
 * Per-test fixture: picks a random 8-character table name (the start of a UUID
 * string) and creates a fresh table pool, schema manager and schema tool.
 */
@Before
public void before() throws Exception {
  final String uuidText = UUID.randomUUID().toString();
  tableName = uuidText.substring(0, 8);

  tablePool = new HTablePool(HBaseTestUtils.getConf(), 10);
  manager = new DefaultSchemaManager(tablePool);

  final HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseTestUtils.getConf());
  tool = new SchemaTool(hbaseAdmin, manager);
}
/**
 * Per-test fixture: creates a fresh table pool, creates or migrates the test
 * schemas for the plain, composite and increment tables, and then creates a
 * separate schema manager for the test to use.
 */
@Before
public void before() throws Exception {
  tablePool = new HTablePool(HBaseTestUtils.getConf(), 10);

  final HBaseAdmin hbaseAdmin = new HBaseAdmin(HBaseTestUtils.getConf());
  final SchemaTool schemaTool =
      new SchemaTool(hbaseAdmin, new DefaultSchemaManager(tablePool));

  // Two versions of the test record on the same table.
  schemaTool.createOrMigrateSchema(tableName, testRecord, true);
  schemaTool.createOrMigrateSchema(tableName, testRecordv2, true);
  // Both sub-records of the composite share one table.
  schemaTool.createOrMigrateSchema(compositeTableName, compositeSubrecord1, true);
  schemaTool.createOrMigrateSchema(compositeTableName, compositeSubrecord2, true);
  schemaTool.createOrMigrateSchema(incrementTableName, testIncrement, true);

  // A second manager instance, distinct from the one handed to the schema tool.
  manager = new DefaultSchemaManager(tablePool);
}
/**
 * Creates a schema manager on the given pool and uses a SchemaTool to create or
 * migrate every schema found in {@code directory}.
 *
 * @param tablePool the pool the schema manager is built on
 * @param directory the schema directory handed to
 *                  {@code createOrMigrateSchemaDirectory}
 * @return the schema manager that was used for the migration
 * @throws Exception declared by this method; not caught here
 */
public static SchemaManager initializeSchemaManager(
    HTablePool tablePool, String directory) throws Exception {
  final SchemaManager schemaManager = new DefaultSchemaManager(tablePool);
  final HBaseAdmin hbaseAdmin = new HBaseAdmin(getConf());
  new SchemaTool(hbaseAdmin, schemaManager)
      .createOrMigrateSchemaDirectory(directory, true);
  return schemaManager;
}
/**
 * Constructs the indexed region. All arguments are forwarded unchanged to the
 * superclass constructor; in addition the index descriptor is derived from the
 * region's table descriptor, the configuration is retained, and a table pool is
 * created.
 *
 * @param basedir       forwarded to the superclass
 * @param log           forwarded to the superclass
 * @param fs            forwarded to the superclass
 * @param conf          forwarded to the superclass and stored on this instance
 * @param regionInfo    forwarded to the superclass; its table descriptor is
 *                      wrapped in an IndexedTableDescriptor
 * @param flushListener forwarded to the superclass
 * @throws IOException declared by this constructor
 */
@SuppressWarnings("deprecation")
public IndexedRegion(final Path basedir, final HLog log, final FileSystem fs,
final Configuration conf, final HRegionInfo regionInfo,
final FlushRequester flushListener) throws IOException {
super(basedir, log, fs, conf, regionInfo, flushListener);
this.indexTableDescriptor = new IndexedTableDescriptor(
regionInfo.getTableDesc());
this.conf = conf;
// NOTE(review): the pool is created with the no-argument constructor, so it is
// not built from the `conf` passed in above. Confirm that is intended.
this.tablePool = new HTablePool();
}
/**
 * Private constructor: runs {@code init()} and then creates the HBase table pool
 * (size 10) from {@code hbaseConf}.
 */
private EagleConfigFactory() {
    init();
    pool = new HTablePool(hbaseConf, 10);
}
private EagleConfigFactory() {
init();
if (this.getStorageType() == null || this.getStorageType().equalsIgnoreCase("hbase")) {
this.pool = new HTablePool(this.hbaseConf, 10);
}
}
/**
 * Initializes the HBase client: builds a configuration from the given Storm
 * configuration via {@code makeConf} and creates the table pool with
 * {@code TABLE_POOL_SIZE} as its size. Logs before and after.
 *
 * @param stormConf the Storm configuration map handed to {@code makeConf}
 */
public void initFromStormConf(Map stormConf) {
    logger.info("init hbase client.");
    hTablePool = new HTablePool(makeConf(stormConf), TABLE_POOL_SIZE);
    logger.info("finished init hbase client.");
}
/**
 * Creates the repository. Stores the pool and repository URI, builds a
 * {@link DefaultSchemaManager} on the pool, and builds the metadata provider from
 * the admin handle and that schema manager.
 *
 * @param hBaseAdmin    admin handle given to the metadata provider
 * @param tablePool     pool retained by the repository and used by the schema manager
 * @param repositoryUri URI retained by the repository
 */
HBaseDatasetRepository(HBaseAdmin hBaseAdmin, HTablePool tablePool, URI repositoryUri) {
  this.repositoryUri = repositoryUri;
  this.tablePool = tablePool;
  // The metadata provider depends on the schema manager, so build that first.
  this.schemaManager = new DefaultSchemaManager(tablePool);
  this.metadataProvider = new HBaseMetadataProvider(hBaseAdmin, this.schemaManager);
}
public ManagedSchemaHBaseDao(HTablePool tablePool, String managedSchemaTable) {
managedSchemaDao = new SpecificAvroDao<ManagedSchema>(tablePool,
managedSchemaTable, managedSchemaEntity.getRawSchema(),
ManagedSchema.class);
}
/**
 * Convenience constructor: wraps the pool and table name in a new
 * {@code ManagedSchemaHBaseDao} and delegates to the constructor that accepts it.
 *
 * @param tablePool          pool handed to the ManagedSchemaHBaseDao
 * @param managedSchemaTable table name handed to the ManagedSchemaHBaseDao
 */
public DefaultSchemaManager(HTablePool tablePool, String managedSchemaTable) {
this(new ManagedSchemaHBaseDao(tablePool, managedSchemaTable));
}
/**
 * Creates a composite DAO that returns SpecificRecord instances of
 * {@code entityClass}. Per the original documentation, the composite record's
 * Avro schema must be a composition of the schemas in the
 * {@code subEntitySchemaStrings} list.
 * <p>
 * Each schema string is parsed, the SpecificRecord class named by the parsed Avro
 * schema's full name is loaded reflectively, and one entity mapper is built per
 * sub-entity. The same schema string is handed to {@code buildEntityMapper} twice.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase
 * @param tableName
 *          The table name this dao will read from and write to
 * @param subEntitySchemaStrings
 *          The schema strings of the entities that make up the composite. Per the
 *          original documentation, this list must be in the same order as the
 *          fields defined in the composite entity's schema.
 * @param entityClass
 *          The class of the SpecificRecord this DAO will persist and fetch.
 * @return The CompositeDao instance.
 * @throws RuntimeException
 *           wrapping the ClassNotFoundException when a sub-entity class cannot be
 *           loaded
 * @throws SchemaNotFoundException
 *           listed by the original documentation; not visible in this method
 * @throws ValidationException
 *           listed by the original documentation; not visible in this method
 */
@SuppressWarnings("unchecked")
public static <E extends SpecificRecord, S extends SpecificRecord> Dao<E> buildCompositeDao(
    HTablePool tablePool, String tableName,
    List<String> subEntitySchemaStrings, Class<E> entityClass) {
  final List<EntityMapper<S>> mappers = new ArrayList<EntityMapper<S>>();
  for (final String schemaString : subEntitySchemaStrings) {
    final AvroEntitySchema parsedSchema = parser.parseEntitySchema(schemaString);

    Class<S> recordClass;
    try {
      // The Avro full name doubles as the generated SpecificRecord class name.
      recordClass = (Class<S>) Class.forName(
          parsedSchema.getAvroSchema().getFullName());
    } catch (ClassNotFoundException notFound) {
      throw new RuntimeException(notFound);
    }

    final EntityMapper<S> mapper = SpecificAvroDao.<S> buildEntityMapper(
        schemaString, schemaString, recordClass);
    mappers.add(mapper);
  }
  return new SpecificCompositeAvroDao<E, S>(tablePool, tableName, mappers,
      entityClass);
}
/**
 * Creates a composite DAO, backed by a SchemaManager, that returns SpecificRecord
 * instances of {@code entityClass}.
 * <p>
 * The composite's schema is obtained with {@code getSchemaFromEntityClass}. For
 * each of its fields, a versioned, specific entity mapper is built whose entity
 * name is derived from the field's schema via {@code getSchemaName}.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The table name of the managed schema.
 * @param entityClass
 *          The class that is the composite record, which is made up of fields
 *          referencing the sub records.
 * @param schemaManager
 *          The SchemaManager used to create the entity mappers that power this
 *          dao.
 * @return The CompositeDao instance.
 * @throws SchemaNotFoundException
 *           listed by the original documentation; not visible in this method
 */
public static <K extends SpecificRecord, E extends SpecificRecord, S extends SpecificRecord> Dao<E> buildCompositeDaoWithEntityManager(
    HTablePool tablePool, String tableName, Class<E> entityClass,
    SchemaManager schemaManager) {
  final Schema compositeSchema = getSchemaFromEntityClass(entityClass);
  final List<EntityMapper<S>> mappers = new ArrayList<EntityMapper<S>>();
  for (final Schema.Field subRecordField : compositeSchema.getFields()) {
    final EntityMapper<S> mapper = new VersionedAvroEntityMapper.Builder()
        .setSchemaManager(schemaManager)
        .setTableName(tableName)
        .setEntityName(getSchemaName(subRecordField.schema()))
        .setSpecific(true)
        .<S> build();
    mappers.add(mapper);
  }
  return new SpecificCompositeAvroDao<E, S>(tablePool, tableName, mappers,
      entityClass);
}
/**
 * Creates the builder by forwarding all three arguments unchanged to the
 * superclass constructor.
 *
 * @param tablePool    the HBase table pool
 * @param tableName    the HBase table name
 * @param entityMapper the entity mapper
 */
public Builder(HTablePool tablePool, String tableName,
EntityMapper<E> entityMapper) {
super(tablePool, tableName, entityMapper);
}
/**
 * Per-test fixture: creates a fresh table pool of size 10 from the test
 * utilities' configuration.
 */
@Before
public void beforeTest() throws Exception {
tablePool = new HTablePool(HBaseTestUtils.getConf(), 10);
}
/**
 * Per-test fixture: truncates the test table, then creates a fresh table pool of
 * size 10 from the test utilities' configuration.
 */
@Before
public void beforeTest() throws Exception {
// Truncate the table named by tableName before creating the pool.
HBaseTestUtils.util.truncateTable(Bytes.toBytes(tableName));
tablePool = new HTablePool(HBaseTestUtils.getConf(), 10);
}
/**
 * Calls {@code getMiniCluster()} (presumably starting the mini cluster if
 * needed; confirm against that method) and returns a new table pool of size 10
 * built on {@code getConf()}.
 *
 * @return a newly created HTablePool
 * @throws Exception declared by this method; not caught here
 */
public static HTablePool startHBaseAndGetPool() throws Exception {
  getMiniCluster();
  final HTablePool pool = new HTablePool(getConf(), 10);
  return pool;
}
/**
 * Construct a GenericAvroDao from an entity schema supplied as a stream.
 * <p>
 * The stream is converted to a string with {@code AvroUtils.inputStreamToString}
 * and the resulting schema string is turned into the entity mapper that is handed
 * to the superclass.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The name of the table this Dao will read from and write to in
 *          HBase.
 * @param entitySchemaStream
 *          The InputStream that contains a json string representing the
 *          special avro record schema, that contains metadata in annotations
 *          of the Avro record fields. See {@link AvroEntityMapper} for
 *          details.
 */
public GenericAvroDao(HTablePool tablePool, String tableName,
InputStream entitySchemaStream) {
super(tablePool, tableName, buildEntityMapper(AvroUtils
.inputStreamToString(entitySchemaStream)));
}
/**
 * Construct the GenericAvroDao with a SchemaManager, which will provide the
 * entity mapper to this Dao that knows how to map the different entity schema
 * versions defined by the managed schema. The entitySchemaString parameter
 * represents the schema to use for writes.
 * <p>
 * The mapper is a non-specific (generic) {@code VersionedAvroEntityMapper} built
 * for the given table and entity name, with {@code entitySchemaString} set as its
 * generic schema string.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The table name of the managed schema.
 * @param entityName
 *          The entity name of the managed schema.
 * @param schemaManager
 *          The SchemaManager which will create the entity mapper that will
 *          power this dao.
 * @param entitySchemaString
 *          The schema as a string representing the schema version that this
 *          DAO should use for writes.
 */
public GenericAvroDao(HTablePool tablePool, String tableName,
String entityName, SchemaManager schemaManager, String entitySchemaString) {
super(tablePool, tableName, new VersionedAvroEntityMapper.Builder()
.setSchemaManager(schemaManager).setTableName(tableName)
.setEntityName(entityName).setSpecific(false)
.setGenericSchemaString(entitySchemaString).<GenericRecord> build());
}
/**
 * Construct the GenericAvroDao with a SchemaManager, which will provide the
 * entity mapper to this Dao that knows how to map the different entity schema
 * versions defined by the managed schema. The newest schema version available
 * at the time of this dao's creation will be used for writes.
 * <p>
 * The mapper is a non-specific (generic) {@code VersionedAvroEntityMapper} built
 * for the given table and entity name; no explicit write schema is set here.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The table name of the managed schema.
 * @param entityName
 *          The entity name of the managed schema.
 * @param schemaManager
 *          The SchemaManager which will create the entity mapper that will
 *          power this dao.
 */
public GenericAvroDao(HTablePool tablePool, String tableName,
String entityName, SchemaManager schemaManager) {
super(tablePool, tableName, new VersionedAvroEntityMapper.Builder()
.setSchemaManager(schemaManager).setTableName(tableName)
.setEntityName(entityName).setSpecific(false).<GenericRecord> build());
}
/**
 * Construct the SpecificAvroDao from an entity schema string.
 * <p>
 * The entity mapper handed to the superclass is built from the schema string
 * (passed twice to {@code buildEntityMapper}) and the entity class.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The name of the table this Dao will read from and write to.
 * @param entitySchemaString
 *          The json string representing the special avro record schema, that
 *          contains metadata in annotations of the Avro record fields. See
 *          {@link AvroEntityMapper} for details.
 * @param entityClass
 *          The class of the SpecificRecord this DAO will persist and fetch.
 */
public SpecificAvroDao(HTablePool tablePool, String tableName,
String entitySchemaString, Class<E> entityClass) {
super(tablePool, tableName, buildEntityMapper(entitySchemaString,
entitySchemaString, entityClass));
}
/**
 * Construct the SpecificAvroDao from an entity schema supplied as a stream.
 * <p>
 * The stream is converted to a string with {@code AvroUtils.inputStreamToString}
 * and construction is delegated to the constructor that takes the schema string.
 *
 * @param tablePool
 *          An HTablePool instance to use for connecting to HBase.
 * @param tableName
 *          The name of the table this Dao will read from and write to.
 * @param entitySchemaStream
 *          The json stream representing the special avro record schema, that
 *          contains metadata in annotations of the Avro record fields. See
 *          {@link AvroEntityMapper} for details.
 * @param entityClass
 *          The class of the SpecificRecord this DAO will persist and fetch.
 */
public SpecificAvroDao(HTablePool tablePool, String tableName,
InputStream entitySchemaStream, Class<E> entityClass) {
this(tablePool, tableName, AvroUtils
.inputStreamToString(entitySchemaStream), entityClass);
}