Listed below are code examples showing how to use the org.apache.hadoop.hbase.client.Connection API class, or follow the links to view the source code on GitHub.
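To set the stage for the examples, here is a minimal sketch of the typical Connection lifecycle; the table name, row key, and default configuration are hypothetical placeholders, not taken from any of the samples below:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ConnectionLifecycleSketch {
  public static void main(String[] args) throws IOException {
    // A Connection is heavyweight and thread-safe: create it once and share it.
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("my_table"))) { // hypothetical table
      Result result = table.get(new Get(Bytes.toBytes("row-1")));
      System.out.println("row empty? " + result.isEmpty());
    } // both resources are closed here, releasing background threads
  }
}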
public static void readPrefix(String projectId, String instanceId, String tableId) {
// Initialize client that will be used to send requests. This client only needs to be created
// once, and can be reused for multiple requests. After completing all of your requests, call
// the "close" method on the client to safely clean up any remaining background resources.
try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {
Table table = connection.getTable(TableName.valueOf(tableId));
Scan prefixScan = new Scan().setRowPrefixFilter(Bytes.toBytes("phone"));
ResultScanner rows = table.getScanner(prefixScan);
for (Result row : rows) {
printRow(row);
}
} catch (IOException e) {
System.out.println(
"Unable to initialize service client, as a network error occurred: \n" + e.toString());
}
}
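The comment in the example above stresses creating the client once and reusing it across requests; below is a minimal sketch of that caching pattern for the Bigtable HBase adapter (the holder class is an assumption for illustration, not part of the original sample):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Connection;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

// Hypothetical holder that creates the connection once and reuses it everywhere.
public class BigtableConnectionHolder {
  private static Connection connection;

  public static synchronized Connection get(String projectId, String instanceId) {
    if (connection == null) {
      connection = BigtableConfiguration.connect(projectId, instanceId);
    }
    return connection;
  }

  // Call once at application shutdown to release background resources.
  public static synchronized void shutdown() throws IOException {
    if (connection != null) {
      connection.close();
      connection = null;
    }
  }
}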
/**
* Grant permissions on a namespace to the given user using AccessControlClient.
* Will wait until all active AccessController instances have updated their permissions caches
* or will throw an exception upon timeout (10 seconds).
*/
public static void grantOnNamespaceUsingAccessControlClient(final HBaseTestingUtility util,
final Connection connection, final String user, final String namespace,
final Permission.Action... actions) throws Exception {
SecureTestUtil.updateACLs(util, new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
AccessControlClient.grant(connection, namespace, user, actions);
} catch (Throwable t) {
LOG.error("grant failed: ", t);
}
return null;
}
});
}
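For comparison, a hedged sketch of calling AccessControlClient directly outside a test harness; the namespace "ns1" and user "bob" are hypothetical:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;

public class GrantSketch {
  public static void main(String[] args) throws Throwable { // grant() declares Throwable
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
      // Grant READ and WRITE on namespace "ns1" to user "bob" (both hypothetical).
      AccessControlClient.grant(connection, "ns1", "bob",
          Permission.Action.READ, Permission.Action.WRITE);
    }
  }
}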
private static InetSocketAddress getFavoredNode(BulkImportPartition partition) throws IOException {
InetSocketAddress favoredNode = null;
SConfiguration configuration = HConfiguration.getConfiguration();
Connection connection = HBaseConnectionFactory.getInstance(configuration).getConnection();
String regionName = partition.getRegionName();
HRegionLocation regionLocation = MetaTableAccessor.getRegionLocation(connection,
com.splicemachine.primitives.Bytes.toBytesBinary(regionName));
if (regionLocation != null) {
String hostname = regionLocation.getHostname();
int port = regionLocation.getPort();
InetSocketAddress address = new InetSocketAddress(hostname, port);
if (!address.isUnresolved()) {
favoredNode = address;
} else {
SpliceLogUtils.info(LOG, "Cannot resolve host %s to achieve better data locality.", hostname);
}
}
else {
SpliceLogUtils.info(LOG, "Cannot to get region location %s to achieve better data locality.", regionName);
}
return favoredNode;
}
@Override
public Boolean call() throws Exception {
ProcessRecord updatedRecord = null;
Connection hbaseConnection = null;
try {
// Connect only when needed.
hbaseConnection = ConnectionFactory.createConnection(hbaseConf);
ProcessRecordService processRecordService =
new ProcessRecordService(hbaseConf, hbaseConnection);
updatedRecord =
processRecordService.setProcessState(processRecord, newState);
} finally {
if (hbaseConnection != null) {
hbaseConnection.close();
}
}
return (updatedRecord != null)
&& (updatedRecord.getProcessState() == newState);
}
private void verifyUserDeniedForDeleteExactVersion(final User user, final byte[] row,
final byte[] q1, final byte[] q2) throws IOException, InterruptedException {
user.runAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table t = connection.getTable(testTable.getTableName())) {
Delete d = new Delete(row, 127);
d.addColumns(TEST_FAMILY1, q1);
d.addColumns(TEST_FAMILY1, q2);
d.addFamily(TEST_FAMILY2, 129);
t.delete(d);
fail(user.getShortName() + " should not be allowed to perform the delete");
} catch (Exception e) {
// Expected: the delete is denied for this user.
}
}
return null;
}
});
}
public WAL.Entry getEntry(TableName tableName, Get get) throws IOException {
WAL.Entry entry = null;
try(Connection conn = ConnectionFactory.createConnection(getUtility().getConfiguration())){
Table htable = conn.getTable(tableName);
Result result = htable.get(get);
WALEdit edit = new WALEdit();
if (result != null) {
List<Cell> cellList = result.listCells();
Assert.assertNotNull("Didn't retrieve any cells from SYSTEM.CATALOG", cellList);
for (Cell c : cellList) {
edit.add(c);
}
}
Assert.assertTrue("Didn't retrieve any cells from SYSTEM.CATALOG",
edit.getCells().size() > 0);
WALKeyImpl key = new WALKeyImpl(REGION, tableName, 0, 0, uuid);
entry = new WAL.Entry(key, edit);
}
return entry;
}
private void offlineParentInMetaAndputMetaEntries(Connection conn,
HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
ServerName serverName, List<Mutation> metaEntries) throws IOException {
List<Mutation> mutations = metaEntries;
HRegionInfo copyOfParent = new HRegionInfo(parent);
copyOfParent.setOffline(true);
copyOfParent.setSplit(true);
//Put for parent
Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
mutations.add(putParent);
//Puts for daughters
Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);
addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
addLocation(putB, serverName, 1);
mutations.add(putA);
mutations.add(putB);
MetaTableAccessor.mutateMetaTable(conn, mutations);
}
@VisibleForTesting
BigtableStorage(Connection connection,
ExecutorService executor,
WaitStrategy waitStrategy,
StopStrategy retryStopStrategy) {
this.connection = Objects.requireNonNull(connection, "connection");
this.executor = MDCUtil.withMDC(Context.currentContextExecutor(
register(closer, Objects.requireNonNull(executor, "executor"), "bigtable-storage")));
retryer = RetryerBuilder.<Void>newBuilder()
.retryIfExceptionOfType(IOException.class)
.withWaitStrategy(Objects.requireNonNull(waitStrategy, "waitStrategy"))
.withStopStrategy(Objects.requireNonNull(retryStopStrategy, "retryStopStrategy"))
.withRetryListener(BigtableStorage::onRequestAttempt)
.build();
}
private void clean() throws IOException {
Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
Admin hbaseAdmin = conn.getAdmin();
for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
String name = descriptor.getNameAsString().toLowerCase(Locale.ROOT);
if (name.startsWith("kylin") || name.startsWith("_kylin")) {
String x = descriptor.getValue(IRealizationConstants.HTableTag);
System.out.println("table name " + descriptor.getNameAsString() + " host: " + x);
System.out.println(descriptor);
System.out.println();
descriptor.setValue(IRealizationConstants.HTableOwner, "[email protected]");
hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor);
}
}
hbaseAdmin.close();
}
/**
* @param conf used to communicate arguments to the running jobs.
* @param hbaseConnection used to create Table references connecting to HBase.
* @param cluster for which we are processing
* @param reprocess Reprocess those records that may have been processed
* already. Otherwise successfully processed job files are skipped.
* @param batchSize the total number of jobs to process in a batch (a MR job
* scanning these many records in the raw table).
* @param minJobId used to start the scan. If null then there is no min limit
* on JobId.
* @param maxJobId used to end the scan (inclusive). If null then there is no
* max limit on jobId.
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
* @throws RowKeyParseException
*/
private List<JobRunner> getJobRunners(Configuration conf, Connection hbaseConnection, String cluster,
boolean reprocess, int batchSize, String minJobId, String maxJobId)
throws IOException, InterruptedException, ClassNotFoundException,
RowKeyParseException {
List<JobRunner> jobRunners = new LinkedList<JobRunner>();
JobHistoryRawService jobHistoryRawService = new JobHistoryRawService(hbaseConnection);
// Bind all MR jobs together with one runID.
long now = System.currentTimeMillis();
conf.setLong(Constants.MR_RUN_CONF_KEY, now);
List<Scan> scanList = jobHistoryRawService.getHistoryRawTableScans(cluster,
minJobId, maxJobId, reprocess, batchSize);
for (Scan scan : scanList) {
Job job = getProcessingJob(conf, scan, scanList.size());
JobRunner jobRunner = new JobRunner(job, null);
jobRunners.add(jobRunner);
}
return jobRunners;
}
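A hedged usage sketch for the method above; the cluster name and batch size are hypothetical, and it assumes it runs inside the same class with LOG available:

// Hypothetical driver fragment inside the enclosing class.
Configuration conf = HBaseConfiguration.create();
try (Connection hbaseConnection = ConnectionFactory.createConnection(conf)) {
  // Scan the full job-id range (null bounds) in batches of 100, skipping processed records.
  List<JobRunner> jobRunners =
      getJobRunners(conf, hbaseConnection, "cluster1", false, 100, null, null);
  LOG.info("created " + jobRunners.size() + " job runners");
}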
private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
InterruptedException, ClassNotFoundException, ExecutionException {
// Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
// the TestOptions introspection for us and dump the output in a readable format.
LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
Admin admin = null;
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(getConf());
admin = connection.getAdmin();
checkTable(admin, opts);
} finally {
if (admin != null) admin.close();
if (connection != null) connection.close();
}
if (opts.nomapred) {
doLocalClients(opts, getConf());
} else {
doMapReduce(opts, getConf());
}
}
public void readFromHBase(Connection conn) throws IOException {
// Get table object
Table table = conn.getTable(this.tn);
// Query row
Get get = new Get(KEY);
Result result = table.get(get);
if (!result.isEmpty()) {
this.currentSp = Bytes.toLong(get(result, "currentSp"));
this.serverId = Bytes.toLong(get(result, "serverId"));
this.createTimestamp = Optional.ofNullable(get(result, "createTimestamp")).map(Bytes::toLong).orElse(0L);
this.updateTimestamp = Optional.ofNullable(get(result, "updateTimestamp")).map(Bytes::toLong).orElse(0L);
this.createOrcaVersion = Bytes.toString(get(result, "createOrcaVersion"));
this.updateorcaVersion = Bytes.toString(get(result, "updateOrcaVersion"));
this.isActive = Optional.ofNullable(get(result, "isActive")).map(Bytes::toBoolean).orElse(Boolean.FALSE);
}
}
@Test
public void testDeleteFamilyWithoutCellVisibilityWithMultipleVersions() throws Exception {
setAuths();
final TableName tableName = TableName.valueOf(testName.getMethodName());
try (Table table = doPutsWithoutVisibility(tableName)) {
TEST_UTIL.getAdmin().flush(tableName);
PrivilegedExceptionAction<Void> actiona = new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(tableName)) {
Delete d = new Delete(row1);
d.addFamily(fam);
table.delete(d);
} catch (Throwable t) {
throw new IOException(t);
}
return null;
}
};
SUPERUSER.runAs(actiona);
TEST_UTIL.getAdmin().flush(tableName);
Scan s = new Scan();
s.readVersions(5);
s.setAuthorizations(new Authorizations(SECRET, PRIVATE, CONFIDENTIAL, TOPSECRET));
ResultScanner scanner = table.getScanner(s);
Result[] next = scanner.next(3);
assertTrue(next.length == 1);
// All cells for row1 should be deleted, since the delete carried no cell visibility expression; only row2 remains
CellScanner cellScanner = next[0].cellScanner();
cellScanner.advance();
Cell current = cellScanner.current();
assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(), current.getRowLength(),
row2, 0, row2.length));
}
}
public static void closeQuietly(Connection conn) {
try {
if (conn != null) {
conn.close();
}
}
catch (Exception ignored) {}
}
/**
* Checks if an authentication token exists for the connected cluster,
* obtaining one if needed and adding it to the user's credentials.
*
* @param conn The HBase cluster connection
* @param user The user for whom to obtain the token
* @return true if the token was added, false if it already existed
* @throws IOException If making a remote call to the authentication service fails
* @throws InterruptedException If executing as the given user is interrupted
*/
public static boolean addTokenIfMissing(Connection conn, User user)
throws IOException, InterruptedException {
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
if (token == null) {
token = ClientTokenUtil.obtainToken(conn, user);
user.getUGI().addToken(token.getService(), token);
return true;
}
return false;
}
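A minimal usage sketch for addTokenIfMissing, assuming a secured cluster where a Kerberos login has already been performed:

// Hypothetical caller fragment; User is org.apache.hadoop.hbase.security.User.
Configuration conf = HBaseConfiguration.create();
try (Connection conn = ConnectionFactory.createConnection(conf)) {
  boolean added = addTokenIfMissing(conn, User.getCurrent());
  System.out.println(added ? "token obtained and added" : "token already present");
}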
@Before
public void createTable() throws Exception {
tableName = TableName.valueOf(name.getMethodName());
// Create a table and write a record as the service user (hbase)
UserGroupInformation serviceUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
"hbase/localhost", KEYTAB_FILE.getAbsolutePath());
clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override public String run() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(CONF);
Admin admin = conn.getAdmin()) {
admin.createTable(TableDescriptorBuilder
.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
.build());
UTIL.waitTableAvailable(tableName);
try (Table t = conn.getTable(tableName)) {
Put p = new Put(Bytes.toBytes("r1"));
p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
t.put(p);
}
return admin.getClusterMetrics().getClusterId();
}
}
});
assertNotNull(clusterId);
}
public static boolean tableExists(Connection conn, String tableName) throws IOException {
Admin hbase = conn.getAdmin();
try {
return hbase.tableExists(TableName.valueOf(tableName));
} finally {
hbase.close();
}
}
public BackupSystemTable(Connection conn) throws IOException {
this.connection = conn;
Configuration conf = this.connection.getConfiguration();
tableName = BackupSystemTable.getTableName(conf);
bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
checkSystemTable();
}
/**
* Reads user permission assignments stored in the <code>l:</code> column family of the first
* table row in <code>_acl_</code>.
* <p>
* See {@link PermissionStorage class documentation} for the key structure used for storage.
* </p>
*/
static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
if (entryName == null) {
entryName = ACL_GLOBAL_NAME;
}
// for normal user tables, we just read the table row from _acl_
ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
Get get = new Get(entryName);
get.addFamily(ACL_LIST_FAMILY);
Result row = null;
if (t == null) {
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table table = connection.getTable(ACL_TABLE_NAME)) {
row = table.get(get);
}
}
} else {
row = t.get(get);
}
if (!row.isEmpty()) {
perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
} else {
LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
+ Bytes.toString(entryName));
}
return perms;
}
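A hedged illustration of calling this accessor for the global ACL entry: passing null for the entry name falls back to ACL_GLOBAL_NAME, and a null table makes the method open its own connection, as shown above. It assumes a caller in the same package, since the method is package-private:

// Hypothetical fragment; requires the PermissionStorage package context.
Configuration conf = HBaseConfiguration.create();
ListMultimap<String, UserPermission> perms =
    getPermissions(conf, null, null, null, null, null, false);
for (Map.Entry<String, UserPermission> e : perms.entries()) {
  System.out.println(e.getKey() + " -> " + e.getValue());
}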
/**
* Instantiate an indexer based on the given {@link IndexerConf}.
*/
public static Indexer createIndexer(String indexerName, IndexerConf conf, String tableName, ResultToSolrMapper mapper,
Connection tablePool, Sharder sharder, SolrInputDocumentWriter solrWriter) {
switch (conf.getMappingType()) {
case COLUMN:
return new ColumnBasedIndexer(indexerName, conf, tableName, mapper, sharder, solrWriter);
case ROW:
return new RowBasedIndexer(indexerName, conf, tableName, mapper, tablePool, sharder, solrWriter);
default:
throw new IllegalStateException("Can't determine the type of indexing to use for mapping type "
+ conf.getMappingType());
}
}
private void checkHbase(ConfigService conf) throws Exception {
// check configuration
String hbaseConfPath = conf.getHBaseConf();
if (hbaseConfPath != null) {
if (!new File(hbaseConfPath).exists()) {
println("error: hbase config %s is not found", hbaseConfPath);
System.exit(0);
}
println("hbase config: %s", hbaseConfPath);
}
else if (conf.getProperty("hbase.zookeeper.quorum", null) == null) {
println("error: hbase is not configured");
System.exit(0);
}
// check the connection
Configuration hbaseConf = HBaseStorageService.getHBaseConfig(conf);
hbaseConf.set("hbase.client.retries.number", "0");
if (hbaseConf.get("hbase.zookeeper.quorum") != null) {
println("zookeeper quorum: %s", hbaseConf.get("hbase.zookeeper.quorum"));
}
try (Connection conn = ConnectionFactory.createConnection(hbaseConf)) {
conn.getAdmin().listNamespaceDescriptors();
println("quorum is connected");
}
catch (Exception x) {
println("error: unable to connect to quorum");
}
}
public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
String tableName = extTableSnapshot.getStorageLocationIdentifier();
this.lookupTableName = TableName.valueOf(tableName);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
try {
table = connection.getTable(lookupTableName);
} catch (IOException e) {
throw new RuntimeException("error when connecting to HBase", e);
}
String[] keyColumns = extTableSnapshot.getKeyColumns();
encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
}
@Before
public void removeAllQuotas() throws Exception {
if (helper == null) {
helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, new AtomicLong());
}
final Connection conn = TEST_UTIL.getConnection();
// Wait for the quota table to be created
if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
helper.waitForQuotaTable(conn);
} else {
// Or, clean up any quotas from previous test runs.
helper.removeAllQuotas(conn);
assertEquals(0, helper.listNumDefinedQuotas(conn));
}
}
RandomReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
if (opts.multiGet > 0) {
LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
this.gets = new ArrayList<>(opts.multiGet);
}
}
private void loadData(TableName table, int numRows) throws IOException {
Connection conn = util.getConnection();
// #0- insert some data to a table
Table t1 = conn.getTable(table);
util.loadRandomRows(t1, new byte[]{'f'}, 100, numRows);
// flush table
conn.getAdmin().flush(table);
}
@Test
public void testExistingCluster() throws Exception {
TableName testRepairMode = TableName.valueOf(name.getMethodName());
TEST_UTIL.startMiniCluster();
Table t = TEST_UTIL.createTable(testRepairMode, FAMILYNAME);
Put p = new Put(Bytes.toBytes("r"));
p.addColumn(FAMILYNAME, Bytes.toBytes("c"), new byte[0]);
t.put(p);
TEST_UTIL.shutdownMiniHBaseCluster();
LOG.info("Starting master-only");
enableMaintenanceMode();
TEST_UTIL.startMiniHBaseCluster(StartMiniClusterOption.builder()
.numRegionServers(0).createRootDir(false).build());
Connection conn = TEST_UTIL.getConnection();
assertTrue(conn.getAdmin().isMasterInMaintenanceMode());
try (Table table = conn.getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = table.getScanner(HConstants.TABLE_FAMILY);
Stream<Result> results = StreamSupport.stream(scanner.spliterator(), false)) {
assertTrue("Did not find user table records while reading hbase:meta",
results.anyMatch(r -> Arrays.equals(r.getRow(), testRepairMode.getName())));
}
try (Table table = conn.getTable(testRepairMode);
ResultScanner scanner = table.getScanner(new Scan())) {
scanner.next();
fail("Should not be able to access user-space tables in repair mode.");
} catch (Exception e) {
// Expected
}
}
/**
* Get a connection, creating one lazily if none exists or the previous one was closed.
*
* @return the shared Connection instance
*/
public synchronized static Connection getConnection() {
try {
if (null == con || con.isClosed()) {
// Create the connection object
con = ConnectionFactory.createConnection(conf);
}
} catch (IOException e) {
System.out.println("获取连接失败!");
e.printStackTrace();
}
return con;
}
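A hedged sketch combining this lazy singleton with the closeQuietly helper shown earlier; the table, family, and row names are hypothetical:

// Use the shared connection; only close it once, at application shutdown.
Connection con = getConnection();
try (Table table = con.getTable(TableName.valueOf("demo_table"))) { // hypothetical table
  Put p = new Put(Bytes.toBytes("row-1"));
  p.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
  table.put(p);
} catch (IOException e) {
  e.printStackTrace();
}
// Later, e.g. in a JVM shutdown hook:
closeQuietly(con);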
/**
* Revoke permissions globally from the given user. Will wait until all active
* AccessController instances have updated their permissions caches or will
* throw an exception upon timeout (10 seconds).
*/
public static void revokeGlobal(final User caller, final HBaseTestingUtility util,
final String user, final Permission.Action... actions) throws Exception {
SecureTestUtil.updateACLs(util, new Callable<Void>() {
@Override
public Void call() throws Exception {
Configuration conf = util.getConfiguration();
try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
connection.getAdmin().revoke(
new UserPermission(user, Permission.newBuilder().withActions(actions).build()));
}
return null;
}
});
}
/**
* Do HFile bulk exporting.
*
* @param line the parsed command line options
* @throws Exception
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doExporting(CommandLine line) throws Exception {
// Configuration.
String tabname = line.getOptionValue("tabname");
String user = line.getOptionValue("user");
Configuration conf = new Configuration();
conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
conf.set(TableInputFormat.INPUT_TABLE, tabname);
conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));
// Check directory.
String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
state(!fs.exists(new Path(outputDir)), format("HDFS temporary directory already has data, path: '%s'", outputDir));
// Set scan condition.(if necessary)
setScanIfNecessary(conf, line);
// Job.
Connection conn = ConnectionFactory.createConnection(conf);
TableName tab = TableName.valueOf(tabname);
Job job = Job.getInstance(conf);
job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
job.setJarByClass(HfileBulkExporter.class);
job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
job.setInputFormatClass(TableInputFormat.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
FileOutputFormat.setOutputPath(job, new Path(outputDir));
if (job.waitForCompletion(true)) {
long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
log.info(String.format("Export completed successfully! processed: (%d) / total: (%d)", processed, total));
}
}
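The method consumes an Apache Commons CLI CommandLine; here is a hedged sketch of building one. The option names mirror the getOptionValue calls above, but the driver class itself is an assumption:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;

public class ExporterCli {
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    // Option names mirror those read in doExporting.
    options.addOption("tabname", true, "HBase table to export");
    options.addOption("zkaddr", true, "ZooKeeper quorum address");
    options.addOption("user", true, "user for the output filesystem");
    options.addOption("T", true, "hbase.fs.tmp.dir override");
    options.addOption("batchSize", true, "scan batch size");
    options.addOption("output", true, "HFile output directory");
    options.addOption("mapperClass", true, "mapper class name");
    CommandLine line = new DefaultParser().parse(options, args);
    HfileBulkExporter.doExporting(line);
  }
}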
private int getTableCount(Connection connection) {
try (Table table = connection.getTable(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME);
ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM))) {
int count = 0;
for (Result result : resultScanner) {
++count;
}
return count;
} catch (Exception e) {
return 0;
}
}