The following examples show how to use the org.apache.hadoop.hbase.security.UserProvider API class; you can also follow the links to view the source code on GitHub.
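Before the examples, a minimal orientation sketch (not taken from any of them, assuming only the public UserProvider API): instantiate a provider from a Configuration, then ask it for the current User and for the cluster's security mode.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;

public class UserProviderQuickStart {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Instantiate the UserProvider implementation configured for this cluster.
    UserProvider provider = UserProvider.instantiate(conf);
    // Resolve the current user; on a secure cluster this reflects the Kerberos login.
    User user = provider.getCurrent();
    System.out.println("current user: " + user.getName());
    System.out.println("HBase security enabled: " + provider.isHBaseSecurityEnabled());
    System.out.println("Hadoop security enabled: " + provider.isHadoopSecurityEnabled());
  }
}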
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}
/**
 * Check that checkAndPut fails if the cell does not exist, then put in the cell, then check that
 * the checkAndPut succeeds.
 */
public static void doTestCheckAndPut() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  handler.createTable(tableAname, getColumnDescriptors());
  try {
    List<Mutation> mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, columnAname, valueAname, true));
    Mutation putB = new Mutation(false, columnBname, valueBname, true);
    assertFalse(handler.checkAndPut(tableAname, rowAname, columnAname, valueAname, putB, null));
    handler.mutateRow(tableAname, rowAname, mutations, null);
    assertTrue(handler.checkAndPut(tableAname, rowAname, columnAname, valueAname, putB, null));
    TRowResult rowResult = handler.getRow(tableAname, rowAname, null).get(0);
    assertEquals(rowAname, rowResult.row);
    assertEquals(valueBname, rowResult.columns.get(columnBname).value);
  } finally {
    handler.disableTable(tableAname);
    handler.deleteTable(tableAname);
  }
}
/**
 * Set up security first so that the correct default realm is used.
 */
@BeforeClass
public static void beforeClass() throws Exception {
  UserProvider.setUserProviderForTesting(UTIL.getConfiguration(),
    HadoopSecurityEnabledUserProviderForTesting.class);
  setUpKdcServer();
  SecureTestUtil.enableSecurity(UTIL.getConfiguration());
  UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
  VisibilityTestUtil.enableVisiblityLabels(UTIL.getConfiguration());
  SecureTestUtil.verifyConfiguration(UTIL.getConfiguration());
  setUpClusterKdc();
  UTIL.startMiniCluster();
  UTIL.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
  UTIL.waitUntilAllRegionsAssigned(VisibilityConstants.LABELS_TABLE_NAME);
  UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME, 50000);
  UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME, 50000);
  SecureTestUtil.grantGlobal(UTIL, USER_ADMIN,
    Permission.Action.ADMIN,
    Permission.Action.CREATE,
    Permission.Action.EXEC,
    Permission.Action.READ,
    Permission.Action.WRITE);
  addLabels(UTIL.getConfiguration(), Arrays.asList(USER_OWNER),
    Arrays.asList(PRIVATE, CONFIDENTIAL, SECRET, TOPSECRET));
}
@Override
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
    throws IOException {
  if (c.getEnvironment().getConfiguration()
      .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)) {
    MasterCoprocessorEnvironment mEnv = c.getEnvironment();
    if (!(mEnv instanceof HasMasterServices)) {
      throw new IOException("Does not implement HasMasterServices");
    }
    masterServices = ((HasMasterServices) mEnv).getMasterServices();
    hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
      masterServices.getConnection());
    pathHelper = hdfsAclHelper.getPathHelper();
    hdfsAclHelper.setCommonDirectoryPermission();
    initialized = true;
    userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
  } else {
    LOG.warn("Skipped initializing the coprocessor SnapshotScannerHDFSAclController because "
      + "the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE + " is false.");
  }
}
private void preCheckPermission() throws IOException {
  if (shouldIgnorePreCheckPermission()) {
    return;
  }
  Path hbaseDir = CommonFSUtils.getRootDir(getConf());
  FileSystem fs = hbaseDir.getFileSystem(getConf());
  UserProvider userProvider = UserProvider.instantiate(getConf());
  UserGroupInformation ugi = userProvider.getCurrent().getUGI();
  FileStatus[] files = fs.listStatus(hbaseDir);
  for (FileStatus file : files) {
    try {
      fs.access(file.getPath(), FsAction.WRITE);
    } catch (AccessControlException ace) {
      LOG.warn("Got AccessControlException when running preCheckPermission", ace);
      errors.reportError(ERROR_CODE.WRONG_USAGE, "Current user " + ugi.getUserName()
        + " does not have write perms to " + file.getPath()
        + ". Please rerun hbck as hdfs user " + file.getOwner());
      throw ace;
    }
  }
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // set the always on security provider
  UserProvider.setUserProviderForTesting(util.getConfiguration(),
    HadoopSecurityEnabledUserProviderForTesting.class);
  // setup configuration
  SecureTestUtil.enableSecurity(util.getConfiguration());
  util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
    MAX_FILES_PER_REGION_PER_FAMILY);
  // change default behavior so that tag values are returned with normal rpcs
  util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
    KeyValueCodecWithTags.class.getCanonicalName());
  util.startMiniCluster();
  // Wait for the ACL table to become available
  util.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);
  setupNamespace();
}
/**
 * Test that the ConnectionCache chore cleans up expired connections.
 */
@Test
public void testConnectionChore() throws Exception {
  UTIL.startMiniCluster();
  // 1s clean interval & 5s maxIdleTime
  ConnectionCache cache = new ConnectionCache(UTIL.getConfiguration(),
    UserProvider.instantiate(UTIL.getConfiguration()), 1000, 5000);
  ConnectionCache.ConnectionInfo info = cache.getCurrentConnection();
  assertFalse(info.connection.isClosed());
  Thread.sleep(7000);
  assertTrue(info.connection.isClosed());
  UTIL.shutdownMiniCluster();
}
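For context, a hedged sketch of the typical ConnectionCache pattern used by the REST and Thrift handlers elsewhere on this page, assuming org.apache.hadoop.hbase.util.ConnectionCache; the effective user and table name are hypothetical:

Configuration conf = HBaseConfiguration.create();
ConnectionCache cache = new ConnectionCache(conf, UserProvider.instantiate(conf),
  10 * 1000 /* clean interval */, 10 * 60 * 1000 /* max idle time */);
cache.setEffectiveUser("alice");          // hypothetical effective (proxy) user
Table table = cache.getTable("my_table"); // hypothetical table; the connection is created lazily and cached per user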
public static void initCredentials(JobConf job) throws IOException {
  UserProvider userProvider = UserProvider.instantiate(job);
  if (userProvider.isHadoopSecurityEnabled()) {
    // propagate delegation related props from launcher job to MR job
    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
      job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
    }
  }
  if (userProvider.isHBaseSecurityEnabled()) {
    Connection conn = ConnectionFactory.createConnection(job);
    try {
      // login the server principal (if using secure Hadoop)
      User user = userProvider.getCurrent();
      TokenUtil.addTokenForJob(conn, job, user);
    } catch (InterruptedException ie) {
      LOG.error("Interrupted obtaining user authentication token", ie);
      Thread.currentThread().interrupt();
    } finally {
      conn.close();
    }
  }
}
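A hedged sketch of driving the helper above from a launcher, assuming the classic org.apache.hadoop.mapred API; the job name is a placeholder and the mapper/input wiring is omitted:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

JobConf job = new JobConf(HBaseConfiguration.create());
job.setJobName("hbase-token-example"); // hypothetical job name
// ... set input/output formats, mapper class, etc. before submitting
initCredentials(job);                  // attaches HBase delegation tokens when security is enabled
JobClient.runJob(job);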
/**
 * Obtain an authentication token, for the specified cluster, on behalf of the current user
 * and add it to the credentials for the given map reduce job.
 *
 * @param job The job that requires the permission.
 * @param conf The configuration to use in connecting to the peer cluster
 * @throws IOException When the authentication token cannot be obtained.
 */
public static void initCredentialsForCluster(Job job, Configuration conf)
    throws IOException {
  UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
  if (userProvider.isHBaseSecurityEnabled()) {
    try {
      Connection peerConn = ConnectionFactory.createConnection(conf);
      try {
        TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
      } finally {
        peerConn.close();
      }
    } catch (InterruptedException e) {
      LOG.info("Interrupted obtaining user authentication token");
      // Restore the interrupt status; Thread.interrupted() would clear it instead.
      Thread.currentThread().interrupt();
    }
  }
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  setUpBaseConf(TEST_UTIL.getConfiguration());
  // Setup separate test-data directory for MR cluster and set corresponding configurations.
  // Otherwise, different test classes running MR cluster can step on each other.
  TEST_UTIL.getDataTestDir();
  // set the always on security provider
  UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
    HadoopSecurityEnabledUserProviderForTesting.class);
  // setup configuration
  SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
  TEST_UTIL.startMiniCluster(3);
  TEST_UTIL.startMiniMapReduceCluster();
  // Wait for the ACL table to become available
  TEST_UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);
}
/**
 * Constructor with existing configuration
 * @param conf existing configuration
 * @param userProvider the login user provider
 * @throws IOException if the current user cannot be determined
 */
RESTServlet(final Configuration conf,
    final UserProvider userProvider) throws IOException {
  this.realUser = userProvider.getCurrent().getUGI();
  this.conf = conf;
  registerCustomFilter(conf);
  int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
  int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
  connectionCache = new ConnectionCache(
    conf, userProvider, cleanInterval, maxIdleTime);
  if (supportsProxyuser()) {
    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
  }
  metrics = new MetricsREST();
  pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
  pauseMonitor.start();
}
@Test
public void testMetrics() throws IOException {
  final String path = "/" + TABLE + "/" + ROW_4 + "/" + COLUMN_1;
  Response response = client.put(path, Constants.MIMETYPE_BINARY,
    Bytes.toBytes(VALUE_4));
  assertEquals(200, response.getCode());
  Thread.yield();
  response = client.get(path, Constants.MIMETYPE_JSON);
  assertEquals(200, response.getCode());
  assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
  response = deleteRow(TABLE, ROW_4);
  assertEquals(200, response.getCode());
  UserProvider userProvider = UserProvider.instantiate(conf);
  METRICS_ASSERT.assertCounterGt("requests", 2L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
  METRICS_ASSERT.assertCounterGt("successfulGet", 0L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
  METRICS_ASSERT.assertCounterGt("successfulPut", 0L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
  METRICS_ASSERT.assertCounterGt("successfulDelete", 0L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
}
public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
  UserProvider provider = UserProvider.instantiate(hbaseConfig);
  if (UserGroupInformation.isSecurityEnabled()) {
    String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
    if (keytab != null) {
      hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
    }
    String userName = (String) conf.get(STORM_USER_NAME_KEY);
    if (userName != null) {
      hbaseConfig.set(STORM_USER_NAME_KEY, userName);
    }
    provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY,
      InetAddress.getLocalHost().getCanonicalHostName());
  }
  return provider;
}
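A hedged usage sketch for the Storm-flavored login helper above; the keytab path and principal below are placeholders, not values from any real deployment:

Map<String, Object> stormConf = new HashMap<>();
stormConf.put(STORM_KEYTAB_FILE_KEY, "/etc/security/keytabs/client.keytab"); // hypothetical keytab path
stormConf.put(STORM_USER_NAME_KEY, "client@EXAMPLE.COM");                    // hypothetical principal
UserProvider provider = login(stormConf, HBaseConfiguration.create());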
public HBaseServiceHandler(final Configuration c,
    final UserProvider userProvider) throws IOException {
  this.conf = c;
  int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
  int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
  connectionCache = new ConnectionCache(
    conf, userProvider, cleanInterval, maxIdleTime);
}
private ThriftHBaseServiceHandler createHandler() throws TException {
  try {
    Configuration conf = UTIL.getConfiguration();
    return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
  } catch (IOException ie) {
    throw new TException(ie);
  }
}
@Test
public void testSlowLogResponses() throws Exception {
  // start a thrift server
  HBaseThriftTestingUtility THRIFT_TEST_UTIL = new HBaseThriftTestingUtility();
  Configuration configuration = UTIL.getConfiguration();
  configuration.setBoolean("hbase.regionserver.slowlog.buffer.enabled", true);
  THRIFT_TEST_UTIL.startThriftServer(configuration, ThriftServerType.ONE);
  ThriftHBaseServiceHandler thriftHBaseServiceHandler =
    new ThriftHBaseServiceHandler(configuration,
      UserProvider.instantiate(configuration));
  Collection<ServerName> serverNames = UTIL.getAdmin().getRegionServers();
  Set<TServerName> tServerNames =
    ThriftUtilities.getServerNamesFromHBase(new HashSet<>(serverNames));
  List<Boolean> clearedResponses =
    thriftHBaseServiceHandler.clearSlowLogResponses(tServerNames);
  clearedResponses.forEach(Assert::assertTrue);
  TLogQueryFilter tLogQueryFilter = new TLogQueryFilter();
  tLogQueryFilter.setLimit(15);
  // assertEquals takes the expected value first
  Assert.assertEquals(TFilterByOperator.OR, tLogQueryFilter.getFilterByOperator());
  LogQueryFilter logQueryFilter = ThriftUtilities.getSlowLogQueryFromThrift(tLogQueryFilter);
  Assert.assertEquals(LogQueryFilter.FilterByOperator.OR, logQueryFilter.getFilterByOperator());
  tLogQueryFilter.setFilterByOperator(TFilterByOperator.AND);
  logQueryFilter = ThriftUtilities.getSlowLogQueryFromThrift(tLogQueryFilter);
  Assert.assertEquals(LogQueryFilter.FilterByOperator.AND, logQueryFilter.getFilterByOperator());
  List<TOnlineLogRecord> tLogRecords =
    thriftHBaseServiceHandler.getSlowLogResponses(tServerNames, tLogQueryFilter);
  assertEquals(0, tLogRecords.size());
}
/**
 * Tests for creating, enabling, disabling, and deleting tables. Also
 * tests that creating a table with an invalid column name yields an
 * IllegalArgument exception.
 */
public void doTestTableCreateDrop() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  doTestTableCreateDrop(handler);
}
public void doTestIncrements() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  createTestTables(handler);
  doTestIncrements(handler);
  dropTestTables(handler);
}
/**
 * Tests adding a series of Mutations and BatchMutations, including a
 * delete mutation. Also tests data retrieval, and getting back multiple
 * versions.
 */
public void doTestTableMutations() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  doTestTableMutations(handler);
}
/**
 * For HBASE-2556
 * Tests for GetTableRegions
 */
public void doTestGetTableRegions() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  doTestGetTableRegions(handler);
}
/**
 * Appends the value to a cell and checks that the cell value is updated properly.
 */
public static void doTestAppend() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  handler.createTable(tableAname, getColumnDescriptors());
  try {
    List<Mutation> mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, columnAname, valueAname, true));
    handler.mutateRow(tableAname, rowAname, mutations, null);
    List<ByteBuffer> columnList = new ArrayList<>(1);
    columnList.add(columnAname);
    List<ByteBuffer> valueList = new ArrayList<>(1);
    valueList.add(valueBname);
    TAppend append = new TAppend(tableAname, rowAname, columnList, valueList);
    handler.append(append);
    TRowResult rowResult = handler.getRow(tableAname, rowAname, null).get(0);
    assertEquals(rowAname, rowResult.row);
    assertArrayEquals(Bytes.add(valueAname.array(), valueBname.array()),
      rowResult.columns.get(columnAname).value.array());
  } finally {
    handler.disableTable(tableAname);
    handler.deleteTable(tableAname);
  }
}
@Test
public void testGetThriftServerType() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  assertEquals(TThriftServerType.ONE, handler.getThriftServerType());
}
public static Map<byte[], Response> run(final Configuration conf, TableName tableName,
    Scan scan, Path dir) throws Throwable {
  FileSystem fs = dir.getFileSystem(conf);
  UserProvider userProvider = UserProvider.instantiate(conf);
  checkDir(fs, dir);
  FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
  fsDelegationToken.acquireDelegationToken(fs);
  try {
    final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
      scan, fsDelegationToken.getUserToken());
    try (Connection con = ConnectionFactory.createConnection(conf);
        Table table = con.getTable(tableName)) {
      Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      table.coprocessorService(ExportProtos.ExportService.class,
        scan.getStartRow(),
        scan.getStopRow(),
        (ExportProtos.ExportService service) -> {
          ServerRpcController controller = new ServerRpcController();
          CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
            rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
          service.export(controller, request, rpcCallback);
          if (controller.failedOnException()) {
            throw controller.getFailedOn();
          }
          return rpcCallback.get();
        }).forEach((k, v) -> result.put(k, new Response(v)));
      return result;
    } catch (Throwable e) {
      fs.delete(dir, true);
      throw e;
    }
  } finally {
    fsDelegationToken.releaseDelegationToken();
  }
}
@Override
public void start(CoprocessorEnvironment environment) throws IOException {
  if (environment instanceof RegionCoprocessorEnvironment) {
    env = (RegionCoprocessorEnvironment) environment;
    userProvider = UserProvider.instantiate(env.getConfiguration());
  } else {
    throw new CoprocessorException("Must be loaded on a table region!");
  }
}
SecureWriter(final Configuration conf, final UserProvider userProvider,
    final Token userToken, final List<SequenceFile.Writer.Option> opts)
    throws IOException {
  User user = getActiveUser(userProvider, userToken);
  try {
    SequenceFile.Writer sequenceFileWriter =
      user.runAs((PrivilegedExceptionAction<SequenceFile.Writer>) () ->
        SequenceFile.createWriter(conf,
          opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
    privilegedWriter = new PrivilegedWriter(user, sequenceFileWriter);
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
}
private static User getActiveUser(final UserProvider userProvider, final Token userToken)
    throws IOException {
  User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
  if (user == null && userToken != null) {
    LOG.warn("No user credentials found, but a token was received with the request");
  } else if (user != null && userToken != null) {
    user.addToken(userToken);
  }
  return user;
}
public HFileReplicator(Configuration sourceClusterConf,
    String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
    Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
    AsyncClusterConnection connection, List<String> sourceClusterIds) throws IOException {
  this.sourceClusterConf = sourceClusterConf;
  this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
  this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
  this.bulkLoadHFileMap = tableQueueMap;
  this.conf = conf;
  this.connection = connection;
  this.sourceClusterIds = sourceClusterIds;
  userProvider = UserProvider.instantiate(conf);
  fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
  this.hbaseStagingDir =
    new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
  this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
    REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
  this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
    new ThreadFactoryBuilder().setDaemon(true)
      .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath)
      .build());
  this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
    REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
  sinkFs = FileSystem.get(conf);
}
private AsyncClusterConnection getConnection() throws IOException {
  // Lazily create one shared connection; see https://en.wikipedia.org/wiki/Double-checked_locking.
  // Note: the sharedConn field must be declared volatile for this pattern to be safe.
  AsyncClusterConnection connection = sharedConn;
  if (connection == null) {
    synchronized (sharedConnLock) {
      connection = sharedConn;
      if (connection == null) {
        connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
          UserProvider.instantiate(conf).getCurrent());
        sharedConn = connection;
      }
    }
  }
  return connection;
}