下面列出了 org.apache.hadoop.hbase.client.HBaseAdmin#flush() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Reproduces the ArrayIndexOutOfBoundsException that happens during filtering in
 * BinarySubsetComparator only after a flush is performed.
 * <p>
 * Loads the product metrics table, flushes it through {@link HBaseAdmin}, then runs a
 * SUM query filtered on the FEATURE column and expects 1200 for feature {@code F1}.
 *
 * @throws Exception if setting up the table, flushing it, or running the query fails
 */
@Test
public void testFilterOnTrailingKeyColumn() throws Exception {
    long ts = nextTimestamp();
    String tenantId = getOrganizationId();
    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+1));
    Connection conn = DriverManager.getConnection(getUrl(), props);
    HBaseAdmin admin = null;
    PreparedStatement statement = null;
    try {
        initTableValues(tenantId, getSplits(tenantId), ts);
        admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
        // The flush is what triggers the failure being reproduced (see method comment).
        admin.flush(SchemaUtil.getTableNameAsBytes(PRODUCT_METRICS_SCHEMA_NAME,PRODUCT_METRICS_NAME));
        String query = "SELECT SUM(TRANSACTIONS) FROM " + PRODUCT_METRICS_NAME + " WHERE FEATURE=?";
        statement = conn.prepareStatement(query);
        statement.setString(1, F1);
        ResultSet rs = statement.executeQuery();
        assertTrue(rs.next());
        assertEquals(1200, rs.getInt(1));
    } finally {
        // Nested finally blocks: a failure while closing one resource must not
        // prevent the remaining ones (in particular the connection) from closing.
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                conn.close();
            }
        }
    }
}
/**
 * Reproduces the ArrayIndexOutOfBoundsException that happens during filtering in
 * BinarySubsetComparator only after a flush is performed.
 * <p>
 * Loads the product metrics table, flushes it through {@link HBaseAdmin}, then runs a
 * SUM query filtered on the FEATURE column and expects 1200 for feature {@code F1}.
 *
 * @throws Exception if setting up the table, flushing it, or running the query fails
 */
@Test
public void testFilterOnTrailingKeyColumn() throws Exception {
    long ts = nextTimestamp();
    String tenantId = getOrganizationId();
    Properties props = new Properties(TEST_PROPERTIES);
    props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+1));
    Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
    HBaseAdmin admin = null;
    PreparedStatement statement = null;
    try {
        initTableValues(tenantId, getSplits(tenantId), ts);
        admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
        // The flush is what triggers the failure being reproduced (see method comment).
        admin.flush(SchemaUtil.getTableNameAsBytes(PRODUCT_METRICS_SCHEMA_NAME,PRODUCT_METRICS_NAME));
        String query = "SELECT SUM(TRANSACTIONS) FROM " + PRODUCT_METRICS_NAME + " WHERE FEATURE=?";
        statement = conn.prepareStatement(query);
        statement.setString(1, F1);
        ResultSet rs = statement.executeQuery();
        assertTrue(rs.next());
        assertEquals(1200, rs.getInt(1));
    } finally {
        // Nested finally blocks: a failure while closing one resource must not
        // prevent the remaining ones (in particular the connection) from closing.
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            try {
                if (admin != null) {
                    admin.close();
                }
            } finally {
                conn.close();
            }
        }
    }
}
/**
 * Creates {@code TABLE_NAME} (two VARCHAR columns, 4 salt buckets, max file size
 * {@code MAX_FILESIZE}), upserts one row for every two-character key built from the
 * range {@code MIN_CHAR..MAX_CHAR}, committing every {@code BATCH_SIZE} rows, and
 * finally flushes the table through {@link HBaseAdmin}.
 *
 * @throws Exception if the DDL, any upsert/commit, or the flush fails
 */
private static void initTable() throws Exception {
    Connection conn = DriverManager.getConnection(getUrl());
    try {
        conn.createStatement().execute("CREATE TABLE " + TABLE_NAME + "("
                + "a VARCHAR PRIMARY KEY, b VARCHAR) "
                + HTableDescriptor.MAX_FILESIZE + "=" + MAX_FILESIZE + ","
                + " SALT_BUCKETS = 4");
        // Target the table that was just created instead of a hard-coded name, so
        // the CREATE, UPSERT and flush below always refer to the same table.
        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + TABLE_NAME + " VALUES(?,?)");
        try {
            int rowCount = 0;
            for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) {
                for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) {
                    String pk = Character.toString((char)c1) + Character.toString((char)c2);
                    stmt.setString(1, pk);
                    stmt.setString(2, PAYLOAD);
                    stmt.execute();
                    rowCount++;
                    // Commit in batches rather than once per row.
                    if (rowCount % BATCH_SIZE == 0) {
                        conn.commit();
                    }
                }
            }
            // Commit whatever is left over from the last partial batch.
            conn.commit();
        } finally {
            stmt.close();
        }
        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
        HBaseAdmin admin = services.getAdmin();
        try {
            admin.flush(TABLE_NAME);
        } finally {
            admin.close();
        }
    } finally {
        // Always release the connection, even when table setup fails part-way.
        conn.close();
    }
}
/**
 * Flushes the given table and requests a major compaction on it, then pauses for a
 * fixed ten seconds before returning.
 *
 * @param conn      connection that is unwrapped to a PhoenixConnection to obtain the admin
 * @param tableName name of the table to flush and compact
 * @throws IOException          if the flush or compaction request fails
 * @throws InterruptedException if the thread is interrupted during the pause
 * @throws SQLException         if the connection cannot be unwrapped or the admin obtained
 */
private void compactTable(Connection conn, String tableName) throws IOException, InterruptedException, SQLException {
    final HBaseAdmin hbaseAdmin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
    try {
        hbaseAdmin.flush(tableName);
        hbaseAdmin.majorCompact(tableName);
        // FIXME: how do we know when compaction is done? Until that is answered,
        // wait a fixed amount of time after requesting it.
        Thread.sleep(10000);
    } finally {
        hbaseAdmin.close();
    }
}
/**
 * Writes a single cell (row "row", family "cf", qualifier "q1", value "value1") through
 * the given table handle, pushes the client-side buffer, and then flushes the table.
 *
 * @param tableName name of the table to flush after the write
 * @param admin     admin used to issue the flush
 * @param t         table handle the row is written through
 * @throws IOException          if the put, commit flush, or table flush fails
 * @throws InterruptedException declared for callers; see the admin flush call
 */
private void insertData(final byte[] tableName, HBaseAdmin admin, HTableInterface t) throws IOException,
        InterruptedException {
    final byte[] rowKey = Bytes.toBytes("row");
    final byte[] family = Bytes.toBytes("cf");
    final byte[] qualifier = Bytes.toBytes("q1");
    final byte[] value = Bytes.toBytes("value1");

    Put singleRow = new Put(rowKey);
    singleRow.add(family, qualifier, value);

    t.put(singleRow);
    t.flushCommits();
    admin.flush(tableName);
}
/**
 * Flushes the given table, emitting a trace-level log message before and after.
 *
 * @param admin     admin used to issue the flush
 * @param tableName table to flush
 * @throws IOException if the flush fails
 */
private void doFlush(HBaseAdmin admin, TableName tableName) throws IOException {
    final String before = "Flushing " + tableName;
    LOG.trace(before);

    admin.flush(tableName);

    final String after = "Flushed " + tableName;
    LOG.trace(after);
}