下面列出了 org.apache.hadoop.hbase.util.RegionSplitter API 的使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Verifies split generation for a scan from {@code bbc} to {@code yya} over a 10-region
 * snapshot, where each region is further divided into 5 splits with
 * {@link RegionSplitter.UniformSplit}; 40 input splits are expected.
 */
@Test
public void testWithMockedMapReduceWithSplitsPerRegion() throws Exception {
  String snapshotName = "testWithMockedMapReduceMultiRegion";
  final TableName tableName = TableName.valueOf(name.getMethodName());
  try {
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
    // Work on a private copy so that disabling locality here does not leak into the shared
    // UTIL configuration used by other tests.
    Configuration conf = new Configuration(UTIL.getConfiguration());
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
    // Job.getInstance replaces the deprecated Job(Configuration) constructor.
    Job job = Job.getInstance(conf);
    Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    // test scan with startRow and stopRow
    Scan scan = new Scan().withStartRow(bbc).withStopRow(yya);
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir, new RegionSplitter.UniformSplit(), 5);
    verifyWithMockedMapReduce(job, 10, 40, bbc, yya);
  } finally {
    UTIL.getAdmin().deleteSnapshot(snapshotName);
    UTIL.deleteTable(tableName);
  }
}
/**
 * Verifies split generation for an unbounded scan over a 10-region snapshot, where each region
 * is further divided into 5 splits with {@link RegionSplitter.UniformSplit}; 50 input splits
 * are expected.
 */
@Test
public void testWithMockedMapReduceWithNoStartRowStopRow() throws Exception {
  String snapshotName = "testWithMockedMapReduceMultiRegion";
  final TableName tableName = TableName.valueOf(name.getMethodName());
  try {
    createTableAndSnapshot(UTIL, tableName, snapshotName, getStartRow(), getEndRow(), 10);
    // Work on a private copy so that disabling locality here does not leak into the shared
    // UTIL configuration used by other tests.
    Configuration conf = new Configuration(UTIL.getConfiguration());
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
    // Job.getInstance replaces the deprecated Job(Configuration) constructor.
    Job job = Job.getInstance(conf);
    Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    // test scan without startRow and stopRow
    Scan scan2 = new Scan();
    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan2,
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir, new RegionSplitter.UniformSplit(), 5);
    // An unbounded scan runs from the empty start row to the empty end row.
    verifyWithMockedMapReduce(job, 10, 50, HConstants.EMPTY_START_ROW,
      HConstants.EMPTY_END_ROW);
  } finally {
    UTIL.getAdmin().deleteSnapshot(snapshotName);
    UTIL.deleteTable(tableName);
  }
}
/**
 * Drops and recreates {@code TABLE_NAME}, pre-split for the current cluster size.
 *
 * <p>Every column family reported by {@code dataGen} uses the data block encoding configured
 * under the class-specific {@code ENCODING_KEY} property, defaulting to {@code FAST_DIFF}.
 * The table is pre-split with {@link RegionSplitter.HexStringSplit} for
 * {@code liveServerCount * REGIONS_PER_SERVER} regions.
 */
private void createTable() throws Exception {
  deleteTable();
  LOG.info("Creating table");
  Configuration conf = util.getConfiguration();
  String encodingKey = String.format(ENCODING_KEY, this.getClass().getSimpleName());
  DataBlockEncoding blockEncoding = DataBlockEncoding.valueOf(conf.get(encodingKey, "FAST_DIFF"));
  // Build descriptors through the public builder API instead of the internal
  // Modifyable*Descriptor classes.
  TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
  for (byte[] cf : dataGen.getColumnFamilies()) {
    tableBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
      .setDataBlockEncoding(blockEncoding)
      .build());
  }
  int serverCount = util.getHBaseClusterInterface().getClusterMetrics()
    .getLiveServerMetrics().size();
  byte[][] splits = new RegionSplitter.HexStringSplit().split(serverCount * REGIONS_PER_SERVER);
  util.getAdmin().createTable(tableBuilder.build(), splits);
  LOG.info("Created table");
}
/**
 * Verifies that a table can be created pre-split into {@code REGION_COUNT} regions using
 * {@link RegionSplitter.HexStringSplit}.
 *
 * <p>Failures from {@code admin.createTable} propagate and fail the test; they are not logged
 * and ignored.
 */
@Test
public void testCreateTableWithRegions() throws Exception {
  ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
    .newBuilder(Bytes.toBytes("cf"))
    .build();
  TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAME)
    .setColumnFamily(columnFamilyDescriptor)
    .build();
  SplitAlgorithm algo = new RegionSplitter.HexStringSplit();
  byte[][] splits = algo.split(REGION_COUNT);
  // Report the actual number of split keys (split(n) yields n - 1 keys for n regions).
  LOG.info(String.format("Creating table %s with %d splits.", TABLE_NAME, splits.length));
  // nanoTime is monotonic, so it is the right clock for measuring elapsed time.
  long startNanos = System.nanoTime();
  admin.createTable(tableDescriptor, splits);
  LOG.info(String.format("Pre-split table created successfully in %dms.",
    (System.nanoTime() - startNanos) / 1_000_000L));
}
@Test
public void testSplit10_10() throws Exception {
  // Ten regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 10;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 1), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 9), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit3_10() throws Exception {
  // Three regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 3;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 6), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit300_1000() throws Exception {
  // 300 regions over a decimal key space of cardinality 1000; boundary keys are expected as
  // four-digit, zero-padded strings.
  final int regionCount = 300;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(1000);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String fourDigits = "%04d";
  assertEquals(String.format(fourDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(fourDigits, 6), Bytes.toString(boundaries[1]));
  assertEquals(String.format(fourDigits, 10), Bytes.toString(boundaries[2]));
  assertEquals(String.format(fourDigits, 996), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit10_10() throws Exception {
  // Ten regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 10;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 1), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 9), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit3_10() throws Exception {
  // Three regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 3;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 6), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit300_1000() throws Exception {
  // 300 regions over a decimal key space of cardinality 1000; boundary keys are expected as
  // four-digit, zero-padded strings.
  final int regionCount = 300;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(1000);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String fourDigits = "%04d";
  assertEquals(String.format(fourDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(fourDigits, 6), Bytes.toString(boundaries[1]));
  assertEquals(String.format(fourDigits, 10), Bytes.toString(boundaries[2]));
  assertEquals(String.format(fourDigits, 996), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit10_10() throws Exception {
  // Ten regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 10;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 1), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 9), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit3_10() throws Exception {
  // Three regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 3;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 6), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit300_1000() throws Exception {
  // 300 regions over a decimal key space of cardinality 1000; boundary keys are expected as
  // four-digit, zero-padded strings.
  final int regionCount = 300;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(1000);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String fourDigits = "%04d";
  assertEquals(String.format(fourDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(fourDigits, 6), Bytes.toString(boundaries[1]));
  assertEquals(String.format(fourDigits, 10), Bytes.toString(boundaries[2]));
  assertEquals(String.format(fourDigits, 996), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit10_10() throws Exception {
  // Ten regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 10;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 1), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 9), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit3_10() throws Exception {
  // Three regions over a decimal key space of cardinality 10; boundary keys are expected as
  // two-digit, zero-padded strings.
  final int regionCount = 3;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(10);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String twoDigits = "%02d";
  assertEquals(String.format(twoDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(twoDigits, 6), Bytes.toString(boundaries[boundaries.length - 1]));
}
@Test
public void testSplit300_1000() throws Exception {
  // 300 regions over a decimal key space of cardinality 1000; boundary keys are expected as
  // four-digit, zero-padded strings.
  final int regionCount = 300;
  final RegionSplitter.SplitAlgorithm algo = new DecimalStringSplit(1000);
  final byte[][] boundaries = algo.split(regionCount);
  assertEquals(regionCount - 1, boundaries.length);
  final String fourDigits = "%04d";
  assertEquals(String.format(fourDigits, 3), Bytes.toString(boundaries[0]));
  assertEquals(String.format(fourDigits, 6), Bytes.toString(boundaries[1]));
  assertEquals(String.format(fourDigits, 10), Bytes.toString(boundaries[2]));
  assertEquals(String.format(fourDigits, 996), Bytes.toString(boundaries[boundaries.length - 1]));
}
/**
 * Creates the canary write table, pre-split with {@link RegionSplitter.HexStringSplit}.
 *
 * <p>The region count is {@code numberOfServers * regionsLowerLimit}, truncated to an int and
 * never less than one. The single column family keeps one version and uses
 * {@code writeDataTTL} as its time-to-live.
 *
 * @param numberOfServers number of live region servers the table is sized for
 * @throws IOException if the table cannot be created
 */
private void createWriteTable(int numberOfServers) throws IOException {
  // split(n) yields n - 1 boundary keys, so n must be at least 1. With zero servers or a
  // fractional lower limit the product can truncate to 0; fall back to one unsplit region.
  int numberOfRegions = Math.max(1, (int) (numberOfServers * regionsLowerLimit));
  LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " +
    "(current lower limit of regions per server is {} and you can change it with config {}).",
    numberOfServers, numberOfRegions, regionsLowerLimit,
    HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY);
  ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
    .newBuilder(Bytes.toBytes(CANARY_TABLE_FAMILY_NAME)).setMaxVersions(1)
    .setTimeToLive(writeDataTTL).build();
  TableDescriptor desc = TableDescriptorBuilder.newBuilder(writeTableName)
    .setColumnFamily(family).build();
  byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
  admin.createTable(desc, splits);
}
/**
 * Creates a fresh table for the current test method.
 *
 * <p>The table has a single family {@code fam} with MOB enabled, MOB threshold {@code mobLen}
 * and one max version. It is pre-split with {@link RegionSplitter.UniformSplit} for
 * {@code numRegions} regions. Also initializes {@code admin} and {@code cleanerChore}.
 */
@Before
public void setUp() throws Exception {
  tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
  admin = HTU.getAdmin();
  cleanerChore = new MobFileCleanerChore();

  // Single MOB-enabled family holding one version per cell.
  familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
  familyDescriptor.setMobEnabled(true);
  familyDescriptor.setMobThreshold(mobLen);
  familyDescriptor.setMaxVersions(1);
  tableDescriptor.setColumnFamily(familyDescriptor);

  // Uniform pre-split, then remember the created table's name.
  byte[][] uniformKeys = new RegionSplitter.UniformSplit().split(numRegions);
  table = HTU.createTable(tableDescriptor, uniformKeys).getName();
}
/**
 * Creates a pre-split table for load testing, using {@link RegionSplitter.HexStringSplit} as
 * the split algorithm.
 *
 * <p>This overload delegates to the variant that takes an explicit split algorithm. If the
 * table already exists, that variant is expected to log a warning and continue (behavior of
 * the delegate, not visible here).
 *
 * @param conf cluster configuration
 * @param desc descriptor of the table to create
 * @param hcds column families to add to the table
 * @param numRegionsPerServer desired number of regions per region server
 * @return the number of regions the table was split into
 * @throws IOException if the delegate fails to create the table
 */
public static int createPreSplitLoadTestTable(Configuration conf,
    TableDescriptor desc, ColumnFamilyDescriptor[] hcds,
    int numRegionsPerServer) throws IOException {
  RegionSplitter.HexStringSplit hexSplitter = new RegionSplitter.HexStringSplit();
  return createPreSplitLoadTestTable(conf, desc, hcds, hexSplitter, numRegionsPerServer);
}
/**
 * Instantiates the split algorithm whose class name is stored under {@code SPLIT_ALGO}.
 *
 * <p>The class is loaded with {@link Class#forName(String)} and created through its no-arg
 * constructor.
 *
 * @param conf configuration to read {@code SPLIT_ALGO} from
 * @return a new split algorithm instance, or {@code null} if {@code SPLIT_ALGO} is not set
 * @throws IOException if the class cannot be found, does not implement
 *     {@link RegionSplitter.SplitAlgorithm}, or cannot be instantiated
 */
public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
  String splitAlgoClassName = conf.get(SPLIT_ALGO);
  if (splitAlgoClassName == null) {
    return null;
  }
  Class<?> rawClass;
  try {
    rawClass = Class.forName(splitAlgoClassName);
  } catch (ClassNotFoundException e) {
    throw new IOException("SplitAlgo class " + splitAlgoClassName + " is not found", e);
  }
  Class<? extends RegionSplitter.SplitAlgorithm> algoClass;
  try {
    // asSubclass throws an unchecked ClassCastException; report it as the declared IOException.
    algoClass = rawClass.asSubclass(RegionSplitter.SplitAlgorithm.class);
  } catch (ClassCastException e) {
    throw new IOException("SplitAlgo class " + splitAlgoClassName + " does not implement "
      + RegionSplitter.SplitAlgorithm.class.getName(), e);
  }
  try {
    return algoClass.getDeclaredConstructor().newInstance();
  } catch (ReflectiveOperationException e) {
    // Covers a missing no-arg constructor, inaccessible or abstract classes, and exceptions
    // thrown by the constructor itself.
    throw new IOException(
      "SplitAlgo class " + splitAlgoClassName + " could not be instantiated", e);
  }
}
/**
 * Runs the mocked MapReduce check through the {@code org.apache.hadoop.mapred} API.
 *
 * <p>Creates a table plus snapshot with {@code numRegions} regions and configures a
 * {@link JobConf} for the snapshot. When {@code numSplitsPerRegion > 1`} the job uses
 * {@link RegionSplitter.UniformSplit}. The resulting splits are then verified. The table and
 * snapshot are removed afterwards.
 *
 * @param setLocalityEnabledTo intentionally unused: this variant exercises the case where
 *     {@code SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY} is not set explicitly, so its default
 *     value applies
 */
@Override
protected void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  try {
    createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
    JobConf job = new JobConf(util.getConfiguration());
    Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
    boolean splitEachRegion = numSplitsPerRegion > 1;
    if (!splitEachRegion) {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
        false, tmpTableDir);
    } else {
      TableMapReduceUtil.initTableSnapshotMapJob(snapshotName, COLUMNS,
        TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
        false, tmpTableDir, new RegionSplitter.UniformSplit(), numSplitsPerRegion);
    }
    // NOTE(review): the mapred setup above passes no start/end keys; it is unconfirmed whether
    // mapred supports them. Verification still uses the table's start and end rows.
    verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
  } finally {
    util.getAdmin().deleteSnapshot(snapshotName);
    util.deleteTable(tableName);
  }
}
/**
 * Runs the mocked MapReduce check through the {@code org.apache.hadoop.mapreduce} API.
 *
 * <p>Creates a table plus snapshot with {@code numRegions} regions. It then configures a job
 * over the snapshot, limited to the {@code getStartRow()}..{@code getEndRow()} scan range.
 * When {@code numSplitsPerRegion > 1} the job uses {@link RegionSplitter.UniformSplit}. The
 * resulting splits are verified, and the table and snapshot are removed afterwards.
 *
 * @param setLocalityEnabledTo value written to {@code SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY}
 *     for this job only
 */
@Override
public void testWithMockedMapReduce(HBaseTestingUtility util, String snapshotName,
    int numRegions, int numSplitsPerRegion, int expectedNumSplits, boolean setLocalityEnabledTo)
    throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  try {
    createTableAndSnapshot(util, tableName, snapshotName, getStartRow(), getEndRow(), numRegions);
    // Copy the configuration so that the locality flag applies to this job only and does not
    // leak into util's shared configuration for later tests.
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, setLocalityEnabledTo);
    // Job.getInstance replaces the deprecated Job(Configuration) constructor.
    Job job = Job.getInstance(conf);
    Path tmpTableDir = util.getDataTestDirOnTestFS(snapshotName);
    Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow()); // limit the scan
    if (numSplitsPerRegion > 1) {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir, new RegionSplitter.UniformSplit(),
        numSplitsPerRegion);
    } else {
      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
        scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class,
        NullWritable.class, job, false, tmpTableDir);
    }
    verifyWithMockedMapReduce(job, numRegions, expectedNumSplits, getStartRow(), getEndRow());
  } finally {
    util.getAdmin().deleteSnapshot(snapshotName);
    util.deleteTable(tableName);
  }
}
/**
 * Drops any existing test table and recreates it from {@code tableDescriptor}.
 *
 * <p>The new table is pre-split with {@link RegionSplitter.HexStringSplit} for one region per
 * live region server. On a {@link MiniHBaseCluster}, the memstore flush size is first lowered
 * to 1048576 bytes and a warning is logged.
 *
 * @param tableDescriptor descriptor to create; may be modified (flush size) on a minicluster
 */
protected void createTable(TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor)
    throws Exception {
  deleteTable();
  boolean onMiniCluster = util.getHBaseClusterInterface() instanceof MiniHBaseCluster;
  if (onMiniCluster) {
    LOG.warn("Test does not make a lot of sense for minicluster. Will set flush size low.");
    tableDescriptor.setValue(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, "1048576");
  }
  int liveServers =
    util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size();
  byte[][] splitKeys = new RegionSplitter.HexStringSplit().split(liveServers);
  util.getAdmin().createTable(tableDescriptor, splitKeys);
}
/**
 * Computes pre-split keys for {@code tableName} from the size of the source MongoDB collection.
 *
 * <p>The region count is {@code size / regionSize + 1}, capped at 4096. The region size comes
 * from {@code ConfigUtil.getPresplitTableRegionSize}. Keys come from
 * {@link RegionSplitter.UniformSplit}. When only one region is needed, no keys are returned.
 *
 * @param uri MongoDB URI identifying the source collection
 * @param tableName name of the table being created (used for logging only)
 * @return the split keys, or an empty array when no pre-splitting is needed
 * @throws IllegalArgumentException if the configured pre-split region size is not positive
 */
private byte[][] calculateRegionSplits(MongoURI uri, String tableName) throws Exception {
  long regionSize = ConfigUtil.getPresplitTableRegionSize(_conf);
  // Guard before dividing: zero would throw ArithmeticException, and a negative size would
  // produce a meaningless region count.
  if (regionSize <= 0) {
    throw new IllegalArgumentException(
      "Pre-split table region size must be positive, got " + regionSize);
  }
  DBCollection collection = uri.connectDB().getCollection(uri.getCollection());
  long size;
  try {
    size = collection.getStats().getLong("size");
  } finally {
    // MongoURI.connectDB() creates a new Mongo client per call (no caching); close it so the
    // connection is not leaked. NOTE(review): confirm against the driver version in use.
    collection.getDB().getMongo().close();
  }
  int numRegions = (int) Math.min((size / regionSize) + 1, 4096);
  if (numRegions <= 1) {
    log.info("Not splitting " + tableName + ", because the data can fit into a single region");
    return new byte[0][0];
  }
  log.info("Pre-splitting " + tableName + " into " + numRegions + " regions");
  return new RegionSplitter.UniformSplit().split(numRegions);
}
/**
 * Returns a {@link RegionSplitter.HexStringSplit}; {@code cardinality} is not used.
 */
@Override
public RegionSplitter.SplitAlgorithm createSplitter(int cardinality) {
  RegionSplitter.SplitAlgorithm hexSplitter = new RegionSplitter.HexStringSplit();
  return hexSplitter;
}
/**
 * Returns a {@link RegionSplitter.HexStringSplit}; {@code cardinality} is not used.
 */
@Override
public RegionSplitter.SplitAlgorithm createSplitter(int cardinality) {
  RegionSplitter.SplitAlgorithm hexSplitter = new RegionSplitter.HexStringSplit();
  return hexSplitter;
}
/**
 * Returns a {@code DecimalStringSplit} sized for the given key-space cardinality.
 *
 * @param cardinality cardinality passed to the {@code DecimalStringSplit} constructor
 */
@Override
public RegionSplitter.SplitAlgorithm createSplitter(int cardinality) {
  RegionSplitter.SplitAlgorithm decimalSplitter = new DecimalStringSplit(cardinality);
  return decimalSplitter;
}
/**
 * Returns a {@link RegionSplitter.UniformSplit}; {@code cardinality} is not used.
 */
@Override
public RegionSplitter.SplitAlgorithm createSplitter(int cardinality) {
  RegionSplitter.SplitAlgorithm uniformSplitter = new RegionSplitter.UniformSplit();
  return uniformSplitter;
}
/**
 * Computes split points for {@code numRegions} regions with the algorithm returned by
 * {@link #createSplitter(int)} for {@code cardinality}.
 *
 * @param numRegions number of regions to split into
 * @param cardinality value forwarded to {@link #createSplitter(int)}
 * @return a new mutable list holding the split keys in the order the algorithm produced them
 */
public List<byte[]> split(int numRegions, int cardinality) {
  RegionSplitter.SplitAlgorithm algorithm = createSplitter(cardinality);
  byte[][] keys = algorithm.split(numRegions);
  List<byte[]> result = new ArrayList<>(keys.length);
  for (byte[] key : keys) {
    result.add(key);
  }
  return result;
}
/**
 * Returns a {@link RegionSplitter.HexStringSplit}; {@code cardinality} is not used.
 */
@Override
public RegionSplitter.SplitAlgorithm createSplitter(int cardinality) {
  RegionSplitter.SplitAlgorithm hexSplitter = new RegionSplitter.HexStringSplit();
  return hexSplitter;
}