org.apache.hadoop.hbase.client.BufferedMutatorParams#writeBufferSize() 源码实例 Demo

下面列出了 org.apache.hadoop.hbase.client.BufferedMutatorParams#writeBufferSize() 的实例代码,你可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: hbase   文件:
void onStartup() throws IOException {
  BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
  this.mutator = connection.getBufferedMutator(p);
  this.table = connection.getTable(TableName.valueOf(opts.tableName));
protected void instantiateHTable() throws IOException {
  for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) {
    BufferedMutatorParams params = new BufferedMutatorParams(getTableName(i));
    params.writeBufferSize(4 * 1024 * 1024);
    BufferedMutator table = connection.getBufferedMutator(params);
    this.tables[i] = table;
源代码3 项目: uavstack   文件:
/**
 * msg contains:
 *
 * @param tablename
 * @param entity:
 *            rowkey->cf:column->value, with additional handling of the _timestamp field
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
protected boolean insert(DataStoreMsg msg) {

    // 根据TABLE名进行合法验证
    Map[] maps = (Map[]) adaptor.prepareInsertObj(msg, datasource.getDataStoreConnection());
    Map<byte[], Map> entity = maps[0];
    Map<byte[], Long> entityStamp = maps[1];
    String tableName = (String) msg.get(DataStoreProtocol.HBASE_TABLE_NAME);
    // add write buffer
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));

    params.writeBufferSize(1024 * 1024 * 2);
    try (BufferedMutator table = datasource.getSourceConnect().getBufferedMutator(params);) {

        // 取得所有cf
        List<Put> puts = Lists.newArrayList();
        Put put = null;
        for (byte[] rowkey : entity.keySet()) {
            // 定制时间戳
            put = entityStamp.containsKey(rowkey) ? new Put(rowkey, entityStamp.get(rowkey)) : new Put(rowkey);

            // 取得column和value
            for (Object entry : entity.get(rowkey).keySet()) {

                String[] column = ((String) entry).split(":");
                put.addColumn(Bytes.toBytes(column[0]), Bytes.toBytes(column[1]),
                        Bytes.toBytes((String) entity.get(rowkey).get(entry)));
        // 批量提交
        Object[] results = new Object[puts.size()];
        // table.batch(puts, results);
        // flush
        // 根据插入信息操作并返回结果
        return adaptor.handleInsertResult(results, msg, datasource.getDataStoreConnection());
    catch (IOException e) {
        log.err(this, "INSERT HBASE TABLE[" + tableName + "] FAIL:" + msg.toJSONString(), e);
        return false;

源代码4 项目: flink   文件:
public void open(Configuration parameters) throws Exception {"start open ...");
	org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
	try {;
		this.numPendingRequests = new AtomicLong(0);

		if (null == connection) {
			this.connection = ConnectionFactory.createConnection(config);
		// create a parameter instance, set the table name and custom listener reference.
		BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
		if (bufferFlushMaxSizeInBytes > 0) {
		this.mutator = connection.getBufferedMutator(params);

		if (bufferFlushIntervalMillis > 0) {
			this.executor = Executors.newScheduledThreadPool(
				1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
			this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
				if (closed) {
				try {
				} catch (Exception e) {
					// fail the sink and skip the rest of the items
					// if the failure handler decides to throw an exception
					failureThrowable.compareAndSet(null, e);
			}, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
	} catch (TableNotFoundException tnfe) {
		LOG.error("The table " + hTableName + " not found ", tnfe);
		throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
	} catch (IOException ioe) {
		LOG.error("Exception while creating connection to HBase.", ioe);
		throw new RuntimeException("Cannot create connection to HBase.", ioe);
	}"end open.");