类org.apache.hadoop.hbase.client.BufferedMutator源码实例Demo

下面列出了怎么用org.apache.hadoop.hbase.client.BufferedMutator的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: hgraphdb   文件: HBaseIndexDirectMapperBase.java
/**
 * Prepares the mapper: reads the skip-WAL flag from the job configuration and opens a
 * buffered mutator on the configured output table.
 *
 * <p>The mutator's exception listener only logs a warning for each row that failed to be sent.
 *
 * @param context the task context that supplies the job configuration
 * @throws IOException propagated from the parent setup or from opening the mutator
 * @throws InterruptedException propagated from the parent setup
 */
@Override
protected void setup(final Context context) throws IOException, InterruptedException {
    super.setup(context);

    final Configuration conf = context.getConfiguration();
    skipWAL = conf.getBoolean(Constants.MAPREDUCE_INDEX_SKIP_WAL, false);

    final TableName targetTable = TableName.valueOf(conf.get(TableOutputFormat.OUTPUT_TABLE));
    final BufferedMutator.ExceptionListener warnOnFailure = (failure, failedMutator) -> {
        int idx = 0;
        while (idx < failure.getNumExceptions()) {
            LOG.warn("Failed to send put: " + failure.getRow(idx));
            idx++;
        }
    };
    mutator = getGraph().connection()
            .getBufferedMutator(new BufferedMutatorParams(targetTable).listener(warnOnFailure));
}
 
源代码2 项目: BigData   文件: Data2HBase1.java
/**
 * Bulk-imports 999,999 rows into table {@code t3} (family {@code f1}, qualifier {@code c1})
 * through a {@link BufferedMutator}, handing the puts over in batches of 10,000, and prints the
 * elapsed time.
 *
 * @param connection the HBase connection used to open the buffered mutator
 * @throws IOException if handing a batch to the mutator or closing it fails
 */
private static void bmImport(Connection connection) throws IOException {
    final int batchSize = 10000;
    // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
    byte[] columnFamily = "f1".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    long startTime = System.currentTimeMillis();
    // try-with-resources closes the mutator even when a mutate call throws.
    try (BufferedMutator bufferedMutator = connection.getBufferedMutator(TableName.valueOf("t3"))) {
        ArrayList<Put> puts = new ArrayList<Put>(batchSize);
        for (int i = 0; i < 999999; i++) {
            puts.add(HBaseUtil.createPut(i + "", columnFamily, "c1", i + ""));
            // Hand over a batch once it is full; checking the list size avoids the
            // degenerate one-element batch that "i % 10000 == 0" produced at i == 0.
            if (puts.size() >= batchSize) {
                bufferedMutator.mutate(puts);
                puts.clear();
            }
        }
        // Hand over whatever is left from the last, partially filled batch.
        if (!puts.isEmpty()) {
            bufferedMutator.mutate(puts);
        }
    }
    System.out.println("共耗时:" + (System.currentTimeMillis() - startTime) + "ms");
}
 
源代码3 项目: hbase   文件: TableNamespaceManager.java
/**
 * Copies the namespace descriptor cells of every row in the namespace table into the meta
 * table, then submits a procedure that disables the namespace table.
 *
 * @throws IOException propagated from scanning, mutating or closing the involved resources
 */
private void migrateNamespaceTable() throws IOException {
  try (Table nsTable = masterServices.getConnection().getTable(TableName.NAMESPACE_TABLE_NAME);
    ResultScanner scanner = nsTable.getScanner(
      new Scan().addFamily(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES).readAllVersions());
    BufferedMutator mutator =
      masterServices.getConnection().getBufferedMutator(TableName.META_TABLE_NAME)) {
    // The scanner signals exhaustion by returning null.
    for (Result row = scanner.next(); row != null; row = scanner.next()) {
      final Put metaPut = new Put(row.getRow());
      // Re-home every version of the descriptor cell, keeping its timestamp and value.
      row.getColumnCells(TableDescriptorBuilder.NAMESPACE_FAMILY_INFO_BYTES,
          TableDescriptorBuilder.NAMESPACE_COL_DESC_BYTES)
        .forEach(cell -> metaPut.addColumn(HConstants.NAMESPACE_FAMILY,
          HConstants.NAMESPACE_COL_DESC_QUALIFIER, cell.getTimestamp(),
          CellUtil.cloneValue(cell)));
      mutator.mutate(metaPut);
    }
  }
  // schedule a disable procedure instead of block waiting here, as when disabling a table we will
  // wait until master is initialized, but we are part of the initialization...
  masterServices.getMasterProcedureExecutor().submitProcedure(
    new DisableTableProcedure(
      masterServices.getMasterProcedureExecutor().getEnvironment(),
      TableName.NAMESPACE_TABLE_NAME,
      false));
}
 
源代码4 项目: hbase   文件: MultiTableOutputFormat.java
/**
 * Buffers a copy of the given Put or Delete for the specified table.
 *
 * <p>Puts are copied and given a durability derived from {@code useWriteAheadLogging}
 * ({@code SYNC_WAL} when enabled, {@code SKIP_WAL} otherwise); Deletes are copied as-is.
 *
 * @param tableName
 *          the table being updated.
 * @param action
 *          the update, either a put or a delete.
 * @throws IllegalArgumentException
 *          if the action is not a put or a delete.
 */
@Override
public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
  final BufferedMutator target = getBufferedMutator(tableName);
  // The actions are not immutable, so a defensive copy is buffered instead of the original.
  if (action instanceof Delete) {
    target.mutate(new Delete((Delete) action));
    return;
  }
  if (!(action instanceof Put)) {
    throw new IllegalArgumentException(
        "action must be either Delete or Put");
  }
  final Put copy = new Put((Put) action);
  if (useWriteAheadLogging) {
    copy.setDurability(Durability.SYNC_WAL);
  } else {
    copy.setDurability(Durability.SKIP_WAL);
  }
  target.mutate(copy);
}
 
源代码5 项目: hbase   文件: IntegrationTestSendTraceRequests.java
/**
 * Writes 5000 traced batches of 5 random rows (10 random columns of 300 random bytes each)
 * and returns the generated row keys.
 *
 * <p>The table is flushed through the admin on every 1000th batch and once more at the end.
 * The buffered mutator is closed before the final admin flush so that puts still sitting in
 * the client-side buffer are sent to the table first.
 *
 * @return a queue (capacity 25000) holding the row key of every put that was issued
 * @throws IOException propagated from the mutator or the admin flush
 * @throws InterruptedException declared for callers; see the admin flush
 */
private LinkedBlockingQueue<Long> insertData() throws IOException, InterruptedException {
  LinkedBlockingQueue<Long> rowKeys = new LinkedBlockingQueue<>(25000);
  // try-with-resources: the original never flushed or closed the mutator, so buffered puts
  // could be lost and the mutator leaked.
  try (BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName)) {
    byte[] value = new byte[300];
    TraceUtil.addSampler(Sampler.ALWAYS);
    for (int x = 0; x < 5000; x++) {
      try (TraceScope traceScope = TraceUtil.createTrace("insertData")) {
        for (int i = 0; i < 5; i++) {
          long rk = random.nextLong();
          rowKeys.add(rk);
          Put p = new Put(Bytes.toBytes(rk));
          for (int y = 0; y < 10; y++) {
            // The same scratch array is refilled with fresh random bytes for each column.
            random.nextBytes(value);
            p.addColumn(familyName, Bytes.toBytes(random.nextLong()), value);
          }
          ht.mutate(p);
        }
        if ((x % 1000) == 0) {
          admin.flush(tableName);
        }
      }
    }
  }
  admin.flush(tableName);
  return rowKeys;
}
 
源代码6 项目: geowave   文件: HBaseOperations.java
/**
 * Deletes, for every given row, the column family derived from {@code adapterId} in the
 * data-index table. An {@link IOException} is logged as a warning and not rethrown.
 *
 * @param rows the row keys to delete from
 * @param adapterId the adapter id whose string form names the column family to delete
 */
public void deleteRowsFromDataIndex(final byte[][] rows, final short adapterId) {
  try (final BufferedMutator mutator =
      getBufferedMutator(getTableName(DataIndexUtils.DATA_ID_INDEX.getName()))) {
    final byte[] family = StringUtils.stringToBinary(ByteArrayUtils.shortToString(adapterId));
    // One family-level Delete per row, submitted to the mutator as a single list.
    mutator.mutate(
        Arrays.stream(rows).map(row -> new Delete(row).addFamily(family)).collect(
            Collectors.toList()));
  } catch (final IOException e) {
    LOGGER.warn("Unable to delete from data index", e);
  }
}
 
源代码7 项目: kafka-connect-hbase   文件: HBaseClient.java
/**
 * Writes the given puts to the table and flushes them, using a connection and a buffered
 * mutator that are both opened and closed within this call.
 *
 * @param table the destination table; must not be null
 * @param puts the puts to write; must not be null
 * @throws SinkConnectorException wrapping any exception raised while writing
 */
public void write(final TableName table, final List<Put> puts) {
    Preconditions.checkNotNull(table);
    Preconditions.checkNotNull(puts);
    try (final Connection connection = this.connectionFactory.getConnection();
         final BufferedMutator mutator = connection.getBufferedMutator(table)) {
        mutator.mutate(puts);
        mutator.flush();
    } catch (Exception ex) {
        // Keep the original exception as the cause so no detail is lost.
        throw new SinkConnectorException(
            String.format("Failed with a [%s] when writing to table [%s] ", ex.getMessage(),
                table.getNameAsString()),
            ex);
    }
}
 
源代码8 项目: hgraphdb   文件: HBaseBulkLoader.java
/**
 * Opens a buffered mutator, wired to {@code LISTENER}, on the table that the graph
 * configuration maps {@code tableName} to.
 *
 * @param graph the graph providing the configuration and the connection
 * @param tableName the logical table name to resolve
 * @return the buffered mutator for the resolved table
 * @throws HBaseGraphException wrapping an {@link IOException} raised while opening the mutator
 */
private static BufferedMutator getBufferedMutator(HBaseGraph graph, String tableName) {
    try {
        final TableName resolved = HBaseGraphUtils.getTableName(graph.configuration(), tableName);
        final BufferedMutatorParams mutatorParams =
                new BufferedMutatorParams(resolved).listener(LISTENER);
        return graph.connection().getBufferedMutator(mutatorParams);
    } catch (IOException cause) {
        throw new HBaseGraphException(cause);
    }
}
 
源代码9 项目: hgraphdb   文件: HBaseBulkLoader.java
/**
 * Creates a bulk loader that writes through the four supplied mutators and takes its
 * skip-WAL setting from the graph configuration.
 *
 * @param graph the graph being loaded
 * @param edgesMutator mutator stored for edges
 * @param edgeIndicesMutator mutator stored for edge indices
 * @param verticesMutator mutator stored for vertices
 * @param vertexIndicesMutator mutator stored for vertex indices
 */
public HBaseBulkLoader(HBaseGraph graph,
                       BufferedMutator edgesMutator,
                       BufferedMutator edgeIndicesMutator,
                       BufferedMutator verticesMutator,
                       BufferedMutator vertexIndicesMutator) {
    this.graph = graph;
    this.skipWAL = graph.configuration().getBulkLoaderSkipWAL();

    // Vertex-side mutators.
    this.verticesMutator = verticesMutator;
    this.vertexIndicesMutator = vertexIndicesMutator;

    // Edge-side mutators.
    this.edgesMutator = edgesMutator;
    this.edgeIndicesMutator = edgeIndicesMutator;
}
 
源代码10 项目: hudi   文件: HBaseIndex.java
/**
 * Sends the pending mutations (puts and deletes) to HBase, flushes them, empties the list and
 * then pauses for {@code SLEEP_TIME_MILLISECONDS}. Does nothing when the list is empty.
 *
 * @param mutator the buffered mutator to write through
 * @param mutations the pending mutations; cleared after a successful flush
 * @throws IOException propagated from the mutator
 */
private void doMutations(BufferedMutator mutator, List<Mutation> mutations) throws IOException {
  if (!mutations.isEmpty()) {
    mutator.mutate(mutations);
    mutator.flush();
    mutations.clear();
    sleepForTime(SLEEP_TIME_MILLISECONDS);
  }
}
 
源代码11 项目: beam   文件: HBaseIOTest.java
/**
 * Writes the mutations produced by {@code makeTableData(numRows)} to the given table and
 * flushes them.
 *
 * @param tableId name of the table to write to
 * @param numRows number of rows requested from {@code makeTableData}
 * @throws Exception propagated from building the data or from the mutator
 */
private static void writeData(String tableId, int numRows) throws Exception {
  Connection connection = admin.getConnection();
  TableName tableName = TableName.valueOf(tableId);
  // try-with-resources closes the mutator even when mutate or flush throws; the connection
  // comes from the shared admin and is deliberately left open.
  try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
    List<Mutation> mutations = makeTableData(numRows);
    mutator.mutate(mutations);
    mutator.flush();
  }
}
 
源代码12 项目: hbase   文件: TestExpiredMobFileCleaner.java
/**
 * Writes a single cell with an explicit timestamp, flushes the buffered mutator and then
 * flushes the table through the admin.
 *
 * @param table the buffered mutator to write through
 * @param row the row key
 * @param value the cell value
 * @param ts the timestamp given to the put
 * @throws Exception propagated from the mutator or the admin flush
 */
private void putKVAndFlush(BufferedMutator table, byte[] row, byte[] value, long ts)
    throws Exception {
  final Put cellPut = new Put(row, ts);
  final byte[] familyBytes = Bytes.toBytes(family);
  cellPut.addColumn(familyBytes, qf, value);

  // Client-side buffer first, then the table itself.
  table.mutate(cellPut);
  table.flush();
  admin.flush(tableName);
}
 
源代码13 项目: hbase   文件: MultiTableOutputFormat.java
/**
 * Returns the buffered mutator cached for the given table, opening the shared connection
 * and/or the mutator on first use.
 *
 * @param tableName
 *          the name of the table, as a string
 * @return the named mutator
 * @throws IOException
 *           if there is a problem opening a table
 */
BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
  if (this.connection == null) {
    this.connection = ConnectionFactory.createConnection(conf);
  }
  if (mutatorMap.containsKey(tableName)) {
    return mutatorMap.get(tableName);
  }

  // First request for this table: open a mutator and cache it for later calls.
  LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing");
  final TableName target = TableName.valueOf(tableName.get());
  mutatorMap.put(tableName, connection.getBufferedMutator(target));
  return mutatorMap.get(tableName);
}
 
源代码14 项目: hbase   文件: MultiTableOutputFormat.java
/**
 * Closes every cached buffered mutator and then the connection.
 *
 * <p>A failure while closing one resource no longer prevents the remaining ones from being
 * closed: the first {@link IOException} is rethrown at the end and later ones are attached to
 * it as suppressed exceptions.
 *
 * @param context the task attempt context (not used)
 * @throws IOException the first failure encountered while closing
 */
@Override
public void close(TaskAttemptContext context) throws IOException {
  IOException firstFailure = null;
  for (BufferedMutator mutator : mutatorMap.values()) {
    try {
      mutator.close();
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }
  if (connection != null) {
    try {
      connection.close();
    } catch (IOException e) {
      if (firstFailure == null) {
        firstFailure = e;
      } else {
        firstFailure.addSuppressed(e);
      }
    }
  }
  if (firstFailure != null) {
    throw firstFailure;
  }
}
 
/**
 * Opens one buffered mutator per table index below {@code DEFAULT_TABLES_COUNT}, each with a
 * 4 MiB write buffer, and stores it in {@code tables}.
 *
 * @throws IOException propagated from opening a mutator
 */
@Override
protected void instantiateHTable() throws IOException {
  final int writeBufferBytes = 4 * 1024 * 1024;
  for (int idx = 0; idx < DEFAULT_TABLES_COUNT; idx++) {
    final BufferedMutatorParams mutatorParams = new BufferedMutatorParams(getTableName(idx));
    mutatorParams.writeBufferSize(writeBufferBytes);
    this.tables[idx] = connection.getBufferedMutator(mutatorParams);
  }
}
 
源代码16 项目: pentaho-hadoop-shims   文件: HBase10Table.java
/**
 * Returns the buffered mutator for this table, creating it on first use.
 *
 * @return the cached (or newly created) buffered mutator
 * @throws IOException if there is no connection, or if opening the mutator fails
 */
private synchronized BufferedMutator getBufferedMutator() throws IOException {
  // Without a connection there is nothing to create the mutator from.
  if ( conn == null ) {
    throw new IOException( "Can't mutate the table " + tab.getName() );
  }
  if ( mutator == null ) {
    mutator = conn.getBufferedMutator( tab.getName() );
  }
  return mutator;
}
 
源代码17 项目: flink   文件: HBaseUpsertSinkFunction.java
/**
 * Records a failure reported by the buffered mutator.
 *
 * <p>The exception is stored via {@code compareAndSet(null, exception)}, i.e. only when no
 * failure has been recorded yet, so an earlier failure is not overwritten.
 *
 * @param exception the failure reported by the mutator
 * @param mutator the mutator that reported the failure (not used here)
 */
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
	// fail the sink and skip the rest of the items
	// if the failure handler decides to throw an exception
	failureThrowable.compareAndSet(null, exception);
}
 
源代码18 项目: uavstack   文件: HBaseDataStore.java
/**
 * Inserts the rows described by {@code msg} into an HBase table through a buffered mutator.
 *
 * <p>The adaptor converts the message into two maps: row key to a map of
 * {@code "family:qualifier"} to string value, and row key to a custom timestamp. A row that
 * has a timestamp entry is written with that timestamp; other rows use the default one.
 *
 * @param msg the insert message; carries the table name under
 *            {@code DataStoreProtocol.HBASE_TABLE_NAME}
 * @return the adaptor's result for the insert, or {@code false} when an {@link IOException}
 *         occurs
 * @throws IllegalArgumentException if a column key does not contain a {@code ':'} separator
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected boolean insert(DataStoreMsg msg) {

    // Let the adaptor validate the message against the table name and build the row maps.
    Map[] maps = (Map[]) adaptor.prepareInsertObj(msg, datasource.getDataStoreConnection());
    Map<byte[], Map> entity = maps[0];
    Map<byte[], Long> entityStamp = maps[1];
    String tableName = (String) msg.get(DataStoreProtocol.HBASE_TABLE_NAME);
    // Use a 2 MiB client-side write buffer.
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));

    params.writeBufferSize(1024 * 1024 * 2);
    try (BufferedMutator table = datasource.getSourceConnect().getBufferedMutator(params)) {

        List<Put> puts = Lists.newArrayList();
        // Iterate entries directly instead of looking every key up again.
        for (Map.Entry<byte[], Map> row : entity.entrySet()) {
            byte[] rowkey = row.getKey();
            // Custom timestamp, if one was supplied; a null value falls back to the default
            // instead of failing on unboxing.
            Long stamp = entityStamp.get(rowkey);
            Put put = (stamp != null) ? new Put(rowkey, stamp) : new Put(rowkey);

            Map<Object, Object> columns = row.getValue();
            for (Map.Entry<Object, Object> cell : columns.entrySet()) {
                String columnKey = (String) cell.getKey();
                // Split on the first ':' only: the qualifier itself may contain ':' or be
                // empty, and a plain split(":") would truncate or drop it.
                String[] column = columnKey.split(":", 2);
                if (column.length < 2) {
                    throw new IllegalArgumentException(
                            "Column key must have the form family:qualifier but was: " + columnKey);
                }
                put.addColumn(Bytes.toBytes(column[0]), Bytes.toBytes(column[1]),
                        Bytes.toBytes((String) cell.getValue()));
            }
            puts.add(put);
        }
        // The mutator reports no per-put results, so the adaptor receives an array of nulls
        // sized to the number of puts.
        Object[] results = new Object[puts.size()];
        table.mutate(puts);
        // Push the buffered puts to the table before reporting the outcome.
        table.flush();
        return adaptor.handleInsertResult(results, msg, datasource.getDataStoreConnection());
    }
    catch (IOException e) {
        log.err(this, "INSERT HBASE TABLE[" + tableName + "] FAIL:" + msg.toJSONString(), e);
        return false;
    }

}
 
源代码19 项目: flink   文件: HBaseSinkFunction.java
/**
 * Records a failure reported by the buffered mutator.
 *
 * <p>The exception is stored via {@code compareAndSet(null, exception)}, i.e. only when no
 * failure has been recorded yet, so an earlier failure is not overwritten.
 *
 * @param exception the failure reported by the mutator
 * @param mutator the mutator that reported the failure (not used here)
 */
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
	// fail the sink and skip the rest of the items
	// if the failure handler decides to throw an exception
	failureThrowable.compareAndSet(null, exception);
}
 
源代码20 项目: hbase   文件: ThriftConnection.java
/**
 * Not supported: this connection does not provide buffered mutators.
 *
 * @param tableName the table a mutator was requested for (ignored)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
  // The previous message named batchCoprocessorService/ThriftTable, a copy-paste leftover.
  throw new NotImplementedException("getBufferedMutator not supported in ThriftConnection");
}
 
源代码21 项目: hbase   文件: ThriftConnection.java
/**
 * Not supported: this connection does not provide buffered mutators.
 *
 * @param params the mutator parameters that were requested (ignored)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
  // The previous message named batchCoprocessorService/ThriftTable, a copy-paste leftover.
  throw new NotImplementedException("getBufferedMutator not supported in ThriftConnection");
}
 
源代码22 项目: hbase   文件: SnapshotTestingUtils.java
/**
 * Convenience overload: opens a buffered mutator on {@code tableName} from the utility's
 * connection and delegates to the mutator-based {@code loadData}.
 *
 * @param util the testing utility supplying the connection
 * @param tableName the table to load
 * @param rows forwarded unchanged to the delegate
 * @param families forwarded unchanged to the delegate
 */
public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows,
    byte[]... families) throws IOException, InterruptedException {
  // NOTE(review): the mutator is not closed here; confirm whether the delegate closes it.
  loadData(util, util.getConnection().getBufferedMutator(tableName), rows, families);
}
 
源代码23 项目: hbase   文件: TestMultiTableInputFormatBase.java
/**
 * Stub implementation that always returns {@code null}.
 * NOTE(review): presumably buffered mutators are never requested from this test connection;
 * confirm before relying on it.
 *
 * @param tableName ignored
 * @return always {@code null}
 */
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
  return null;
}
 
源代码24 项目: hbase   文件: TestMultiTableInputFormatBase.java
/**
 * Stub implementation that always returns {@code null}.
 * NOTE(review): presumably buffered mutators are never requested from this test connection;
 * confirm before relying on it.
 *
 * @param params ignored
 * @return always {@code null}
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
  return null;
}
 
源代码25 项目: hbase   文件: TestTableInputFormatBase.java
/**
 * Unsupported in this implementation.
 *
 * @param tableName ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
  throw new UnsupportedOperationException();
}
 
源代码26 项目: hbase   文件: TestTableInputFormatBase.java
/**
 * Unsupported in this implementation.
 *
 * @param params ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
  throw new UnsupportedOperationException();
}
 
源代码27 项目: cloud-bigtable-examples   文件: WritePerfTest.java
/**
 * Writes random-key puts through a buffered mutator and prints throughput figures.
 *
 * <p>A warm-up batch of 10 puts is written and flushed first; the timed loop then writes the
 * remaining {@code rowCount - 10} puts, printing progress every {@code PRINT_COUNT} puts,
 * and ends with a timed flush. A {@link RetriesExhaustedWithDetailsException} is passed to
 * {@code logExceptions} instead of being rethrown.
 *
 * @param conn the connection used to open the buffered mutator
 * @param tableName the table to write to
 * @param rowCount total number of puts intended, including the warm-up batch
 * @param valueSize length of the random alphanumeric value that every put reuses
 * @throws IOException propagated from the mutator
 */
protected static void runMutationTests(Connection conn, TableName tableName, long rowCount,
    int valueSize) throws IOException {
  // Size of the warm-up batch; the timed loop writes rowCount - warmupCount puts.
  final int warmupCount = 10;
  System.out.println("starting mutations");
  Stopwatch uberStopwatch = Stopwatch.createUnstarted();
  Stopwatch incrementalStopwatch = Stopwatch.createUnstarted();
  try (BufferedMutator mutator = conn.getBufferedMutator(tableName)) {
    // Use the same value over and over again. Creating new random data takes time. Don't count
    // creating a large array towards Bigtable performance
    byte[] value = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(valueSize));
    incrementalStopwatch.start();
    // The first few writes are slow. Start at 0 so that exactly warmupCount puts are issued;
    // the previous loop ran from 1 and wrote only 9 while reporting 10.
    for (int i = 0; i < warmupCount; i++) {
      doPut(mutator, value);
    }
    mutator.flush();
    BigtableUtilities.printPerformance("starter batch", incrementalStopwatch, warmupCount);

    uberStopwatch.reset();
    incrementalStopwatch.reset();
    uberStopwatch.start();
    incrementalStopwatch.start();
    for (int i = 0; i < rowCount - warmupCount; i++) {
      doPut(mutator, value);
      if (i > 0 && i % PRINT_COUNT == 0) {
        BigtableUtilities.printPerformance("one batch", incrementalStopwatch, PRINT_COUNT);
        BigtableUtilities.printPerformance("average so far", uberStopwatch, i);
        incrementalStopwatch.reset();
        incrementalStopwatch.start();
      }
    }
    // Time the final flush on its own.
    incrementalStopwatch.reset();
    incrementalStopwatch.start();
    System.out.println("Flushing");
    mutator.flush();
    System.out.println(String.format("Flush took %d ms.",
            incrementalStopwatch.elapsed(TimeUnit.MILLISECONDS)));
    BigtableUtilities.printPerformance("full batch", uberStopwatch, Math.toIntExact(rowCount));
  } catch (RetriesExhaustedWithDetailsException e) {
    logExceptions(e);
  }
}
 
源代码28 项目: cloud-bigtable-examples   文件: WritePerfTest.java
/**
 * Buffers one put with a random 10-character alphanumeric row key, stamped with the current
 * time, that stores {@code value} under the shared family and qualifier.
 *
 * @param mutator the buffered mutator to write through
 * @param value the cell value
 * @throws IOException propagated from the mutator
 */
protected static void doPut(BufferedMutator mutator, byte[] value) throws IOException {
  final byte[] rowKey = Bytes.toBytes(RandomStringUtils.randomAlphanumeric(10));
  final Put put = new Put(rowKey, System.currentTimeMillis());
  put.addColumn(BigtableUtilities.FAMILY, BigtableUtilities.QUALIFIER, value);
  mutator.mutate(put);
}
 
源代码29 项目: geowave   文件: HBaseOperations.java
/**
 * Opens a buffered mutator on the given table using default mutator parameters.
 *
 * @param tableName the table to write to
 * @return the buffered mutator obtained from the connection
 * @throws IOException propagated from the connection
 */
public BufferedMutator getBufferedMutator(final TableName tableName) throws IOException {
  return conn.getBufferedMutator(new BufferedMutatorParams(tableName));
}
 
源代码30 项目: geowave   文件: HBaseMetadataWriter.java
public HBaseMetadataWriter(final BufferedMutator writer, final MetadataType metadataType) {
  this.writer = writer;
  metadataTypeBytes = StringUtils.stringToBinary(metadataType.name());
}
 
 类方法
 同包方法