下面列出了 org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Hook that, while {@code HALT} is set, pauses the calling thread once the first unfinished
 * {@code TransitPeerSyncReplicationStateProcedure} on the master has reached
 * {@code REOPEN_ALL_REGIONS_IN_PEER_VALUE}.
 *
 * <p>The whole check runs under the {@code HaltCP.class} monitor. When the state matches, the
 * hook counts down {@code ARRIVE}, blocks on {@code RESUME.await()}, and then clears
 * {@code HALT} so later invocations return immediately.
 *
 * @param ctx the observer context (not used by this implementation)
 * @throws IOException declared by the overridden method
 * @throws RuntimeException wrapping the {@link InterruptedException} if the wait is interrupted;
 *         the thread's interrupt status is restored before the exception is thrown
 */
@Override
public void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  synchronized (HaltCP.class) {
    if (!HALT) {
      return;
    }
    UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
      .filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure)
      .filter(p -> !p.isFinished()).map(p -> (TransitPeerSyncReplicationStateProcedure) p)
      .findFirst().ifPresent(proc -> {
        // this is the next state of REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN_VALUE
        if (proc.getCurrentStateId() == REOPEN_ALL_REGIONS_IN_PEER_VALUE) {
          // tell the main thread to start a new region server
          ARRIVE.countDown();
          try {
            // wait for the region server to come online
            RESUME.await();
          } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe
            // the interruption after it has been translated into an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
          }
          HALT = false;
        }
      });
  }
}
/**
 * Runs {@code rmt.stepsAfterPONR} for the merged region stored on this instance, using the
 * hosting region server for both server arguments.
 *
 * <p>Does nothing unless both {@code rmt} and the {@code mergedRegion} field are set. Note that
 * the {@code mergedRegion} parameter shadows the field; only the field is used.
 *
 * @param ctx observer context used to reach the region server services
 * @param regionA first merged region (unused here)
 * @param regionB second merged region (unused here)
 * @param mergedRegion result of the merge (unused here)
 * @throws IOException if the post-PONR steps fail
 */
@Override
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
    HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException {
  if (rmt == null || this.mergedRegion == null) {
    return;
  }
  HRegionServer regionServer = (HRegionServer) ctx.getEnvironment().getRegionServerServices();
  rmt.stepsAfterPONR(regionServer, regionServer, this.mergedRegion);
}
/**
 * Rolls back the pending transaction {@code rmt}, if any, and clears the stored merge state.
 *
 * <p>If the rollback throws, the error is logged and the region server is aborted.
 *
 * @param ctx observer context used to reach the region server services
 * @param regionA first region of the merge (unused here)
 * @param regionB second region of the merge (unused here)
 * @throws IOException declared by the overridden method
 */
@Override
public void preRollBackMerge(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
    HRegion regionA, HRegion regionB) throws IOException {
  HRegionServer regionServer = (HRegionServer) ctx.getEnvironment().getRegionServerServices();
  if (rmt == null) {
    // No transaction is pending, so there is nothing to roll back.
    return;
  }
  try {
    rmt.rollback(regionServer, regionServer);
    rmt = null;
    mergedRegion = null;
  } catch (Exception rollbackFailure) {
    LOG.error("Error while rolling back the merge failure for index regions", rollbackFailure);
    regionServer.abort("Abort; we got an error during rollback of index");
  }
}
/**
 * Resolves the RMI registry and connector ports for the hosting process and starts the
 * connector server unless {@code JMX_CS} is already set.
 *
 * <p>Ports are read from the configuration under a {@code "master"} or {@code "regionserver"}
 * key prefix depending on the environment type. The connector port defaults to the registry
 * port. A region-level environment is rejected with an error log and an early return. For any
 * other environment type both ports stay at {@code -1}.
 *
 * @param env the coprocessor environment this listener is loaded into
 * @throws IOException declared by the overridden method
 */
@Override
public void start(CoprocessorEnvironment env) throws IOException {
  final Configuration conf = env.getConfiguration();
  int rmiRegistryPort = -1;
  int rmiConnectorPort = -1;

  if (env instanceof MasterCoprocessorEnvironment) {
    // running on Master
    String registryKey = "master" + RMI_REGISTRY_PORT_CONF_KEY;
    String connectorKey = "master" + RMI_CONNECTOR_PORT_CONF_KEY;
    rmiRegistryPort = conf.getInt(registryKey, defMasterRMIRegistryPort);
    rmiConnectorPort = conf.getInt(connectorKey, rmiRegistryPort);
    String message = "Master rmiRegistryPort:" + rmiRegistryPort + ",Master rmiConnectorPort:"
        + rmiConnectorPort;
    LOG.info(message);
  } else if (env instanceof RegionServerCoprocessorEnvironment) {
    // running on RegionServer
    String registryKey = "regionserver" + RMI_REGISTRY_PORT_CONF_KEY;
    String connectorKey = "regionserver" + RMI_CONNECTOR_PORT_CONF_KEY;
    rmiRegistryPort = conf.getInt(registryKey, defRegionserverRMIRegistryPort);
    rmiConnectorPort = conf.getInt(connectorKey, rmiRegistryPort);
    String message = "RegionServer rmiRegistryPort:" + rmiRegistryPort
        + ",RegionServer rmiConnectorPort:" + rmiConnectorPort;
    LOG.info(message);
  } else if (env instanceof RegionCoprocessorEnvironment) {
    LOG.error("JMXListener should not be loaded in Region Environment!");
    return;
  }

  // The class-level lock serializes the check of JMX_CS and the connector start.
  synchronized (JMXListener.class) {
    if (JMX_CS == null) {
      startConnectorServer(rmiRegistryPort, rmiConnectorPort);
    } else {
      LOG.info("JMXListener has been started at Registry port " + rmiRegistryPort);
    }
  }
}
/**
 * Loads {@code MyAccessController} into the master coprocessor host and then creates a master
 * and a region server coprocessor environment for it.
 *
 * <p>There are no explicit assertions; the test only fails if one of the calls throws.
 */
@Test
public void testCoprocessorLoading() throws Exception {
  MasterCoprocessorHost masterHost =
      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
  masterHost.load(MyAccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);
  AccessController accessController = masterHost.findCoprocessor(MyAccessController.class);
  MasterCoprocessorEnvironment masterEnv =
      masterHost.createEnvironment(accessController, Coprocessor.PRIORITY_HIGHEST, 1, conf);

  RegionServerCoprocessorHost regionServerHost =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
  RegionServerCoprocessorEnvironment regionServerEnv =
      regionServerHost.createEnvironment(accessController, Coprocessor.PRIORITY_HIGHEST, 1, conf);
}
/**
 * Rejects a region server stop request unless {@code hasAccess} is set.
 *
 * @param ctx the observer context (unused)
 * @throws IOException an {@code AccessDeniedException} when {@code hasAccess} is false
 */
@Override
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  if (hasAccess) {
    return;
  }
  throw new AccessDeniedException("Insufficient permissions to stop region server.");
}
/**
 * Increments {@code abortCount} on every stop request and vetoes the stop unless
 * {@code stopAllowed} is set.
 *
 * @param env the observer context (unused)
 * @throws IOException when {@code stopAllowed} is false
 */
@Override
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
    throws IOException {
  // Count the attempt before deciding, so vetoed requests are counted too.
  abortCount.incrementAndGet();
  if (stopAllowed) {
    return;
  }
  throw new IOException("Stop not allowed");
}
/**
 * Captures the region server services from the environment when loaded on a region server.
 *
 * <p>The value returned by {@code getOnlineRegions()} is cast to {@code RegionServerServices}
 * and stored in {@code regionServerServices}.
 *
 * @param env the coprocessor environment this service is loaded into
 * @throws IOException a {@code CoprocessorException} if {@code env} is not a
 *         {@code RegionServerCoprocessorEnvironment}
 */
@Override
public void start(CoprocessorEnvironment env) throws IOException {
  if (!(env instanceof RegionServerCoprocessorEnvironment)) {
    throw new CoprocessorException("Must be loaded on a RegionServer!");
  }
  RegionServerCoprocessorEnvironment regionServerEnv = (RegionServerCoprocessorEnvironment) env;
  this.regionServerServices = (RegionServerServices) regionServerEnv.getOnlineRegions();
  SpliceLogUtils.info(LOG,"Started SpliceRSRpcServices");
}
/**
 * Shuts down the {@code DatabaseLifecycleManager} before the region server stops.
 *
 * <p>A shutdown failure is logged as a warning and not rethrown.
 *
 * @param env the observer context (unused)
 * @throws IOException declared by the overridden method
 */
@Override
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> env)
    throws IOException {
  LOG.warn("shutting down splice on this node/JVM");
  try {
    DatabaseLifecycleManager lifecycle = DatabaseLifecycleManager.manager();
    lifecycle.shutdown();
  } catch (Exception shutdownFailure) {
    SpliceLogUtils.warn(LOG,"splice machine shut down with error",shutdownFailure);
  }
}
/**
 * Expects {@code CubeVisitService.start} to throw a {@code CoprocessorException} when it is
 * handed a mocked {@code RegionServerCoprocessorEnvironment}.
 */
@Test(expected = CoprocessorException.class)
public void testStart() throws IOException {
  final CoprocessorEnvironment mockedEnv =
      PowerMockito.mock(RegionServerCoprocessorEnvironment.class);
  final CubeVisitService visitService = new CubeVisitService();
  visitService.start(mockedEnv);
}
/**
 * Before a merge of two data regions is committed, prepares the matching merge of the two
 * corresponding index regions and adds its mutations to {@code metaEntries}.
 *
 * <p>Flow, as implemented below:
 * <ul>
 *   <li>System tables are skipped.</li>
 *   <li>Work is done only when the table descriptor's
 *       {@code IS_LOCAL_INDEX_TABLE_PROP_BYTES} value is absent or does not decode to
 *       {@code TRUE}, and the table named by {@code getLocalIndexPhysicalName} exists.</li>
 *   <li>If either index region cannot be found on this server, or the transaction's
 *       {@code prepare} returns false, the merge is bypassed via {@code ctx.bypass()}.</li>
 *   <li>Otherwise the pre-PONR steps run, the result is stored in {@code this.mergedRegion},
 *       and {@code prepareMutationsForMerge} is called with {@code metaEntries}.</li>
 *   <li>Any exception bypasses the merge, rolls back {@code rmt}, and clears both fields.</li>
 * </ul>
 *
 * @param ctx observer context; supplies the region server services and receives the bypass
 * @param regionA first data region being merged
 * @param regionB second data region being merged
 * @param metaEntries mutation list handed to {@code prepareMutationsForMerge}
 * @throws IOException from the table and region lookups, or from the rollback in the catch block
 */
@Override
public void preMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
HRegion regionA, HRegion regionB, List<Mutation> metaEntries) throws IOException {
HTableDescriptor tableDesc = regionA.getTableDesc();
if (SchemaUtil.isSystemTable(tableDesc.getName())) {
return;
}
RegionServerServices rss = ctx.getEnvironment().getRegionServerServices();
HRegionServer rs = (HRegionServer) rss;
// Proceed only when the table is not itself flagged as a local index table.
if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null
|| !Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(tableDesc
.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
TableName indexTable =
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
// Without an index table there is nothing to merge alongside the data regions.
if (!MetaTableAccessor.tableExists(rs.getConnection(), indexTable)) return;
HRegion indexRegionA = IndexUtil.getIndexRegion(regionA, ctx.getEnvironment());
if (indexRegionA == null) {
LOG.warn("Index region corresponindg to data region " + regionA
+ " not in the same server. So skipping the merge.");
ctx.bypass();
return;
}
HRegion indexRegionB = IndexUtil.getIndexRegion(regionB, ctx.getEnvironment());
if (indexRegionB == null) {
LOG.warn("Index region corresponindg to region " + regionB
+ " not in the same server. So skipping the merge.");
ctx.bypass();
return;
}
try {
rmt = new RegionMergeTransaction(indexRegionA, indexRegionB, false);
// NOTE(review): when prepare() fails, rmt stays assigned on return; confirm that the
// post-merge and rollback hooks tolerate this.
if (!rmt.prepare(rss)) {
LOG.error("Prepare for the index regions merge [" + indexRegionA + ","
+ indexRegionB + "] failed. So returning null. ");
ctx.bypass();
return;
}
this.mergedRegion = rmt.stepsBeforePONR(rss, rss, false);
rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(),
indexRegionA.getRegionInfo(), indexRegionB.getRegionInfo(),
rss.getServerName(), metaEntries, 1);
} catch (Exception e) {
// Bypass the merge first, then undo whatever the index transaction already did.
ctx.bypass();
LOG.warn("index regions merge failed with the exception ", e);
if (rmt != null) {
rmt.rollback(rss, rss);
rmt = null;
mergedRegion = null;
}
}
}
}
/**
 * Convenience overload that takes the region server services from {@code env} and delegates
 * to the two-argument {@code getIndexRegion} that accepts them.
 *
 * @param dataRegion the data region passed through to the delegate
 * @param env region server coprocessor environment supplying the services
 * @return whatever the delegate returns
 * @throws IOException if the delegate fails
 */
public static HRegion getIndexRegion(HRegion dataRegion, RegionServerCoprocessorEnvironment env)
    throws IOException {
  return getIndexRegion(dataRegion, env.getRegionServerServices());
}
/**
 * Convenience overload that takes the region server services from {@code env} and delegates
 * to the two-argument {@code getDataRegion} that accepts them.
 *
 * @param indexRegion the index region passed through to the delegate
 * @param env region server coprocessor environment supplying the services
 * @return whatever the delegate returns
 * @throws IOException if the delegate fails
 */
public static HRegion getDataRegion(HRegion indexRegion, RegionServerCoprocessorEnvironment env)
    throws IOException {
  return getDataRegion(indexRegion, env.getRegionServerServices());
}
/**
 * Starting a {@code CubeVisitService} with a mocked
 * {@code RegionServerCoprocessorEnvironment} must fail with a {@code CoprocessorException}.
 */
@Test(expected = CoprocessorException.class)
public void testStart() throws IOException {
  CoprocessorEnvironment regionServerEnvMock =
      PowerMockito.mock(RegionServerCoprocessorEnvironment.class);
  CubeVisitService serviceUnderTest = new CubeVisitService();
  serviceUnderTest.start(regionServerEnvMock);
}
/**
 * Wraps the newly created replication endpoint in a {@code VisibilityReplicationEndpoint}
 * built from the endpoint and {@code visibilityLabelService}.
 *
 * @param ctx the observer context (unused)
 * @param endpoint the endpoint to wrap
 * @return the wrapping endpoint
 */
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
  ReplicationEndpoint wrapped =
      new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
  return wrapped;
}
/**
 * Requires the {@code ADMIN} action before a region server stop is allowed to proceed.
 *
 * @param ctx observer context forwarded to the permission check
 * @throws IOException if the permission check fails
 */
@Override
public void preStopRegionServer(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  final String request = "preStopRegionServer";
  requirePermission(ctx, request, Action.ADMIN);
}
/**
 * Requires the {@code ADMIN} action before a WAL writer roll request is processed.
 *
 * @param ctx observer context forwarded to the permission check
 * @throws IOException if the permission check fails
 */
@Override
public void preRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  // The request label differs from the method name; it is kept exactly as-is.
  final String request = "preRollLogWriterRequest";
  requirePermission(ctx, request, Permission.Action.ADMIN);
}
/**
 * No-op hook invoked after a WAL writer roll request.
 *
 * @param ctx the observer context (unused)
 * @throws IOException declared by the overridden method; never thrown here
 */
@Override
public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  // Intentionally empty.
}
/**
 * Pass-through hook that returns the replication endpoint it was given, unchanged.
 *
 * @param ctx the observer context (unused)
 * @param endpoint the endpoint that was created
 * @return the same {@code endpoint} instance
 */
@Override
public ReplicationEndpoint postCreateReplicationEndPoint(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx,
    ReplicationEndpoint endpoint) {
  // No wrapping or substitution is applied.
  return endpoint;
}
/**
 * Requires the {@code WRITE} action before log entries are replicated.
 *
 * @param ctx observer context forwarded to the permission check
 * @throws IOException if the permission check fails
 */
@Override
public void preReplicateLogEntries(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  final String request = "replicateLogEntries";
  requirePermission(ctx, request, Action.WRITE);
}
/**
 * Requires the {@code ADMIN} action before compaction queues are cleared.
 *
 * @param ctx observer context forwarded to the permission check
 * @throws IOException if the permission check fails
 */
@Override
public void preClearCompactionQueues(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
    throws IOException {
  final String request = "preClearCompactionQueues";
  requirePermission(ctx, request, Permission.Action.ADMIN);
}
/**
 * Passes the active user of the request to {@code checkSystemOrSuperUser} before procedures
 * are executed.
 *
 * @param ctx observer context from which the active user is resolved
 * @throws IOException if resolving the user or the check fails
 */
@Override
public void preExecuteProcedures(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
  checkSystemOrSuperUser(
      getActiveUser(ctx));
}
/**
 * Override that deliberately performs no permission check before procedures are executed.
 *
 * @param ctx the observer context (unused)
 * @throws IOException declared by the overridden method; never thrown here
 */
@Override
public void preExecuteProcedures(
    ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException {
  // FIXME: ignore the procedure permission check since in our UT framework master is neither
  // the systemuser nor the superuser so we can not call executeProcedures...
}
/**
 * Boots the engine on this region server and returns the started lifecycle manager.
 *
 * <p>The steps before the {@code try} block (recording the znode values, loading the SI
 * environment, starting the two watchers) run outside it, so their exceptions propagate
 * unwrapped. Inside the {@code try}, the engine, pipeline-environment and network services are
 * registered, old transactions are set on the transaction store, the manager is started, and a
 * {@code SetReplicationRoleTask} is submitted to the driver's executor service.
 *
 * @param e coprocessor environment; cast to {@code RegionServerCoprocessorEnvironment}, and the
 *          result of its {@code getOnlineRegions()} is cast to {@code RegionServerServices}
 * @return the {@code DatabaseLifecycleManager} after {@code start()} has been called on it
 * @throws IOException a {@code DoNotRetryIOException} wrapping any exception raised inside the
 *         {@code try} block
 */
@SuppressFBWarnings(value="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", justification="intentional")
private DatabaseLifecycleManager startEngine(CoprocessorEnvironment e) throws IOException{
RegionServerServices regionServerServices =(RegionServerServices)((RegionServerCoprocessorEnvironment) e).getOnlineRegions();
// Record the znode path and this server's name in variables declared outside this method.
rsZnode = regionServerServices.getZooKeeper().getZNodePaths().rsZNode;
regionServerZNode = regionServerServices.getServerName().getServerName();
//ensure that the SI environment is booted properly
HBaseSIEnvironment env=HBaseSIEnvironment.loadEnvironment(new SystemClock(),ZkUtils.getRecoverableZooKeeper());
SIDriver driver = env.getSIDriver();
//make sure the configuration is correct
SConfiguration config=driver.getConfiguration();
DatabaseLifecycleManager manager=DatabaseLifecycleManager.manager();
HBaseRegionLoads.INSTANCE.startWatching();
TransactionsWatcher.INSTANCE.startWatching();
//register the engine boot service
try{
// The return value is discarded on purpose (see the SuppressFBWarnings justification).
ManagerLoader.load().getEncryptionManager();
HBaseConnectionFactory connFactory = HBaseConnectionFactory.getInstance(driver.getConfiguration());
RegionServerLifecycle distributedStartupSequence=new RegionServerLifecycle(driver.getClock(),connFactory);
manager.registerEngineService(new MonitoredLifecycleService(distributedStartupSequence,config,false));
//register the pipeline driver environment load service
manager.registerGeneralService(new PipelineEnvironmentLoadService() {
@Override
protected PipelineEnvironment loadPipelineEnvironment(ContextFactoryDriver cfDriver) throws IOException {
return HBasePipelineEnvironment.loadEnvironment(new SystemClock(),cfDriver);
}
});
env.txnStore().setOldTransactions(ZkUpgrade.getOldTransactions(config));
//register the network boot service
manager.registerNetworkService(new NetworkLifecycleService(config));
manager.start();
SIDriver.driver().getExecutorService().submit(new SetReplicationRoleTask());
return manager;
}catch(Exception e1){
// Log, then rethrow as DoNotRetryIOException with the original exception as the cause.
LOG.error("Unexpected exception registering boot service", e1);
throw new DoNotRetryIOException(e1);
}
}