下面列出了 org.apache.hadoop.hbase.client.HTable#close() 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates the table named by {@code TEST_TABLE_1} (using {@code FAMILIES} and
 * {@code SPLIT_KEYS}) and writes eight fixture rows into it.
 *
 * <p>Rows 4 and 8 pass {@code null} as their first string value.
 *
 * @throws IOException if creating, opening, writing to or closing the table fails
 */
private static void createHBaseTable1() throws IOException {
  // create a table
  TableName tableName = TableName.valueOf(TEST_TABLE_1);
  createTable(tableName, FAMILIES, SPLIT_KEYS);
  // get the HTable instance; try-with-resources closes it even when the write fails
  try (HTable table = openTable(tableName)) {
    List<Put> puts = new ArrayList<>();
    // add some data
    puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
    puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
    puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
    puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
    puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
    puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
    puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
    puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
    // append rows to table
    table.put(puts);
  }
}
/**
 * Reads a single cell and asserts that it matches the expected value.
 *
 * <p>When {@code val} is {@code null} the cell must be absent; otherwise the cell must
 * exist and its bytes, decoded with {@code Bytes.toString}, must equal {@code val}.
 *
 * @param tableName name of the table to read from
 * @param rowKey row key of the cell
 * @param colFamily column family of the cell
 * @param colName column qualifier of the cell
 * @param val expected cell value, or {@code null} if no value is expected
 * @throws IOException if the table cannot be opened, read or closed
 */
protected void verifyHBaseCell(String tableName, String rowKey,
    String colFamily, String colName, String val) throws IOException {
  Get cellLookup = new Get(Bytes.toBytes(rowKey));
  final byte[] familyBytes = Bytes.toBytes(colFamily);
  final byte[] qualifierBytes = Bytes.toBytes(colName);
  cellLookup.addColumn(familyBytes, qualifierBytes);

  Configuration tableConf = new Configuration(hbaseTestUtil.getConfiguration());
  HTable cellTable = new HTable(tableConf, Bytes.toBytes(tableName));
  try {
    byte[] stored = cellTable.get(cellLookup).getValue(familyBytes, qualifierBytes);
    if (val != null) {
      assertNotNull("No result, but we expected one", stored);
      assertEquals(val, Bytes.toString(stored));
    } else {
      assertNull("Got a result when expected null", stored);
    }
  } finally {
    cellTable.close();
  }
}
/**
 * Uploads an object to the LOB table as a single row.
 *
 * <p>The file name is written to the column identified by the {@code family1} /
 * {@code f1_q1} fields and the file content to the column identified by
 * {@code family2} / {@code f2_q1}. An {@link IOException} is not propagated: its
 * stack trace is printed and the method returns normally.
 *
 * @param tableName name of the Hyperbase table
 * @param row row key as a string; it is converted to bytes with {@code Bytes.toBytes}
 * @param filename name of the file
 * @param fileData content of the file
 */
public void putLob(String tableName, String row, String filename, byte[] fileData){
  byte[] rowkey = Bytes.toBytes(row);
  try {
    HTable htable = new HTable(conf, tableName);
    try {
      Put put = new Put(rowkey);
      put.add(Bytes.toBytes(family1), Bytes.toBytes(f1_q1), Bytes.toBytes(filename));
      put.add(Bytes.toBytes(family2), Bytes.toBytes(f2_q1), fileData);
      htable.put(put);
      htable.flushCommits();
    } finally {
      // release the table handle even when the write or the flush fails
      htable.close();
    }
  } catch (IOException e1) {
    // Failures are only reported here; callers are not notified (existing contract).
    e1.printStackTrace();
  }
}
/**
 * Per-test setup: resolves the prune state table name from the configuration, creates
 * that table, opens a connection and builds the {@code DataJanitorState} that reads the
 * table through that connection.
 */
@Before
public void beforeTest() throws Exception {
  String pruneTableName = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  pruneStateTable = TableName.valueOf(pruneTableName);

  // Prune state table is a non-transactional table, hence no transaction co-processor
  byte[] pruneTableNameBytes = pruneStateTable.getName();
  byte[][] families = new byte[][]{DataJanitorState.FAMILY};
  createTable(pruneTableNameBytes, families, false, Collections.<String>emptyList()).close();

  connection = HConnectionManager.createConnection(conf);

  // Each call hands out a table obtained from the shared connection.
  DataJanitorState.TableSupplier tableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public HTableInterface get() throws IOException {
      return connection.getTable(pruneStateTable);
    }
  };
  dataJanitorState = new DataJanitorState(tableSupplier);
}
/**
 * Per-test setup: resolves the prune state table name from the configuration, creates
 * that table and builds the {@code DataJanitorState} that reads the table through the
 * connection provided by {@code testUtil}.
 */
@Before
public void beforeTest() throws Exception {
  String pruneTableName = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
      TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
  pruneStateTable = TableName.valueOf(pruneTableName);

  // Prune state table is a non-transactional table, hence no transaction co-processor
  byte[] pruneTableNameBytes = pruneStateTable.getName();
  byte[][] families = new byte[][]{DataJanitorState.FAMILY};
  createTable(pruneTableNameBytes, families, false, Collections.<String>emptyList()).close();

  // Each call hands out a table obtained from the test utility's connection.
  DataJanitorState.TableSupplier tableSupplier = new DataJanitorState.TableSupplier() {
    @Override
    public Table get() throws IOException {
      return testUtil.getConnection().getTable(pruneStateTable);
    }
  };
  dataJanitorState = new DataJanitorState(tableSupplier);
}
/**
 * Creates the table named by {@code TEST_TABLE} with three column families and one
 * split key, then writes eight fixture rows into it.
 *
 * <p>Rows 4 and 8 pass {@code null} as their first string value.
 *
 * @throws IOException if creating, opening, writing to or closing the table fails
 */
private static void prepareTable() throws IOException {
  // create a table
  TableName tableName = TableName.valueOf(TEST_TABLE);
  // column families
  byte[][] families = new byte[][]{
    Bytes.toBytes(FAMILY1),
    Bytes.toBytes(FAMILY2),
    Bytes.toBytes(FAMILY3)
  };
  // split keys
  byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) };
  createTable(tableName, families, splitKeys);
  // get the HTable instance; try-with-resources closes it even when the write fails
  try (HTable table = openTable(tableName)) {
    List<Put> puts = new ArrayList<>();
    // add some data
    puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
    puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
    puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
    puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4"));
    puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
    puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
    puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
    puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8"));
    // append rows to table
    table.put(puts);
  }
}
/**
 * Loads from HBase 'TESTTABLE_1' using the UTF-8 plain text format, keeps only the row
 * key, column a and column b, and stores the result into 'TESTTABLE_2' through
 * {@code HBaseStorage} without specifying a caster. The test then scans 'TESTTABLE_2'
 * and checks every row key and both stored columns, expecting exactly 100 rows.
 *
 * @throws IOException if preparing, storing to or scanning the tables fails
 */
@Test
public void testStoreToHBase_3_with_projection_no_caster() throws IOException {
  prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
  prepareTable(TESTTABLE_2, false, DataFormat.UTF8PlainText);
  scanTable1(pig, DataFormat.UTF8PlainText);
  pig.registerQuery("b = FOREACH a GENERATE rowKey, col_a, col_b;");
  pig.store("b", TESTTABLE_2,
      "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
      + TESTCOLUMN_A + " " + TESTCOLUMN_B + "')");

  HTable table = new HTable(conf, TESTTABLE_2);
  try {
    ResultScanner scanner = table.getScanner(new Scan());
    try {
      Iterator<Result> iter = scanner.iterator();
      int i = 0;
      for (i = 0; iter.hasNext(); ++i) {
        Result result = iter.next();
        String v = i + "";
        String rowKey = new String(result.getRow());
        String col_a = new String(getColValue(result, TESTCOLUMN_A));
        String col_b = new String(getColValue(result, TESTCOLUMN_B));
        // row keys are expected to be the index left-padded with zeros to two digits
        Assert.assertEquals("00".substring(v.length()) + v, rowKey);
        Assert.assertEquals(i + "", col_a);
        Assert.assertEquals(i + 0.0 + "", col_b);
      }
      Assert.assertEquals(100, i);
    } finally {
      // the scanner was never closed before; release it even when an assertion fails
      scanner.close();
    }
  } finally {
    table.close();
  }
}
/**
 * Upserts four rows into the data table, creates a local index on {@code v1}, and checks
 * that the index reports four rows and that a scan bounded by each start/end key pair of
 * the local index table returns exactly one row.
 */
@Test
public void testBuildIndexWhenUserTableAlreadyHasData() throws Exception {
  createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, null, "('e','i','o')");
  Connection conn1 = DriverManager.getConnection(getUrl());
  try {
    conn1.createStatement().execute("UPSERT INTO "+TestUtil.DEFAULT_DATA_TABLE_NAME+" values('b',1,2,4,'z')");
    conn1.createStatement().execute("UPSERT INTO "+TestUtil.DEFAULT_DATA_TABLE_NAME+" values('f',1,2,3,'z')");
    conn1.createStatement().execute("UPSERT INTO "+TestUtil.DEFAULT_DATA_TABLE_NAME+" values('j',2,4,2,'a')");
    conn1.createStatement().execute("UPSERT INTO "+TestUtil.DEFAULT_DATA_TABLE_NAME+" values('q',3,1,1,'c')");
    conn1.commit();
    conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(v1)");
    ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + TestUtil.DEFAULT_INDEX_TABLE_NAME);
    assertTrue(rs.next());
    assertEquals(4, rs.getInt(1));

    // NOTE(review): the admin returned by getAdmin() is not closed here — confirm who owns it.
    HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
    HTable indexTable = new HTable(admin.getConfiguration() ,TableName.valueOf(MetaDataUtil.getLocalIndexTableName(TestUtil.DEFAULT_DATA_TABLE_NAME)));
    try {
      Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
      byte[][] startKeys = startEndKeys.getFirst();
      byte[][] endKeys = startEndKeys.getSecond();
      for (int i = 0; i < startKeys.length; i++) {
        Scan s = new Scan();
        s.setStartRow(startKeys[i]);
        s.setStopRow(endKeys[i]);
        ResultScanner scanner = indexTable.getScanner(s);
        int count = 0;
        try {
          for (Result ignored : scanner) {
            count++;
          }
        } finally {
          scanner.close();
        }
        assertEquals(1, count);
      }
    } finally {
      // close the table handle even when an assertion above fails
      indexTable.close();
    }
  } finally {
    // the JDBC connection was never closed before
    conn1.close();
  }
}
/**
 * Drains {@code lineReceiver} and writes every record to {@code table} in batches.
 *
 * <p>Each record is converted to a {@code Put} according to the column definitions read
 * from {@code Key.COLUMN}. A batch is written and flushed once it holds at least
 * {@code Key.BATCH_SIZE} puts (default 100); the remaining puts are written when the
 * reader returns {@code null}. The table is always closed before this method returns or
 * throws.
 *
 * @param lineReceiver source of records; reading stops at the first {@code null} record
 * @param table destination table; closed by this method
 * @param configuration writer configuration (columns, batch size, write-to-WAL flag)
 * @throws DataXException wrapping any exception raised while reading, writing or closing
 */
@SuppressWarnings("rawtypes")
public static void startWrite(RecordReceiver lineReceiver, HTable table, Configuration configuration) {
  List<Map> columns = configuration.getList(Key.COLUMN, Map.class);
  Integer batchSize = configuration.getInt(Key.BATCH_SIZE, 100);
  boolean writeToWAL = configuration.getBool(Key.WRITE_TO_WAL, true);
  List<HbaseColumnCell> hbaseColumnCells = parseColumns(columns);
  try {
    try {
      Record record = null;
      List<Put> puts = new ArrayList<Put>();
      while ((record = lineReceiver.getFromReader()) != null) {
        puts.add(getPut(hbaseColumnCells, record, writeToWAL));
        // '>=' instead of '% == 0': same result for positive batch sizes (the list is
        // cleared after every flush) and no division by zero for a batch size of 0.
        if (puts.size() >= batchSize) {
          table.put(puts);
          table.flushCommits();
          puts.clear();
        }
      }
      if (!puts.isEmpty()) {
        table.put(puts);
        table.flushCommits();
      }
    } finally {
      // Close on the failure path too; a close failure is reported like any other error.
      table.close();
    }
  } catch (Exception e) {
    String message = String.format("写hbase[%s]时发生IO异常,请检查您的网络是否正常!", table.getName());
    LOG.error(message, e);
    ErrorRecord.addError(message+"->"+e.getMessage());
    throw DataXException.asDataXException(HBaseWriter98ErrorCode.WRITE_HBASE_IO_ERROR, e);
  }
}
/**
 * Deletes every row of the given table: scans the whole table, collects one
 * {@code Delete} per row and submits them in a single batch.
 *
 * @param tableName name of the table to empty
 * @throws Exception if the table cannot be opened, scanned, modified or closed
 */
private void deleteAllRows(String tableName) throws Exception {
  HTable table = new HTable(conf, tableName);
  try {
    List<Delete> deletes = Lists.newArrayList();
    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (Result row : scanner) {
        deletes.add(new Delete(row.getRow()));
      }
    } finally {
      // the scanner was never closed before
      scanner.close();
    }
    table.delete(deletes);
  } finally {
    // close the table handle even when the scan or the delete fails
    table.close();
  }
}
/**
 * Creates the table named by {@code TEST_TABLE_1} (using {@code FAMILIES} and
 * {@code SPLIT_KEYS}) and writes eight fixture rows into it, each including timestamp,
 * date, time and decimal values.
 *
 * <p>Rows 4 and 8 pass {@code null} as their first string value.
 *
 * @throws IOException if creating, opening, writing to or closing the table fails
 */
private static void createHBaseTable1() throws IOException {
  // create a table
  TableName tableName = TableName.valueOf(TEST_TABLE_1);
  createTable(tableName, FAMILIES, SPLIT_KEYS);
  // get the HTable instance; try-with-resources closes it even when the write fails
  try (HTable table = openTable(tableName)) {
    List<Put> puts = new ArrayList<>();
    // add some data
    // NOTE(review): new BigDecimal(double) stores the binary approximation of the literal,
    // not the exact decimal; left unchanged because the expected values are not visible here.
    puts.add(putRow(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1",
        Timestamp.valueOf("2019-08-18 19:00:00"), Date.valueOf("2019-08-18"),
        Time.valueOf("19:00:00"), new BigDecimal(12345678.0001)));
    puts.add(putRow(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2",
        Timestamp.valueOf("2019-08-18 19:01:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:01:00"),
        new BigDecimal(12345678.0002)));
    puts.add(putRow(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3",
        Timestamp.valueOf("2019-08-18 19:02:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:02:00"),
        new BigDecimal(12345678.0003)));
    puts.add(putRow(4, 40, null, 400L, 4.04, true, "Welt-4",
        Timestamp.valueOf("2019-08-18 19:03:00"), Date.valueOf("2019-08-18"), Time.valueOf("19:03:00"),
        new BigDecimal(12345678.0004)));
    puts.add(putRow(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5",
        Timestamp.valueOf("2019-08-19 19:10:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:10:00"),
        new BigDecimal(12345678.0005)));
    puts.add(putRow(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6",
        Timestamp.valueOf("2019-08-19 19:20:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:20:00"),
        new BigDecimal(12345678.0006)));
    puts.add(putRow(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7",
        Timestamp.valueOf("2019-08-19 19:30:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:30:00"),
        new BigDecimal(12345678.0007)));
    puts.add(putRow(8, 80, null, 800L, 8.08, true, "Welt-8",
        Timestamp.valueOf("2019-08-19 19:40:00"), Date.valueOf("2019-08-19"), Time.valueOf("19:40:00"),
        new BigDecimal(12345678.0008)));
    // append rows to table
    table.put(puts);
  }
}
/**
 * Starts the mini HBase cluster unless one is already running.
 *
 * <p>Starts the mini ZooKeeper cluster first when none exists, applies default region
 * server start limits when they are unset, disables replication, creates the root
 * directory, and then blocks until a full scan of the meta table has succeeded.
 *
 * @throws Exception if the cluster cannot be started or the meta table cannot be scanned
 */
public void startHBaseCluster() throws Exception {
  if (zkCluster == null) {
    startMiniZKCluster();
  }
  if (hbaseCluster != null) {
    return;
  }
  System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file");
  // Only fill in the region server start limits when the caller has not configured them.
  if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
  }
  if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
    conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
  }
  conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, false);
  createRootDir();
  Configuration c = HBaseConfiguration.create(this.conf);
  // randomize hbase info port
  c.setInt(HConstants.MASTER_INFO_PORT, 0);
  hbaseCluster = new MiniHBaseCluster(c, 1);
  // Don't leave here till we've done a successful scan of the hbase:meta
  HTable t = new HTable(c, TableName.META_TABLE_NAME);
  try {
    ResultScanner s = t.getScanner(new Scan());
    try {
      while (s.next() != null) {
        // drain the scanner; only the fact that the scan completes matters
      }
    } finally {
      s.close();
    }
  } finally {
    // release the handle even when the scan fails
    t.close();
  }
  LOG.info("MiniHBaseCluster started");
}
/**
 * Wait for the hbase cluster to start up and come online, and then return.
 *
 * <p>Blocks until the master reports itself initialized, the first region server is
 * online and a full scan of the meta table has succeeded.
 *
 * @param hbaseCluster
 *          The hbase cluster to wait for.
 * @throws IOException
 *           if waiting for the region server or scanning the meta table fails
 * @throws InterruptedException
 *           if the thread is interrupted while sleeping between master checks
 */
private static void waitForHBaseToComeOnline(MiniHBaseCluster hbaseCluster)
    throws IOException, InterruptedException {
  // Wait for the master to be initialized. This is required because even
  // before it's initialized, the regionserver can come online and the meta
  // table can be scannable. If the cluster is quickly shut down after all of
  // this before the master is initialized, it can cause the shutdown to hang
  // indefinitely as initialization tasks will block forever.
  //
  // Unfortunately, no method available to wait for master to come online like
  // regionservers, so we use a while loop with a sleep so we don't hammer the
  // isInitialized method.
  while (!hbaseCluster.getMaster().isInitialized()) {
    Thread.sleep(1000);
  }
  // Now wait for the regionserver to come online.
  hbaseCluster.getRegionServer(0).waitForServerOnline();
  // Don't leave here till we've done a successful scan of the hbase:meta
  // This validates that not only is the regionserver up, but that the
  // meta region is online so there are no race conditions where operations
  // requiring the meta region might run before it's available. Otherwise,
  // operations are susceptible to region not online errors.
  HTable t = new HTable(hbaseCluster.getConf(), HBASE_META_TABLE);
  try {
    ResultScanner s = t.getScanner(new Scan());
    try {
      while (s.next() != null) {
        // drain the scanner; only the fact that the scan completes matters
      }
    } finally {
      s.close();
    }
  } finally {
    // release the handle even when the scan fails
    t.close();
  }
}
/**
 * Verifies that an empty table does not block the progress of pruning.
 *
 * <p>Sequence checked: before the empty table is flushed, no prune upper bound is
 * returned (-1); after flushing it, the first expected upper bound is returned; after
 * data is added and a newer snapshot is set, the bound is still the one recorded for the
 * first run; after the formerly empty table is compacted, the second expected upper
 * bound is returned. The plugin is destroyed and the table dropped in all cases.
 *
 * <p>NOTE(review): emptyHTable is closed only right after the put in the middle of the
 * test; if an earlier step throws, the finally block does not close it.
 */
@Test
public void testPruneEmptyTable() throws Exception {
// Make sure that empty tables do not block the progress of pruning
// Create an empty table
TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
try {
long now1 = System.currentTimeMillis();
long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
long noPruneUpperBound = -1;
long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
ImmutableSet.of(expectedPruneUpperBound1),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.compact(txEmptyTable, true);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
// Now flush the empty table, this will record the table region as empty, and then pruning will continue
testUtil.flush(txEmptyTable);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, again, this time it should work
pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
// Now add some data to the empty table
// (adding data non-transactionally is okay too, we just need some data for the compaction to run)
emptyHTable.put(new Put(Bytes.toBytes(1)).add(family, qualifier, Bytes.toBytes(1)));
emptyHTable.close();
// Now run another compaction on txDataTable1 with an updated tx snapshot
long now2 = System.currentTimeMillis();
long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
ImmutableSet.of(expectedPruneUpperBound2),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.flush(txEmptyTable);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
// txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
// empty in the previous run with inactiveTxTimeNow1
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
// However, after compacting txEmptyTable we should get the latest upper bound
testUtil.flush(txEmptyTable);
testUtil.compact(txEmptyTable, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
} finally {
// Always release the plugin and drop the table created for this test
transactionPruningPlugin.destroy();
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
}
/**
 * Creates the prune state table with the {@code DataJanitorState.FAMILY} column family
 * and immediately closes the handle returned by {@code createTable}.
 */
private void createPruneStateTable() throws Exception {
  byte[] pruneTableNameBytes = pruneStateTable.getName();
  byte[][] families = new byte[][]{DataJanitorState.FAMILY};
  // Prune state table is a non-transactional table, hence no transaction co-processor
  createTable(pruneTableNameBytes, families, false, Collections.<String>emptyList()).close();
}
/**
 * Verifies that an empty table does not block the progress of pruning.
 *
 * <p>Sequence checked: before the empty table is flushed, no prune upper bound is
 * returned (-1); after flushing it, the first expected upper bound is returned; after
 * data is added and a newer snapshot is set, the bound is still the one recorded for the
 * first run; after the formerly empty table is compacted, the second expected upper
 * bound is returned. The plugin is destroyed and the table dropped in all cases.
 *
 * <p>NOTE(review): emptyHTable is closed only right after the put in the middle of the
 * test; if an earlier step throws, the finally block does not close it.
 */
@Test
public void testPruneEmptyTable() throws Exception {
// Make sure that empty tables do not block the progress of pruning
// Create an empty table
TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
try {
long now1 = System.currentTimeMillis();
long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
long noPruneUpperBound = -1;
long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
ImmutableSet.of(expectedPruneUpperBound1),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.compact(txEmptyTable, true);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
// Now flush the empty table, this will record the table region as empty, and then pruning will continue
hBaseAdmin.flush(txEmptyTable);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, again, this time it should work
pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
// Now add some data to the empty table
// (adding data non-transactionally is okay too, we just need some data for the compaction to run)
emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
emptyHTable.close();
// Now run another compaction on txDataTable1 with an updated tx snapshot
long now2 = System.currentTimeMillis();
long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
ImmutableSet.of(expectedPruneUpperBound2),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.flush(txEmptyTable);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
// txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
// empty in the previous run with inactiveTxTimeNow1
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
// However, after compacting txEmptyTable we should get the latest upper bound
testUtil.flush(txEmptyTable);
testUtil.compact(txEmptyTable, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
} finally {
// Always release the plugin and drop the table created for this test
transactionPruningPlugin.destroy();
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
}
/**
 * Verifies that an empty table does not block the progress of pruning.
 *
 * <p>Sequence checked: before the empty table is flushed, no prune upper bound is
 * returned (-1); after flushing it, the first expected upper bound is returned; after
 * data is added and a newer snapshot is set, the bound is still the one recorded for the
 * first run; after the formerly empty table is compacted, the second expected upper
 * bound is returned. The plugin is destroyed and the table dropped in all cases.
 *
 * <p>NOTE(review): emptyHTable is closed only right after the put in the middle of the
 * test; if an earlier step throws, the finally block does not close it.
 */
@Test
public void testPruneEmptyTable() throws Exception {
// Make sure that empty tables do not block the progress of pruning
// Create an empty table
TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
try {
long now1 = System.currentTimeMillis();
long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
long noPruneUpperBound = -1;
long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
ImmutableSet.of(expectedPruneUpperBound1),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.compact(txEmptyTable, true);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
// Now flush the empty table, this will record the table region as empty, and then pruning will continue
hBaseAdmin.flush(txEmptyTable);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, again, this time it should work
pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
// Now add some data to the empty table
// (adding data non-transactionally is okay too, we just need some data for the compaction to run)
emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
emptyHTable.close();
// Now run another compaction on txDataTable1 with an updated tx snapshot
long now2 = System.currentTimeMillis();
long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
ImmutableSet.of(expectedPruneUpperBound2),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.flush(txEmptyTable);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
// txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
// empty in the previous run with inactiveTxTimeNow1
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
// However, after compacting txEmptyTable we should get the latest upper bound
testUtil.flush(txEmptyTable);
testUtil.compact(txEmptyTable, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
} finally {
// Always release the plugin and drop the table created for this test
transactionPruningPlugin.destroy();
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
}
/**
 * Creates the prune state table with the {@code DataJanitorState.FAMILY} column family
 * and immediately closes the handle returned by {@code createTable}.
 */
private void createPruneStateTable() throws Exception {
  byte[] pruneTableNameBytes = pruneStateTable.getName();
  byte[][] families = new byte[][]{DataJanitorState.FAMILY};
  // Prune state table is a non-transactional table, hence no transaction co-processor
  createTable(pruneTableNameBytes, families, false, Collections.<String>emptyList()).close();
}
/**
 * Verifies that an empty table does not block the progress of pruning.
 *
 * <p>Sequence checked: before the empty table is flushed, no prune upper bound is
 * returned (-1); after flushing it, the first expected upper bound is returned; after
 * data is added and a newer snapshot is set, the bound is still the one recorded for the
 * first run; after the formerly empty table is compacted, the second expected upper
 * bound is returned. The plugin is destroyed and the table dropped in all cases.
 *
 * <p>NOTE(review): emptyHTable is closed only right after the put in the middle of the
 * test; if an earlier step throws, the finally block does not close it.
 */
@Test
public void testPruneEmptyTable() throws Exception {
// Make sure that empty tables do not block the progress of pruning
// Create an empty table
TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
Collections.singletonList(TestTransactionProcessor.class.getName()));
TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();
transactionPruningPlugin.initialize(conf);
try {
long now1 = System.currentTimeMillis();
long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
long noPruneUpperBound = -1;
long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1,
ImmutableSet.of(expectedPruneUpperBound1),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.compact(txEmptyTable, true);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, there should be no prune upper bound since txEmptyTable cannot be compacted
long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
// Now flush the empty table, this will record the table region as empty, and then pruning will continue
hBaseAdmin.flush(txEmptyTable);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// fetch prune upper bound, again, this time it should work
pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound1);
transactionPruningPlugin.pruneComplete(now1, expectedPruneUpperBound1);
// Now add some data to the empty table
// (adding data non-transactionally is okay too, we just need some data for the compaction to run)
emptyHTable.put(new Put(Bytes.toBytes(1)).addColumn(family, qualifier, Bytes.toBytes(1)));
emptyHTable.close();
// Now run another compaction on txDataTable1 with an updated tx snapshot
long now2 = System.currentTimeMillis();
long inactiveTxTimeNow2 = (now2 - 150) * TxConstants.MAX_TX_PER_MS;
long expectedPruneUpperBound2 = (now2 - 200) * TxConstants.MAX_TX_PER_MS;
InMemoryTransactionStateCache.setTransactionSnapshot(
new TransactionSnapshot(expectedPruneUpperBound2, expectedPruneUpperBound2, expectedPruneUpperBound2,
ImmutableSet.of(expectedPruneUpperBound2),
ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of()));
testUtil.flush(txEmptyTable);
testUtil.compact(txDataTable1, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
// Running a prune now should still return min(inactiveTxTimeNow1, expectedPruneUpperBound1) since
// txEmptyTable is no longer empty. This information is returned since the txEmptyTable was recorded as being
// empty in the previous run with inactiveTxTimeNow1
long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(inactiveTxTimeNow1, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1);
// However, after compacting txEmptyTable we should get the latest upper bound
testUtil.flush(txEmptyTable);
testUtil.compact(txEmptyTable, true);
// Since the write to prune table happens async, we need to sleep a bit before checking the state of the table
TimeUnit.SECONDS.sleep(2);
pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound2);
} finally {
// Always release the plugin and drop the table created for this test
transactionPruningPlugin.destroy();
hBaseAdmin.disableTable(txEmptyTable);
hBaseAdmin.deleteTable(txEmptyTable);
}
}
/**
 * Fetches the pcaps for one key and adds them to the given response.
 *
 * <p>When {@code createStartAndStopRowKeys} returns {@code null} for the key, a single
 * Get is executed and the value of every returned cell is added. Otherwise a Scan is
 * built and executed and its cells are added via {@code addToResponse}.
 *
 * <p>An {@link IOException} is logged. If it is a ZooKeeper-connection,
 * master-not-running or no-server-for-region failure, the connection is closed and the
 * scan is retried up to {@code ConfigurationUtil.getConnectionRetryLimit()} times; when
 * the last retry also fails the original exception is rethrown. Any other
 * {@link IOException} is not rethrown and the response is returned as it stands.
 *
 * @param pcapsResponse
 *          response object that the fetched pcaps are added to
 * @param key
 *          key to fetch pcaps for
 * @param startTime
 *          start time passed on to the Get/Scan request
 * @param endTime
 *          end time passed on to the Get/Scan request
 * @param isPartialResponse
 *          passed to {@code createStartAndStopRowKeys}; presumably marks a continuation
 *          of an earlier partial response (confirm against callers)
 * @param includeDuplicateLastRow
 *          passed to {@code createStartAndStopRowKeys}
 * @param maxResultSize
 *          size limit passed to the scan request and to {@code addToResponse}
 * @return the same {@code pcapsResponse} instance that was passed in
 * @throws IOException
 *           if a connection-type failure persists after all retries
 */
@VisibleForTesting
PcapsResponse processKey(PcapsResponse pcapsResponse, String key,
    long startTime, long endTime, boolean isPartialResponse,
    boolean includeDuplicateLastRow, long maxResultSize) throws IOException {
  // NOTE(review): 'table' is never assigned in this method, so the helpers below receive
  // null and the finally block never closes anything — presumably the helpers obtain
  // their own handle; confirm.
  HTable table = null;
  Scan scan = null;
  List<Cell> scannedCells = null;
  try {
    // 1. Create start and stop row for the key;
    Map<String, String> keysMap = createStartAndStopRowKeys(key,
        isPartialResponse, includeDuplicateLastRow);
    // 2. if the input key contains all fragments (7) and it is not part
    // of previous partial response (isPartialResponse),
    // 'keysMap' will be null; do a Get; currently not doing any
    // response size related checks for Get;
    // by default all cells from a specific row are sorted by timestamp
    if (keysMap == null) {
      Get get = createGetRequest(key, startTime, endTime);
      List<Cell> cells = executeGetRequest(table, get);
      for (Cell cell : cells) {
        pcapsResponse.addPcaps(CellUtil.cloneValue(cell));
      }
      return pcapsResponse;
    }
    // 3. Create and execute Scan request
    scan = createScanRequest(pcapsResponse, keysMap, startTime, endTime,
        maxResultSize);
    scannedCells = executeScanRequest(table, scan);
    LOGGER.info("scannedCells size :" + scannedCells.size());
    addToResponse(pcapsResponse, scannedCells, maxResultSize);
  } catch (IOException e) {
    LOGGER.error("Exception occurred while fetching Pcaps for the keys :"
        + key, e);
    if (e instanceof ZooKeeperConnectionException
        || e instanceof MasterNotRunningException
        || e instanceof NoServerForRegionException) {
      int maxRetryLimit = ConfigurationUtil.getConnectionRetryLimit();
      LOGGER.info("maxRetryLimit =" + maxRetryLimit);
      // NOTE(review): 'scan' is still null here when the failure happened before the scan
      // was created (e.g. on the Get path); the retry then runs with a null scan — confirm.
      for (int attempt = 1; attempt <= maxRetryLimit; attempt++) {
        LOGGER.info("attempting =" + attempt);
        try {
          // closing the existing connection and retry, it will create a new HConnection
          HBaseConfigurationUtil.closeConnection();
          scannedCells = executeScanRequest(table, scan);
          addToResponse(pcapsResponse, scannedCells, maxResultSize);
          break;
        } catch (IOException ie) {
          // Retry failures used to be dropped silently; record them for diagnosis.
          LOGGER.error("Retry attempt " + attempt + " failed while fetching Pcaps for the keys :"
              + key, ie);
          if (attempt == maxRetryLimit) {
            LOGGER.error("Throwing the exception after retrying "
                + maxRetryLimit + " times.");
            throw e;
          }
        }
      }
    }
  } finally {
    if (table != null) {
      table.close();
    }
  }
  return pcapsResponse;
}