下面列出了 org.apache.hadoop.hbase.client.BufferedMutator 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Prepares the task: reads the skip-WAL flag and the output table name from the job
 * configuration, then opens a {@link BufferedMutator} on the graph's connection whose
 * exception listener logs a warning for every row that failed.
 *
 * @param context the task context supplying the job configuration
 * @throws IOException if the buffered mutator cannot be opened
 * @throws InterruptedException declared by the overridden method
 */
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
    super.setup(context);

    final Configuration conf = context.getConfiguration();
    skipWAL = conf.getBoolean(Constants.MAPREDUCE_INDEX_SKIP_WAL, false);

    final String outputTableName = conf.get(TableOutputFormat.OUTPUT_TABLE);
    final TableName outputTable = TableName.valueOf(outputTableName);

    // Failures are only logged; the listener never rethrows.
    final BufferedMutator.ExceptionListener logFailedRows = (failure, failedMutator) -> {
        int index = 0;
        while (index < failure.getNumExceptions()) {
            LOG.warn("Failed to send put: " + failure.getRow(index));
            index++;
        }
    };

    final BufferedMutatorParams params = new BufferedMutatorParams(outputTable);
    params.listener(logFailedRows);
    mutator = getGraph().connection().getBufferedMutator(params);
}
/**
 * Bulk-imports 999999 rows into table {@code t3} through a {@link BufferedMutator} and
 * prints the elapsed time.
 *
 * <p>Puts are handed to the mutator in batches of 10000. The mutator is opened in a
 * try-with-resources block so that it is closed (and its buffer flushed) even when a
 * {@code mutate} call fails.
 *
 * @param connection an open HBase connection; it is not closed by this method
 * @throws IOException if writing to the table or closing the mutator fails
 */
private static void bmImport(Connection connection) throws IOException {
    final int batchSize = 10000;
    // Explicit charset: the no-arg getBytes() depends on the platform default.
    final byte[] columnFamily = "f1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    final long startTime = System.currentTimeMillis();
    final ArrayList<Put> puts = new ArrayList<Put>(batchSize);
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(TableName.valueOf("t3"))) {
        for (int i = 0; i < 999999; i++) {
            puts.add(HBaseUtil.createPut(i + "", columnFamily, "c1", i + ""));
            // Hand over a batch once it is full. Testing the list size (rather than
            // i % 10000 == 0) avoids a 1-element first batch at i == 0.
            if (puts.size() == batchSize) {
                bufferedMutator.mutate(puts);
                puts.clear();
            }
        }
        // Hand over whatever is left from the last, partial batch.
        if (!puts.isEmpty()) {
            bufferedMutator.mutate(puts);
        }
    }
    System.out.println("共耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
/**
 * Copies every namespace descriptor cell (all versions) from the namespace table into the
 * namespace family of the meta table, then submits a procedure that disables the
 * namespace table.
 *
 * @throws IOException if scanning the namespace table or writing to the meta table fails
 */
private void migrateNamespaceTable() throws IOException {
    try (Table namespaceTable =
            masterServices.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME);
        ResultScanner rows = namespaceTable.getScanner(
            new Scan().addFamily(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES).readAllVersions());
        BufferedMutator metaMutator =
            masterServices.getConnection().getBufferedMutator(TableName.META_TABLE_NAME)) {
        Result row;
        while ((row = rows.next()) != null) {
            final Put metaPut = new Put(row.getRow());
            // Keep each cell's original timestamp so all versions carry over.
            row.getColumnCells(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
                    TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES)
                .forEach(cell -> metaPut.addColumn(HConstants.NAMESPACE_FAMILY,
                    HConstants.NAMESPACE_COL_DESC_QUALIFIER, cell.getTimestamp(),
                    CellUtil.cloneValue(cell)));
            metaMutator.mutate(metaPut);
        }
    }
    // schedule a disable procedure instead of block waiting here, as when disabling a table we will
    // wait until master is initialized, but we are part of the initialization...
    masterServices.getMasterProcedureExecutor().submitProcedure(
        new DisableTableProcedure(
            masterServices.getMasterProcedureExecutor().getEnvironment(),
            TableName.NAMESPACE_TABLE_NAME,
            false));
}
/**
 * Writes an action (Put or Delete) to the specified table through that table's
 * buffered mutator.
 *
 * <p>The action is copied before it is handed over because mutations are not immutable.
 * Puts additionally get their durability set from {@code useWriteAheadLogging}.
 *
 * @param tableName
 *          the table being updated.
 * @param action
 *          the update, either a put or a delete.
 * @throws IllegalArgumentException
 *           if the action is not a put or a delete.
 * @throws IOException
 *           if the mutator cannot be obtained or the mutation cannot be buffered.
 */
@Override
public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
    final BufferedMutator target = getBufferedMutator(tableName);

    if (action instanceof Put) {
        final Put copy = new Put((Put) action);
        final Durability durability =
            useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL;
        copy.setDurability(durability);
        target.mutate(copy);
        return;
    }

    if (action instanceof Delete) {
        target.mutate(new Delete((Delete) action));
        return;
    }

    throw new IllegalArgumentException("action must be either Delete or Put");
}
/**
 * Inserts 5000 traced batches of 5 random rows (10 random columns each) and returns the
 * generated row keys.
 *
 * <p>The mutator is opened in a try-with-resources block so it is closed, and its
 * buffered puts flushed, before the final {@code admin.flush}. Previously the mutator
 * was never closed, so buffered puts could be left unsent.
 *
 * @return the queue of generated row keys; 5000 * 5 keys exactly fill its capacity of 25000
 * @throws IOException if writing or flushing fails
 * @throws InterruptedException declared for callers; see the admin flush calls
 */
private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
    LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000);
    try (BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName)) {
        byte[] value = new byte[300];
        TraceUtil.addSampler(Sampler.ALWAYS);
        for (int x = 0; x < 5000; x++) {
            try (TraceScope traceScope = TraceUtil.createTrace("insertData")) {
                for (int i = 0; i < 5; i++) {
                    long rk = random.nextLong();
                    rowKeys.add(rk);
                    Put p = new Put(Bytes.toBytes(rk));
                    for (int y = 0; y < 10; y++) {
                        random.nextBytes(value);
                        p.addColumn(familyName, Bytes.toBytes(random.nextLong()), value);
                    }
                    ht.mutate(p);
                }
                // Periodically ask the region servers to flush the table.
                if ((x % 1000) == 0) {
                    admin.flush(tableName);
                }
            }
        }
    }
    // The mutator is closed at this point, so its buffered puts have been sent.
    admin.flush(tableName);
    return rowKeys;
}
/**
 * Deletes, for each given row, the column family derived from {@code adapterId} in the
 * data-ID index table. An {@link IOException} is logged as a warning and not rethrown.
 *
 * @param rows the row keys to delete from
 * @param adapterId the adapter ID whose string form names the column family to remove
 */
public void deleteRowsFromDataIndex(final byte[][] rows, final short adapterId) {
    try (final BufferedMutator indexMutator =
        getBufferedMutator(getTableName(DataIndexUtils.DATA_ID_INDEX.getName()))) {
        final byte[] adapterFamily =
            StringUtils.stringToBinary(ByteArrayUtils.shortToString(adapterId));
        indexMutator.mutate(
            Arrays.stream(rows)
                .map(rowKey -> new Delete(rowKey).addFamily(adapterFamily))
                .collect(Collectors.toList()));
    } catch (final IOException e) {
        LOGGER.warn("Unable to delete from data index", e);
    }
}
/**
 * Writes the given puts to the table over a freshly obtained connection and flushes them
 * before returning.
 *
 * @param table the destination table; must not be null
 * @param puts the puts to write; must not be null
 * @throws SinkConnectorException wrapping any exception raised while connecting,
 *         writing, flushing or closing
 */
public void write(final TableName table, final List<Put> puts) {
    Preconditions.checkNotNull(table);
    Preconditions.checkNotNull(puts);

    try (final Connection hbase = this.connectionFactory.getConnection();
         final BufferedMutator buffered = hbase.getBufferedMutator(table)) {
        buffered.mutate(puts);
        buffered.flush();
    } catch (Exception ex) {
        throw new SinkConnectorException(
            String.format("Failed with a [%s] when writing to table [%s] ", ex.getMessage(),
                table.getNameAsString()),
            ex);
    }
}
/**
 * Opens a buffered mutator on the graph's connection for the named table, with the shared
 * {@code LISTENER} installed as its exception listener.
 *
 * @param graph the graph supplying configuration and connection
 * @param tableName the table name, resolved through the graph configuration
 * @return the opened buffered mutator
 * @throws HBaseGraphException wrapping any {@link IOException} raised while opening
 */
private static BufferedMutator getBufferedMutator(HBaseGraph graph, String tableName) {
    try {
        final TableName resolved =
            HBaseGraphUtils.getTableName(graph.configuration(), tableName);
        final BufferedMutatorParams mutatorParams = new BufferedMutatorParams(resolved);
        mutatorParams.listener(LISTENER);
        return graph.connection().getBufferedMutator(mutatorParams);
    } catch (IOException e) {
        throw new HBaseGraphException(e);
    }
}
/**
 * Creates a bulk loader that writes through the supplied mutators; the skip-WAL setting
 * is read from the graph's configuration.
 *
 * @param graph the graph being loaded
 * @param edgesMutator mutator for the edges table
 * @param edgeIndicesMutator mutator for the edge indices table
 * @param verticesMutator mutator for the vertices table
 * @param vertexIndicesMutator mutator for the vertex indices table
 */
public HBaseBulkLoader(HBaseGraph graph,
                       BufferedMutator edgesMutator,
                       BufferedMutator edgeIndicesMutator,
                       BufferedMutator verticesMutator,
                       BufferedMutator vertexIndicesMutator) {
    // Graph-derived state.
    this.skipWAL = graph.configuration().getBulkLoaderSkipWAL();
    this.graph = graph;

    // Vertex writers.
    this.verticesMutator = verticesMutator;
    this.vertexIndicesMutator = vertexIndicesMutator;

    // Edge writers.
    this.edgesMutator = edgesMutator;
    this.edgeIndicesMutator = edgeIndicesMutator;
}
/**
 * Sends the given mutations (puts and deletes) to HBase, flushes them, empties the list
 * and then sleeps for {@code SLEEP_TIME_MILLISECONDS}. Does nothing for an empty list.
 *
 * @param mutator the mutator to write through
 * @param mutations the pending mutations; cleared once they have been flushed
 * @throws IOException if buffering or flushing the mutations fails
 */
private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
    if (!mutations.isEmpty()) {
        mutator.mutate(mutations);
        mutator.flush();
        mutations.clear();
        sleepForTime(SLEEP_TIME_MILLISECONDS);
    }
}
/**
 * Writes the mutations produced by {@code makeTableData(numRows)} to the given table and
 * flushes them.
 *
 * <p>The mutator is opened in a try-with-resources block so it is closed even when
 * {@code mutate} or {@code flush} throws. The connection comes from {@code admin} and is
 * left open.
 *
 * @param tableId name of the table to write to
 * @param numRows passed to {@code makeTableData} to build the mutations
 * @throws Exception if building the data or writing it fails
 */
private static void writeData(String tableId, int numRows) throws Exception {
    Connection connection = admin.getConnection();
    TableName tableName = TableName.valueOf(tableId);
    try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
        List<Mutation> mutations = makeTableData(numRows);
        mutator.mutate(mutations);
        mutator.flush();
    }
}
/**
 * Writes a single cell at the given timestamp, flushes the mutator's buffer and then
 * asks the admin to flush {@code tableName}.
 *
 * @param table the mutator to write through
 * @param row the row key
 * @param value the cell value
 * @param ts the timestamp used for the put
 * @throws Exception if the write or either flush fails
 */
private void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts)
    throws Exception {
    final Put cellPut = new Put(row, ts);
    final byte[] familyBytes = Bytes.toBytes(family);
    cellPut.addColumn(familyBytes, qf, value);

    table.mutate(cellPut);
    table.flush();

    admin.flush(tableName);
}
/**
 * Returns the buffered mutator for the given table, opening it and remembering it in
 * {@code mutatorMap} on first use. The shared connection is created on demand.
 *
 * @param tableName
 *          the name of the table, as raw bytes
 * @return the mutator for that table
 * @throws IOException
 *           if the connection or the mutator cannot be opened
 */
BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
    if (this.connection == null) {
        this.connection = ConnectionFactory.createConnection(conf);
    }

    if (mutatorMap.containsKey(tableName)) {
        return mutatorMap.get(tableName);
    }

    LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
    final TableName table = TableName.valueOf(tableName.get());
    mutatorMap.put(tableName, connection.getBufferedMutator(table));
    return mutatorMap.get(tableName);
}
/**
 * Closes every cached buffered mutator and then the connection.
 *
 * <p>A failure while closing one mutator no longer prevents the remaining mutators and
 * the connection from being closed. The first {@link IOException} is rethrown once all
 * resources have been attempted; later ones are attached to it as suppressed exceptions.
 *
 * @param context the task attempt context (unused)
 * @throws IOException if closing any mutator or the connection fails
 */
@Override
public void close(TaskAttemptContext context) throws IOException {
    IOException firstFailure = null;
    for (BufferedMutator mutator : mutatorMap.values()) {
        try {
            mutator.close();
        } catch (IOException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }
    if (connection != null) {
        try {
            connection.close();
        } catch (IOException e) {
            if (firstFailure == null) {
                firstFailure = e;
            } else {
                firstFailure.addSuppressed(e);
            }
        }
    }
    if (firstFailure != null) {
        throw firstFailure;
    }
}
/**
 * Opens one buffered mutator per table index, each with a 4 MiB write buffer, and stores
 * them in {@code tables}.
 *
 * @throws IOException if a mutator cannot be opened
 */
@Override
protected void instantiateHTable() throws IOException {
    int index = 0;
    while (index < DEFAULT_TABLES_COUNT) {
        final BufferedMutatorParams mutatorParams =
            new BufferedMutatorParams(getTableName(index)).writeBufferSize(4 * 1024 * 1024);
        final BufferedMutator opened = connection.getBufferedMutator(mutatorParams);
        this.tables[index] = opened;
        index++;
    }
}
/**
 * Returns the buffered mutator for {@code tab}, creating it from {@code conn} on first use.
 *
 * @return the cached or newly created mutator
 * @throws IOException if there is no connection, or the mutator cannot be created
 */
private synchronized BufferedMutator getBufferedMutator() throws IOException {
    if ( conn == null ) {
        throw new IOException( "Can't mutate the table " + tab.getName() );
    }
    if ( mutator == null ) {
        mutator = conn.getBufferedMutator( tab.getName() );
    }
    return mutator;
}
/**
 * Records the given exception in {@code failureThrowable} unless an earlier one is already
 * stored there ({@code compareAndSet(null, ...)} only succeeds while the holder is empty).
 * Nothing is thrown from this method itself.
 *
 * @param exception the failure reported for the buffered mutations
 * @param mutator the mutator that reported the failure (unused)
 */
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}
/**
 * Inserts the rows described by {@code msg} into an HBase table.
 *
 * <p>The adaptor turns the message into two maps: row key to a map of
 * {@code "cf:column"} to string value, and row key to a custom timestamp. Rows that have
 * an entry in the timestamp map are written with that timestamp.
 *
 * @param msg the message carrying the table name and the data to insert
 * @return the adaptor's verdict on the insert, or {@code false} if an
 *         {@link IOException} occurred
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected boolean insert(DataStoreMsg msg) {
    // Let the adaptor validate the message and prepare the data.
    Map[] prepared = (Map[]) adaptor.prepareInsertObj(msg, datasource.getDataStoreConnection());
    Map<byte[], Map> entity = prepared[0];
    Map<byte[], Long> entityStamp = prepared[1];
    String tableName = (String) msg.get(DataStoreProtocol.HBASE_TABLE_NAME);

    // Use a 2 MiB write buffer.
    BufferedMutatorParams params =
        new BufferedMutatorParams(TableName.valueOf(tableName)).writeBufferSize(1024 * 1024 * 2);

    try (BufferedMutator table = datasource.getSourceConnect().getBufferedMutator(params)) {
        List<Put> puts = Lists.newArrayList();
        for (Map.Entry<byte[], Map> row : entity.entrySet()) {
            byte[] rowkey = row.getKey();
            // Apply the custom timestamp when one was supplied for this row.
            Put put = entityStamp.containsKey(rowkey)
                ? new Put(rowkey, entityStamp.get(rowkey))
                : new Put(rowkey);
            // Each key has the form "family:qualifier".
            Map cells = row.getValue();
            for (Object cellName : cells.keySet()) {
                String[] column = ((String) cellName).split(":");
                put.addColumn(Bytes.toBytes(column[0]), Bytes.toBytes(column[1]),
                    Bytes.toBytes((String) cells.get(cellName)));
            }
            puts.add(put);
        }

        // NOTE(review): nothing fills this array; mutate() reports no per-put results.
        Object[] results = new Object[puts.size()];
        // Submit the whole batch and force it out.
        table.mutate(puts);
        table.flush();

        // Let the adaptor turn the outcome into the return value.
        return adaptor.handleInsertResult(results, msg, datasource.getDataStoreConnection());
    } catch (IOException e) {
        log.err(this, "INSERT HBASE TABLE[" + tableName + "] FAIL:" + msg.toJSONString(), e);
        return false;
    }
}
/**
 * Stores the given exception in {@code failureThrowable} if no exception has been stored
 * yet; a later exception does not replace an earlier one. This method does not throw.
 *
 * @param exception the failure reported for the buffered mutations
 * @param mutator the mutator that reported the failure (unused)
 */
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}
/**
 * Not supported by this implementation.
 *
 * <p>The exception message now names this method; it previously referred to
 * {@code batchCoprocessorService}, which looked like a copy-paste slip.
 *
 * @param tableName the table a mutator was requested for (unused)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
    throw new NotImplementedException("getBufferedMutator is not supported by the Thrift client");
}
/**
 * Not supported by this implementation.
 *
 * <p>The exception message now names this method; it previously referred to
 * {@code batchCoprocessorService}, which looked like a copy-paste slip.
 *
 * @param params the mutator parameters (unused)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
    throw new NotImplementedException("getBufferedMutator is not supported by the Thrift client");
}
/**
 * Opens a buffered mutator for {@code tableName} on the utility's connection and delegates
 * to the mutator-based {@code loadData} overload.
 *
 * <p>NOTE(review): the mutator is not closed here; confirm whether the overload or the
 * caller is expected to release it.
 *
 * @param util the testing utility supplying the connection
 * @param tableName the table to load
 * @param rows passed through to the overload
 * @param families passed through to the overload
 * @throws IOException if the mutator cannot be opened or loading fails
 * @throws InterruptedException if the delegated load is interrupted
 */
public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows,
    byte[]... families) throws IOException, InterruptedException {
    loadData(util, util.getConnection().getBufferedMutator(tableName), rows, families);
}
/**
 * Stub implementation: no buffered mutator is provided.
 *
 * @param tableName the table a mutator was requested for (ignored)
 * @return always {@code null}
 */
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
return null;
}
/**
 * Stub implementation: no buffered mutator is provided.
 *
 * @param params the mutator parameters (ignored)
 * @return always {@code null}
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
return null;
}
/**
 * Unsupported by this implementation.
 *
 * @param tableName the table a mutator was requested for (ignored)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
throw new UnsupportedOperationException();
}
/**
 * Unsupported by this implementation.
 *
 * @param params the mutator parameters (ignored)
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
throw new UnsupportedOperationException();
}
/**
 * Writes {@code rowCount} random-key puts through a buffered mutator and prints timing
 * information: a 10-put starter batch, periodic progress every {@code PRINT_COUNT} puts,
 * the final flush time and the overall figure.
 *
 * <p>The starter loop now performs exactly 10 puts. It previously ran from 1 to 9, so
 * only 9 puts were written while 10 were reported and subtracted from the main loop.
 *
 * @param conn the connection to open the mutator on
 * @param tableName the table to write to
 * @param rowCount total number of puts, including the 10 starter puts
 * @param valueSize length of the alphanumeric value reused for every put
 * @throws IOException if a write or flush fails with anything other than
 *         {@link RetriesExhaustedWithDetailsException}, which is logged instead
 */
protected static void runMutationTests(Connection conn, TableName tableName, long rowCount,
    int valueSize) throws IOException {
    // Size of the warm-up batch; the main loop writes the remaining rowCount - 10 rows.
    final int starterBatchSize = 10;
    System.out.println("starting mutations");
    Stopwatch uberStopwatch = Stopwatch.createUnstarted();
    Stopwatch incrementalStopwatch = Stopwatch.createUnstarted();
    try (BufferedMutator mutator = conn.getBufferedMutator(tableName)) {
        // Use the same value over and over again. Creating new random data takes time. Don't count
        // creating a large array towards Bigtable performance
        byte[] value = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(valueSize));
        incrementalStopwatch.start();
        for (int i = 0; i < starterBatchSize; i++) {
            // The first few writes are slow.
            doPut(mutator, value);
        }
        mutator.flush();
        BigtableUtilities.printPerformance("starter batch", incrementalStopwatch, starterBatchSize);

        // Restart both timers so the warm-up is excluded from the main measurements.
        uberStopwatch.reset();
        incrementalStopwatch.reset();
        uberStopwatch.start();
        incrementalStopwatch.start();
        for (int i = 0; i < rowCount - starterBatchSize; i++) {
            doPut(mutator, value);
            if (i > 0 && i % PRINT_COUNT == 0) {
                BigtableUtilities.printPerformance("one batch", incrementalStopwatch, PRINT_COUNT);
                BigtableUtilities.printPerformance("average so far", uberStopwatch, i);
                incrementalStopwatch.reset();
                incrementalStopwatch.start();
            }
        }
        // Time the final flush on its own.
        incrementalStopwatch.reset();
        incrementalStopwatch.start();
        System.out.println("Flushing");
        mutator.flush();
        System.out.println(String.format("Flush took %d ms.",
            incrementalStopwatch.elapsed(TimeUnit.MILLISECONDS)));
        BigtableUtilities.printPerformance("full batch", uberStopwatch, Math.toIntExact(rowCount));
    } catch (RetriesExhaustedWithDetailsException e) {
        logExceptions(e);
    }
}
/**
 * Buffers one put with a random 10-character alphanumeric row key, stamped with the
 * current time, writing {@code value} to the Bigtable utility family and qualifier.
 *
 * @param mutator the mutator to write through
 * @param value the cell value
 * @throws IOException if the mutation cannot be buffered
 */
protected static void doPut(BufferedMutator mutator, byte[] value) throws IOException {
    final byte[] rowKey = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10));
    final Put put = new Put(rowKey, System.currentTimeMillis());
    put.addColumn(BigtableUtilities.FAMILY, BigtableUtilities.QUALIFIER, value);
    mutator.mutate(put);
}
/**
 * Opens a buffered mutator for the table on {@code conn}, using parameters that carry
 * only the table name.
 *
 * @param tableName the table to mutate
 * @return the opened buffered mutator
 * @throws IOException if the mutator cannot be opened
 */
public BufferedMutator getBufferedMutator(final TableName tableName) throws IOException {
    return conn.getBufferedMutator(new BufferedMutatorParams(tableName));
}
public HBaseMetadataWriter(final BufferedMutator writer, final MetadataType metadataType) {
this.writer = writer;
metadataTypeBytes = StringUtils.stringToBinary(metadataType.name());
}