下面列出了org.apache.hadoop.hbase.client.ResultScanner#close ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void rowFilterTest() {
Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("rowkey1")));
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, Arrays.asList(filter));
ResultScanner scanner = HBaseUtil
.getScanner("FileTable", "rowkey1", "rowkey3", filterList);
if (scanner != null) {
scanner.forEach(result -> {
System.out.println("rowkey=" + Bytes.toString(result.getRow()));
System.out.println("fileName=" + Bytes
.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
});
scanner.close();
}
}
/**
* Returns all rows from the hbase:meta table for a given user table
*
* @throws IOException When reading the rows fails.
*/
public List<byte[]> getMetaTableRows(TableName tableName) throws IOException {
// TODO: Redo using MetaTableAccessor.
Table t = getConnection().getTable(TableName.META_TABLE_NAME);
List<byte[]> rows = new ArrayList<>();
ResultScanner s = t.getScanner(new Scan());
for (Result result : s) {
RegionInfo info = CatalogFamilyFormat.getRegionInfo(result);
if (info == null) {
LOG.error("No region info for row " + Bytes.toString(result.getRow()));
// TODO figure out what to do for this new hosed case.
continue;
}
if (info.getTable().equals(tableName)) {
LOG.info("getMetaTableRows: row -> " +
Bytes.toStringBinary(result.getRow()) + info);
rows.add(result.getRow());
}
}
s.close();
t.close();
return rows;
}
@Override
public List<TResult> getScannerResults(ByteBuffer table, TScan scan, int numRows)
throws TIOError, TException {
Table htable = getTable(table);
List<TResult> results = null;
ResultScanner scanner = null;
try {
scanner = htable.getScanner(scanFromThrift(scan));
results = resultsFromHBase(scanner.next(numRows));
} catch (IOException e) {
throw getTIOError(e);
} finally {
if (scanner != null) {
scanner.close();
}
closeTable(htable);
}
return results;
}
@Override
boolean testRow(int i) throws IOException {
byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value);
ResultScanner scanner = null;
try {
scanner = this.table.getScanner(scan);
for (Result r = null; (r = scanner.next()) != null;) {
updateValueSize(r);
}
} finally {
if (scanner != null) {
updateScanMetrics(scanner.getScanMetrics());
scanner.close();
}
}
return true;
}
@Test
public void testCheckpointRollback() throws Exception {
// start a transaction, using checkpoints between writes
transactionContext.start();
transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
transactionContext.checkpoint();
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
transactionContext.checkpoint();
transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
transactionContext.abort();
transactionContext.start();
verifyRow(transactionAwareHTable, TestBytes.row, null);
verifyRow(transactionAwareHTable, TestBytes.row2, null);
verifyRow(transactionAwareHTable, TestBytes.row3, null);
Scan scan = new Scan();
ResultScanner scanner = transactionAwareHTable.getScanner(scan);
assertNull(scanner.next());
scanner.close();
transactionContext.finish();
}
/**
* Ensure that we only see Results marked as partial when the allowPartial flag is set
* @throws Exception
*/
@Test
public void testAllowPartialResults() throws Exception {
Scan scan = new Scan();
scan.setAllowPartialResults(true);
scan.setMaxResultSize(1);
ResultScanner scanner = TABLE.getScanner(scan);
Result result = scanner.next();
assertTrue(result != null);
assertTrue(result.mayHaveMoreCellsInRow());
assertTrue(result.rawCells() != null);
assertTrue(result.rawCells().length == 1);
scanner.close();
scan.setAllowPartialResults(false);
scanner = TABLE.getScanner(scan);
result = scanner.next();
assertTrue(result != null);
assertTrue(!result.mayHaveMoreCellsInRow());
assertTrue(result.rawCells() != null);
assertTrue(result.rawCells().length == NUM_COLS);
scanner.close();
}
@Test
public void testCheckpointInvalidate() throws Exception {
// start a transaction, using checkpoints between writes
transactionContext.start();
Transaction origTx = transactionContext.getCurrentTransaction();
transactionAwareHTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
transactionContext.checkpoint();
Transaction checkpointTx1 = transactionContext.getCurrentTransaction();
transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
transactionContext.checkpoint();
Transaction checkpointTx2 = transactionContext.getCurrentTransaction();
transactionAwareHTable.put(new Put(TestBytes.row3).add(TestBytes.family, TestBytes.qualifier, TestBytes.value));
TransactionSystemClient txClient = new InMemoryTxSystemClient(txManager);
txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
// check that writes are not visible
TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
txContext2.start();
Transaction newTx = txContext2.getCurrentTransaction();
// all 3 writes pointers from the previous transaction should now be excluded
assertTrue(newTx.isExcluded(origTx.getWritePointer()));
assertTrue(newTx.isExcluded(checkpointTx1.getWritePointer()));
assertTrue(newTx.isExcluded(checkpointTx2.getWritePointer()));
verifyRow(txTable2, TestBytes.row, null);
verifyRow(txTable2, TestBytes.row2, null);
verifyRow(txTable2, TestBytes.row3, null);
Scan scan = new Scan();
ResultScanner scanner = txTable2.getScanner(scan);
assertNull(scanner.next());
scanner.close();
txContext2.finish();
}
/**
* Retrieves all events added after the given event key (with sequence numbers
* greater than the given key). If no new events are found returns an empty
* list.
* @param lastSeen
* @return
*/
public List<FlowEvent> getFlowEventsSince(FlowEventKey lastSeen)
throws IOException {
// rows must match the FlowKey portion + SEP
byte[] keyPrefix =
Bytes.add(flowKeyConverter.toBytes(lastSeen), Constants.SEP_BYTES);
// start at the next following sequence number
FlowEventKey nextEvent = new FlowEventKey(lastSeen.getCluster(),
lastSeen.getUserName(), lastSeen.getAppId(), lastSeen.getRunId(),
lastSeen.getSequence() + 1);
byte[] startKey = keyConverter.toBytes(nextEvent);
Scan scan = new Scan(startKey);
scan.setFilter(new WhileMatchFilter(new PrefixFilter(keyPrefix)));
List<FlowEvent> results = new ArrayList<FlowEvent>();
ResultScanner scanner = null;
Table eventTable = null;
try {
eventTable = hbaseConnection
.getTable(TableName.valueOf(Constants.FLOW_EVENT_TABLE));
scanner = eventTable.getScanner(scan);
for (Result r : scanner) {
FlowEvent event = createEventFromResult(r);
if (event != null) {
results.add(event);
}
}
} finally {
try {
if (scanner != null) {
scanner.close();
}
} finally {
if (eventTable != null) {
eventTable.close();
}
}
}
return results;
}
@Test
public void testRead() throws Exception {
// get action
AccessTestAction getAction = new AccessTestAction() {
@Override
public Object run() throws Exception {
Get g = new Get(TEST_ROW);
g.addFamily(TEST_FAMILY);
try(Connection conn = ConnectionFactory.createConnection(conf);
Table t = conn.getTable(TEST_TABLE)) {
t.get(g);
}
return null;
}
};
verifyRead(getAction);
// action for scanning
AccessTestAction scanAction = new AccessTestAction() {
@Override
public Object run() throws Exception {
Scan s = new Scan();
s.addFamily(TEST_FAMILY);
try(Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TEST_TABLE)) {
ResultScanner scanner = table.getScanner(s);
try {
for (Result r = scanner.next(); r != null; r = scanner.next()) {
// do nothing
}
} finally {
scanner.close();
}
}
return null;
}
};
verifyRead(scanAction);
}
/**
* Load all permissions from the region server holding {@code _acl_},
* primarily intended for testing purposes.
*/
static Map<byte[], ListMultimap<String, UserPermission>> loadAll(
Configuration conf) throws IOException {
Map<byte[], ListMultimap<String, UserPermission>> allPerms =
new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
// do a full scan of _acl_, filtering on only first table region rows
Scan scan = new Scan();
scan.addFamily(ACL_LIST_FAMILY);
ResultScanner scanner = null;
// TODO: Pass in a Connection rather than create one each time.
try (Connection connection = ConnectionFactory.createConnection(conf)) {
try (Table table = connection.getTable(ACL_TABLE_NAME)) {
scanner = table.getScanner(scan);
try {
for (Result row : scanner) {
ListMultimap<String, UserPermission> resultPerms =
parsePermissions(row.getRow(), row, null, null, null, false);
allPerms.put(row.getRow(), resultPerms);
}
} finally {
if (scanner != null) {
scanner.close();
}
}
}
}
return allPerms;
}
private static int addToEachStartKey(final int expected) throws IOException {
Table t = TEST_UTIL.getConnection().getTable(TABLENAME);
Table meta = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
int rows = 0;
Scan scan = new Scan();
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
ResultScanner s = meta.getScanner(scan);
for (Result r = null; (r = s.next()) != null;) {
RegionInfo hri = CatalogFamilyFormat.getRegionInfo(r);
if (hri == null) break;
if (!hri.getTable().equals(TABLENAME)) {
continue;
}
// If start key, add 'aaa'.
if(!hri.getTable().equals(TABLENAME)) {
continue;
}
byte [] row = getStartKey(hri);
Put p = new Put(row);
p.setDurability(Durability.SKIP_WAL);
p.addColumn(getTestFamily(), getTestQualifier(), row);
t.put(p);
rows++;
}
s.close();
Assert.assertEquals(expected, rows);
t.close();
meta.close();
return rows;
}
public static int assertScanContentTimestamp(final Table in, final long ts)
throws IOException {
Scan scan = new Scan().withStartRow(HConstants.EMPTY_START_ROW);
scan.addFamily(FAMILY_NAME);
scan.setTimeRange(0, ts);
ResultScanner scanner = in.getScanner(scan);
int count = 0;
try {
// TODO FIX
// HStoreKey key = new HStoreKey();
// TreeMap<byte [], Cell>value =
// new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
// while (scanner.next(key, value)) {
// assertTrue(key.getTimestamp() <= ts);
// // Content matches the key or HConstants.LATEST_TIMESTAMP.
// // (Key does not match content if we 'put' with LATEST_TIMESTAMP).
// long l = Bytes.toLong(value.get(COLUMN).getValue());
// assertTrue(key.getTimestamp() == l ||
// HConstants.LATEST_TIMESTAMP == l);
// count++;
// value.clear();
// }
} finally {
scanner.close();
}
return count;
}
private static List<Result> readTable(String tableId, Scan scan) throws Exception {
ResultScanner scanner = scanTable(tableId, scan);
List<Result> results = new ArrayList<>();
for (Result result : scanner) {
results.add(result);
}
scanner.close();
return results;
}
@Test
public void testDropLocalIndexShouldDeleteDataFromLocalIndexTable() throws Exception {
createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, null, "('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
try {
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('b',1,2,4,'z')");
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('f',1,2,3,'a')");
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('j',2,4,2,'a')");
conn1.createStatement().execute("UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('q',3,1,1,'c')");
conn1.commit();
conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(v1)");
conn1.createStatement().execute("DROP INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME);
HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME)));
Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
byte[][] startKeys = startEndKeys.getFirst();
byte[][] endKeys = startEndKeys.getSecond();
// No entry should be present in local index table after drop index.
for (int i = 0; i < startKeys.length; i++) {
Scan s = new Scan();
s.setStartRow(startKeys[i]);
s.setStopRow(endKeys[i]);
ResultScanner scanner = indexTable.getScanner(s);
int count = 0;
for(Result r:scanner){
count++;
}
scanner.close();
assertEquals(0, count);
}
indexTable.close();
} finally {
conn1.close();
}
}
protected static void runSmallBatchTest() throws IOException, InterruptedException {
// normal Batch tests
loadData("", row);
Scan scan = new Scan();
ResultScanner scanner1 = htable1.getScanner(scan);
Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
scanner1.close();
assertEquals(NB_ROWS_IN_BATCH, res1.length);
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
@Test
public void testBuildIndexWhenUserTableAlreadyHasData() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
String indexTableName = schemaName + "." + indexName;
TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
String indexPhysicalTableName = physicalTableName.getNameAsString();
createBaseTable(tableName, null, "('e','i','o')");
Connection conn1 = DriverManager.getConnection(getUrl());
conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
conn1.commit();
conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
Admin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection();
Table indexTable = hbaseConn.getTable(TableName.valueOf(indexPhysicalTableName));
Pair<byte[][], byte[][]> startEndKeys = hbaseConn.getRegionLocator(TableName.valueOf(indexPhysicalTableName)).getStartEndKeys();
byte[][] startKeys = startEndKeys.getFirst();
byte[][] endKeys = startEndKeys.getSecond();
for (int i = 0; i < startKeys.length; i++) {
Scan s = new Scan();
s.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
s.setStartRow(startKeys[i]);
s.setStopRow(endKeys[i]);
ResultScanner scanner = indexTable.getScanner(s);
int count = 0;
for(Result r:scanner){
count++;
}
scanner.close();
assertEquals(1, count);
}
indexTable.close();
}
/**
* @param cluster for which to return the last ProcessRecord.
* @param compareOp to apply to the processState argument. If
* {@link CompareOp#NO_OP} is passed, then no filter is used at all,
* and processState argument is ignored.
* @param processState return rows where the compareOp applies.
* @param maxCount the maximum number of results to return.
* @param processFileSubstring return rows where the process file path
* contains this string. If <code>null</code> or empty string, then
* no filtering is applied.
* @return the last process record that is not in {@link ProcessState#CREATED}
* state. Note that no records with a maxModificationTime of 0
* (beginning of time) will be returned
* @throws IOException
*/
public List<ProcessRecord> getProcessRecords(String cluster,
CompareOp compareOp, ProcessState processState, int maxCount,
String processFileSubstring) throws IOException {
Scan scan = new Scan();
// Pull data only for our cluster
scan.setStartRow(
keyConv.toBytes(new ProcessRecordKey(cluster, Long.MAX_VALUE)));
// Records are sorted in reverse order, so the last one for this cluster
// would be the one with a modification time at the beginning of time.
scan.setStopRow(keyConv.toBytes(new ProcessRecordKey(cluster, 0)));
scan.addColumn(Constants.INFO_FAM_BYTES,
Constants.MIN_MOD_TIME_MILLIS_COLUMN_BYTES);
scan.addColumn(Constants.INFO_FAM_BYTES,
Constants.PROCESSED_JOB_FILES_COLUMN_BYTES);
scan.addColumn(Constants.INFO_FAM_BYTES,
Constants.PROCESS_FILE_COLUMN_BYTES);
scan.addColumn(Constants.INFO_FAM_BYTES,
Constants.PROCESSING_STATE_COLUMN_BYTES);
scan.addColumn(Constants.INFO_FAM_BYTES, Constants.MIN_JOB_ID_COLUMN_BYTES);
scan.addColumn(Constants.INFO_FAM_BYTES, Constants.MAX_JOB_ID_COLUMN_BYTES);
scan.setMaxVersions(1);
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
// Filter on process state only when needed.
if (!NO_OP.equals(compareOp)) {
byte[] filterColumnValue = Bytes.toBytes(processState.getCode());
Filter processingStatefilter = new SingleColumnValueFilter(
Constants.INFO_FAM_BYTES, Constants.PROCESSING_STATE_COLUMN_BYTES,
compareOp, filterColumnValue);
filterList.addFilter(processingStatefilter);
}
// Filter on process file only when needed
if (processFileSubstring != null && processFileSubstring.length() > 0) {
SubstringComparator ssc = new SubstringComparator(processFileSubstring);
Filter processFileFilter =
new SingleColumnValueFilter(Constants.INFO_FAM_BYTES,
Constants.PROCESS_FILE_COLUMN_BYTES, EQUAL, ssc);
filterList.addFilter(processFileFilter);
}
// Add filters only if any filter was actually needed.
if (filterList.getFilters().size() > 0) {
scan.setFilter(filterList);
}
ResultScanner scanner = null;
List<ProcessRecord> records = null;
Table processRecordTable = null;
try {
processRecordTable = hbaseConnection
.getTable(TableName.valueOf(Constants.JOB_FILE_PROCESS_TABLE));
scanner = processRecordTable.getScanner(scan);
records = createFromResults(scanner, maxCount);
} finally {
if (scanner != null) {
scanner.close();
}
if (processRecordTable != null) {
processRecordTable.close();
}
}
return records;
}
/**
* Given a min and max jobId, get a {@link Scan} to go through all the records
* loaded in the {@link Constants#HISTORY_RAW_TABLE}, get all the rowkeys and
* create a list of scans with batchSize number of rows in the rawTable.
* <p>
* Note that this can be a somewhat slow operation as the
* {@link Constants#HISTORY_RAW_TABLE} will have to be scanned.
*
* @param cluster on which the Hadoop jobs ran.
* @param minJobId used to start the scan. If null then there is no min limit
* on JobId.
* @param maxJobId used to end the scan (inclusive). If null then there is no
* max limit on jobId.
* @param reprocess Reprocess those records that may have been processed
* already. Otherwise successfully processed jobs are skipped.
* @param batchSize
*
* @return a scan of jobIds between the specified min and max. Retrieves only
* one version of each column.
* @throws IOException
* @throws RowKeyParseException when rows returned from the Raw table do not
* conform to the expected row key.
*/
public List<Scan> getHistoryRawTableScans(String cluster, String minJobId,
String maxJobId, boolean reprocess, int batchSize)
throws IOException, RowKeyParseException {
List<Scan> scans = new LinkedList<Scan>();
// Get all the values in the scan so that we can evenly chop them into
// batch size chunks.
// The problem is that processRecords min and max can have vastly
// overlapping ranges, and in addition, they may have a minJobId of a long
// running Hadoop job that is processed much later. Many jobIds that are
// of shorter jobs that have already been processed will in between the
// min and max, but since the scan returns only the records that are not
// already processed, the returned list may have large gaps.
Scan scan =
getHistoryRawTableScan(cluster, minJobId, maxJobId, reprocess, false);
SortedSet<JobId> orderedJobIds = new TreeSet<JobId>();
ResultScanner scanner = null;
Table rawTable = null;
try {
rawTable = hbaseConnection
.getTable(TableName.valueOf(Constants.HISTORY_RAW_TABLE));
LOG.info("Scanning " + Constants.HISTORY_RAW_TABLE + " table from "
+ minJobId + " to " + maxJobId);
scanner = rawTable.getScanner(scan);
for (Result result : scanner) {
JobId qualifiedJobId = getQualifiedJobIdFromResult(result);
orderedJobIds.add(qualifiedJobId);
}
} finally {
try {
if (scanner != null) {
scanner.close();
}
} finally {
if (rawTable != null) {
rawTable.close();
}
}
}
// Now chop the set into chunks.
List<Range<JobId>> ranges = BatchUtil.getRanges(orderedJobIds, batchSize);
LOG.info("Dividing " + orderedJobIds.size() + " jobs in " + ranges.size()
+ " ranges.");
for (Range<JobId> range : ranges) {
Scan rawScan =
getHistoryRawTableScan(cluster, range.getMin().getJobIdString(),
range.getMax().getJobIdString(), reprocess, true);
scans.add(rawScan);
}
return scans;
}
/**
* Runs a major compaction, and then waits until the compaction is complete before returning.
*
* @param tableName name of the table to be compacted
*/
public static void doMajorCompaction(Connection conn, String tableName) throws Exception {
tableName = SchemaUtil.normalizeIdentifier(tableName);
// We simply write a marker row, request a major compaction, and then wait until the marker
// row is gone
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), tableName));
ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
MutationState mutationState = pconn.getMutationState();
if (table.isTransactional()) {
mutationState.startTransaction(table.getTransactionProvider());
}
try (Table htable = mutationState.getHTable(table)) {
byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
Put put = new Put(markerRowKey);
put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
htable.put(put);
Delete delete = new Delete(markerRowKey);
delete.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
htable.delete(delete);
htable.close();
if (table.isTransactional()) {
mutationState.commit();
}
Admin hbaseAdmin = services.getAdmin();
hbaseAdmin.flush(TableName.valueOf(tableName));
hbaseAdmin.majorCompact(TableName.valueOf(tableName));
hbaseAdmin.close();
boolean compactionDone = false;
while (!compactionDone) {
Thread.sleep(6000L);
Scan scan = new Scan();
scan.setStartRow(markerRowKey);
scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
scan.setRaw(true);
try (Table htableForRawScan = services.getTable(Bytes.toBytes(tableName))) {
ResultScanner scanner = htableForRawScan.getScanner(scan);
List<Result> results = Lists.newArrayList(scanner);
LOGGER.info("Results: " + results);
compactionDone = results.isEmpty();
scanner.close();
}
LOGGER.info("Compaction done: " + compactionDone);
// need to run compaction after the next txn snapshot has been written so that compaction can remove deleted rows
if (!compactionDone && table.isTransactional()) {
hbaseAdmin = services.getAdmin();
hbaseAdmin.flush(TableName.valueOf(tableName));
hbaseAdmin.majorCompact(TableName.valueOf(tableName));
hbaseAdmin.close();
}
}
}
}
@Before
public void setUpBase() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
// Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
super.setUpBase();
int rowCount = UTIL1.countRows(tableName);
UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
// Instead, we truncate the first table and wait for all the Deletes to
// make it to the slave.
Scan scan = new Scan();
int lastCount = 0;
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for truncate");
}
ResultScanner scanner = htable2.getScanner(scan);
Result[] res = scanner.next(rowCount);
scanner.close();
if (res.length != 0) {
if (res.length < lastCount) {
i--; // Don't increment timeout if we make progress
}
lastCount = res.length;
LOG.info("Still got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
// Set the max request size to a tiny 10K for dividing the replication WAL entries into multiple
// batches. the default max request size is 256M, so all replication entries are in a batch, but
// when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table
// may apply first, and then test_dropped table, and we will believe that the replication is not
// got stuck (HBASE-20475).
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
}