下面列出了怎么用org.apache.hadoop.hbase.master.MasterServices的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Stores the master reference, clears the {@code done} flag and builds the
 * {@link ProcedureCoordinator} used by this manager.
 * <p>
 * The wake frequency, timeout and pool size are read from the master's configuration; the
 * coordinator is named after the master's server name.
 * @param master services of the master this manager runs in
 * @param metricsMaster not referenced by this implementation
 * @throws IOException declared by the overridden method
 * @throws UnsupportedOperationException declared by the overridden method
 */
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster)
    throws IOException, UnsupportedOperationException {
  this.master = master;
  done = false;

  // The coordinator and its thread pool are both named after this master.
  final String coordinatorName = master.getServerName().toString();

  // Tunables for the coordinator.
  final Configuration configuration = master.getConfiguration();
  final long wakeMillis =
      configuration.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT);
  final long timeout =
      configuration.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
  final int poolSize =
      configuration.getInt(BACKUP_POOL_THREAD_NUMBER_KEY, BACKUP_POOL_THREAD_NUMBER_DEFAULT);

  // Wire the pool and the RPC layer into a new coordinator.
  final ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(coordinatorName, poolSize);
  final ProcedureCoordinationManager coordinationManager =
      new ZKProcedureCoordinationManager(master);
  final ProcedureCoordinatorRpcs rpcs =
      coordinationManager.getProcedureCoordinatorRpcs(getProcedureSignature(), coordinatorName);
  this.coordinator = new ProcedureCoordinator(rpcs, pool, timeout, wakeMillis);
}
/**
 * Obtains the connection used by this instance.
 * <p>
 * If {@code params} carries a {@link MasterServices} under {@code HMaster.MASTER}, its
 * connection is used, and its configuration is adopted when none has been set yet. If no
 * connection is available afterwards, a new one is created from the current configuration.
 * @param params initialization parameters, looked up via {@code MapUtils.getObject}
 * @throws RuntimeException wrapping the {@link IOException} if creating a connection fails
 */
@Override
public void init(Map<String, Object> params) {
  final MasterServices services =
      (MasterServices) MapUtils.getObject(params, HMaster.MASTER);
  if (services != null) {
    conn = services.getConnection();
    if (getConf() == null) {
      super.setConf(conn.getConfiguration());
    }
  }

  if (conn != null) {
    return;
  }

  // No connection from the master; fall back to creating our own.
  try {
    conn = ConnectionFactory.createConnection(getConf());
  } catch (IOException ioe) {
    throw new RuntimeException("Failed to create connection", ioe);
  }
}
/**
 * Creates the manager from the given master services.
 * <p>
 * Captures the server manager, the WAL file system and the WAL root directory, derives the
 * remote WAL directory beneath that root, and registers a {@link ServerListener} that, each time
 * a server is added, calls {@code wake} on every entry of {@code usedWorkersByPeer} with the
 * master's procedure scheduler.
 * @param services master services to read the server manager, file system and procedure
 *          executor from
 */
public SyncReplicationReplayWALManager(MasterServices services)
    throws IOException, ReplicationException {
  serverManager = services.getServerManager();
  fs = services.getMasterFileSystem().getWALFileSystem();
  walRootDir = services.getMasterFileSystem().getWALRootDir();
  remoteWALDir = new Path(walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);

  serverManager.registerListener(new ServerListener() {
    @Override
    public void serverAdded(ServerName serverName) {
      // The scheduler is looked up on every notification rather than cached.
      MasterProcedureScheduler procScheduler = services.getMasterProcedureExecutor()
          .getEnvironment()
          .getProcedureScheduler();
      for (UsedReplayWorkersForPeer peerWorkers : usedWorkersByPeer.values()) {
        synchronized (peerWorkers) {
          peerWorkers.wake(procScheduler);
        }
      }
    }
  });
}
/**
 * Builds a snapshot manager from explicitly supplied collaborators. Exposed for testing.
 * <p>
 * Verifies snapshot support against the master's configuration and file system, resets the
 * temporary directory, and schedules {@code cleanupSentinels} at a fixed rate.
 * @param master services for the master where the manager is running
 * @param coordinator procedure coordinator instance to use
 * @param pool executor service instance to use
 * @param sentinelCleanInterval initial delay and period, in seconds, of the sentinel cleanup
 *          task
 */
@VisibleForTesting
SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
    ExecutorService pool, int sentinelCleanInterval)
    throws IOException, UnsupportedOperationException {
  this.master = master;
  rootDir = master.getMasterFileSystem().getRootDir();

  final Configuration configuration = master.getConfiguration();
  checkSnapshotSupport(configuration, master.getMasterFileSystem());

  this.coordinator = coordinator;
  executorService = pool;
  resetTempDir();

  // The same value serves as both the initial delay and the repeat period.
  snapshotHandlerChoreCleanerTask = scheduleThreadPool.scheduleAtFixedRate(
      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
}
/**
 * Stores the master reference and builds the {@link ProcedureCoordinator} used by this manager.
 * <p>
 * Wake frequency, timeout and pool size come from the master's configuration; the coordinator
 * communicates through a {@link ZKProcedureCoordinator} built on the master's ZooKeeper handle.
 * @param master services of the master this manager runs in
 * @param metricsMaster not referenced by this implementation
 */
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster)
    throws KeeperException, IOException, UnsupportedOperationException {
  this.master = master;

  // Tunables for the coordinator.
  final Configuration configuration = master.getConfiguration();
  final long wakeMillis = configuration.getInt(FLUSH_WAKE_MILLIS_KEY, FLUSH_WAKE_MILLIS_DEFAULT);
  final long timeout =
      configuration.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
  final int poolSize =
      configuration.getInt(FLUSH_PROC_POOL_THREADS_KEY, FLUSH_PROC_POOL_THREADS_DEFAULT);

  // The coordinator and its thread pool are both named after this master.
  final String coordinatorName = master.getServerName().toString();
  final ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(coordinatorName, poolSize);
  final ProcedureCoordinatorRpcs rpcs = new ZKProcedureCoordinator(
      master.getZooKeeper(), getProcedureSignature(), coordinatorName);
  this.coordinator = new ProcedureCoordinator(rpcs, pool, timeout, wakeMillis);
}
/**
 * One-time fixture: configures a {@code MockBalancer} backed by a mocked {@link MasterServices}
 * and fills {@code servers} with {@code NUM_SERVERS} names whose racks are stubbed on a mocked
 * {@link RackManager}.
 */
@BeforeClass
public static void beforeAllTests() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  conf.setClass("hbase.util.ip.to.rack.determiner", MockMapping.class, DNSToSwitchMapping.class);

  loadBalancer = new MockBalancer();
  loadBalancer.setConf(conf);
  MasterServices mockedServices = Mockito.mock(MasterServices.class);
  Mockito.when(mockedServices.getServerName()).thenReturn(master);
  loadBalancer.setMasterServices(mockedServices);

  // Rack topology: indexes 0-4 -> rack1, 5-9 -> rack2, 10 and above -> rack3.
  rackManager = Mockito.mock(RackManager.class);
  for (int idx = 0; idx < NUM_SERVERS; idx++) {
    servers[idx] = ServerName.valueOf("foo" + idx + ":1234", -1);
    final String rack;
    if (idx < 5) {
      rack = "rack1";
    } else if (idx < 10) {
      rack = "rack2";
    } else {
      rack = "rack3";
    }
    Mockito.when(rackManager.getRack(servers[idx])).thenReturn(rack);
  }
}
/**
 * Calls {@code RangerAccessControlLists.init} with a null {@link MasterServices} and asserts
 * that it throws an {@link IOException} whose cause is a {@link NullPointerException}.
 */
@Test
public void testInit() {
  IOException caught = null;
  try {
    RangerAccessControlLists.init((MasterServices) null);
  } catch (IOException e) {
    caught = e;
  }

  final String failureMessage =
      "Expected to get a NullPointerExecution after init method Execution - Found [" + caught + "]";
  final boolean wrapsNullPointer =
      caught != null && caught.getCause() instanceof NullPointerException;
  Assert.assertTrue(failureMessage, wrapsNullPointer);
}
/**
 * Creates a manager with an empty favored-nodes plan, empty primary/secondary/tertiary
 * server-to-region maps, and a {@link RackManager} built from the master's configuration.
 * @param masterServices master services retained by this manager and used to obtain the
 *          configuration
 */
public FavoredNodesManager(MasterServices masterServices) {
  this.masterServices = masterServices;
  globalFavoredNodesAssignmentPlan = new FavoredNodesPlan();

  // One lookup map per favored-node position.
  primaryRSToRegionMap = new HashMap<>();
  secondaryRSToRegionMap = new HashMap<>();
  teritiaryRSToRegionMap = new HashMap<>();

  rackManager = new RackManager(masterServices.getConfiguration());
}
/**
 * Passes the master services to the superclass and then to the two locality cost functions and
 * the locality candidate generator held by this instance.
 * @param masterServices master services to propagate
 */
@Override
public synchronized void setMasterServices(MasterServices masterServices) {
  super.setMasterServices(masterServices);
  localityCost.setServices(masterServices);
  rackLocalityCost.setServices(masterServices);
  localityCandidateGenerator.setServices(masterServices);
}
/**
 * Creates a locality cost function of the given type.
 * @param conf configuration passed to the superclass and used to read the multiplier
 * @param srv master services retained by this function
 * @param type locality type this function evaluates
 * @param localityCostKey configuration key holding the multiplier
 * @param defaultLocalityCost multiplier used when {@code localityCostKey} is not set
 */
LocalityBasedCostFunction(Configuration conf, MasterServices srv, LocalityType type,
    String localityCostKey, float defaultLocalityCost) {
  super(conf);
  this.type = type;
  setMultiplier(conf.getFloat(localityCostKey, defaultLocalityCost));
  services = srv;
  // Both locality figures start at zero.
  locality = 0.0;
  bestLocality = 0.0;
}
/**
 * Creates a cost function for {@code LocalityType.SERVER}, with its multiplier read from
 * {@code LOCALITY_COST_KEY} (default {@code DEFAULT_LOCALITY_COST}).
 * @param conf configuration to read the multiplier from
 * @param srv master services handed to the superclass
 */
ServerLocalityCostFunction(Configuration conf, MasterServices srv) {
  super(conf, srv, LocalityType.SERVER, LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST);
}
/**
 * Creates a cost function for {@code LocalityType.RACK}, with its multiplier read from
 * {@code RACK_LOCALITY_COST_KEY} (default {@code DEFAULT_RACK_LOCALITY_COST}).
 * @param conf configuration to read the multiplier from
 * @param services master services handed to the superclass
 */
public RackLocalityCostFunction(Configuration conf, MasterServices services) {
  super(conf, services, LocalityType.RACK, RACK_LOCALITY_COST_KEY, DEFAULT_RACK_LOCALITY_COST);
}
/**
 * Records the master's server name and services, forwards the services to the region finder
 * when {@code useRegionFinder} is set, and switches on {@code maintenanceMode} if the master
 * reports it is in maintenance mode. The flag is never cleared here.
 * @param masterServices master services to adopt
 */
@Override
public void setMasterServices(MasterServices masterServices) {
  masterServerName = masterServices.getServerName();
  services = masterServices;

  if (useRegionFinder) {
    regionFinder.setServices(masterServices);
  }

  if (services.isInMaintenanceMode()) {
    maintenanceMode = true;
  }
}
/**
 * Logs this plan and asks the master to merge {@code firstRegion} and {@code secondRegion}.
 * @param masterServices master services used to submit the merge
 * @return the value returned by {@code mergeRegions}
 * @throws IOException if the merge request fails
 */
@Override
public long submit(MasterServices masterServices) throws IOException {
  LOG.info("Executing merging normalization plan: " + this);

  final RegionInfo[] regionsToMerge = new RegionInfo[] { firstRegion, secondRegion };
  // Do not use force=true as corner cases can happen, non adjacent regions,
  // merge with a merged child region with no GC done yet, it is going to
  // cause all different issues.
  final boolean forcible = false;
  return masterServices.mergeRegions(regionsToMerge, forcible, HConstants.NO_NONCE,
      HConstants.NO_NONCE);
}
/**
 * Builds an assignment manager around the given master and region state store, reading all
 * tunables from the master's configuration. Exposed for testing.
 * @param master master services retained by this manager
 * @param stateStore region state store retained by this manager
 */
@VisibleForTesting
AssignmentManager(final MasterServices master, final RegionStateStore stateStore) {
  this.master = master;
  regionStateStore = stateStore;
  metrics = new MetricsAssignmentManager();

  final Configuration configuration = master.getConfiguration();

  // Only read favored nodes if using the favored nodes load balancer.
  final Class<?> balancerClass =
      configuration.getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class);
  shouldAssignRegionsWithFavoredNodes =
      FavoredStochasticBalancer.class.isAssignableFrom(balancerClass);

  assignDispatchWaitMillis = configuration.getInt(ASSIGN_DISPATCH_WAIT_MSEC_CONF_KEY,
      DEFAULT_ASSIGN_DISPATCH_WAIT_MSEC);
  assignDispatchWaitQueueMaxSize = configuration.getInt(ASSIGN_DISPATCH_WAITQ_MAX_CONF_KEY,
      DEFAULT_ASSIGN_DISPATCH_WAITQ_MAX);

  // At least one attempt is always made, whatever the configured value.
  assignMaxAttempts =
      Math.max(1, configuration.getInt(ASSIGN_MAX_ATTEMPTS, DEFAULT_ASSIGN_MAX_ATTEMPTS));
  assignRetryImmediatelyMaxAttempts = configuration.getInt(
      ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS, DEFAULT_ASSIGN_RETRY_IMMEDIATELY_MAX_ATTEMPTS);

  final int ritIntervalMs = configuration.getInt(RIT_CHORE_INTERVAL_MSEC_CONF_KEY,
      DEFAULT_RIT_CHORE_INTERVAL_MSEC);
  ritChore = new RegionInTransitionChore(ritIntervalMs);

  // A non-positive interval leaves the dead-region metric chore unset.
  final int deadRegionIntervalMs = configuration.getInt(
      DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC_CONF_KEY,
      DEFAULT_DEAD_REGION_METRIC_CHORE_INTERVAL_MSEC);
  deadMetricChore = deadRegionIntervalMs > 0
      ? new DeadServerMetricRegionChore(deadRegionIntervalMs)
      : null;
}
/**
 * Verifies that the cluster is up and the master is neither stopping nor stopped, then
 * optionally verifies the table's modifiability and enabled/disabled state.
 * Call in Procedure constructor so can pass any exception to caller.
 * @param env procedure environment used to reach the master services
 * @param enabled {@code true} to require the table be enabled or enabling, {@code false} to
 *          require it be disabled or disabling, {@code null} to skip all table checks
 * @throws HBaseIOException if any check fails; an {@link IOException} that is not already an
 *           {@code HBaseIOException} is wrapped in one
 */
protected void preflightChecks(MasterProcedureEnv env, Boolean enabled) throws HBaseIOException {
  final MasterServices services = env.getMasterServices();
  if (!services.isClusterUp()) {
    throw new HBaseIOException("Cluster not up!");
  }
  if (services.isStopping() || services.isStopped()) {
    throw new HBaseIOException("Master stopping=" + services.isStopping() +
        ", stopped=" + services.isStopped());
  }
  if (enabled == null) {
    // Caller asked for no table checks.
    return;
  }

  try {
    // Checks table exists and is modifiable.
    checkTableModifiable(env);
    final TableName table = getTableName();
    final TableState state = services.getTableStateManager().getTableState(table);
    if (enabled) {
      if (!state.isEnabledOrEnabling()) {
        throw new TableNotEnabledException(table);
      }
    } else if (!state.isDisabledOrDisabling()) {
      throw new TableNotDisabledException(table);
    }
  } catch (HBaseIOException hioe) {
    // Already the declared type; propagate untouched.
    throw hioe;
  } catch (IOException ioe) {
    throw new HBaseIOException(ioe);
  }
}
/**
 * Creates the environment for master procedures.
 * <p>
 * The procedure scheduler is given a lookup function that resolves a procedure id through the
 * master's procedure executor at call time.
 * @param master master services retained by this environment
 * @param remoteDispatcher dispatcher retained by this environment
 */
public MasterProcedureEnv(final MasterServices master,
    final RSProcedureDispatcher remoteDispatcher) {
  this.master = master;
  procSched = new MasterProcedureScheduler(
      id -> master.getMasterProcedureExecutor().getProcedure(id));
  this.remoteDispatcher = remoteDispatcher;
}
/**
 * Creates a dispatcher configured from the master's configuration.
 * @param master master services retained by this dispatcher; its configuration is passed to
 *          the superclass and supplies the region server startup wait time
 */
public RSProcedureDispatcher(final MasterServices master) {
  super(master.getConfiguration());
  this.master = master;
  rsStartupWaitTime = master.getConfiguration()
      .getLong(RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME);
}
/**
 * Creates a verifier for the given snapshot. The table name is parsed from the snapshot's
 * table field.
 * @param services services for the master
 * @param snapshot snapshot to check
 * @param workingDirFs the file system containing the temporary snapshot information
 */
public MasterSnapshotVerifier(MasterServices services,
    SnapshotDescription snapshot, FileSystem workingDirFs) {
  this.services = services;
  this.snapshot = snapshot;
  this.workingDirFs = workingDirFs;
  tableName = TableName.valueOf(snapshot.getTable());
}
/**
 * Prepares a handler that takes the given snapshot: resolves the working and completed snapshot
 * directories, creates the snapshot manifest, the exclusive table lock and the verifier, and
 * registers a monitored task status.
 * @param snapshot descriptor of the snapshot to take
 * @param masterServices master services provider
 * @param snapshotManager snapshot manager retained by this handler
 * @throws IllegalArgumentException if the working snapshot directory lies under the root
 *           directory without being within the default working directory
 * @throws IOException if the file system of the working snapshot directory cannot be
 *           determined
 */
public TakeSnapshotHandler(SnapshotDescription snapshot, final MasterServices masterServices,
    final SnapshotManager snapshotManager) throws IOException {
  super(masterServices, EventType.C_M_SNAPSHOT_TABLE);
  // The super() call must come first, so these checks can only run after it.
  assert snapshot != null : "SnapshotDescription must not be null";
  assert masterServices != null : "MasterServices must not be null";
  this.master = masterServices;
  this.conf = this.master.getConfiguration();
  this.rootDir = this.master.getMasterFileSystem().getRootDir();
  this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
  // A working directory inside the root directory is only accepted when it is within the
  // default working directory.
  Preconditions.checkArgument(!SnapshotDescriptionUtils.isSubDirectoryOf(workingDir, rootDir) ||
      SnapshotDescriptionUtils.isWithinDefaultWorkingDir(workingDir, conf),
      "The working directory " + workingDir + " cannot be in the root directory unless it is "
          + "within the default working directory");
  this.snapshot = snapshot;
  this.snapshotManager = snapshotManager;
  this.snapshotTable = TableName.valueOf(snapshot.getTable());
  this.rootFs = this.master.getMasterFileSystem().getFileSystem();
  this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
  // The working directory may live on a different file system than the root directory.
  this.workingDirFs = this.workingDir.getFileSystem(this.conf);
  this.monitor = new ForeignExceptionDispatcher(snapshot.getName());
  this.snapshotManifest = SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor);
  this.tableLock = master.getLockManager().createMasterLock(
      snapshotTable, LockType.EXCLUSIVE,
      this.getClass().getName() + ": take snapshot " + snapshot.getName());
  // prepare the verify
  this.verifier = new MasterSnapshotVerifier(masterServices, snapshot, workingDirFs);
  // update the running tasks
  this.status = TaskMonitor.get().createStatus(
      "Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
}
/**
 * Stores the master reference, clears the {@code done} flag and builds a
 * {@link ProcedureCoordinator} backed by a single-thread default pool and a
 * {@link ZKProcedureCoordinator} on the master's ZooKeeper handle.
 * @param master services of the master this manager runs in
 * @param metricsMaster not referenced by this implementation
 */
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster)
    throws KeeperException, IOException, UnsupportedOperationException {
  this.master = master;
  done = false;

  // The coordinator and its thread pool are both named after this master.
  final String coordinatorName = master.getServerName().toString();
  final ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(coordinatorName, 1);
  final ProcedureCoordinatorRpcs rpcs = new ZKProcedureCoordinator(
      master.getZooKeeper(), getProcedureSignature(), coordinatorName);
  this.coordinator = new ProcedureCoordinator(rpcs, pool);
}
/**
 * Captures the master services together with its ZooKeeper watcher and async cluster
 * connection, and creates the startup worker and the group mapping script.
 * @param masterServices master services this manager is bound to
 */
private RSGroupInfoManagerImpl(MasterServices masterServices) {
  this.masterServices = masterServices;
  watcher = masterServices.getZooKeeper();
  conn = masterServices.getAsyncClusterConnection();
  rsGroupStartupWorker = new RSGroupStartupWorker();
  script = new RSGroupMappingScript(masterServices.getConfiguration());
}
/**
 * A {@link MasterQuotaManager} built on a mocked master and used without any further setup
 * must accept a region size report and return a non-null region size snapshot.
 */
@Test
public void testUninitializedQuotaManangerDoesNotFail() {
  final MasterQuotaManager quotaManager = new MasterQuotaManager(mock(MasterServices.class));
  quotaManager.addRegionSize(null, 0, 0);
  assertNotNull(quotaManager.snapshotRegionSizes());
}
/**
 * Builds a mocked master whose table descriptors resolve the first four entries of
 * {@code tables} to their entries in {@code tableDescs}, and whose assignment manager is a
 * mock.
 * @return the mocked master services
 */
protected static MasterServices getMockedMaster() throws IOException {
  final TableDescriptors descriptors = Mockito.mock(TableDescriptors.class);
  // Only tables[0] through tables[3] are stubbed.
  for (int idx = 0; idx < 4; idx++) {
    Mockito.when(descriptors.get(tables[idx])).thenReturn(tableDescs.get(tables[idx]));
  }

  final MasterServices mockedMaster = Mockito.mock(HMaster.class);
  Mockito.when(mockedMaster.getTableDescriptors()).thenReturn(descriptors);
  final AssignmentManager assignmentManager = Mockito.mock(AssignmentManager.class);
  Mockito.when(mockedMaster.getAssignmentManager()).thenReturn(assignmentManager);
  return mockedMaster;
}
/**
 * Looks up the store file size of a region, in megabytes, from the metrics reported by the
 * region server currently hosting it.
 * @param masterServices master services used to reach the assignment and server managers
 * @param regionInfo region to look up
 * @return the region's store file size in megabytes, or {@code -1} if the region has no hosting
 *         server or no metrics are recorded for it
 */
private static double getRegionSizeMB(final MasterServices masterServices,
    final RegionInfo regionInfo) {
  final ServerName sn = masterServices.getAssignmentManager()
      .getRegionStates()
      .getRegionServerOfRegion(regionInfo);
  if (sn == null) {
    // No hosting server is known for the region; there is nothing to look the load up on.
    LOG.debug("{} was not found on any region server", regionInfo.getRegionNameAsString());
    return -1;
  }
  // NOTE(review): getLoad(sn) may also return null if the server is no longer online - confirm
  // against ServerManager and guard if so.
  final RegionMetrics regionLoad = masterServices.getServerManager()
      .getLoad(sn)
      .getRegionMetrics()
      .get(regionInfo.getRegionName());
  if (regionLoad == null) {
    LOG.debug("{} was not found in RegionsLoad", regionInfo.getRegionNameAsString());
    return -1;
  }
  return regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
}
/**
 * Supplies an {@code AssignmentManagerForTest} bound to the given master in place of the
 * assignment manager the overridden method would create.
 * @param master master services passed to the test assignment manager
 * @return a new {@code AssignmentManagerForTest}
 */
@Override
protected AssignmentManager createAssignmentManager(MasterServices master) {
return new AssignmentManagerForTest(master);
}
/**
 * Bundles the state handed over by the split log manager. The arguments are stored as given,
 * without copying.
 * @param tasks task map, keyed by string
 * @param master master services
 * @param failedDeletions set of failed deletions
 */
public SplitLogManagerDetails(ConcurrentMap<String, Task> tasks, MasterServices master,
    Set<String> failedDeletions) {
  this.master = master;
  this.tasks = tasks;
  this.failedDeletions = failedDeletions;
}
/**
 * Creates the mock store by delegating to the superclass constructor.
 * @param master master services passed to the superclass
 */
public MockRegionStateStore(final MasterServices master) {
super(master);
}
/**
 * Creates a quota manager bound to the given master. Only the reference is stored here; no
 * other state is set up by this constructor.
 * @param masterServices master services retained by this manager
 */
public MasterQuotaManager(final MasterServices masterServices) {
this.masterServices = masterServices;
}
/**
 * Creates a candidate generator that retains the given master services.
 * @param masterServices master services retained by this generator
 */
LocalityBasedCandidateGenerator(MasterServices masterServices) {
this.masterServices = masterServices;
}