The following examples show how to use the org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter API; each snippet links to its source code on GitHub.
/**
 * Counts the rows of the HBase table mapped to the given task.
 *
 * @param task key into {@code hbaseMapping} identifying the mapping config
 * @return a map containing the table name ("hbaseTable") and the row count ("count");
 *         the count is 0 if the scan fails (the error is logged, not rethrown)
 */
@Override
public Map<String, Object> count(String task) {
    MappingConfig config = hbaseMapping.get(task);
    String hbaseTable = config.getHbaseMapping().getHbaseTable();
    long rowCount = 0L;
    // try-with-resources: the original leaked both the table and the scanner.
    try (HTable table = (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable))) {
        Scan scan = new Scan();
        // FirstKeyOnlyFilter returns only the first cell of each row, so each
        // Result corresponds to exactly one row — count rows, not cells.
        scan.setFilter(new FirstKeyOnlyFilter());
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                rowCount++;
            }
        }
    } catch (IOException e) {
        // Best-effort: report a zero count rather than failing the caller.
        logger.error(e.getMessage(), e);
    }
    Map<String, Object> res = new LinkedHashMap<>();
    res.put("hbaseTable", hbaseTable);
    res.put("count", rowCount);
    return res;
}
/**
 * Verifies that the given region truly contains no rows by scanning its key range.
 *
 * @param connection open HBase connection used to obtain the table
 * @param tableName  name of the table the region belongs to
 * @param regionInfo region whose [startKey, endKey) range is probed
 * @return true if a key-only probe scan of the region's range yields no row
 * @throws IOException if the scan fails
 */
public static boolean isReallyEmptyRegion(HConnection connection,
        String tableName, HRegionInfo regionInfo) throws IOException {
    boolean noRows = false;
    // Cheapest possible probe: first key of each row only, values stripped,
    // block cache bypassed, single-RPC small scan fetching one row at a time.
    try (HTableInterface table = connection.getTable(tableName)) {
        Scan probe = new Scan(regionInfo.getStartKey(), regionInfo.getEndKey());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        probe.setCacheBlocks(false);
        probe.setSmall(true);
        probe.setCaching(1);
        try (ResultScanner scanner = table.getScanner(probe)) {
            noRows = scanner.next() == null;
        }
    }
    return noRows;
}
/**
 * Probes the table with a minimal scan; if the probe throws (presumably the
 * table no longer exists — confirm against the scheduler that enqueues this
 * task), removes the table's bookkeeping via {@code clean}.
 */
@Override
public void run() {
    try (HTableInterface table = connection.getTable(tableName.getBytes())) {
        // Use a Scan rather than a Get so the read-request-count metric is not bumped.
        Scan probe = new Scan("".getBytes(), "".getBytes());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        // Opening the scanner is the probe itself; no rows are consumed.
        //noinspection EmptyTryBlock
        try (ResultScanner ignored = table.getScanner(probe)) {
        }
        return; // probe succeeded — nothing to clean up
    } catch (IOException ignore) {
        // Fall through: the probe failed, so the table needs cleaning.
    }
    clean(tableName);
}
/**
 * Verifies that the given region truly contains no rows by scanning its key range.
 *
 * @param connection open HBase connection used to obtain the table
 * @param tableName  name of the table the region belongs to
 * @param regionInfo region whose [startKey, endKey) range is probed
 * @return true if a key-only probe scan of the region's range yields no row
 * @throws IOException if the scan fails
 */
public static boolean isReallyEmptyRegion(HConnection connection,
        String tableName, HRegionInfo regionInfo) throws IOException {
    boolean noRows = false;
    // Cheapest possible probe: first key of each row only, values stripped,
    // block cache bypassed, single-RPC small scan fetching one row at a time.
    try (HTableInterface table = connection.getTable(tableName)) {
        Scan probe = new Scan(regionInfo.getStartKey(), regionInfo.getEndKey());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        probe.setCacheBlocks(false);
        probe.setSmall(true);
        probe.setCaching(1);
        try (ResultScanner scanner = table.getScanner(probe)) {
            noRows = scanner.next() == null;
        }
    }
    return noRows;
}
/**
 * Probes the table with a minimal scan; if the probe throws (presumably the
 * table no longer exists — confirm against the scheduler that enqueues this
 * task), removes the table's bookkeeping via {@code clean}.
 */
@Override
public void run() {
    try (HTableInterface table = connection.getTable(tableName.getBytes())) {
        // Use a Scan rather than a Get so the read-request-count metric is not bumped.
        Scan probe = new Scan("".getBytes(), "".getBytes());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        // Opening the scanner is the probe itself; no rows are consumed.
        //noinspection EmptyTryBlock
        try (ResultScanner ignored = table.getScanner(probe)) {
        }
        return; // probe succeeded — nothing to clean up
    } catch (IOException ignore) {
        // Fall through: the probe failed, so the table needs cleaning.
    }
    clean(tableName);
}
/**
 * Verifies that the given region truly contains no rows by scanning its key range.
 *
 * @param connection open HBase connection used to obtain the table
 * @param tableName  name of the table the region belongs to
 * @param regionInfo region whose [startKey, endKey) range is probed
 * @return true if a key-only probe scan of the region's range yields no row
 * @throws IOException if the scan fails
 */
public static boolean isReallyEmptyRegion(HConnection connection,
        String tableName, HRegionInfo regionInfo) throws IOException {
    boolean noRows = false;
    // Cheapest possible probe: first key of each row only, values stripped,
    // block cache bypassed, single-RPC small scan fetching one row at a time.
    try (HTableInterface table = connection.getTable(tableName)) {
        Scan probe = new Scan(regionInfo.getStartKey(), regionInfo.getEndKey());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        probe.setCacheBlocks(false);
        probe.setSmall(true);
        probe.setCaching(1);
        try (ResultScanner scanner = table.getScanner(probe)) {
            noRows = scanner.next() == null;
        }
    }
    return noRows;
}
/**
 * Probes the table with a minimal scan; if the probe throws (presumably the
 * table no longer exists — confirm against the scheduler that enqueues this
 * task), removes the table's bookkeeping via {@code clean}.
 */
@Override
public void run() {
    try (HTableInterface table = connection.getTable(tableName.getBytes())) {
        // Use a Scan rather than a Get so the read-request-count metric is not bumped.
        Scan probe = new Scan("".getBytes(), "".getBytes());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        // Opening the scanner is the probe itself; no rows are consumed.
        //noinspection EmptyTryBlock
        try (ResultScanner ignored = table.getScanner(probe)) {
        }
        return; // probe succeeded — nothing to clean up
    } catch (IOException ignore) {
        // Fall through: the probe failed, so the table needs cleaning.
    }
    clean(tableName);
}
/**
 * Verifies that the given region truly contains no rows by scanning its key range.
 *
 * @param connection open HBase connection used to obtain the table
 * @param tableName  name of the table the region belongs to
 * @param regionInfo region whose [startKey, endKey) range is probed
 * @return true if a key-only probe scan of the region's range yields no row
 * @throws IOException if the scan fails
 */
public static boolean isReallyEmptyRegion(HConnection connection,
        String tableName, HRegionInfo regionInfo) throws IOException {
    boolean noRows = false;
    // Cheapest possible probe: first key of each row only, values stripped,
    // block cache bypassed, single-RPC small scan fetching one row at a time.
    try (HTableInterface table = connection.getTable(tableName)) {
        Scan probe = new Scan(regionInfo.getStartKey(), regionInfo.getEndKey());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        probe.setCacheBlocks(false);
        probe.setSmall(true);
        probe.setCaching(1);
        try (ResultScanner scanner = table.getScanner(probe)) {
            noRows = scanner.next() == null;
        }
    }
    return noRows;
}
/**
 * Probes the table with a minimal scan; if the probe throws (presumably the
 * table no longer exists — confirm against the scheduler that enqueues this
 * task), removes the table's bookkeeping via {@code clean}.
 */
@Override
public void run() {
    try (HTableInterface table = connection.getTable(tableName.getBytes())) {
        // Use a Scan rather than a Get so the read-request-count metric is not bumped.
        Scan probe = new Scan("".getBytes(), "".getBytes());
        FilterList filters = new FilterList();
        filters.addFilter(new KeyOnlyFilter());
        filters.addFilter(new FirstKeyOnlyFilter());
        probe.setFilter(filters);
        // Opening the scanner is the probe itself; no rows are consumed.
        //noinspection EmptyTryBlock
        try (ResultScanner ignored = table.getScanner(probe)) {
        }
        return; // probe succeeded — nothing to clean up
    } catch (IOException ignore) {
        // Fall through: the probe failed, so the table needs cleaning.
    }
    clean(tableName);
}
/**
 * Returns a "deleted table" marker if the table header row at {@code key}
 * carries a delete marker newer than {@code clientTimeStamp}; otherwise null.
 * The marker is also cached so subsequent lookups short-circuit.
 */
private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region,
long clientTimeStamp) throws IOException {
// LATEST_TIMESTAMP means the caller cannot be behind any delete marker.
if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
return null;
}
// Raw scan so delete markers are returned; FirstKeyOnlyFilter restricts the
// result to the first cell of the header row, which is all we need.
Scan scan = MetaDataUtil.newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setRaw(true);
List<Cell> results = Lists.<Cell> newArrayList();
try (RegionScanner scanner = region.getScanner(scan);) {
scanner.next(results);
}
// HBase ignores the time range on a raw scan (HBASE-7362)
if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
Cell kv = results.get(0);
if (kv.getTypeByte() == Type.Delete.getCode()) {
// Cache the deleted-table marker stamped with the delete's timestamp.
Cache<ImmutableBytesPtr, PTable> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = newDeletedTableMarker(kv.getTimestamp());
metaDataCache.put(cacheKey, table);
return table;
}
}
return null;
}
/**
 * Deletes every non-system-table row from hbase:meta after each test so
 * tests start from a clean catalog.
 */
@After
public void tearDown() throws IOException {
    // FirstKeyOnlyFilter: one cell per row is enough to recover the row key.
    Scan metaScan = new Scan().addFamily(HConstants.CATALOG_FAMILY)
        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY)
        .setFilter(new FirstKeyOnlyFilter());
    try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
        ResultScanner scanner = table.getScanner(metaScan)) {
        Result result;
        while ((result = scanner.next()) != null) {
            TableName tableName = RegionInfo.getTable(result.getRow());
            if (!tableName.isSystemTable()) {
                table.delete(new Delete(result.getRow()));
            }
        }
    }
}
/**
 * Returns a "deleted table" marker if the table header row at {@code key}
 * carries a delete marker newer than {@code clientTimeStamp}; otherwise null.
 * The marker is also cached so subsequent lookups short-circuit.
 */
private PTable buildDeletedTable(byte[] key, ImmutableBytesPtr cacheKey, HRegion region, long clientTimeStamp) throws IOException {
    // LATEST_TIMESTAMP means the caller cannot be behind any delete marker.
    if (clientTimeStamp == HConstants.LATEST_TIMESTAMP) {
        return null;
    }
    // Raw scan so delete markers are returned; FirstKeyOnlyFilter restricts
    // the result to the first cell of the header row.
    Scan scan = newTableRowsScan(key, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
    scan.setFilter(new FirstKeyOnlyFilter());
    scan.setRaw(true);
    List<KeyValue> results = Lists.<KeyValue>newArrayList();
    // Close the region scanner even on error — the original leaked it.
    RegionScanner scanner = region.getScanner(scan);
    try {
        scanner.next(results);
    } finally {
        scanner.close();
    }
    // HBase ignores the time range on a raw scan (HBASE-7362)
    if (!results.isEmpty() && results.get(0).getTimestamp() > clientTimeStamp) {
        KeyValue kv = results.get(0);
        if (kv.isDelete()) {
            // Cache the deleted-table marker stamped with the delete's timestamp.
            Map<ImmutableBytesPtr,PTable> metaDataCache = GlobalCache.getInstance(this.getEnvironment()).getMetaDataCache();
            PTable table = newDeletedTableMarker(kv.getTimestamp());
            metaDataCache.put(cacheKey, table);
            return table;
        }
    }
    return null;
}
/**
 * Counts the rows of the HBase table mapped to the given task.
 *
 * @param task key into {@code hbaseMapping} identifying the mapping config
 * @return a map containing the table name ("hbaseTable") and the row count ("count");
 *         the count is 0 if the scan fails (the error is logged, not rethrown)
 */
@Override
public Map<String, Object> count(String task) {
    MappingConfig config = hbaseMapping.get(task);
    String hbaseTable = config.getHbaseMapping().getHbaseTable();
    long rowCount = 0L;
    // try-with-resources: the original leaked both the table and the scanner.
    try (HTable table = (HTable) hbaseTemplate.getConnection().getTable(TableName.valueOf(hbaseTable))) {
        Scan scan = new Scan();
        // FirstKeyOnlyFilter returns only the first cell of each row, so each
        // Result corresponds to exactly one row — count rows, not cells.
        scan.setFilter(new FirstKeyOnlyFilter());
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                rowCount++;
            }
        }
    } catch (IOException e) {
        // Best-effort: report a zero count rather than failing the caller.
        logger.error(e.getMessage(), e);
    }
    Map<String, Object> res = new LinkedHashMap<>();
    res.put("hbaseTable", hbaseTable);
    res.put("count", rowCount);
    return res;
}
/**
 * Lists execution data for up to {@code limit} workflow instances of the given
 * workflow, starting at {@code offset} (instance parameter) when non-empty.
 *
 * @throws IOException if reading the events table fails
 */
List<WorkflowInstanceExecutionData> executionData(WorkflowId workflowId, String offset, int limit)
    throws IOException {
  try (final Table eventsTable = connection.getTable(EVENTS_TABLE_NAME)) {
    // Keys look like "<workflow-key>#<parameter>#..."; FirstKeyOnlyFilter is
    // enough because only the row key is consumed below.
    final Scan scan = new Scan()
        .setRowPrefixFilter(Bytes.toBytes(workflowId.toKey() + '#'))
        .setFilter(new FirstKeyOnlyFilter());
    if (!Strings.isNullOrEmpty(offset)) {
      final WorkflowInstance offsetInstance = WorkflowInstance.create(workflowId, offset);
      scan.setStartRow(Bytes.toBytes(offsetInstance.toKey() + '#'));
    }
    final Set<WorkflowInstance> workflowInstancesSet = Sets.newHashSet();
    try (ResultScanner scanner = eventsTable.getScanner(scan)) {
      Result result = scanner.next();
      while (result != null) {
        // Decode with Bytes.toString (UTF-8) to mirror the Bytes.toBytes
        // encoding above; new String(byte[]) used the platform charset.
        final String key = Bytes.toString(result.getRow());
        final int lastHash = key.lastIndexOf('#');
        final WorkflowInstance wfi = WorkflowInstance.parseKey(key.substring(0, lastHash));
        workflowInstancesSet.add(wfi);
        if (workflowInstancesSet.size() == limit) {
          break;
        }
        result = scanner.next();
      }
    }
    return executionData(workflowInstancesSet);
  }
}
/**
 * Lists execution data for workflow instances of the given workflow between
 * {@code start} (inclusive) and {@code stop} (exclusive, optional).
 *
 * @throws IOException if reading the events table fails
 */
List<WorkflowInstanceExecutionData> executionData(WorkflowId workflowId, String start, String stop)
    throws IOException {
  try (final Table eventsTable = connection.getTable(EVENTS_TABLE_NAME)) {
    // Keys look like "<workflow-key>#<parameter>#..."; FirstKeyOnlyFilter is
    // enough because only the row key is consumed below.
    final Scan scan = new Scan()
        .setRowPrefixFilter(Bytes.toBytes(workflowId.toKey() + '#'))
        .setFilter(new FirstKeyOnlyFilter());
    final WorkflowInstance startRow = WorkflowInstance.create(workflowId, start);
    scan.setStartRow(Bytes.toBytes(startRow.toKey() + '#'));
    if (!Strings.isNullOrEmpty(stop)) {
      final WorkflowInstance stopRow = WorkflowInstance.create(workflowId, stop);
      scan.setStopRow(Bytes.toBytes(stopRow.toKey() + '#'));
    }
    final Set<WorkflowInstance> workflowInstancesSet = Sets.newHashSet();
    try (ResultScanner scanner = eventsTable.getScanner(scan)) {
      Result result = scanner.next();
      while (result != null) {
        // Decode with Bytes.toString (UTF-8) to mirror the Bytes.toBytes
        // encoding above; new String(byte[]) used the platform charset.
        final String key = Bytes.toString(result.getRow());
        final int lastHash = key.lastIndexOf('#');
        final WorkflowInstance wfi = WorkflowInstance.parseKey(key.substring(0, lastHash));
        workflowInstancesSet.add(wfi);
        result = scanner.next();
      }
    }
    return executionData(workflowInstancesSet);
  }
}
/**
 * Beam pipeline that reads every row of a Bigtable table (via the HBase API)
 * and prints each row key to stdout.
 */
public static void main(String[] args) {
    BigtableOptions pipelineOptions =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline pipeline = Pipeline.create(pipelineOptions);

    // Key-only read: skip the block cache and fetch just the first cell per row.
    Scan keyScan = new Scan();
    keyScan.setCacheBlocks(false);
    keyScan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration scanConfig =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(pipelineOptions.getBigtableProjectId())
            .withInstanceId(pipelineOptions.getBigtableInstanceId())
            .withTableId(pipelineOptions.getBigtableTableId())
            .withScan(keyScan)
            .build();

    // [START bigtable_beam_helloworld_read_transforms]
    pipeline
        .apply(Read.from(CloudBigtableIO.read(scanConfig)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));
    // [END bigtable_beam_helloworld_read_transforms]
    pipeline.run().waitUntilFinish();
}
/**
 * Exercises PhoenixRuntime.getTenantIdExpression against multi-tenant tables,
 * sequences, views and indexes (optionally salted) and asserts the tenant ids
 * visible through the returned expression.
 */
private void testGetTenantIdExpression(boolean isSalted) throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
conn.setAutoCommit(true);
String tableName = "FOO_" + (isSalted ? "SALTED" : "UNSALTED");
conn.createStatement().execute("CREATE TABLE " + tableName + " (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK PRIMARY KEY(K1,K2)) MULTI_TENANT=true" + (isSalted ? ",SALT_BUCKETS=3" : ""));
conn.createStatement().execute("CREATE SEQUENCE s1");
conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('t1','x')");
conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('t2','y')");
// Tenant-specific connection for tenant "t1".
Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "t1");
Connection tsconn = DriverManager.getConnection(getUrl(), props);
tsconn.createStatement().execute("CREATE SEQUENCE s1");
// Sequence table: expect the global ("") and tenant ("t1") rows.
Expression e1 = PhoenixRuntime.getTenantIdExpression(tsconn, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
HTableInterface htable1 = tsconn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
assertTenantIds(e1, htable1, new FirstKeyOnlyFilter(), new String[] {"", "t1"} );
tsconn.createStatement().execute("CREATE VIEW A.BAR(V1 VARCHAR) AS SELECT * FROM " + tableName);
// System catalog: user tables/views only (filter excludes system rows).
Expression e2 = PhoenixRuntime.getTenantIdExpression(tsconn, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
HTableInterface htable2 = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
assertTenantIds(e2, htable2, getUserTableAndViewsFilter(), new String[] {"", "t1"} );
// Data table rows were upserted for tenants t1 and t2.
Expression e3 = PhoenixRuntime.getTenantIdExpression(conn, tableName);
HTableInterface htable3 = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName));
assertTenantIds(e3, htable3, new FirstKeyOnlyFilter(), new String[] {"t1", "t2"} );
// Non-multi-tenant table: no tenant id expression exists.
conn.createStatement().execute("CREATE TABLE BAS (k1 VARCHAR PRIMARY KEY)");
Expression e4 = PhoenixRuntime.getTenantIdExpression(conn, "BAS");
assertNull(e4);
// View index (tenant-owned view): only tenant t1 is visible.
tsconn.createStatement().execute("CREATE INDEX I1 ON A.BAR(V1)");
Expression e5 = PhoenixRuntime.getTenantIdExpression(tsconn, "A.I1");
HTableInterface htable5 = tsconn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX + tableName));
assertTenantIds(e5, htable5, new FirstKeyOnlyFilter(), new String[] {"t1"} );
// Global index on the multi-tenant table: both tenants visible.
conn.createStatement().execute("CREATE INDEX I2 ON " + tableName + "(k2)");
Expression e6 = PhoenixRuntime.getTenantIdExpression(conn, "I2");
HTableInterface htable6 = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("I2"));
assertTenantIds(e6, htable6, new FirstKeyOnlyFilter(), new String[] {"t1", "t2"} );
}
/**
 * Verifies that COUNT(*) over a multi-column-family table is optimized to a
 * FirstKeyOnlyFilter scan projecting a single column family.
 */
@Test
public void testMultiCFProjection() throws Exception {
    Connection conn = DriverManager.getConnection(getUrl());
    conn.createStatement().execute(
        "CREATE TABLE multiCF (k integer primary key, a.a varchar, b.b varchar)");
    QueryPlan plan = getQueryPlan("SELECT COUNT(*) FROM multiCF", Collections.emptyList());
    plan.iterator();
    Scan scan = plan.getContext().getScan();
    // COUNT(*) needs no column values, so only the first key of one family
    // should be fetched.
    assertTrue(scan.getFilter() instanceof FirstKeyOnlyFilter);
    assertEquals(1, scan.getFamilyMap().size());
}
/**
 * Returns the encoded region names of the given table recorded for serial
 * replication, read from the meta replication rows.
 *
 * @param conn      connection used to scan meta
 * @param tableName table whose replication rows are scanned
 * @return encoded region names, in meta row order
 * @throws IOException if the meta scan fails
 */
public static List<String> getTableEncodedRegionNamesForSerialReplication(Connection conn,
TableName tableName) throws IOException {
List<String> list = new ArrayList<>();
// FirstKeyOnlyFilter: only the row key is needed to derive the region name.
scanMeta(conn,
ClientMetaTableAccessor.getTableStartRowForMeta(tableName, QueryType.REPLICATION),
ClientMetaTableAccessor.getTableStopRowForMeta(tableName, QueryType.REPLICATION),
QueryType.REPLICATION, new FirstKeyOnlyFilter(), Integer.MAX_VALUE, r -> {
list.add(RegionInfo.encodeRegionName(r.getRow()));
return true; // keep visiting every row
});
return list;
}
/**
 * Tries to scan a single row from the passed region as a health probe.
 * The scan is as cheap as possible: raw, one row, first key only, one-byte-ish
 * result size, no block-cache pollution. Any IOException is logged and
 * rethrown so the caller can treat the region as unreachable.
 */
private void isSuccessfulScan(RegionInfo region) throws IOException {
Scan scan = new Scan().withStartRow(region.getStartKey()).setRaw(true).setOneRowLimit()
.setMaxResultSize(1L).setCaching(1).setFilter(new FirstKeyOnlyFilter())
.setCacheBlocks(false);
try (Table table = conn.getTable(region.getTable());
ResultScanner scanner = table.getScanner(scan)) {
scanner.next();
} catch (IOException e) {
LOG.error("Could not scan region:" + region.getEncodedName(), e);
throw e;
}
}
/**
 * Verifies that a FilterList combining FirstKeyOnlyFilter with an
 * InclusiveStopFilter on an empty stop row filters out every record of meta.
 */
@Test
public void testFilterAllRecords() throws IOException {
Scan scan = new Scan();
scan.setBatch(1);
scan.setCaching(1);
// Filter out any records
scan.setFilter(new FilterList(new FirstKeyOnlyFilter(), new InclusiveStopFilter(new byte[0])));
try (Table table = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
try (ResultScanner s = table.getScanner(scan)) {
// The combined filter should yield no rows at all.
assertNull(s.next());
}
}
}
/**
 * Reverse scan over rows "a", "ab", "b" (no padding) with FirstKeyOnlyFilter:
 * expects the rows back in reverse lexicographic order (b, ab, a).
 */
@Test
public void testReverseScanWithoutPadding() throws Exception {
byte[] row1 = Bytes.toBytes("a");
byte[] row2 = Bytes.toBytes("ab");
byte[] row3 = Bytes.toBytes("b");
Put put1 = new Put(row1);
put1.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put2 = new Put(row2);
put2.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put3 = new Put(row3);
put3.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
region.put(put1);
region.put(put2);
region.put(put3);
// Flush so the scan reads from store files rather than only the memstore.
region.flush(true);
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setReversed(true);
scan.setFilter(new FirstKeyOnlyFilter());
scan.addFamily(cfName);
RegionScanner scanner = region.getScanner(scan);
List<Cell> res = new ArrayList<>();
// count starts at 1: next() returns false on the final row while still
// appending its cells to res, so that last row is pre-counted.
int count = 1;
while (scanner.next(res)) {
count++;
}
// res accumulates across next() calls; verify reverse order: b, ab, a.
assertEquals("b", Bytes.toString(res.get(0).getRowArray(), res.get(0).getRowOffset(),
res.get(0).getRowLength()));
assertEquals("ab", Bytes.toString(res.get(1).getRowArray(), res.get(1).getRowOffset(),
res.get(1).getRowLength()));
assertEquals("a", Bytes.toString(res.get(2).getRowArray(), res.get(2).getRowOffset(),
res.get(2).getRowLength()));
assertEquals(3, count);
}
/**
 * Reverse scan over rows built from inverted keys with a 0xFF terminator
 * ("padding"); only the row count is asserted here, not the order.
 */
@Test
public void testReverseScanWithPadding() throws Exception {
byte[] terminator = new byte[] { -1 };
// Keys are bit-inverted and suffixed with 0xFF so reversed iteration maps
// onto forward key order of the originals.
byte[] row1 = Bytes.add(invert(Bytes.toBytes("a")), terminator);
byte[] row2 = Bytes.add(invert(Bytes.toBytes("ab")), terminator);
byte[] row3 = Bytes.add(invert(Bytes.toBytes("b")), terminator);
Put put1 = new Put(row1);
put1.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put2 = new Put(row2);
put2.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
Put put3 = new Put(row3);
put3.addColumn(cfName, cqName, HConstants.EMPTY_BYTE_ARRAY);
region.put(put1);
region.put(put2);
region.put(put3);
// Flush so the scan reads from store files rather than only the memstore.
region.flush(true);
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setReversed(true);
scan.setFilter(new FirstKeyOnlyFilter());
scan.addFamily(cfName);
RegionScanner scanner = region.getScanner(scan);
List<Cell> res = new ArrayList<>();
// count starts at 1: next() returns false on the final row while still
// appending its cells to res, so that last row is pre-counted.
int count = 1;
while (scanner.next(res)) {
count++;
}
assertEquals(3, count);
}
/**
 * Test partial Result re-assembly in the presence of different column filters.
 * The Results from the partial scanner should match the Results returned from
 * a scanner that receives all of the results in one RPC to the server. The
 * partial scanner is tested with a variety of different result sizes (all of
 * which are less than the size necessary to fetch an entire row).
 * @throws Exception on any scan failure
 */
@Test
public void testPartialResultsWithColumnFilter() throws Exception {
// Each call runs the shared helper with a different column-level filter.
testPartialResultsWithColumnFilter(new FirstKeyOnlyFilter());
testPartialResultsWithColumnFilter(new ColumnPrefixFilter(Bytes.toBytes("testQualifier5")));
testPartialResultsWithColumnFilter(new ColumnRangeFilter(Bytes.toBytes("testQualifer1"), true,
Bytes.toBytes("testQualifier7"), true));
Set<byte[]> qualifiers = new LinkedHashSet<>();
qualifiers.add(Bytes.toBytes("testQualifier5"));
testPartialResultsWithColumnFilter(new FirstKeyValueMatchingQualifiersFilter(qualifiers));
}
/**
 * Verifies that with batch size 1 and a FirstKeyOnlyFilter every Result
 * carries exactly one cell.
 */
@Test
public void testMayHaveMoreCellsInRowReturnsTrueAndSetBatch() throws IOException {
    Scan scan = new Scan();
    scan.setBatch(1); // at most one cell per Result
    scan.setFilter(new FirstKeyOnlyFilter());
    // try-with-resources: the original leaked both the table and the scanner.
    try (Table table = createTestTable(TableName.valueOf(name.getMethodName()), ROWS, FAMILIES,
            QUALIFIERS, VALUE);
        ResultScanner scanner = table.getScanner(scan)) {
        Result result;
        while ((result = scanner.next()) != null) {
            assertTrue(result.rawCells() != null);
            assertEquals(1, result.rawCells().length);
        }
    }
}
/**
 * Installs the appropriate filter on the {@link Scan} for the given ranges.
 * More than one range: a {@link MultiRowRangeFilter} over all of them.
 * Zero or one range: a {@link FirstKeyOnlyFilter}; with exactly one range the
 * scan's start row (inclusive) and stop row (exclusive) are also set.
 *
 * @param scan         scan to configure
 * @param rowRangeList ranges to restrict the scan to; may be null or empty
 */
private static void setScanFilter(Scan scan, List<MultiRowRangeFilter.RowRange> rowRangeList) {
    final int size = rowRangeList == null ? 0 : rowRangeList.size();
    if (size > 1) {
        // Several ranges: delegate the range handling to MultiRowRangeFilter.
        scan.setFilter(new MultiRowRangeFilter(rowRangeList));
        return;
    }
    // Zero or one range: the first key of each row is all that is needed.
    scan.setFilter(new FirstKeyOnlyFilter());
    if (size == 1) {
        MultiRowRangeFilter.RowRange range = rowRangeList.get(0);
        scan.withStartRow(range.getStartRow()); // inclusive
        scan.withStopRow(range.getStopRow());   // exclusive
    }
}
/**
 * Scans with only a FirstKeyOnlyFilter and verifies that exactly the first
 * KeyValue of each remaining row is returned.
 */
@Test
public void testFirstKeyOnlyFilter() throws Exception {
Scan s = new Scan();
s.setFilter(new FirstKeyOnlyFilter());
// Expected KVs, the first KV from each of the remaining 6 rows
KeyValue [] kvs = {
new KeyValue(ROWS_ONE[0], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[2], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_ONE[3], FAMILIES[0], QUALIFIERS_ONE[0], VALUES[0]),
new KeyValue(ROWS_TWO[0], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[2], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1]),
new KeyValue(ROWS_TWO[3], FAMILIES[0], QUALIFIERS_TWO[0], VALUES[1])
};
// Full-scan comparison against the expected KeyValues, in order.
verifyScanFull(s, kvs);
}
/**
 * Returns a count of the rows in the region where this coprocessor is loaded.
 * Uses a FirstKeyOnlyFilter scan and de-duplicates by row key; on failure the
 * exception is attached to the controller and a null response is delivered.
 */
@Override
public void getRowCount(RpcController controller, CountRequest request,
RpcCallback<CountResponse> done) {
Scan scan = new Scan();
// Only the first cell of each row is needed to count rows.
scan.setFilter(new FirstKeyOnlyFilter());
CountResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
boolean hasMore = false;
byte[] lastRow = null;
long count = 0;
do {
hasMore = scanner.next(results);
for (Cell kv : results) {
byte[] currentRow = CellUtil.cloneRow(kv);
// Count a row only when its key differs from the previous one.
if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
lastRow = currentRow;
count++;
}
}
// Reuse the list across batches.
results.clear();
} while (hasMore);
response = CountResponse.newBuilder()
.setCount(count).build();
} catch (IOException ioe) {
// Report the failure through the RPC controller; response stays null.
CoprocessorRpcUtils.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
// Always invoke the callback, even with a null response on error.
done.run(response);
}
/**
 * Dataflow pipeline that counts the rows of a Bigtable table (read through the
 * HBase API with a key-only scan) and writes the count to a text file.
 */
public static void main(String[] args) {
    CountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(CountOptions.class);
    // Locals renamed to lowerCamelCase — UPPER_SNAKE_CASE is reserved for constants.
    String projectId = options.getBigtableProjectId();
    String instanceId = options.getBigtableInstanceId();
    String tableId = options.getBigtableTableId();
    // [START bigtable_dataflow_connector_scan_config]
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());
    // CloudBigtableTableConfiguration contains the project, zone, cluster and table to connect to.
    // You can supply an optional Scan() to filter the rows that will be read.
    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(projectId)
            .withInstanceId(instanceId)
            .withTableId(tableId)
            .withScan(scan)
            .build();
    Pipeline p = Pipeline.create(options);
    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(Count.<Result>globally())
        .apply(ParDo.of(stringifier))
        .apply(TextIO.write().to(options.getResultLocation()));
    // [END bigtable_dataflow_connector_scan_config]
    p.run().waitUntilFinish();
    // Once this is done, you can get the result file via "gsutil cp <resultLocation>-00000-of-00001"
}
/**
 * Wraps the given scanner in an index-rebuild scanner. For non-raw scans a new
 * raw, all-versions scan is built (replacing a FirstKeyOnlyFilter, which would
 * drop columns the rebuild needs) and a fresh region scanner is opened.
 */
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
boolean oldCoproc = region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName());
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
IndexTool.IndexVerifyType verifyType = (valueBytes != null) ?
IndexTool.IndexVerifyType.fromValue(valueBytes):IndexTool.IndexVerifyType.NONE;
// Legacy Indexer coprocessor + verify-only mode takes a different scanner.
if(oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
return new IndexerRegionScanner(innerScanner, region, scan, env);
}
if (!scan.isRaw()) {
Scan rawScan = new Scan(scan);
rawScan.setRaw(true);
rawScan.setMaxVersions();
rawScan.getFamilyMap().clear();
// For rebuilds we use count (*) as query for regular tables which ends up setting the FKOF on scan
// This filter doesn't give us all columns and skips to the next row as soon as it finds 1 col
// For rebuilds we need all columns and all versions
if (scan.getFilter() instanceof FirstKeyOnlyFilter) {
rawScan.setFilter(null);
} else if (scan.getFilter() != null) {
// Override the filter so that we get all versions
rawScan.setFilter(new AllVersionsIndexRebuildFilter(scan.getFilter()));
}
rawScan.setCacheBlocks(false);
// Re-add the families from the original scan after clearing the copy's map.
for (byte[] family : scan.getFamilyMap().keySet()) {
rawScan.addFamily(family);
}
// The original scanner is superseded by the raw one; close it first.
innerScanner.close();
RegionScanner scanner = region.getScanner(rawScan);
return new IndexRebuildRegionScanner(scanner, region, scan, env, this);
}
return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
}