org.apache.hadoop.hbase.regionserver.Region#batchMutate() 源码实例 Demo

下面列出了 org.apache.hadoop.hbase.regionserver.Region#batchMutate() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Commits a batch of mutations to the given region via {@code Region#batchMutate}.
 *
 * <p>If {@code blockingMemstoreSize} is positive and the region's memstore (heap plus
 * off-heap) is larger than it, the write is delayed in 100 ms steps, for at most 30 steps
 * (about 3 seconds), to give a flush the chance to shrink the memstore. Once the memstore
 * drops below the limit, or the wait budget is used up, the batch is written regardless.
 * An empty list is a no-op.
 *
 * @param region               region to write to
 * @param mutations            mutations to commit; nothing happens when empty
 * @param blockingMemstoreSize memstore size above which the write is delayed; a value
 *                             {@code <= 0} disables the wait entirely
 * @throws IOException if the thread is interrupted while waiting (the interrupt flag is
 *                     restored first), or if the region-state check or the batch write
 *                     itself fails
 */
private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
    if (mutations.isEmpty()) {
        return;
    }

    // When memstore size reaches blockingMemstoreSize we wait up to 3 seconds
    // (30 polls x 100 ms) for a flush to happen, which decreases the memstore size
    // so that writes are allowed on the region again.
    for (int attempt = 0; blockingMemstoreSize > 0
            && (region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize()) > blockingMemstoreSize
            && attempt < 30; attempt++) {
        try {
            // Checked on every poll, before sleeping.
            // NOTE(review): presumably aborts the wait when the region is closing or
            // splitting - confirm against the helper's definition.
            checkForRegionClosingOrSplitting();
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
    // TODO: should we use the one that is all or none?
    // Guarded so the message (and the region/table lookup it needs) is only built when
    // debug logging is actually enabled.
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
    }
    region.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
}
 
源代码2 项目: phoenix   文件: IndexRegionObserver.java
/**
 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
 *
 * <p>The elapsed time of every call is reported to the metric source. Calls whose duration
 * reaches {@code slowPreIncrementThreshold} are additionally counted as slow and logged at
 * debug level.
 *
 * @param e   observer context; it is bypassed once the builder has produced a mutation list
 * @param inc the Increment carrying the atomic operation
 * @return {@code null} when the builder returns no mutation list (the context is left
 *         un-bypassed in that case), otherwise {@code Result.EMPTY_RESULT}
 * @throws IOException wrapping any {@code Throwable} raised while building or committing
 *                     the mutations
 */
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Increment inc) throws IOException {
    long start = EnvironmentEdgeManager.currentTimeMillis();
    try {
        List<Mutation> mutations = this.builder.executeAtomicOp(inc);
        if (mutations == null) {
            return null;
        }

        // Causes the Increment to be ignored as we're committing the mutations
        // ourselves below.
        e.bypass();
        // ON DUPLICATE KEY IGNORE will return empty list if row already exists
        // as no action is required in that case.
        if (!mutations.isEmpty()) {
            Region region = e.getEnvironment().getRegion();
            // Otherwise, submit the mutations directly here
            region.batchMutate(mutations.toArray(new Mutation[0]));
        }
        return Result.EMPTY_RESULT;
    } catch (Throwable t) {
        throw ServerUtil.createIOException(
                "Unable to process ON DUPLICATE IGNORE for " + 
                e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + 
                "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
    } finally {
        long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
        // Compare against the same threshold that is reported in the slow-call message;
        // the index-prepare threshold belongs to a different code path.
        if (duration >= slowPreIncrementThreshold) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
            }
            metricSource.incrementSlowDuplicateKeyCheckCalls();
        }
        metricSource.updateDuplicateKeyCheckTime(duration);
    }
}
 
源代码3 项目: phoenix   文件: Indexer.java
/**
 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
 *
 * <p>The elapsed time of every call is reported to the metric source. Calls whose duration
 * reaches {@code slowPreIncrementThreshold} are additionally counted as slow and logged at
 * debug level.
 *
 * @param e   observer context; it is bypassed once the builder has produced a mutation list
 * @param inc the Increment carrying the atomic operation
 * @return {@code null} when the builder returns no mutation list (the context is left
 *         un-bypassed in that case), otherwise {@code Result.EMPTY_RESULT}
 * @throws IOException wrapping any {@code Throwable} raised while building or committing
 *                     the mutations
 */
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Increment inc) throws IOException {
    long start = EnvironmentEdgeManager.currentTimeMillis();
    try {
        List<Mutation> mutations = this.builder.executeAtomicOp(inc);
        if (mutations == null) {
            return null;
        }

        // Causes the Increment to be ignored as we're committing the mutations
        // ourselves below.
        e.bypass();
        // ON DUPLICATE KEY IGNORE will return empty list if row already exists
        // as no action is required in that case.
        if (!mutations.isEmpty()) {
            Region region = e.getEnvironment().getRegion();
            // Otherwise, submit the mutations directly here
            region.batchMutate(mutations.toArray(new Mutation[0]));
        }
        return Result.EMPTY_RESULT;
    } catch (Throwable t) {
        throw ServerUtil.createIOException(
                "Unable to process ON DUPLICATE IGNORE for " + 
                e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + 
                "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
    } finally {
        long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
        // Compare against the same threshold that is reported in the slow-call message;
        // the index-prepare threshold belongs to a different code path.
        if (duration >= slowPreIncrementThreshold) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(getCallTooSlowMessage("preIncrementAfterRowLock",
                        duration, slowPreIncrementThreshold));
            }
            metricSource.incrementSlowDuplicateKeyCheckCalls();
        }
        metricSource.updateDuplicateKeyCheckTime(duration);
    }
}
 
源代码4 项目: phoenix   文件: IndexUtil.java
/**
 * Applies the given mutations to the region in a single {@code batchMutate} call.
 *
 * <p>When {@code skipWAL} is set, every mutation in the list has its durability switched to
 * {@code Durability.SKIP_WAL} before the batch is submitted. The caller's mutation objects
 * are modified in place.
 *
 * @param region    region that receives the batch
 * @param mutations mutations to write
 * @param skipWAL   whether to mark each mutation as {@code SKIP_WAL} first
 * @throws IOException if the batch write fails
 */
public static void writeLocalUpdates(Region region, final List<Mutation> mutations, boolean skipWAL) throws IOException {
    final Mutation[] batch = mutations.toArray(new Mutation[mutations.size()]);
    if (skipWAL) {
        // The array holds the same Mutation instances as the list, so this updates them in place.
        for (int idx = 0; idx < batch.length; idx++) {
            batch[idx].setDurability(Durability.SKIP_WAL);
        }
    }
    region.batchMutate(batch);
}