下面列出了 org.apache.hadoop.hbase.regionserver.Region#batchMutate() 的实例代码，你可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Writes the given mutations to the region as a single {@code batchMutate} call.
 *
 * <p>Before writing, if {@code blockingMemstoreSize} is positive and the region's combined
 * heap + off-heap memstore size exceeds it, this method polls up to 30 times, sleeping
 * 100 ms between polls (about 3 seconds in total), to give a flush the chance to bring the
 * memstore size back down. The write is attempted afterwards regardless of whether the
 * size dropped.
 *
 * @param region the region to write to
 * @param mutations the mutations to commit; nothing happens when the list is empty
 * @param blockingMemstoreSize memstore size above which the write is delayed; a value
 *        {@code <= 0} disables the wait
 * @throws IOException if the write fails, if the wait is interrupted, or if thrown by
 *         {@code checkForRegionClosingOrSplitting()}
 */
private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
    if (mutations.isEmpty()) {
        return;
    }
    // When memstore size reaches blockingMemstoreSize we wait up to 3 seconds (30 x 100 ms)
    // for a flush to happen, which decreases the memstore size so writes are allowed again.
    for (int i = 0; blockingMemstoreSize > 0
            && (region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize()) > blockingMemstoreSize
            && i < 30; i++) {
        try {
            // NOTE(review): presumably aborts the wait (by throwing) when the region is
            // closing or splitting - confirm against the helper's definition.
            checkForRegionClosingOrSplitting();
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
    // TODO: should we use the one that is all or none?
    // Guarded so the message is not concatenated on every commit when debug is off.
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
    }
    // Allocate the target array only now, after the potential wait above.
    region.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
}
/**
 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
 *
 * <p>The time spent here is always reported via
 * {@code metricSource.updateDuplicateKeyCheckTime}; calls that take at least
 * {@code slowPreIncrementThreshold} milliseconds are additionally counted as slow and
 * logged at debug level.
 *
 * @param e the coprocessor context; bypassed when this hook commits the mutations itself
 * @param inc the Increment handed to {@code builder.executeAtomicOp}
 * @return {@code null} when {@code executeAtomicOp} returns {@code null} (the Increment is
 *         then left for normal processing), otherwise {@link Result#EMPTY_RESULT}
 * @throws IOException wrapping any failure raised while building or committing the mutations
 */
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Increment inc) throws IOException {
    long start = EnvironmentEdgeManager.currentTimeMillis();
    try {
        List<Mutation> mutations = this.builder.executeAtomicOp(inc);
        if (mutations == null) {
            return null;
        }
        // Causes the Increment to be ignored as we're committing the mutations
        // ourselves below.
        e.bypass();
        // ON DUPLICATE KEY IGNORE will return empty list if row already exists
        // as no action is required in that case.
        if (!mutations.isEmpty()) {
            Region region = e.getEnvironment().getRegion();
            // Otherwise, submit the mutations directly here
            region.batchMutate(mutations.toArray(new Mutation[0]));
        }
        return Result.EMPTY_RESULT;
    } catch (Throwable t) {
        throw ServerUtil.createIOException(
                "Unable to process ON DUPLICATE IGNORE for " +
                e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() +
                "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
    } finally {
        long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
        // Compare against the same threshold that the slow-call message reports;
        // the index-prepare threshold belongs to a different code path.
        if (duration >= slowPreIncrementThreshold) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
            }
            metricSource.incrementSlowDuplicateKeyCheckCalls();
        }
        metricSource.updateDuplicateKeyCheckTime(duration);
    }
}
/**
 * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
 * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
 * real increment, though, it's really more of a Put. We translate the Increment into a
 * list of mutations, at most a single Put and Delete that are the changes upon executing
 * the list of ON DUPLICATE KEY clauses for this row.
 *
 * <p>The elapsed time is always recorded through
 * {@code metricSource.updateDuplicateKeyCheckTime}. When it reaches
 * {@code slowPreIncrementThreshold} milliseconds the call is also counted as a slow
 * duplicate-key check and, if debug logging is enabled, a slow-call message is logged.
 *
 * @param e the coprocessor context; bypassed when this hook commits the mutations itself
 * @param inc the Increment handed to {@code builder.executeAtomicOp}
 * @return {@code null} when {@code executeAtomicOp} returns {@code null} (the Increment is
 *         then left for normal processing), otherwise {@link Result#EMPTY_RESULT}
 * @throws IOException wrapping any failure raised while building or committing the mutations
 */
@Override
public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Increment inc) throws IOException {
    long start = EnvironmentEdgeManager.currentTimeMillis();
    try {
        List<Mutation> mutations = this.builder.executeAtomicOp(inc);
        if (mutations == null) {
            return null;
        }
        // Causes the Increment to be ignored as we're committing the mutations
        // ourselves below.
        e.bypass();
        // ON DUPLICATE KEY IGNORE will return empty list if row already exists
        // as no action is required in that case.
        if (!mutations.isEmpty()) {
            Region region = e.getEnvironment().getRegion();
            // Otherwise, submit the mutations directly here
            region.batchMutate(mutations.toArray(new Mutation[0]));
        }
        return Result.EMPTY_RESULT;
    } catch (Throwable t) {
        throw ServerUtil.createIOException(
                "Unable to process ON DUPLICATE IGNORE for " +
                e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() +
                "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
    } finally {
        long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
        // Use the pre-increment threshold here so the check agrees with the value
        // reported in the slow-call message below.
        if (duration >= slowPreIncrementThreshold) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(getCallTooSlowMessage("preIncrementAfterRowLock",
                        duration, slowPreIncrementThreshold));
            }
            metricSource.incrementSlowDuplicateKeyCheckCalls();
        }
        metricSource.updateDuplicateKeyCheckTime(duration);
    }
}
/**
 * Applies the given mutations to the region in one {@code batchMutate} call.
 *
 * @param region the region that receives the mutations
 * @param mutations the mutations to apply; when {@code skipWAL} is set, each one has its
 *        durability changed in place to {@link Durability#SKIP_WAL}
 * @param skipWAL whether to mark every mutation as {@code SKIP_WAL} before writing
 * @throws IOException if the batch write fails
 */
public static void writeLocalUpdates(Region region, final List<Mutation> mutations, boolean skipWAL) throws IOException {
    if (skipWAL) {
        for (final Mutation mutation : mutations) {
            mutation.setDurability(Durability.SKIP_WAL);
        }
    }
    final Mutation[] batch = new Mutation[mutations.size()];
    region.batchMutate(mutations.toArray(batch));
}