下面列出了 org.apache.hadoop.hbase.client.Admin#flush() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Creates the test table and fills it with ten flushed batches of random data.
 *
 * <p>The table is created with {@link FIFOCompactionPolicy} as its compaction policy,
 * {@link DisabledRegionSplitPolicy} as its split policy and a column family TTL of 1. Each batch
 * writes ten 128KB random values, is flushed, and is followed by advancing the injected
 * {@link TimeOffsetEnvironmentEdge} by 1001 (presumably milliseconds, so that every flushed file
 * ends up older than the TTL -- confirm against the edge implementation).
 *
 * @return the store returned by {@code getStoreWithName(tableName)}
 * @throws IOException if creating, writing to or flushing the table fails
 */
private HStore prepareData() throws IOException {
  Admin admin = TEST_UTIL.getAdmin();
  TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
    .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
      FIFOCompactionPolicy.class.getName())
    .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
      DisabledRegionSplitPolicy.class.getName())
    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(1).build())
    .build();
  admin.createTable(desc);
  TimeOffsetEnvironmentEdge edge =
    (TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate();
  // The table handle is only needed while loading; try-with-resources guarantees it is released
  // even when a put or flush fails (it used to be leaked unconditionally).
  try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        ThreadLocalRandom.current().nextBytes(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      // One flush per batch, so each batch is flushed separately.
      admin.flush(tableName);
      edge.increment(1001);
    }
  }
  return getStoreWithName(tableName);
}
/**
 * Recreates the test table from scratch and fills it with ten flushed batches of random data.
 *
 * <p>An existing table with the same name is disabled and deleted first. Each batch writes ten
 * 128KB random values and is then flushed.
 *
 * @return the store returned by {@code getStoreWithName(tableName)}
 * @throws IOException if dropping, creating, writing to or flushing the table fails
 */
private HStore prepareData() throws IOException {
  Admin admin = TEST_UTIL.getAdmin();
  if (admin.tableExists(tableName)) {
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }
  // Close the table handle once loading is finished, also on failure; it used to be leaked.
  try (Table table = TEST_UTIL.createTable(tableName, family)) {
    for (int i = 0; i < 10; i++) {
      for (int j = 0; j < 10; j++) {
        byte[] value = new byte[128 * 1024];
        ThreadLocalRandom.current().nextBytes(value);
        table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value));
      }
      // One flush per batch, so each batch is flushed separately.
      admin.flush(tableName);
    }
  }
  return getStoreWithName(tableName);
}
/**
 * If the durability is set to {@link Durability#SKIP_WAL} and the data is imported to hbase, we
 * need to flush all the regions of the table as the data is held in memory and is also not
 * present in the Write Ahead Log to replay in scenarios of a crash. This method flushes all the
 * regions of the table in the scenarios of import data to hbase with {@link Durability#SKIP_WAL}.
 *
 * <p>Nothing happens when {@code BULK_OUTPUT_CONF_KEY} is set in the configuration or when the
 * configured {@code WAL_DURABILITY} is anything other than {@code SKIP_WAL} (compared
 * case-insensitively).
 *
 * @param conf job configuration holding the table name, the WAL durability and the optional
 *          bulk output location
 * @throws IOException if the connection cannot be created or the flush fails
 * @throws InterruptedException kept in the signature for backward compatibility with callers
 */
public static void flushRegionsIfNecessary(Configuration conf) throws IOException,
    InterruptedException {
  String tableName = conf.get(TABLE_NAME);
  String durability = conf.get(WAL_DURABILITY);
  // Need to flush if the data is written to hbase and skip wal is enabled.
  // equalsIgnoreCase(null) is false, so a missing durability setting needs no separate check.
  if (conf.get(BULK_OUTPUT_CONF_KEY) == null
      && Durability.SKIP_WAL.name().equalsIgnoreCase(durability)) {
    LOG.info("Flushing all data that skipped the WAL.");
    // try-with-resources closes the admin first and then the connection, and still closes the
    // connection when closing the admin throws (the manual finally block did not).
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin hAdmin = connection.getAdmin()) {
      hAdmin.flush(TableName.valueOf(tableName));
    }
  }
}
/**
 * Requests a flush of {@code tableName} unless the surrounding context is stopping, then pauses
 * for {@code sleepTime} when that value is positive. A failing flush is logged and otherwise
 * ignored.
 */
@Override
public void perform() throws Exception {
  HBaseTestingUtility testingUtil = context.getHBaseIntegrationTestingUtility();
  Admin hbaseAdmin = testingUtil.getAdmin();
  // Don't try the flush if we're stopping
  if (context.isStopping()) {
    return;
  }
  getLogger().info("Performing action: Flush table " + tableName);
  try {
    hbaseAdmin.flush(tableName);
  } catch (Exception e) {
    // Best effort: the failure is only reported, never propagated.
    getLogger().warn("Flush failed, might be caused by other chaos: " + e.getMessage());
  }
  if (sleepTime <= 0) {
    return;
  }
  Thread.sleep(sleepTime);
}
/**
 * Test to repro the ArrayIndexOutOfBoundsException that happens during filtering in
 * BinarySubsetComparator only after a flush is performed.
 *
 * <p>The table is populated, flushed through the HBase admin, and then queried with a filter on
 * the FEATURE column; the summed TRANSACTIONS value must be 1200.
 *
 * @throws Exception if setting up the table, flushing it or running the query fails
 */
@Test
public void testFilterOnTrailingKeyColumn() throws Exception {
  String tablename = generateUniqueName();
  String tenantId = getOrganizationId();
  Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
  // Nested try-with-resources: every resource is released in reverse order of acquisition, and
  // a failure while closing the admin can no longer leak the JDBC connection.
  try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
    initTableValues(tablename, tenantId, getSplits(tenantId));
    try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
      admin.flush(TableName.valueOf(
        SchemaUtil.getTableNameAsBytes(PRODUCT_METRICS_SCHEMA_NAME, tablename)));
      String query = "SELECT SUM(TRANSACTIONS) FROM " + tablename + " WHERE FEATURE=?";
      try (PreparedStatement statement = conn.prepareStatement(query)) {
        statement.setString(1, F1);
        try (ResultSet rs = statement.executeQuery()) {
          assertTrue(rs.next());
          assertEquals(1200, rs.getInt(1));
        }
      }
    }
  }
}
/**
 * Verifies the sequence "load data, back up, delete some data, restore": after restoring the
 * full backup over the original table, the row deleted in between must be present again.
 *
 * @throws Exception if doing the backup or an operation on the tables fails
 */
@Test
public void testBackupDeleteRestore() throws Exception {
  LOG.info("test full restore on a single table empty table");
  List<TableName> tablesToBackup = Lists.newArrayList(table1);
  String backupId = fullTableBackup(tablesToBackup);
  assertTrue(checkSucceeded(backupId));
  LOG.info("backup complete");
  int rowsBeforeDelete = TEST_UTIL.countRows(table1);
  Admin admin = TEST_UTIL.getAdmin();
  // Remove a single row and flush the table.
  try (Table htable = TEST_UTIL.getConnection().getTable(table1)) {
    htable.delete(new Delete(Bytes.toBytes("row0")));
    admin.flush(table1);
  }
  TableName[] fromTables = { table1 };
  // No table mapping is passed to the restore request.
  TableName[] toTables = null;
  BackupAdmin backupAdmin = getBackupAdmin();
  backupAdmin.restore(
    BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, fromTables, toTables,
      true));
  int rowsAfterRestore = TEST_UTIL.countRows(table1);
  assertEquals(rowsBeforeDelete, rowsAfterRestore);
  admin.close();
}
/**
 * Writes 5MB to a table, flushes it, waits until the master's quota manager has a size report
 * for at least as many regions as the table had right after the flush, and then asserts that
 * the sum of all reported region sizes exceeds the number of bytes written.
 */
@Test
public void testBasicRegionSizeReports() throws Exception {
  final long expectedBytes = 5L * 1024L * 1024L; // 5MB
  final TableName table = writeData(expectedBytes);
  LOG.debug("Data was written to HBase");

  // Push the data to disk.
  final Admin admin = TEST_UTIL.getAdmin();
  admin.flush(table);
  LOG.debug("Data flushed to disk");

  // Region distribution right after the write; the table may split later on, so this is only a
  // lower bound for the number of reports to wait for.
  final List<RegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(table);

  HMaster master = cluster.getMaster();
  MasterQuotaManager quotaManager = master.getMasterQuotaManager();
  Map<RegionInfo,Long> reportedSizes = quotaManager.snapshotRegionSizes();

  // Poll once per second until enough region reports for our table have arrived.
  for (int seen = numRegionsForTable(table, reportedSizes); seen < tableRegions.size();
      seen = numRegionsForTable(table, reportedSizes)) {
    LOG.debug("Expecting more regions. Saw " + seen
      + " region sizes reported, expected at least " + tableRegions.size());
    Thread.sleep(1000);
    reportedSizes = quotaManager.snapshotRegionSizes();
  }

  LOG.debug("Observed region sizes by the HMaster: " + reportedSizes);
  long reportedTotal = 0L;
  for (Map.Entry<RegionInfo,Long> report : reportedSizes.entrySet()) {
    reportedTotal += report.getValue();
  }
  assertTrue("Expected region size report to exceed " + expectedBytes + ", but was "
    + reportedTotal + ". RegionSizes=" + reportedSizes, reportedTotal > expectedBytes);
}
/**
 * Checks that a scan does not lose or null out rows when the underlying HBase table is flushed
 * while the result set is being consumed.
 *
 * <p>Table A is filled by repeatedly inserting it into itself, flushed, updated, and then
 * scanned; a second flush is issued after the first row has been fetched. Every row read after
 * that flush must have a non-null A2 equal to 1.
 */
@Test
public void testMissingRows() throws Exception {
  Connection conn = methodWatcher.createConnection();
  conn.setAutoCommit(false);
  HBaseTestingUtility testingUtility = new HBaseTestingUtility(HConfiguration.unwrapDelegate());
  Admin admin = testingUtility.getAdmin();
  // One statement is reused and closed instead of leaking a new one per call. The type is
  // written fully qualified so that no additional import is required.
  try (java.sql.Statement setup = conn.createStatement()) {
    setup.executeUpdate("CREATE TABLE A (A1 int, a2 int)");
    setup.executeUpdate("INSERT INTO A VALUES (1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1),(1,1)");
    // Each pass inserts the table into itself.
    for (int i = 0; i < 10; i++) {
      setup.executeUpdate("insert into a select * from a --splice-properties useSpark=false\n");
    }
  }
  String conglomerateNumber = TestUtils.lookupConglomerateNumber(CLASS_NAME, "A", methodWatcher);
  final TableName tableName = TableName.valueOf("splice", conglomerateNumber);
  admin.flush(tableName);
  try (java.sql.Statement update = conn.createStatement()) {
    update.executeUpdate("UPDATE A SET A1 = 2");
  }
  try (PreparedStatement ps = conn.prepareStatement(
         "SELECT a2 FROM A --splice-properties useSpark=true, splits=1\n");
       ResultSet rs = ps.executeQuery()) {
    // An empty result would otherwise let the test pass without checking a single row.
    if (!rs.next()) {
      throw new AssertionError("Query returned no rows");
    }
    int numberOfRows = 1;
    // Flush in the middle of the scan, after the first row has been fetched.
    admin.flush(tableName);
    while (rs.next()) {
      numberOfRows++;
      assertNotNull("Failure at row: " + numberOfRows, rs.getObject(1));
      assertEquals(1, rs.getInt(1));
    }
  }
}
/**
 * Recreates {@code tableName}, loads it, takes and validates a snapshot, then loads different
 * values and flushes so that the live table contains files the snapshot does not reference.
 *
 * @param util testing utility providing the cluster, connection and admin
 * @param tableName table to (re)create
 * @param snapshotName name of the snapshot to take
 * @param startRow first split boundary, only used when {@code numRegions > 1}
 * @param endRow last split boundary, only used when {@code numRegions > 1}
 * @param numRegions number of regions to create the table with
 * @throws Exception if any table, snapshot or flush operation fails
 */
protected static void createTableAndSnapshot(HBaseTestingUtility util, TableName tableName,
    String snapshotName, byte[] startRow, byte[] endRow, int numRegions)
    throws Exception {
  try {
    LOG.debug("Ensuring table doesn't exist.");
    util.deleteTable(tableName);
  } catch (Exception ignored) {
    // Best effort: a failure here is deliberately ignored and creation proceeds regardless.
  }

  LOG.info("creating table '" + tableName + "'");
  if (numRegions > 1) {
    util.createTable(tableName, FAMILIES, 1, startRow, endRow, numRegions);
  } else {
    util.createTable(tableName, FAMILIES);
  }
  Admin admin = util.getAdmin();

  LOG.info("put some stuff in the table");
  // try-with-resources closes the table even when loading, snapshotting or flushing fails; the
  // trailing table.close() was only reached on the success path.
  try (Table table = util.getConnection().getTable(tableName)) {
    util.loadTable(table, FAMILIES);

    Path rootDir = CommonFSUtils.getRootDir(util.getConfiguration());
    FileSystem fs = rootDir.getFileSystem(util.getConfiguration());

    LOG.info("snapshot");
    SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName,
      Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true);

    LOG.info("load different values");
    byte[] value = Bytes.toBytes("after_snapshot_value");
    util.loadTable(table, FAMILIES, value);

    LOG.info("cause flush to create new files in the region");
    admin.flush(tableName);
  }
}
/**
 * Creates a salted two-column table, upserts one row for every two-character key between
 * {@code MIN_CHAR} and {@code MAX_CHAR} (committing every {@code BATCH_SIZE} rows) and finally
 * flushes the table through the HBase admin.
 *
 * @param tableName name of the table to create and populate
 * @throws Exception if a SQL statement or the flush fails
 */
private static void initTable(String tableName) throws Exception {
  // All JDBC resources and the admin are scoped with try-with-resources so they are released on
  // failure as well; previously the connection was only closed on success and the statements
  // were never closed.
  try (Connection conn = getConnection()) {
    // java.sql.Statement is fully qualified so that no additional import is required.
    try (java.sql.Statement ddl = conn.createStatement()) {
      ddl.execute("CREATE TABLE " + tableName + "("
        + "a VARCHAR PRIMARY KEY, b VARCHAR) "
        + TableDescriptorBuilder.MAX_FILESIZE + "=" + MAX_FILESIZE + ","
        + " SALT_BUCKETS = 4");
    }
    try (PreparedStatement stmt =
           conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)")) {
      int rowCount = 0;
      for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) {
        for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) {
          String pk = Character.toString((char) c1) + Character.toString((char) c2);
          stmt.setString(1, pk);
          stmt.setString(2, PAYLOAD);
          stmt.execute();
          rowCount++;
          if (rowCount % BATCH_SIZE == 0) {
            conn.commit();
          }
        }
      }
      // Commit whatever is left over from the last partial batch.
      conn.commit();
    }
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    try (Admin admin = services.getAdmin()) {
      admin.flush(TableName.valueOf(tableName));
    }
  }
}
/**
 * Writes a single row with one cell in {@code cf1} and one in {@code cf2}, then flushes the
 * table.
 *
 * @param tableName name of the table to flush, as bytes
 * @param admin admin used to request the flush
 * @param t table the row is written to
 */
private void insertData(final byte[] tableName, Admin admin, Table t) throws IOException,
    InterruptedException {
  Put row = new Put(Bytes.toBytes("row"))
    .addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("q1"), Bytes.toBytes("value1"))
    .addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("q2"), Bytes.toBytes("value2"));
  t.put(row);
  admin.flush(TableName.valueOf(tableName));
}
/**
 * Walks a {@code FileArchiverNotifierImpl} through the size accounting of a single snapshot:
 * a full computation, reports of archived files the snapshot does not reference, a report of a
 * file it does reference, and a final full computation that resets the size again.
 */
@Test
public void testIncrementalFileArchiving() throws Exception {
  final Admin admin = TEST_UTIL.getAdmin();
  final TableName tn = TableName.valueOf(testName.getMethodName());
  if (admin.tableExists(tn)) {
    admin.disableTable(tn);
    admin.deleteTable(tn);
  }
  // Scope the quota table handle to the test body; it used to be opened and never closed.
  try (Table quotaTable = conn.getTable(QuotaUtil.QUOTA_TABLE_NAME)) {
    final TableName tn1 = helper.createTableWithRegions(1);
    admin.setQuota(QuotaSettingsFactory.limitTableSpace(
      tn1, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS));

    // Write some data and flush it
    helper.writeData(tn1, 256L * SpaceQuotaHelperForTests.ONE_KILOBYTE);
    admin.flush(tn1);

    // Create a snapshot on the table
    final String snapshotName1 = tn1 + "snapshot1";
    admin.snapshot(new SnapshotDescription(snapshotName1, tn1, SnapshotType.SKIPFLUSH));

    // NOTE(review): the notifier and all size lookups below use tn, while the data and the
    // snapshot belong to tn1 -- confirm this is intended.
    FileArchiverNotifierImpl notifier = new FileArchiverNotifierImpl(conn, conf, fs, tn);
    long t1 = notifier.getLastFullCompute();
    long snapshotSize = notifier.computeAndStoreSnapshotSizes(Arrays.asList(snapshotName1));
    assertEquals("The size of the snapshots should be zero", 0, snapshotSize);
    assertTrue("Last compute time was not less than current compute time",
      t1 < notifier.getLastFullCompute());

    // No recently archived files and the snapshot should have no size
    assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));

    // Invoke the addArchivedFiles method with no files
    notifier.addArchivedFiles(Collections.emptySet());

    // The size should not have changed
    assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));

    // Files named "a" and "b" are reported as archived.
    notifier.addArchivedFiles(ImmutableSet.of(entry("a", 1024L), entry("b", 1024L)));

    // The size should not have changed
    assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));

    // Pull one file referenced by the snapshot out of the manifest
    Set<String> referencedFiles = getFilesReferencedBySnapshot(snapshotName1);
    assertTrue("Found snapshot referenced files: " + referencedFiles,
      referencedFiles.size() >= 1);
    String referencedFile = Iterables.getFirst(referencedFiles, null);
    assertNotNull(referencedFile);

    // Report that a file this snapshot referenced was moved to the archive. This is a sign
    // that the snapshot should now "own" the size of this file
    final long fakeFileSize = 2048L;
    notifier.addArchivedFiles(ImmutableSet.of(entry(referencedFile, fakeFileSize)));

    // Verify that the snapshot owns this file.
    assertEquals(fakeFileSize, extractSnapshotSize(quotaTable, tn, snapshotName1));

    // In reality, we did not actually move the file, so a "full" computation should re-set the
    // size of the snapshot back to 0.
    long t2 = notifier.getLastFullCompute();
    snapshotSize = notifier.computeAndStoreSnapshotSizes(Arrays.asList(snapshotName1));
    assertEquals(0, snapshotSize);
    assertEquals(0, extractSnapshotSize(quotaTable, tn, snapshotName1));
    // We should also have no recently archived files after a re-computation
    assertTrue("Last compute time was not less than current compute time",
      t2 < notifier.getLastFullCompute());
  }
}
/**
 * Bulk loads an HFile into a freshly initialised table, scans it, then writes 'version3' for
 * 'row1,col:q' through a regular put, flushes, and verifies that a new scan returns 'version3'
 * for that row.
 */
@Test
public void testBulkLoad() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  long timestamp = System.currentTimeMillis();
  Admin admin = TEST_UTIL.getAdmin();
  createTable(admin, tableName);
  Scan scan = createScan();
  // The table and both scanners are closed by try-with-resources. Before, the first scanner was
  // overwritten without being closed and nothing was closed when an assertion failed.
  try (Table table = init(admin, timestamp, scan, tableName)) {
    // use bulkload
    final Path hfilePath = writeToHFile(timestamp, "/temp/testBulkLoad/",
      "/temp/testBulkLoad/col/file", false);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    try (ResultScanner scanner = table.getScanner(scan)) {
      // Presumably checks that the scan sees the bulk loaded 'version2' -- see
      // scanAfterBulkLoad.
      scanAfterBulkLoad(scanner, scanner.next(), "version2");
    }

    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"),
      timestamp, Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);

    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
        for (Cell cell : cells) {
          String rowKey =
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          if (rowKey.equals("row1")) {
            String cellValue =
              Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            System.out.println(rowKey);
            System.out.println(Bytes.toString(cell.getQualifierArray(),
              cell.getQualifierOffset(), cell.getQualifierLength()));
            System.out.println(cellValue);
            Assert.assertEquals("version3", cellValue);
          }
        }
      }
    }
  }
}
/**
 * Same scenario as a regular bulk load test, but the HFile is written with the last
 * {@code writeToHFile} argument set to {@code true}: bulk load, scan, put 'version3' for
 * 'row1,col:q', flush, and verify that a new scan returns 'version3' for that row.
 */
@Test
public void testBulkLoadNativeHFile() throws Exception {
  final TableName tableName = TableName.valueOf(name.getMethodName());
  long timestamp = System.currentTimeMillis();
  Admin admin = TEST_UTIL.getAdmin();
  createTable(admin, tableName);
  Scan scan = createScan();
  // The table and both scanners are closed by try-with-resources. Before, the first scanner was
  // overwritten without being closed and nothing was closed when an assertion failed.
  try (Table table = init(admin, timestamp, scan, tableName)) {
    // use bulkload
    final Path hfilePath = writeToHFile(timestamp, "/temp/testBulkLoadNativeHFile/",
      "/temp/testBulkLoadNativeHFile/col/file", true);
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
    BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
    try (ResultScanner scanner = table.getScanner(scan)) {
      // We had 'version0', 'version1' for 'row1,col:q' in the table.
      // Bulk load added 'version2' scanner should be able to see 'version2'
      scanAfterBulkLoad(scanner, scanner.next(), "version2");
    }

    Put put0 = new Put(Bytes.toBytes("row1"));
    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"),
      timestamp, Bytes.toBytes("version3")));
    table.put(put0);
    admin.flush(tableName);

    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        List<Cell> cells = result.getColumnCells(Bytes.toBytes("col"), Bytes.toBytes("q"));
        for (Cell cell : cells) {
          String rowKey =
            Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          if (rowKey.equals("row1")) {
            String cellValue =
              Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            System.out.println(rowKey);
            System.out.println(Bytes.toString(cell.getQualifierArray(),
              cell.getQualifierOffset(), cell.getQualifierLength()));
            System.out.println(cellValue);
            Assert.assertEquals("version3", cellValue);
          }
        }
      }
    }
  }
}
/**
 * Writes three rows into a new table, flushing after each one, where the first and third put
 * carry a "visibility" attribute. The {@code result(...)} check is run once on the three
 * flushed files and once more after a compaction has completed.
 */
@Test
public void testTags() throws Exception {
  Table table = null;
  try {
    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());

    // Column coordinates shared by all three rows.
    byte[] cf = Bytes.toBytes("info");
    byte[] qual = Bytes.toBytes("qual");

    // Row keys.
    byte[] rowA = Bytes.toBytes("rowa");
    byte[] rowB = Bytes.toBytes("rowb");
    byte[] rowC = Bytes.toBytes("rowc");

    ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor cfDescriptor =
      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(cf);
    cfDescriptor.setBlockCacheEnabled(true);
    cfDescriptor.setDataBlockEncoding(DataBlockEncoding.NONE);
    TableDescriptorBuilder.ModifyableTableDescriptor tableDesc =
      new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
    tableDesc.setColumnFamily(cfDescriptor);

    Admin admin = TEST_UTIL.getAdmin();
    admin.createTable(tableDesc);

    byte[] valueA = Bytes.toBytes("value");
    table = TEST_UTIL.getConnection().getTable(tableName);

    // First row: carries the "myTag" visibility attribute.
    Put putA = new Put(rowA);
    putA.addColumn(cf, qual, HConstants.LATEST_TIMESTAMP, valueA);
    putA.setAttribute("visibility", Bytes.toBytes("myTag"));
    table.put(putA);
    admin.flush(tableName);
    // We are lacking an API for confirming flush request compaction.
    // Just sleep for a short time. We won't be able to confirm flush
    // completion but the test won't hang now or in the future if
    // default compaction policy causes compaction between flush and
    // when we go to confirm it.
    Thread.sleep(1000);

    // Second row: written without a visibility attribute.
    byte[] valueB = Bytes.toBytes("1000dfsdf");
    Put putB = new Put(rowB);
    putB.addColumn(cf, qual, HConstants.LATEST_TIMESTAMP, valueB);
    table.put(putB);
    admin.flush(tableName);
    Thread.sleep(1000);

    // Third row: carries the "myTag3" visibility attribute.
    byte[] valueC = Bytes.toBytes("1000dfsdf");
    Put putC = new Put(rowC);
    putC.addColumn(cf, qual, HConstants.LATEST_TIMESTAMP, valueC);
    putC.setAttribute("visibility", Bytes.toBytes("myTag3"));
    table.put(putC);
    admin.flush(tableName);
    Thread.sleep(1000);

    result(cf, rowA, qual, rowC, table, valueA, valueC, rowB, valueB);

    // Compact, wait for the compaction state to return to NONE, and check again.
    admin.compact(tableName);
    while (admin.getCompactionState(tableName) != CompactionState.NONE) {
      Thread.sleep(10);
    }
    result(cf, rowA, qual, rowC, table, valueA, valueC, rowB, valueB);
  } finally {
    if (table != null) {
      table.close();
    }
  }
}
/**
 * Creates three store files, compacts them, refreshes the region's store files and archives the
 * compacted ones, then runs another compaction, which must complete without an
 * {@link IOException}.
 */
@Test
public void testCompactionAfterRefresh() throws Exception {
  Admin admin = util.getAdmin();
  table = util.createTable(TEST_TABLE, TEST_FAMILY);
  try {
    // Create Multiple store files: one put followed by one flush per row.
    Put rowAPut = new Put(ROW_A);
    rowAPut.addColumn(TEST_FAMILY, qualifierCol1, bytes1);
    table.put(rowAPut);
    admin.flush(TEST_TABLE);

    Put rowBPut = new Put(ROW_B);
    rowBPut.addColumn(TEST_FAMILY, qualifierCol1, bytes2);
    table.put(rowBPut);
    admin.flush(TEST_TABLE);

    Put rowCPut = new Put(ROW_C);
    rowCPut.addColumn(TEST_FAMILY, qualifierCol1, bytes3);
    table.put(rowCPut);
    admin.flush(TEST_TABLE);

    // Compact and wait until the compaction state is back to NONE.
    admin.compact(TEST_TABLE);
    while (admin.getCompactionState(TEST_TABLE) != CompactionState.NONE) {
      Thread.sleep(1000);
    }
    table.put(rowBPut);

    String encodedRegionName = admin.getRegions(TEST_TABLE).get(0).getEncodedName();
    HRegion region = (HRegion) util.getRSForFirstRegionInTable(TEST_TABLE)
      .getRegionByEncodedName(encodedRegionName);
    // Refresh store files post compaction, this should not open already compacted files
    region.refreshStoreFiles(true);
    // Archive the store files and try another compaction to see if all is good
    for (HStore store : region.getStores()) {
      store.closeAndArchiveCompactedFiles();
    }
    try {
      region.compact(false);
    } catch (FileNotFoundException e) {
      LOG.error("Got an exception during compaction", e);
      Assert.fail("Got a FNFE during compaction");
    } catch (IOException e) {
      LOG.error("Got an exception during compaction", e);
      Assert.fail();
    }
  } finally {
    if (admin != null) {
      admin.close();
    }
  }
}
/**
 * Loads a table, splits it into two regions, snapshots it, writes newer values and flushes, and
 * then verifies a snapshot-based mapper job limited to the range between {@code getStartRow()}
 * and {@code getEndRow()}.
 */
@Test
public void testNoDuplicateResultsWhenSplitting() throws Exception {
  TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");
  String snapshotName = "testSnapshotBug";
  try {
    if (UTIL.getAdmin().tableExists(tableName)) {
      UTIL.deleteTable(tableName);
    }

    UTIL.createTable(tableName, FAMILIES);
    Admin admin = UTIL.getAdmin();

    // The table handle is closed at the same point as before, but now also when loading,
    // splitting, snapshotting or flushing throws.
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      // put some stuff in the table
      UTIL.loadTable(table, FAMILIES);

      // split to 2 regions
      admin.split(tableName, Bytes.toBytes("eee"));
      TestTableSnapshotScanner.blockUntilSplitFinished(UTIL, tableName, 2);

      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());

      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
        null, snapshotName, rootDir, fs, true);

      // load different values
      byte[] value = Bytes.toBytes("after_snapshot_value");
      UTIL.loadTable(table, FAMILIES, value);

      // cause flush to create new files in the region
      admin.flush(tableName);
    }

    Job job = new Job(UTIL.getConfiguration());
    Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    // limit the scan
    Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());

    TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
      TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job, false,
      tmpTableDir);

    verifyWithMockedMapReduce(job, 2, 2, getStartRow(), getEndRow());
  } finally {
    UTIL.getAdmin().deleteSnapshot(snapshotName);
    UTIL.deleteTable(tableName);
  }
}
/**
 * Writes the data set once, flushes it, brings the replicas up to date, and then runs the read
 * load for whatever remains of the configured run time while a ChaosMonkey is started after a
 * configurable delay.
 *
 * @param defaultRunTime run time used when the class specific run time key is not configured
 * @param keysPerServerPerIter passed to {@code getNumKeys} to size the data set
 * @param colsPerKey passed to {@code writeData}
 * @param recordSize passed to {@code writeData}
 * @param writeThreads passed to {@code writeData}
 * @param readThreads number of reader threads passed to the load tool
 * @throws IllegalArgumentException if writing and flushing already used up the whole run time
 * @throws Exception if writing, flushing or the read load fails
 */
@Override
protected void runIngestTest(long defaultRunTime, long keysPerServerPerIter, int colsPerKey,
    int recordSize, int writeThreads, int readThreads) throws Exception {
  LOG.info("Cluster size:"+
    util.getHBaseClusterInterface().getClusterMetrics().getLiveServerMetrics().size());

  long start = System.currentTimeMillis();
  String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
  long runtime = util.getConfiguration().getLong(runtimeKey, defaultRunTime);
  long startKey = 0;

  long numKeys = getNumKeys(keysPerServerPerIter);

  // write data once
  LOG.info("Writing some data to the table");
  writeData(colsPerKey, recordSize, writeThreads, startKey, numKeys);

  // flush the table
  LOG.info("Flushing the table");
  Admin admin = util.getAdmin();
  admin.flush(getTablename());

  // re-open the regions to make sure that the replicas are up to date
  long refreshTime = conf.getLong(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 0);
  if (refreshTime > 0 && refreshTime <= 10000) {
    LOG.info("Sleeping " + refreshTime + "ms to ensure that the data is replicated");
    Threads.sleep(refreshTime*3);
  } else {
    LOG.info("Reopening the table");
    admin.disableTable(getTablename());
    admin.enableTable(getTablename());
  }

  // We should only start the ChaosMonkey after the readers are started and have cached
  // all of the region locations. Because the meta is not replicated, the timebounded reads
  // will timeout if meta server is killed.
  // We will start the chaos monkey after 1 minute, and since the readers are reading random
  // keys, it should be enough to cache every region entry.
  long chaosMonkeyDelay = conf.getLong(String.format("%s.%s", TEST_NAME, CHAOS_MONKEY_DELAY_KEY)
    , DEFAUL_CHAOS_MONKEY_DELAY);
  ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
  LOG.info(String.format("ChaosMonkey delay is : %d seconds. Will start %s " +
      "ChaosMonkey after delay", chaosMonkeyDelay / 1000, monkeyToUse));
  ScheduledFuture<?> result = executorService.schedule(new Runnable() {
    @Override
    public void run() {
      try {
        LOG.info("Starting ChaosMonkey");
        monkey.start();
        monkey.waitForStop();
      } catch (Exception e) {
        LOG.warn(StringUtils.stringifyException(e));
      }
    }
  }, chaosMonkeyDelay, TimeUnit.MILLISECONDS);

  // Everything after scheduling runs under the finally block. Previously the run time check
  // below threw outside of it, which left the executor running and let the ChaosMonkey start
  // even though the test had already failed.
  try {
    // set the intended run time for the reader. The reader will do read requests
    // to random keys for this amount of time.
    long remainingTime = runtime - (System.currentTimeMillis() - start);
    if (remainingTime <= 0) {
      LOG.error("The amount of time left for the test to perform random reads is "
          + "non-positive. Increase the test execution time via "
          + String.format(RUN_TIME_KEY,
                IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName())
          + " or reduce the amount of data written per server via "
          + IntegrationTestTimeBoundedRequestsWithRegionReplicas.class.getSimpleName()
          + "." + IntegrationTestIngest.NUM_KEYS_PER_SERVER_KEY);
      throw new IllegalArgumentException("No time remains to execute random reads");
    }
    LOG.info("Reading random keys from the table for " + remainingTime/60000 + " min");
    this.conf.setLong(
      String.format(RUN_TIME_KEY, TimeBoundedMultiThreadedReader.class.getSimpleName())
      , remainingTime); // load tool shares the same conf

    // now start the readers which will run for configured run time
    int ret = loadTool.run(getArgsForLoadTestTool("-read", String.format("100:%d", readThreads)
      , startKey, numKeys));
    if (0 != ret) {
      String errorMsg = "Verification failed with error code " + ret;
      LOG.error(errorMsg);
      Assert.fail(errorMsg);
    }
  } finally {
    // cancel(false) cancels the scheduled start without interrupting it if it is already
    // running.
    result.cancel(false);
    monkey.stop("Stopping the test");
    monkey.waitForStop();
    executorService.shutdown();
  }
}
/**
 * Creates a salted two-column table with a small maximum file size, upserts one row with a 1KB
 * payload for every two-letter key from "aa" to "zz" (committing every 25 rows) and flushes the
 * table through the HBase admin.
 *
 * @param tableName name of the table to create and populate
 * @return the number of rows written
 * @throws Exception if a SQL statement or the flush fails
 */
private static int setupTableForSplit(String tableName) throws Exception {
  int batchSize = 25;
  int maxFileSize = 1024 * 10;
  int payLoadSize = 1024;
  StringBuilder buf = new StringBuilder(payLoadSize);
  for (int i = 0; i < payLoadSize; i++) {
    buf.append('a');
  }
  String payload = buf.toString();
  char minChar = 'a';
  char maxChar = 'z';
  int rowCount = 0;
  // All JDBC resources and the admin are scoped with try-with-resources so they are released on
  // failure as well; previously the connection was only closed on success and the statements
  // were never closed.
  try (Connection conn = getConnection()) {
    // java.sql.Statement is fully qualified so that no additional import is required.
    try (java.sql.Statement ddl = conn.createStatement()) {
      ddl.execute("CREATE TABLE " + tableName + "("
        + "a VARCHAR PRIMARY KEY, b VARCHAR) "
        + TableDescriptorBuilder.MAX_FILESIZE + "=" + maxFileSize + ","
        + " SALT_BUCKETS = " + NUM_SALT_BUCKETS);
    }
    try (PreparedStatement stmt =
           conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)")) {
      for (char c1 = minChar; c1 <= maxChar; c1++) {
        for (char c2 = minChar; c2 <= maxChar; c2++) {
          String pk = new String(new char[] { c1, c2 });
          stmt.setString(1, pk);
          stmt.setString(2, payload);
          stmt.execute();
          rowCount++;
          if (rowCount % batchSize == 0) {
            conn.commit();
          }
        }
      }
      // Commit whatever is left over from the last partial batch.
      conn.commit();
    }
    ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
    try (Admin admin = services.getAdmin()) {
      admin.flush(TableName.valueOf(tableName));
    }
  }
  return rowCount;
}
/**
 * Runs a major compaction, and then waits until the compaction is complete before returning.
 *
 * <p>Completion is detected with a marker row: the row is written and deleted, a flush and
 * major compaction are requested, and a raw scan is repeated every six seconds until it no
 * longer returns anything for the marker row key. For transactional tables the flush and
 * compaction are requested again after every unsuccessful check.
 *
 * @param conn JDBC connection that can be unwrapped to a {@code PhoenixConnection}
 * @param tableName name of the table to be compacted
 * @throws Exception if writing the marker row, flushing, compacting or scanning fails
 */
public static void doMajorCompaction(Connection conn, String tableName) throws Exception {

  tableName = SchemaUtil.normalizeIdentifier(tableName);

  // We simply write a marker row, request a major compaction, and then wait until the marker
  // row is gone
  PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
  PTable table = pconn.getTable(new PTableKey(pconn.getTenantId(), tableName));
  ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
  MutationState mutationState = pconn.getMutationState();
  if (table.isTransactional()) {
    mutationState.startTransaction(table.getTransactionProvider());
  }
  byte[] markerRowKey = Bytes.toBytes("TO_DELETE");
  // The table is only needed for the marker put/delete. Scoping it here closes it exactly once,
  // at the point where it used to be closed explicitly (and then a second time at method exit).
  try (Table htable = mutationState.getHTable(table)) {
    Put put = new Put(markerRowKey);
    put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
      QueryConstants.EMPTY_COLUMN_VALUE_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
    htable.put(put);
    Delete delete = new Delete(markerRowKey);
    delete.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
      QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
    htable.delete(delete);
  }
  if (table.isTransactional()) {
    mutationState.commit();
  }

  flushAndMajorCompact(services, tableName);

  boolean compactionDone = false;
  while (!compactionDone) {
    Thread.sleep(6000L);
    // Raw scan over exactly the marker row.
    Scan scan = new Scan();
    scan.setStartRow(markerRowKey);
    scan.setStopRow(Bytes.add(markerRowKey, new byte[] { 0 }));
    scan.setRaw(true);

    // The scanner is now closed even when materialising the results throws.
    try (Table htableForRawScan = services.getTable(Bytes.toBytes(tableName));
        ResultScanner scanner = htableForRawScan.getScanner(scan)) {
      List<Result> results = Lists.newArrayList(scanner);
      LOGGER.info("Results: " + results);
      compactionDone = results.isEmpty();
    }
    LOGGER.info("Compaction done: " + compactionDone);

    // need to run compaction after the next txn snapshot has been written so that compaction can remove deleted rows
    if (!compactionDone && table.isTransactional()) {
      flushAndMajorCompact(services, tableName);
    }
  }
}

/** Flushes {@code tableName} and requests a major compaction, always closing the admin. */
private static void flushAndMajorCompact(ConnectionQueryServices services, String tableName)
    throws Exception {
  TableName hbaseTableName = TableName.valueOf(tableName);
  try (Admin hbaseAdmin = services.getAdmin()) {
    hbaseAdmin.flush(hbaseTableName);
    hbaseAdmin.majorCompact(hbaseTableName);
  }
}