下面列出了 org.apache.hadoop.hbase.coprocessor.ObserverContext#getEnvironment() 的实例代码;也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Hook run after a region split: recomputes statistics for the two daughter
 * regions produced by the split.
 *
 * <p>Any {@link IOException} raised while collecting stats is logged and
 * swallowed so that a stats failure never fails the split itself; the
 * collector is always closed in the finally block.</p>
 */
@Override
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r)
        throws IOException {
    RegionCoprocessorEnvironment env = e.getEnvironment();
    HRegion parent = env.getRegion();
    TableName tableName = parent.getRegionInfo().getTable();
    StatisticsCollector collector = null;
    try {
        boolean useCurrentTime = env.getConfiguration().getBoolean(
                QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
                QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME);
        // Provides a means of clients controlling their timestamps to not use current time
        // when background tasks are updating stats. Instead we track the max timestamp of
        // the cells and use that.
        long clientTimeStamp = useCurrentTime
                ? TimeKeeper.SYSTEM.getCurrentTime()
                : StatisticsCollector.NO_TIMESTAMP;
        collector = new StatisticsCollector(env, tableName.getNameAsString(), clientTimeStamp);
        collector.splitStats(parent, l, r);
    } catch (IOException ioe) {
        if (logger.isWarnEnabled()) {
            logger.warn("Error while collecting stats during split for " + tableName, ioe);
        }
    } finally {
        if (collector != null) collector.close();
    }
}
/**
 * Before bulk-loaded store files are committed, records their (source, staging)
 * path pairs in the replication queue so bulk-loaded data is shipped to peers.
 * No-op when the pair list is empty or bulk-load replication is disabled.
 */
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="NPE should never happen; if it does it is a bigger issue")
public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
RegionCoprocessorEnvironment env = ctx.getEnvironment();
Configuration c = env.getConfiguration();
// Nothing to record when there are no files or bulk-load replication is off.
if (pairs == null || pairs.isEmpty() ||
!c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
LOG.debug("Skipping recording bulk load entries in preCommitStoreFile for bulkloaded "
+ "data replication.");
return;
}
// This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
// just going to break. This is all private. Not allowed. Regions shouldn't assume they are
// hosted in a RegionServer. TODO: fix.
RegionServerServices rss = ((HasRegionServerServices)env).getRegionServerServices();
Replication rep = (Replication)((HRegionServer)rss).getReplicationSourceService();
// Queue the HFile refs for this family; the replication source picks them up.
rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
}
/**
 * Access check before a region opens: system tables require system/super user;
 * all other tables require ADMIN permission. A null region is logged as an
 * error and otherwise ignored.
 */
@Override
public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c)
        throws IOException {
    final Region region = c.getEnvironment().getRegion();
    if (region == null) {
        LOG.error("NULL region from RegionCoprocessorEnvironment in preOpen()");
        return;
    }
    if (region.getRegionInfo().getTable().isSystemTable()) {
        checkSystemOrSuperUser(getActiveUser(c));
    } else {
        requirePermission(c, "preOpen", Action.ADMIN);
    }
}
/**
 * After a region opens: if it is the ACL region, load the permissions cache;
 * failure to load is fatal (better to fail than enforce permissions wrongly).
 * Non-ACL regions are simply marked initialized.
 */
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
    final RegionCoprocessorEnvironment env = c.getEnvironment();
    final Region region = env.getRegion();
    if (region == null) {
        LOG.error("NULL region from RegionCoprocessorEnvironment in postOpen()");
        return;
    }
    if (!PermissionStorage.isAclRegion(region)) {
        initialized = true;
        return;
    }
    aclRegion = true;
    try {
        initialize(env);
    } catch (IOException ex) {
        // if we can't obtain permissions, it's better to fail
        // than perform checks incorrectly
        throw new RuntimeException("Failed to initialize permissions cache", ex);
    }
}
/**
 * Initializes the HDFS-ACL sync machinery before the master finishes starting,
 * but only when {@code ACL_SYNC_TO_HDFS_ENABLE} is set; otherwise logs a
 * warning and does nothing. Requires the environment to expose master services.
 */
@Override
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
        throws IOException {
    final Configuration conf = c.getEnvironment().getConfiguration();
    if (!conf.getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)) {
        LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
            + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
            + " is false.");
        return;
    }
    MasterCoprocessorEnvironment mEnv = c.getEnvironment();
    if (!(mEnv instanceof HasMasterServices)) {
        throw new IOException("Does not implement HMasterServices");
    }
    masterServices = ((HasMasterServices) mEnv).getMasterServices();
    hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
        masterServices.getConnection());
    pathHelper = hdfsAclHelper.getPathHelper();
    hdfsAclHelper.setCommonDirectoryPermission();
    initialized = true;
    userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
}
/**
 * Wraps the opened scanner in a {@link GlobalIndexScanner} when the scan
 * carries the {@code CHECK_VERIFY_COLUMN} attribute; otherwise returns the
 * scanner unchanged.
 */
@Override
public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Scan scan, RegionScanner s) throws IOException {
    return scan.getAttribute(CHECK_VERIFY_COLUMN) == null
            ? s
            : new GlobalIndexScanner(c.getEnvironment(), scan, s, metricsSource);
}
/**
 * After a region merge commits, runs the post-PONR steps of the pending merge
 * transaction (tracked in the {@code rmt}/{@code mergedRegion} fields), if any.
 */
@Override
public void postMergeCommit(ObserverContext<RegionServerCoprocessorEnvironment> ctx,
        HRegion regionA, HRegion regionB, HRegion mergedRegion) throws IOException {
    if (rmt == null || this.mergedRegion == null) {
        return;
    }
    HRegionServer server =
            (HRegionServer) ctx.getEnvironment().getRegionServerServices();
    rmt.stepsAfterPONR(server, server, this.mergedRegion);
}
/**
 * After the region opens, schedules the periodic self-healing task on the
 * shared executor with the configured initial delay and interval.
 */
@Override
public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
    final RegionCoprocessorEnvironment env = e.getEnvironment();
    // Reuse the environment fetched above; the original fetched it a second
    // time and left this local unused.
    SelfHealingTask task = new SelfHealingTask(env, timeMaxInterval);
    executor.scheduleWithFixedDelay(task, initialDelay, timeInterval, TimeUnit.MILLISECONDS);
}
/**
 * Returns a wrapped scanner that catches unexpected exceptions (i.e. Phoenix
 * bugs) and re-throws them as DoNotRetryIOException, preventing needless
 * retrying from hanging the query for 30 seconds. Unfortunately, until
 * HBASE-7481 gets fixed, there's no way to do the same from a custom filter.
 *
 * @param c coprocessor context supplying the region environment
 * @param s the scanner to wrap
 * @param offset starting position in the rowkey
 * @param scan the scan being executed
 * @param dataColumns data table columns to join, or null
 * @param tupleProjector projector for joined data columns, or null
 * @param dataRegion the data region for local-index scans, or null
 * @param indexMaintainer index maintainer for local-index scans, or null
 * @param viewConstants view constant values, or null
 * @param projector final tuple projector, or null
 * @param ptr scratch pointer reused while projecting
 * @param useQualiferAsListIndex whether column qualifiers index the cell list
 */
RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
        final RegionScanner s, final int offset, final Scan scan,
        final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
        final Region dataRegion, final IndexMaintainer indexMaintainer,
        final byte[][] viewConstants, final TupleProjector projector,
        final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) {
    final RegionCoprocessorEnvironment env = c.getEnvironment();
    final RegionScannerFactory factory = new NonAggregateRegionScannerFactory(env);
    return factory.getWrappedScanner(env, s, null, null, offset, scan, dataColumns,
        tupleProjector, dataRegion, indexMaintainer, null, viewConstants, null, null,
        projector, ptr, useQualiferAsListIndex);
}
/**
 * Replaces the RegionScanner s with a RegionScanner that groups by the key
 * formed by the list of expressions from the scan and returns the aggregated
 * rows of each group. For example, given the following original rows in the
 * RegionScanner: KEY COL1 row1 a row2 b row3 a row4 a the following rows will
 * be returned for COUNT(*): KEY COUNT a 3 b 1 The client is required to do a
 * sort and a final aggregation, since multiple rows with the same key may be
 * returned from different regions.
 */
@Override
protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Scan scan, RegionScanner s) throws IOException {
    byte[] expressionBytes = scan.getAttribute(UNORDERED_GROUP_BY_EXPRESSIONS);
    final boolean keyOrdered;
    if (expressionBytes != null) {
        keyOrdered = false;
    } else {
        expressionBytes = scan.getAttribute(KEY_ORDERED_GROUP_BY_EXPRESSIONS);
        if (expressionBytes == null) {
            // Not a group-by scan; nothing for this observer to do.
            return s;
        }
        keyOrdered = true;
    }
    List<Expression> expressions = deserializeGroupByExpressions(expressionBytes);
    ServerAggregators aggregators = ServerAggregators.deserialize(
        scan.getAttribute(GroupedAggregateRegionObserver.AGGREGATORS),
        c.getEnvironment().getConfiguration());
    final ScanProjector scanProjector = ScanProjector.deserializeProjectorFromScan(scan);
    final HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
    RegionScanner innerScanner = s;
    if (scanProjector != null || joinInfo != null) {
        innerScanner = new HashJoinRegionScanner(s, scanProjector, joinInfo,
            ScanUtil.getTenantId(scan), c.getEnvironment());
    }
    // Key-ordered scans can aggregate streaming; unordered ones need a map.
    return keyOrdered
            ? scanOrdered(c, scan, innerScanner, expressions, aggregators)
            : scanUnordered(c, scan, innerScanner, expressions, aggregators);
}
/**
 * Enforces WRITE permission for every cell a Delete may cover. Cell ACLs on a
 * Delete itself are rejected: because a major compaction can remove prior
 * versions, WRITE access must hold for all currently visible versions
 * ('visible' meaning not already covered by a tombstone), not just the newest.
 */
@Override
public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final Delete delete, final WALEdit edit, final Durability durability)
        throws IOException {
    // An ACL on a delete is useless, we shouldn't allow it
    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
        throw new DoNotRetryIOException("ACL on delete has no effect: " + delete.toString());
    }
    final Map<byte[], ? extends Collection<Cell>> familyCellMap = delete.getFamilyCellMap();
    final AuthResult decision = permissionGranted(OpType.DELETE,
        getActiveUser(c), c.getEnvironment(), familyCellMap, Action.WRITE);
    AccessChecker.logResult(decision);
    if (decision.isAllowed()) {
        return;
    }
    if (cellFeaturesEnabled && !compatibleEarlyTermination) {
        // Defer the covering-cell permission check to the write path.
        delete.setAttribute(CHECK_COVERING_PERM, TRUE);
    } else if (authorizationEnabled) {
        throw new AccessDeniedException("Insufficient permissions " +
            decision.toContextString());
    }
}
/**
 * Requires READ and WRITE permissions on the table, CF, and KV covered by a
 * checkAndDelete (READ for the check, WRITE for the delete). Cell ACLs on the
 * Delete itself are rejected as they would have no effect.
 */
@Override
public boolean preCheckAndDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
        final byte [] row, final byte [] family, final byte [] qualifier,
        final CompareOperator op,
        final ByteArrayComparable comparator, final Delete delete,
        final boolean result) throws IOException {
    // An ACL on a delete is useless, we shouldn't allow it
    if (delete.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL) != null) {
        throw new DoNotRetryIOException("ACL on checkAndDelete has no effect: " +
            delete.toString());
    }
    final Map<byte[], ? extends Collection<byte[]>> familyMap = makeFamilyMap(family, qualifier);
    final AuthResult decision = permissionGranted(OpType.CHECK_AND_DELETE,
        getActiveUser(c), c.getEnvironment(), familyMap, Action.READ, Action.WRITE);
    AccessChecker.logResult(decision);
    if (!decision.isAllowed()) {
        if (cellFeaturesEnabled && !compatibleEarlyTermination) {
            // Defer the covering-cell permission check to the write path.
            delete.setAttribute(CHECK_COVERING_PERM, TRUE);
        } else if (authorizationEnabled) {
            throw new AccessDeniedException("Insufficient permissions " +
                decision.toContextString());
        }
    }
    return result;
}
/**
 * Requires WRITE permission on the table, CF, and KV to be appended, and
 * persists any cell-level ACL carried on the Append when cell features are
 * enabled (otherwise rejects it). Returns null so the append proceeds.
 */
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
        throws IOException {
    final User user = getActiveUser(c);
    checkForReservedTagPresence(user, append);
    final Map<byte[], ? extends Collection<Cell>> familyCellMap = append.getFamilyCellMap();
    final AuthResult decision = permissionGranted(OpType.APPEND, user,
        c.getEnvironment(), familyCellMap, Action.WRITE);
    AccessChecker.logResult(decision);
    if (!decision.isAllowed()) {
        if (cellFeaturesEnabled && !compatibleEarlyTermination) {
            // Defer the covering-cell permission check to the write path.
            append.setAttribute(CHECK_COVERING_PERM, TRUE);
        } else if (authorizationEnabled) {
            throw new AccessDeniedException("Insufficient permissions " +
                decision.toContextString());
        }
    }
    final byte[] aclBytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
    if (aclBytes != null) {
        if (!cellFeaturesEnabled) {
            throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
        }
        addCellPermissions(aclBytes, append.getFamilyCellMap());
    }
    return null;
}
/**
 * Before the point-of-no-return of a data-region split, splits the
 * corresponding local index region at the same split key and appends the
 * index region's META mutations (offline parent + two daughters) to
 * {@code metaEntries} so both splits are committed together. On any failure
 * the data split is bypassed and the index split is rolled back.
 * Side effects: stores the split transaction and daughter regions in the
 * {@code st} / {@code daughterRegions} fields for later split phases.
 */
@Override
public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
byte[] splitKey, List<Mutation> metaEntries) throws IOException {
RegionCoprocessorEnvironment environment = ctx.getEnvironment();
HTableDescriptor tableDesc = ctx.getEnvironment().getRegion().getTableDesc();
// System tables have no local indexes to keep in sync.
if (SchemaUtil.isSystemTable(tableDesc.getName())) {
return;
}
RegionServerServices rss = ctx.getEnvironment().getRegionServerServices();
// Proceed only when this region belongs to the data table, i.e. the
// local-index marker property is absent or not TRUE.
if (tableDesc.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES) == null
|| !Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(tableDesc
.getValue(MetaDataUtil.IS_LOCAL_INDEX_TABLE_PROP_BYTES)))) {
TableName indexTable =
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(tableDesc.getName()));
// No local index physical table exists -> nothing to split.
if (!MetaTableAccessor.tableExists(rss.getConnection(), indexTable)) return;
HRegion indexRegion = IndexUtil.getIndexRegion(environment);
if (indexRegion == null) {
// NOTE(review): "corresponindg" is a typo in this runtime log message;
// left byte-identical here (comment-only change).
LOG.warn("Index region corresponindg to data region " + environment.getRegion()
+ " not in the same server. So skipping the split.");
ctx.bypass();
return;
}
try {
// Choose the split-transaction implementation supported by the running
// HBase version; newer versions honor the ZK-assignment flag.
int encodedVersion = VersionUtil.encodeVersion(environment.getHBaseVersion());
if(encodedVersion >= SPLIT_TXN_MINIMUM_SUPPORTED_VERSION) {
st = new SplitTransaction(indexRegion, splitKey);
st.useZKForAssignment =
environment.getConfiguration().getBoolean("hbase.assignment.usezk",
true);
} else {
st = new IndexSplitTransaction(indexRegion, splitKey);
}
if (!st.prepare()) {
LOG.error("Prepare for the table " + indexRegion.getTableDesc().getNameAsString()
+ " failed. So returning null. ");
ctx.bypass();
return;
}
indexRegion.forceSplit(splitKey);
// Run split steps up to (but not past) the PONR; keep the daughters in a
// field for the post-split phase.
daughterRegions = st.stepsBeforePONR(rss, rss, false);
HRegionInfo copyOfParent = new HRegionInfo(indexRegion.getRegionInfo());
copyOfParent.setOffline(true);
copyOfParent.setSplit(true);
// Put for parent
Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
MetaTableAccessor.addDaughtersToPut(putParent,
daughterRegions.getFirst().getRegionInfo(),
daughterRegions.getSecond().getRegionInfo());
metaEntries.add(putParent);
// Puts for daughters
Put putA = MetaTableAccessor.makePutFromRegionInfo(
daughterRegions.getFirst().getRegionInfo());
Put putB = MetaTableAccessor.makePutFromRegionInfo(
daughterRegions.getSecond().getRegionInfo());
st.addLocation(putA, rss.getServerName(), 1);
st.addLocation(putB, rss.getServerName(), 1);
metaEntries.add(putA);
metaEntries.add(putB);
} catch (Exception e) {
// Abort the data-region split and undo any index split steps taken.
ctx.bypass();
LOG.warn("index region splitting failed with the exception ", e);
if (st != null){
st.rollback(rss, rss);
st = null;
daughterRegions = null;
}
}
}
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 * @param c coprocessor context for the region being scanned
 * @param scan the scan whose attributes carry group-by metadata
 * @param scanner the (possibly already wrapped) scanner to drain
 * @param expressions group-by key expressions
 * @param aggregators server-side aggregators applied per group
 * @param limit maximum number of distinct groups to collect before stopping
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner scanner, final List<Expression> expressions,
final ServerAggregators aggregators, long limit) throws IOException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
"Grouped aggregation over unordered rows with scan " + scan
+ ", group by " + expressions + ", aggregators " + aggregators,
ScanUtil.getCustomAnnotations(scan)));
}
RegionCoprocessorEnvironment env = c.getEnvironment();
Configuration conf = env.getConfiguration();
// Start from the configured estimate; a per-scan estimate (if present)
// overrides it below.
int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
if (estDistValsBytes != null) {
// Allocate 1.5x estimation
estDistVals = Math.max(MIN_DISTINCT_VALUES,
(int) (Bytes.toInt(estDistValsBytes) * 1.5f));
}
Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
final boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
GroupByCache groupByCache =
GroupByCacheFactory.INSTANCE.newCache(
env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
aggregators, estDistVals);
boolean success = false;
try {
boolean hasMore;
Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(LogUtil.addCustomAnnotations(
"Spillable groupby enabled: " + spillableEnabled,
ScanUtil.getCustomAnnotations(scan)));
}
Region region = c.getEnvironment().getRegion();
boolean acquiredLock = false;
try {
// Bracket the raw scan in a region operation; closeRegionOperation is
// only called if startRegionOperation succeeded.
region.startRegionOperation();
acquiredLock = true;
synchronized (scanner) {
do {
List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there are
// more values after the
// ones returned
hasMore = scanner.nextRaw(results);
if (!results.isEmpty()) {
result.setKeyValues(results);
ImmutableBytesPtr key =
TupleUtil.getConcatenatedValue(result, expressions);
Aggregator[] rowAggregators = groupByCache.cache(key);
// Aggregate values here
aggregators.aggregate(rowAggregators, result);
}
} while (hasMore && groupByCache.size() < limit);
}
} finally {
if (acquiredLock) region.closeRegionOperation();
}
// Hand ownership of the cache to the returned scanner.
RegionScanner regionScanner = groupByCache.getScanner(scanner);
// Do not sort here, but sort back on the client instead
// The reason is that if the scan ever extends beyond a region
// (which can happen if we're basing our parallelization split
// points on old metadata), we'll get incorrect query results.
success = true;
return regionScanner;
} finally {
// On any failure path, release the cache's resources.
if (!success) {
Closeables.closeQuietly(groupByCache);
}
}
}
/**
 * Delegates scanner wrapping to a {@link NonAggregateRegionScannerFactory}
 * built from the region environment.
 */
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable {
    final NonAggregateRegionScannerFactory scannerFactory =
            new NonAggregateRegionScannerFactory(c.getEnvironment());
    return scannerFactory.getRegionScanner(scan, s);
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 * @param c coprocessor context for the region being scanned
 * @param scan the scan whose attributes carry group-by metadata
 * @param s the scanner to drain
 * @param expressions group-by key expressions
 * @param aggregators server-side aggregators applied per group
 * @param limit maximum number of distinct groups to collect before stopping
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
final RegionScanner s, final List<Expression> expressions,
final ServerAggregators aggregators, long limit) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan
+ ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan)));
}
RegionCoprocessorEnvironment env = c.getEnvironment();
Configuration conf = env.getConfiguration();
// Configured estimate, overridden by a per-scan estimate when present.
int estDistVals = conf.getInt(GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB, DEFAULT_GROUPBY_ESTIMATED_DISTINCT_VALUES);
byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES);
if (estDistValsBytes != null) {
// Allocate 1.5x estimation
estDistVals = Math.max(MIN_DISTINCT_VALUES,
(int) (Bytes.toInt(estDistValsBytes) * 1.5f));
}
final boolean spillableEnabled =
conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
GroupByCache groupByCache =
GroupByCacheFactory.INSTANCE.newCache(
env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan),
aggregators, estDistVals);
boolean success = false;
try {
boolean hasMore;
MultiKeyValueTuple result = new MultiKeyValueTuple();
if (logger.isDebugEnabled()) {
logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan)));
}
HRegion region = c.getEnvironment().getRegion();
// Bracket the raw scan in a region operation.
region.startRegionOperation();
try {
do {
List<Cell> results = new ArrayList<Cell>();
// Results are potentially returned even when the return
// value of s.next is false
// since this is an indication of whether or not there are
// more values after the
// ones returned
hasMore = s.nextRaw(results);
if (!results.isEmpty()) {
result.setKeyValues(results);
ImmutableBytesWritable key =
TupleUtil.getConcatenatedValue(result, expressions);
Aggregator[] rowAggregators = groupByCache.cache(key);
// Aggregate values here
aggregators.aggregate(rowAggregators, result);
}
} while (hasMore && groupByCache.size() < limit);
} finally {
region.closeRegionOperation();
}
// Hand ownership of the cache to the returned scanner.
RegionScanner regionScanner = groupByCache.getScanner(s);
// Do not sort here, but sort back on the client instead
// The reason is that if the scan ever extends beyond a region
// (which can happen if we're basing our parallelization split
// points on old metadata), we'll get incorrect query results.
success = true;
return regionScanner;
} finally {
// On any failure path, release the cache's resources.
if (!success) {
Closeables.closeQuietly(groupByCache);
}
}
}
/**
 * Replaces the RegionScanner s with a RegionScanner that groups by the key
 * formed by the list of expressions from the scan and returns the aggregated
 * rows of each group. For example, given the following original rows in the
 * RegionScanner: KEY COL1 row1 a row2 b row3 a row4 a the following rows will
 * be returned for COUNT(*): KEY COUNT a 3 b 1 The client is required to do a
 * sort and a final aggregation, since multiple rows with the same key may be
 * returned from different regions.
 */
@Override
protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Scan scan, RegionScanner s) throws IOException {
    boolean keyOrdered = false;
    byte[] expressionBytes = scan.getAttribute(BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS);
    if (expressionBytes == null) {
        expressionBytes = scan.getAttribute(BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS);
        if (expressionBytes == null) {
            // Not a group-by scan: return the scanner untouched. Mirrors the
            // sibling implementation; the original fell through with a null
            // array into deserializeGroupByExpressions.
            return s;
        }
        keyOrdered = true;
    }
    int offset = 0;
    boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
    if (ScanUtil.isLocalIndex(scan)) {
        /*
         * For local indexes, we need to set an offset on row key expressions to skip
         * the region start key.
         */
        Region region = c.getEnvironment().getRegion();
        offset = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
            region.getRegionInfo().getEndKey().length;
        ScanUtil.setRowKeyOffset(scan, offset);
    }
    List<Expression> expressions = deserializeGroupByExpressions(expressionBytes, 0);
    final TenantCache tenantCache = GlobalCache.getTenantCache(c.getEnvironment(), ScanUtil.getTenantId(scan));
    // Zero-sized chunk: the aggregators grow it as needed; try-with-resources
    // guarantees it is released.
    try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
        ServerAggregators aggregators =
                ServerAggregators.deserialize(scan
                        .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
                        .getEnvironment().getConfiguration(), em);
        RegionScanner innerScanner = s;
        // Prefer the protobuf form of the local-index spec when present.
        boolean useProto = false;
        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
        useProto = localIndexBytes != null;
        if (localIndexBytes == null) {
            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
        }
        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
        TupleProjector tupleProjector = null;
        byte[][] viewConstants = null;
        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
        final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
        boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
        // Local-index scans (or projected scans without a join) need a wrapper
        // that joins back data-table columns.
        if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
            if (dataColumns != null) {
                tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
                viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
            }
            ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
            innerScanner =
                    getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector,
                        c.getEnvironment().getRegion(), indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
        }
        if (j != null) {
            innerScanner =
                    new HashJoinRegionScanner(innerScanner, p, j, ScanUtil.getTenantId(scan),
                        c.getEnvironment(), useQualifierAsIndex, useNewValueColumnQualifier);
        }
        long limit = Long.MAX_VALUE;
        byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);
        if (limitBytes != null) {
            limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, SortOrder.getDefault());
        }
        if (keyOrdered) { // Optimize by taking advantage that the rows are
                          // already in the required group by key order
            return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit);
        } else { // Otherwise, collect them all up in an in memory map
            return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit);
        }
    }
}
/**
 * For transactional tables, generates index updates for the batch before it
 * is applied: updates targeting this same table (local indexes) are appended
 * to the batch directly, while remote index updates are stashed on the
 * per-batch context for a later hook. Skipped entirely when the codec is
 * disabled for the first mutation, or when a new-enough client maintains
 * non-local indexes itself. Any failure is rethrown as an IOException.
 */
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
Mutation m = miniBatchOp.getOperation(0);
if (!codec.isEnabled(m)) {
return;
}
PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaDataBuilder(c.getEnvironment()).getIndexMetaData(miniBatchOp);
if ( indexMetaData.getClientVersion() >= MetaDataProtocol.MIN_TX_CLIENT_SIDE_MAINTENANCE
&& !indexMetaData.hasLocalIndexes()) { // Still generate index updates server side for local indexes
return;
}
BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
setBatchMutateContext(c, context);
Collection<Pair<Mutation, byte[]>> indexUpdates = null;
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}
RegionCoprocessorEnvironment env = c.getEnvironment();
PhoenixTransactionContext txnContext = indexMetaData.getTransactionContext();
if (txnContext == null) {
throw new NullPointerException("Expected to find transaction in metadata for " + env.getRegionInfo().getTable().getNameAsString());
}
PhoenixTxIndexMutationGenerator generator = new PhoenixTxIndexMutationGenerator(env.getConfiguration(), indexMetaData,
env.getRegionInfo().getTable().getName(),
env.getRegionInfo().getStartKey(),
env.getRegionInfo().getEndKey());
try (Table htable = env.getConnection().getTable(env.getRegionInfo().getTable())) {
// get the index updates for all elements in this batch
indexUpdates = generator.getIndexUpdates(htable, getMutationIterator(miniBatchOp));
}
byte[] tableName = c.getEnvironment().getRegionInfo().getTable().getName();
Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
// Split the generated updates: ones for this table are applied in this
// batch (removed via the iterator), the rest stay in indexUpdates.
while(indexUpdatesItr.hasNext()) {
Pair<Mutation, byte[]> next = indexUpdatesItr.next();
if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
// These mutations will not go through the preDelete hooks, so we
// must manually convert them here.
Mutation mutation = TransactionUtil.convertIfDelete(next.getFirst());
localUpdates.add(mutation);
indexUpdatesItr.remove();
}
}
if (!localUpdates.isEmpty()) {
miniBatchOp.addOperationsFromCP(0,
localUpdates.toArray(new Mutation[localUpdates.size()]));
}
if (!indexUpdates.isEmpty()) {
context.indexUpdates = indexUpdates;
}
// NOTE(review): assumes context.indexUpdates is non-null even when no
// remote updates were assigned above (presumably default-initialized by
// BatchMutateContext) — confirm against that class.
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
} catch (Throwable t) {
String msg = "Failed to update index with entries:" + indexUpdates;
LOGGER.error(msg, t);
ServerUtil.throwIOException(msg, t);
}
}
/**
 * Used for an aggregate query in which the key order does not necessarily match the group by
 * key order. In this case, we must collect all distinct groups within a region into a map,
 * aggregating as we go.
 *
 * @param c coprocessor context for the region being scanned
 * @param scan the scan whose attributes carry group-by metadata
 * @param s the scanner to drain
 * @param expressions group-by key expressions
 * @param aggregators server-side aggregators applied per group
 */
private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
        final RegionScanner s, final List<Expression> expressions,
        final ServerAggregators aggregators) throws IOException {
    if (logger.isDebugEnabled()) {
        logger.debug("Grouped aggregation over unordered rows with scan " + scan
                + ", group by " + expressions + ", aggregators " + aggregators);
    }
    int estDistVals = DEFAULT_ESTIMATED_DISTINCT_VALUES;
    byte[] estDistValsBytes = scan.getAttribute(ESTIMATED_DISTINCT_VALUES);
    if (estDistValsBytes != null) {
        // Allocate 1.5x estimation, but never below the floor. BUG FIX: this
        // used Math.min, which capped the estimate AT the minimum instead of
        // enforcing it as a floor; the sibling scanUnordered implementations
        // use Math.max here.
        estDistVals = Math.max(MIN_DISTINCT_VALUES,
                (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
    }
    RegionCoprocessorEnvironment env = c.getEnvironment();
    Configuration conf = env.getConfiguration();
    final boolean spillableEnabled =
            conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE);
    GroupByCache groupByCache =
            GroupByCacheFactory.INSTANCE.newCache(
                env, ScanUtil.getTenantId(scan),
                aggregators, estDistVals);
    boolean success = false;
    try {
        boolean hasMore;
        MultiKeyValueTuple result = new MultiKeyValueTuple();
        if (logger.isDebugEnabled()) {
            logger.debug("Spillable groupby enabled: " + spillableEnabled);
        }
        HRegion region = c.getEnvironment().getRegion();
        MultiVersionConsistencyControl.setThreadReadPoint(s.getMvccReadPoint());
        region.startRegionOperation();
        try {
            do {
                List<KeyValue> results = new ArrayList<KeyValue>();
                // Results are potentially returned even when the return
                // value of s.next is false
                // since this is an indication of whether or not there are
                // more values after the
                // ones returned
                hasMore = s.nextRaw(results, null);
                if (!results.isEmpty()) {
                    result.setKeyValues(results);
                    ImmutableBytesWritable key =
                            TupleUtil.getConcatenatedValue(result, expressions);
                    Aggregator[] rowAggregators = groupByCache.cache(key);
                    // Aggregate values here
                    aggregators.aggregate(rowAggregators, result);
                }
            } while (hasMore);
        } finally {
            region.closeRegionOperation();
        }
        RegionScanner regionScanner = groupByCache.getScanner(s);
        // Do not sort here, but sort back on the client instead
        // The reason is that if the scan ever extends beyond a region
        // (which can happen if we're basing our parallelization split
        // points on old metadata), we'll get incorrect query results.
        success = true;
        return regionScanner;
    } finally {
        // On any failure path, release the cache's resources.
        if (!success) {
            Closeables.closeQuietly(groupByCache);
        }
    }
}