下面列出了 org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment 类的 API 用法示例代码，您也可以点击链接到 GitHub 查看完整源代码。
/**
 * Runs the permission check that guards opening a region.
 *
 * <p>When the environment reports no region, an error is logged and nothing else happens.
 * For a region that belongs to a system table the active user is handed to
 * {@code checkSystemOrSuperUser}; for every other table {@code Action.ADMIN} is required.
 *
 * @param c observer context that carries the region coprocessor environment
 * @throws IOException propagated from the permission checks
 */
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
    throws IOException {
  final Region openingRegion = c.getEnvironment().getRegion();
  if (openingRegion == null) {
    LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
    return;
  }

  final boolean onSystemTable = openingRegion.getRegionInfo().getTable().isSystemTable();
  if (!onSystemTable) {
    requirePermission(c, "preOpen", Action.ADMIN);
    return;
  }
  checkSystemOrSuperUser(getActiveUser(c));
}
/**
 * Sets {@code conglomId} according to the type of table this coprocessor is started on.
 *
 * <p>For {@code DERBY_SYS_TABLE} the id becomes {@code -1}; for {@code USER_TABLE} it is the
 * table-name qualifier parsed as a {@code long}; for any other table type nothing is set.
 *
 * @param e the coprocessor environment; it is cast to {@link RegionCoprocessorEnvironment}
 * @throws IOException any {@link Throwable} raised here, converted by
 *     {@code CoprocessorUtils.getIOException}
 */
@Override
public void start(final CoprocessorEnvironment e) throws IOException {
  try {
    final RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
    final String qualifier =
        regionEnv.getRegion().getTableDescriptor().getTableName().getQualifierAsString();
    final TableType tableType =
        EnvUtils.getTableType(HConfiguration.getConfiguration(), regionEnv);

    switch (tableType) {
      case USER_TABLE:
        conglomId = Long.parseLong(qualifier);
        break;
      case DERBY_SYS_TABLE:
        // bypass index management on derby system tables
        conglomId = -1;
        break;
      default:
        // disregard table environments which are not user or system tables
        break;
    }
  } catch (Throwable failure) {
    throw CoprocessorUtils.getIOException(failure);
  }
}
/**
 * Substitutes the compaction scanner with one created by {@code createStoreScanner}, built
 * against the latest state returned by {@code cache.getLatestState()}.
 *
 * <p>If {@code compactionState} is set, the request and that same state are recorded first.
 *
 * @return the scanner from {@code createStoreScanner}, or the incoming {@code scanner} when
 *     that call returns {@code null}
 * @throws IOException propagated from the calls made here
 */
@Override
public InternalScanner preCompact(
    org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c,
    Store store, InternalScanner scanner, ScanType scanType,
    org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker,
    CompactionRequest request) throws IOException {
  // One snapshot is taken and used for both the record and the scanner.
  final TransactionVisibilityState txState = cache.getLatestState();

  if (compactionState != null) {
    // Record tx state before the compaction
    compactionState.record(request, txState);
  }

  final InternalScanner txScanner =
      createStoreScanner(c.getEnvironment(), "compaction", txState, scanner, scanType);
  return txScanner != null ? txScanner : scanner;
}
/**
 * Get hook that can simulate a stopped region server.
 *
 * <p>When {@code throwException} is set, a {@link RegionServerStoppedException} is thrown for a
 * non-meta region whose replica id is 0; other regions pass through silently. When the flag is
 * not set, only the replica id is logged.
 *
 * @throws IOException the simulated {@link RegionServerStoppedException}
 */
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
    final Get get, final List<Cell> results) throws IOException {
  final RegionCoprocessorEnvironment env = e.getEnvironment();
  final int replicaId = env.getRegion().getRegionInfo().getReplicaId();

  if (!throwException) {
    LOG.info("Get, We're replica region " + replicaId);
    return;
  }

  // Fail for the primary replica, but not for meta
  final boolean primaryNonMeta =
      !env.getRegion().getRegionInfo().isMetaRegion() && replicaId == 0;
  if (primaryNonMeta) {
    LOG.info("Get, throw Region Server Stopped Exceptoin for region "
        + env.getRegion().getRegionInfo());
    throw new RegionServerStoppedException("Server " + env.getServerName() + " not running");
  }
}
/**
 * Delegates to the superclass flush hook, wrapping the scanner first when an {@code opStore}
 * is configured.
 *
 * <p>The wrapper is built by {@code wrapScannerWithOps} for the region's table with
 * {@code ServerOpScope.MINOR_COMPACTION} and no scan.
 *
 * @return whatever {@code super.preFlush} returns
 * @throws IOException propagated from the superclass call
 */
@Override
public InternalScanner preFlush(
    final ObserverContext<RegionCoprocessorEnvironment> e,
    final Store store,
    final InternalScanner scanner) throws IOException {
  InternalScanner flushScanner = scanner;
  if (opStore != null) {
    flushScanner = wrapScannerWithOps(
        e.getEnvironment().getRegionInfo().getTable(),
        scanner,
        null,
        ServerOpScope.MINOR_COMPACTION,
        INTERNAL_SCANNER_FACTORY);
  }
  return super.preFlush(e, store, flushScanner);
}
/**
 * Finishes a batch mutation: releases the row locks held in the batch context, notifies the
 * builder, and, if the batch succeeded, runs the post-index work.
 *
 * <p>Does nothing when this observer is disabled or no batch context exists. The batch context
 * is always removed afterwards, even when one of the steps throws.
 *
 * @param success whether the batch succeeded
 * @throws IOException propagated from the steps above
 */
@Override
public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
    MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
  if (this.disabled) {
    return;
  }
  final BatchMutateContext batchContext = getBatchMutateContext(c);
  if (batchContext == null) {
    return;
  }

  try {
    for (RowLock heldLock : batchContext.rowLocks) {
      heldLock.release();
    }
    this.builder.batchCompleted(miniBatchOp);

    // The pre-index and data table updates are successful, and now, do post index updates
    if (success) {
      doPost(c, batchContext);
    }
  } finally {
    removeBatchMutateContext(c);
  }
}
/**
 * Resolves a table by trying, in order: the metadata cache, {@code buildTable} as of
 * {@code asOfTimeStamp}, and {@code buildDeletedTable} as of {@code clientTimeStamp}.
 *
 * @param env environment whose region is handed to the build calls
 * @param key key passed to the build calls
 * @param cacheKey key used for the cache lookup and passed to the build calls
 * @return the first non-null result of the three lookups, or {@code null} if all are null
 */
private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
    ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
    throws IOException, SQLException {
  HRegion region = env.getRegion();
  Cache<ImmutableBytesPtr, PTable> metaDataCache =
      GlobalCache.getInstance(this.env).getMetaDataCache();

  PTable cached = metaDataCache.getIfPresent(cacheKey);
  if (cached != null) {
    return cached;
  }

  // We always cache the latest version - fault in if not in cache
  PTable built = buildTable(key, cacheKey, region, asOfTimeStamp);
  if (built != null) {
    return built;
  }

  // if not found then check if newer table already exists and add delete marker for timestamp
  // found
  return buildDeletedTable(key, cacheKey, region, clientTimeStamp);
}
/**
 * Opens a scanner through {@code MemstoreAwareObserver.postScannerOpen} with a scan whose start
 * and end keys equal those of the mocked region, and checks that a non-null scanner different
 * from the input scanner is returned.
 */
@Test
public void preStoreScannerOpen() throws Exception {
  final MemstoreAwareObserver observer = new MemstoreAwareObserver();

  // env and scan share same start and end keys (partition hit)
  final byte[] rangeStart = createByteArray(13);
  final byte[] rangeEnd = createByteArray(24);
  final ObserverContext<RegionCoprocessorEnvironment> mockedCtx =
      mockRegionEnv(rangeStart, rangeEnd);
  final RegionScanner inputScanner = mock(RegionScanner.class);

  final RegionScanner openedScanner =
      observer.postScannerOpen(mockedCtx, mockScan(rangeStart, rangeEnd), inputScanner);

  assertNotNull(openedScanner);
  assertNotEquals(inputScanner, openedScanner);
  openedScanner.close();
}
/**
 * Refresh the properties related to transaction pruning. This method needs to be invoked if there is change in the
 * prune related properties after clearing the state by calling {@link #resetPruneState}.
 *
 * <p>Reads the prune-enable flag from the configuration; when it is enabled, a new
 * {@code CompactionState} is created from the configured prune state table and flush interval
 * (the interval is configured in seconds and converted to milliseconds). Nothing happens when no
 * configuration is available.
 *
 * @param env {@link RegionCoprocessorEnvironment} of this region
 */
protected void initializePruneState(RegionCoprocessorEnvironment env) {
  Configuration conf = getConfiguration(env);
  if (conf == null) {
    return;
  }

  pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
  if (!Boolean.TRUE.equals(pruneEnable)) {
    return;
  }

  TableName pruneTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                    TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
  long pruneFlushInterval = TimeUnit.SECONDS.toMillis(conf.getLong(
    TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
    TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
  compactionState = new CompactionState(env, pruneTable, pruneFlushInterval);

  if (LOG.isDebugEnabled()) {
    TableName name = env.getRegion().getRegionInfo().getTable();
    // The format is "namespace:table", so pair the namespace with the bare qualifier.
    // getNameAsString() already carries the namespace for tables outside the default
    // namespace, which would print it twice.
    LOG.debug(String.format("Automatic invalid list pruning is enabled for table %s:%s. Compaction state will " +
                              "be recorded in table %s:%s", name.getNamespaceAsString(), name.getQualifierAsString(),
                            pruneTable.getNamespaceAsString(), pruneTable.getQualifierAsString()));
  }
}
/**
 * Lets every server op registered for the region's table at {@code ServerOpScope.SCAN} see the
 * scan before it is opened, then delegates to the superclass hook.
 *
 * <p>The ops are skipped when no {@code opStore} is configured or the table is a system table.
 *
 * @return whatever {@code super.preScannerOpen} returns
 * @throws IOException propagated from the calls made here
 */
@Override
public RegionScanner preScannerOpen(
    final ObserverContext<RegionCoprocessorEnvironment> e,
    final Scan scan,
    final RegionScanner s) throws IOException {
  if (opStore != null) {
    final TableName table = e.getEnvironment().getRegionInfo().getTable();
    if (!table.isSystemTable()) {
      final Collection<HBaseServerOp> scanOps = opStore.getOperations(
          table.getNamespaceAsString(),
          table.getQualifierAsString(),
          ServerOpScope.SCAN);
      for (final HBaseServerOp scanOp : scanOps) {
        scanOp.preScannerOpen(scan);
      }
    }
  }
  return super.preScannerOpen(e, scan, s);
}
/**
 * After the region opens, replays index updates that were recorded as failed for this region.
 *
 * <p>When this observer is disabled the call is simply forwarded to the superclass. When there
 * are no pending edits nothing is written and nothing is logged. Otherwise the edits are handed
 * to {@code writer.writeAndKillYourselfOnFailure}; an {@link IOException} from that call is
 * logged rather than rethrown.
 *
 * @param c observer context that carries the region coprocessor environment
 */
@Override
public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
  // NOTE(review): getEdits is still called before the disabled check, as before, in case it
  // has side effects on the failed-edit cache - confirm before moving it.
  Multimap<HTableInterfaceReference, Mutation> updates =
      failedIndexEdits.getEdits(c.getEnvironment().getRegion());

  if (this.disabled) {
    super.postOpen(c);
    return;
  }

  // if we have no pending edits to complete, then we are done
  if (updates == null || updates.size() == 0) {
    return;
  }

  // Only announce a replay once we know there is something to replay.
  LOG.info("Found some outstanding index updates that didn't succeed during"
      + " WAL replay - attempting to replay now.");

  // do the usual writer stuff, killing the server again, if we can't manage to make the index
  // writes succeed again
  try {
    writer.writeAndKillYourselfOnFailure(updates);
  } catch (IOException e) {
    LOG.error("Exception thrown instead of killing server during index writing", e);
  }
}
/**
 * Removes the server cache entries for the join ids carried by a hash-join scan.
 *
 * <p>Scans without hash-join info are ignored. A join id already present in
 * {@code lastRemovedJoinIds} is skipped; any other id is added to that collection and removed
 * from the tenant cache.
 */
@Override
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
    final Scan scan) {
  final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
  if (joinInfo == null) {
    return;
  }

  final TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), null);
  for (final ImmutableBytesPtr joinId : joinInfo.getJoinIds()) {
    if (ByteUtil.contains(lastRemovedJoinIds, joinId)) {
      continue;
    }
    lastRemovedJoinIds.add(joinId);
    tenantCache.removeServerCache(joinId);
  }
}
/**
 * Re-reads the transaction pruning properties from the configuration. Call this after the prune
 * state has been cleared with {@link #resetPruneState} whenever those properties have changed.
 *
 * <p>When pruning is enabled, a new {@code CompactionState} is created from the configured prune
 * state table and flush interval (the interval is configured in seconds and converted to
 * milliseconds). Nothing happens when no configuration is available.
 *
 * @param env {@link RegionCoprocessorEnvironment} of this region
 */
protected void initializePruneState(RegionCoprocessorEnvironment env) {
  final Configuration conf = getConfiguration(env);
  if (conf == null) {
    return;
  }

  pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
  if (!Boolean.TRUE.equals(pruneEnable)) {
    return;
  }

  final String pruneTableName = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                         TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  final TableName pruneTable = TableName.valueOf(pruneTableName);
  final long flushIntervalSeconds = conf.getLong(
    TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
    TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL);
  final long flushIntervalMillis = TimeUnit.SECONDS.toMillis(flushIntervalSeconds);
  compactionState = new CompactionState(env, pruneTable, flushIntervalMillis);

  if (LOG.isDebugEnabled()) {
    final String message = String.format(
      "Automatic invalid list pruning is enabled for table %s. Compaction state " +
        "will be recorded in table %s",
      env.getRegionInfo().getTable().getNameWithNamespaceInclAsString(),
      pruneTable.getNameWithNamespaceInclAsString());
    LOG.debug(message);
  }
}
/**
 * Starts the enrichment coprocessor: stores the region environment and, if no cache exists yet,
 * builds one backed by an {@code HBaseCacheWriter}.
 *
 * <p>The cache writer's table provider, table name and column family come from the global
 * configuration, which is fetched through {@code globalConfigService} (created from the
 * Zookeeper URL when not already set).
 *
 * @param ce the coprocessor environment; must be a {@link RegionCoprocessorEnvironment}
 * @throws CoprocessorException if {@code ce} is not a region environment
 * @throws IOException if the cache writer cannot be instantiated
 */
@Override
public void start(CoprocessorEnvironment ce) throws IOException {
  LOG.info("Starting enrichment coprocessor");
  if (!(ce instanceof RegionCoprocessorEnvironment)) {
    throw new CoprocessorException("Enrichment coprocessor must be loaded on a table region.");
  }
  this.coprocessorEnv = (RegionCoprocessorEnvironment) ce;

  LOG.info("Checking if internal cache initialized");
  if (this.cache == null) {
    LOG.info("Cache null, initializing");
    LOG.info("Getting global config from Zookeeper");
    final String zkUrl = getZookeeperUrl(this.coprocessorEnv.getConfiguration());
    if (globalConfigService == null) {
      globalConfigService = getGlobalConfigService(zkUrl);
    }
    globalConfig = globalConfigService.get();

    final Configuration hbaseConfig = this.coprocessorEnv.getConfiguration();
    final CacheWriter<String, String> writer;
    try {
      final String providerName =
          (String) globalConfig.get(EnrichmentConfigurations.TABLE_PROVIDER);
      final String tableName = (String) globalConfig.get(EnrichmentConfigurations.TABLE_NAME);
      final String columnFamily =
          (String) globalConfig.get(EnrichmentConfigurations.COLUMN_FAMILY);
      writer = new HBaseCacheWriter(
          hbaseConfig,
          TableProvider.create(providerName, HTableProvider::new),
          tableName,
          columnFamily,
          COLUMN_QUALIFIER);
    } catch (ClassNotFoundException | InstantiationException | InvocationTargetException
        | IllegalAccessException | NoSuchMethodException e) {
      throw new IOException("Unable to instantiate cache writer", e);
    }
    this.cache = Caffeine.newBuilder().writer(writer).build();
    LOG.info("Finished initializing cache");
  }
  LOG.info("Finished starting enrichment coprocessor");
}
/**
 * Checks that the first row key of every store file starts with the key expected for this
 * region.
 *
 * <p>The expected key is the region start key, or, when the start key is empty, a zero-filled
 * array as long as the region end key. Store files that report no first key are skipped.
 *
 * @param environment environment whose region supplies the start and end keys
 * @param store store whose files are inspected
 * @return {@code false} as soon as one file's first row key does not begin with the expected
 *     key (including when the row key is shorter than it); {@code true} otherwise
 */
public static boolean isLocalIndexStoreFilesConsistent(RegionCoprocessorEnvironment environment, Store store) {
  byte[] startKey = environment.getRegion().getRegionInfo().getStartKey();
  byte[] endKey = environment.getRegion().getRegionInfo().getEndKey();
  byte[] indexKeyEmbedded = startKey.length == 0 ? new byte[endKey.length] : startKey;
  for (StoreFile file : store.getStorefiles()) {
    if (!file.getFirstKey().isPresent() || file.getFirstKey().get() == null) {
      continue;
    }
    byte[] fileFirstRowKey = CellUtil.cloneRow(file.getFirstKey().get());
    if (fileFirstRowKey == null) {
      continue;
    }
    // A row key shorter than the expected prefix cannot contain it. Checking the length first
    // also keeps the ranged comparison below from reading past the end of the row key.
    if (fileFirstRowKey.length < indexKeyEmbedded.length) {
      return false;
    }
    if (Bytes.compareTo(fileFirstRowKey, 0, indexKeyEmbedded.length,
        indexKeyEmbedded, 0, indexKeyEmbedded.length) != 0) {
      return false;
    }
  }
  return true;
}
/**
 * Creates a {@link StatisticsWriter} that reads from and writes to the physical table resolved
 * from {@code PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES}.
 *
 * @param env
 *            environment that supplies the configuration and the server connection
 * @param tableName
 *            passed unchanged to the {@link StatisticsWriter} constructor
 * @param clientTimeStamp
 *            passed through {@code determineClientTimeStamp} together with the configuration;
 *            the result is handed to the {@link StatisticsWriter} constructor
 * @return the {@link StatisticsWriter} for the given primary table.
 * @throws IOException
 *             if the table cannot be created due to an underlying HTable creation error
 */
public static StatisticsWriter newWriter(RegionCoprocessorEnvironment env, String tableName, long clientTimeStamp)
        throws IOException {
    final Configuration configuration = env.getConfiguration();
    final long effectiveTimeStamp = determineClientTimeStamp(configuration, clientTimeStamp);

    final Table writerTable =
            ConnectionFactory.getConnection(ConnectionType.DEFAULT_SERVER_CONNECTION, env).getTable(
                    SchemaUtil.getPhysicalTableName(
                            PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES, configuration));
    final Table readerTable = ServerUtil.getHTableForCoprocessorScan(env, writerTable);

    return new StatisticsWriter(readerTable, writerTable, tableName, effectiveTimeStamp);
}
/**
 * Delays closing the region until no scanners remain, then flags the memstore state through
 * {@code MemstoreAware.changeSplitMerge(latest, true)}.
 *
 * <p>The wait is skipped entirely when an abort was requested. It also ends as soon as the
 * region server reports the cluster down, stopping or aborted. While scanners remain, the
 * method sleeps one second per round. If the sleep is interrupted, the thread's interrupt
 * status is restored before the failure is reported as an {@link IOException}.
 *
 * @param c observer context that carries the region coprocessor environment
 * @param abortRequested whether the close is part of an abort
 * @throws IOException any {@link Throwable} raised here, converted by
 *     {@code CoprocessorUtils.getIOException}
 */
@Override
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) throws IOException {
    try {
        if (abortRequested) {
            // If we are aborting don't wait for scanners to finish
            return;
        }
        while (true) {
            MemstoreAware latest = memstoreAware.get();
            RegionServerServices regionServerServices = (RegionServerServices)c.getEnvironment().getOnlineRegions();
            boolean shuttingDown = !regionServerServices.isClusterUp() || regionServerServices.isStopping() || regionServerServices.isAborted();
            if (latest.currentScannerCount>0 && !shuttingDown) {
                SpliceLogUtils.warn(LOG, "preClose Delayed waiting for scanners to complete scannersRemaining=%d",latest.currentScannerCount);
                try {
                    Thread.sleep(1000); // Have Split sleep for a second
                } catch (InterruptedException e1) {
                    // Restore the interrupt status so code further up the stack can still see it.
                    Thread.currentThread().interrupt();
                    throw new IOException(e1);
                }
            } else {
                // Retry the loop if another thread changed the state in the meantime.
                if (memstoreAware.compareAndSet(latest, MemstoreAware.changeSplitMerge(latest, true))) {
                    break;
                }
            }
        }
    } catch (Throwable t) {
        throw CoprocessorUtils.getIOException(t);
    }
}
/**
 * Wraps the region scanner in an {@code AggregationScanner} when the scan asks for coprocessor
 * processing.
 *
 * <p>The scan is left untouched (the inner scanner is returned) when the
 * {@code COPROCESSOR_ENABLE} attribute is missing, empty, or has a first byte of 0. Otherwise
 * the row type, projector, aggregators and filter are deserialized from scan attributes and
 * passed to the new scanner.
 *
 * @return {@code innerScanner} itself, or an {@code AggregationScanner} around it
 * @throws IOException propagated from the calls made here
 */
private RegionScanner doPostScannerObserver(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
    final byte[] enableFlag = scan.getAttribute(COPROCESSOR_ENABLE);
    final boolean coprocessorEnabled = enableFlag != null && enableFlag.length != 0 && enableFlag[0] != 0;
    if (!coprocessorEnabled) {
        return innerScanner;
    }

    final CoprocessorRowType rowType = CoprocessorRowType.deserialize(scan.getAttribute(TYPE));
    final CoprocessorProjector rowProjector = CoprocessorProjector.deserialize(scan.getAttribute(PROJECTOR));
    final ObserverAggregators rowAggregators = ObserverAggregators.deserialize(scan.getAttribute(AGGREGATORS));
    final CoprocessorFilter rowFilter = CoprocessorFilter.deserialize(scan.getAttribute(FILTER));

    // start/end region operation & sync on scanner is suggested by the
    // javadoc of RegionScanner.nextRaw()
    // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
    final HRegion hostRegion = ctxt.getEnvironment().getRegion();
    hostRegion.startRegionOperation();
    try {
        synchronized (innerScanner) {
            return new AggregationScanner(rowType, rowFilter, rowProjector, rowAggregators, innerScanner);
        }
    } finally {
        hostRegion.closeRegionOperation();
    }
}
/**
 * Post-delete hook: forwards to the superclass when this observer is disabled, otherwise hands
 * the edit and the delete to {@code doPost}.
 *
 * @throws IOException propagated from the delegated call
 */
@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
    WALEdit edit, boolean writeToWAL) throws IOException {
  if (!this.disabled) {
    doPost(edit, delete, writeToWAL);
    return;
  }
  super.postDelete(e, delete, edit, writeToWAL);
}
/**
 * Scan hook that can slow down or fail scans against the meta region with replica id 0.
 *
 * <p>For that region: when {@code slowDownPrimaryMetaScan} is set, the call sleeps for
 * {@code META_SCAN_TIMEOUT_IN_MILLISEC - 50} milliseconds; when {@code throwException} is set,
 * a {@link RegionServerStoppedException} is thrown. For every other region only the replica id
 * is logged.
 *
 * <p>An interrupt during the sleep ends the sleep early; the thread's interrupt status is
 * restored so the interruption is not lost.
 *
 * @throws IOException the simulated {@link RegionServerStoppedException}
 */
@Override
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
    final Scan scan) throws IOException {
  int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
  // Slow down with the primary meta region scan
  if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
    if (slowDownPrimaryMetaScan) {
      LOG.info("Scan with primary meta region, slow down a bit");
      try {
        Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
      } catch (InterruptedException ie) {
        // Carry on with the hook, but keep the interrupt visible to the caller.
        Thread.currentThread().interrupt();
      }
    }
    // Fail for the primary replica
    if (throwException) {
      LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
          .getRegion().getRegionInfo());
      throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
          + " not running");
    } else {
      LOG.info("Scan, We're replica region " + replicaId);
    }
  } else {
    LOG.info("Scan, We're replica region " + replicaId);
  }
}
/**
 * Calls {@code Tracer.compact()} after a compaction when {@code tableEnvMatch} is set.
 *
 * @throws IOException any {@link Throwable} raised here, converted by
 *     {@code CoprocessorUtils.getIOException}
 */
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException {
    try {
        if (!tableEnvMatch) {
            return;
        }
        Tracer.compact();
    } catch (Throwable failure) {
        throw CoprocessorUtils.getIOException(failure);
    }
}
/**
 * Turns a client delete into a {@link Put} of delete markers and bypasses the normal delete.
 *
 * <p>Since HBase deletes cannot be undone, deletes are translated into special puts, which can
 * be rolled back (by a real delete) if the transaction fails. Deletes that are themselves part
 * of a transaction rollback are left alone and performed as normal HBase deletes.
 *
 * <p>A family delete produces one marker under {@code TxConstants.FAMILY_DELETE_QUALIFIER};
 * otherwise each cell produces a marker under its own qualifier. All markers carry an empty
 * value, and the delete's attributes are copied onto the put.
 *
 * @throws IOException propagated from the calls made here
 */
@Override
public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
                      Durability durability) throws IOException {
  if (isRollbackOperation(delete)) {
    // Rollback deletes never get rolled back themselves, so they stay real deletes.
    return;
  }

  final Transaction tx = getFromOperation(delete);
  ensureValidTxLifetime(e.getEnvironment(), delete, tx);

  // Other deletes are client-initiated and need to be translated into our own tombstones
  // TODO: this should delegate to the DeleteStrategy implementation.
  final Put tombstones = new Put(delete.getRow(), delete.getTimeStamp());
  for (Map.Entry<byte[], List<Cell>> familyEntry : delete.getFamilyCellMap().entrySet()) {
    final byte[] family = familyEntry.getKey();
    final List<Cell> cells = familyEntry.getValue();
    if (isFamilyDelete(cells)) {
      tombstones.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, cells.get(0).getTimestamp(),
                     HConstants.EMPTY_BYTE_ARRAY);
      continue;
    }
    for (Cell cell : cells) {
      tombstones.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
                     HConstants.EMPTY_BYTE_ARRAY);
    }
  }

  for (Map.Entry<String, byte[]> attribute : delete.getAttributesMap().entrySet()) {
    tombstones.setAttribute(attribute.getKey(), attribute.getValue());
  }

  e.getEnvironment().getRegion().put(tombstones);
  // skip normal delete handling
  e.bypass();
}
/**
 * Stores the region coprocessor environment.
 *
 * @param env the coprocessor environment; must be a {@link RegionCoprocessorEnvironment}
 * @throws CoprocessorException if {@code env} is any other kind of environment
 */
@Override
public void start(CoprocessorEnvironment env) throws IOException {
  if (!(env instanceof RegionCoprocessorEnvironment)) {
    throw new CoprocessorException("Must be loaded on a table region!");
  }
  this.env = (RegionCoprocessorEnvironment) env;
}
/**
 * Chooses the region scanner used for an index rebuild.
 *
 * <ul>
 *   <li>If the table descriptor has the {@code Indexer} coprocessor and the scan's verify type
 *       is {@code ONLY}, an {@code IndexerRegionScanner} over the inner scanner is returned.</li>
 *   <li>If the scan is already raw, the inner scanner is wrapped directly.</li>
 *   <li>Otherwise the inner scanner is closed and replaced by a scanner over a raw,
 *       all-versions copy of the scan, restricted to the same column families.</li>
 * </ul>
 *
 * @throws IOException propagated from the calls made here
 */
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                     final RegionCoprocessorEnvironment env) throws IOException {
    final boolean hasOldIndexer = region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName());
    final byte[] verifyTypeBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
    final IndexTool.IndexVerifyType verifyType;
    if (verifyTypeBytes != null) {
        verifyType = IndexTool.IndexVerifyType.fromValue(verifyTypeBytes);
    } else {
        verifyType = IndexTool.IndexVerifyType.NONE;
    }

    if (hasOldIndexer && verifyType == IndexTool.IndexVerifyType.ONLY) {
        return new IndexerRegionScanner(innerScanner, region, scan, env);
    }
    if (scan.isRaw()) {
        return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
    }

    final Scan allVersionsScan = new Scan(scan);
    allVersionsScan.setRaw(true);
    allVersionsScan.setMaxVersions();
    allVersionsScan.getFamilyMap().clear();
    // For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
    // This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
    // For rebuilds we need all columns and all versions
    if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
        allVersionsScan.setFilter(null);
    } else if (scan.getFilter() != null) {
        // Override the filter so that we get all versions
        allVersionsScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
    }
    allVersionsScan.setCacheBlocks(false);
    // Re-add whole families after the family map was cleared above.
    for (byte[] family : scan.getFamilyMap().keySet()) {
        allVersionsScan.addFamily(family);
    }

    innerScanner.close();
    final RegionScanner rawScanner = region.getScanner(allVersionsScan);
    return new IndexRebuildRegionScanner(rawScanner, region, scan, env, this);
}
/**
 * Initializes the access controller from the environment's configuration.
 *
 * <p>Reads the ACL-enabled and exec-permission-check flags (warning when authorization checks
 * are disabled), stores the environment, builds the {@code AccessChecker} (with a ZooKeeper
 * watcher when the region environment exposes region server services, otherwise {@code null}),
 * and sets up the user provider and the superusers.
 *
 * @param env the coprocessor environment; must be a {@code PhoenixMetaDataControllerEnvironment}
 * @throws IllegalArgumentException if {@code env} is any other kind of environment
 * @throws IOException propagated from the initialization calls
 */
@Override
public void start(CoprocessorEnvironment env) throws IOException {
    final Configuration conf = env.getConfiguration();

    this.accessCheckEnabled = conf.getBoolean(QueryServices.PHOENIX_ACLS_ENABLED,
            QueryServicesOptions.DEFAULT_PHOENIX_ACLS_ENABLED);
    if (!this.accessCheckEnabled) {
        LOGGER.warn(
                "PhoenixAccessController has been loaded with authorization checks disabled.");
    }
    this.execPermissionsCheckEnabled = conf.getBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY,
            AccessControlConstants.DEFAULT_EXEC_PERMISSION_CHECKS);

    if (!(env instanceof PhoenixMetaDataControllerEnvironment)) {
        throw new IllegalArgumentException(
                "Not a valid environment, should be loaded by PhoenixMetaDataControllerEnvironment");
    }
    this.env = (PhoenixMetaDataControllerEnvironment) env;

    final RegionCoprocessorEnvironment regionEnv = this.env.getRegionCoprocessorEnvironment();
    final ZKWatcher zk = (regionEnv instanceof HasRegionServerServices)
            ? ((HasRegionServerServices) regionEnv).getRegionServerServices().getZooKeeper()
            : null;
    accessChecker = new AccessChecker(conf, zk);

    // set the user-provider.
    this.userProvider = UserProvider.instantiate(conf);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);
}
/**
 * Keeps a reference to the region coprocessor environment this instance was loaded into.
 *
 * @param env the coprocessor environment; must be a {@link RegionCoprocessorEnvironment}
 * @throws CoprocessorException if {@code env} is any other kind of environment
 */
@Override
public void start(CoprocessorEnvironment env) throws IOException {
  final boolean loadedOnRegion = env instanceof RegionCoprocessorEnvironment;
  if (!loadedOnRegion) {
    throw new CoprocessorException("Must be loaded on a table region!");
  }
  this.env = (RegionCoprocessorEnvironment) env;
}
/**
 * Stores the environment's region and installs a freshly set-up
 * {@code KillServerOnFailurePolicy} as the delegate.
 *
 * @param parent passed on to the delegate's {@code setup}
 * @param env environment that supplies the region; also passed on to the delegate
 */
@Override
public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
  this.region = env.getRegion();

  final KillServerOnFailurePolicy killPolicy = new KillServerOnFailurePolicy();
  this.delegate = killPolicy;
  killPolicy.setup(parent, env);
}
public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
this.env = environment;
this.table = table;
this.update = update;
this.memstore = new IndexMemStore();
this.scannerBuilder = new ScannerBuilder(memstore, update);
this.columnSet = new CoveredColumns();
}
/**
 * Asserts that the codec produces no valid index upsert when the given delete is applied on
 * top of the current state.
 *
 * @param env environment handed to the {@code LocalTableState}
 * @param codec codec whose index upserts are checked
 * @param currentState cells that make up the current row state
 * @param d the delete; its timestamp becomes the state's current timestamp and its
 *     {@code FAMILY} cells are added as pending updates
 */
private void ensureNoUpdatesWhenCoveredByDelete(RegionCoprocessorEnvironment env, IndexCodec codec, List<Cell> currentState,
    Delete d) throws IOException {
  final LocalTableState deleteState =
      new LocalTableState(env, new SimpleTableState(Result.create(currentState)), d);
  deleteState.setCurrentTimestamp(d.getTimeStamp());
  deleteState.addPendingUpdates(KeyValueUtil.ensureKeyValues(d.getFamilyCellMap().get(FAMILY)));

  // now we shouldn't see anything when getting the index update
  for (IndexUpdate upsert : codec.getIndexUpserts(deleteState)) {
    assertFalse("Had some index updates, though it should have been covered by the delete",
        upsert.isValid());
  }
}
/**
 * Writes the index updates collected in the batch context and records how long that took.
 *
 * <p>Does nothing when there is no context or it has no index updates. The write runs inside a
 * trace span named "Completing index writes". A write that takes at least
 * {@code slowIndexWriteThreshold} milliseconds increments the slow-call metric (and is logged
 * at debug level); the write time metric is updated in every case.
 *
 * @throws IOException propagated from the index write
 */
private void doPostWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context)
        throws IOException {
    // short circuit, if we don't need to do any work
    final boolean hasIndexUpdates = context != null && !context.indexUpdates.isEmpty();
    if (!hasIndexUpdates) {
        return;
    }

    try (TraceScope scope = Trace.startSpan("Completing index writes")) {
        // get the current span, or just use a null-span to avoid a bunch of if statements
        final Span scopeSpan = scope.getSpan();
        final Span span = scopeSpan != null ? scopeSpan : NullSpan.INSTANCE;

        final long startedAt = EnvironmentEdgeManager.currentTimeMillis();
        span.addTimelineAnnotation("Actually doing index update for first time");
        writer.writeAndHandleFailure(context.indexUpdates, false, context.clientVersion);
        final long elapsed = EnvironmentEdgeManager.currentTimeMillis() - startedAt;

        if (elapsed >= slowIndexWriteThreshold) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(getCallTooSlowMessage("indexWrite",
                        elapsed, slowIndexWriteThreshold));
            }
            metricSource.incrementNumSlowIndexWriteCalls();
        }
        metricSource.updateIndexWriteTime(elapsed);
    }
}