下面列出了怎么用org.apache.hadoop.hbase.coprocessor.ObserverContext的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void preGetTableDescriptors(ObserverContext<MasterCoprocessorEnvironment> ctx,
List<TableName> tableNamesList, List<TableDescriptor> descriptors,
String regex) throws IOException {
// We are delegating the authorization check to postGetTableDescriptors as we don't have
// any concrete set of table names when a regex is present or the full list is requested.
if (regex == null && tableNamesList != null && !tableNamesList.isEmpty()) {
// Otherwise, if the requestor has ADMIN or CREATE privs for all listed tables, the
// request can be granted.
try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
for (TableName tableName : tableNamesList) {
// Skip checks for a table that does not exist
if (!admin.tableExists(tableName)) {
continue;
}
requirePermission(ctx, "getTableDescriptors", tableName, null, null, Action.ADMIN,
Action.CREATE);
}
}
}
}
@Override
public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName, TableDescriptor htd) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postModifyTable()");
}
try {
activatePluginClassLoader();
implMasterObserver.postModifyTable(ctx, tableName, htd);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postModifyTable()");
}
}
@Override
public void postDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, String ns) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.preDeleteNamespace()");
}
try {
activatePluginClassLoader();
implMasterObserver.postDeleteNamespace(ctx, ns);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.preDeleteNamespace()");
}
}
@Override
public void postModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, NamespaceDescriptor ns) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.preModifyNamespace()");
}
try {
activatePluginClassLoader();
implMasterObserver.preModifyNamespace(ctx, ns);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.preModifyNamespace()");
}
}
@Override
public void postCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> observerContext, SnapshotDescription snapshot, TableDescriptor tableDescriptor) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> HBaseAtlasCoprocessor.postCloneSnapshot()");
}
try {
activatePluginClassLoader();
implMasterObserver.postCloneSnapshot(observerContext,snapshot,tableDescriptor);
} finally {
deactivatePluginClassLoader();
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postCloneSnapshot()");
}
}
@Override
public void postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
throws IOException {
synchronized (HaltCP.class) {
if (!HALT) {
return;
}
UTIL1.getMiniHBaseCluster().getMaster().getProcedures().stream()
.filter(p -> p instanceof TransitPeerSyncReplicationStateProcedure)
.filter(p -> !p.isFinished()).map(p -> (TransitPeerSyncReplicationStateProcedure) p)
.findFirst().ifPresent(proc -> {
// this is the next state of REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN_VALUE
if (proc.getCurrentStateId() == REOPEN_ALL_REGIONS_IN_PEER_VALUE) {
// tell the main thread to start a new region server
ARRIVE.countDown();
try {
// wait for the region server to online
RESUME.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
HALT = false;
}
});
}
}
private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
boolean isGet) {
CountDownLatch latch = getCdl().get();
try {
System.out.println(latch.getCount() + " is the count " + isGet);
if (latch.getCount() > 0) {
if (isGet) {
countOfGets.incrementAndGet();
} else {
countOfNext.incrementAndGet();
}
LOG.info("Waiting for the counterCountDownLatch");
latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
if (latch.getCount() > 0) {
throw new RuntimeException("Can't wait more");
}
}
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
throws IOException {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
// Record tx state before the compaction
if (compactionState != null) {
compactionState.record(request, snapshot);
}
// silently close the passed scanner as we're returning a brand-new one
try (InternalScanner temp = s) { }
// Also make sure to use the same snapshot for the compaction
return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
}
@Override
public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment> c, TableName tableName,
TableDescriptor oldDesc, TableDescriptor currentDesc) throws IOException {
final Configuration conf = c.getEnvironment().getConfiguration();
// default the table owner to current user, if not specified.
final String owner = (currentDesc.getOwnerString() != null) ? currentDesc.getOwnerString() :
getActiveUser(c).getShortName();
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
UserPermission userperm = new UserPermission(owner,
Permission.newBuilder(currentDesc.getTableName()).withActions(Action.values()).build());
try (Table table =
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
PermissionStorage.addUserPermission(conf, userperm, table);
}
return null;
}
});
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
// Record whether the region is empty after a flush
HRegion region = e.getEnvironment().getRegion();
// After a flush, if the memstore size is zero and there are no store files for any stores in the region
// then the region must be empty
long numStoreFiles = numStoreFilesForRegion(e);
long memstoreSize = region.getMemstoreSize().get();
LOG.debug(String.format("Region %s: memstore size = %s, num store files = %s",
region.getRegionInfo().getRegionNameAsString(), memstoreSize, numStoreFiles));
if (memstoreSize == 0 && numStoreFiles == 0) {
if (compactionState != null) {
compactionState.persistRegionEmpty(System.currentTimeMillis());
}
}
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
throws IOException {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
// Record tx state before the compaction
if (compactionState != null) {
compactionState.record(request, snapshot);
}
// silently close the passed scanner as we're returning a brand-new one
try (InternalScanner temp = s) { }
// Also make sure to use the same snapshot for the compaction
return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
CompactionRequest request)
throws IOException {
// Get the latest tx snapshot state for the compaction
TransactionVisibilityState snapshot = cache.getLatestState();
// Record tx state before the compaction
if (compactionState != null) {
compactionState.record(request, snapshot);
}
// silently close the passed scanner as we're returning a brand-new one
try (InternalScanner temp = s) { }
// Also make sure to use the same snapshot for the compaction
return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs);
}
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
if (checkTagPresence) {
if (results.size() > 0) {
// Check tag presence in the 1st cell in 1st Result
Result result = results.get(0);
CellScanner cellScanner = result.cellScanner();
if (cellScanner.advance()) {
Cell cell = cellScanner.current();
tags = PrivateCellUtil.getTags(cell);
}
}
}
return hasMore;
}
@Override
public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> c,
final TableName tableName) throws IOException {
final Configuration conf = c.getEnvironment().getConfiguration();
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try (Table table =
c.getEnvironment().getConnection().getTable(PermissionStorage.ACL_TABLE_NAME)) {
PermissionStorage.removeTablePermissions(conf, tableName, table);
}
return null;
}
});
zkPermissionWatcher.deleteTableACLNode(tableName);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e,
Get get, List<Cell> results) throws IOException {
byte[] errorType = get.getAttribute(SHOULD_ERROR_ATTRIBUTE);
if (errorType != null) {
ErrorType type = ErrorType.valueOf(Bytes.toString(errorType));
switch (type) {
case CALL_QUEUE_TOO_BIG:
throw new CallQueueTooBigException("Failing for test");
case MULTI_ACTION_RESULT_TOO_LARGE:
throw new MultiActionResultTooLarge("Failing for test");
case FAILED_SANITY_CHECK:
throw new FailedSanityCheckException("Failing for test");
case NOT_SERVING_REGION:
throw new NotServingRegionException("Failing for test");
case REGION_MOVED:
throw new RegionMovedException(e.getEnvironment().getServerName(), 1);
case SCANNER_RESET:
throw new ScannerResetException("Failing for test");
case UNKNOWN_SCANNER:
throw new UnknownScannerException("Failing for test");
case REGION_TOO_BUSY:
throw new RegionTooBusyException("Failing for test");
case OUT_OF_ORDER_SCANNER_NEXT:
throw new OutOfOrderScannerNextException("Failing for test");
default:
throw new DoNotRetryIOException("Failing for test");
}
}
}
@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
BatchMutateContext context = getBatchMutateContext(c);
if (context == null || context.indexUpdates == null) {
return;
}
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to write index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}
if (success) { // if miniBatchOp was successfully written, write index updates
if (!context.indexUpdates.isEmpty()) {
this.writer.write(context.indexUpdates, false, context.clientVersion);
}
current.addTimelineAnnotation("Wrote index updates");
}
} catch (Throwable t) {
String msg = "Failed to write index updates:" + context.indexUpdates;
LOGGER.error(msg, t);
ServerUtil.throwIOException(msg, t);
} finally {
removeBatchMutateContext(c);
}
}
@Override
public void postCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> observerContext, NamespaceDescriptor namespaceDescriptor) throws IOException {
LOG.info("==> HBaseAtlasCoprocessor.postCreateNamespace()");
hbaseAtlasHook.sendHBaseNameSpaceOperation(namespaceDescriptor, null, HBaseAtlasHook.OPERATION.CREATE_NAMESPACE, observerContext);
if (LOG.isDebugEnabled()) {
LOG.debug("<== HBaseAtlasCoprocessor.postCreateNamespace()");
}
}
@Override
public void preGetRSGroupInfoOfTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
TableName tableName) throws IOException {
accessChecker.requirePermission(getActiveUser(ctx), "getRSGroupInfoOfTable",
null, Permission.Action.ADMIN);
//todo: should add check for table existence
}
@Override
public void preGetNamespaceDescriptor(ObserverContext<MasterCoprocessorEnvironment> ctx,
String namespace) throws IOException {
Threads.sleep(DEFAULT_RPC_TIMEOUT / 2);
if (sleepTime.addAndGet(DEFAULT_RPC_TIMEOUT / 2) < DEFAULT_OPERATION_TIMEOUT) {
throw new IOException("call fail");
}
}
@Override
public void postCompletedCreateTableAction(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
TableDescriptor desc, RegionInfo[] regions) throws IOException {
// the AccessController test, some times calls only and directly the
// postCompletedCreateTableAction()
if (tableCreationLatch != null) {
tableCreationLatch.countDown();
}
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit,
final Durability durability) throws IOException {
if (put.getAttribute(TEST_ATTRIBUTE) == null) {
throw new DoNotRetryIOException("Put should preserve attributes");
}
if (put.getDurability() != Durability.USE_DEFAULT) {
throw new DoNotRetryIOException("Durability is not propagated correctly");
}
}
@Override
public void postCompletedCreateTableAction(
final ObserverContext<MasterCoprocessorEnvironment> ctx,
final TableDescriptor desc,
final RegionInfo[] regions) throws IOException {
// the AccessController test, some times calls only and directly the
// postCompletedCreateTableAction()
if (tableCreationLatch != null) {
tableCreationLatch.countDown();
}
}
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
throws IOException {
// If authorization is not enabled, we don't care about reserved tags
if (!authorizationEnabled) {
return null;
}
for (CellScanner cellScanner = increment.cellScanner(); cellScanner.advance();) {
if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
throw new FailedSanityCheckException("Increment contains cell with reserved type tag");
}
}
return null;
}
private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context)
throws IOException {
//short circuit, if we don't need to do any work
if (context == null || context.indexUpdates.isEmpty()) {
return;
}
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Completing index writes")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}
long start = EnvironmentEdgeManager.currentTimeMillis();
current.addTimelineAnnotation("Actually doing index update for first time");
writer.writeAndHandleFailure(context.indexUpdates, false, context.clientVersion);
long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
if (duration >= slowIndexWriteThreshold) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(getCallTooSlowMessage("indexWrite",
duration, slowIndexWriteThreshold));
}
metricSource.incrementNumSlowIndexWriteCalls();
}
metricSource.updateIndexWriteTime(duration);
}
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
return scanner;
}
protected void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");
Thread.sleep(sleepTime.get());
}
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
}
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
throws IOException {
Scan scan =
new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
get.getFamilyMap().forEach((cf, cqs) -> {
NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
sums.put(cf, ss);
cqs.forEach(cq -> {
ss.put(cq, new MutableLong(0));
scan.addColumn(cf, cq);
});
});
List<Cell> cells = new ArrayList<>();
try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
boolean moreRows;
do {
moreRows = scanner.next(cells);
for (Cell cell : cells) {
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
sums.get(family).get(qualifier).add(value);
}
cells.clear();
} while (moreRows);
}
sums.forEach((cf, m) -> m.forEach((cq, s) -> result
.add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
c.bypass();
}
@Override
public void preGetSchema(ObserverContext<PhoenixMetaDataControllerEnvironment> ctx, String schemaName)
throws IOException {
if (!accessCheckEnabled) { return; }
for (MasterObserver observer : getAccessControllers()) {
observer.preListNamespaceDescriptors(getMasterObsevrverContext(),
Arrays.asList(NamespaceDescriptor.create(schemaName).build()));
}
}
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
slowdownCode(e, false);
if (getLatch != null && getLatch.getCount() > 0) {
try {
getLatch.await();
} catch (InterruptedException e1) {
}
}
return hasMore;
}
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
Durability durability) throws IOException {
// Translate deletes into our own delete tombstones
// Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows
// us to rollback the changes (by a real delete) if the transaction fails
// Deletes that are part of a transaction rollback do not need special handling.
// They will never be rolled back, so are performed as normal HBase deletes.
if (isRollbackOperation(delete)) {
return;
}
Transaction tx = getFromOperation(delete);
ensureValidTxLifetime(e.getEnvironment(), delete, tx);
// Other deletes are client-initiated and need to be translated into our own tombstones
// TODO: this should delegate to the DeleteStrategy implementation.
Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp());
for (byte[] family : delete.getFamilyCellMap().keySet()) {
List<Cell> familyCells = delete.getFamilyCellMap().get(family);
if (isFamilyDelete(familyCells)) {
deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
} else {
for (Cell cell : familyCells) {
deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
HConstants.EMPTY_BYTE_ARRAY);
}
}
}
for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
deleteMarkers.setAttribute(entry.getKey(), entry.getValue());
}
e.getEnvironment().getRegion().put(deleteMarkers);
// skip normal delete handling
e.bypass();
}