下面列出了怎么用org.apache.hadoop.hbase.Waiter的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Waits until there is only one log(the current writing one) in the replication queue
* @param numRs number of regionservers
*/
private void waitForLogAdvance(int numRs) throws Exception {
Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
Replication replicationService = (Replication) UTIL1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
if (!currentFile.equals(source.getCurrentPath())) {
return false;
}
}
}
return true;
}
});
}
@Test
public void testReplicationStatusSink() throws Exception {
try (Admin admin = UTIL2.getConnection().getAdmin()) {
ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
ReplicationLoadSink loadSink = getLatestSinkMetric(admin, server);
//First checks if status of timestamp of last applied op is same as RS start, since no edits
//were replicated yet
Assert.assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
//now insert some rows on source, so that it gets delivered to target
TestReplicationStatus.insertRowsOnSource();
long wait =
Waiter.waitFor(UTIL2.getConfiguration(), 10000, (Waiter.Predicate<Exception>) () -> {
ReplicationLoadSink loadSink1 = getLatestSinkMetric(admin, server);
return loadSink1.getTimestampsOfLastAppliedOp() > loadSink1.getTimestampStarted();
});
Assert.assertNotEquals(-1, wait);
}
}
private static void waitPeer(final String peerId,
ReplicationSourceManager manager, final boolean waitForSource) {
ReplicationPeers rp = manager.getReplicationPeers();
Waiter.waitFor(conf, 20000, () -> {
if (waitForSource) {
ReplicationSourceInterface rs = manager.getSource(peerId);
if (rs == null) {
return false;
}
if (rs instanceof ReplicationSourceDummy) {
return ((ReplicationSourceDummy)rs).isStartup();
}
return true;
} else {
return (rp.getPeer(peerId) != null);
}
});
}
/**
* Remove a peer and wait for it to get cleaned up
*/
private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
if (rp.getPeerStorage().listPeerIds().contains(peerId)) {
rp.getPeerStorage().removePeer(peerId);
try {
manager.removePeer(peerId);
} catch (Exception e) {
// ignore the failed exception and continue.
}
}
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Collection<String> peers = rp.getPeerStorage().listPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null)
&& (!peers.contains(peerId)) && manager.getSource(peerId) == null;
}
});
}
@Test
public void testRace() throws Exception {
ProcedureExecutor<MasterProcedureEnv> executor =
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
DummyProcedure p = new DummyProcedure();
long procId = executor.submitProcedure(p);
p.failureSet.await();
assertEquals(GetProcedureResultResponse.State.RUNNING, getState(procId));
p.canRollback.countDown();
UTIL.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getState(procId) == GetProcedureResultResponse.State.FINISHED;
}
@Override
public String explainFailure() throws Exception {
return "Procedure pid=" + procId + " is still in " + getState(procId) +
" state, expected " + GetProcedureResultResponse.State.FINISHED;
}
});
}
private void waitForTableToEnterQuotaViolation(TableName tn) throws Exception {
// Verify that the RegionServer has the quota in violation
final HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0);
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, 1000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Map<TableName,SpaceQuotaSnapshot> snapshots =
rs.getRegionServerSpaceQuotaManager().copyQuotaSnapshots();
SpaceQuotaSnapshot snapshot = snapshots.get(tn);
if (snapshot == null) {
LOG.info("Found no snapshot for " + tn);
return false;
}
LOG.info("Found snapshot " + snapshot);
return snapshot.getQuotaStatus().isInViolation();
}
});
}
@Test
public void testRegionSizesFromMaster() throws Exception {
final long tableSize = 1024L * 10L; // 10KB
final int numRegions = 10;
final TableName tn = helper.createTableWithRegions(numRegions);
// Will write at least `tableSize` data
helper.writeData(tn, tableSize);
final HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
final MasterQuotaManager quotaManager = master.getMasterQuotaManager();
// Make sure the master has all of the reports
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
Map<RegionInfo,Long> regionSizes = quotaManager.snapshotRegionSizes();
LOG.trace("Region sizes=" + regionSizes);
return numRegions == countRegionsForTable(tn, regionSizes) &&
tableSize <= getTableSize(tn, regionSizes);
}
});
Map<TableName, Long> sizes = TEST_UTIL.getAdmin().getSpaceQuotaTableSizes();
Long size = sizes.get(tn);
assertNotNull("No reported size for " + tn, size);
assertTrue("Reported table size was " + size, size.longValue() >= tableSize);
}
@Test
public void testAsyncSnapshotWillNotBlockSnapshotHFileCleaner() throws Exception {
// Write some data
Table table = UTIL.getConnection().getTable(TABLE_NAME);
for (int i = 0; i < 10; i++) {
Put put = new Put(Bytes.toBytes(i)).addColumn(TEST_FAM, Bytes.toBytes("q"), Bytes.toBytes(i));
table.put(put);
}
String snapshotName = "testAsyncSnapshotWillNotBlockSnapshotHFileCleaner01";
Future<Void> future =
UTIL.getAdmin().snapshotAsync(new org.apache.hadoop.hbase.client.SnapshotDescription(
snapshotName, TABLE_NAME, SnapshotType.FLUSH));
Waiter.waitFor(UTIL.getConfiguration(), 10 * 1000L, 200L,
() -> UTIL.getAdmin().listSnapshots(Pattern.compile(snapshotName)).size() == 1);
UTIL.waitFor(30000, () -> !master.getSnapshotManager().isTakingAnySnapshot());
}
private void stopServersAndWaitUntilProcessed(List<ServerName> currentFN) throws Exception {
for (ServerName sn : currentFN) {
for (JVMClusterUtil.RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
if (ServerName.isSameAddress(sn, rst.getRegionServer().getServerName())) {
LOG.info("Shutting down server: " + sn);
cluster.stopRegionServer(rst.getRegionServer().getServerName());
cluster.waitForRegionServerToStop(rst.getRegionServer().getServerName(), 60000);
}
}
}
// Wait until dead servers are processed.
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !master.getServerManager().areDeadServersInProgress();
}
});
assertEquals("Not all servers killed",
SLAVES - currentFN.size(), cluster.getLiveRegionServerThreads().size());
}
public static boolean waitForAssignment(AssignmentManager am, RegionInfo regionInfo)
throws IOException {
// This method can be called before the regionInfo has made it into the regionStateMap
// so wait around here a while.
Waiter.waitFor(am.getConfiguration(), 10000,
() -> am.getRegionStates().getRegionStateNode(regionInfo) != null);
RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(regionInfo);
// Wait until the region has already been open, or we have a TRSP along with it.
Waiter.waitFor(am.getConfiguration(), 30000,
() -> regionNode.isInState(State.OPEN) || regionNode.isInTransition());
TransitRegionStateProcedure proc = regionNode.getProcedure();
regionNode.lock();
try {
if (regionNode.isInState(State.OPEN)) {
return true;
}
proc = regionNode.getProcedure();
} finally {
regionNode.unlock();
}
assertNotNull(proc);
ProcedureSyncWait.waitForProcedureToCompleteIOE(am.getMaster().getMasterProcedureExecutor(),
proc, 5L * 60 * 1000);
return true;
}
private void startCluster(int numRS) throws Exception {
SplitLogCounters.resetCounters();
LOG.info("Starting cluster");
conf.setLong("hbase.splitlog.max.resubmit", 0);
// Make the failure test faster
conf.setInt("zookeeper.recovery.retry", 0);
conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3);
conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
conf.set("hbase.wal.provider", getWalProvider());
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(NUM_MASTERS).numRegionServers(numRS).build();
TEST_UTIL.startMiniHBaseCluster(option);
cluster = TEST_UTIL.getHBaseCluster();
LOG.info("Waiting for active/ready master");
cluster.waitForActiveAndReadyMaster();
master = cluster.getMaster();
TEST_UTIL.waitFor(120000, 200, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return cluster.getLiveRegionServerThreads().size() >= numRS;
}
});
}
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
RSGroupUtil.enableRSGroup(TEST_UTIL.getConfiguration());
TEST_UTIL.getConfiguration().set(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, "1");
StartMiniClusterOption option =
StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
TEST_UTIL.startMiniCluster(option);
cluster = TEST_UTIL.getHBaseCluster();
master = ((MiniHBaseCluster) cluster).getMaster();
master.balanceSwitch(false);
hbaseAdmin = TEST_UTIL.getAdmin();
// wait till the balancer is in online mode
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return master.isInitialized() &&
((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline() &&
master.getServerManager().getOnlineServersList().size() >= 3;
}
});
}
@Test
public void testNamespaceCreateAndAssign() throws Exception {
LOG.info("testNamespaceCreateAndAssign");
String nsName = TABLE_PREFIX + "_foo";
final TableName tableName = TableName.valueOf(nsName, TABLE_PREFIX + "_testCreateAndAssign");
RSGroupInfo appInfo = addGroup("appInfo", 1);
ADMIN.createNamespace(NamespaceDescriptor.create(nsName)
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "appInfo").build());
final TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
ADMIN.createTable(desc);
// wait for created table to be assigned
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getTableRegionMap().get(desc.getTableName()) != null;
}
});
ServerName targetServer = getServerName(appInfo.getServers().iterator().next());
// verify it was assigned to the right group
Assert.assertEquals(1, ADMIN.getRegions(targetServer).size());
}
@Test
public void testDefaultNamespaceCreateAndAssign() throws Exception {
LOG.info("testDefaultNamespaceCreateAndAssign");
String tableName = TABLE_PREFIX + "_testCreateAndAssign";
ADMIN.modifyNamespace(NamespaceDescriptor.create("default")
.addConfiguration(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP, "default").build());
final TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
ADMIN.createTable(desc);
// wait for created table to be assigned
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return getTableRegionMap().get(desc.getTableName()) != null;
}
});
}
@Before
@Override
public void setUp() throws Exception {
utility = new HBaseTestingUtility();
Configuration conf = utility.getConfiguration();
RSGroupUtil.enableRSGroup(conf);
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, NUM_SLAVES_BASE);
conf.setInt("hbase.hfile.compaction.discharger.interval", 10);
utility.startMiniCluster(NUM_SLAVES_BASE);
MiniHBaseCluster cluster = utility.getHBaseCluster();
final HMaster master = cluster.getMaster();
//wait for balancer to come online
utility.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() {
return master.isInitialized() &&
((RSGroupBasedLoadBalancer) master.getLoadBalancer()).isOnline();
}
});
admin = utility.getAdmin();
}
private Pair<ServerName, RegionStateNode> createTableWithRegionSplitting(RSGroupInfo rsGroupInfo,
int tableRegionCount) throws Exception {
final byte[] familyNameBytes = Bytes.toBytes("f");
// All the regions created below will be assigned to the default group.
TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, tableRegionCount);
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
List<String> regions = getTableRegionMap().get(tableName);
if (regions == null) {
return false;
}
return getTableRegionMap().get(tableName).size() >= tableRegionCount;
}
});
return randomlySetOneRegionStateToSplitting(rsGroupInfo);
}
protected static void initialize() throws Exception {
ADMIN = new VerifyingRSGroupAdmin(TEST_UTIL.getConfiguration());
CLUSTER = TEST_UTIL.getHBaseCluster();
MASTER = TEST_UTIL.getMiniHBaseCluster().getMaster();
// wait for balancer to come online
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return MASTER.isInitialized() &&
((RSGroupBasedLoadBalancer) MASTER.getLoadBalancer()).isOnline();
}
});
ADMIN.balancerSwitch(false, true);
MasterCoprocessorHost host = MASTER.getMasterCoprocessorHost();
OBSERVER = (CPMasterObserver) host.findCoprocessor(CPMasterObserver.class.getName());
}
private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
long timems, boolean failIfTimeout) throws Exception {
long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return (ctr.sum() >= newval);
}
});
if( timeWaited > 0) {
// when not timed out
assertEquals(newval, ctr.sum());
}
return true;
}
@Test
public void removeClosedRegionFromConfigurationManager() throws Exception {
try (Connection connection = ConnectionFactory.createConnection(conf)) {
Admin admin = connection.getAdmin();
assertTrue("The open region doesn't register as a ConfigurationObserver",
rs1.getConfigurationManager().containsObserver(r1));
admin.move(r1name);
hbaseTestingUtility.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return rs1.getOnlineRegion(r1name) == null;
}
});
assertFalse("The closed region is not removed from ConfigurationManager",
rs1.getConfigurationManager().containsObserver(r1));
admin.move(r1name, rs1.getServerName());
hbaseTestingUtility.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return rs1.getOnlineRegion(r1name) != null;
}
});
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster();
TEST_UTIL.createTable(TableName.valueOf("TestStatusResource"), Bytes.toBytes("D"));
TEST_UTIL.createTable(TableName.valueOf("TestStatusResource2"), Bytes.toBytes("D"));
REST_TEST_UTIL.startServletContainer(conf);
Cluster cluster = new Cluster();
cluster.add("localhost", REST_TEST_UTIL.getServletPort());
client = new Client(cluster);
context = JAXBContext.newInstance(StorageClusterStatusModel.class);
TEST_UTIL.waitFor(6000, new Waiter.Predicate<IOException>() {
@Override
public boolean evaluate() throws IOException {
return TEST_UTIL.getMiniHBaseCluster().getClusterMetrics().getAverageLoad() > 0;
}
});
}
/**
* Tests the replication scenario 0 -> 0. By default
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
* ReplicationSource should terminate, and no further logs should get enqueued
*/
@Test
public void testLoopedReplication() throws Exception {
LOG.info("testLoopedReplication");
startMiniClusters(1);
createTableOnClusters(table);
addPeer("1", 0, 0);
Thread.sleep(SLEEP_TIME);
// wait for source to terminate
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ClusterMetrics clusterStatus = utilities[0].getAdmin()
.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
List<ReplicationLoadSource> replicationLoadSourceList =
serverLoad.getReplicationLoadSourceList();
return replicationLoadSourceList.isEmpty();
}
});
Table[] htables = getHTablesOnClusters(tableName);
putAndWait(row, famName, htables[0], htables[0]);
rollWALAndWait(utilities[0], table.getTableName(), row);
ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
ZNodePaths.joinZNode("replication", "rs"));
List<String> listChildrenNoWatch =
ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
assertEquals(0, listChildrenNoWatch.size());
}
@Test
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));
// check whether the class has been constructed and started
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
}
});
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
}
});
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
// now replicate some data.
doPut(Bytes.toBytes("row42"));
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
}
});
doAssert(Bytes.toBytes("row42"));
hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
}
@Test
public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
int peerCount = hbaseAdmin.listReplicationPeers().size();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
hbaseAdmin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()));
// This test is flakey and then there is so much stuff flying around in here its, hard to
// debug. Peer needs to be up for the edit to make it across. This wait on
// peer count seems to be a hack that has us not progress till peer is up.
if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
LOG.info("Waiting on peercount to go up from " + peerCount);
Threads.sleep(100);
}
// now replicate some data
doPut(row);
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
// Looks like replication endpoint returns false unless we put more than 10 edits. We
// only send over one edit.
int count = ReplicationEndpointForTest.replicateCount.get();
LOG.info("count=" + count);
return ReplicationEndpointReturningFalse.replicated.get();
}
});
if (ReplicationEndpointReturningFalse.ex.get() != null) {
throw ReplicationEndpointReturningFalse.ex.get();
}
hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
}
@Test
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
ReplicationPeerConfig rpc =
new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
// test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
EverythingPassesWALEntryFilter.class.getName() + "," +
EverythingPassesWALEntryFilterSubclass.class.getName());
hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
// now replicate some data.
try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
doPut(connection, Bytes.toBytes("row1"));
doPut(connection, row);
doPut(connection, Bytes.toBytes("row2"));
}
Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
}
});
Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
//make sure our reflectively created filter is in the filter chain
Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
}
@Test
public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
LOG.info("testSimplePutDelete");
MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
// This test wants two RS's up. We only run one generally so add one.
peerCluster.startRegionServer();
Waiter.waitFor(peerCluster.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return peerCluster.getLiveRegionServerThreads().size() > 1;
}
});
int numRS = peerCluster.getRegionServerThreads().size();
doPutTest(Bytes.toBytes(1));
int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
peerCluster.stopRegionServer(rsToStop);
peerCluster.waitOnRegionServer(rsToStop);
// Sanity check
assertEquals(numRS - 1, peerCluster.getRegionServerThreads().size());
doPutTest(Bytes.toBytes(2));
peerCluster.startRegionServer();
// Sanity check
assertEquals(numRS, peerCluster.getRegionServerThreads().size());
doPutTest(Bytes.toBytes(3));
}
@Test
public void testMultiLimits() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
Table t = TEST_UTIL.createTable(tableName, FAMILY);
TEST_UTIL.loadTable(t, FAMILY, false);
// Split the table to make sure that the chunking happens accross regions.
try (final Admin admin = TEST_UTIL.getAdmin()) {
admin.split(tableName);
TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return admin.getRegions(tableName).size() > 1;
}
});
}
List<Get> gets = new ArrayList<>(MAX_SIZE);
for (int i = 0; i < MAX_SIZE; i++) {
gets.add(new Get(HBaseTestingUtility.ROWS[i]));
}
RpcServerInterface rpcServer = TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer();
BaseSource s = rpcServer.getMetrics().getMetricsSource();
long startingExceptions = METRICS_ASSERT.getCounter("exceptions", s);
long startingMultiExceptions = METRICS_ASSERT.getCounter("exceptions.multiResponseTooLarge", s);
Result[] results = t.get(gets);
assertEquals(MAX_SIZE, results.length);
// Cells from TEST_UTIL.loadTable have a length of 27.
// Multiplying by less than that gives an easy lower bound on size.
// However in reality each kv is being reported as much higher than that.
METRICS_ASSERT.assertCounterGt("exceptions",
startingExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
METRICS_ASSERT.assertCounterGt("exceptions.multiResponseTooLarge",
startingMultiExceptions + ((MAX_SIZE * 25) / MAX_SIZE), s);
}
static void waitUntilAllMetaReplicasAreReady(HBaseTestingUtility util,
ConnectionRegistry registry) {
Configuration conf = util.getConfiguration();
int regionReplicaCount = util.getConfiguration().getInt(HConstants.META_REPLICAS_NUM,
HConstants.DEFAULT_META_REPLICA_NUM);
Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
new ExplainingPredicate<IOException>() {
@Override
public String explainFailure() {
return "Not all meta replicas get assigned";
}
@Override
public boolean evaluate() {
try {
RegionLocations locs = registry.getMetaRegionLocations().get();
if (locs.size() < regionReplicaCount) {
return false;
}
for (int i = 0; i < regionReplicaCount; i++) {
HRegionLocation loc = locs.getRegionLocation(i);
// Wait until the replica is served by a region server. There could be delay between
// the replica being available to the connection and region server opening it.
Optional<ServerName> rsCarryingReplica =
getRSCarryingReplica(util, loc.getRegion().getTable(), i);
if (!rsCarryingReplica.isPresent()) {
return false;
}
}
return true;
} catch (Exception e) {
TestZKConnectionRegistry.LOG.warn("Failed to get meta region locations", e);
return false;
}
}
});
}
private void waitUntilTableRegionCountReached(final TableName tableName, final int numRegions)
throws Exception {
TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return MetaTableAccessor.getRegionCount(TEST_UTIL.getConfiguration(), tableName) == numRegions;
}
});
}
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
SpaceQuotaHelperForTests.updateConfigForQuotas(conf);
TEST_UTIL.startMiniCluster(1);
// Wait till quota table onlined.
TEST_UTIL.waitFor(10000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return MetaTableAccessor.tableExists(TEST_UTIL.getConnection(),
QuotaTableUtil.QUOTA_TABLE_NAME);
}
});
}
private void waitForHFilesCountLessorEqual(TableName tn, byte[] cf, int count) throws Exception {
Waiter.waitFor(TEST_UTIL.getConfiguration(), 30 * 1000, 1000, new Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return TEST_UTIL.getNumHFiles(tn, cf) <= count;
}
});
}