下面列出了怎么用org.apache.hadoop.hbase.client.IsolationLevel的API类实例代码及写法,或者点击链接到github查看源代码。
/**
 * Reads a single column of {@code row} from the region and asserts that it holds
 * exactly one cell whose value decodes to {@code amount}.
 *
 * @param row       row key to read
 * @param familiy   column family of the counter
 * @param qualifier column qualifier of the counter
 * @param amount    expected long value stored in the cell
 * @param fast      when {@code true} the read is issued with
 *                  {@link IsolationLevel#READ_UNCOMMITTED}
 * @throws IOException if the region read fails
 */
private void assertICV(byte [] row,
    byte [] familiy,
    byte[] qualifier,
    long amount,
    boolean fast) throws IOException {
  Get read = new Get(row);
  if (fast) {
    read.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
  }
  read.addColumn(familiy, qualifier);

  Result fetched = region.get(read);
  // Exactly one cell is expected for the requested column.
  assertEquals(1, fetched.size());

  Cell onlyCell = fetched.rawCells()[0];
  long stored = Bytes.toLong(CellUtil.cloneValue(onlyCell));
  assertEquals(amount, stored);
}
/**
 * Prepares this reader for the given split: decodes the scan carried by the split,
 * reads the per-split row limit from the configuration, and opens a
 * {@link ClientSideRegionScanner} over the split's restore directory.
 *
 * @param split the input split describing the region to read
 * @param conf  job configuration
 * @throws IOException if the scan cannot be decoded or the scanner cannot be opened
 */
public void initialize(InputSplit split, Configuration conf) throws IOException {
  this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
  this.split = split;
  this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);

  TableDescriptor tableDescriptor = split.htd;
  RegionInfo regionInfo = this.split.getRegionInfo();
  FileSystem fileSystem = CommonFSUtils.getCurrentFileSystem(conf);

  // The region is immutable, so reading uncommitted is fine here;
  // otherwise the thread read point would have to be set.
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
  // Data blocks are not cached for this scan.
  scan.setCacheBlocks(false);
  scan.setScanMetricsEnabled(true);

  Path restorePath = new Path(split.restoreDir);
  scanner = new ClientSideRegionScanner(
      conf, fileSystem, restorePath, tableDescriptor, regionInfo, scan, null);
}
/**
 * Captures everything needed to scan a region on the client side. The supplied scan
 * is switched to {@link IsolationLevel#READ_UNCOMMITTED} before being stored.
 *
 * @param conf        configuration to use
 * @param fs          file system holding the region data
 * @param rootDir     root directory of the region data
 * @param htd         descriptor of the table the region belongs to
 * @param hri         region to scan; wrapped in a {@code SpliceHRegionInfo}
 * @param scan        scan to execute (its isolation level is modified)
 * @param hostAndPort host-and-port string stored as given
 * @throws IOException declared by the constructor signature
 */
public SkeletonClientSideRegionScanner(Configuration conf,
    FileSystem fs,
    Path rootDir,
    HTableDescriptor htd,
    HRegionInfo hri,
    Scan scan, String hostAndPort) throws IOException {
  if (LOG.isDebugEnabled()) {
    SpliceLogUtils.debug(LOG, "init for regionInfo=%s, scan=%s", hri,scan);
  }
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);

  // Environment the scanner runs against.
  this.conf = conf;
  this.fs = fs;
  this.rootDir = rootDir;
  this.hostAndPort = hostAndPort;

  // What is being scanned.
  this.htd = htd;
  this.hri = new SpliceHRegionInfo(hri);
  this.scan = scan;
}
/**
 * Runs {@code scan} against {@code region} with
 * {@link IsolationLevel#READ_UNCOMMITTED} and stores the outcome of a single
 * {@code scanner.next(...)} call in {@code result}.
 *
 * <p>{@code result} is cleared before it is filled. The scanner is managed with
 * try-with-resources so it is always closed, and a failure while closing is attached
 * as a suppressed exception instead of replacing an exception thrown by the scan.
 *
 * @param region region to scan
 * @param scan   scan to execute (its isolation level is modified)
 * @param result list that receives the cells; any previous content is discarded
 * @throws IOException if opening, reading or closing the scanner fails
 */
public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
  try (InternalScanner scanner = region.getScanner(scan)) {
    result.clear();
    scanner.next(result);
  }
}
/**
 * Opens the given region and prepares a scanner over it.
 *
 * <p>The scan is switched to {@link IsolationLevel#READ_UNCOMMITTED}. Depending on the
 * scan attributes, one of two setups is chosen:
 * <ul>
 *   <li>analyze-table scans use the plain region scanner together with a
 *       {@code DefaultStatisticsCollector};</li>
 *   <li>scans flagged as non-aggregate queries wrap the region scanner through a
 *       {@code NonAggregateRegionScannerFactory} and use a
 *       {@code NoOpStatisticsCollector}.</li>
 * </ul>
 * Any other scan is rejected.
 *
 * <p>If anything fails after the region has been opened, the region is closed again
 * before the failure is propagated; the caller never receives a reference it could
 * close itself when the constructor throws.
 *
 * @param conf    configuration to use
 * @param fs      file system holding the region data
 * @param rootDir root directory passed to {@code HRegion.openHRegion}
 * @param htd     descriptor of the table the region belongs to
 * @param hri     region to open
 * @param scan    scan to execute (its isolation level is modified)
 * @throws UnsupportedOperationException if the scan is neither an analyze-table scan
 *         nor a non-aggregate query
 * @throws Throwable any failure raised while opening the region or building the scanner
 */
public SnapshotScanner(Configuration conf, FileSystem fs, Path rootDir,
    TableDescriptor htd, RegionInfo hri, Scan scan) throws Throwable{
  LOGGER.info("Creating SnapshotScanner for region: " + hri);
  scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
  values = new ArrayList<>();
  this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
  this.scan = scan;
  try {
    RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf);
    // Collect statistics during scan if ANALYZE_TABLE attribute is set
    if (ScanUtil.isAnalyzeTable(scan)) {
      this.scanner = region.getScanner(scan);
      PhoenixConnection connection =
          (PhoenixConnection) ConnectionUtil.getInputConnection(conf, new Properties());
      String tableName = region.getTableDescriptor().getTableName().getNameAsString();
      TableName physicalTableName = SchemaUtil.getPhysicalTableName(
          PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, conf);
      Table table = connection.getQueryServices().getTable(physicalTableName.getName());
      StatisticsWriter statsWriter =
          StatisticsWriter.newWriter(connection, tableName, HConstants.LATEST_TIMESTAMP);
      statisticsCollector = new DefaultStatisticsCollector(conf, region,
          tableName, null, null, null, statsWriter, table);
    } else if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) {
      RegionScannerFactory regionScannerFactory =
          new NonAggregateRegionScannerFactory(snapshotEnv);
      this.scanner = regionScannerFactory.getRegionScanner(scan, region.getScanner(scan));
      statisticsCollector = new NoOpStatisticsCollector();
    } else {
      /* future work : Snapshot M/R jobs for aggregate queries*/
      throw new UnsupportedOperationException(
          "Snapshot M/R jobs not available for aggregate queries");
    }
    statisticsCollector.init();
    region.startRegionOperation();
  } catch (Throwable failure) {
    // The constructor is about to fail, so nobody else can release the opened region.
    try {
      region.close();
    } catch (IOException | RuntimeException closeFailure) {
      // Keep the original failure as the primary error.
      failure.addSuppressed(closeFailure);
    }
    throw failure;
  }
}
/**
 * Adds this region to the snapshot, optionally flushing it first.
 *
 * <p>The region operation is started for {@code Operation.SNAPSHOT} before any work and
 * closed in a {@code finally} block. Holding it prevents this individual region from
 * being closed while the snapshot is in progress. That is helpful but not sufficient
 * against races in snapshots spanning multiple regions and region servers, so the
 * separate verification step is still required.
 *
 * @return always {@code null}
 * @throws IOException if the flush could not be completed within {@code MAX_RETRIES}
 *         attempts
 * @throws Exception any other failure raised by the region
 */
@Override
public Void call() throws Exception {
  LOG.debug("Starting snapshot operation on " + region);
  region.startRegionOperation(Operation.SNAPSHOT);
  try {
    if (skipFlush) {
      /*
       * Online snapshot without forcing a coordinated flush, to avoid a pause.
       * The snapshot type is defined inside the snapshot description.
       * FlushSnapshotSubprocedure should be renamed to distributedSnapshotSubprocedure,
       * with flush() switched on/off by the flush type; the class name was kept to
       * minimize the code change.
       */
      LOG.debug("take snapshot without flush memstore first");
    } else {
      LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
      boolean flushed = false;
      final long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED);
      int attempt = 0;
      while (!flushed && attempt < MAX_RETRIES) {
        attempt++;
        FlushResult outcome = region.flush(true);
        if (outcome.getResult() != FlushResult.Result.CANNOT_FLUSH) {
          flushed = true;
        } else {
          // CANNOT_FLUSH may mean that a flush is already on-going;
          // wait for that flush to complete.
          region.waitForFlushes();
          // Done once the writes present at the start of the snapshot are persisted.
          flushed = region.getMaxFlushedSeqId() >= readPt;
        }
      }
      if (!flushed) {
        throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
      }
    }

    region.addRegionToSnapshot(snapshotDesc, monitor);

    String completionPrefix = skipFlush
        ? "... SkipFlush Snapshotting region "
        : "... Flush Snapshotting region ";
    LOG.debug(completionPrefix + region.toString() + " completed.");
  } finally {
    LOG.debug("Closing snapshot operation on " + region);
    region.closeRegionOperation(Operation.SNAPSHOT);
  }
  return null;
}
/**
 * Reports, for every eligible region returned by the region server services, the
 * region's {@link IsolationLevel#READ_COMMITTED} read point together with its encoded
 * region name.
 *
 * <p>A region is skipped when it is read-only, when its replication scope is empty, or
 * (if the request names a WAL group) when the string form of its WAL does not contain
 * that group id.
 *
 * @param controller RPC controller (not used here)
 * @param request    request, optionally carrying a WAL group id to filter on
 * @param done       callback invoked with the assembled response
 * @throws RuntimeException wrapping any {@link IOException} raised while collecting
 */
@Override
public void getRegionServerLSN(RpcController controller,
    SpliceMessage.GetRegionServerLSNRequest request,
    RpcCallback<SpliceMessage.GetRegionServerLSNResponse> done) {
  SpliceMessage.GetRegionServerLSNResponse.Builder reply =
      SpliceMessage.GetRegionServerLSNResponse.newBuilder();
  List<? extends Region> onlineRegions = regionServerServices.getRegions();
  String wantedWalGroup = request.hasWalGroupId() ? request.getWalGroupId() : null;

  try {
    for (Region candidate : onlineRegions) {
      HRegion hRegion = (HRegion) candidate;
      NavigableMap<byte[], java.lang.Integer> scope = hRegion.getReplicationScope();
      if (candidate.isReadOnly() || scope.isEmpty()) {
        // Region is not enabled for replication.
        continue;
      }

      if (wantedWalGroup != null) {
        WAL regionWal = regionServerServices.getWAL(candidate.getRegionInfo());
        if (!regionWal.toString().contains(wantedWalGroup)) {
          // Region belongs to a different WAL group.
          continue;
        }
      }

      long lsn = hRegion.getReadPoint(IsolationLevel.READ_COMMITTED);
      String encodedName = candidate.getRegionInfo().getEncodedName();
      SpliceMessage.GetRegionServerLSNResponse.Result entry =
          SpliceMessage.GetRegionServerLSNResponse.Result.newBuilder()
              .setLsn(lsn)
              .setRegionName(encodedName)
              .setValid(true)
              .build();
      reply.addResult(entry);
    }
    done.run(reply.build());
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}