类org.apache.hadoop.hbase.client.Connection源码实例Demo

下面列出了怎么用org.apache.hadoop.hbase.client.Connection的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: java-docs-samples   文件: Reads.java
public static void readPrefix(String projectId, String instanceId, String tableId) {
  // Initialize client that will be used to send requests. This client only needs to be created
  // once, and can be reused for multiple requests. After completing all of your requests, call
  // the "close" method on the client to safely clean up any remaining background resources.
  try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {
    Table table = connection.getTable(TableName.valueOf(tableId));
    Scan prefixScan = new Scan().setRowPrefixFilter(Bytes.toBytes("phone"));
    ResultScanner rows = table.getScanner(prefixScan);

    for (Result row : rows) {
      printRow(row);
    }
  } catch (IOException e) {
    System.out.println(
        "Unable to initialize service client, as a network error occurred: \n" + e.toString());
  }
}
 
源代码2 项目: hbase   文件: SecureTestUtil.java
/**
 * Grant permissions on a namespace to the given user using AccessControl Client.
 * Will wait until all active AccessController instances have updated their permissions caches
 * or will throw an exception upon timeout (10 seconds).
 */
public static void grantOnNamespaceUsingAccessControlClient(final HBaseTestingUtility util,
    final Connection connection, final String user, final String namespace,
    final Permission.Action... actions) throws Exception {
  SecureTestUtil.updateACLs(util, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      try {
        AccessControlClient.grant(connection, namespace, user, actions);
      } catch (Throwable t) {
        LOG.error("grant failed: ", t);
      }
      return null;
    }
  });
}
 
源代码3 项目: spliceengine   文件: HFileGenerationFunction.java
private static InetSocketAddress getFavoredNode(BulkImportPartition partition) throws IOException {
    InetSocketAddress favoredNode = null;
    SConfiguration configuration = HConfiguration.getConfiguration();
    Connection connection = HBaseConnectionFactory.getInstance(configuration).getConnection();

    String regionName = partition.getRegionName();
    HRegionLocation regionLocation = MetaTableAccessor.getRegionLocation(connection,
            com.splicemachine.primitives.Bytes.toBytesBinary(regionName));

    if (regionLocation != null) {
        String hostname = regionLocation.getHostname();
        int port = regionLocation.getPort();

        InetSocketAddress address = new InetSocketAddress(hostname, port);
        if (!address.isUnresolved()) {
            favoredNode = address;
        } else {
            SpliceLogUtils.info(LOG, "Cannot resolve host %s to achieve better data locality.", hostname);
        }

    }
    else {
        SpliceLogUtils.info(LOG, "Cannot to get region location %s to achieve better data locality.", regionName);
    }
    return favoredNode;
}
 
源代码4 项目: hraven   文件: ProcessRecordUpdater.java
@Override
public Boolean call() throws Exception {

  ProcessRecord updatedRecord = null;
  Connection hbaseConnection = null;
  try {
    hbaseConnection = ConnectionFactory.createConnection(hbaseConf);
    // Connect only when needed.
    ProcessRecordService processRecordService =
        new ProcessRecordService(hbaseConf, hbaseConnection);

    updatedRecord =
        processRecordService.setProcessState(processRecord, newState);
  } finally {
    if (hbaseConnection != null) {
      hbaseConnection.close();
    }
  }

  if ((updatedRecord != null)
      && (updatedRecord.getProcessState() == newState)) {
    return true;
  }
  return false;
}
 
源代码5 项目: hbase   文件: TestCellACLWithMultipleVersions.java
private void verifyUserDeniedForDeleteExactVersion(final User user, final byte[] row,
    final byte[] q1, final byte[] q2) throws IOException, InterruptedException {
  user.runAs(new PrivilegedExceptionAction<Void>() {
    @Override
    public Void run() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(conf)) {
        try (Table t = connection.getTable(testTable.getTableName())) {
          Delete d = new Delete(row, 127);
          d.addColumns(TEST_FAMILY1, q1);
          d.addColumns(TEST_FAMILY1, q2);
          d.addFamily(TEST_FAMILY2, 129);
          t.delete(d);
          fail(user.getShortName() + " can not do the delete");
        } catch (Exception e) {

        }
      }
      return null;
    }
  });
}
 
源代码6 项目: phoenix   文件: SystemCatalogWALEntryFilterIT.java
public WAL.Entry getEntry(TableName tableName, Get get) throws IOException {
  WAL.Entry entry = null;
  try(Connection conn = ConnectionFactory.createConnection(getUtility().getConfiguration())){
    Table htable = conn.getTable(tableName);
    Result result = htable.get(get);
    WALEdit edit = new WALEdit();
    if (result != null) {
      List<Cell> cellList = result.listCells();
      Assert.assertNotNull("Didn't retrieve any cells from SYSTEM.CATALOG", cellList);
      for (Cell c : cellList) {
        edit.add(c);
      }
    }
    Assert.assertTrue("Didn't retrieve any cells from SYSTEM.CATALOG",
        edit.getCells().size() > 0);
    WALKeyImpl key = new WALKeyImpl(REGION, tableName, 0, 0, uuid);
    entry = new WAL.Entry(key, edit);
  }
  return entry;
}
 
源代码7 项目: phoenix   文件: IndexSplitTransaction.java
private void offlineParentInMetaAndputMetaEntries(Connection conn,
    HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
    ServerName serverName, List<Mutation> metaEntries) throws IOException {
  List<Mutation> mutations = metaEntries;
  HRegionInfo copyOfParent = new HRegionInfo(parent);
  copyOfParent.setOffline(true);
  copyOfParent.setSplit(true);

  //Put for parent
  Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent);
  MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB);
  mutations.add(putParent);
  
  //Puts for daughters
  Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA);
  Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB);

  addLocation(putA, serverName, 1); //these are new regions, openSeqNum = 1 is fine.
  addLocation(putB, serverName, 1);
  mutations.add(putA);
  mutations.add(putB);
  MetaTableAccessor.mutateMetaTable(conn, mutations);
}
 
源代码8 项目: styx   文件: BigtableStorage.java
@VisibleForTesting
BigtableStorage(Connection connection,
                ExecutorService executor,
                WaitStrategy waitStrategy,
                StopStrategy retryStopStrategy) {
  this.connection = Objects.requireNonNull(connection, "connection");

  this.executor = MDCUtil.withMDC(Context.currentContextExecutor(
      register(closer, Objects.requireNonNull(executor, "executor"), "bigtable-storage")));

  retryer = RetryerBuilder.<Void>newBuilder()
      .retryIfExceptionOfType(IOException.class)
      .withWaitStrategy(Objects.requireNonNull(waitStrategy, "waitStrategy"))
      .withStopStrategy(Objects.requireNonNull(retryStopStrategy, "retryStopStrategy"))
      .withRetryListener(BigtableStorage::onRequestAttempt)
      .build();
}
 
源代码9 项目: kylin-on-parquet-v2   文件: CleanHtableCLI.java
private void clean() throws IOException {
    Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
    Admin hbaseAdmin = conn.getAdmin();

    for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
        String name = descriptor.getNameAsString().toLowerCase(Locale.ROOT);
        if (name.startsWith("kylin") || name.startsWith("_kylin")) {
            String x = descriptor.getValue(IRealizationConstants.HTableTag);
            System.out.println("table name " + descriptor.getNameAsString() + " host: " + x);
            System.out.println(descriptor);
            System.out.println();

            descriptor.setValue(IRealizationConstants.HTableOwner, "[email protected]");
            hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor);
        }
    }
    hbaseAdmin.close();
}
 
源代码10 项目: hraven   文件: JobFileProcessor.java
/**
 * @param conf used to communicate arguments to the running jobs.
 * @param hbaseConnection used to create Table references connecting to HBase.
 * @param cluster for which we are processing
 * @param reprocess Reprocess those records that may have been processed
 *          already. Otherwise successfully processed job files are skipped.
 * @param batchSize the total number of jobs to process in a batch (a MR job
 *          scanning these many records in the raw table).
 * @param minJobId used to start the scan. If null then there is no min limit
 *          on JobId.
 * @param maxJobId used to end the scan (inclusive). If null then there is no
 *          max limit on jobId.
 * @throws IOException
 * @throws InterruptedException
 * @throws ClassNotFoundException
 * @throws ExecutionException
 * @throws RowKeyParseException
 */
private List<JobRunner> getJobRunners(Configuration conf, Connection hbaseConnection, String cluster,
    boolean reprocess, int batchSize, String minJobId, String maxJobId)
    throws IOException, InterruptedException, ClassNotFoundException,
    RowKeyParseException {
  List<JobRunner> jobRunners = new LinkedList<JobRunner>();

  JobHistoryRawService jobHistoryRawService = new JobHistoryRawService(hbaseConnection);

  // Bind all MR jobs together with one runID.
  long now = System.currentTimeMillis();
  conf.setLong(Constants.MR_RUN_CONF_KEY, now);

  List<Scan> scanList = jobHistoryRawService.getHistoryRawTableScans(cluster,
      minJobId, maxJobId, reprocess, batchSize);

  for (Scan scan : scanList) {
    Job job = getProcessingJob(conf, scan, scanList.size());

    JobRunner jobRunner = new JobRunner(job, null);
    jobRunners.add(jobRunner);
  }

  return jobRunners;

}
 
源代码11 项目: hbase   文件: PerformanceEvaluation.java
private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
    InterruptedException, ClassNotFoundException, ExecutionException {
  // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
  // the TestOptions introspection for us and dump the output in a readable format.
  LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
  Admin admin = null;
  Connection connection = null;
  try {
    connection = ConnectionFactory.createConnection(getConf());
    admin = connection.getAdmin();
    checkTable(admin, opts);
  } finally {
    if (admin != null) admin.close();
    if (connection != null) connection.close();
  }
  if (opts.nomapred) {
    doLocalClients(opts, getConf());
  } else {
    doMapReduce(opts, getConf());
  }
}
 
源代码12 项目: antsdb   文件: CheckPoint.java
public void readFromHBase(Connection conn) throws IOException {
    // Get table object
    Table table = conn.getTable(this.tn);
    
    // Query row
    Get get = new Get(KEY);
    Result result = table.get(get);
    if (!result.isEmpty()) {
        this.currentSp = Bytes.toLong(get(result, "currentSp"));
        this.serverId = Bytes.toLong(get(result, "serverId"));
        this.createTimestamp = Optional.ofNullable(get(result, "createTimestamp")).map(Bytes::toLong).orElse(0l);
        this.updateTimestamp = Optional.ofNullable(get(result, "updateTimestamp")).map(Bytes::toLong).orElse(0l);
        this.createOrcaVersion = Bytes.toString(get(result, "createOrcaVersion"));
        this.updateorcaVersion = Bytes.toString(get(result, "updateOrcaVersion"));
        this.isActive = Optional.ofNullable(get(result, "isActive")).map(Bytes::toBoolean).orElse(Boolean.FALSE);
    }
}
 
源代码13 项目: hbase   文件: TestVisibilityLabelsWithDeletes.java
@Test
public void testDeleteFamilyWithoutCellVisibilityWithMulipleVersions() throws Exception {
  setAuths();
  final TableName tableName = TableName.valueOf(testName.getMethodName());
  try (Table table = doPutsWithoutVisibility(tableName)) {
    TEST_UTIL.getAdmin().flush(tableName);
    PrivilegedExceptionAction<Void> actiona = new PrivilegedExceptionAction<Void>() {
      @Override
      public Void run() throws Exception {
        try (Connection connection = ConnectionFactory.createConnection(conf);
          Table table = connection.getTable(tableName)) {
          Delete d = new Delete(row1);
          d.addFamily(fam);
          table.delete(d);
        } catch (Throwable t) {
          throw new IOException(t);
        }
        return null;
      }
    };
    SUPERUSER.runAs(actiona);

    TEST_UTIL.getAdmin().flush(tableName);
    Scan s = new Scan();
    s.readVersions(5);
    s.setAuthorizations(new Authorizations(SECRET, PRIVATE, CONFIDENTIAL, TOPSECRET));
    ResultScanner scanner = table.getScanner(s);
    Result[] next = scanner.next(3);
    assertTrue(next.length == 1);
    // All cells wrt row1 should be deleted as we are not passing the Cell Visibility
    CellScanner cellScanner = next[0].cellScanner();
    cellScanner.advance();
    Cell current = cellScanner.current();
    assertTrue(Bytes.equals(current.getRowArray(), current.getRowOffset(), current.getRowLength(),
      row2, 0, row2.length));
  }
}
 
源代码14 项目: antsdb   文件: HBaseUtil.java
public static void closeQuietly(Connection conn) {
    try {
        if (conn != null) {
            conn.close();
        }
    }
    catch (Exception ignored) {}
}
 
源代码15 项目: hbase   文件: TokenUtil.java
/**
 * Checks if an authentication tokens exists for the connected cluster,
 * obtaining one if needed and adding it to the user's credentials.
 *
 * @param conn The HBase cluster connection
 * @param user The user for whom to obtain the token
 * @throws IOException If making a remote call to the authentication service fails
 * @throws InterruptedException If executing as the given user is interrupted
 * @return true if the token was added, false if it already existed
 */
public static boolean addTokenIfMissing(Connection conn, User user)
    throws IOException, InterruptedException {
  Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
  if (token == null) {
    token = ClientTokenUtil.obtainToken(conn, user);
    user.getUGI().addToken(token.getService(), token);
    return true;
  }
  return false;
}
 
@Before
public void createTable() throws Exception {
  tableName = TableName.valueOf(name.getMethodName());

  // Create a table and write a record as the service user (hbase)
  UserGroupInformation serviceUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "hbase/localhost", KEYTAB_FILE.getAbsolutePath());
  clusterId = serviceUgi.doAs(new PrivilegedExceptionAction<String>() {
    @Override public String run() throws Exception {
      try (Connection conn = ConnectionFactory.createConnection(CONF);
          Admin admin = conn.getAdmin();) {
        admin.createTable(TableDescriptorBuilder
            .newBuilder(tableName)
            .setColumnFamily(ColumnFamilyDescriptorBuilder.of("f1"))
            .build());

        UTIL.waitTableAvailable(tableName);

        try (Table t = conn.getTable(tableName)) {
          Put p = new Put(Bytes.toBytes("r1"));
          p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
          t.put(p);
        }

        return admin.getClusterMetrics().getClusterId();
      }
    }
  });

  assertNotNull(clusterId);
}
 
源代码17 项目: kylin   文件: HBaseConnection.java
public static boolean tableExists(Connection conn, String tableName) throws IOException {
    Admin hbase = conn.getAdmin();
    try {
        return hbase.tableExists(TableName.valueOf(tableName));
    } finally {
        hbase.close();
    }
}
 
源代码18 项目: hbase   文件: BackupSystemTable.java
public BackupSystemTable(Connection conn) throws IOException {
  this.connection = conn;
  Configuration conf = this.connection.getConfiguration();
  tableName = BackupSystemTable.getTableName(conf);
  bulkLoadTableName = BackupSystemTable.getTableNameForBulkLoadedData(conf);
  checkSystemTable();
}
 
源代码19 项目: hbase   文件: PermissionStorage.java
/**
 * Reads user permission assignments stored in the <code>l:</code> column family of the first
 * table row in <code>_acl_</code>.
 * <p>
 * See {@link PermissionStorage class documentation} for the key structure used for storage.
 * </p>
 */
static ListMultimap<String, UserPermission> getPermissions(Configuration conf, byte[] entryName,
    Table t, byte[] cf, byte[] cq, String user, boolean hasFilterUser) throws IOException {
  if (entryName == null) {
    entryName = ACL_GLOBAL_NAME;
  }
  // for normal user tables, we just read the table row from _acl_
  ListMultimap<String, UserPermission> perms = ArrayListMultimap.create();
  Get get = new Get(entryName);
  get.addFamily(ACL_LIST_FAMILY);
  Result row = null;
  if (t == null) {
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      try (Table table = connection.getTable(ACL_TABLE_NAME)) {
        row = table.get(get);
      }
    }
  } else {
    row = t.get(get);
  }
  if (!row.isEmpty()) {
    perms = parsePermissions(entryName, row, cf, cq, user, hasFilterUser);
  } else {
    LOG.info("No permissions found in " + ACL_TABLE_NAME + " for acl entry "
        + Bytes.toString(entryName));
  }

  return perms;
}
 
源代码20 项目: hbase-indexer   文件: Indexer.java
/**
 * Instantiate an indexer based on the given {@link IndexerConf}.
 */
public static Indexer createIndexer(String indexerName, IndexerConf conf, String tableName, ResultToSolrMapper mapper,
                                    Connection tablePool, Sharder sharder, SolrInputDocumentWriter solrWriter) {
    switch (conf.getMappingType()) {
        case COLUMN:
            return new ColumnBasedIndexer(indexerName, conf, tableName, mapper, sharder, solrWriter);
        case ROW:
            return new RowBasedIndexer(indexerName, conf, tableName, mapper, tablePool, sharder, solrWriter);
        default:
            throw new IllegalStateException("Can't determine the type of indexing to use for mapping type "
                    + conf.getMappingType());
    }
}
 
源代码21 项目: antsdb   文件: HumpbackStorageCheckMain.java
private void checkHbase(ConfigService conf) throws Exception {
    // check configuration
    
    String hbaseConfPath = conf.getHBaseConf(); 
    if (hbaseConfPath != null) {
        if (!new File(hbaseConfPath).exists()) {
            println("error: hbase config %s is not found", hbaseConfPath);
            System.exit(0);
        }
        println("hbase config: %s", hbaseConfPath);
    }
    else if (conf.getProperty("hbase.zookeeper.quorum", null) == null) {
        println("error: hbase is not configured");
        System.exit(0);
    }
    
    // check the connection 
    
    Configuration hbaseConf = HBaseStorageService.getHBaseConfig(conf);
    hbaseConf.set("hbase.client.retries.number", "0");
    if (hbaseConf.get("hbase.zookeeper.quorum") != null) {
        println("zookeeper quorum: %s", hbaseConf.get("hbase.zookeeper.quorum"));
    }
    try {
        Connection conn = ConnectionFactory.createConnection(hbaseConf);
        conn.getAdmin().listNamespaceDescriptors();
        conn.close();
        println("quorum is connected");
    }
    catch (Exception x) {
        println("error: unable to connect to quorum");
    }
}
 
源代码22 项目: kylin-on-parquet-v2   文件: HBaseLookupTable.java
public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
    String tableName = extTableSnapshot.getStorageLocationIdentifier();
    this.lookupTableName = TableName.valueOf(tableName);
    KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
    Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
    try {
        table = connection.getTable(lookupTableName);
    } catch (IOException e) {
        throw new RuntimeException("error when connect HBase", e);
    }

    String[] keyColumns = extTableSnapshot.getKeyColumns();
    encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
}
 
源代码23 项目: hbase   文件: TestMasterQuotasObserver.java
@Before
public void removeAllQuotas() throws Exception {
  if (helper == null) {
    helper = new SpaceQuotaHelperForTests(TEST_UTIL, testName, new AtomicLong());
  }
  final Connection conn = TEST_UTIL.getConnection();
  // Wait for the quota table to be created
  if (!conn.getAdmin().tableExists(QuotaUtil.QUOTA_TABLE_NAME)) {
    helper.waitForQuotaTable(conn);
  } else {
    // Or, clean up any quotas from previous test runs.
    helper.removeAllQuotas(conn);
    assertEquals(0, helper.listNumDefinedQuotas(conn));
  }
}
 
源代码24 项目: hbase   文件: PerformanceEvaluation.java
RandomReadTest(Connection con, TestOptions options, Status status) {
  super(con, options, status);
  consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
  if (opts.multiGet > 0) {
    LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
    this.gets = new ArrayList<>(opts.multiGet);
  }
}
 
源代码25 项目: hbase   文件: IntegrationTestBackupRestore.java
private void loadData(TableName table, int numRows) throws IOException {
  Connection conn = util.getConnection();
  // #0- insert some data to a table
  Table t1 = conn.getTable(table);
  util.loadRandomRows(t1, new byte[]{'f'}, 100, numRows);
  // flush table
  conn.getAdmin().flush(TableName.valueOf(table.getName()));
}
 
源代码26 项目: hbase   文件: TestMasterRepairMode.java
@Test
public void testExistingCluster() throws Exception {
  TableName testRepairMode = TableName.valueOf(name.getMethodName());

  TEST_UTIL.startMiniCluster();
  Table t = TEST_UTIL.createTable(testRepairMode, FAMILYNAME);
  Put p = new Put(Bytes.toBytes("r"));
  p.addColumn(FAMILYNAME, Bytes.toBytes("c"), new byte[0]);
  t.put(p);

  TEST_UTIL.shutdownMiniHBaseCluster();

  LOG.info("Starting master-only");

  enableMaintenanceMode();
  TEST_UTIL.startMiniHBaseCluster(StartMiniClusterOption.builder()
      .numRegionServers(0).createRootDir(false).build());

  Connection conn = TEST_UTIL.getConnection();
  assertTrue(conn.getAdmin().isMasterInMaintenanceMode());

  try (Table table = conn.getTable(TableName.META_TABLE_NAME);
      ResultScanner scanner = table.getScanner(HConstants.TABLE_FAMILY);
      Stream<Result> results = StreamSupport.stream(scanner.spliterator(), false)) {
    assertTrue("Did not find user table records while reading hbase:meta",
        results.anyMatch(r -> Arrays.equals(r.getRow(), testRepairMode.getName())));
  }

  try (Table table = conn.getTable(testRepairMode);
      ResultScanner scanner = table.getScanner(new Scan())) {
    scanner.next();
    fail("Should not be able to access user-space tables in repair mode.");
  } catch (Exception e) {
    // Expected
  }
}
 
源代码27 项目: java-study   文件: HBaseUtil.java
/**
 * 获取连接
 * 
 * @return
 */
public synchronized static Connection getConnection() {
	try {
		if (null == con || con.isClosed()) {
			// 获得连接对象
			con = ConnectionFactory.createConnection(conf);
		}
	} catch (IOException e) {
		System.out.println("获取连接失败!");
		e.printStackTrace();
	}

	return con;
}
 
源代码28 项目: hbase   文件: SecureTestUtil.java
/**
 * Revoke permissions globally from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeGlobal(final User caller, final HBaseTestingUtility util,
    final String user, final Permission.Action... actions) throws Exception {
  SecureTestUtil.updateACLs(util, new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      Configuration conf = util.getConfiguration();
      try (Connection connection = ConnectionFactory.createConnection(conf, caller)) {
        connection.getAdmin().revoke(
          new UserPermission(user, Permission.newBuilder().withActions(actions).build()));
      }
      return null;
    }
  });
}
 
源代码29 项目: super-cloudops   文件: HfileBulkExporter.java
/**
 * Do hfile bulk exporting
 * 
 * @param builder
 * @throws Exception
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void doExporting(CommandLine line) throws Exception {
	// Configuration.
	String tabname = line.getOptionValue("tabname");
	String user = line.getOptionValue("user");
	Configuration conf = new Configuration();
	conf.set("hbase.zookeeper.quorum", line.getOptionValue("zkaddr"));
	conf.set("hbase.fs.tmp.dir", line.getOptionValue("T", DEFAULT_HBASE_MR_TMPDIR));
	conf.set(TableInputFormat.INPUT_TABLE, tabname);
	conf.set(TableInputFormat.SCAN_BATCHSIZE, line.getOptionValue("batchSize", DEFAULT_SCAN_BATCH_SIZE));

	// Check directory.
	String outputDir = line.getOptionValue("output", DEFAULT_HFILE_OUTPUT_DIR) + "/" + tabname;
	FileSystem fs = FileSystem.get(new URI(outputDir), new Configuration(), user);
	state(!fs.exists(new Path(outputDir)), format("HDFS temporary directory already has data, path: '%s'", outputDir));

	// Set scan condition.(if necessary)
	setScanIfNecessary(conf, line);

	// Job.
	Connection conn = ConnectionFactory.createConnection(conf);
	TableName tab = TableName.valueOf(tabname);
	Job job = Job.getInstance(conf);
	job.setJobName(HfileBulkExporter.class.getSimpleName() + "@" + tab.getNameAsString());
	job.setJarByClass(HfileBulkExporter.class);
	job.setMapperClass((Class<Mapper>) ClassUtils.getClass(line.getOptionValue("mapperClass", DEFAULT_MAPPER_CLASS)));
	job.setInputFormatClass(TableInputFormat.class);
	job.setMapOutputKeyClass(ImmutableBytesWritable.class);
	job.setMapOutputValueClass(Put.class);

	HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tab), conn.getRegionLocator(tab));
	FileOutputFormat.setOutputPath(job, new Path(outputDir));
	if (job.waitForCompletion(true)) {
		long total = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_TOTAL).getValue();
		long processed = job.getCounters().findCounter(DEFUALT_COUNTER_GROUP, DEFUALT_COUNTER_PROCESSED).getValue();
		log.info(String.format("Exported to successfully! with processed:(%d)/total:(%d)", processed, total));
	}

}
 
源代码30 项目: hbase   文件: TestSlowLogAccessor.java
private int getTableCount(Connection connection) {
  try (Table table = connection.getTable(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) {
    ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
    int count = 0;
    for (Result result : resultScanner) {
      ++count;
    }
    return count;
  } catch (Exception e) {
    return 0;
  }
}
 
 同包方法