下面列出了org.apache.hadoop.hbase.client.HBaseAdmin#split ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
protected void splitTable(Connection conn, byte[] splitPoint, byte[] tabName) throws IOException, InterruptedException, SQLException {
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
int nRegionsNow = services.getAllTableRegions(tabName).size();
HBaseAdmin admin = services.getAdmin();
try {
admin.split(tabName, splitPoint);
int nTries = 0;
int nRegions;
do {
Thread.sleep(2000);
services.clearTableRegionCache(tabName);
nRegions = services.getAllTableRegions(tabName).size();
nTries++;
} while (nRegions == nRegionsNow && nTries < 10);
if (nRegions == nRegionsNow) {
fail();
}
// FIXME: I see the commit of the stats finishing before this with a lower timestamp that the scan timestamp,
// yet without this sleep, the query finds the old data. Seems like an HBase bug and a potentially serious one.
Thread.sleep(8000);
} finally {
admin.close();
}
}
@Test
public void testManualSplit() throws Exception {
initTable();
Connection conn = DriverManager.getConnection(getUrl());
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
int nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
int nInitialRegions = nRegions;
HBaseAdmin admin = services.getAdmin();
try {
admin.split(TABLE_NAME);
int nTries = 0;
while (nRegions == nInitialRegions && nTries < 10) {
Thread.sleep(1000);
nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
nTries++;
}
// Split finished by this time, but cache isn't updated until
// table is accessed
assertEquals(nRegions, nInitialRegions);
int nRows = 2;
String query = "SELECT count(*) FROM S WHERE a IN ('tl','jt',' a',' b',' c',' d')";
ResultSet rs1 = conn.createStatement().executeQuery(query);
assertTrue(rs1.next());
nRegions = services.getAllTableRegions(TABLE_NAME_BYTES).size();
// Region cache has been updated, as there are more regions now
assertNotEquals(nRegions, nInitialRegions);
/*
if (nRows != rs1.getInt(1)) {
// Run the same query again and it always passes now
// (as region cache is up-to-date)
ResultSet r2 = conn.createStatement().executeQuery(query);
assertTrue(r2.next());
assertEquals(nRows, r2.getInt(1));
}
*/
assertEquals(nRows, rs1.getInt(1));
} finally {
admin.close();
}
}
@Test
public void testSplitWithCachedMeta() throws Exception {
// Tests that you don't get an ambiguous column exception when using the same alias as the column name
String query = "SELECT a_string, b_string, count(1) FROM atable WHERE organization_id=? and entity_id<=? GROUP BY a_string,b_string";
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
Connection conn = DriverManager.getConnection(getUrl(), props);
HBaseAdmin admin = null;
try {
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
statement.setString(2, ROW4);
ResultSet rs = statement.executeQuery();
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(B_VALUE, rs.getString(2));
assertEquals(2, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(C_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(E_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertFalse(rs.next());
byte[] tableName = Bytes.toBytes(ATABLE_NAME);
admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
htable.clearRegionCache();
int nRegions = htable.getRegionLocations().size();
if(!admin.tableExists(TableName.valueOf(MetaDataUtil.getLocalIndexTableName(ATABLE_NAME)))) {
admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char) ('3' + nextRunCount())) + ts))); // vary split point with test run
int retryCount = 0;
do {
Thread.sleep(2000);
retryCount++;
//htable.clearRegionCache();
} while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
assertNotEquals(nRegions, htable.getRegionLocations().size());
}
statement.setString(1, tenantId);
rs = statement.executeQuery();
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(B_VALUE, rs.getString(2));
assertEquals(2, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(C_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(E_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertFalse(rs.next());
} finally {
if (admin != null) {
admin.close();
}
conn.close();
}
}
@Test
public void testLocalIndexScanAfterRegionSplit() throws Exception {
createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, null, "('e','j','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
try{
String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
for (int i = 0; i < 26; i++) {
conn1.createStatement().execute(
"UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('"+strings[i]+"'," + i + ","
+ (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
}
conn1.commit();
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(v1)");
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + "_2 ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(k3)");
ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME);
assertTrue(rs.next());
HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
for (int i = 1; i < 5; i++) {
admin.split(Bytes.toBytes(TestUtil.DEFAULT_DATA_TABLE_NAME), ByteUtil.concat(Bytes.toBytes(strings[3*i])));
List<HRegionInfo> regionsOfUserTable =
MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(), admin.getConnection(),
TableName.valueOf(TestUtil.DEFAULT_DATA_TABLE_NAME), false);
while (regionsOfUserTable.size() != (4+i)) {
Thread.sleep(100);
regionsOfUserTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
admin.getConnection(), TableName.valueOf(TestUtil.DEFAULT_DATA_TABLE_NAME), false);
}
assertEquals(4+i, regionsOfUserTable.size());
TableName indexTable =
TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME));
List<HRegionInfo> regionsOfIndexTable =
MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
admin.getConnection(), indexTable, false);
while (regionsOfIndexTable.size() != (4 + i)) {
Thread.sleep(100);
regionsOfIndexTable = MetaTableAccessor.getTableRegions(getUtility().getZooKeeperWatcher(),
admin.getConnection(), indexTable, false);
}
assertEquals(4 + i, regionsOfIndexTable.size());
String query = "SELECT t_id,k1,v1 FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME;
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
assertEquals(strings[25-j], rs.getString("t_id"));
assertEquals(25-j, rs.getInt("k1"));
assertEquals(strings[j], rs.getString("V1"));
}
rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
assertEquals(
"CLIENT PARALLEL " + (4 + i) + "-WAY RANGE SCAN OVER "
+ MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME) + " [-32768]\n"
+ " SERVER FILTER BY FIRST KEY ONLY\n"
+ "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
query = "SELECT t_id,k1,k3 FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME;
rs = conn1.createStatement().executeQuery("EXPLAIN "+query);
assertEquals(
"CLIENT PARALLEL "
+ ((strings[3 * i].compareTo("j") < 0) ? (4 + i) : (4 + i - 1))
+ "-WAY RANGE SCAN OVER "
+ MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME) + " [-32767]\n"
+ " SERVER FILTER BY FIRST KEY ONLY\n"
+ "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
rs = conn1.createStatement().executeQuery(query);
Thread.sleep(1000);
for (int j = 0; j < 26; j++) {
assertTrue(rs.next());
assertEquals(strings[j], rs.getString("t_id"));
assertEquals(j, rs.getInt("k1"));
assertEquals(j+2, rs.getInt("k3"));
}
}
} finally {
conn1.close();
}
}
@Test
public void testLocalIndexStateWhenSplittingInProgress() throws Exception {
createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME+"2", null, "('e','j','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
try{
String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
for (int i = 0; i < 26; i++) {
conn1.createStatement().execute(
"UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME+"2" + " values('"+strings[i]+"'," + i + ","
+ (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
}
conn1.commit();
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME+"2" + "(v1)");
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + "_2 ON " + TestUtil.DEFAULT_DATA_TABLE_NAME+"2" + "(k3)");
ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME+"2");
assertTrue(rs.next());
HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(TestUtil.DEFAULT_DATA_TABLE_NAME+"2"));
tableDesc.removeCoprocessor(LocalIndexSplitter.class.getName());
tableDesc.addCoprocessor(MockedLocalIndexSplitter.class.getName(), null,
1, null);
admin.disableTable(tableDesc.getTableName());
admin.modifyTable(tableDesc.getTableName(), tableDesc);
admin.enableTable(tableDesc.getTableName());
TableName indexTable =
TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME+"2"));
HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
indexTableDesc.removeCoprocessor(IndexHalfStoreFileReaderGenerator.class.getName());
indexTableDesc.addCoprocessor(MockedIndexHalfStoreFileReaderGenerator.class.getName(), null,
1, null);
admin.disableTable(indexTable);
admin.modifyTable(indexTable, indexTableDesc);
admin.enableTable(indexTable);
admin.split(Bytes.toBytes(TestUtil.DEFAULT_DATA_TABLE_NAME+"2"), ByteUtil.concat(Bytes.toBytes(strings[3])));
List<HRegionInfo> regionsOfUserTable =
admin.getTableRegions(TableName.valueOf(TestUtil.DEFAULT_DATA_TABLE_NAME+"2"));
while (regionsOfUserTable.size() != 5) {
Thread.sleep(100);
regionsOfUserTable = admin.getTableRegions(TableName.valueOf(TestUtil.DEFAULT_DATA_TABLE_NAME+"2"));
}
assertEquals(5, regionsOfUserTable.size());
List<HRegionInfo> regionsOfIndexTable = admin.getTableRegions(indexTable);
while (regionsOfIndexTable.size() != 5) {
Thread.sleep(100);
regionsOfIndexTable = admin.getTableRegions(indexTable);
}
assertEquals(5, regionsOfIndexTable.size());
latch1.await();
// Verify the metadata for index is correct.
rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), TestUtil.DEFAULT_INDEX_TABLE_NAME,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(TestUtil.DEFAULT_INDEX_TABLE_NAME, rs.getString(3));
assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
assertFalse(rs.next());
rs = conn1.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), TestUtil.DEFAULT_INDEX_TABLE_NAME+"_2",
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(TestUtil.DEFAULT_INDEX_TABLE_NAME+"_2", rs.getString(3));
assertEquals(PIndexState.INACTIVE.toString(), rs.getString("INDEX_STATE"));
assertFalse(rs.next());
String query = "SELECT t_id,k1,v1 FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME+"2";
rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
assertEquals("CLIENT PARALLEL " + 1 + "-WAY FULL SCAN OVER " + TestUtil.DEFAULT_DATA_TABLE_NAME+"2",
QueryUtil.getExplainPlan(rs));
latch2.countDown();
} finally {
conn1.close();
latch1.countDown();
latch2.countDown();
}
}
@Test
public void testSplitWithCachedMeta() throws Exception {
// Tests that you don't get an ambiguous column exception when using the same alias as the column name
String query = "SELECT a_string, b_string, count(1) FROM atable WHERE organization_id=? and entity_id<=? GROUP BY a_string,b_string";
Properties props = new Properties(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts + 2)); // Execute at timestamp 2
Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
HBaseAdmin admin = null;
try {
PreparedStatement statement = conn.prepareStatement(query);
statement.setString(1, tenantId);
statement.setString(2,ROW4);
ResultSet rs = statement.executeQuery();
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(B_VALUE, rs.getString(2));
assertEquals(2, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(C_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(E_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertFalse(rs.next());
byte[] tableName = Bytes.toBytes(ATABLE_NAME);
admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
HTable htable = (HTable)conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableName);
htable.clearRegionCache();
int nRegions = htable.getRegionLocations().size();
admin.split(tableName, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A" + Character.valueOf((char)('3' + nextRunCount()))+ ts))); // vary split point with test run
int retryCount = 0;
do {
Thread.sleep(2000);
retryCount++;
//htable.clearRegionCache();
} while (retryCount < 10 && htable.getRegionLocations().size() == nRegions);
assertNotEquals(nRegions,htable.getRegionLocations().size());
statement.setString(1, tenantId);
rs = statement.executeQuery();
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(B_VALUE, rs.getString(2));
assertEquals(2, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(C_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertTrue(rs.next());
assertEquals(A_VALUE, rs.getString(1));
assertEquals(E_VALUE, rs.getString(2));
assertEquals(1, rs.getLong(3));
assertFalse(rs.next());
} finally {
admin.close();
conn.close();
}
}
private void doSplit(HBaseAdmin admin, TableName tableName) throws IOException {
LOG.trace("Splitting " + tableName);
admin.split(tableName);
LOG.trace("Split " + tableName);
}