下面列出了org.apache.hadoop.hbase.client.HBaseAdmin#checkHBaseAvailable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Returns list of fragments containing all of the
* HBase's table data.
* Lookup table information with mapping between
* field names in GPDB table and HBase table will be
* returned as user data.
*
* @return a list of fragments
*/
@Override
public List<Fragment> getFragments() throws Exception {
// check that Zookeeper and HBase master are available
HBaseAdmin.checkHBaseAvailable(configuration);
connection = ConnectionFactory.createConnection(configuration);
Admin hbaseAdmin = connection.getAdmin();
if (!HBaseUtilities.isTableAvailable(hbaseAdmin, context.getDataSource())) {
HBaseUtilities.closeConnection(hbaseAdmin, connection);
throw new TableNotFoundException(context.getDataSource());
}
byte[] userData = prepareUserData();
addTableFragments(userData);
HBaseUtilities.closeConnection(hbaseAdmin, connection);
return fragments;
}
@Override
protected Connection initSourceConnect() throws IOException, ServiceException {
// 目前只有zklist转成serverlist和dbname
Configuration config = HBaseConfiguration.create();
String address = connection.toString(",");
config.set(DataStoreProtocol.HBASE_ZK_QUORUM, address);
config.set("hbase.client.scanner.caching",
(String) connection.getContext(DataStoreProtocol.HBASE_QUERY_CACHING));
config.set("hbase.client.scanner.max.result.size",
(String) connection.getContext(DataStoreProtocol.HBASE_QUERY_MAXRESULTSIZE));
config.set("zookeeper.recovery.retry", String.valueOf(connection.getRetryTimes()));
// Failed to replace a bad datanode exception protection configuration
config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
HBaseAdmin.checkHBaseAvailable(config);
conn = ConnectionFactory.createConnection(config);
// hbase.client.retries.number = 1 and zookeeper.recovery.retry = 1.
return conn;
}
private void connect() throws IOException, ServiceException {
Configuration config = HBaseConfiguration.create();
String path = this.getClass().getClassLoader().getResource("hbase-site.xml").getPath();
config.addResource(new Path(path));
try {
HBaseAdmin.checkHBaseAvailable(config);
} catch (MasterNotRunningException e) {
System.out.println("HBase is not running." + e.getMessage());
return;
}
HBaseClientOperations HBaseClientOperations = new HBaseClientOperations();
HBaseClientOperations.run(config);
}
@Override
public void init() throws Exception {
super.init();
ReportUtils.startLevel(report, getClass(), "Init");
config = new Configuration();
// if hbaseRoot root exists in the SUT file, load configuration from it
if (StringUtils.isNotEmpty(hbaseRoot)) {
config.addResource(new Path(getHbaseRoot() + "/conf/hbase-site.xml"));
} else {
config.set("hbase.rootdir", "hdfs://" + host + ":8020/hbase");
}
HBaseAdmin.checkHBaseAvailable(config);
connection = ConnectionFactory.createConnection(config);
admin = connection.getAdmin();
if (admin.getClusterStatus().getServersSize() == 0) {
ReportUtils.report(report, getClass(),
"No HBase region servers running", Reporter.FAIL);
}
ReportUtils.report(report, getClass(), "HBase Admin created");
ReportUtils.stopLevel(report);
}
HBaseStorage(Config config, String localId) throws Exception {
this.localId = localId;
this.hbaseTimeLatticeCol = Bytes.toBytes(localId);
tableCount = config.hasPath("hbase.table.count") ? config.getInt(
"hbase.table.count") : TABLE_COUNT;
ttl = config.hasPath("hbase.ttl") ? config.getLong("hbase.ttl") * 1000L : DEFAULT_TTL;
hbaseConf = HBaseConfiguration.create();
hbaseConf.set("hbase.client.retries.number", "1");
hbaseConf.set("hbase.zookeeper.quorum", config.getString("zookeeper.quorum"));
hbaseConf.set("hbase.zookeeper.property.clientPort", config.getString("zookeeper.port"));
hbaseConf.set("zookeeper.znode.parent", config.getString("zookeeper.znode.parent"));
hbaseConf.set("hbase.hconnection.threads.max",
config.getString("hbase.hconnection.threads.max"));
hbaseConf.set("hbase.hconnection.threads.core",
config.getString("hbase.hconnection.threads.core"));
connection = ConnectionFactory.createConnection(hbaseConf);
createTableConnections(tableCount);
metaTable = getTable(NAMESPACE, META_TABLE);
ThreadFactory factory = (new ThreadFactoryBuilder()).setDaemon(false)
.setNameFormat("hbase-probe-%s")
.setUncaughtExceptionHandler((t, e) -> {
if (log.isErrorEnabled())
log.error(
"hbase heartbeat thread error [thread {}]",
t.getId(), e);
})
.build();
executorService = Executors.newSingleThreadScheduledExecutor(factory);
checkConn();
// tricky: check hbase again, interrupts the creation process by exceptions if it fails
HBaseAdmin.checkHBaseAvailable(hbaseConf);
}
@Override
public void checkHBaseAvailable(List<Stage.ConfigIssue> issues) {
try {
HBaseAdmin.checkHBaseAvailable(hbaseConnectionHelper.getHBaseConfiguration());
} catch (Exception ex) {
LOG.warn("Received exception while connecting to cluster: ", ex);
issues.add(getContext().createConfigIssue(Groups.HBASE.name(), null, Errors.HBASE_06, ex.toString(), ex));
}
}
@Override
protected Result check() throws Exception {
try {
HBaseAdmin.checkHBaseAvailable(configuration);
return HealthCheck.Result.builder()
.healthy()
.withMessage("HBase running")
.build();
} catch (Exception e) {
return HealthCheck.Result.builder()
.unhealthy(e)
.build();
}
}