下面列出了 org.apache.hadoop.hbase.client.BufferedMutatorParams#writeBufferSize() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Prepares the write path for the table named by {@code opts.tableName}: a buffered
 * mutator whose write buffer size comes from {@code opts.bufferSize}, plus a regular
 * table handle obtained from the same connection.
 *
 * @throws IOException if the mutator or the table cannot be obtained from the connection
 */
@Override
void onStartup() throws IOException {
    final TableName target = TableName.valueOf(opts.tableName);
    final BufferedMutatorParams mutatorParams =
            new BufferedMutatorParams(target).writeBufferSize(opts.bufferSize);
    this.mutator = connection.getBufferedMutator(mutatorParams);
    this.table = connection.getTable(target);
}
/**
 * Populates {@code tables} with one buffered mutator per table index in
 * {@code [0, DEFAULT_TABLES_COUNT)}. Every mutator is configured with a
 * 4 MiB (4 * 1024 * 1024 bytes) write buffer.
 *
 * @throws IOException if a mutator cannot be obtained from the connection
 */
@Override
protected void instantiateHTable() throws IOException {
    final long writeBufferBytes = 4L * 1024 * 1024;
    int index = 0;
    while (index < DEFAULT_TABLES_COUNT) {
        BufferedMutatorParams mutatorParams = new BufferedMutatorParams(getTableName(index));
        mutatorParams.writeBufferSize(writeBufferBytes);
        this.tables[index] = connection.getBufferedMutator(mutatorParams);
        index++;
    }
}
/**
 * Inserts the rows described by {@code msg} into an HBase table through a buffered mutator.
 *
 * <p>The adaptor converts {@code msg} into two maps:
 * <ul>
 *   <li>index 0: row key -> ("cf:column" -> value)</li>
 *   <li>index 1: row key -> custom timestamp (the handling of the {@code _timestamp} field)</li>
 * </ul>
 * One {@code Put} is built per row key, all puts are handed to the mutator, and the
 * mutator is flushed before the adaptor produces the final result.
 *
 * @param msg message carrying the target table name (under
 *            {@code DataStoreProtocol.HBASE_TABLE_NAME}) and the data to insert
 * @return whatever {@code adaptor.handleInsertResult} returns, or {@code false} when an
 *         {@code IOException} is raised while writing
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected boolean insert(DataStoreMsg msg) {
    // Let the adaptor validate the request and turn it into row data + row timestamps.
    Map[] prepared = (Map[]) adaptor.prepareInsertObj(msg, datasource.getDataStoreConnection());
    Map<byte[], Map> rows = prepared[0];
    Map<byte[], Long> rowTimestamps = prepared[1];
    String tableName = (String) msg.get(DataStoreProtocol.HBASE_TABLE_NAME);

    // Use a 2 MiB write buffer for this insert.
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
    params.writeBufferSize(1024 * 1024 * 2);

    try (BufferedMutator table = datasource.getSourceConnect().getBufferedMutator(params)) {
        List<Put> puts = Lists.newArrayList();
        for (byte[] rowkey : rows.keySet()) {
            // Honour a caller-supplied timestamp when one exists for this row.
            Put put;
            if (rowTimestamps.containsKey(rowkey)) {
                put = new Put(rowkey, rowTimestamps.get(rowkey));
            } else {
                put = new Put(rowkey);
            }

            // Each key has the form "family:qualifier"; the mapped value is a String.
            Map cells = rows.get(rowkey);
            for (Object cellKey : cells.keySet()) {
                String[] parts = ((String) cellKey).split(":");
                byte[] family = Bytes.toBytes(parts[0]);
                byte[] qualifier = Bytes.toBytes(parts[1]);
                byte[] value = Bytes.toBytes((String) cells.get(cellKey));
                put.addColumn(family, qualifier, value);
            }
            puts.add(put);
        }

        // Sized to the number of puts; nothing in this method writes into it.
        Object[] results = new Object[puts.size()];

        // Submit the whole batch, then force the buffered writes out.
        table.mutate(puts);
        table.flush();

        // The adaptor decides the final outcome from the insert information.
        return adaptor.handleInsertResult(results, msg, datasource.getDataStoreConnection());
    } catch (IOException e) {
        log.err(this, "INSERT HBASE TABLE[" + tableName + "] FAIL:" + msg.toJSONString(), e);
        return false;
    }
}
/**
 * Opens this sink: opens the mutation converter, resets the pending-request counter,
 * creates the HBase connection when none is set yet, and obtains a buffered mutator for
 * {@code hTableName} with this object registered as its listener.
 *
 * <p>A positive {@code bufferFlushMaxSizeInBytes} is applied as the mutator's write buffer
 * size. A positive {@code bufferFlushIntervalMillis} starts a single-threaded scheduler
 * that calls {@code flush()} with that fixed delay; an exception from such a flush is
 * recorded in {@code failureThrowable} (first one wins) instead of being thrown from the
 * scheduler thread.
 *
 * @param parameters configuration passed by the caller; not read by this method
 * @throws RuntimeException if the table is not found or another {@code IOException}
 *         occurs while setting up the connection or mutator
 * @throws Exception declared by the overridden signature
 */
@Override
public void open(Configuration parameters) throws Exception {
    LOG.info("start open ...");
    org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
    try {
        this.mutationConverter.open();
        this.numPendingRequests = new AtomicLong(0);
        if (connection == null) {
            this.connection = ConnectionFactory.createConnection(config);
        }

        // Mutator parameters: target table plus this object as the custom listener.
        TableName target = TableName.valueOf(hTableName);
        BufferedMutatorParams params = new BufferedMutatorParams(target).listener(this);
        if (bufferFlushMaxSizeInBytes > 0) {
            params.writeBufferSize(bufferFlushMaxSizeInBytes);
        }
        this.mutator = connection.getBufferedMutator(params);

        if (bufferFlushIntervalMillis > 0) {
            this.executor = Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));

            Runnable periodicFlush = () -> {
                if (!closed) {
                    try {
                        flush();
                    } catch (Exception e) {
                        // Fail the sink and skip the remaining items if the failure
                        // handler decides to throw an exception.
                        failureThrowable.compareAndSet(null, e);
                    }
                }
            };
            this.scheduledFuture = this.executor.scheduleWithFixedDelay(
                    periodicFlush,
                    bufferFlushIntervalMillis,
                    bufferFlushIntervalMillis,
                    TimeUnit.MILLISECONDS);
        }
    } catch (TableNotFoundException tnfe) {
        LOG.error("The table " + hTableName + " not found ", tnfe);
        throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
    } catch (IOException ioe) {
        LOG.error("Exception while creating connection to HBase.", ioe);
        throw new RuntimeException("Cannot create connection to HBase.", ioe);
    }
    LOG.info("end open.");
}