下面列出了怎么用org.apache.hadoop.hbase.ipc.RpcServer的API类实例代码及写法,或者点击链接到github查看源代码。
private String getRemoteAddress() {
InetAddress remoteAddr = null;
try {
remoteAddr = RpcServer.getRemoteAddress().get();
} catch (NoSuchElementException e) {
LOG.info("Unable to get remote Address");
}
if(remoteAddr == null) {
remoteAddr = RpcServer.getRemoteIp();
}
String strAddr = remoteAddr != null ? remoteAddr.getHostAddress() : null;
return strAddr;
}
@Override
public void start(CoprocessorEnvironment env) {
// if running at region
if (env instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment)env;
/* Getting the RpcServer from a RegionCE is wrong. There cannot be an expectation that Region
is hosted inside a RegionServer. If you need RpcServer, then pass in a RegionServerCE.
TODO: FIX.
*/
RegionServerServices rss = ((HasRegionServerServices)regionEnv).getRegionServerServices();
RpcServerInterface server = rss.getRpcServer();
SecretManager<?> mgr = ((RpcServer)server).getSecretManager();
if (mgr instanceof AuthenticationTokenSecretManager) {
secretManager = (AuthenticationTokenSecretManager)mgr;
}
}
}
/**
* Check the quota for the current (rpc-context) user.
* Returns the OperationQuota used to get the available quota and
* to report the data/usage of the operation.
* @param region the region where the operation will be performed
* @param numWrites number of writes to perform
* @param numReads number of short-reads to perform
* @param numScans number of scan to perform
* @return the OperationQuota
* @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded.
*/
private OperationQuota checkQuota(final Region region,
final int numWrites, final int numReads, final int numScans)
throws IOException, RpcThrottlingException {
Optional<User> user = RpcServer.getRequestUser();
UserGroupInformation ugi;
if (user.isPresent()) {
ugi = user.get().getUGI();
} else {
ugi = User.getCurrent().getUGI();
}
TableName table = region.getTableDescriptor().getTableName();
OperationQuota quota = getQuota(ugi, table);
try {
quota.checkQuota(numWrites, numReads, numScans);
} catch (RpcThrottlingException e) {
LOG.debug("Throttling exception for user=" + ugi.getUserName() +
" table=" + table + " numWrites=" + numWrites +
" numReads=" + numReads + " numScans=" + numScans +
": " + e.getMessage());
throw e;
}
return quota;
}
/**
* @return Get remote side's InetAddress
*/
InetAddress getRemoteInetAddress(final int port,
final long serverStartCode) throws UnknownHostException {
// Do it out here in its own little method so can fake an address when
// mocking up in tests.
InetAddress ia = RpcServer.getRemoteIp();
// The call could be from the local regionserver,
// in which case, there is no remote address.
if (ia == null && serverStartCode == startcode) {
InetSocketAddress isa = rpcServices.getSocketAddress();
if (isa != null && isa.getPort() == port) {
ia = isa.getAddress();
}
}
return ia;
}
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
"Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
}
MutableLong txidHolder = new MutableLong();
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
ringBuffer.publish(txid);
}
return txid;
}
@Override
public AuthenticationProtos.GetAuthenticationTokenResponse getAuthenticationToken(
RpcController controller, AuthenticationProtos.GetAuthenticationTokenRequest request)
throws ServiceException {
LOG.debug("Authentication token request from " + RpcServer.getRequestUserName().orElse(null));
// Ignore above passed in controller -- it is always null
ServerRpcController serverController = new ServerRpcController();
final BlockingRpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse>
callback = new BlockingRpcCallback<>();
getAuthenticationToken(null, request, callback);
try {
serverController.checkFailed();
return callback.get();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
@Override
public AuthenticationProtos.WhoAmIResponse whoAmI(
RpcController controller, AuthenticationProtos.WhoAmIRequest request)
throws ServiceException {
LOG.debug("whoAmI() request from " + RpcServer.getRequestUserName().orElse(null));
// Ignore above passed in controller -- it is always null
ServerRpcController serverController = new ServerRpcController();
BlockingRpcCallback<AuthenticationProtos.WhoAmIResponse> callback =
new BlockingRpcCallback<>();
whoAmI(null, request, callback);
try {
serverController.checkFailed();
return callback.get();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
protected void checkAccess() throws AccessDeniedException {
if (!spliceTable)
return;
if (!UserGroupInformation.isSecurityEnabled())
return;
User user = RpcServer.getRequestUser().get();
if (user == null || user.getShortName().equalsIgnoreCase("hbase"))
return;
if (RpcUtils.isAccessAllowed())
return;
if (!authTokenEnabled && authManager.authorize(user, Permission.Action.ADMIN))
return;
throw new AccessDeniedException("Insufficient permissions for user " +
user.getShortName());
}
private List<RpcServer.BlockingServiceAndInterface> getServices() {
List<RpcServer.BlockingServiceAndInterface> bssi = new ArrayList<>(1);
bssi.add(new RpcServer.BlockingServiceAndInterface(
AdminProtos.AdminService.newReflectiveBlockingService(this),
AdminProtos.AdminService.BlockingInterface.class));
return bssi;
}
private void requireScannerOwner(ObserverContext<?> ctx, InternalScanner s) throws AccessDeniedException {
if (!RpcServer.isInRpcCallContext()) {
return;
}
User user = getActiveUser(ctx);
String requestUserName = user.getShortName();
String owner = scannerOwners.get(s);
if (owner != null && !owner.equals(requestUserName)) {
throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
}
}
private static User getActiveUser(final UserProvider userProvider, final Token userToken)
throws IOException {
User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
if (user == null && userToken != null) {
LOG.warn("No found of user credentials, but a token was got from user request");
} else if (user != null && userToken != null) {
user.addToken(userToken);
}
return user;
}
/**
* Verify, when servicing an RPC, that the caller is the scanner owner. If so, we assume that
* access control is correctly enforced based on the checks performed in preScannerOpen()
*/
private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
if (!RpcServer.isInRpcCallContext())
return;
String requestUName = RpcServer.getRequestUserName().orElse(null);
String owner = scannerOwners.get(s);
if (authorizationEnabled && owner != null && !owner.equals(requestUName)) {
throw new AccessDeniedException("User '" + requestUName + "' is not the scanner owner!");
}
}
private void logResult(boolean isAllowed, String request, String reason, byte[] user,
List<byte[]> labelAuths, String regex) {
if (AUDITLOG.isTraceEnabled()) {
// This is more duplicated code!
List<String> labelAuthsStr = new ArrayList<>();
if (labelAuths != null) {
int labelAuthsSize = labelAuths.size();
labelAuthsStr = new ArrayList<>(labelAuthsSize);
for (int i = 0; i < labelAuthsSize; i++) {
labelAuthsStr.add(Bytes.toString(labelAuths.get(i)));
}
}
User requestingUser = null;
try {
requestingUser = VisibilityUtils.getActiveUser();
} catch (IOException e) {
LOG.warn("Failed to get active system user.");
LOG.debug("Details on failure to get active system user.", e);
}
AUDITLOG.trace("Access " + (isAllowed ? "allowed" : "denied") + " for user " +
(requestingUser != null ? requestingUser.getShortName() : "UNKNOWN") + "; reason: " +
reason + "; remote address: " +
RpcServer.getRemoteAddress().map(InetAddress::toString).orElse("") + "; request: " +
request + "; user: " + (user != null ? Bytes.toShort(user) : "null") + "; labels: " +
labelAuthsStr + "; regex: " + regex);
}
}
/**
* @return User who called RPC method. For non-RPC handling, falls back to system user
* @throws IOException When there is IOE in getting the system user (During non-RPC handling).
*/
public static User getActiveUser() throws IOException {
Optional<User> optionalUser = RpcServer.getRequestUser();
User user;
if (optionalUser.isPresent()) {
user = optionalUser.get();
} else {
user = User.getCurrent();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Current active user name is " + user.getShortName());
}
return user;
}
@Override
public void getAuthenticationToken(RpcController controller,
AuthenticationProtos.GetAuthenticationTokenRequest request,
RpcCallback<AuthenticationProtos.GetAuthenticationTokenResponse> done) {
AuthenticationProtos.GetAuthenticationTokenResponse.Builder response =
AuthenticationProtos.GetAuthenticationTokenResponse.newBuilder();
try {
if (secretManager == null) {
throw new IOException(
"No secret manager configured for token authentication");
}
User currentUser = RpcServer.getRequestUser()
.orElseThrow(() -> new AccessDeniedException("No authenticated user for request!"));
UserGroupInformation ugi = currentUser.getUGI();
if (!isAllowedDelegationTokenOp(ugi)) {
LOG.warn("Token generation denied for user=" + currentUser.getName() + ", authMethod=" +
ugi.getAuthenticationMethod());
throw new AccessDeniedException(
"Token generation only allowed for Kerberos authenticated clients");
}
Token<AuthenticationTokenIdentifier> token =
secretManager.generateToken(currentUser.getName());
response.setToken(ClientTokenUtil.toToken(token)).build();
} catch (IOException ioe) {
CoprocessorRpcUtils.setControllerException(controller, ioe);
}
done.run(response.build());
}
@Override
public void whoAmI(RpcController controller, AuthenticationProtos.WhoAmIRequest request,
RpcCallback<AuthenticationProtos.WhoAmIResponse> done) {
AuthenticationProtos.WhoAmIResponse.Builder response =
AuthenticationProtos.WhoAmIResponse.newBuilder();
RpcServer.getRequestUser().ifPresent(requestUser -> {
response.setUsername(requestUser.getShortName());
AuthenticationMethod method = requestUser.getUGI().getAuthenticationMethod();
if (method != null) {
response.setAuthMethod(method.name());
}
});
done.run(response.build());
}
public static void logResult(AuthResult result) {
if (AUDITLOG.isTraceEnabled()) {
AUDITLOG.trace(
"Access {} for user {}; reason: {}; remote address: {}; request: {}; context: {}",
(result.isAllowed() ? "allowed" : "denied"),
(result.getUser() != null ? result.getUser().getShortName() : "UNKNOWN"),
result.getReason(), RpcServer.getRemoteAddress().map(InetAddress::toString).orElse(""),
result.getRequest(), result.toContextString());
}
}
/**
* Verify, when servicing an RPC, that the caller is the scanner owner.
* If so, we assume that access control is correctly enforced based on
* the checks performed in preScannerOpen()
*/
private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
if (!RpcServer.isInRpcCallContext()) {
return;
}
String requestUserName = RpcServer.getRequestUserName().orElse(null);
String owner = scannerOwners.get(s);
if (authorizationEnabled && owner != null && !owner.equals(requestUserName)) {
throw new AccessDeniedException("User '"+ requestUserName +"' is not the scanner owner!");
}
}
/**
* @deprecated since 2.2.0 and will be removed in 4.0.0. Use
* {@link Admin#grant(UserPermission, boolean)} instead.
* @see Admin#grant(UserPermission, boolean)
* @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
*/
@Deprecated
@Override
public void grant(RpcController controller,
AccessControlProtos.GrantRequest request,
RpcCallback<AccessControlProtos.GrantResponse> done) {
final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
AccessControlProtos.GrantResponse response = null;
try {
// verify it's only running at .acl.
if (aclRegion) {
if (!initialized) {
throw new CoprocessorException("AccessController not yet initialized");
}
User caller = RpcServer.getRequestUser().orElse(null);
if (LOG.isDebugEnabled()) {
LOG.debug("Received request from {} to grant access permission {}",
caller.getName(), perm.toString());
}
preGrantOrRevoke(caller, "grant", perm);
// regionEnv is set at #start. Hopefully not null at this point.
regionEnv.getConnection().getAdmin().grant(
new UserPermission(perm.getUser(), perm.getPermission()),
request.getMergeExistingPermissions());
if (AUDITLOG.isTraceEnabled()) {
// audit log should store permission changes in addition to auth results
AUDITLOG.trace("Granted permission " + perm.toString());
}
} else {
throw new CoprocessorException(AccessController.class, "This method "
+ "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
}
response = AccessControlProtos.GrantResponse.getDefaultInstance();
} catch (IOException ioe) {
// pass exception back up
CoprocessorRpcUtils.setControllerException(controller, ioe);
}
done.run(response);
}
/**
* @deprecated since 2.2.0 and will be removed in 4.0.0. Use {@link Admin#revoke(UserPermission)}
* instead.
* @see Admin#revoke(UserPermission)
* @see <a href="https://issues.apache.org/jira/browse/HBASE-21739">HBASE-21739</a>
*/
@Deprecated
@Override
public void revoke(RpcController controller, AccessControlProtos.RevokeRequest request,
RpcCallback<AccessControlProtos.RevokeResponse> done) {
final UserPermission perm = AccessControlUtil.toUserPermission(request.getUserPermission());
AccessControlProtos.RevokeResponse response = null;
try {
// only allowed to be called on _acl_ region
if (aclRegion) {
if (!initialized) {
throw new CoprocessorException("AccessController not yet initialized");
}
User caller = RpcServer.getRequestUser().orElse(null);
if (LOG.isDebugEnabled()) {
LOG.debug("Received request from {} to revoke access permission {}",
caller.getShortName(), perm.toString());
}
preGrantOrRevoke(caller, "revoke", perm);
// regionEnv is set at #start. Hopefully not null here.
regionEnv.getConnection().getAdmin()
.revoke(new UserPermission(perm.getUser(), perm.getPermission()));
if (AUDITLOG.isTraceEnabled()) {
// audit log should record all permission changes
AUDITLOG.trace("Revoked permission " + perm.toString());
}
} else {
throw new CoprocessorException(AccessController.class, "This method "
+ "can only execute at " + PermissionStorage.ACL_TABLE_NAME + " table.");
}
response = AccessControlProtos.RevokeResponse.getDefaultInstance();
} catch (IOException ioe) {
// pass exception back up
CoprocessorRpcUtils.setControllerException(controller, ioe);
}
done.run(response);
}
/**
* Insert procedure may be called by master's rpc call. There are some check about the rpc call
* when mutate region. Here unset the current rpc call and set it back in finally block. See
* HBASE-23895 for more details.
*/
private void runWithoutRpcCall(Runnable runnable) {
Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
try {
runnable.run();
} finally {
rpcCall.ifPresent(RpcServer::setCurrentCall);
}
}
@Override
public GrantResponse grant(RpcController controller, GrantRequest request)
throws ServiceException {
try {
master.checkInitialized();
if (master.cpHost != null && hasAccessControlServiceCoprocessor(master.cpHost)) {
final UserPermission perm =
ShadedAccessControlUtil.toUserPermission(request.getUserPermission());
boolean mergeExistingPermissions = request.getMergeExistingPermissions();
master.cpHost.preGrant(perm, mergeExistingPermissions);
try (Table table = master.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
PermissionStorage.addUserPermission(getConfiguration(), perm, table,
mergeExistingPermissions);
}
master.cpHost.postGrant(perm, mergeExistingPermissions);
User caller = RpcServer.getRequestUser().orElse(null);
if (AUDITLOG.isTraceEnabled()) {
// audit log should store permission changes in addition to auth results
String remoteAddress = RpcServer.getRemoteAddress().map(InetAddress::toString).orElse("");
AUDITLOG.trace("User {} (remote address: {}) granted permission {}", caller,
remoteAddress, perm);
}
return GrantResponse.getDefaultInstance();
} else {
throw new DoNotRetryIOException(
new UnsupportedOperationException(AccessController.class.getName() + " is not loaded"));
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
@Override
public RevokeResponse revoke(RpcController controller, RevokeRequest request)
throws ServiceException {
try {
master.checkInitialized();
if (master.cpHost != null && hasAccessControlServiceCoprocessor(master.cpHost)) {
final UserPermission userPermission =
ShadedAccessControlUtil.toUserPermission(request.getUserPermission());
master.cpHost.preRevoke(userPermission);
try (Table table = master.getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
PermissionStorage.removeUserPermission(master.getConfiguration(), userPermission, table);
}
master.cpHost.postRevoke(userPermission);
User caller = RpcServer.getRequestUser().orElse(null);
if (AUDITLOG.isTraceEnabled()) {
// audit log should record all permission changes
String remoteAddress = RpcServer.getRemoteAddress().map(InetAddress::toString).orElse("");
AUDITLOG.trace("User {} (remote address: {}) revoked permission {}", caller,
remoteAddress, userPermission);
}
return RevokeResponse.getDefaultInstance();
} else {
throw new DoNotRetryIOException(
new UnsupportedOperationException(AccessController.class.getName() + " is not loaded"));
}
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
/**
* Instantiates a new ObserverContext instance if the passed reference is <code>null</code> and
* sets the environment in the new or existing instance. This allows deferring the instantiation
* of a ObserverContext until it is actually needed.
* @param <E> The environment type for the context
* @param env The coprocessor environment to set
* @return An instance of <code>ObserverContext</code> with the environment set
*/
@Deprecated
@VisibleForTesting
// TODO: Remove this method, ObserverContext should not depend on RpcServer
public static <E extends CoprocessorEnvironment> ObserverContext<E> createAndPrepare(E env) {
ObserverContextImpl<E> ctx = new ObserverContextImpl<>(RpcServer.getRequestUser().orElse(null));
ctx.prepare(env);
return ctx;
}
private void clientMetricRegisterAndMark() {
// Mark client metric
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
if (clientIP == null || clientIP.isEmpty()) {
return;
}
String clientRequestMeter = clientRequestMeterName(clientIP);
clientMetricsLossyCounting.add(clientRequestMeter);
registerAndMarkMeter(clientRequestMeter);
}
/**
* Returns the active user to which authorization checks should be applied.
* If we are in the context of an RPC call, the remote user is used,
* otherwise the currently logged in user is used.
*/
private String getActiveUser() {
Optional<User> user = RpcServer.getRequestUser();
if (!user.isPresent()) {
// for non-rpc handling, fallback to system user
try {
user = Optional.of(userProvider.getCurrent());
} catch (IOException ignore) {
}
}
return user.map(User::getShortName).orElse(null);
}
private User getActiveUser() throws IOException {
// for non-rpc handling, fallback to system user
User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
// this is for testing
if (userProvider.isHadoopSecurityEnabled() &&
"simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) {
return User.createUserForTesting(conf, user.getShortName(), new String[] {});
}
return user;
}
@Override
public void logBatchWarning(String firstRegionName, int sum, int rowSizeWarnThreshold) {
if (LOG.isWarnEnabled()) {
LOG.warn("Large batch operation detected (greater than " + rowSizeWarnThreshold
+ ") (HBASE-18023)." + " Requested Number of Rows: " + sum + " Client: "
+ RpcServer.getRequestUserName().orElse(null) + "/"
+ RpcServer.getRemoteAddress().orElse(null)
+ " first region in multi=" + firstRegionName);
}
}
@Override
@QosPriority(priority=HConstants.ADMIN_QOS)
public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
ClearCompactionQueuesRequest request) throws ServiceException {
LOG.debug("Client=" + RpcServer.getRequestUserName().orElse(null) + "/"
+ RpcServer.getRemoteAddress().orElse(null) + " clear compactions queue");
ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
requestCount.increment();
if (clearCompactionQueues.compareAndSet(false,true)) {
try {
checkOpen();
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
for (String queueName : request.getQueueNameList()) {
LOG.debug("clear " + queueName + " compaction queue");
switch (queueName) {
case "long":
regionServer.compactSplitThread.clearLongCompactionsQueue();
break;
case "short":
regionServer.compactSplitThread.clearShortCompactionsQueue();
break;
default:
LOG.warn("Unknown queue name " + queueName);
throw new IOException("Unknown queue name " + queueName);
}
}
regionServer.getRegionServerCoprocessorHost().postClearCompactionQueues();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
clearCompactionQueues.set(false);
}
} else {
LOG.warn("Clear compactions queue is executing by other admin.");
}
return respBuilder.build();
}
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster();
CONF = UTIL.getConfiguration();
CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
CONN = UTIL.getConnection();
}