下面列出了 org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。
/**
 * Forwards the {@code postModifyTable} callback to {@code implMasterObserver}.
 *
 * <p>The delegate call is bracketed by {@code activatePluginClassLoader()} and
 * {@code deactivatePluginClassLoader()}; the latter runs from a {@code finally} block, so it
 * is executed even when the delegate throws.
 *
 * @param ctx observer context, passed through to the delegate
 * @param tableName name of the modified table, passed through to the delegate
 * @param htd table descriptor, passed through to the delegate
 * @throws IOException if the delegate throws it
 */
@Override
public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName, TableDescriptor htd) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("==> HBaseAtlasCoprocessor.postModifyTable()");
  }

  try {
    activatePluginClassLoader();
    implMasterObserver.postModifyTable(ctx, tableName, htd);
  } finally {
    deactivatePluginClassLoader();
  }

  // Exit trace is only reached when the delegate completed normally.
  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug("<== HBaseAtlasCoprocessor.postModifyTable()");
}
/**
 * Perform the validation checks for a coprocessor to determine if the path
 * is white listed or not.
 *
 * <p>Only coprocessor descriptors that declare a jar path are checked. A descriptor is
 * accepted as soon as {@code validatePath} returns {@code true} for its jar path and one of
 * the paths configured under {@code CP_COPROCESSOR_WHITELIST_PATHS_KEY}.
 *
 * @param ctx as passed in from the coprocessor
 * @param htd as passed in from the coprocessor
 * @throws IOException if path is not included in whitelist or a failure
 *         occurs in processing
 */
private static void verifyCoprocessors(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableDescriptor htd) throws IOException {
  Collection<String> paths =
      ctx.getEnvironment().getConfiguration().getStringCollection(
          CP_COPROCESSOR_WHITELIST_PATHS_KEY);
  for (CoprocessorDescriptor cp : htd.getCoprocessorDescriptors()) {
    if (!cp.getJarPath().isPresent()) {
      // No jar path declared: nothing to check for this coprocessor.
      continue;
    }
    boolean denied = paths.stream().noneMatch(p -> {
      Path wlPath = new Path(p);
      if (!validatePath(new Path(cp.getJarPath().get()), wlPath)) {
        return false;
      }
      // Build the message only when it will actually be logged.
      if (LOG.isDebugEnabled()) {
        LOG.debug(String.format("Coprocessor %s found in directory %s",
            cp.getClassName(), p));
      }
      return true;
    });
    if (denied) {
      throw new IOException(String.format("Loading %s DENIED in %s",
          cp.getClassName(), CP_COPROCESSOR_WHITELIST_PATHS_KEY));
    }
  }
}
/**
 * Forwards the {@code postDeleteNamespace} callback to {@code implMasterObserver}.
 *
 * <p>The delegate call is bracketed by {@code activatePluginClassLoader()} and
 * {@code deactivatePluginClassLoader()}; the latter runs from a {@code finally} block, so it
 * is executed even when the delegate throws.
 *
 * @param ctx observer context, passed through to the delegate
 * @param ns namespace name, passed through to the delegate
 * @throws IOException if the delegate throws it
 */
@Override
public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String ns) throws IOException {
  // The trace messages previously named preDeleteNamespace(), which made debug logs
  // point at the wrong hook; they now name this method.
  if (LOG.isDebugEnabled()) {
    LOG.debug("==> HBaseAtlasCoprocessor.postDeleteNamespace()");
  }
  try {
    activatePluginClassLoader();
    implMasterObserver.postDeleteNamespace(ctx, ns);
  } finally {
    deactivatePluginClassLoader();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("<== HBaseAtlasCoprocessor.postDeleteNamespace()");
  }
}
/**
 * Forwards the {@code postCloneSnapshot} callback to {@code implMasterObserver}.
 *
 * <p>{@code deactivatePluginClassLoader()} is invoked from a {@code finally} block, so it
 * runs whether or not the delegate throws.
 *
 * @param observerContext observer context, passed through to the delegate
 * @param snapshot snapshot description, passed through to the delegate
 * @param tableDescriptor table descriptor, passed through to the delegate
 * @throws IOException if the delegate throws it
 */
@Override
public void postCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("==> HBaseAtlasCoprocessor.postCloneSnapshot()");
  }

  try {
    activatePluginClassLoader();
    implMasterObserver.postCloneSnapshot(observerContext, snapshot, tableDescriptor);
  } finally {
    deactivatePluginClassLoader();
  }

  // Exit trace is only reached when the delegate completed normally.
  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug("<== HBaseAtlasCoprocessor.postCloneSnapshot()");
}
/**
 * Forwards the {@code postRestoreSnapshot} callback to {@code implMasterObserver}.
 *
 * <p>{@code deactivatePluginClassLoader()} is invoked from a {@code finally} block, so it
 * runs whether or not the delegate throws.
 *
 * @param observerContext observer context, passed through to the delegate
 * @param snapshot snapshot description, passed through to the delegate
 * @param tableDescriptor table descriptor, passed through to the delegate
 * @throws IOException if the delegate throws it
 */
@Override
public void postRestoreSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("==> HBaseAtlasCoprocessor.postRestoreSnapshot()");
  }

  try {
    activatePluginClassLoader();
    implMasterObserver.postRestoreSnapshot(observerContext, snapshot, tableDescriptor);
  } finally {
    deactivatePluginClassLoader();
  }

  // Exit trace is only reached when the delegate completed normally.
  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug("<== HBaseAtlasCoprocessor.postRestoreSnapshot()");
}
/**
 * After a table has been created, sets up its HDFS ACLs when
 * {@code needHandleTableHdfsAcl} says the table should be handled.
 *
 * <p>The owner is {@code desc.getOwnerString()} when that is non-null, otherwise the short
 * name of the active user of the call.
 *
 * @param c observer context; supplies the active user and the connection
 * @param desc descriptor of the created table
 * @param regions regions of the created table (not used here)
 * @throws IOException if any of the helper calls throws it
 */
@Override
public void postCompletedCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> c,
    TableDescriptor desc, RegionInfo[] regions) throws IOException {
  if (!needHandleTableHdfsAcl(desc, "createTable " + desc.getTableName())) {
    return;
  }
  TableName tableName = desc.getTableName();

  // 1. Create table directories to make HDFS acls can be inherited
  hdfsAclHelper.createTableDirectories(tableName);

  // 2. Add table owner HDFS acls
  final String owner;
  if (desc.getOwnerString() == null) {
    owner = getActiveUser(c).getShortName();
  } else {
    owner = desc.getOwnerString();
  }
  hdfsAclHelper.addTableAcl(tableName, Sets.newHashSet(owner), "create");

  // 3. Record table owner permission is synced to HDFS in acl table
  SnapshotScannerHDFSAclStorage.addUserTableHdfsAcl(
      c.getEnvironment().getConnection(), owner, tableName);
}
/**
 * Initializes this coprocessor from the environment's configuration.
 *
 * <p>Stores the configuration, records whether authorization is supported (logging a
 * warning when it is not), rejects HFile format versions below
 * {@code HFile.MIN_FORMAT_VERSION_WITH_TAGS}, and obtains the visibility label service
 * unless the environment is a {@code MasterCoprocessorEnvironment}.
 *
 * @param env the coprocessor environment
 * @throws IOException declared by the overridden method
 * @throws RuntimeException if the configured HFile format version is too low
 */
@Override
public void start(CoprocessorEnvironment env) throws IOException {
  this.conf = env.getConfiguration();

  authorizationEnabled = AccessChecker.isAuthorizationSupported(conf);
  if (!authorizationEnabled) {
    LOG.warn("The VisibilityController has been loaded with authorization checks disabled.");
  }

  boolean formatTooOld = HFile.getFormatVersion(conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS;
  if (formatTooOld) {
    throw new RuntimeException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
        + " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY
        + " accordingly.");
  }

  // Do not create for master CPs
  if (env instanceof MasterCoprocessorEnvironment) {
    return;
  }
  visibilityLabelService =
      VisibilityLabelServiceManager.getInstance().getVisibilityLabelService(this.conf);
}
/**
 * After a table is deleted, removes its permissions from the ACL table and deletes its
 * ACL node through {@code zkPermissionWatcher}.
 *
 * <p>The ACL-table update is executed through {@code User.runAsLoginUser}; the ACL table
 * handle is closed by try-with-resources.
 *
 * @param c observer context; supplies the configuration and the connection
 * @param tableName name of the deleted table
 * @throws IOException if the privileged action or the watcher call throws it
 */
@Override
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
    final TableName tableName) throws IOException {
  final Configuration configuration = c.getEnvironment().getConfiguration();

  User.runAsLoginUser((PrivilegedExceptionAction<Void>) () -> {
    try (Table aclTable =
        c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
      PermissionStorage.removeTablePermissions(configuration, tableName, aclTable);
    }
    return null;
  });

  zkPermissionWatcher.deleteTableACLNode(tableName);
}
/**
 * Before a table is truncated, checks permission for the operation and stores the table's
 * current user permissions in {@code tableAcls}.
 *
 * <p>The permission lookup is executed through {@code User.runAsLoginUser}; nothing is
 * stored when the lookup returns {@code null}.
 *
 * @param c observer context; supplies the configuration
 * @param tableName name of the table about to be truncated
 * @throws IOException if the permission check or the privileged action throws it
 */
@Override
public void preTruncateTable(ObserverContext<MasterCoprocessorEnvironment> c,
    final TableName tableName) throws IOException {
  requirePermission(c, "truncateTable", tableName, null, null, Action.ADMIN, Action.CREATE);

  final Configuration configuration = c.getEnvironment().getConfiguration();
  User.runAsLoginUser((PrivilegedExceptionAction<Void>) () -> {
    List<UserPermission> currentAcls = PermissionStorage.getUserTablePermissions(
        configuration, tableName, null, null, null, false);
    if (currentAcls != null) {
      tableAcls.put(tableName, currentAcls);
    }
    return null;
  });
}
/**
 * Authorizes a clone-snapshot request.
 *
 * <p>When the active user is the snapshot owner and the target table name equals the
 * snapshot's table name, an "allow" result is logged. In every other case
 * {@code Action.ADMIN} permission is required via {@code accessChecker}.
 *
 * @param ctx observer context; supplies the active user
 * @param snapshot description of the snapshot being cloned
 * @param hTableDescriptor descriptor of the table to create from the snapshot
 * @throws IOException if resolving the user or the permission check throws it
 */
@Override
public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
    throws IOException {
  User user = getActiveUser(ctx);

  boolean ownerCloningSameTable = SnapshotDescriptionUtils.isSnapshotOwner(snapshot, user)
      && hTableDescriptor.getTableName().getNameAsString()
          .equals(snapshot.getTableNameAsString());

  if (!ownerCloningSameTable) {
    accessChecker.requirePermission(user, "cloneSnapshot " + snapshot.getName(), null,
        Action.ADMIN);
    return;
  }

  // Snapshot owner is allowed to create a table with the same name as the snapshot he took
  AuthResult allowed = AuthResult.allow("cloneSnapshot " + snapshot.getName(),
      "Snapshot owner check allowed", user, null, hTableDescriptor.getTableName(), null);
  AccessChecker.logResult(allowed);
}
/**
 * Increments {@code cloneCount} each time a clone-snapshot request arrives; does nothing
 * when {@code cloneCount} is {@code null}.
 *
 * @param ctx observer context (not used here)
 * @param snapshot snapshot description (not used here)
 * @param hTableDescriptor target table descriptor (not used here)
 * @throws IOException declared by the overridden method
 */
@Override
public void preCloneSnapshot(final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final SnapshotDescription snapshot, final TableDescriptor hTableDescriptor)
    throws IOException {
  if (cloneCount == null) {
    return;
  }
  cloneCount.incrementAndGet();
}
/**
 * Rejects creation of the table whose name equals {@code TABLE}.
 *
 * @param env observer context (not used here)
 * @param desc descriptor of the table about to be created
 * @param regions regions of the table (not used here)
 * @throws IOException an {@code AccessDeniedException} when the table name equals
 *         {@code TABLE}
 */
@Override
public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> env,
    TableDescriptor desc, RegionInfo[] regions) throws IOException {
  boolean isForbiddenTable = desc.getTableName().equals(TABLE);
  if (isForbiddenTable) {
    throw new AccessDeniedException("Don't allow creation of table");
  }
}
/**
 * Reports a {@code CREATE_TABLE} operation to {@code hbaseAtlasHook} after a table has been
 * created.
 *
 * @param observerContext observer context, passed through to the hook
 * @param tableDescriptor descriptor of the created table, passed through to the hook
 * @param hRegionInfos regions of the created table (not used here)
 * @throws IOException if the hook call throws it
 */
@Override
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableDescriptor tableDescriptor, RegionInfo[] hRegionInfos) throws IOException {
  LOG.info("==> HBaseAtlasCoprocessor.postCreateTable()");

  hbaseAtlasHook.sendHBaseTableOperation(
      tableDescriptor, null, HBaseAtlasHook.OPERATION.CREATE_TABLE, observerContext);

  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug("<== HBaseAtlasCoprocessor.postCreateTable()");
}
/**
 * Reports an {@code ALTER_TABLE} operation to {@code hbaseAtlasHook} after a table has been
 * modified.
 *
 * @param observerContext observer context, passed through to the hook
 * @param tableName name of the modified table, passed through to the hook
 * @param tableDescriptor table descriptor, passed through to the hook
 * @throws IOException if the hook call throws it
 */
@Override
public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> observerContext, TableName tableName, TableDescriptor tableDescriptor) throws IOException {
  LOG.info("==> HBaseAtlasCoprocessor.postModifyTable()");

  hbaseAtlasHook.sendHBaseTableOperation(
      tableDescriptor, tableName, HBaseAtlasHook.OPERATION.ALTER_TABLE, observerContext);

  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug("<== HBaseAtlasCoprocessor.postModifyTable()");
}
/**
 * Reports a {@code CREATE_NAMESPACE} operation to {@code hbaseAtlasHook} after a namespace
 * has been created.
 *
 * @param observerContext observer context, passed through to the hook
 * @param namespaceDescriptor descriptor of the created namespace, passed through to the hook
 * @throws IOException if the hook call throws it
 */
@Override
public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
  LOG.info("==> HBaseAtlasCoprocessor.postCreateNamespace()");

  hbaseAtlasHook.sendHBaseNameSpaceOperation(
      namespaceDescriptor, null, HBaseAtlasHook.OPERATION.CREATE_NAMESPACE, observerContext);

  if (!LOG.isDebugEnabled()) {
    return;
  }
  LOG.debug("<== HBaseAtlasCoprocessor.postCreateNamespace()");
}
/**
 * Reports a {@code DELETE_NAMESPACE} operation to {@code hbaseAtlasHook} after a namespace
 * has been deleted.
 *
 * @param observerContext observer context, passed through to the hook
 * @param s name of the deleted namespace, passed through to the hook
 * @throws IOException if the hook call throws it
 */
@Override
public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, String s) throws IOException {
  LOG.info("==> HBaseAtlasCoprocessor.postDeleteNamespace()");
  hbaseAtlasHook.sendHBaseNameSpaceOperation(null, s, HBaseAtlasHook.OPERATION.DELETE_NAMESPACE, observerContext);
  if (LOG.isDebugEnabled()) {
    // Exit trace uses the "<==" marker; it previously repeated the entry marker "==>",
    // which made entry and exit indistinguishable in the debug log.
    LOG.debug("<== HBaseAtlasCoprocessor.postDeleteNamespace()");
  }
}
/**
 * Counts down {@code tableDeletionLatch}, when one is set, after a table deletion action
 * has completed.
 *
 * @param ctx observer context (not used here)
 * @param tableName name of the deleted table (not used here)
 * @throws IOException declared by the overridden method
 */
@Override
public void postCompletedDeleteTableAction(
    final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final TableName tableName) throws IOException {
  // the AccessController test, some times calls only and directly the
  // postCompletedDeleteTableAction()
  if (tableDeletionLatch == null) {
    return;
  }
  tableDeletionLatch.countDown();
}
/**
 * Fails the call with an {@code IOException} while the value of {@code retryNum} before
 * this call's increment is below {@code DEFAULT_RETRIES_NUMBER}.
 *
 * @param ctx observer context (not used here)
 * @param namespace requested namespace (not used here)
 * @throws IOException while the counter is still below {@code DEFAULT_RETRIES_NUMBER}
 */
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
    String namespace) throws IOException {
  // The counter is incremented on every call, whether or not this call fails.
  boolean shouldFail = retryNum.getAndIncrement() < DEFAULT_RETRIES_NUMBER;
  if (shouldFail) {
    throw new IOException("call fail");
  }
}
/**
 * After a snapshot action has completed, applies HDFS ACLs to the snapshot when
 * {@code needHandleTableHdfsAcl} says the table should be handled.
 *
 * @param c observer context (not used here)
 * @param snapshot description of the completed snapshot
 * @param tableDescriptor descriptor of the snapshotted table
 * @throws IOException if the helper calls throw it
 */
@Override
public void postCompletedSnapshotAction(ObserverContext<MasterCoprocessorEnvironment> c,
    SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
  if (!needHandleTableHdfsAcl(tableDescriptor, "snapshot " + snapshot.getName())) {
    return;
  }
  // Add HDFS acls of users with table read permission to snapshot files
  hdfsAclHelper.snapshotAcl(snapshot);
}
/**
 * Sleeps for half of {@code DEFAULT_RPC_TIMEOUT}, adds that amount to {@code sleepTime},
 * and fails the call while the accumulated total is below
 * {@code DEFAULT_OPERATION_TIMEOUT}.
 *
 * @param ctx observer context (not used here)
 * @param namespace requested namespace (not used here)
 * @throws IOException while the accumulated sleep time is below
 *         {@code DEFAULT_OPERATION_TIMEOUT}
 */
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
    String namespace) throws IOException {
  Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);

  boolean belowOperationTimeout =
      sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT;
  if (belowOperationTimeout) {
    throw new IOException("call fail");
  }
}
/**
 * Removes the deleted table from the balancer's colocation when a balancer is set and
 * reports the table as colocated.
 *
 * @param ctx observer context (not used here)
 * @param tableName name of the deleted table
 * @throws IOException if a balancer call throws it
 */
@Override
public void postDeleteTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableName tableName) throws IOException {
  if (balancer == null || !balancer.isTableColocated(tableName)) {
    return;
  }
  balancer.removeTablesFromColocation(tableName);
}
/**
 * Loads {@code MyAccessController} into the master coprocessor host, looks it up again, and
 * creates a master and a region-server coprocessor environment for it. The test contains no
 * explicit assertions; it passes when none of these calls throws.
 */
@Test
public void testCoprocessorLoading() throws Exception {
  MasterCoprocessorHost masterHost =
      TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
  masterHost.load(MyAccessController.class, Coprocessor.PRIORITY_HIGHEST, conf);

  AccessController accessController = masterHost.findCoprocessor(MyAccessController.class);
  MasterCoprocessorEnvironment masterEnv =
      masterHost.createEnvironment(accessController, Coprocessor.PRIORITY_HIGHEST, 1, conf);

  RegionServerCoprocessorHost regionServerHost =
      TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getRegionServerCoprocessorHost();
  RegionServerCoprocessorEnvironment regionServerEnv =
      regionServerHost.createEnvironment(accessController, Coprocessor.PRIORITY_HIGHEST, 1, conf);
}
/**
 * Throws an {@code IOException} when {@code failures} contains
 * {@code Failure.PRE_DELETE_SNAPSHOT_FAILURE}.
 *
 * @param ctx observer context (not used here)
 * @param snapshot snapshot description (not used here)
 * @throws IOException when the pre-delete failure is configured
 */
@Override
public void preDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
    SnapshotDescription snapshot) throws IOException {
  boolean failRequested = failures.contains(Failure.PRE_DELETE_SNAPSHOT_FAILURE);
  if (failRequested) {
    throw new IOException("preDeleteSnapshot");
  }
}
/**
 * Throws an {@code IOException} when {@code failures} contains
 * {@code Failure.POST_DELETE_SNAPSHOT_FAILURE}.
 *
 * @param ctx observer context (not used here)
 * @param snapshot snapshot description (not used here)
 * @throws IOException when the post-delete failure is configured
 */
@Override
public void postDeleteSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
    SnapshotDescription snapshot) throws IOException {
  boolean failRequested = failures.contains(Failure.POST_DELETE_SNAPSHOT_FAILURE);
  if (failRequested) {
    throw new IOException("postDeleteSnapshot");
  }
}
/**
 * Counts down {@code mergeCommitArrive} when the merge commit action is reached, then logs
 * that the countdown happened.
 *
 * @param ctx observer context (not used here)
 * @param regionsToMerge regions being merged (not used here)
 * @param metaEntries meta mutations for the merge (not used here)
 */
@Override
public void preMergeRegionsCommitAction(
    ObserverContext<MasterCoprocessorEnvironment> ctx,
    RegionInfo[] regionsToMerge,
    List<Mutation> metaEntries) {
  mergeCommitArrive.countDown();
  // Logged at error level, as in the original code.
  LOG.error("mergeCommitArrive countdown");
}
/**
 * Throws an {@code IOException} when {@code throwExceptionInPreCreateTableAction} is set.
 *
 * @param ctx observer context (not used here)
 * @param desc descriptor of the table being created (not used here)
 * @param regions regions of the table (not used here)
 * @throws IOException when the flag is set
 */
@Override
public void preCreateTableAction(ObserverContext<MasterCoprocessorEnvironment> ctx,
    TableDescriptor desc, RegionInfo[] regions) throws IOException {
  if (!throwExceptionInPreCreateTableAction) {
    return;
  }
  throw new IOException("Throw exception as it is demanded.");
}
/**
 * After a permission has been revoked, removes the matching HDFS ACL when it is no longer
 * needed.
 *
 * <p>Nothing happens unless {@code checkInitialized} returns {@code true}. Depending on the
 * access scope of the revoked permission, the user's permission is looked up through the
 * matching {@code getUser*Permission} helper. When that lookup returns {@code null}, or
 * {@code hdfsAclHelper.containReadAction} returns {@code false} for it, the matching
 * {@code removeUser*HdfsAcl} helper is called. Table-scope permissions are only processed
 * when {@code needHandleTableHdfsAcl} accepts them.
 *
 * @param c observer context; supplies the connection and the configuration
 * @param userPermission the permission that was revoked
 * @throws IOException if the ACL table access or a helper throws it
 * @throws IllegalArgumentException if the access scope is not GLOBAL, NAMESPACE or TABLE
 */
@Override
public void postRevoke(ObserverContext<MasterCoprocessorEnvironment> c,
    UserPermission userPermission) throws IOException {
  if (!checkInitialized("revoke " + userPermission)) {
    return;
  }
  try (Table aclTable =
      c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
    final String revokedUser = userPermission.getUser();
    final Configuration configuration = c.getEnvironment().getConfiguration();

    switch (userPermission.getAccessScope()) {
      case GLOBAL: {
        UserPermission globalPerm = getUserGlobalPermission(configuration, revokedUser);
        if (globalPerm == null || !hdfsAclHelper.containReadAction(globalPerm)) {
          removeUserGlobalHdfsAcl(aclTable, revokedUser, userPermission);
        }
        break;
      }
      case NAMESPACE: {
        NamespacePermission revokedNsPerm = (NamespacePermission) userPermission.getPermission();
        UserPermission namespacePerm =
            getUserNamespacePermission(configuration, revokedUser, revokedNsPerm.getNamespace());
        if (namespacePerm == null || !hdfsAclHelper.containReadAction(namespacePerm)) {
          removeUserNamespaceHdfsAcl(aclTable, revokedUser, revokedNsPerm.getNamespace(),
              userPermission);
        }
        break;
      }
      case TABLE: {
        TablePermission revokedTablePerm = (TablePermission) userPermission.getPermission();
        if (needHandleTableHdfsAcl(revokedTablePerm)) {
          TableName table = revokedTablePerm.getTableName();
          UserPermission tablePerm = getUserTablePermission(configuration, revokedUser, table);
          if (tablePerm == null || !hdfsAclHelper.containReadAction(tablePerm)) {
            removeUserTableHdfsAcl(aclTable, revokedUser, table, userPermission);
          }
        }
        break;
      }
      default:
        throw new IllegalArgumentException(
            "Illegal user permission scope " + userPermission.getAccessScope());
    }
  }
}
/**
 * Checks permission for a table modification and returns the new descriptor unchanged.
 *
 * @param c observer context, passed to the permission check
 * @param tableName name of the table being modified
 * @param currentDesc current table descriptor (not used here)
 * @param newDesc requested new table descriptor
 * @return {@code newDesc}, unmodified
 * @throws IOException if the permission check throws it
 */
@Override
public TableDescriptor preModifyTable(
    ObserverContext<MasterCoprocessorEnvironment> c,
    TableName tableName,
    TableDescriptor currentDesc,
    TableDescriptor newDesc) throws IOException {
  // TODO: potentially check if this is a add/modify/delete column operation
  requirePermission(
      c, "modifyTable", tableName, null, null, Action.ADMIN, Action.CREATE);
  return newDesc;
}
/**
 * Refuses to disable the ACL table and otherwise checks permission for disabling the
 * given table.
 *
 * @param c observer context, passed to the permission check
 * @param tableName name of the table about to be disabled
 * @throws IOException an {@code AccessDeniedException} when the table name bytes equal
 *         {@code PermissionStorage.ACL_GLOBAL_NAME}, or whatever the permission check throws
 */
@Override
public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName)
    throws IOException {
  boolean targetsAclTable = Bytes.equals(tableName.getName(), PermissionStorage.ACL_GLOBAL_NAME);
  if (targetsAclTable) {
    // We have to unconditionally disallow disable of the ACL table when we are installed,
    // even if not enforcing authorizations. We are still allowing grants and revocations,
    // checking permissions and logging audit messages, etc. If the ACL table is not
    // available we will fail random actions all over the place.
    throw new AccessDeniedException("Not allowed to disable " + PermissionStorage.ACL_TABLE_NAME
        + " table with AccessController installed");
  }

  requirePermission(c, "disableTable", tableName, null, null, Action.ADMIN, Action.CREATE);
}
/**
 * Counts down {@code tableDeletionLatch}, when one is set, after a table deletion action
 * has completed.
 *
 * @param ctx observer context (not used here)
 * @param tableName name of the deleted table (not used here)
 * @throws IOException declared by the overridden method
 */
@Override
public void postCompletedDeleteTableAction(
    final ObserverContext<MasterCoprocessorEnvironment> ctx,
    final TableName tableName)
    throws IOException {
  // the AccessController test, some times calls only and directly the postDeleteTableHandler()
  if (tableDeletionLatch == null) {
    return;
  }
  tableDeletionLatch.countDown();
}