下面列出了 org.apache.hadoop.hbase.client.ConnectionFactory 类的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Creates an HBase connection and stores the table named by {@code tableName}
 * in {@code super.table}. A configuration is created via
 * {@link HBaseConfiguration#create()} when {@code conf} has not been set yet.
 *
 * @throws RuntimeException if the table is not found or the connection cannot be created
 */
private void connectToTable() {
    if (null == this.conf) {
        this.conf = HBaseConfiguration.create();
    }

    try {
        final Connection hbaseConnection = ConnectionFactory.createConnection(conf);
        final TableName target = TableName.valueOf(tableName);
        super.table = (HTable) hbaseConnection.getTable(target);
    } catch (TableNotFoundException notFound) {
        LOG.error("The table " + tableName + " not found ", notFound);
        throw new RuntimeException("HBase table '" + tableName + "' not found.", notFound);
    } catch (IOException ioFailure) {
        LOG.error("Exception while creating connection to HBase.", ioFailure);
        throw new RuntimeException("Cannot create connection to HBase.", ioFailure);
    }
}
/**
 * Truncates an HTable by disabling, deleting and re-creating it with the same
 * descriptor and the same region start keys as split points.
 *
 * @param table HTable to truncate; it is closed by this method
 * @return new instance of the truncated HTable
 * @throws IOException in case of any HBase IO problems
 */
public static HTable truncateTable(HTable table) throws IOException {
    final Configuration conf = table.getConfiguration();

    byte[][] splitKeys = table.getRegionLocator().getStartKeys();
    // Drop a leading zero-length start key so it is not passed on as a split point.
    final boolean hasLeadingEmptyKey = splitKeys.length > 0 && splitKeys[0].length == 0;
    if (hasLeadingEmptyKey) {
        splitKeys = Arrays.copyOfRange(splitKeys, 1, splitKeys.length);
    }

    final HTableDescriptor descriptor = table.getTableDescriptor();
    table.close();

    try (Connection con = ConnectionFactory.createConnection(conf);
         Admin admin = con.getAdmin()) {
        admin.disableTable(descriptor.getTableName());
        admin.deleteTable(descriptor.getTableName());
        admin.createTable(descriptor, splitKeys);
    }

    final String tableName = descriptor.getTableName().getNameAsString();
    return HalyardTableUtils.getTable(conf, tableName, false, 0);
}
/**
 * Scans the table described by {@code read} over the tracker's current key range and
 * emits every row whose key can be claimed from the tracker.
 *
 * <p>The connection, table and scanner are opened per call and are all released when the
 * method exits, whether normally, through the early return, or through an exception.
 *
 * @param read the read description providing configuration, table id and scan
 * @param out receiver for the scanned results
 * @param tracker restriction tracker used to claim row keys
 * @throws Exception if connecting to or scanning HBase fails
 */
@ProcessElement
public void processElement(
    @Element Read read,
    OutputReceiver<Result> out,
    RestrictionTracker<ByteKeyRange, ByteKey> tracker)
    throws Exception {
  // Try-with-resources guarantees connection and table are closed; previously they leaked.
  try (Connection connection = ConnectionFactory.createConnection(read.getConfiguration());
      Table table = connection.getTable(TableName.valueOf(read.getTableId()))) {
    final ByteKeyRange range = tracker.currentRestriction();
    try (ResultScanner scanner =
        table.getScanner(HBaseUtils.newScanInRange(read.getScan(), range))) {
      for (Result result : scanner) {
        ByteKey key = ByteKey.copyFrom(result.getRow());
        if (!tracker.tryClaim(key)) {
          // The tracker refused the key: stop emitting for this restriction.
          return;
        }
        out.output(result);
      }
      // Scan finished: claim the empty key after the last row.
      tracker.tryClaim(ByteKey.EMPTY);
    }
  }
}
/**
 * Opens the HBase connection and table used by this function and creates the
 * read helper for the table schema.
 *
 * @param context the function context (not read by this method)
 * @throws RuntimeException if the table is not found or the connection cannot be created
 */
@Override
public void open(FunctionContext context) {
    LOG.info("start open ...");
    final org.apache.hadoop.conf.Configuration runtimeConfig = prepareRuntimeConfiguration();

    try {
        hConnection = ConnectionFactory.createConnection(runtimeConfig);
        final TableName target = TableName.valueOf(hTableName);
        table = (HTable) hConnection.getTable(target);
    } catch (TableNotFoundException notFound) {
        LOG.error("Table '{}' not found ", hTableName, notFound);
        throw new RuntimeException("HBase table '" + hTableName + "' not found.", notFound);
    } catch (IOException ioFailure) {
        LOG.error("Exception while creating connection to HBase.", ioFailure);
        throw new RuntimeException("Cannot create connection to HBase.", ioFailure);
    }

    this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
    LOG.info("end open.");
}
/**
 * Writes a single row with the current timestamp into the
 * {@code dfdq_rhm_aly:f_aly_point_data_test} table.
 *
 * <p>The connection and table are closed when the write completes or fails.
 *
 * @throws Exception wrapping any failure raised while connecting, writing or closing
 */
@Override
public void loader() throws Exception {
    try {
        Configuration conf = HBaseConfiguration.create();
        // Try-with-resources closes both the table and the connection; previously both leaked.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("dfdq_rhm_aly:f_aly_point_data_test"))) {
            // Bytes.toBytes encodes as UTF-8 instead of relying on the platform default charset.
            Put put = new Put(Bytes.toBytes("kkk"));
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("t"), Bytes.toBytes(System.currentTimeMillis()));
            table.put(put);
        }
    } catch (Exception e) {
        throw new Exception("批量存储数据失败!", e);
    }
}
/**
 * Returns list of fragments containing all of the
 * HBase's table data.
 * Lookup table information with mapping between
 * field names in GPDB table and HBase table will be
 * returned as user data.
 *
 * <p>The admin and connection opened here are closed before the method exits,
 * including when the table is unavailable or fragment collection fails.
 *
 * @return a list of fragments
 * @throws TableNotFoundException if the data source table is not available
 * @throws Exception if HBase is unavailable or fragment collection fails
 */
@Override
public List<Fragment> getFragments() throws Exception {
    // check that Zookeeper and HBase master are available
    HBaseAdmin.checkHBaseAvailable(configuration);
    connection = ConnectionFactory.createConnection(configuration);
    Admin hbaseAdmin = connection.getAdmin();
    try {
        if (!HBaseUtilities.isTableAvailable(hbaseAdmin, context.getDataSource())) {
            throw new TableNotFoundException(context.getDataSource());
        }
        byte[] userData = prepareUserData();
        addTableFragments(userData);
    } finally {
        // Single cleanup point: also runs when prepareUserData/addTableFragments throw.
        HBaseUtilities.closeConnection(hbaseAdmin, connection);
    }
    return fragments;
}
/**
 * Builds the HBase configuration from the sink config, opens the connection and admin,
 * and resolves the target table into the {@code table} field.
 *
 * @param hbaseSinkConfig sink settings providing the optional config resource,
 *                        ZooKeeper quorum, client port, znode parent and table name
 * @throws IOException if the connection, admin or table cannot be obtained
 * @throws IllegalArgumentException if the configured table does not exist
 */
private void getTable(HbaseSinkConfig hbaseSinkConfig) throws IOException {
    configuration = HBaseConfiguration.create();

    // An extra configuration resource is optional.
    final String extraResources = hbaseSinkConfig.getHbaseConfigResources();
    if (StringUtils.isNotBlank(extraResources)) {
        configuration.addResource(extraResources);
    }

    configuration.set("hbase.zookeeper.quorum", hbaseSinkConfig.getZookeeperQuorum());
    configuration.set("hbase.zookeeper.property.clientPort", hbaseSinkConfig.getZookeeperClientPort());
    configuration.set("zookeeper.znode.parent", hbaseSinkConfig.getZookeeperZnodeParent());

    connection = ConnectionFactory.createConnection(configuration);
    admin = connection.getAdmin();
    tableName = TableName.valueOf(hbaseSinkConfig.getTableName());

    final boolean tablePresent = admin.tableExists(this.tableName);
    if (!tablePresent) {
        throw new IllegalArgumentException(this.tableName + " table does not exist.");
    }
    table = connection.getTable(this.tableName);
}
/**
 * Prepares the mocks required for opening the HBase table in a test:
 * sets the data source on the context and stubs the static factories so that
 * {@code HBaseConfiguration.create()} returns a mock configuration,
 * {@code ConnectionFactory.createConnection(...)} returns a mock connection, and
 * that connection returns a mock table for {@code tableName}.
 *
 * @throws Exception declared because the stubbed calls declare checked exceptions
 */
private void prepareTableOpen() throws Exception {
// Set table name
context.setDataSource(tableName);
// Make sure we mock static functions in HBaseConfiguration
PowerMockito.mockStatic(HBaseConfiguration.class);
hbaseConfiguration = mock(Configuration.class);
// create() now hands back the mock configuration
when(HBaseConfiguration.create()).thenReturn(hbaseConfiguration);
// Make sure we mock static functions in ConnectionFactory
PowerMockito.mockStatic(ConnectionFactory.class);
hbaseConnection = mock(Connection.class);
// The stub is keyed on the mock configuration created above
when(ConnectionFactory.createConnection(hbaseConfiguration)).thenReturn(hbaseConnection);
table = mock(Table.class);
// Looking up tableName on the mock connection yields the mock table
when(hbaseConnection.getTable(TableName.valueOf(tableName))).thenReturn(table);
}
/**
 * Locates the specified HTable used for triple/quad storage, optionally creating it first,
 * and returns it with auto-flush switched off.
 *
 * @param config Hadoop Configuration of the cluster running HBase
 * @param tableName String table name
 * @param create whether to create the table if it does not exist
 * @param splits array of keys used to pre-split a new table, may be null
 * @return HTable opened with a copy of {@code config} whose client scanner timeout is raised
 * @throws IOException in case of any HBase IO problems
 */
public static HTable getTable(Configuration config, String tableName, boolean create, byte[][] splits) throws IOException {
    final Configuration cfg = HBaseConfiguration.create(config);
    cfg.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 3600000L);

    if (create) {
        try (Connection con = ConnectionFactory.createConnection(config);
             Admin admin = con.getAdmin()) {
            // Create the table only when it is missing.
            final boolean exists = admin.tableExists(TableName.valueOf(tableName));
            if (!exists) {
                final HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
                descriptor.addFamily(createColumnFamily());
                admin.createTable(descriptor, splits);
            }
        }
    }

    // This constructor is deprecated; the recommendation now is to use connection.getTable().
    final HTable result = new HTable(cfg, tableName);
    result.setAutoFlushTo(false);
    return result;
}
/**
 * Puts the specified RegionInfo into META with replica related columns.
 *
 * <p>The connection and the meta table are closed when the method exits, including when
 * building or writing the {@code Put} fails.
 *
 * @param conf configuration used to connect to HBase
 * @param hri region to write into META
 * @param servers servers from which a location is picked at random for each extra replica
 * @param numReplicas total number of replicas; locations are added for replica ids
 *                    {@code 1 .. numReplicas - 1}
 * @throws IOException if connecting to HBase or writing to META fails
 */
public static void fixMetaHoleOnlineAndAddReplicas(Configuration conf,
    RegionInfo hri, Collection<ServerName> servers, int numReplicas) throws IOException {
  // Try-with-resources closes meta first, then conn, on every exit path.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
    Put put = HBCKMetaTableAccessor.makePutFromRegionInfo(hri, System.currentTimeMillis());
    if (numReplicas > 1) {
      Random r = new Random();
      ServerName[] serversArr = servers.toArray(new ServerName[0]);
      for (int i = 1; i < numReplicas; i++) {
        ServerName sn = serversArr[r.nextInt(serversArr.length)];
        // the column added here is just to make sure the master is able to
        // see the additional replicas when it is asked to assign. The
        // final value of these columns will be different and will be updated
        // by the actual regionservers that start hosting the respective replicas
        HBCKMetaTableAccessor.addLocation(put, sn, sn.getStartcode(), i);
      }
    }
    meta.put(put);
  }
}
/**
 * Deletes the middle region from the regions of the given table from Meta table.
 * Removes whole of the "info" column family for that region's row, and prints the
 * table's regions before and after the deletion.
 *
 * @param tname name of the table whose middle region is removed from meta
 * @throws IOException if reading from or writing to meta fails
 * @throws InterruptedException if interrupted while pausing after the delete
 */
private void deleteRegionFromMeta(String tname) throws IOException, InterruptedException {
  TableName tn = TableName.valueOf(tname);
  // The meta table handle is now closed together with the connection.
  try (Connection connection = ConnectionFactory.createConnection(conf);
      Table metaTable = connection.getTable(TableName.valueOf("hbase:meta"))) {
    List<RegionInfo> ris = HBCKMetaTableAccessor.getTableRegions(connection, tn);
    // Printed directly: passing runtime data as a String.format pattern can throw on '%'.
    System.out.println("Current Regions of the table " + tn.getNameAsString()
        + " in Meta before deletion of the region are: " + ris);
    RegionInfo ri = ris.get(ris.size() / 2);
    System.out.println("Deleting Region " + ri.getRegionNameAsString());
    byte[] key = HBCKMetaTableAccessor.getMetaKeyForRegion(ri);

    Delete delete = new Delete(key);
    delete.addFamily(Bytes.toBytes("info"));
    metaTable.delete(delete);

    // Short pause before re-reading the region list.
    Thread.sleep(500);

    ris = HBCKMetaTableAccessor.getTableRegions(connection, tn);
    System.out.println("Current Regions of the table " + tn.getNameAsString()
        + " in Meta after deletion of the region are: " + ris);
  }
}
/**
 * Creates the shared HBase connection if it has not been created yet and registers a
 * shutdown hook that calls {@code closeConnection}. The connection is checked once without
 * a lock and once more while holding the {@code HbaseHelper} class lock.
 *
 * @param zk value for the ZooKeeper quorum setting
 * @param zkNodeParent value for the znode parent setting; skipped when {@code null}
 * @throws IOException if the connection cannot be created
 */
public void initHbaseEnv(String zk, String zkNodeParent)
        throws IOException
{
    if (connection != null) {
        return;
    }
    synchronized (HbaseHelper.class) {
        if (connection != null) {
            return;
        }
        Configuration hbaseConf = new Configuration();
        hbaseConf.set(HBASE_ZOOKEEPER_QUORUM, zk);
        if (null != zkNodeParent) {
            hbaseConf.set(ZOOKEEPER_ZNODE_PARENT, zkNodeParent);
        }
        HbaseHelper.connection = ConnectionFactory.createConnection(hbaseConf);
        Runtime.getRuntime().addShutdownHook(new Thread(this::closeConnection));
    }
}
/**
 * Opens the HBase connection using the ZooKeeper hosts, port, znode parent,
 * ZooKeeper retry count and max connection threads taken from the store config.
 *
 * @throws IOException if the connection cannot be created
 */
@Override
public synchronized void open() throws IOException {
    final HugeConfig config = this.config();
    final String zkHosts = config.get(HbaseOptions.HBASE_HOSTS);
    final int zkPort = config.get(HbaseOptions.HBASE_PORT);
    final String zkParent = config.get(HbaseOptions.HBASE_ZNODE_PARENT);

    final Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkHosts);
    hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, String.valueOf(zkPort));
    hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);

    final int zkRetry = config.get(HbaseOptions.HBASE_ZK_RETRY);
    hbaseConf.setInt("zookeeper.recovery.retry", zkRetry);

    // Set hbase.hconnection.threads.max 64 to avoid OOM(default value: 256)
    final int maxThreads = config.get(HbaseOptions.HBASE_THREADS_MAX);
    hbaseConf.setInt("hbase.hconnection.threads.max", maxThreads);

    this.hbase = ConnectionFactory.createConnection(hbaseConf);
}
/**
 * Creates an HBase connection from the ZooKeeper quorum, optional znode parent and
 * client port held in {@code config}.
 *
 * @return a newly created connection
 * @throws HoodieDependentSystemUnavailableException if the connection cannot be created;
 *         the message carries {@code quorum:port}
 */
private Connection getHBaseConnection() {
    final Configuration hbaseConf = HBaseConfiguration.create();

    final String zkQuorum = config.getHbaseZkQuorum();
    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum);

    // The znode parent is optional.
    final String znodeParent = config.getHBaseZkZnodeParent();
    if (null != znodeParent) {
        hbaseConf.set("zookeeper.znode.parent", znodeParent);
    }

    final String zkPort = String.valueOf(config.getHbaseZkPort());
    hbaseConf.set("hbase.zookeeper.property.clientPort", zkPort);

    try {
        return ConnectionFactory.createConnection(hbaseConf);
    } catch (IOException e) {
        final String endpoint = zkQuorum + ":" + zkPort;
        throw new HoodieDependentSystemUnavailableException(HoodieDependentSystemUnavailableException.HBASE,
            endpoint);
    }
}
/**
 * Creates a new HBase connection to the configured ZooKeeper quorum.
 *
 * <p>The client is configured with a pause of 50, 3 retries, an RPC timeout of 2000,
 * an operation timeout of 3000 and a scanner timeout period of 10000.
 *
 * @return a newly created connection
 * @throws PrestoException with {@code UNEXPECTED_HBASE_ERROR} if the connection cannot be created
 */
@Override
public Connection get()
{
    try {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zooKeepers);
        conf.set("hbase.client.pause", "50");
        conf.set("hbase.client.retries.number", "3");
        conf.set("hbase.rpc.timeout", "2000");
        conf.set("hbase.client.operation.timeout", "3000");
        conf.set("hbase.client.scanner.timeout.period", "10000");
        Connection connection = ConnectionFactory.createConnection(conf);
        // The old message had three %s placeholders and no arguments; log the one known value.
        LOG.info("Connection to HBase at " + zooKeepers + " established");
        return connection;
    }
    catch (IOException e) {
        throw new PrestoException(UNEXPECTED_HBASE_ERROR, "Failed to get connection to HBASE", e);
    }
}
/**
 * Builds the HBase client configuration from the data-store connection settings,
 * checks that HBase is available and opens the connection.
 *
 * @return the connection, which is also stored in {@code conn}
 * @throws IOException if the connection cannot be created
 * @throws ServiceException declared for the HBase availability check
 */
@Override
protected Connection initSourceConnect() throws IOException, ServiceException {
    // Currently only the ZK list is converted into the server list and dbname
    final Configuration hbaseConf = HBaseConfiguration.create();

    final String zkQuorum = connection.toString(",");
    hbaseConf.set(DataStoreProtocol.HBASE_ZK_QUORUM, zkQuorum);

    final String scannerCaching = (String) connection.getContext(DataStoreProtocol.HBASE_QUERY_CACHING);
    hbaseConf.set("hbase.client.scanner.caching", scannerCaching);

    final String maxResultSize = (String) connection.getContext(DataStoreProtocol.HBASE_QUERY_MAXRESULTSIZE);
    hbaseConf.set("hbase.client.scanner.max.result.size", maxResultSize);

    hbaseConf.set("zookeeper.recovery.retry", String.valueOf(connection.getRetryTimes()));

    // Failed to replace a bad datanode exception protection configuration
    hbaseConf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
    hbaseConf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

    HBaseAdmin.checkHBaseAvailable(hbaseConf);
    conn = ConnectionFactory.createConnection(hbaseConf);
    // hbase.client.retries.number = 1 and zookeeper.recovery.retry = 1.
    return conn;
}
/**
 * Builds the transaction manager. A new HBase connection is created from the Omid client
 * configuration; each collaborator is taken from the builder when set there, otherwise from
 * the corresponding {@code build*} helper.
 *
 * @return the assembled transaction manager
 * @throws IOException if the HBase connection or a collaborator cannot be created
 * @throws InterruptedException if interrupted while building a collaborator
 */
public HBaseTransactionManager build() throws IOException, InterruptedException {
    final Connection hbaseConnection =
        ConnectionFactory.createConnection(hbaseOmidClientConf.getHBaseConfiguration());

    final CommitTable.Client resolvedCommitTableClient =
        this.commitTableClient.or(buildCommitTableClient(hbaseConnection)).get();
    final CommitTable.Writer resolvedCommitTableWriter =
        this.commitTableWriter.or(buildCommitTableWriter(hbaseConnection)).get();
    final PostCommitActions resolvedPostCommitter =
        this.postCommitter.or(buildPostCommitter(resolvedCommitTableClient, hbaseConnection)).get();
    final TSOProtocol resolvedTsoClient =
        this.tsoClient.or(buildTSOClient()).get();

    return new HBaseTransactionManager(
        hbaseOmidClientConf,
        resolvedPostCommitter,
        resolvedTsoClient,
        resolvedCommitTableClient,
        resolvedCommitTableWriter,
        new HBaseTransactionFactory(),
        hbaseConnection);
}
/**
 * Command-line entry point that enables or disables Omid compaction on a column family.
 * Prints usage and exits with status 1 when help is requested.
 *
 * @param args command-line arguments parsed into {@code Config}
 * @throws IOException if connecting to HBase or changing the table fails
 */
public static void main(String[] args) throws IOException {
    final Config cmdline = new Config();
    final JCommander jcommander = new JCommander(cmdline, args);
    if (cmdline.help) {
        jcommander.usage("CompactorUtil");
        System.exit(1);
    }

    HBaseLogin.loginIfNeeded(cmdline.loginFlags);

    final Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf)) {
        if (!cmdline.enable && !cmdline.disable) {
            System.err.println("Must specify enable or disable");
            return;
        }

        final TableName table = TableName.valueOf(cmdline.table);
        final byte[] family = Bytes.toBytes(cmdline.columnFamily);
        // enable takes precedence when both flags are set
        if (cmdline.enable) {
            enableOmidCompaction(conn, table, family);
        } else {
            disableOmidCompaction(conn, table, family);
        }
    }
}
/**
 * One-time fixture setup: builds the Guice injector from a TSO server config, adjusts the
 * HBase compaction settings, starts HBase, opens the connection and admin, creates the
 * required tables, starts the TSO and obtains the commit table.
 *
 * @throws Exception if any part of the fixture fails to start
 */
@BeforeClass
public void setupTestCompation() throws Exception {
// TSO server settings used to build the test injector
TSOServerConfig tsoConfig = new TSOServerConfig();
tsoConfig.setPort(1234);
tsoConfig.setConflictMapSize(1);
tsoConfig.setWaitStrategy("LOW_CPU");
injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
// Pull the HBase configuration and table configs out of the injector
hbaseConf = injector.getInstance(Configuration.class);
HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
// settings required for #testDuplicateDeletes()
hbaseConf.setInt("hbase.hstore.compaction.min", 2);
hbaseConf.setInt("hbase.hstore.compaction.max", 2);
// The connection is opened only after setupHBase() has run
setupHBase();
connection = ConnectionFactory.createConnection(hbaseConf);
admin = connection.getAdmin();
createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
setupTSO();
commitTable = injector.getInstance(CommitTable.class);
}
/**
 * Returns the estimated size in bytes of the data selected by the read's scan. The estimate
 * is computed on the first call using a short-lived connection and cached in
 * {@code estimatedSizeBytes} for later calls.
 *
 * @param pipelineOptions pipeline options (not read by this method)
 * @return the estimated size in bytes
 * @throws Exception if the estimate cannot be obtained from HBase
 */
@Override
public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
  if (estimatedSizeBytes != null) {
    return estimatedSizeBytes;
  }

  try (Connection connection = ConnectionFactory.createConnection(read.configuration)) {
    estimatedSizeBytes =
        HBaseUtils.estimateSizeBytes(
            connection, read.tableId, HBaseUtils.getByteKeyRange(read.scan));
  }
  LOG.debug(
      "Estimated size {} bytes for table {} and scan {}",
      estimatedSizeBytes,
      read.tableId,
      read.scan);
  return estimatedSizeBytes;
}
/**
 * Initializes the plugin: stores the configuration, opens an HBase connection, creates the
 * prune state table and sets up the {@code DataJanitorState} that reads from it.
 *
 * @param conf configuration holding the prune state table name (a default is used if absent)
 * @throws IOException if the connection or the prune state table cannot be created
 */
@Override
public void initialize(Configuration conf) throws IOException {
  this.conf = conf;
  this.connection = ConnectionFactory.createConnection(conf);

  final String stateTableName = conf.get(
      TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  final TableName stateTable = TableName.valueOf(stateTableName);
  LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
  createPruneTable(stateTable);

  // Each call to the supplier asks the shared connection for the state table.
  final DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public Table get() throws IOException {
      return connection.getTable(stateTable);
    }
  };
  this.dataJanitorState = new DataJanitorState(stateTableSupplier);
}
/**
 * Initializes the plugin: stores the configuration, opens an HBase connection, creates the
 * prune state table and sets up the {@code DataJanitorState} that reads from it.
 *
 * @param conf configuration holding the prune state table name (a default is used if absent)
 * @throws IOException if the connection or the prune state table cannot be created
 */
@Override
public void initialize(Configuration conf) throws IOException {
  this.conf = conf;
  this.connection = ConnectionFactory.createConnection(conf);

  final String stateTableName = conf.get(
      TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  final TableName stateTable = TableName.valueOf(stateTableName);
  LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
  createPruneTable(stateTable);

  // Each call to the supplier asks the shared connection for the state table.
  final DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public Table get() throws IOException {
      return connection.getTable(stateTable);
    }
  };
  this.dataJanitorState = new DataJanitorState(stateTableSupplier);
}
/**
 * Initializes the plugin: stores the configuration, opens an HBase connection, creates the
 * prune state table and sets up the {@code DataJanitorState} that reads from it.
 *
 * @param conf configuration holding the prune state table name (a default is used if absent)
 * @throws IOException if the connection or the prune state table cannot be created
 */
@Override
public void initialize(Configuration conf) throws IOException {
  this.conf = conf;
  this.connection = ConnectionFactory.createConnection(conf);

  final String stateTableName = conf.get(
      TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  final TableName stateTable = TableName.valueOf(stateTableName);
  LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
  createPruneTable(stateTable);

  // Each call to the supplier asks the shared connection for the state table.
  final DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public Table get() throws IOException {
      return connection.getTable(stateTable);
    }
  };
  this.dataJanitorState = new DataJanitorState(stateTableSupplier);
}
/**
 * Opens a new HBase connection from the given config and registers it to be closed at
 * JVM shutdown.
 *
 * @param config source of the HBase configuration
 * @return the newly created connection
 * @throws IOException if the connection cannot be created
 * @throws IllegalArgumentException if the factory returns no connection
 */
public static synchronized Connection getConnection(Config config) throws IOException {
  LOG.info("Opening connection to HBase");
  LOG.debug("Creating connection object...");

  final Configuration hbaseConf = HBaseUtils.getHBaseConfiguration(config);

  // new Connection
  final Connection opened = ConnectionFactory.createConnection(hbaseConf);
  if (null == opened) {
    LOG.error("Could not open connection to HBase with {}", hbaseConf.get(HBaseUtils.ZK_QUORUM_PROPERTY));
    throw new IllegalArgumentException("Could not connect to HBase with supplied ZK quorum");
  }

  JVMUtils.closeAtShutdown(opened);
  return opened;
}
/**
 * Creates a secondary-index wrapper around {@code table}. The index table is named after
 * the data table with an {@code .idx} suffix and is created when it does not exist yet.
 * Both tables are wrapped as transaction-aware tables sharing one transaction context.
 *
 * @param transactionServiceClient client used to build the transaction context
 * @param table the data table
 * @param secondaryIndex stored in the {@code secondaryIndex} field
 * @throws IOException if the HBase connection cannot be created
 */
public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, Table table,
                           byte[] secondaryIndex) throws IOException {
  final String indexName = table.getName().getNameAsString() + ".idx";
  secondaryIndexTableName = TableName.valueOf(indexName);
  this.connection = ConnectionFactory.createConnection(table.getConfiguration());

  Table indexHTable = null;
  try (Admin hBaseAdmin = this.connection.getAdmin()) {
    final boolean indexExists = hBaseAdmin.tableExists(secondaryIndexTableName);
    if (!indexExists) {
      hBaseAdmin.createTable(TableDescriptorBuilder.newBuilder(secondaryIndexTableName).build());
    }
    indexHTable = this.connection.getTable(secondaryIndexTableName);
  } catch (Exception e) {
    // Release the connection before rethrowing the failure.
    Closeables.closeQuietly(connection);
    throw Throwables.propagate(e);
  }

  this.secondaryIndex = secondaryIndex;
  this.transactionAwareHTable = new TransactionAwareHTable(table);
  this.secondaryIndexTable = new TransactionAwareHTable(indexHTable);
  this.transactionContext = new TransactionContext(
      transactionServiceClient, transactionAwareHTable, secondaryIndexTable);
}
/**
 * Initializes the plugin: stores the configuration, opens an HBase connection, creates the
 * prune state table and sets up the {@code DataJanitorState} that reads from it.
 *
 * @param conf configuration holding the prune state table name (a default is used if absent)
 * @throws IOException if the connection or the prune state table cannot be created
 */
@Override
public void initialize(Configuration conf) throws IOException {
  this.conf = conf;
  this.connection = ConnectionFactory.createConnection(conf);

  final String stateTableName = conf.get(
      TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  final TableName stateTable = TableName.valueOf(stateTableName);
  LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
  createPruneTable(stateTable);

  // Each call to the supplier asks the shared connection for the state table.
  final DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public Table get() throws IOException {
      return connection.getTable(stateTable);
    }
  };
  this.dataJanitorState = new DataJanitorState(stateTableSupplier);
}
/**
 * Initializes the plugin: stores the configuration, opens an HBase connection, creates the
 * prune state table and sets up the {@code DataJanitorState} that reads from it.
 *
 * @param conf configuration holding the prune state table name (a default is used if absent)
 * @throws IOException if the connection or the prune state table cannot be created
 */
@Override
public void initialize(Configuration conf) throws IOException {
  this.conf = conf;
  this.connection = ConnectionFactory.createConnection(conf);

  final String stateTableName = conf.get(
      TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  final TableName stateTable = TableName.valueOf(stateTableName);
  LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(),
      stateTable.getNameAsString());
  createPruneTable(stateTable);

  // Each call to the supplier asks the shared connection for the state table.
  final DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public Table get() throws IOException {
      return connection.getTable(stateTable);
    }
  };
  this.dataJanitorState = new DataJanitorState(stateTableSupplier);
}
/**
 * Sets up common resources required by all clients: starts the ZooKeeper client, obtains
 * the transaction service client, opens the HBase connection and makes sure the table
 * with its column family exists.
 *
 * @throws IOException if the HBase connection or the table cannot be created
 */
public void init() throws IOException {
  final Injector guice = Guice.createInjector(
      new ConfigModule(conf),
      new ZKModule(),
      new DiscoveryModules().getDistributedModules(),
      new TransactionModules().getDistributedModules(),
      new TransactionClientModule());

  zkClient = guice.getInstance(ZKClientService.class);
  zkClient.startAndWait();

  txClient = guice.getInstance(TransactionServiceClient.class);

  conn = ConnectionFactory.createConnection(conf);
  final byte[][] families = { FAMILY };
  createTableIfNotExists(conf, TABLE, families);
}
/**
 * Sets up common resources required by all clients: starts the ZooKeeper client, obtains
 * the transaction service client, opens the HBase connection and makes sure the table
 * with its column family exists.
 *
 * @throws IOException if the HBase connection or the table cannot be created
 */
public void init() throws IOException {
  final Injector guice = Guice.createInjector(
      new ConfigModule(conf),
      new ZKModule(),
      new DiscoveryModules().getDistributedModules(),
      new TransactionModules().getDistributedModules(),
      new TransactionClientModule());

  zkClient = guice.getInstance(ZKClientService.class);
  zkClient.startAndWait();

  txClient = guice.getInstance(TransactionServiceClient.class);

  conn = ConnectionFactory.createConnection(conf);
  final byte[][] families = { FAMILY };
  createTableIfNotExists(conf, TABLE, families);
}
/**
 * Sets up common resources required by all clients: starts the ZooKeeper client, obtains
 * the transaction service client, opens the HBase connection and makes sure the table
 * with its column family exists.
 *
 * @throws IOException if the HBase connection or the table cannot be created
 */
public void init() throws IOException {
  final Injector guice = Guice.createInjector(
      new ConfigModule(conf),
      new ZKModule(),
      new DiscoveryModules().getDistributedModules(),
      new TransactionModules().getDistributedModules(),
      new TransactionClientModule());

  zkClient = guice.getInstance(ZKClientService.class);
  zkClient.startAndWait();

  txClient = guice.getInstance(TransactionServiceClient.class);

  conn = ConnectionFactory.createConnection(conf);
  final byte[][] families = { FAMILY };
  createTableIfNotExists(conf, TABLE, families);
}