下面列出了 org.apache.hadoop.hbase.client.RegionLocator 类 API 的使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Adds one range per region returned by the locator, spanning that region's start key to its
 * end key, into {@code binnedRanges}. Ranges are grouped first by region location and then by
 * region info.
 *
 * @param binnedRanges accumulator updated in place; missing inner maps and lists are created
 *        on demand
 * @param regionLocator source of the region locations to bin
 * @throws IOException if the region locations cannot be retrieved
 */
protected static void binFullRange(
    final Map<HRegionLocation, Map<HRegionInfo, List<ByteArrayRange>>> binnedRanges,
    final RegionLocator regionLocator) throws IOException {
  for (final HRegionLocation regionLocation : regionLocator.getAllRegionLocations()) {
    final HRegionInfo info = regionLocation.getRegionInfo();
    // Create the per-location map and the per-region list lazily, then append the full range.
    binnedRanges
        .computeIfAbsent(regionLocation, key -> new HashMap<>())
        .computeIfAbsent(info, key -> new ArrayList<>())
        .add(new ByteArrayRange(info.getStartKey(), info.getEndKey()));
  }
}
/**
 * Checks the sizes reported by {@code RegionSizeCalculator} for three mocked regions, the
 * value returned for a region it does not know about, and the size of its region map.
 */
@Test
public void testSimpleTestCase() throws Exception {
  final RegionLocator locator = mockRegionLocator("region1", "region2", "region3");
  // The region loads are registered in a different order than the locator's region names.
  final Admin clusterAdmin = mockAdmin(
      mockRegion("region1", 123),
      mockRegion("region3", 1232),
      mockRegion("region2", 54321));

  final RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(locator, clusterAdmin);

  assertEquals(123 * megabyte, sizeCalculator.getRegionSize(Bytes.toBytes("region1")));
  assertEquals(54321 * megabyte, sizeCalculator.getRegionSize(Bytes.toBytes("region2")));
  assertEquals(1232 * megabyte, sizeCalculator.getRegionSize(Bytes.toBytes("region3")));

  // A region the calculator has never heard of must be reported with size 0.
  assertEquals(0 * megabyte, sizeCalculator.getRegionSize(Bytes.toBytes("otherTableRegion")));

  assertEquals(3, sizeCalculator.getRegionSizeMap().size());
}
/**
 * Checks which users may invoke the access controller's {@code preMove} hook for the first
 * region location of {@code TEST_TABLE}.
 */
@Test
public void testMove() throws Exception {
  final HRegionLocation firstLocation;
  try (RegionLocator regionLocator = systemUserConnection.getRegionLocator(TEST_TABLE)) {
    firstLocation = regionLocator.getAllRegionLocations().get(0);
  }
  final RegionInfo regionInfo = firstLocation.getRegion();
  final ServerName serverName = firstLocation.getServerName();

  final AccessTestAction moveAction = new AccessTestAction() {
    @Override
    public Object run() throws Exception {
      // The same server is passed as both source and destination of the move.
      ACCESS_CONTROLLER.preMove(ObserverContextImpl.createAndPrepare(CP_ENV),
          regionInfo, serverName, serverName);
      return null;
    }
  };

  verifyAllowed(moveAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_GROUP_ADMIN);
  verifyDenied(moveAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_GROUP_READ,
      USER_GROUP_WRITE, USER_GROUP_CREATE);
}
/**
 * Checks which users may invoke the access controller's {@code preUnassign} hook for the
 * first region location of {@code TEST_TABLE}.
 */
@Test
public void testUnassign() throws Exception {
  final HRegionLocation firstLocation;
  try (RegionLocator regionLocator = systemUserConnection.getRegionLocator(TEST_TABLE)) {
    firstLocation = regionLocator.getAllRegionLocations().get(0);
  }
  final RegionInfo regionInfo = firstLocation.getRegion();

  final AccessTestAction unassignAction = new AccessTestAction() {
    @Override
    public Object run() throws Exception {
      ACCESS_CONTROLLER.preUnassign(
          ObserverContextImpl.createAndPrepare(CP_ENV), regionInfo, false);
      return null;
    }
  };

  verifyAllowed(unassignAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_GROUP_ADMIN);
  verifyDenied(unassignAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_GROUP_READ,
      USER_GROUP_WRITE, USER_GROUP_CREATE);
}
/**
 * Checks which users may invoke the access controller's {@code preRegionOffline} hook for the
 * first region location of {@code TEST_TABLE}.
 */
@Test
public void testRegionOffline() throws Exception {
  final HRegionLocation firstLocation;
  try (RegionLocator regionLocator = systemUserConnection.getRegionLocator(TEST_TABLE)) {
    firstLocation = regionLocator.getAllRegionLocations().get(0);
  }
  final RegionInfo regionInfo = firstLocation.getRegion();

  final AccessTestAction offlineAction = new AccessTestAction() {
    @Override
    public Object run() throws Exception {
      ACCESS_CONTROLLER.preRegionOffline(
          ObserverContextImpl.createAndPrepare(CP_ENV), regionInfo);
      return null;
    }
  };

  verifyAllowed(offlineAction, SUPERUSER, USER_ADMIN, USER_OWNER, USER_GROUP_ADMIN);
  verifyDenied(offlineAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_GROUP_READ,
      USER_GROUP_WRITE, USER_GROUP_CREATE);
}
/**
 * Initializes the column-family rename map and the filter from the job configuration, then
 * publishes the table's region start keys (all but the first) as the partitioner's split
 * points.
 *
 * @param context task context supplying the configuration and the number of reduce tasks
 * @throws IOException if the number of region start keys differs from the number of reduce
 *         tasks, or if the table's start keys cannot be read
 */
@Override
public void setup(Context context) throws IOException {
  final Configuration conf = context.getConfiguration();
  cfRenameMap = createCfRenameMap(conf);
  filter = instantiateFilter(conf);
  final int reducerCount = context.getNumReduceTasks();
  final TableName table = TableName.valueOf(conf.get(TABLE_NAME));

  try (Connection connection = ConnectionFactory.createConnection(conf);
      RegionLocator locator = connection.getRegionLocator(table)) {
    final byte[][] regionStartKeys = locator.getStartKeys();
    if (regionStartKeys.length != reducerCount) {
      throw new IOException("Region split after job initialization");
    }
    // Every start key except the first becomes one partition boundary.
    final CellWritableComparable[] boundaries =
        new CellWritableComparable[regionStartKeys.length - 1];
    for (int idx = 0; idx < boundaries.length; idx++) {
      boundaries[idx] =
          new CellWritableComparable(KeyValueUtil.createFirstOnRow(regionStartKeys[idx + 1]));
    }
    CellWritableComparablePartitioner.START_KEYS = boundaries;
  }
}
/**
 * Splits a region and looks up the locations of the two daughter regions.
 *
 * @param r Region to split.
 * @return List with the locations of both daughters, or an empty list when
 *         {@code waitOnDaughters} returns {@code null}
 * @throws IOException if the split request, closing the admin, or a location lookup fails
 * @throws InterruptedException if interrupted while waiting for the split to complete
 * @throws ExecutionException if the asynchronous split fails
 */
private List<HRegionLocation> splitRegion(final RegionInfo r)
    throws IOException, InterruptedException, ExecutionException {
  List<HRegionLocation> locations = new ArrayList<>();
  // Split this table in two. try-with-resources closes the admin even when the split fails,
  // which the previous explicit close() call did not.
  try (Admin admin = TEST_UTIL.getAdmin()) {
    admin.splitRegionAsync(r.getEncodedNameAsBytes()).get();
  }
  Connection connection = TEST_UTIL.getConnection();
  PairOfSameType<RegionInfo> regions = waitOnDaughters(r);
  if (regions != null) {
    try (RegionLocator rl = connection.getRegionLocator(r.getTable())) {
      locations.add(rl.getRegionLocation(regions.getFirst().getEncodedNameAsBytes()));
      locations.add(rl.getRegionLocation(regions.getSecond().getEncodedNameAsBytes()));
    }
  }
  return locations;
}
/**
 * Builds a human-readable description of the cached and freshly looked-up region locations
 * for a row. Never throws: any failure during lookup is swallowed and whatever was obtained
 * up to that point is reported.
 *
 * @param table not used by this method; the lookup goes through the {@code connection} and
 *        {@code tableName} fields
 * @param rowKey row whose region location is described
 * @return the description, or {@code "no information can be obtained"} when neither location
 *         is usable
 */
protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
  HRegionLocation cachedLocation = null;
  HRegionLocation freshLocation = null;
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    cachedLocation = regionLocator.getRegionLocation(rowKey, false);
    freshLocation = regionLocator.getRegionLocation(rowKey, true);
  } catch (Throwable ignored) {
    // Best effort only: this is diagnostic output, so a failed lookup must not propagate.
  }

  final String cachedText = (cachedLocation == null)
      ? null
      : "cached: " + cachedLocation.toString();

  // Without a usable fresh location, only the cached one (if any) can be reported.
  if (freshLocation == null || freshLocation.getServerName() == null) {
    return (cachedText == null) ? "no information can be obtained" : cachedText;
  }

  final boolean cacheMatches = cachedLocation != null
      && cachedLocation.getServerName() != null
      && freshLocation.equals(cachedLocation);
  if (cacheMatches) {
    return cachedText + "; cache is up to date";
  }

  final String prefix = (cachedText == null) ? "" : cachedText + "; ";
  return prefix + "real: " + freshLocation.toString();
}
/**
 * Fetches the table's region locations and asserts that each region starts at one of the
 * expected boundaries and ends at the boundary that follows it in the list.
 *
 * @param expectedBounds expected boundaries; the number of regions must equal its size minus
 *        one
 * @param tableName table whose regions are checked
 */
private void verifyBounds(List<byte[]> expectedBounds, TableName tableName)
    throws Exception {
  final int expectedRegionCount = expectedBounds.size() - 1;
  try (Table table = UTIL.getConnection().getTable(tableName);
      RegionLocator regionLocator = UTIL.getConnection().getRegionLocator(tableName)) {
    final List<HRegionLocation> regionLocations = regionLocator.getAllRegionLocations();
    assertEquals(expectedRegionCount, regionLocations.size());

    for (HRegionLocation regionLocation : regionLocations) {
      final RegionInfo region = regionLocation.getRegion();
      final byte[] startKey = region.getStartKey();
      final byte[] endKey = region.getEndKey();

      // The start key has to be one of the expected boundaries.
      // NOTE(review): assertNotSame compares references of the boxed ints; an equality-based
      // assertion would state the intent more directly - confirm before changing.
      final int boundaryIdx = indexOfBytes(expectedBounds, startKey);
      assertNotSame(-1, boundaryIdx);

      // The end key has to be the boundary immediately after the start boundary.
      final byte[] nextBoundary = expectedBounds.get(boundaryIdx + 1);
      assertEquals(0, Bytes.compareTo(endKey, nextBoundary));
    }
  }
}
/**
 * Starts the mini cluster, creates the test table with a single row, records the region that
 * holds that row, and then calls {@code stopMasterAndAssignMeta}.
 */
@BeforeClass
public static void before() throws Exception {
  HTU.startMiniCluster(NB_SERVERS);

  final TableName name = TableName.valueOf(TestRegionServerNoMaster.class.getSimpleName());
  table = HTU.createTable(name, HConstants.CATALOG_FAMILY);

  // Write one cell that uses the row key as both qualifier and value.
  final Put seedPut = new Put(row);
  seedPut.addColumn(HConstants.CATALOG_FAMILY, row, row);
  table.put(seedPut);

  // Remember the region that serves the row just written.
  try (RegionLocator regionLocator = HTU.getConnection().getRegionLocator(name)) {
    hri = regionLocator.getRegionLocation(row, false).getRegion();
  }
  regionName = hri.getRegionName();

  stopMasterAndAssignMeta(HTU);
}
/**
 * Verifies the start/end keys and the set of regions reported by the table's
 * {@code RegionLocator}.
 *
 * @throws IOException if the table or its region information cannot be obtained
 */
void verifyRegionsUsingHTable() throws IOException {
  Table openedTable = null;
  try {
    openedTable = connection.getTable(tableName);
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      verifyStartEndKeys(regionLocator.getStartEndKeys());

      // Collect the regions in the order defined by RegionInfo.COMPARATOR.
      Set<RegionInfo> regionSet = new TreeSet<>(RegionInfo.COMPARATOR);
      for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
        regionSet.add(location.getRegion());
      }
      verifyTableRegions(regionSet);
    }
  } finally {
    // Close without letting a close failure mask the verification result.
    IOUtils.closeQuietly(openedTable);
  }
}
/**
 * Configures and starts the mini cluster, creates the test table, records its primary region
 * and a derived replica-1 region info, and then stops the master.
 */
@BeforeClass
public static void before() throws Exception {
  // Reduce the hdfs block size and prefetch to trigger the file-link reopen
  // when the file is moved to archive (e.g. compaction)
  final Configuration clusterConf = HTU.getConfiguration();
  clusterConf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
  clusterConf.setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
  clusterConf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);

  HTU.startMiniCluster(NB_SERVERS);

  final TableName name = TableName.valueOf(TestRegionReplicas.class.getSimpleName());
  table = HTU.createTable(name, f);

  // Look up the region that serves the test row; it is used as the primary.
  try (RegionLocator regionLocator = HTU.getConnection().getRegionLocator(name)) {
    hriPrimary = regionLocator.getRegionLocation(row, false).getRegion();
  }

  // Derive the region info of replica 1 from the primary, to be opened as a secondary.
  hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);

  // No master
  TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
}
/**
 * Configures an incremental-load job against a mocked table and region locator and checks
 * the number of reduce tasks the job ends up with.
 */
@Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
public void testJobConfiguration() throws Exception {
  Configuration conf = new Configuration(this.util.getConfiguration());
  conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
      .toString());
  // Job.getInstance replaces the deprecated Job(Configuration) constructor.
  Job job = Job.getInstance(conf);
  job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
  Table table = Mockito.mock(Table.class);
  RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
  setupMockStartKeys(regionLocator);
  setupMockTableName(regionLocator);
  // NOTE(review): getDescriptor() is not stubbed on the mock above - confirm the value passed
  // here is what the test intends.
  HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  // Expected value goes first so a failure message reports expected/actual correctly.
  assertEquals(4, job.getNumReduceTasks());
}
/**
 * Wraps every region start key of the table in an {@code ImmutableBytesWritable}.
 *
 * @param table locator used to read the region start keys
 * @return the start keys, in the order the locator returned them
 * @throws IOException if the start keys cannot be read
 */
private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException {
  final byte[][] startKeys = table.getStartKeys();
  final ArrayList<ImmutableBytesWritable> wrapped = new ArrayList<>(startKeys.length);
  for (int i = 0; i < startKeys.length; i++) {
    wrapped.add(new ImmutableBytesWritable(startKeys[i]));
  }
  return wrapped;
}
/**
 * Returns the number of region locations of the HBase table identified by
 * {@code hbaseTableID}.
 *
 * @param hbaseTableID identifier resolved to a table name through
 *        {@code HBaseTableInfoFactory}
 * @return the number of region locations reported by the table's locator
 * @throws IOException if the region locations cannot be retrieved
 */
protected static int getReduceNumberOfRegions(String hbaseTableID) throws IOException {
  final SConfiguration config = SIDriver.driver().getConfiguration();
  final Connection hbaseConnection = HBaseConnectionFactory.getInstance(config).getConnection();
  final TableName tableName = HBaseTableInfoFactory.getInstance(config).getTableInfo(hbaseTableID);
  try (RegionLocator locator = hbaseConnection.getRegionLocator(tableName)) {
    return locator.getAllRegionLocations().size();
  }
}
/**
 * Configures {@code job} for an incremental load: output key/value/format classes, a sort
 * reducer matching the map output value class, extra serializations, one reduce task per
 * region start key, the partitioner, and per-table compression, bloom type, block size and
 * data block encoding settings.
 *
 * @param job job to configure
 * @param tableDescriptor descriptor of the target table
 * @param regionLocator locator used to read the table's region start keys
 * @param cls output format class to set on the job
 */
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
    Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
  final Configuration conf = job.getConfiguration();

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(cls);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  final Class<?> mapOutputValueClass = job.getMapOutputValueClass();
  if (KeyValue.class.equals(mapOutputValueClass)) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(mapOutputValueClass)) {
    job.setReducerClass(PutSortReducer.class);
  } else if (Text.class.equals(mapOutputValueClass)) {
    job.setReducerClass(TextSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + mapOutputValueClass);
  }

  // Append the HBase serializations to whatever is already configured.
  conf.setStrings("io.serializations",
      conf.get("io.serializations"),
      MutationSerialization.class.getName(),
      ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

  // Use table's region boundaries for TOP split points.
  LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
  final List<ImmutableBytesWritable> regionStartKeys = getRegionStartKeys(regionLocator);
  final int partitionCount = regionStartKeys.size();
  LOG.info("Configuring " + partitionCount + " reduce partitions to match current region count");
  job.setNumReduceTasks(partitionCount);
  configurePartitioner(job, regionStartKeys);

  // Per-table settings derived from the table descriptor.
  configureCompression(conf, tableDescriptor);
  configureBloomType(tableDescriptor, conf);
  configureBlockSize(tableDescriptor, conf);
  configureDataBlockEncoding(tableDescriptor, conf);

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.initCredentials(job);
  LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
/**
 * Adds one fragment per region location of the table named by the context's data source.
 *
 * @param userData payload passed to {@code addFragment} for every region location
 * @throws IOException if the region locations cannot be retrieved or the locator cannot be
 *         closed
 */
private void addTableFragments(byte[] userData) throws IOException {
  TableName tableName = TableName.valueOf(context.getDataSource());
  // try-with-resources closes the locator even when the lookup or addFragment throws;
  // the previous explicit close() was skipped in that case.
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    List<HRegionLocation> locations = regionLocator.getAllRegionLocations();
    for (HRegionLocation location : locations) {
      addFragment(location, userData);
    }
  }
}
/**
 * Counts the region locations of the passed table as reported by its region locator.
 *
 * @param connection Connection object
 * @param tableName table name to count regions for
 * @return Count of regions in table <code>tableName</code>; 0 when the locator returns
 *         {@code null}
 * @throws IOException if the region locations cannot be retrieved
 */
public static int getRegionCount(final Connection connection, final TableName tableName)
    throws IOException {
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    final List<HRegionLocation> regionLocations = regionLocator.getAllRegionLocations();
    if (regionLocations == null) {
      return 0;
    }
    return regionLocations.size();
  }
}
/**
 * Builds a sorted list of the table's region boundary keys, with the open-ended boundaries
 * replaced by {@code ZERO_BYTES} and {@code ONES_BYTES}.
 *
 * @param conn connection used to obtain the region locator
 * @param table table whose region boundaries are read
 * @return all start and end keys sorted with {@code Bytes.BYTES_COMPARATOR}, minus the
 *         smallest entry, with the next smallest replaced by {@code ZERO_BYTES} and
 *         {@code ONES_BYTES} appended
 * @throws IOException if the region boundaries cannot be read
 */
private static List<byte[]> genRegionKeys(Connection conn, TableName table) throws IOException {
  Pair<byte[][],byte[][]> regionBounds;
  // The locator is closeable and was previously never closed; release it as soon as the
  // boundaries have been read.
  try (RegionLocator locator = conn.getRegionLocator(table)) {
    regionBounds = locator.getStartEndKeys();
  }

  //
  // Load and sort all region bounds
  //
  List<byte[]> regionKeys = new ArrayList<>();
  regionKeys.addAll(Arrays.asList(regionBounds.getFirst()));
  regionKeys.addAll(Arrays.asList(regionBounds.getSecond()));
  regionKeys.sort(Bytes.BYTES_COMPARATOR);

  //
  // Start key of the first region and end key of the last region are open-ended
  // (presumably empty arrays, which sort first - TODO confirm); we need to replace
  // those with something else
  //
  // INFO(hbs): this implies that prefix is between 0x00 and 0xFF
  regionKeys.remove(0);
  regionKeys.set(0, ZERO_BYTES);
  regionKeys.add(ONES_BYTES);

  return regionKeys;
}
/**
 * Returns the locations of the regions of a table that overlap the given key range.
 *
 * <p>An empty start key means the range has no lower bound and an empty end key means it has
 * no upper bound. A region is kept when the range starts before the region's end key (or the
 * region is the last one, or the range has no lower bound) and the range ends after the
 * region's start key (or the range has no upper bound).
 *
 * @param connection connection used to obtain the region locator
 * @param tableId name of the table
 * @param range key range to match regions against
 * @return the matching region locations, in the order the locator returned them
 * @throws Exception if the region locations cannot be retrieved
 */
static List<HRegionLocation> getRegionLocations(
    Connection connection, String tableId, ByteKeyRange range) throws Exception {
  byte[] startRow = range.getStartKey().getBytes();
  byte[] stopRow = range.getEndKey().getBytes();

  final List<HRegionLocation> regionLocations = new ArrayList<>();

  final boolean scanWithNoLowerBound = startRow.length == 0;
  final boolean scanWithNoUpperBound = stopRow.length == 0;

  TableName tableName = TableName.valueOf(tableId);
  // The locator is closeable and was previously never closed.
  try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
    List<HRegionLocation> tableRegionInfos = regionLocator.getAllRegionLocations();
    for (HRegionLocation regionLocation : tableRegionInfos) {
      final byte[] startKey = regionLocation.getRegionInfo().getStartKey();
      final byte[] endKey = regionLocation.getRegionInfo().getEndKey();
      // An empty end key marks the last region of the table.
      boolean isLastRegion = endKey.length == 0;
      // filters regions who are part of the scan
      if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
          && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {
        regionLocations.add(regionLocation);
      }
    }
  }

  return regionLocations;
}
/**
 * Reads the start keys of all regions from the locator and returns them wrapped as
 * {@code ImmutableBytesWritable} instances, preserving their order.
 *
 * @param table locator of the table whose region start keys are read
 * @return list holding one wrapper per region start key
 * @throws IOException if the start keys cannot be read
 */
private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException {
  final byte[][] rawKeys = table.getStartKeys();
  final int keyCount = rawKeys.length;
  final ArrayList<ImmutableBytesWritable> result = new ArrayList<>(keyCount);
  int position = 0;
  while (position < keyCount) {
    result.add(new ImmutableBytesWritable(rawKeys[position]));
    position++;
  }
  return result;
}
/**
 * Fills {@code sizeMap} with the size in bytes of every region of the locator's table, using
 * the region loads reported in the cluster status. Does nothing beyond logging when
 * {@code enabled} returns false for the admin's configuration.
 *
 * @param regionLocator locator of the table whose regions are measured
 * @param admin admin used to read the configuration and the cluster status
 * @throws IOException if the region locations or the cluster status cannot be retrieved
 */
private void init(RegionLocator regionLocator, Admin admin)
    throws IOException {
  if (!enabled(admin.getConfiguration())) {
    LOG.info("Region size calculation disabled.");
    return;
  }

  LOG.info("Calculating region sizes for table \"" + regionLocator.getName() + "\".");

  // Collect the names of the regions that belong to our table.
  final Set<byte[]> ownRegionNames = new TreeSet<>(Bytes.BYTES_COMPARATOR);
  for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
    ownRegionNames.add(location.getRegionInfo().getRegionName());
  }

  final ClusterStatus status = admin.getClusterStatus();
  final long bytesPerMegabyte = 1024L * 1024L;

  // Walk every region load on every server and keep only the regions of our table.
  for (ServerName server : status.getServers()) {
    for (RegionLoad load : status.getLoad(server).getRegionsLoad().values()) {
      final byte[] name = load.getName();
      if (!ownRegionNames.contains(name)) {
        continue;
      }
      // Store file size plus memstore size, both reported in MB, converted to bytes.
      final long sizeInBytes =
          (load.getStorefileSizeMB() + load.getMemStoreSizeMB()) * bytesPerMegabyte;
      sizeMap.put(name, sizeInBytes);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Region " + load.getNameAsString() + " has size " + sizeInBytes);
      }
    }
  }
  LOG.debug("Region sizes calculated");
}
/**
 * Sets up the actual job.
 *
 * @param args The command line parameters: {@code args[0]} is the input directories and
 *        {@code args[1]} is the table name.
 * @return The newly created job.
 * @throws IOException When setting up the job fails, or when no bulk output directory is
 *         configured.
 */
public Job createSubmittableJob(String[] args) throws IOException {
  final Configuration conf = getConf();
  final String inputDirs = args[0];
  final String tabName = args[1];
  conf.setStrings(TABLES_KEY, tabName);
  conf.set(FileInputFormat.INPUT_DIR, inputDirs);

  final String jobName =
      conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime());
  final Job job = Job.getInstance(conf, jobName);
  job.setJarByClass(MapReduceHFileSplitterJob.class);
  job.setInputFormatClass(HFileInputFormat.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);

  final String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
  if (hfileOutPath == null) {
    throw new IOException("No bulk output directory specified");
  }

  LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
  final TableName tableName = TableName.valueOf(tabName);
  job.setMapperClass(HFileCellMapper.class);
  job.setReducerClass(CellSortReducer.class);
  FileOutputFormat.setOutputPath(job, new Path(hfileOutPath));
  job.setMapOutputValueClass(MapReduceExtendedCell.class);

  // The connection, table and locator are only needed while configuring the load.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(tableName);
      RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
  }
  LOG.debug("success configuring load incremental job");

  TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
      org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
  return job;
}
/**
 * Resolves the data table and the index table, records the index type and qualified index
 * name, captures the index table's region start keys for local indexes, and finally calls
 * {@code changeDisabledIndexStateToBuiding}.
 *
 * @param connection Phoenix JDBC connection used for all lookups
 * @throws IllegalArgumentException if {@code indexTable} is not a valid index table of
 *         {@code qDataTable} for this connection
 * @throws SQLException if a Phoenix lookup fails
 * @throws IOException if the HBase region start keys cannot be read
 */
private void setupIndexAndDataTable(Connection connection) throws SQLException, IOException {
  pDataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable);
  if (!isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
    throw new IllegalArgumentException(
        String.format(" %s is not an index table for %s for this connection",
            indexTable, qDataTable));
  }
  pIndexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty()
      ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable);
  indexType = pIndexTable.getIndexType();
  // Qualify the index name with the schema only when a schema is set.
  if (schemaName != null && !schemaName.isEmpty()) {
    qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
  } else {
    qIndexTable = indexTable;
  }
  if (IndexType.LOCAL.equals(indexType)) {
    isLocalIndexBuild = true;
    // Both the temporary connection and the locator are closed here; the locator was
    // previously never closed.
    try (org.apache.hadoop.hbase.client.Connection hConn
            = getTemporaryHConnection(connection.unwrap(PhoenixConnection.class));
        RegionLocator regionLocator = hConn
            .getRegionLocator(TableName.valueOf(pIndexTable.getPhysicalName().getBytes()))) {
      splitKeysBeforeJob = regionLocator.getStartKeys();
    }
  }
  // We have to mark a Disabled index as Building before we can set it to Active in the
  // reducer. Otherwise it errors out with an index state transition error.
  changeDisabledIndexStateToBuiding(connection);
}
/**
 * Obtains a region locator for the named table from the connection cache.
 *
 * @param tableName table name as a byte buffer
 * @return the region locator for the table
 * @throws RuntimeException wrapping the {@code IOException} if the lookup fails
 */
private RegionLocator getLocator(ByteBuffer tableName) {
  try {
    final byte[] rawTableName = byteBufferToByteArray(tableName);
    return connectionCache.getRegionLocator(rawTableName);
  } catch (IOException lookupFailure) {
    // Callers of this method do not deal with checked exceptions; keep the cause attached.
    throw new RuntimeException(lookupFailure);
  }
}
/**
 * Invokes the ping and hello coprocessor calls over a row range limited to {@code ROW_A} and
 * checks that exactly one region answers each call with the expected value.
 */
@Test
public void testSingleMethod() throws Throwable {
  try (Table table = util.getConnection().getTable(TEST_TABLE);
      RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
    final Batch.Call<PingService, String> pingCall = new Batch.Call<PingService, String>() {
      @Override
      public String call(PingService instance) throws IOException {
        CoprocessorRpcUtils.BlockingRpcCallback<PingResponse> callback =
            new CoprocessorRpcUtils.BlockingRpcCallback<>();
        instance.ping(null, PingRequest.newBuilder().build(), callback);
        return callback.get().getPong();
      }
    };

    Map<byte [], String> pingResults =
        table.coprocessorService(PingService.class, null, ROW_A, pingCall);
    // Should have gotten results for 1 of the three regions only since we specified
    // rows from 1 region
    assertEquals(1, pingResults.size());
    verifyRegionResults(locator, pingResults, ROW_A);

    final String name = "NAME";
    Map<byte [], String> helloResults = hello(table, name, null, ROW_A);
    // Should have gotten results for 1 of the three regions only since we specified
    // rows from 1 region
    assertEquals(1, helloResults.size());
    verifyRegionResults(locator, helloResults, "Hello, NAME", ROW_A);
  }
}
/**
 * Runs the compound hello-and-ping call over {@code ROW_A}..{@code ROW_C} and checks the
 * result for the regions holding rows A, B and C.
 */
@Test
public void testCompoundCall() throws Throwable {
  try (Table table = util.getConnection().getTable(TEST_TABLE);
      RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
    final Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
    final String expected = "Hello, pong";
    for (byte[] row : new byte[][] { ROW_A, ROW_B, ROW_C }) {
      verifyRegionResults(locator, results, expected, row);
    }
  }
}
/**
 * Calls hello with a {@code null} name over {@code ROW_A}..{@code ROW_C} and checks the
 * result for the regions holding rows A, B and C.
 */
@Test
public void testNullCall() throws Throwable {
  try (Table table = util.getConnection().getTable(TEST_TABLE);
      RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
    final Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
    final String expected = "Who are you?";
    for (byte[] row : new byte[][] { ROW_A, ROW_B, ROW_C }) {
      verifyRegionResults(locator, results, expected, row);
    }
  }
}
/**
 * Calls hello with the name {@code "nobody"} over {@code ROW_A}..{@code ROW_C} and checks
 * that the regions holding rows A, B and C are present in the results with a {@code null}
 * value.
 */
@Test
public void testNullReturn() throws Throwable {
  try (Table table = util.getConnection().getTable(TEST_TABLE);
      RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
    final Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
    final String expected = null;
    for (byte[] row : new byte[][] { ROW_A, ROW_B, ROW_C }) {
      verifyRegionResults(locator, results, expected, row);
    }
  }
}
/**
 * Logs every entry of {@code results}, then asserts that the region holding {@code row} is
 * present in {@code results} and maps to {@code expected}.
 *
 * @param regionLocator locator used to find the region of {@code row} (with a reload)
 * @param results coprocessor results keyed by region name
 * @param expected value expected for the row's region; may be {@code null}
 * @param row row whose region is checked
 */
private void verifyRegionResults(RegionLocator regionLocator, Map<byte[], String> results,
    String expected, byte[] row) throws Exception {
  for (Map.Entry<byte [], String> entry : results.entrySet()) {
    LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
        ", result key=" + Bytes.toString(entry.getKey()) +
        ", value=" + entry.getValue());
  }

  final byte[] regionName =
      regionLocator.getRegionLocation(row, true).getRegion().getRegionName();

  final String missingRegionMessage = "Results should contain region " +
      Bytes.toStringBinary(regionName) + " for row '" + Bytes.toStringBinary(row)+ "'";
  assertTrue(missingRegionMessage, results.containsKey(regionName));

  final String wrongValueMessage = "Invalid result for row '"+Bytes.toStringBinary(row)+"'";
  assertEquals(wrongValueMessage, expected, results.get(regionName));
}