下面列出了org.apache.hadoop.hbase.client.Connection#getBufferedMutator ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* 利用BufferedMutator批量导入
*
* @param connection
* @throws IOException
*/
private static void bmImport(Connection connection) throws IOException {
BufferedMutator bufferedMutator = connection.getBufferedMutator(TableName.valueOf("t3"));
byte[] columnFamily = "f1".getBytes();
long startTime = System.currentTimeMillis();
ArrayList<Put> puts = new ArrayList<Put>();
for (int i = 0; i < 999999; i++) {
puts.add(HBaseUtil.createPut(i + "", columnFamily, "c1", i + ""));
//每10000条导入一次
if (i % 10000 == 0) {
bufferedMutator.mutate(puts);
puts.clear();
}
}
//批量调用
bufferedMutator.mutate(puts);
bufferedMutator.close();
System.out.println("共耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
/** Helper function to create a table and return the rows that it created. */
private static void writeData(String tableId, int numRows) throws Exception {
Connection connection = admin.getConnection();
TableName tableName = TableName.valueOf(tableId);
BufferedMutator mutator = connection.getBufferedMutator(tableName);
List<Mutation> mutations = makeTableData(numRows);
mutator.mutate(mutations);
mutator.flush();
mutator.close();
}
Writer() throws IOException {
Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
table = conn.getBufferedMutator(htableName);
}
Writer() throws IOException {
Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
table = conn.getBufferedMutator(htableName);
}
protected static void runMutationTests(Connection conn, TableName tableName, long rowCount,
int valueSize) throws IOException {
System.out.println("starting mutations");
Stopwatch uberStopwatch = Stopwatch.createUnstarted();
Stopwatch incrementalStopwatch = Stopwatch.createUnstarted();
try (BufferedMutator mutator = conn.getBufferedMutator(tableName)) {
// Use the same value over and over again. Creating new random data takes time. Don't count
// creating a large array towards Bigtable performance
byte[] value = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(valueSize));
incrementalStopwatch.start();
for (long i = 1; i < 10; i++) {
// The first few writes are slow.
doPut(mutator, value);
}
mutator.flush();
BigtableUtilities.printPerformance("starter batch", incrementalStopwatch, 10);
uberStopwatch.reset();
incrementalStopwatch.reset();
uberStopwatch.start();
incrementalStopwatch.start();
for (int i = 0; i < rowCount - 10; i++) {
doPut(mutator, value);
if (i > 0 && i % PRINT_COUNT == 0) {
BigtableUtilities.printPerformance("one batch", incrementalStopwatch, PRINT_COUNT);
BigtableUtilities.printPerformance("average so far", uberStopwatch, i);
incrementalStopwatch.reset();
incrementalStopwatch.start();
}
}
incrementalStopwatch.reset();
incrementalStopwatch.start();
System.out.println("Flushing");
mutator.flush();
System.out.println(String.format("Flush took %d ms.",
incrementalStopwatch.elapsed(TimeUnit.MILLISECONDS)));
BigtableUtilities.printPerformance("full batch", uberStopwatch, Math.toIntExact(rowCount));
} catch (RetriesExhaustedWithDetailsException e) {
logExceptions(e);
}
}