下面列出了 org.apache.hadoop.hbase.client.Connection#getRegionLocator() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Splits a region and looks up the locations of the resulting daughters.
 *
 * @param r Region to split.
 * @return locations of the two daughter regions, or an empty list when
 *         {@code waitOnDaughters} reports no daughters
 */
private List<HRegionLocation> splitRegion(final RegionInfo r)
    throws IOException, InterruptedException, ExecutionException {
  final List<HRegionLocation> daughterLocations = new ArrayList<>();
  final Admin admin = TEST_UTIL.getAdmin();
  final Connection connection = TEST_UTIL.getConnection();
  // Request the split and wait on the returned future before going on.
  admin.splitRegionAsync(r.getEncodedNameAsBytes()).get();
  admin.close();
  final PairOfSameType<RegionInfo> daughters = waitOnDaughters(r);
  if (daughters == null) {
    return daughterLocations;
  }
  try (RegionLocator locator = connection.getRegionLocator(r.getTable())) {
    // NOTE(review): the encoded region name is passed where a row key is expected -- confirm
    // this lookup is intentional.
    final byte[] firstDaughter = daughters.getFirst().getEncodedNameAsBytes();
    daughterLocations.add(locator.getRegionLocation(firstDaughter));
    final byte[] secondDaughter = daughters.getSecond().getEncodedNameAsBytes();
    daughterLocations.add(locator.getRegionLocation(secondDaughter));
  }
  return daughterLocations;
}
/**
 * Count regions in <code>hbase:meta</code> for passed table.
 *
 * @param connection Connection object used to obtain a region locator
 * @param tableName table name to count regions for
 * @return number of region locations reported for <code>tableName</code>, or 0 when the
 *         locator returns {@code null}
 * @throws IOException if obtaining, querying or closing the locator fails
 */
public static int getRegionCount(final Connection connection, final TableName tableName)
    throws IOException {
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    final List<HRegionLocation> allLocations = regionLocator.getAllRegionLocations();
    if (allLocations == null) {
      return 0;
    }
    return allLocations.size();
  }
}
/**
 * Builds the sorted list of region boundary keys for a table.
 *
 * <p>All start keys and end keys reported by the region locator are merged and sorted with
 * {@code Bytes.BYTES_COMPARATOR}. The first sorted entry is then dropped, the new first entry
 * is replaced by {@code ZERO_BYTES}, and {@code ONES_BYTES} is appended.
 *
 * @param conn connection used to obtain the region locator
 * @param table table whose region boundaries are read
 * @return sorted boundary keys, bracketed by {@code ZERO_BYTES} and {@code ONES_BYTES}
 * @throws IOException if the region boundaries cannot be read or the locator fails to close
 */
private static List<byte[]> genRegionKeys(Connection conn, TableName table) throws IOException {
  final Pair<byte[][],byte[][]> regionBounds;
  // The locator is only needed to fetch the bounds; close it right away instead of leaking it.
  try (RegionLocator locator = conn.getRegionLocator(table)) {
    regionBounds = locator.getStartEndKeys();
  }

  //
  // Load and sort all region bounds
  //
  List<byte[]> regionKeys = new ArrayList<>();
  regionKeys.addAll(Arrays.asList(regionBounds.getFirst()));
  regionKeys.addAll(Arrays.asList(regionBounds.getSecond()));
  regionKeys.sort(Bytes.BYTES_COMPARATOR);

  //
  // The start key of the first region and the end key of the last region are open-ended
  // (the original author calls them 'nulls'; presumably empty arrays that sort first --
  // TODO confirm). Drop one of them and replace the other with an explicit lower bound,
  // then append an explicit upper bound.
  //
  // INFO(hbs): this implies that prefix is between 0x00 and 0xFF
  regionKeys.remove(0);
  regionKeys.set(0, ZERO_BYTES);
  regionKeys.add(ONES_BYTES);
  return regionKeys;
}
/**
 * Returns a list of region locations for a given table and scan.
 *
 * <p>A region is kept when the scan range overlaps it: the scan start row sorts before the
 * region end key (or the scan has no lower bound, or the region is the last one), and the scan
 * stop row sorts after the region start key (or the scan has no upper bound). An empty start or
 * end key on the range is treated as "unbounded".
 *
 * @param connection connection used to obtain the region locator
 * @param tableId name of the table to inspect
 * @param range key range of the scan
 * @return locations of all regions overlapping {@code range}
 * @throws Exception if the region locations cannot be fetched or the locator fails to close
 */
static List<HRegionLocation> getRegionLocations(
    Connection connection, String tableId, ByteKeyRange range) throws Exception {
  byte[] startRow = range.getStartKey().getBytes();
  byte[] stopRow = range.getEndKey().getBytes();

  final List<HRegionLocation> regionLocations = new ArrayList<>();

  final boolean scanWithNoLowerBound = startRow.length == 0;
  final boolean scanWithNoUpperBound = stopRow.length == 0;

  TableName tableName = TableName.valueOf(tableId);
  final List<HRegionLocation> tableRegionInfos;
  // Close the locator as soon as the locations are fetched; the original never closed it.
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    tableRegionInfos = regionLocator.getAllRegionLocations();
  }

  for (HRegionLocation regionLocation : tableRegionInfos) {
    final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
    final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
    // An empty end key marks the last region of the table.
    boolean isLastRegion = endKey.length == 0;
    // filters regions who are part of the scan
    if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
        && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
      regionLocations.add(regionLocation);
    }
  }

  return regionLocations;
}
/**
 * Count regions in <code>hbase:meta</code> for passed table.
 *
 * @param connection Connection object used to obtain a region locator
 * @param tableName table name to count regions for
 * @return number of region locations reported for <code>tableName</code>, or 0 when the
 *         locator returns {@code null}
 * @throws IOException if obtaining, querying or closing the locator fails
 */
public static int getRegionCount(final Connection connection, final TableName tableName)
    throws IOException {
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    final List<HRegionLocation> allLocations = regionLocator.getAllRegionLocations();
    if (allLocations == null) {
      return 0;
    }
    return allLocations.size();
  }
}
/**
 * Performs one bulk-load round: writes one HFile per column family, bulk loads them into
 * {@code tableName}, and requests a region compaction when the bulk-load counter is a
 * multiple of five.
 *
 * @throws Exception if HFile creation, the bulk load, or the compaction request fails
 */
@Override
public void doAnAction() throws Exception {
  final long round = numBulkLoads.getAndIncrement();
  final Path roundDir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d", round));

  // create HFiles for different column families
  final FileSystem testFs = UTIL.getTestFileSystem();
  final byte[] cellValue = Bytes.toBytes(String.format("%010d", round));
  final Map<byte[], List<Path>> filesByFamily = new TreeMap<>(Bytes.BYTES_COMPARATOR);
  for (int cf = 0; cf < NUM_CFS; cf++) {
    final Path hfilePath = new Path(roundDir, family(cf));
    final byte[] familyBytes = Bytes.toBytes(family(cf));
    createHFile(testFs, hfilePath, familyBytes, QUAL, cellValue, 1000);
    filesByFamily.put(familyBytes, Collections.singletonList(hfilePath));
  }

  // bulk load HFiles
  BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, filesByFamily);

  final Connection conn = UTIL.getConnection();
  // Periodically do compaction to reduce the number of open file handles.
  if (numBulkLoads.get() % 5 != 0) {
    return;
  }
  // 5 * 50 = 250 open file handles!
  try (RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
    final HRegionLocation target = regionLocator.getRegionLocation(Bytes.toBytes("aaa"), true);
    final byte[] regionName = target.getRegion().getRegionName();
    conn.getAdmin().compactRegion(regionName);
    numCompactions.incrementAndGet();
  }
}
/**
 * Allows subclasses to initialize the table information.
 *
 * <p>Stores the connection together with a table handle and a region locator obtained from
 * it. If a table or connection was already set, a warning is logged and the old references
 * are overwritten without being closed.
 *
 * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
 * @param tableName The {@link TableName} of the table to process.
 * @throws IOException if the table handle or region locator cannot be obtained
 */
protected void initializeTable(Connection connection, TableName tableName) throws IOException {
  final boolean alreadyInitialized = this.table != null || this.connection != null;
  if (alreadyInitialized) {
    LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
        "reference; TableInputFormatBase will not close these old references when done.");
  }
  // Assign in this order so a failure part-way leaves the same fields set as before.
  this.table = connection.getTable(tableName);
  this.regionLocator = connection.getRegionLocator(tableName);
  this.connection = connection;
}
/**
 * Allows subclasses to initialize the table information.
 *
 * <p>Stores the connection together with a table handle, a region locator and an admin
 * obtained from it. If a table or connection was already set, a warning is logged and the old
 * references are overwritten without being closed.
 *
 * @param connection The Connection to the HBase cluster. MUST be unmanaged. We will close.
 * @param tableName The {@link TableName} of the table to process.
 * @throws IOException if the table handle, region locator or admin cannot be obtained
 */
protected void initializeTable(Connection connection, TableName tableName) throws IOException {
  final boolean alreadyInitialized = this.table != null || this.connection != null;
  if (alreadyInitialized) {
    LOG.warn("initializeTable called multiple times. Overwriting connection and table " +
        "reference; TableInputFormatBase will not close these old references when done.");
  }
  // Assign in this order so a failure part-way leaves the same fields set as before.
  this.table = connection.getTable(tableName);
  this.regionLocator = connection.getRegionLocator(tableName);
  this.admin = connection.getAdmin();
  this.connection = connection;
}
/**
 * Fetches all region locations of a table through the given connection and logs how many
 * were returned.
 *
 * @param connection connection used to obtain the region locator
 * @param tn table whose region locations are fetched
 * @throws IOException if the locations cannot be fetched or the locator fails to close
 */
private void warmUpConnectionCache(Connection connection, TableName tn) throws IOException {
  try (RegionLocator regionLocator = connection.getRegionLocator(tn)) {
    final int locationCount = regionLocator.getAllRegionLocations().size();
    LOG.info("Warmed up region location cache for " + tn + " got " + locationCount);
  }
}
/**
 * Returns the number of region locations reported for the given HBase table.
 *
 * @param hbaseTableID identifier resolved to a {@code TableName} via
 *        {@code HBaseTableInfoFactory}
 * @return number of region locations of the table
 * @throws IOException if the region locations cannot be fetched or the locator fails to close
 */
protected static int getReduceNumberOfRegions(String hbaseTableID) throws IOException {
  final SConfiguration config = SIDriver.driver().getConfiguration();
  final Connection hbaseConnection = HBaseConnectionFactory.getInstance(config).getConnection();
  final TableName table = HBaseTableInfoFactory.getInstance(config).getTableInfo(hbaseTableID);
  try (RegionLocator locator = hbaseConnection.getRegionLocator(table)) {
    return locator.getAllRegionLocations().size();
  }
}
/**
 * Computes the size of each region of the given table.
 *
 * <p>For every region of the table found in the cluster status, {@code sizeMap} receives the
 * store file size in bytes and {@code countMap} receives the pair (stores, storefiles), both
 * keyed by region name. Nothing is computed when {@code enabled(...)} returns false for the
 * table's configuration.
 *
 * @param tableName name of the table whose regions are measured
 * @param hbaseConnection connection used to obtain the table, admin and region locator; it is
 *        not closed here
 * @throws IOException if table, region or cluster information cannot be read
 */
public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException {
  Table table = null;
  Admin admin = null;
  RegionLocator regionLocator = null;
  try {
    table = hbaseConnection.getTable(TableName.valueOf(tableName));
    admin = hbaseConnection.getAdmin();

    if (!enabled(table.getConfiguration())) {
      logger.info("Region size calculation disabled.");
      return;
    }

    logger.info("Calculating region sizes for table \"" + table.getName() + "\".");

    // Get regions for table.
    regionLocator = hbaseConnection.getRegionLocator(table.getName());
    List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
    // byte[] has identity equality, so a comparator-based set is needed for contains().
    Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    for (HRegionLocation hRegionLocation : regionLocationList) {
      tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
    }

    ClusterStatus clusterStatus = admin.getClusterStatus();
    Collection<ServerName> servers = clusterStatus.getServers();
    final long megaByte = 1024L * 1024L;

    // Iterate all cluster regions, filter regions from our table and
    // compute their size.
    for (ServerName serverName : servers) {
      ServerLoad serverLoad = clusterStatus.getLoad(serverName);

      for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
        byte[] regionId = regionLoad.getName();

        if (tableRegions.contains(regionId)) {
          // The load reports megabytes; convert to bytes.
          long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
          sizeMap.put(regionId, regionSizeBytes);
          countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles()));

          if (regionSizeBytes == 0L) {
            logger.info("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
          }
        }
      }
    }
  } finally {
    // Release every handle opened above, including on the early "disabled" return; the
    // original closed only the admin and leaked the table and the region locator.
    IOUtils.closeQuietly(regionLocator);
    IOUtils.closeQuietly(admin);
    IOUtils.closeQuietly(table);
  }
}
/**
 * Computes the size of each region of the given table.
 *
 * <p>For every region of the table found in the cluster status, {@code sizeMap} receives the
 * store file size in bytes and {@code countMap} receives the pair (stores, storefiles), both
 * keyed by region name. Nothing is computed when {@code enabled(...)} returns false for the
 * table's configuration.
 *
 * @param tableName name of the table whose regions are measured
 * @param hbaseConnection connection used to obtain the table, admin and region locator; it is
 *        not closed here
 * @throws IOException if table, region or cluster information cannot be read
 */
public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException {
  Table table = null;
  Admin admin = null;
  RegionLocator regionLocator = null;
  try {
    table = hbaseConnection.getTable(TableName.valueOf(tableName));
    admin = hbaseConnection.getAdmin();

    if (!enabled(table.getConfiguration())) {
      logger.info("Region size calculation disabled.");
      return;
    }

    logger.info("Calculating region sizes for table \"" + table.getName() + "\".");

    // Get regions for table.
    regionLocator = hbaseConnection.getRegionLocator(table.getName());
    List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations();
    // byte[] has identity equality, so a comparator-based set is needed for contains().
    Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    for (HRegionLocation hRegionLocation : regionLocationList) {
      tableRegions.add(hRegionLocation.getRegionInfo().getRegionName());
    }

    ClusterStatus clusterStatus = admin.getClusterStatus();
    Collection<ServerName> servers = clusterStatus.getServers();
    final long megaByte = 1024L * 1024L;

    // Iterate all cluster regions, filter regions from our table and
    // compute their size.
    for (ServerName serverName : servers) {
      ServerLoad serverLoad = clusterStatus.getLoad(serverName);

      for (RegionLoad regionLoad : serverLoad.getRegionsLoad().values()) {
        byte[] regionId = regionLoad.getName();

        if (tableRegions.contains(regionId)) {
          // The load reports megabytes; convert to bytes.
          long regionSizeBytes = regionLoad.getStorefileSizeMB() * megaByte;
          sizeMap.put(regionId, regionSizeBytes);
          countMap.put(regionId, new Pair<>(regionLoad.getStores(), regionLoad.getStorefiles()));

          if (regionSizeBytes == 0L) {
            logger.info("Region " + regionLoad.getNameAsString() + " has size " + regionSizeBytes);
          }
        }
      }
    }
  } finally {
    // Release every handle opened above, including on the early "disabled" return; the
    // original closed only the admin and leaked the table and the region locator.
    IOUtils.closeQuietly(regionLocator);
    IOUtils.closeQuietly(admin);
    IOUtils.closeQuietly(table);
  }
}
/**
 * Checks that WAL entries belonging to a table that was disabled, dropped, or had its region
 * replication reduced are filtered by {@code RegionReplicaReplicationEndpoint}, and that
 * replication for another table still works afterwards.
 *
 * @param dropTable when true, the secondary table is deleted after being disabled
 * @param disableReplication when true (and {@code dropTable} is false), the secondary table's
 *        region replication is lowered and the table is re-enabled instead of being dropped
 * @throws Exception if any cluster operation or verification fails
 */
private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
throws Exception {
// tests having edits from a disabled or dropped table is handled correctly by skipping those
// entries and further edits after the edits from dropped/disabled table can be replicated
// without problems.
final TableName tableName = TableName.valueOf(
name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication);
// Main table: stays enabled throughout and is used for the final replication check.
HTableDescriptor htd = HTU.createTableDescriptor(tableName);
int regionReplication = 3;
htd.setRegionReplication(regionReplication);
HTU.deleteTableIfAny(tableName);
HTU.getAdmin().createTable(htd);
// Secondary table: the one that gets disabled / dropped / has its replication reduced.
TableName toBeDisabledTable = TableName.valueOf(
dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
HTU.deleteTableIfAny(toBeDisabledTable);
// From here on htd describes the secondary table.
htd = HTU.createTableDescriptor(TableName.valueOf(toBeDisabledTable.toString()),
HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
HColumnDescriptor.DEFAULT_KEEP_DELETED);
htd.setRegionReplication(regionReplication);
HTU.getAdmin().createTable(htd);
// both tables are created, now pause replication
HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
// now that the replication is disabled, write to the table to be dropped, then drop the table.
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
// Look up the region holding the empty row key, to get an encoded region name for the
// hand-built WAL entry below.
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
// Build a single-cell WAL entry that targets the secondary table's region.
Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
Entry entry = new Entry(
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
new WALEdit()
.add(cell));
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
if (dropTable) {
HTU.getAdmin().deleteTable(toBeDisabledTable);
} else if (disableReplication) {
// Lower region replication from 3 to 1, then bring the table back online.
htd.setRegionReplication(regionReplication - 2);
HTU.getAdmin().modifyTable(htd);
HTU.getAdmin().enableTable(toBeDisabledTable);
}
// Drive the replication endpoint directly with a mocked metrics sink so the filtered-edit
// count can be verified.
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
MetricsSource metrics = mock(MetricsSource.class);
ReplicationEndpoint.Context ctx =
new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
.getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
metrics, rs.getTableDescriptors(), rs);
RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
rrpe.init(ctx);
rrpe.start();
ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
// The same entry is submitted twice; both copies are expected to be reported as filtered.
repCtx.setEntries(Lists.newArrayList(entry, entry));
assertTrue(rrpe.replicate(repCtx));
verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
rrpe.stop();
if (disableReplication) {
// enable replication again so that we can verify replication
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
htd.setRegionReplication(regionReplication);
HTU.getAdmin().modifyTable(htd);
HTU.getAdmin().enableTable(toBeDisabledTable);
}
try {
// Load rows 0-1000 into the main table (tableName), not into the dropped/disabled one.
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
// now enable the replication
HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
// Release the client handles and remove the secondary table before closing the connection.
table.close();
rl.close();
tableToBeDisabled.close();
HTU.deleteTableIfAny(toBeDisabledTable);
connection.close();
}
}
/**
 * Returns the port reported by the location of the meta-table region containing the empty
 * row key.
 *
 * @param connection connection used to obtain a region locator for the meta table
 * @return port of that region location
 * @throws IOException if the location cannot be fetched or the locator fails to close
 */
public static int getMetaRSPort(Connection connection) throws IOException {
  try (RegionLocator metaLocator = connection.getRegionLocator(TableName.META_TABLE_NAME)) {
    final HRegionLocation metaLocation = metaLocator.getRegionLocation(Bytes.toBytes(""));
    return metaLocation.getPort();
  }
}
/**
 * Starts the mini cluster ({@code startMiniCluster(3)}) and the REST servlet container,
 * creates a multi-region table, fills it with three-letter row keys, and records the table's
 * region locations in {@code regionMap}.
 *
 * @throws Exception if cluster start-up, table creation, loading, or the region-count
 *         assertion fails
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  TEST_UTIL.startMiniCluster(3);
  REST_TEST_UTIL.startServletContainer(TEST_UTIL.getConfiguration());
  client = new Client(new Cluster().add("localhost",
    REST_TEST_UTIL.getServletPort()));
  context = JAXBContext.newInstance(
      TableModel.class,
      TableInfoModel.class,
      TableListModel.class,
      TableRegionModel.class);
  TEST_UTIL.createMultiRegionTable(TABLE, Bytes.toBytes(COLUMN_FAMILY), NUM_REGIONS);
  byte[] k = new byte[3];
  byte [][] famAndQf = CellUtil.parseColumn(Bytes.toBytes(COLUMN));
  List<Put> puts = new ArrayList<>();
  // One put per key "aaa".."yyy": each loop stops before 'z', so 25^3 rows in total.
  for (byte b1 = 'a'; b1 < 'z'; b1++) {
    for (byte b2 = 'a'; b2 < 'z'; b2++) {
      for (byte b3 = 'a'; b3 < 'z'; b3++) {
        k[0] = b1;
        k[1] = b2;
        k[2] = b3;
        Put put = new Put(k);
        put.setDurability(Durability.SKIP_WAL);
        put.addColumn(famAndQf[0], famAndQf[1], k);
        puts.add(put);
      }
    }
  }

  Connection connection = TEST_UTIL.getConnection();

  // try-with-resources so the table handle is released even when the put fails.
  try (Table table = connection.getTable(TABLE)) {
    table.put(puts);
  }

  // Likewise for the locator: a failing assertion must not leave it open.
  try (RegionLocator regionLocator = connection.getRegionLocator(TABLE)) {
    List<HRegionLocation> m = regionLocator.getAllRegionLocations();

    // should have NUM_REGIONS regions now
    assertEquals(NUM_REGIONS, m.size());
    regionMap = m;
    LOG.error("regions: " + regionMap);
  }
}