下面列出了 org.apache.hadoop.hbase.client.Table#close() 的实例代码，您也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。
/**
 * Creates the prune state table and builds a {@link DataJanitorState} whose supplier
 * fetches a handle to that table from the test connection every time it is asked.
 */
@Before
public void beforeTest() throws Exception {
    String pruneTableName = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
    pruneStateTable = TableName.valueOf(pruneTableName);

    byte[][] families = new byte[][]{DataJanitorState.FAMILY};
    // Prune state table is a non-transactional table, hence no transaction co-processor
    Table created = createTable(pruneStateTable.getName(), families, false,
                                Collections.<String>emptyList());
    // The handle is released right away; later access goes through the supplier below.
    created.close();

    DataJanitorState.TableSupplier supplier = new DataJanitorState.TableSupplier() {
        @Override
        public Table get() throws IOException {
            return testUtil.getConnection().getTable(pruneStateTable);
        }
    };
    dataJanitorState = new DataJanitorState(supplier);
}
/**
 * Checks the row count of the edge-indices table before and after creating an index on
 * label {@code "b"} / key {@code "key1"} for edges that already exist.
 *
 * <p>Five edges are added from {@code v0}; three of them have label {@code "b"} and carry
 * {@code "key1"}, which is where the {@code 3*2} additional rows in the second check come from.
 */
@Test
public void testPopulateEdgeIndex() throws Exception {
    assertEquals(0, count(graph.vertices()));
    Vertex v0 = graph.addVertex(T.id, id(0));
    Vertex v1 = graph.addVertex(T.id, id(1));
    Vertex v2 = graph.addVertex(T.id, id(2));
    Vertex v3 = graph.addVertex(T.id, id(3));
    Vertex v4 = graph.addVertex(T.id, id(4));
    v0.addEdge("b", v1, "key1", 1);
    v0.addEdge("b", v2, "key1", 2);
    v0.addEdge("b", v3, "key2", 3);
    v0.addEdge("a", v1, "key1", 1);
    v0.addEdge("b", v4, "key1", 4);

    HBaseGraphConfiguration hconf = graph.configuration();
    Connection conn = graph.connection();
    // try-with-resources so the table handle is released even when a verification fails.
    try (Table table = conn.getTable(HBaseGraphUtils.getTableName(hconf, Constants.EDGE_INDICES))) {
        verifyTableCount(table, 5*2);  // 5 edge endpoints
        graph.createIndex(ElementType.EDGE, "b", "key1", false, true, false);
        verifyTableCount(table, 5*2 + 3*2);  // 5 edge endpoints and 3 indices
    }
}
/**
 * Writes two rows (the second one twice) and fails the test if any table operation
 * raises an {@link IOException}.
 *
 * <p>The failure keeps the original message but carries the caught exception as its cause,
 * and the table handle is always released.
 */
@Test
public void testNegativeMemstoreSize() throws IOException, InterruptedException {
    try (Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName))) {
        // Adding data
        Put put1 = new Put(Bytes.toBytes("row1"));
        put1.addColumn(family, qualifier, Bytes.toBytes("Value1"));
        table.put(put1);
        Put put2 = new Put(Bytes.toBytes("row2"));
        put2.addColumn(family, qualifier, Bytes.toBytes("Value2"));
        table.put(put2);
        // The same put is issued a second time on purpose (as in the original test).
        table.put(put2);
    } catch (IOException e) {
        // Surface the root cause instead of reducing it to a boolean flag.
        throw new AssertionError("Shouldn't have thrown an exception", e);
    }
}
/**
 * Imports rows one at a time: issues 99999 individual puts into table {@code t3}
 * (column family {@code f1}, qualifier {@code c1}) and prints the elapsed wall-clock time.
 *
 * <p>The measured interval starts after the table handle is obtained and ends after it has
 * been closed.
 *
 * @param connection connection used to obtain the table handle
 * @throws IOException if obtaining the table, a put, or closing the table fails
 */
private static void singleRowImport(Connection connection) throws IOException {
    // Explicit charset: do not depend on the platform default encoding.
    byte[] columnFamily = "f1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    final long startTime;
    // try-with-resources so the handle is released even if a put fails midway.
    try (Table table = connection.getTable(TableName.valueOf("t3"))) {
        startTime = System.currentTimeMillis();
        for (int i = 0; i < 99999; i++) {
            table.put(HBaseUtil.createPut(i + "", columnFamily, "c1", i + ""));
        }
    }
    System.out.println("共耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
/**
 * Scans with a {@code MultiRowRangeFilter} built from overlapping row ranges and checks that
 * the number of results equals the combined size of two plain scans: one from 10 to 40 and
 * one from 60 with an empty stop row.
 */
@Test
public void testMultiRowRangeFilterWithRangeOverlap() throws IOException {
    tableName = TableName.valueOf(name.getMethodName());
    // try-with-resources so the table handle is released even when an assertion fails.
    try (Table ht = TEST_UTIL.createTable(tableName, family, Integer.MAX_VALUE)) {
        generateRows(numRows, ht, family, qf, value);

        Scan scan = new Scan();
        scan.readAllVersions();

        // Deliberately overlapping ranges, added out of order.
        List<RowRange> ranges = new ArrayList<>();
        ranges.add(new RowRange(Bytes.toBytes(10), true, Bytes.toBytes(20), false));
        ranges.add(new RowRange(Bytes.toBytes(15), true, Bytes.toBytes(40), false));
        ranges.add(new RowRange(Bytes.toBytes(65), true, Bytes.toBytes(75), false));
        ranges.add(new RowRange(Bytes.toBytes(60), true, null, false));
        ranges.add(new RowRange(Bytes.toBytes(60), true, Bytes.toBytes(80), false));

        MultiRowRangeFilter filter = new MultiRowRangeFilter(ranges);
        scan.setFilter(filter);
        int resultsSize = getResultsSize(ht, scan);
        LOG.info("found " + resultsSize + " results");

        List<Cell> results1 = getScanResult(Bytes.toBytes(10), Bytes.toBytes(40), ht);
        List<Cell> results2 = getScanResult(Bytes.toBytes(60), Bytes.toBytes(""), ht);
        assertEquals(results1.size() + results2.size(), resultsSize);
    }
}
/**
 * Writes one record into each of two tables sharing the {@code _multitable_} prefix, runs the
 * indexing tool with {@code multitable_indexer.xml}, and expects both records to be returned
 * by the Solr query.
 *
 * <p>Both table handles are closed automatically; afterwards every table matching the prefix
 * pattern is disabled and deleted, regardless of whether the test body succeeded.
 */
@Test
public void testIndexer_Multitable() throws Exception {
    String tablePrefix = "_multitable_";
    createHTable(Bytes.toBytes(tablePrefix + "a_"));
    createHTable(Bytes.toBytes(tablePrefix + "b_"));
    String hbaseTableName = tablePrefix + ".*";

    // Handles are opened inside the try so that a failure while opening the second one still
    // closes the first and still runs the table cleanup below.
    try (Table recordTable2 = HBASE_ADMIN.getConnection().getTable(TableName.valueOf(tablePrefix + "a_"));
         Table recordTable3 = HBASE_ADMIN.getConnection().getTable(TableName.valueOf(tablePrefix + "b_"))) {
        writeHBaseRecord("row1", ImmutableMap.of(
                "firstname", "John",
                "lastname", "Doe"), recordTable2);
        writeHBaseRecord("row2", ImmutableMap.of(
                "firstname", "John",
                "lastname", "Doe"), recordTable3);

        MR_TEST_UTIL.runTool(
                "--hbase-indexer-file", new File(Resources.getResource(getClass(), "multitable_indexer.xml").toURI()).toString(),
                "--reducers", "0",
                "--collection", "collection1",
                "--zk-host", SOLR_ZK);

        assertEquals(2, executeSolrQuery("firstname_s:John lastname_s:Doe").size());
    } finally {
        // Runs after the handles above have been closed.
        HBASE_ADMIN.disableTables(hbaseTableName);
        HBASE_ADMIN.deleteTables(hbaseTableName);
    }
}
/**
 * Inserts a second batch of rows into table F, requests a split of the table, and then scans
 * every sub-partition (captured before the split request) with a {@code SplitRegionScanner},
 * flushing the partition once the running row count reaches {@code ITERATIONS/2}.
 * The total number of rows seen must be {@code 2*ITERATIONS}.
 */
@Test
public void simpleMergeWithConcurrentFlushTest() throws Exception {
    String insertSql = String.format("insert into %s select col1+" + ITERATIONS+", col2 from %s",
            SCHEMA + ".F", SCHEMA + ".A");
    spliceClassWatcher.executeUpdate(insertSql);

    String tableName = sqlUtil.getConglomID(spliceTableWatcherF.toString());
    Partition partition = driver.getTableFactory().getTable(tableName);
    Table htable = ((ClientPartition) partition).unwrapDelegate();
    // Sub-partitions are listed before the split is requested.
    List<Partition> partitions = partition.subPartitions();
    driver.getTableFactory().getAdmin().splitTable(tableName);

    int rowsSeen = 0;
    List<Cell> cellBuffer = new ArrayList<>();
    for (Partition subPartition : partitions) {
        Scan scan = new Scan(subPartition.getStartKey(), subPartition.getEndKey());
        SplitRegionScanner scanner = new SplitRegionScanner(
                scan, htable, clock, subPartition,
                driver.getConfiguration(), htable.getConfiguration());
        while (scanner.next(cellBuffer)) {
            rowsSeen++;
            if (rowsSeen == ITERATIONS / 2) {
                // Flush in the middle of the scan.
                partition.flush();
            }
            cellBuffer.clear();
        }
        scanner.close();
    }
    htable.close();
    Assert.assertEquals("Did not return all rows ", 2 * ITERATIONS, rowsSeen);
}
/**
 * Runs the query against its HBase table and returns every matching row as a JSON object.
 *
 * <p>An empty array is returned when the component is disabled, the query is {@code null},
 * the table name is empty, or an {@link IOException} occurs (the latter is logged as a
 * warning; rows converted before the failure remain in the returned array).
 *
 * @param query table name and filter to apply
 * @return the converted rows; never {@code null}
 */
@Override
public JSONArray queryAsJson(HbaseQuery query) {
    JSONArray array = new JSONArray();
    if (isDisabled() || query == null)
        return array;

    if (validator.isEmpty(query.tableName)) {
        logger.warn(null, "表名称为空,检索失败!");

        return array;
    }

    // try-with-resources: the scanner is closed first, then the table, even when converting
    // a row or running the query throws.
    try (Table table = getTable(query.getTableName());
         ResultScanner scanner = query(table, query.getFilter())) {
        scanner.forEach(result -> {
            JSONObject object = new JSONObject();
            setToJson(object, Bytes.toString(result.getRow()), result);
            array.add(object);
        });
    } catch (IOException e) {
        logger.warn(e, "检索HBase数据[{}]时发生异常!", query.getTableName());
    }

    return array;
}
/**
 * One-time fixture: configures the test key provider and master key name, starts a
 * single-node mini cluster, creates table {@code default:TestEncryptionRandomKeying} with
 * one family {@code cf} that has an encryption type set, writes one cell and flushes the table.
 */
@BeforeClass
public static void setUp() throws Exception {
    conf.setInt("hfile.format.version", 3);
    conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName());
    conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");

    // Table schema: an encryption algorithm is specified, but no key.
    tdb = TableDescriptorBuilder.newBuilder(
            TableName.valueOf("default", "TestEncryptionRandomKeying"));
    ColumnFamilyDescriptorBuilder cfBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"));
    String cipher = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
    cfBuilder.setEncryptionType(cipher);
    tdb.setColumnFamily(cfBuilder.build());

    // Bring up the mini cluster and create the test table.
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.getAdmin().createTable(tdb.build());
    TableName encryptedTable = tdb.build().getTableName();
    TEST_UTIL.waitTableAvailable(encryptedTable, 5000);

    // Write a single cell and flush, so that a store file exists.
    Table table = TEST_UTIL.getConnection().getTable(encryptedTable);
    try {
        Put row = new Put(Bytes.toBytes("testrow"));
        row.addColumn(cfBuilder.build().getName(), Bytes.toBytes("q"), Bytes.toBytes("value"));
        table.put(row);
    } finally {
        table.close();
    }
    TEST_UTIL.getAdmin().flush(encryptedTable);
}
/**
 * Merges regions of a freshly loaded table twice, verifies the row count, and then checks that
 * one of the two regions returned by the second merge is not in transition after either an
 * assign or an unassign request.
 *
 * <p>The table handle is closed and the table deleted whether or not the checks pass.
 */
@Test
public void testWholesomeMerge() throws Exception {
    LOG.info("Starting " + name.getMethodName());
    final TableName tableName = TableName.valueOf(name.getMethodName());

    // Create table and load data. The handle is a resource so it is released on any failure;
    // it is closed before the finally clause deletes the table.
    try (Table table = createTableAndLoadData(MASTER, tableName)) {
        // Merge 1st and 2nd region
        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);

        // Merge 2nd and 3rd region
        PairOfSameType<RegionInfo> mergedRegions =
                mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2);

        verifyRowCount(table, ROWSIZE);

        // Randomly choose one of the two merged regions
        RegionInfo hri = RandomUtils.nextBoolean() ? mergedRegions.getFirst() : mergedRegions.getSecond();
        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
        AssignmentManager am = cluster.getMaster().getAssignmentManager();
        RegionStates regionStates = am.getRegionStates();

        // We should not be able to assign it again
        am.assign(hri);
        assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri));

        // We should not be able to unassign it either
        am.unassign(hri);
        assertFalse("Merged region can't be unassigned", regionStates.isRegionInTransition(hri));
    } finally {
        TEST_UTIL.deleteTable(tableName);
    }
}
/**
 * Deletes the HBase row belonging to the given model.
 *
 * <p>Does nothing when the component is disabled, the model is {@code null}, or the model has
 * an empty id. An {@link IOException} is logged as a warning and not propagated.
 *
 * @param model model whose id identifies the row and whose class selects the table
 * @param <T>   concrete model type
 */
@Override
public <T extends Model> void delete(T model) {
    if (isDisabled() || model == null || validator.isEmpty(model.getId()))
        return;

    ModelTable modelTable = modelTables.get(model.getClass());
    // try-with-resources so the table handle is released even when the delete itself fails.
    try (Table table = getTable(modelTable.getTableName())) {
        delete(table, model.getId());
    } catch (IOException e) {
        logger.warn(e, "删除HBase数据[{}:{}]时发生异常!", modelTable.getTableName(), model.getId());
    }
}
/**
 * Creates a table with the requested number of region replicas plus a second table without
 * replicas, loads numeric rows into both, and verifies replication of rows 0..1000 for the
 * replicated table.
 *
 * <p>The connection and both table handles are closed automatically (in reverse order of
 * creation); the non-replicated table is deleted afterwards in every case.
 *
 * @param regionReplication region replication count to set on the table descriptor
 */
public void testRegionReplicaReplication(int regionReplication) throws Exception {
    // test region replica replication. Create a table with single region, write some data
    // ensure that data is replicated to the secondary region
    TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
            + regionReplication);
    HTableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.toString()),
            HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
            HColumnDescriptor.DEFAULT_KEEP_DELETED);
    htd.setRegionReplication(regionReplication);
    HTU.getAdmin().createTable(htd);
    TableName tableNameNoReplicas =
            TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
    HTU.deleteTableIfAny(tableNameNoReplicas);
    HTU.createTable(tableNameNoReplicas, HBaseTestingUtility.fam1);

    // All three resources live in one try-with-resources: a failure while opening a table no
    // longer leaks the connection, and a failing close no longer skips the remaining cleanup.
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
         Table table = connection.getTable(tableName);
         Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
        // load some data to the non-replicated table
        HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtility.fam1, 6000, 7000);

        // load the data to the table
        HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);

        verifyReplication(tableName, regionReplication, 0, 1000);
    } finally {
        HTU.deleteTableIfAny(tableNameNoReplicas);
    }
}
/**
 * Scans every sub-partition of table K with a {@code SamplingFilter(0.2)}; when the running
 * row count equals {@code (ITERATIONS*0.2)/2} the partition is flushed and a table split is
 * requested. The number of sampled rows must lie strictly between 5% and 35% of
 * {@code ITERATIONS}.
 */
@Test
public void sampledScanWithConcurrentFlushAndSplitTest() throws Exception {
    String tableName = sqlUtil.getConglomID(spliceTableWatcherK.toString());
    Partition partition = driver.getTableFactory().getTable(tableName);
    Table htable = ((ClientPartition) partition).unwrapDelegate();
    List<Partition> partitions = partition.subPartitions();

    int rowsSeen = 0;
    List<Cell> cellBuffer = new ArrayList<>();
    for (Partition subPartition : partitions) {
        Scan scan = new Scan(subPartition.getStartKey(), subPartition.getEndKey());
        scan.setFilter(new SamplingFilter(0.2)); // enable sampling for this scan
        SplitRegionScanner scanner = new SplitRegionScanner(
                scan, htable, clock, subPartition,
                driver.getConfiguration(), htable.getConfiguration());
        while (scanner.next(cellBuffer)) {
            rowsSeen++;
            // The int counter is compared against a double-valued threshold.
            boolean atTriggerPoint = rowsSeen == (ITERATIONS * 0.2) / 2;
            if (atTriggerPoint) {
                partition.flush();
                driver.getTableFactory().getAdmin().splitTable(tableName);
            }
            cellBuffer.clear();
        }
        scanner.close();
    }
    htable.close();

    Assert.assertTrue("Returned more rows than expected: " + rowsSeen, rowsSeen < (ITERATIONS * 0.35));
    Assert.assertTrue("Returned less rows than expected: " + rowsSeen, rowsSeen > (ITERATIONS * 0.05));
}
/**
 * Asserts that the named table contains exactly {@code expectedRows} rows, as counted by
 * {@code util.countRows}. The table handle is closed before returning.
 *
 * @param util         testing utility providing the connection and the row counter
 * @param tableName    table to inspect
 * @param expectedRows expected number of rows
 * @throws IOException if the table cannot be opened, counted, or closed
 */
public static void verifyRowCount(final HBaseTestingUtility util, final TableName tableName,
        long expectedRows) throws IOException {
    final Table handle = util.getConnection().getTable(tableName);
    try {
        final long actualRows = util.countRows(handle);
        assertEquals(expectedRows, actualRows);
    } finally {
        handle.close();
    }
}
/**
 * Counts the rows of the named table by opening a handle on this utility's connection and
 * delegating to {@code countRows(Table)}; the handle is closed before returning.
 *
 * @param tableName table whose rows are counted
 * @return the number of rows reported by {@code countRows(Table)}
 * @throws IOException if the table cannot be opened, counted, or closed
 */
public int countRows(final TableName tableName) throws IOException {
    final Table handle = getConnection().getTable(tableName);
    try {
        final int rows = countRows(handle);
        return rows;
    } finally {
        handle.close();
    }
}
/**
 * Starts the mini cluster, creates {@code TABLE_NAME} with column family {@code COL_FAM},
 * and fills it via {@code writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL)}.
 *
 * @throws Exception if the cluster cannot be started or the table cannot be created or filled
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.startMiniCluster();
    // try-with-resources so the handle is released even if writing the rows fails.
    try (Table table = TEST_UTIL.createTable(TableName.valueOf(TABLE_NAME), Bytes.toBytes(COL_FAM))) {
        writeRows(table, TOTAL_ROWS, ROWS_WITH_ONE_COL);
    }
}
/**
 * Scans {@code ACL_LIST_FAMILY} for qualifiers matching the given column and, if any are
 * found, deletes those qualifiers from the row keyed by {@code tableName}.
 *
 * <p>A qualifier matches when it contains the column name enclosed in
 * {@code ACL_KEY_DELIMITER} on both sides, or ends with the delimiter followed by the column
 * name.
 *
 * @param tableName  table whose name is the row key of the issued delete
 * @param column     column name bytes used to build the qualifier pattern
 * @param table      table to scan and delete from
 * @param closeTable whether {@code table} is closed before returning
 * @throws IOException if scanning, deleting, or closing the table fails
 */
private static void removeTablePermissions(TableName tableName, byte[] column, Table table,
        boolean closeTable) throws IOException {
    String columnName = Bytes.toString(column);
    String qualifierPattern = String.format("(%s%s%s)|(%s%s)$",
            ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
            ACL_KEY_DELIMITER, columnName);

    Scan scan = new Scan();
    scan.addFamily(ACL_LIST_FAMILY);
    scan.setFilter(new QualifierFilter(CompareOperator.EQUAL,
            new RegexStringComparator(qualifierPattern)));

    Set<byte[]> matched = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    ResultScanner scanner = null;
    try {
        scanner = table.getScanner(scan);
        for (Result row : scanner) {
            matched.addAll(row.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet());
        }

        if (!matched.isEmpty()) {
            // One delete on the table-name row covering every matched qualifier.
            Delete removal = new Delete(tableName.getName());
            for (byte[] qualifier : matched) {
                removal.addColumns(ACL_LIST_FAMILY, qualifier);
            }
            table.delete(removal);
        }
    } finally {
        if (scanner != null) {
            scanner.close();
        }
        if (closeTable) {
            table.close();
        }
    }
}
/**
 * Initialize the tests with a table filled with some data and two snapshots
 * (snapshotName0, snapshotName1) of different states.
 *
 * <p>The table name, all three snapshot names (snapshotName0..2) and the row counts of the
 * two snapshots are initialized. No snapshot is taken for snapshotName2 here; only its name
 * is assigned.
 */
@Before
public void setup() throws Exception {
    this.admin = UTIL.getAdmin();

    long tid = System.currentTimeMillis();
    tableName = TableName.valueOf("testtb-" + tid);
    snapshotName0 = "snaptb0-" + tid;
    snapshotName1 = "snaptb1-" + tid;
    snapshotName2 = "snaptb2-" + tid;

    // create the table and load the first 500 rows
    createTable();
    SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);

    // try-with-resources so the handle is released even if counting or a snapshot fails.
    try (Table table = UTIL.getConnection().getTable(tableName)) {
        snapshot0Rows = countRows(table);
        LOG.info("=== before snapshot with 500 rows");
        logFSTree();

        // take a snapshot
        admin.snapshot(snapshotName0, tableName, SnapshotType.FLUSH);

        LOG.info("=== after snapshot with 500 rows");
        logFSTree();

        // insert more data
        SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY);
        snapshot1Rows = countRows(table);
        LOG.info("=== before snapshot with 1000 rows");
        logFSTree();

        // take a snapshot of the updated table
        admin.snapshot(snapshotName1, tableName, SnapshotType.FLUSH);

        LOG.info("=== after snapshot with 1000 rows");
        logFSTree();
    }
}
/**
* Tests that logs are deleted when some region has a compaction
* record in WAL and no other records. See HBASE-8597.
*
* <p>The test interleaves puts, flushes, one compaction and three WAL rolls, and after each
* roll asserts the number of rolled log files reported by
* {@code AbstractFSWALProvider.getNumRolledLogFiles}. Both table handles are closed in the
* {@code finally} block.
*/
@Test
public void testCompactionRecordDoesntBlockRolling() throws Exception {
Table table = null;
// When the hbase:meta table can be opened, the region servers are running
// (this handle is not used for anything else in this method; it is only closed at the end)
Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
try {
table = createTestTable(getName());
// Work against the first region of the test table and the WAL serving that region.
server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
HRegion region = server.getRegions(table.getName()).get(0);
final WAL log = server.getWAL(region.getRegionInfo());
Store s = region.getStore(HConstants.CATALOG_FAMILY);
// Put some stuff into table, to make sure we have some files to compact.
// Each iteration is one put followed by a flush; two store files are expected below.
for (int i = 1; i <= 2; ++i) {
doPut(table, i);
admin.flush(table.getName());
}
doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL
assertEquals("Should have no WAL after initial writes", 0,
AbstractFSWALProvider.getNumRolledLogFiles(log));
assertEquals(2, s.getStorefilesCount());
// Roll the log and compact table, to have compaction record in the 2nd WAL.
log.rollWriter();
assertEquals("Should have WAL; one table is not flushed", 1,
AbstractFSWALProvider.getNumRolledLogFiles(log));
admin.flush(table.getName());
region.compact(false);
// Wait for compaction in case if flush triggered it before us.
// NOTE(review): this null check comes after s has already been dereferenced above
// (s.getStorefilesCount()), so it cannot fail at this point.
Assert.assertNotNull(s);
// Poll every 200 ms, for at most 3000 ms, until at most one store file remains.
for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
Threads.sleepWithoutInterrupt(200);
}
assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());
// Write some value to the table so the WAL cannot be deleted until table is flushed.
doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table.
log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
assertEquals("Should have WAL; one table is not flushed", 1,
AbstractFSWALProvider.getNumRolledLogFiles(log));
// Flush table to make latest WAL obsolete; write another record, and roll again.
admin.flush(table.getName());
doPut(table, 1);
log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
assertEquals("Should have 1 WALs at the end", 1,
AbstractFSWALProvider.getNumRolledLogFiles(log));
} finally {
// Release the meta handle first, then the test table handle if it was created.
if (t != null) t.close();
if (table != null) table.close();
}
}
/**
 * Writes the events buffered for the given thread to HBase.
 *
 * <p>The events of every transaction buffered under {@code threadID} are collected, converted
 * into per-table put lists by {@code generateMutations}, and each list is written to its
 * table with a single {@code put} call.
 *
 * @param threadID key into {@code buffered} selecting whose transactions are written
 * @throws IOException if obtaining a table, writing the puts, or closing the table fails
 */
private void writeToHBase(Long threadID) throws IOException {
    // TODO: throw new IOException("Chaos Monkey");
    Collection<AugmentedEvent> events = new ArrayList<>();
    for ( String transactionID : buffered.get(threadID).keySet() ) {
        events.addAll( buffered.get(threadID).get(transactionID) );
    }

    Map<String,List<Put>> mutationsByTable = generateMutations( events );

    // Iterate entries directly instead of looking each key up again.
    for (Map.Entry<String, List<Put>> tableMutations : mutationsByTable.entrySet()) {
        TableName name = TableName.valueOf(Bytes.toBytes(tableMutations.getKey()));
        // try-with-resources so the handle is released even when the put fails.
        try (Table table = connection.getTable(name)) {
            table.put(tableMutations.getValue());
        }
    }
}