下面列出了怎么用org.apache.hadoop.hbase.regionserver.RegionServerServices的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void initialize(RegionServerServices rss) throws KeeperException {
this.rss = rss;
if (!BackupManager.isBackupEnabled(rss.getConfiguration())) {
LOG.warn("Backup is not enabled. Check your " + BackupRestoreConstants.BACKUP_ENABLE_KEY
+ " setting");
return;
}
ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(rss);
this.memberRpcs = coordManager
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
// read in the backup handler configuration properties
Configuration conf = rss.getConfiguration();
long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
// create the actual cohort member
ThreadPoolExecutor pool =
ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="NPE should never happen; if it does it is a bigger issue")
public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
RegionCoprocessorEnvironment env = ctx.getEnvironment();
Configuration c = env.getConfiguration();
if (pairs == null || pairs.isEmpty() ||
!c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
+ "data replication.");
return;
}
// This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
// just going to break. This is all private. Not allowed. Regions shouldn't assume they are
// hosted in a RegionServer. TODO: fix.
RegionServerServices rss = ((HasRegionServerServices)env).getRegionServerServices();
Replication rep = (Replication)((HRegionServer)rss).getReplicationSourceService();
rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
}
@Override
public void start(CoprocessorEnvironment env) {
// if running at region
if (env instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment)env;
/* Getting the RpcServer from a RegionCE is wrong. There cannot be an expectation that Region
is hosted inside a RegionServer. If you need RpcServer, then pass in a RegionServerCE.
TODO: FIX.
*/
RegionServerServices rss = ((HasRegionServerServices)regionEnv).getRegionServerServices();
RpcServerInterface server = rss.getRpcServer();
SecretManager<?> mgr = ((RpcServer)server).getSecretManager();
if (mgr instanceof AuthenticationTokenSecretManager) {
secretManager = (AuthenticationTokenSecretManager)mgr;
}
}
}
/**
* Creates the "default" {@link SpaceViolationPolicyEnforcement} for a table that isn't in
* violation. This is used to have uniform policy checking for tables in and not quotas. This
* policy will still verify that new bulk loads do not exceed the configured quota limit.
*
* @param rss RegionServerServices instance the policy enforcement should use.
* @param tableName The target HBase table.
* @param snapshot The current quota snapshot for the {@code tableName}, can be null.
*/
public SpaceViolationPolicyEnforcement createWithoutViolation(
RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
if (snapshot == null) {
// If we have no snapshot, this is equivalent to no quota for this table.
// We should do use the (singleton instance) of this policy to do nothing.
return MissingSnapshotViolationPolicyEnforcement.getInstance();
}
// We have a snapshot which means that there is a quota set on this table, but it's not in
// violation of that quota. We need to construct a policy for this table.
SpaceQuotaStatus status = snapshot.getQuotaStatus();
if (status.isInViolation()) {
throw new IllegalArgumentException(
tableName + " is in violation. Logic error. Snapshot=" + snapshot);
}
// We have a unique size snapshot to use. Create an instance for this tablename + snapshot.
DefaultViolationPolicyEnforcement enforcement = new DefaultViolationPolicyEnforcement();
enforcement.initialize(rss, tableName, snapshot);
return enforcement;
}
/**
* Create a default snapshot handler - uses a zookeeper based member controller.
* @param rss region server running the handler
* @throws KeeperException if the zookeeper cluster cannot be reached
*/
@Override
public void initialize(RegionServerServices rss) throws KeeperException {
this.rss = rss;
ZKWatcher zkw = rss.getZooKeeper();
this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
// read in the snapshot request configuration properties
Configuration conf = rss.getConfiguration();
long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
// create the actual snapshot procedure member
ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
opThreads, keepAlive);
this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
}
private void initialize() throws IOException {
// ZK configuration must _not_ have hbase.security.authentication or it will require SASL auth
Configuration zkConf = new Configuration(conf);
zkConf.set(User.HBASE_SECURITY_CONF_KEY, "simple");
this.zookeeper = new ZKWatcher(zkConf, TokenServer.class.getSimpleName(),
this, true);
this.rpcServer.start();
// Mock up region coprocessor environment
RegionCoprocessorEnvironment mockRegionCpEnv = mock(RegionCoprocessorEnvironment.class,
Mockito.withSettings().extraInterfaces(HasRegionServerServices.class));
when(mockRegionCpEnv.getConfiguration()).thenReturn(conf);
when(mockRegionCpEnv.getClassLoader()).then(
(var1) -> Thread.currentThread().getContextClassLoader());
RegionServerServices mockRss = mock(RegionServerServices.class);
when(mockRss.getRpcServer()).thenReturn(rpcServer);
when(((HasRegionServerServices) mockRegionCpEnv).getRegionServerServices())
.thenReturn(mockRss);
super.start(mockRegionCpEnv);
started = true;
}
@Test
public void testExceptionOnPolicyEnforcementEnable() throws Exception {
final TableName tableName = TableName.valueOf("foo");
final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot(
new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 2048L);
RegionServerServices rss = mock(RegionServerServices.class);
SpaceViolationPolicyEnforcementFactory factory = mock(
SpaceViolationPolicyEnforcementFactory.class);
SpaceViolationPolicyEnforcement enforcement = mock(SpaceViolationPolicyEnforcement.class);
RegionServerSpaceQuotaManager realManager = new RegionServerSpaceQuotaManager(rss, factory);
when(factory.create(rss, tableName, snapshot)).thenReturn(enforcement);
doThrow(new IOException("Failed for test!")).when(enforcement).enable();
realManager.enforceViolationPolicy(tableName, snapshot);
Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
realManager.copyActiveEnforcements();
assertTrue("Expected active enforcements to be empty, but were " + enforcements,
enforcements.isEmpty());
}
@Test
public void testExceptionOnPolicyEnforcementDisable() throws Exception {
final TableName tableName = TableName.valueOf("foo");
final SpaceQuotaSnapshot snapshot = new SpaceQuotaSnapshot(
new SpaceQuotaStatus(SpaceViolationPolicy.DISABLE), 1024L, 2048L);
RegionServerServices rss = mock(RegionServerServices.class);
SpaceViolationPolicyEnforcementFactory factory = mock(
SpaceViolationPolicyEnforcementFactory.class);
SpaceViolationPolicyEnforcement enforcement = mock(SpaceViolationPolicyEnforcement.class);
RegionServerSpaceQuotaManager realManager = new RegionServerSpaceQuotaManager(rss, factory);
when(factory.create(rss, tableName, snapshot)).thenReturn(enforcement);
doNothing().when(enforcement).enable();
doThrow(new IOException("Failed for test!")).when(enforcement).disable();
// Enabling should work
realManager.enforceViolationPolicy(tableName, snapshot);
Map<TableName, SpaceViolationPolicyEnforcement> enforcements =
realManager.copyActiveEnforcements();
assertEquals(1, enforcements.size());
// If the disable fails, we should still treat it as "active"
realManager.disableViolationPolicyEnforcement(tableName);
enforcements = realManager.copyActiveEnforcements();
assertEquals(1, enforcements.size());
}
@SuppressWarnings("unchecked")
@Before
public void setup() throws IOException {
conf = HBaseConfiguration.create();
rss = mock(RegionServerServices.class);
manager = mock(RegionServerSpaceQuotaManager.class);
conn = mock(Connection.class);
when(manager.getRegionServerServices()).thenReturn(rss);
when(rss.getConfiguration()).thenReturn(conf);
chore = mock(SpaceQuotaRefresherChore.class);
when(chore.getConnection()).thenReturn(conn);
when(chore.getManager()).thenReturn(manager);
when(chore.checkQuotaTableExists()).thenReturn(true);
doCallRealMethod().when(chore).chore();
when(chore.isInViolation(any())).thenCallRealMethod();
doCallRealMethod().when(chore).extractQuotaSnapshot(any(), any());
}
/**
* Setup the config for the cluster
*/
@BeforeClass
public static void setupCluster() throws Exception {
setupConf(UTIL.getConfiguration());
UTIL.startMiniZKCluster();
UTIL.getConfiguration().setClass(MockRegistry.REGISTRY_IMPL_CONF_KEY, MockRegistry.class,
DummyConnectionRegistry.class);
CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration());
archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION);
// make hfile archiving node so we can archive files
ZKWatcher watcher = UTIL.getZooKeeperWatcher();
String archivingZNode = ZKTableArchiveClient.getArchiveZNode(UTIL.getConfiguration(), watcher);
ZKUtil.createWithParents(watcher, archivingZNode);
rss = mock(RegionServerServices.class);
POOL = new DirScanPool(UTIL.getConfiguration());
}
HRegion initHRegion (byte [] tableName, String callingMethod,
Configuration conf, byte [] ... families) throws IOException {
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(tableName));
for (byte[] family : families) {
tableDescriptor.setColumnFamily(
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(family));
}
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
Path path = new Path(DIR + callingMethod);
HRegion r = HBaseTestingUtility.createRegionAndWAL(info, path, conf, tableDescriptor);
// this following piece is a hack. currently a coprocessorHost
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
RegionCoprocessorHost host = new RegionCoprocessorHost(r,
Mockito.mock(RegionServerServices.class), conf);
r.setCoprocessorHost(host);
return r;
}
@Test
public void testRegionCoprocessorHostAllDisabled() throws Exception {
Configuration conf = new Configuration(CONF);
conf.setBoolean(CoprocessorHost.COPROCESSORS_ENABLED_CONF_KEY, false);
HRegion region = mock(HRegion.class);
when(region.getRegionInfo()).thenReturn(REGIONINFO);
when(region.getTableDescriptor()).thenReturn(TABLEDESC);
RegionServerServices rsServices = mock(RegionServerServices.class);
systemCoprocessorLoaded.set(false);
tableCoprocessorLoaded.set(false);
new RegionCoprocessorHost(region, rsServices, conf);
assertFalse("System coprocessors should not have been loaded",
systemCoprocessorLoaded.get());
assertFalse("Table coprocessors should not have been loaded",
tableCoprocessorLoaded.get());
}
@Test
public void testRegionCoprocessorHostTableLoadingDisabled() throws Exception {
Configuration conf = new Configuration(CONF);
conf.setBoolean(CoprocessorHost.COPROCESSORS_ENABLED_CONF_KEY, true); // if defaults change
conf.setBoolean(CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY, false);
HRegion region = mock(HRegion.class);
when(region.getRegionInfo()).thenReturn(REGIONINFO);
when(region.getTableDescriptor()).thenReturn(TABLEDESC);
RegionServerServices rsServices = mock(RegionServerServices.class);
systemCoprocessorLoaded.set(false);
tableCoprocessorLoaded.set(false);
new RegionCoprocessorHost(region, rsServices, conf);
assertTrue("System coprocessors should have been loaded",
systemCoprocessorLoaded.get());
assertFalse("Table coprocessors should not have been loaded",
tableCoprocessorLoaded.get());
}
/**
* Rough test that Coprocessor Environment is Read-Only.
* Just check a random CP and see that it returns a read-only config.
*/
@Test
public void testReadOnlyConfiguration() throws Exception {
Configuration conf = new Configuration(CONF);
HRegion region = mock(HRegion.class);
when(region.getRegionInfo()).thenReturn(REGIONINFO);
when(region.getTableDescriptor()).thenReturn(TABLEDESC);
RegionServerServices rsServices = mock(RegionServerServices.class);
RegionCoprocessorHost rcp = new RegionCoprocessorHost(region, rsServices, conf);
boolean found = false;
for (String cpStr: rcp.getCoprocessors()) {
CoprocessorEnvironment cpenv = rcp.findCoprocessorEnvironment(cpStr);
if (cpenv != null) {
found = true;
}
Configuration c = cpenv.getConfiguration();
thrown.expect(UnsupportedOperationException.class);
c.set("one.two.three", "four.five.six");
}
assertTrue("Should be at least one CP found", found);
}
@Before
public void setUp() throws Exception {
TableName tableName = TableName.valueOf(getClass().getSimpleName());
TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
tableDescriptor.setColumnFamily(
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam));
RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
Path path = testUtil.getDataTestDir(getClass().getSimpleName());
region = HBaseTestingUtility.createRegionAndWAL(info, path,
testUtil.getConfiguration(), tableDescriptor);
rss = mock(RegionServerServices.class);
List<HRegion> regions = new ArrayList<>(1);
regions.add(region);
Mockito.doReturn(regions).when(rss).getRegions();
}
public static HRegion getIndexRegion(HRegion dataRegion, RegionServerServices rss) throws IOException {
TableName indexTableName =
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(dataRegion.getTableDesc()
.getName()));
List<HRegion> onlineRegions = rss.getOnlineRegions(indexTableName);
for(HRegion indexRegion : onlineRegions) {
if (Bytes.compareTo(dataRegion.getStartKey(), indexRegion.getStartKey()) == 0) {
return indexRegion;
}
}
return null;
}
public static HRegion getDataRegion(HRegion indexRegion, RegionServerServices rss) throws IOException {
TableName dataTableName = TableName.valueOf(MetaDataUtil.getUserTableName(indexRegion.getTableDesc().getNameAsString()));
List<HRegion> onlineRegions = rss.getOnlineRegions(dataTableName);
for(HRegion region : onlineRegions) {
if (Bytes.compareTo(indexRegion.getStartKey(), region.getStartKey()) == 0) {
return region;
}
}
return null;
}
public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
LogRollBackupSubprocedurePool taskManager, byte[] data) {
super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
wakeFrequency, timeout);
LOG.info("Constructing a LogRollBackupSubprocedure.");
this.rss = rss;
this.taskManager = taskManager;
if (data != null) {
backupRoot = new String(data);
}
}
/**
* Override setter from {@link SplitLogWorkerCoordination}
*/
@Override
public void init(RegionServerServices server, Configuration conf,
TaskExecutor splitExecutor, SplitLogWorker worker) {
this.server = server;
this.worker = worker;
this.splitTaskExecutor = splitExecutor;
maxConcurrentTasks =
conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
reportPeriod =
conf.getInt("hbase.splitlog.report.period",
conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
}
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName =
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
this.walFS = walFS;
this.rootDir = rootDir;
this.rootFS = rootFS;
this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.rsServices = rsServices;
this.walFactory = factory;
PipelineController controller = new PipelineController();
this.tmpDirName =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
// if we limit the number of writers opened for sinking recovered edits
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
if (splitToHFile) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredHFilesOutputSink(this, controller, entryBuffers, numWriterThreads);
} else if (splitWriterCreationBounded) {
entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
outputSink =
new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
} else {
entryBuffers = new EntryBuffers(controller, bufferSize);
outputSink = new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
}
}
/**
* Splits a WAL file.
* @return false if it is interrupted by the progress-able.
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
RegionServerServices rsServices) throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
splitLogWorkerCoordination, rsServices);
return s.splitLogFile(logfile, reporter);
}
public ActivePolicyEnforcement(Map<TableName,SpaceViolationPolicyEnforcement> activePolicies,
Map<TableName,SpaceQuotaSnapshot> snapshots, RegionServerServices rss,
SpaceViolationPolicyEnforcementFactory factory) {
this.activePolicies = activePolicies;
this.snapshots = snapshots;
this.rss = rss;
this.factory = factory;
// Mutable!
this.locallyCachedPolicies = new HashMap<>();
}
public RegionSizeReportingChore(RegionServerServices rsServices) {
super(
RegionSizeReportingChore.class.getSimpleName(), rsServices,
getPeriod(rsServices.getConfiguration()), getInitialDelay(rsServices.getConfiguration()),
getTimeUnit(rsServices.getConfiguration()));
this.rsServices = rsServices;
this.metrics = rsServices.getMetrics();
}
@VisibleForTesting
RegionServerSpaceQuotaManager(
RegionServerServices rsServices, SpaceViolationPolicyEnforcementFactory factory) {
this.rsServices = Objects.requireNonNull(rsServices);
this.factory = factory;
this.enforcedPolicies = new ConcurrentHashMap<>();
this.currentQuotaSnapshots = new AtomicReference<>(new HashMap<>());
// Initialize the size store to not track anything -- create the real one if we're start()'ed
this.regionSizeStore = NoOpRegionSizeStore.getInstance();
}
/**
* Constructs the appropriate {@link SpaceViolationPolicyEnforcement} for tables that are
* in violation of their space quota.
*/
public SpaceViolationPolicyEnforcement create(
RegionServerServices rss, TableName tableName, SpaceQuotaSnapshot snapshot) {
SpaceViolationPolicyEnforcement enforcement;
SpaceQuotaStatus status = snapshot.getQuotaStatus();
if (!status.isInViolation()) {
throw new IllegalArgumentException(tableName + " is not in violation. Snapshot=" + snapshot);
}
switch (status.getPolicy().get()) {
case DISABLE:
enforcement = new DisableTableViolationPolicyEnforcement();
break;
case NO_WRITES_COMPACTIONS:
enforcement = new NoWritesCompactionsViolationPolicyEnforcement();
break;
case NO_WRITES:
enforcement = new NoWritesViolationPolicyEnforcement();
break;
case NO_INSERTS:
enforcement = new NoInsertsViolationPolicyEnforcement();
break;
default:
throw new IllegalArgumentException("Unhandled SpaceViolationPolicy: " + status.getPolicy());
}
enforcement.initialize(rss, tableName, snapshot);
return enforcement;
}
@Before
public void setupMocks() throws Exception {
this.config = new Configuration(false);
// Create all of the mocks
this.region = mock(Region.class);
this.rsServices = mock(RegionServerServices.class);
this.statsWriter = mock(StatisticsWriter.class);
this.callable = mock(StatisticsScannerCallable.class);
this.runTracker = mock(StatisticsCollectionRunTracker.class);
this.mockScanner = mock(StatisticsScanner.class);
this.tracker = mock(StatisticsCollector.class);
this.delegate = mock(InternalScanner.class);
this.regionInfo = mock(RegionInfo.class);
this.env = mock(RegionCoprocessorEnvironment.class);
this.conn = mock(Connection.class);
// Wire up the mocks to the mock StatisticsScanner
when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter);
when(mockScanner.createCallable()).thenReturn(callable);
when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker);
when(mockScanner.getRegion()).thenReturn(region);
when(mockScanner.getConfig()).thenReturn(config);
when(mockScanner.getTracker()).thenReturn(tracker);
when(mockScanner.getDelegate()).thenReturn(delegate);
when(env.getConnection()).thenReturn(conn);
when(mockScanner.getConnection()).thenReturn(conn);
// Wire up the HRegionInfo mock to the Region mock
when(region.getRegionInfo()).thenReturn(regionInfo);
// Always call close() on the mock StatisticsScanner
doCallRealMethod().when(mockScanner).close();
}
public void initialize(RegionServerServices rss) throws KeeperException {
for (RegionServerProcedureManager proc : procedures) {
LOG.debug("Procedure {} initializing", proc.getProcedureSignature());
proc.initialize(rss);
LOG.debug("Procedure {} initialized", proc.getProcedureSignature());
}
}
/**
* This method used internally by the RegionServer to close out regions.
* @param abort If the regionserver is aborting.
*/
public CloseRegionHandler(final Server server,
final RegionServerServices rsServices,
final RegionInfo regionInfo, final boolean abort,
ServerName destination) {
this(server, rsServices, regionInfo, abort,
EventType.M_RS_CLOSE_REGION, destination);
}
protected CloseRegionHandler(final Server server,
final RegionServerServices rsServices, RegionInfo regionInfo,
boolean abort, EventType eventType, ServerName destination) {
super(server, eventType);
this.server = server;
this.rsServices = rsServices;
this.regionInfo = regionInfo;
this.abort = abort;
this.destination = destination;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionServerCoprocessorEnvironment) {
this.regionServerServices = (RegionServerServices) ((RegionServerCoprocessorEnvironment) env).getOnlineRegions();
SpliceLogUtils.info(LOG,"Started SpliceRSRpcServices");
} else {
throw new CoprocessorException("Must be loaded on a RegionServer!");
}
}