下面列出了 org.apache.hadoop.hbase.coprocessor.RegionServerObserver API 类的用法示例代码,您也可以点击链接到 GitHub 查看完整的源代码。
/**
 * Runs the region-server stop hook and then shuts down each coprocessor environment.
 *
 * <p>Does nothing when no coprocessor environments are registered. Otherwise an operation
 * is handed to {@code execShutdown} that calls {@code preStopRegionServer} on the observer
 * and, in its {@code postEnvCall} hook, calls {@code shutdown} on the operation's
 * environment.
 *
 * @param message stop message supplied by the caller (not read by this method body)
 * @param user the user the operation is constructed with
 * @throws IOException if the shutdown execution fails
 */
public void preStop(String message, User user) throws IOException {
  // All coprocessor methods must run before the coprocessor itself is cleaned up, so the
  // cleanup lives in postEnvCall() rather than in call().
  if (!coprocEnvironments.isEmpty()) {
    RegionServerObserverOperation stopOperation = new RegionServerObserverOperation(user) {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.preStopRegionServer(this);
      }

      @Override
      public void postEnvCall() {
        // Invoke the coprocessor stop method for this environment.
        shutdown(this.getEnvironment());
      }
    };
    execShutdown(stopOperation);
  }
}
/**
 * One-time test fixture setup: configures coprocessors and quota support on the shared
 * test configuration, starts a mini cluster with two masters, waits for quota
 * initialization and caches the admin handle in {@code ADMIN}.
 *
 * @throws Exception if the mini cluster cannot be started or initialized
 */
@BeforeClass
public static void before() throws Exception {
  Configuration conf = UTIL.getConfiguration();
  conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, CustomObserver.class.getName());
  conf.setStrings(
    CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
    MasterSyncObserver.class.getName(), CPMasterObserver.class.getName());
  conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
  conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
  // Use the CoprocessorHost constant instead of repeating the raw
  // "hbase.coprocessor.regionserver.classes" key, matching the region and master keys above.
  conf.setClass(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, CPRegionServerObserver.class,
    RegionServerObserver.class);
  StartMiniClusterOption option = StartMiniClusterOption.builder().numMasters(2).build();
  UTIL.startMiniCluster(option);
  waitForQuotaInitialize(UTIL);
  ADMIN = UTIL.getAdmin();
}
/**
 * Loads the Atlas HBase hook implementation through the Atlas plugin class loader,
 * instantiates it, and stores it in {@code impl} together with the typed views
 * ({@code implMasterObserver}, {@code implRegionObserver},
 * {@code implRegionServerObserver}, {@code implMasterCoprocessor}).
 *
 * <p>Initialization is best-effort: any exception is logged and not rethrown, in which
 * case the fields assigned after the failure point are left as they were.
 */
private void init() {
  if (LOG.isDebugEnabled()) {
    LOG.debug("==> HBaseAtlasCoprocessor.init()");
  }

  try {
    atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass());

    Class<?> cls = Class.forName(ATLAS_HBASE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader);

    activatePluginClassLoader();

    // Class.newInstance() is deprecated and rethrows constructor exceptions unwrapped;
    // go through the no-arg constructor instead. Failures still land in the catch below.
    impl = cls.getDeclaredConstructor().newInstance();
    implMasterObserver = (MasterObserver) impl;
    implRegionObserver = (RegionObserver) impl;
    implRegionServerObserver = (RegionServerObserver) impl;
    implMasterCoprocessor = (MasterCoprocessor) impl;
  } catch (Exception e) {
    // Deliberately not rethrown (best-effort init); the log now names this component
    // rather than the unrelated Ranger plugin.
    LOG.error("Error enabling HBaseAtlasCoprocessor", e);
  } finally {
    // Runs even when the failure happened before activatePluginClassLoader() was reached.
    deactivatePluginClassLoader();
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("<== HBaseAtlasCoprocessor.init()");
  }
}
/**
 * Applies the default test configuration and then trims the thread-related settings of
 * the mini cluster.
 *
 * <p>The default mini-cluster configuration spawns many threads that Phoenix tests do not
 * need. Capping them lets several mini clusters run side by side without hitting the
 * thread limit imposed by the OS.
 *
 * @param conf configuration to adjust in place; must not be null
 * @param overrideProps properties forwarded to {@code setDefaultTestConfig}
 * @return the same {@code conf} instance that was passed in
 */
public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) {
  assertNotNull(conf);
  setDefaultTestConfig(conf, overrideProps);

  // Settings that are set to 5.
  final String[] keysSetToFive = {
    HConstants.REGION_SERVER_HANDLER_COUNT,
    "hbase.assignment.zkevent.workers",
    "hbase.assignment.threads.max",
  };
  for (String key : keysSetToFive) {
    conf.setInt(key, 5);
  }

  // Settings that are set to 2.
  final String[] keysSetToTwo = {
    "hbase.regionserver.metahandler.count",
    HConstants.MASTER_HANDLER_COUNT,
    "dfs.namenode.handler.count",
    "dfs.namenode.service.handler.count",
    "dfs.datanode.handler.count",
    "ipc.server.read.threadpool.size",
    "ipc.server.handler.threadpool.size",
    "hbase.hconnection.threads.max",
    "hbase.hconnection.threads.core",
    "hbase.htable.threads.max",
    "hbase.regionserver.hlog.syncer.count",
    "hbase.hlog.asyncer.number",
  };
  for (String key : keysSetToTwo) {
    conf.setInt(key, 2);
  }

  conf.setInt("hbase.catalogjanitor.interval", 5000);

  // Register the region-server coprocessor class used by the tests.
  conf.setClass("hbase.coprocessor.regionserver.classes", LocalIndexMerger.class,
    RegionServerObserver.class);

  return conf;
}
public CurrentCoprocessorMethods() {
addMethods(BulkLoadObserver.class);
addMethods(EndpointObserver.class);
addMethods(MasterObserver.class);
addMethods(RegionObserver.class);
addMethods(RegionServerObserver.class);
addMethods(WALObserver.class);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code preRollWALWriterRequest} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void preRollWALWriterRequest() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.preRollWALWriterRequest(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code postRollWALWriterRequest} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void postRollWALWriterRequest() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.postRollWALWriterRequest(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code preReplicateLogEntries} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void preReplicateLogEntries() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.preReplicateLogEntries(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code postReplicateLogEntries} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void postReplicateLogEntries() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.postReplicateLogEntries(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Gives the observer a chance to replace a newly created replication endpoint.
 *
 * <p>When no coprocessor environments are registered the given endpoint is returned
 * unchanged. Otherwise an operation seeded with {@code endpoint} is passed to
 * {@code execOperationWithResult}; the operation calls
 * {@code postCreateReplicationEndPoint} on the observer with its current result.
 *
 * @param endpoint the endpoint that was created
 * @return {@code endpoint} itself when there are no environments, otherwise whatever
 *     {@code execOperationWithResult} returns
 * @throws IOException if executing the operation fails
 */
public ReplicationEndpoint postCreateReplicationEndPoint(final ReplicationEndpoint endpoint)
    throws IOException {
  if (!this.coprocEnvironments.isEmpty()) {
    ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint> operation =
        new ObserverOperationWithResult<RegionServerObserver, ReplicationEndpoint>(
            rsObserverGetter, endpoint) {
          @Override
          public ReplicationEndpoint call(RegionServerObserver observer) throws IOException {
            return observer.postCreateReplicationEndPoint(this, getResult());
          }
        };
    return execOperationWithResult(operation);
  }
  return endpoint;
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code preClearCompactionQueues} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void preClearCompactionQueues() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.preClearCompactionQueues(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code postClearCompactionQueues} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void postClearCompactionQueues() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.postClearCompactionQueues(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code preExecuteProcedures} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void preExecuteProcedures() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.preExecuteProcedures(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Passes {@code execOperation} an operation that calls
 * {@code postExecuteProcedures} on the observer, or {@code null} when no coprocessor
 * environments are registered.
 *
 * @throws IOException if executing the operation fails
 */
public void postExecuteProcedures() throws IOException {
  RegionServerObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new RegionServerObserverOperation() {
      @Override
      public void call(RegionServerObserver observer) throws IOException {
        observer.postExecuteProcedures(this);
      }
    };
  }
  execOperation(operation);
}
/**
 * Exposes this object as its own {@link RegionServerObserver}.
 *
 * @return a non-empty {@code Optional} wrapping {@code this}
 */
@Override
public Optional<RegionServerObserver> getRegionServerObserver() {
  final Optional<RegionServerObserver> self = Optional.of(this);
  return self;
}
/**
 * Returns this instance as the region-server observer.
 *
 * @return an {@code Optional} that always contains {@code this}
 */
@Override
public Optional<RegionServerObserver> getRegionServerObserver() {
  final Optional<RegionServerObserver> observer = Optional.of(this);
  return observer;
}
/**
 * Supplies the {@link RegionServerObserver} for this object, which is the object itself.
 *
 * @return a present {@code Optional} holding {@code this}
 */
@Override
public Optional<RegionServerObserver> getRegionServerObserver() {
  final Optional<RegionServerObserver> thisObserver = Optional.of(this);
  return thisObserver;
}
/**
 * One-time fixture setup: registers {@code HaltCP} under the region-server coprocessor
 * configuration key on {@code UTIL1}'s configuration, then delegates to
 * {@code SyncReplicationTestBase.setUp()}.
 *
 * @throws Exception if the base setup fails
 */
@BeforeClass
public static void setUp() throws Exception {
// The coprocessor is registered before the base-class setup is invoked.
// NOTE(review): presumably the base setup starts the cluster that reads this key — confirm.
UTIL1.getConfiguration().setClass(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
HaltCP.class, RegionServerObserver.class);
SyncReplicationTestBase.setUp();
}
/**
 * Reports this object as the {@link RegionServerObserver} to use.
 *
 * @return an {@code Optional} containing {@code this}; never empty
 */
@Override
public Optional<RegionServerObserver> getRegionServerObserver() {
  final Optional<RegionServerObserver> wrapped = Optional.of(this);
  return wrapped;
}
/**
 * Returns the region-server observer, which is this very instance.
 *
 * @return {@code this} wrapped in a non-empty {@code Optional}
 */
@Override
public Optional<RegionServerObserver> getRegionServerObserver() {
  final Optional<RegionServerObserver> result = Optional.of(this);
  return result;
}