下面列出了org.apache.hadoop.hbase.client.Connection#getTable ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* e.g.</br>
*
* <pre>
* yarn jar super-devops-tool-hbase-migrator-master.jar \
* com.wl4g.devops.tool.hbase.migrator.HfileBulkImporter \
* -z emr-header-1:2181 \
* -t safeclound.tb_elec_power \
* -p /tmp-devops/safeclound.tb_elec_power
* </pre>
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
HbaseMigrateUtils.showBanner();
CommandLine line = new Builder().option("z", "zkaddr", null, "Zookeeper address.")
.option("t", "tabname", null, "Hbase table name.")
.option("p", "path", null, "Data hdfs path to be import. e.g. hdfs://localhost:9000/bak/safeclound.tb_air")
.build(args);
Configuration cfg = HBaseConfiguration.create();
cfg.set("hbase.zookeeper.quorum", line.getOptionValue("z"));
Connection conn = ConnectionFactory.createConnection(cfg);
Admin admin = conn.getAdmin();
Table table = conn.getTable(TableName.valueOf(line.getOptionValue("t")));
LoadIncrementalHFiles load = new LoadIncrementalHFiles(cfg);
load.doBulkLoad(new Path(line.getOptionValue("p")), admin, table,
conn.getRegionLocator(TableName.valueOf(line.getOptionValue("t"))));
}
private void testScanWithFilters(Connection connection, String tableName) throws IOException {
createTable(thriftAdmin, tableName);
try (Table table = connection.getTable(TableName.valueOf(tableName))){
FilterList filterList = new FilterList();
PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("testrow"));
ColumnValueFilter columnValueFilter = new ColumnValueFilter(FAMILYA, QUALIFIER_1,
CompareOperator.EQUAL, VALUE_1);
filterList.addFilter(prefixFilter);
filterList.addFilter(columnValueFilter);
Scan scan = new Scan();
scan.readVersions(2);
scan.setFilter(filterList);
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> iterator = scanner.iterator();
assertTrue(iterator.hasNext());
int counter = 0;
while (iterator.hasNext()) {
Result result = iterator.next();
counter += result.size();
}
assertEquals(2, counter);
}
}
/**
* Returns a list of {@code Delete} to remove all entries returned by the passed scanner.
* @param connection connection to re-use
* @param scan the scanner to use to generate the list of deletes
*/
static List<Delete> createDeletesForExistingSnapshotsFromScan(Connection connection, Scan scan)
throws IOException {
List<Delete> deletes = new ArrayList<>();
try (Table quotaTable = connection.getTable(QUOTA_TABLE_NAME);
ResultScanner rs = quotaTable.getScanner(scan)) {
for (Result r : rs) {
CellScanner cs = r.cellScanner();
while (cs.advance()) {
Cell c = cs.current();
byte[] family = Bytes.copy(c.getFamilyArray(), c.getFamilyOffset(), c.getFamilyLength());
byte[] qual =
Bytes.copy(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
Delete d = new Delete(r.getRow());
d.addColumns(family, qual);
deletes.add(d);
}
}
return deletes;
}
}
@Test
public void testPopulateEdgeIndex() throws Exception {
assertEquals(0, count(graph.vertices()));
Vertex v0 = graph.addVertex(T.id, id(0));
Vertex v1 = graph.addVertex(T.id, id(1));
Vertex v2 = graph.addVertex(T.id, id(2));
Vertex v3 = graph.addVertex(T.id, id(3));
Vertex v4 = graph.addVertex(T.id, id(4));
v0.addEdge("b", v1, "key1", 1);
v0.addEdge("b", v2, "key1", 2);
v0.addEdge("b", v3, "key2", 3);
v0.addEdge("a", v1, "key1", 1);
v0.addEdge("b", v4, "key1", 4);
HBaseGraphConfiguration hconf = graph.configuration();
Connection conn = graph.connection();
Table table = conn.getTable(HBaseGraphUtils.getTableName(hconf, Constants.EDGE_INDICES));
verifyTableCount(table, 5*2); // 5 edge endpoints
graph.createIndex(ElementType.EDGE, "b", "key1", false, true, false);
verifyTableCount(table, 5*2 + 3*2); // 5 edge endpoints and 3 indices
table.close();
}
/**
* Fetches any persisted HBase snapshot sizes stored in the quota table. The sizes here are
* computed relative to the table which the snapshot was created from. A snapshot's size will
* not include the size of files which the table still refers. These sizes, in bytes, are what
* is used internally to compute quota violation for tables and namespaces.
*
* @return A map of snapshot name to size in bytes per space quota computations
*/
public static Map<String,Long> getObservedSnapshotSizes(Connection conn) throws IOException {
try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
ResultScanner rs = quotaTable.getScanner(createScanForSpaceSnapshotSizes())) {
final Map<String,Long> snapshotSizes = new HashMap<>();
for (Result r : rs) {
CellScanner cs = r.cellScanner();
while (cs.advance()) {
Cell c = cs.current();
final String snapshot = extractSnapshotNameFromSizeCell(c);
final long size = parseSnapshotSize(c);
snapshotSizes.put(snapshot, size);
}
}
return snapshotSizes;
}
}
private static Table getTableFromSingletonPool(RegionCoprocessorEnvironment env, TableName tableName) throws IOException {
// It's ok to not ever do a pool.close() as we're storing a single
// table only. The HTablePool holds no other resources that this table
// which will be closed itself when it's no longer needed.
Connection conn = ConnectionFactory.getConnection(ConnectionType.DEFAULT_SERVER_CONNECTION, env);
try {
return conn.getTable(tableName);
} catch (RuntimeException t) {
// handle cases that an IOE is wrapped inside a RuntimeException like HTableInterface#createHTableInterface
if(t.getCause() instanceof IOException) {
throw (IOException)t.getCause();
} else {
throw t;
}
}
}
private List<RegionInfo> listRegionsInMeta() throws Exception {
Connection connection = TEST_UTIL.getConnection();
Table table = connection.getTable(TableName.META_TABLE_NAME);
Scan scan = new Scan();
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
ResultScanner scanner = table.getScanner(scan);
final List<RegionInfo> regionInfos = new ArrayList<>();
for(Result r : scanner) {
regionInfos.add(RegionInfo.parseFrom(r.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER)));
}
return regionInfos;
}
private static void prepareData(Connection conn) throws IOException {
Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
// check how many rows existing
int nRows = 0;
Scan scan = new Scan();
scan.setFilter(new KeyOnlyFilter());
ResultScanner scanner = table.getScanner(scan);
for (Result r : scanner) {
r.getRow(); // nothing to do
nRows++;
}
if (nRows > 0) {
logger.info("{} existing rows", nRows);
if (nRows != N_ROWS)
throw new IOException("Expect " + N_ROWS + " rows but it is not");
return;
}
// insert rows into empty table
logger.info("Writing {} rows to {}", N_ROWS, TEST_TABLE);
long nBytes = 0;
for (int i = 0; i < N_ROWS; i++) {
byte[] rowkey = Bytes.toBytes(i);
Put put = new Put(rowkey);
byte[] cell = randomBytes();
put.addColumn(CF, QN, cell);
table.put(put);
nBytes += cell.length;
dot(i, N_ROWS);
}
logger.info("Written {} rows, {} bytes", N_ROWS, nBytes);
} finally {
IOUtils.closeQuietly(table);
}
}
public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
String tableName = extTableSnapshot.getStorageLocationIdentifier();
this.lookupTableName = TableName.valueOf(tableName);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
try {
table = connection.getTable(lookupTableName);
} catch (IOException e) {
throw new RuntimeException("error when connect HBase", e);
}
String[] keyColumns = extTableSnapshot.getKeyColumns();
encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
}
private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
Stats stats = new Stats("COLUMN_SCAN");
Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
try {
stats.markStart();
int nLogicCols = colScans.size();
int nLogicRows = colScans.get(0).getSecond() - colScans.get(0).getFirst();
Scan[] scans = new Scan[nLogicCols];
ResultScanner[] scanners = new ResultScanner[nLogicCols];
for (int i = 0; i < nLogicCols; i++) {
scans[i] = new Scan();
scans[i].addFamily(CF);
scanners[i] = table.getScanner(scans[i]);
}
for (int i = 0; i < nLogicRows; i++) {
for (int c = 0; c < nLogicCols; c++) {
Result r = scanners[c].next();
stats.consume(r);
}
dot(i, nLogicRows);
}
stats.markEnd();
} finally {
IOUtils.closeQuietly(table);
}
}
/**
* Returns a set of the names of all namespaces containing snapshot entries.
* @param conn connection to re-use
*/
public static Set<String> getNamespaceSnapshots(Connection conn) throws IOException {
try (Table quotaTable = conn.getTable(QUOTA_TABLE_NAME);
ResultScanner rs = quotaTable.getScanner(createScanForNamespaceSnapshotSizes())) {
Set<String> snapshots = new HashSet<>();
for (Result r : rs) {
CellScanner cs = r.cellScanner();
while (cs.advance()) {
cs.current();
snapshots.add(getNamespaceFromRowKey(r.getRow()));
}
}
return snapshots;
}
}
public static Result exist(Connection conn, TableName tableName, byte[] key) throws IOException {
Table htable = conn.getTable(tableName);
Get get = new Get(key);
get.addColumn(DATA_COLUMN_FAMILY_BYTES, SYS_COLUMN_VERSION_BYTES);
Result r = htable.get(get);
return r;
}
/**
* Fetches the computed size of all snapshots against tables in a namespace for space quotas.
*/
static long getNamespaceSnapshotSize(
Connection conn, String namespace) throws IOException {
try (Table quotaTable = conn.getTable(QuotaTableUtil.QUOTA_TABLE_NAME)) {
Result r = quotaTable.get(createGetNamespaceSnapshotSize(namespace));
if (r.isEmpty()) {
return 0L;
}
r.advance();
return parseSnapshotSize(r.current());
} catch (InvalidProtocolBufferException e) {
throw new IOException("Could not parse snapshot size value for namespace " + namespace, e);
}
}
@Override
protected int doWork() throws Exception {
ProcessBasedLocalHBaseCluster hbaseCluster =
new ProcessBasedLocalHBaseCluster(conf, NUM_DATANODES, numRegionServers);
hbaseCluster.startMiniDFS();
// start the process based HBase cluster
hbaseCluster.startHBase();
// create tables if needed
HBaseTestingUtility.createPreSplitLoadTestTable(conf, TABLE_NAME,
HFileTestUtil.DEFAULT_COLUMN_FAMILY, Compression.Algorithm.NONE,
DataBlockEncoding.NONE);
LOG.debug("Loading data....\n\n");
loadData();
LOG.debug("Sleeping for " + SLEEP_SEC_AFTER_DATA_LOAD +
" seconds....\n\n");
Threads.sleep(5 * SLEEP_SEC_AFTER_DATA_LOAD);
Connection connection = ConnectionFactory.createConnection(conf);
int metaRSPort = HBaseTestingUtility.getMetaRSPort(connection);
LOG.debug("Killing hbase:meta region server running on port " + metaRSPort);
hbaseCluster.killRegionServer(metaRSPort);
Threads.sleep(2000);
LOG.debug("Restarting region server running on port metaRSPort");
hbaseCluster.startRegionServer(metaRSPort);
Threads.sleep(2000);
LOG.debug("Trying to scan meta");
Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
ResultScanner scanner = metaTable.getScanner(new Scan());
Result result;
while ((result = scanner.next()) != null) {
LOG.info("Region assignment from META: "
+ Bytes.toStringBinary(result.getRow())
+ " => "
+ Bytes.toStringBinary(result.getFamilyMap(HConstants.CATALOG_FAMILY)
.get(HConstants.SERVER_QUALIFIER)));
}
metaTable.close();
connection.close();
return 0;
}
public TTable(Connection connection, String tableName, CommitTable.Client commitTableClient) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
}
public SlicedRowFilterGTSDecoderIterator(long now, long timespan, List<Metadata> metadatas, Connection conn, TableName tableName, byte[] colfam, boolean writeTimestamp, KeyStore keystore, boolean useBlockCache) {
this.keystore = keystore;
this.now = now;
this.timespan = timespan;
this.hbaseAESKey = keystore.getKey(KeyStore.AES_HBASE_DATA);
this.writeTimestamp = writeTimestamp;
//
// Check that if 'timespan' is < 0 then 'now' is either Long.MAX_VALUE or congruent to 0 modulo DEFAULT_MODULUS
//
if (timespan < 0) {
if (Long.MAX_VALUE != now && 0 != (now % Constants.DEFAULT_MODULUS)) {
throw new RuntimeException("Incompatible 'timespan' (" + timespan + ") and 'now' (" + now + ")");
}
}
//
// Create a SlicedRowFilter for the prefix, class id, labels id and ts
// We include the prefix so we exit the filter early when the last
// matching row has been reached
//
// 128BITS
int[] bounds = { 0, 24 };
//
// Create singleton for each classId/labelsId combo
//
// TODO(hbs): we should really create multiple scanner, one per class Id for example,
//
List<Pair<byte[], byte[]>> ranges = new ArrayList<Pair<byte[], byte[]>>();
for (Metadata metadata: metadatas) {
byte[][] keys = getKeys(metadata, now, timespan);
byte[] lower = keys[0];
byte[] upper = keys[1];
this.metadatas.put(new String(Arrays.copyOfRange(lower, prefix.length, prefix.length + 16), StandardCharsets.ISO_8859_1), metadata);
Pair<byte[],byte[]> range = new Pair<byte[],byte[]>(lower, upper);
ranges.add(range);
}
SlicedRowFilter filter = new SlicedRowFilter(bounds, ranges, timespan < 0 ? -timespan : Long.MAX_VALUE);
//
// Create scanner. The start key is the lower bound of the first range
//
Scan scan = new Scan();
scan.addFamily(colfam); // (HBaseStore.GTS_COLFAM, Longs.toByteArray(Long.MAX_VALUE - modulus));
scan.setStartRow(filter.getStartKey());
byte[] filterStopKey = filter.getStopKey();
// Add one byte at the end (we can do that because we know the slice is the whole key)
byte[] stopRow = Arrays.copyOf(filterStopKey, filterStopKey.length + 1);
scan.setStopRow(stopRow);
scan.setFilter(filter);
scan.setMaxResultSize(1000000L);
scan.setBatch(50000);
scan.setCaching(50000);
scan.setCacheBlocks(useBlockCache);
Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_HBASE_CLIENT_FILTERED_SCANNERS, Sensision.EMPTY_LABELS, 1);
Sensision.update(SensisionConstants.SENSISION_CLASS_CONTINUUM_HBASE_CLIENT_FILTERED_SCANNERS_RANGES, Sensision.EMPTY_LABELS, ranges.size());
try {
this.htable = conn.getTable(tableName);
this.scanner = this.htable.getScanner(scan);
iter = scanner.iterator();
} catch (IOException ioe) {
LOG.error("",ioe);
this.iter = null;
}
}
public TTable(Connection connection, byte[] tableName, CommitTable.Client commitTableClient) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), commitTableClient, false);
}
/**
* Test the tuning task of {@link PressureAwareFlushThroughputController}
*/
@Test
public void testFlushThroughputTuning() throws Exception {
Configuration conf = hbtu.getConfiguration();
setMaxMinThroughputs(20L * 1024 * 1024, 10L * 1024 * 1024);
conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD,
3000);
hbtu.startMiniCluster(1);
Connection conn = ConnectionFactory.createConnection(conf);
hbtu.getAdmin().createTable(TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).setCompactionEnabled(false)
.build());
hbtu.waitTableAvailable(tableName);
HRegionServer regionServer = hbtu.getRSForFirstRegionInTable(tableName);
double pressure = regionServer.getFlushPressure();
LOG.debug("Flush pressure before flushing: " + pressure);
PressureAwareFlushThroughputController throughputController =
(PressureAwareFlushThroughputController) regionServer.getFlushThroughputController();
for (HRegion region : regionServer.getRegions()) {
region.flush(true);
}
// We used to assert that the flush pressure is zero but after HBASE-15787 or HBASE-18294 we
// changed to use heapSize instead of dataSize to calculate the flush pressure, and since
// heapSize will never be zero, so flush pressure will never be zero either. So we changed the
// assertion here.
assertTrue(regionServer.getFlushPressure() < pressure);
Thread.sleep(5000);
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(hbtu.getConfiguration());
if (tablesOnMaster) {
// If no tables on the master, this math is off and I'm not sure what it is supposed to be
// when meta is on the regionserver and not on the master.
assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
}
Table table = conn.getTable(tableName);
Random rand = new Random();
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
byte[] value = new byte[256 * 1024];
rand.nextBytes(value);
table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
}
}
Thread.sleep(5000);
double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure());
assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON);
conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
regionServer.onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController);
conn.close();
}
public TTable(Connection connection, byte[] tableName, boolean conflictFree) throws IOException {
this(connection.getTable(TableName.valueOf(tableName)), conflictFree);
}
@Test
public void testReadersAndWriters() throws Exception {
Configuration conf = util.getConfiguration();
String timeoutKey = String.format(TIMEOUT_KEY, this.getClass().getSimpleName());
long maxRuntime = conf.getLong(timeoutKey, DEFAULT_TIMEOUT_MINUTES);
long serverCount = util.getHBaseClusterInterface().getClusterMetrics()
.getLiveServerMetrics().size();
long keysToWrite = serverCount * KEYS_TO_WRITE_PER_SERVER;
Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TABLE_NAME);
// Create multi-threaded writer and start it. We write multiple columns/CFs and verify
// their integrity, therefore multi-put is necessary.
MultiThreadedWriter writer =
new MultiThreadedWriter(dataGen, conf, TABLE_NAME);
writer.setMultiPut(true);
LOG.info("Starting writer; the number of keys to write is " + keysToWrite);
// TODO : Need to see if tag support has to be given here in the integration test suite
writer.start(1, keysToWrite, WRITER_THREADS);
// Now, do scans.
long now = EnvironmentEdgeManager.currentTime();
long timeLimit = now + (maxRuntime * 60000);
boolean isWriterDone = false;
while (now < timeLimit && !isWriterDone) {
LOG.info("Starting the scan; wrote approximately "
+ dataGen.getTotalNumberOfKeys() + " keys");
isWriterDone = writer.isDone();
if (isWriterDone) {
LOG.info("Scanning full result, writer is done");
}
Scan scan = new Scan();
for (byte[] cf : dataGen.getColumnFamilies()) {
scan.addFamily(cf);
}
scan.setFilter(dataGen.getScanFilter());
scan.setLoadColumnFamiliesOnDemand(true);
// The number of keys we can expect from scan - lower bound (before scan).
// Not a strict lower bound - writer knows nothing about filters, so we report
// this from generator. Writer might have generated the value but not put it yet.
long onesGennedBeforeScan = dataGen.getExpectedNumberOfKeys();
long startTs = EnvironmentEdgeManager.currentTime();
ResultScanner results = table.getScanner(scan);
long resultCount = 0;
Result result = null;
// Verify and count the results.
while ((result = results.next()) != null) {
boolean isOk = writer.verifyResultAgainstDataGenerator(result, true, true);
Assert.assertTrue("Failed to verify [" + Bytes.toString(result.getRow())+ "]", isOk);
++resultCount;
}
long timeTaken = EnvironmentEdgeManager.currentTime() - startTs;
// Verify the result count.
long onesGennedAfterScan = dataGen.getExpectedNumberOfKeys();
Assert.assertTrue("Read " + resultCount + " keys when at most " + onesGennedAfterScan
+ " were generated ", onesGennedAfterScan >= resultCount);
if (isWriterDone) {
Assert.assertTrue("Read " + resultCount + " keys; the writer is done and "
+ onesGennedAfterScan + " keys were generated", onesGennedAfterScan == resultCount);
} else if (onesGennedBeforeScan * 0.9 > resultCount) {
LOG.warn("Read way too few keys (" + resultCount + "/" + onesGennedBeforeScan
+ ") - there might be a problem, or the writer might just be slow");
}
LOG.info("Scan took " + timeTaken + "ms");
if (!isWriterDone) {
Thread.sleep(WAIT_BETWEEN_SCANS_MS);
now = EnvironmentEdgeManager.currentTime();
}
}
Assert.assertEquals("There are write failures", 0, writer.getNumWriteFailures());
Assert.assertTrue("Writer is not done", isWriterDone);
// Assert.fail("Boom!");
connection.close();
}