下面列出了org.apache.hadoop.hbase.client.Admin#mergeRegionsAsync ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void perform() throws Exception {
HBaseTestingUtility util = context.getHBaseIntegrationTestingUtility();
Admin admin = util.getAdmin();
getLogger().info("Performing action: Merge random adjacent regions of table " + tableName);
List<RegionInfo> regions = admin.getRegions(tableName);
if (regions == null || regions.size() < 2) {
getLogger().info("Table " + tableName + " doesn't have enough regions to merge");
return;
}
int i = RandomUtils.nextInt(0, regions.size() - 1);
RegionInfo a = regions.get(i++);
RegionInfo b = regions.get(i);
getLogger().debug("Merging " + a.getRegionNameAsString() + " and " + b.getRegionNameAsString());
// Don't try the merge if we're stopping
if (context.isStopping()) {
return;
}
try {
admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false);
} catch (Exception ex) {
getLogger().warn("Merge failed, might be caused by other chaos: " + ex.getMessage());
}
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
}
public void mergeRegions(String tblName, int targetRegions) throws Exception {
TableName table = TableName.valueOf(tblName);
Path tableDir = getTablePath(table);
try(Connection conn = ConnectionFactory.createConnection(conf)) {
Admin admin = conn.getAdmin();
LongAdder counter = new LongAdder();
LongAdder lastTimeProgessed = new LongAdder();
//need to get all regions for the table, regardless of region state
List<RegionInfo> regions = admin.getRegions(table);
Map<Future, Pair<RegionInfo, RegionInfo>> regionsMerging = new ConcurrentHashMap<>();
long roundsNoProgress = 0;
while (regions.size() > targetRegions) {
LOG.info("Iteration: {}", counter);
RegionInfo previous = null;
int regionSize = regions.size();
LOG.info("Attempting to merge {} regions to reach the target {} ...", regionSize, targetRegions);
//to request merge, regions must be OPEN, though
regions = getOpenRegions(conn, table);
for (RegionInfo current : regions) {
if (!current.isSplit()) {
if (previous != null && canMerge(tableDir, previous, current, regionsMerging.values())) {
Future f = admin.mergeRegionsAsync(current.getEncodedNameAsBytes(),
previous.getEncodedNameAsBytes(), true);
Pair<RegionInfo, RegionInfo> regionPair = new Pair<>(previous, current);
regionsMerging.put(f,regionPair);
previous = null;
if ((regionSize - regionsMerging.size()) <= targetRegions) {
break;
}
} else {
previous = current;
}
}
else{
LOG.debug("Skipping split region: {}", current.getEncodedName());
}
}
counter.increment();
LOG.info("Sleeping for {} seconds before next iteration...", (sleepBetweenCycles/1000));
Thread.sleep(sleepBetweenCycles);
regionsMerging.forEach((f, currentPair)-> {
if (f.isDone()) {
LOG.info("Merged regions {} and {} together.",
currentPair.getFirst().getEncodedName(),
currentPair.getSecond().getEncodedName());
regionsMerging.remove(f);
lastTimeProgessed.reset();
lastTimeProgessed.add(counter.longValue());
} else {
LOG.warn("Merge of regions {} and {} isn't completed yet.",
currentPair.getFirst(),
currentPair.getSecond());
}
});
roundsNoProgress = counter.longValue() - lastTimeProgessed.longValue();
if(roundsNoProgress == this.maxRoundsStuck){
LOG.warn("Reached {} iterations without progressing with new merges. Aborting...",
roundsNoProgress);
break;
}
//again, get all regions, regardless of the state,
// in order to avoid breaking the loop prematurely
regions = admin.getRegions(table);
}
}
}
@Test
public void testLocalIndexScanAfterRegionsMerge() throws Exception {
String schemaName = generateUniqueName();
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), false);
String indexPhysicalTableName = physicalTableName.getNameAsString();
createBaseTable(tableName, "('e','j','o')");
Connection conn1 = getConnectionForLocalIndexTest();
try {
String[] strings =
{ "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o",
"p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
for (int i = 0; i < 26; i++) {
conn1.createStatement()
.execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i
+ "," + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
}
conn1.commit();
conn1.createStatement()
.execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
conn1.createStatement()
.execute("CREATE LOCAL INDEX " + indexName + "_2 ON " + tableName + "(k3)");
ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName);
assertTrue(rs.next());
Admin admin = conn1.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
List<RegionInfo> regionsOfUserTable =
MetaTableAccessor.getTableRegions(admin.getConnection(), physicalTableName,
false);
admin.mergeRegionsAsync(regionsOfUserTable.get(0).getEncodedNameAsBytes(),
regionsOfUserTable.get(1).getEncodedNameAsBytes(), false);
regionsOfUserTable =
MetaTableAccessor.getTableRegions(admin.getConnection(), physicalTableName,
false);
while (regionsOfUserTable.size() != 3) {
Thread.sleep(100);
regionsOfUserTable =
MetaTableAccessor.getTableRegions(admin.getConnection(), physicalTableName,
false);
}
String query = "SELECT t_id,k1,v1 FROM " + tableName;
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
assertEquals(strings[25 - j], rs.getString("t_id"));
assertEquals(25 - j, rs.getInt("k1"));
assertEquals(strings[j], rs.getString("V1"));
}
rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
assertEquals(
"CLIENT PARALLEL " + 3 + "-WAY RANGE SCAN OVER " + indexPhysicalTableName + " [1]\n"
+ " SERVER FILTER BY FIRST KEY ONLY\n" + "CLIENT MERGE SORT",
QueryUtil.getExplainPlan(rs));
query = "SELECT t_id,k1,k3 FROM " + tableName;
rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
assertEquals(
"CLIENT PARALLEL " + 3 + "-WAY RANGE SCAN OVER " + indexPhysicalTableName + " [2]\n"
+ " SERVER FILTER BY FIRST KEY ONLY\n" + "CLIENT MERGE SORT",
QueryUtil.getExplainPlan(rs));
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
assertEquals(strings[j], rs.getString("t_id"));
assertEquals(j, rs.getInt("k1"));
assertEquals(j + 2, rs.getInt("k3"));
}
} finally {
conn1.close();
}
}
@Test
public void testLocalIndexScanWithMergeSpecialCase() throws Exception {
String schemaName = generateUniqueName();
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), false);
createBaseTable(tableName, "('a','aaaab','def')");
Connection conn1 = getConnectionForLocalIndexTest();
try {
String[] strings =
{ "aa", "aaa", "aaaa", "bb", "cc", "dd", "dff", "g", "h", "i", "j", "k", "l",
"m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
for (int i = 0; i < 26; i++) {
conn1.createStatement()
.execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i
+ "," + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
}
conn1.commit();
conn1.createStatement()
.execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
conn1.createStatement()
.execute("CREATE LOCAL INDEX " + indexName + "_2 ON " + tableName + "(k3)");
Admin admin = conn1.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
List<RegionInfo> regionsOfUserTable =
MetaTableAccessor.getTableRegions(admin.getConnection(), physicalTableName,
false);
admin.mergeRegionsAsync(regionsOfUserTable.get(0).getEncodedNameAsBytes(),
regionsOfUserTable.get(1).getEncodedNameAsBytes(), false);
regionsOfUserTable =
MetaTableAccessor.getTableRegions(admin.getConnection(), physicalTableName,
false);
while (regionsOfUserTable.size() != 3) {
Thread.sleep(100);
regionsOfUserTable =
MetaTableAccessor.getTableRegions(admin.getConnection(), physicalTableName,
false);
}
String query = "SELECT t_id,k1,v1 FROM " + tableName;
ResultSet rs = conn1.createStatement().executeQuery(query);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
assertEquals(strings[25-j], rs.getString("t_id"));
assertEquals(25-j, rs.getInt("k1"));
assertEquals(strings[j], rs.getString("V1"));
}
query = "SELECT t_id,k1,k3 FROM " + tableName;
rs = conn1.createStatement().executeQuery(query);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
assertEquals(strings[j], rs.getString("t_id"));
assertEquals(j, rs.getInt("k1"));
assertEquals(j + 2, rs.getInt("k3"));
}
} finally {
conn1.close();
}
}