类org.apache.hadoop.hbase.security.UserProvider源码实例Demo

下面列出了怎么用org.apache.hadoop.hbase.security.UserProvider的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: tajo   文件: HBaseTablespace.java
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);

  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}
 
源代码2 项目: hbase   文件: TestThriftServer.java
/**
 * Check that checkAndPut fails if the cell does not exist, then put in the cell, then check that
 * the checkAndPut succeeds.
 */
public static void doTestCheckAndPut() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  handler.createTable(tableAname, getColumnDescriptors());
  try {
    List<Mutation> mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, columnAname, valueAname, true));
    Mutation putB = (new Mutation(false, columnBname, valueBname, true));

    assertFalse(handler.checkAndPut(tableAname, rowAname, columnAname, valueAname, putB, null));

    handler.mutateRow(tableAname, rowAname, mutations, null);

    assertTrue(handler.checkAndPut(tableAname, rowAname, columnAname, valueAname, putB, null));

    TRowResult rowResult = handler.getRow(tableAname, rowAname, null).get(0);
    assertEquals(rowAname, rowResult.row);
    assertEquals(valueBname, rowResult.columns.get(columnBname).value);
  } finally {
    handler.disableTable(tableAname);
    handler.deleteTable(tableAname);
  }
}
 
源代码3 项目: hbase   文件: TestSecureExport.java
/**
 * Sets the security firstly for getting the correct default realm.
 */
@BeforeClass
public static void beforeClass() throws Exception {
  UserProvider.setUserProviderForTesting(UTIL.getConfiguration(),
      HadoopSecurityEnabledUserProviderForTesting.class);
  setUpKdcServer();
  SecureTestUtil.enableSecurity(UTIL.getConfiguration());
  UTIL.getConfiguration().setBoolean(AccessControlConstants.EXEC_PERMISSION_CHECKS_KEY, true);
  VisibilityTestUtil.enableVisiblityLabels(UTIL.getConfiguration());
  SecureTestUtil.verifyConfiguration(UTIL.getConfiguration());
  setUpClusterKdc();
  UTIL.startMiniCluster();
  UTIL.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
  UTIL.waitUntilAllRegionsAssigned(VisibilityConstants.LABELS_TABLE_NAME);
  UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME, 50000);
  UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME, 50000);
  SecureTestUtil.grantGlobal(UTIL, USER_ADMIN,
          Permission.Action.ADMIN,
          Permission.Action.CREATE,
          Permission.Action.EXEC,
          Permission.Action.READ,
          Permission.Action.WRITE);
  addLabels(UTIL.getConfiguration(), Arrays.asList(USER_OWNER),
          Arrays.asList(PRIVATE, CONFIDENTIAL, SECRET, TOPSECRET));
}
 
源代码4 项目: hbase   文件: SnapshotScannerHDFSAclController.java
@Override
public void preMasterInitialization(ObserverContext<MasterCoprocessorEnvironment> c)
    throws IOException {
  if (c.getEnvironment().getConfiguration()
      .getBoolean(SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE, false)) {
    MasterCoprocessorEnvironment mEnv = c.getEnvironment();
    if (!(mEnv instanceof HasMasterServices)) {
      throw new IOException("Does not implement HMasterServices");
    }
    masterServices = ((HasMasterServices) mEnv).getMasterServices();
    hdfsAclHelper = new SnapshotScannerHDFSAclHelper(masterServices.getConfiguration(),
        masterServices.getConnection());
    pathHelper = hdfsAclHelper.getPathHelper();
    hdfsAclHelper.setCommonDirectoryPermission();
    initialized = true;
    userProvider = UserProvider.instantiate(c.getEnvironment().getConfiguration());
  } else {
    LOG.warn("Try to initialize the coprocessor SnapshotScannerHDFSAclController but failure "
        + "because the config " + SnapshotScannerHDFSAclHelper.ACL_SYNC_TO_HDFS_ENABLE
        + " is false.");
  }
}
 
源代码5 项目: hbase   文件: HBaseFsck.java
private void preCheckPermission() throws IOException {
  if (shouldIgnorePreCheckPermission()) {
    return;
  }

  Path hbaseDir = CommonFSUtils.getRootDir(getConf());
  FileSystem fs = hbaseDir.getFileSystem(getConf());
  UserProvider userProvider = UserProvider.instantiate(getConf());
  UserGroupInformation ugi = userProvider.getCurrent().getUGI();
  FileStatus[] files = fs.listStatus(hbaseDir);
  for (FileStatus file : files) {
    try {
      fs.access(file.getPath(), FsAction.WRITE);
    } catch (AccessControlException ace) {
      LOG.warn("Got AccessDeniedException when preCheckPermission ", ace);
      errors.reportError(ERROR_CODE.WRONG_USAGE, "Current user " + ugi.getUserName()
        + " does not have write perms to " + file.getPath()
        + ". Please rerun hbck as hdfs user " + file.getOwner());
      throw ace;
    }
  }
}
 
源代码6 项目: hbase   文件: TestSecureBulkLoadHFiles.java
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // set the always on security provider
  UserProvider.setUserProviderForTesting(util.getConfiguration(),
    HadoopSecurityEnabledUserProviderForTesting.class);
  // setup configuration
  SecureTestUtil.enableSecurity(util.getConfiguration());
  util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
    MAX_FILES_PER_REGION_PER_FAMILY);
  // change default behavior so that tag values are returned with normal rpcs
  util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
    KeyValueCodecWithTags.class.getCanonicalName());

  util.startMiniCluster();

  // Wait for the ACL table to become available
  util.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);

  setupNamespace();
}
 
源代码7 项目: hbase   文件: TestConnectionCache.java
/**
 * test for ConnectionCache cleaning expired Connection
 */
@Test
public void testConnectionChore() throws Exception {
  UTIL.startMiniCluster();

  //1s for clean interval & 5s for maxIdleTime
  ConnectionCache cache = new ConnectionCache(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()), 1000, 5000);
  ConnectionCache.ConnectionInfo info = cache.getCurrentConnection();

  assertEquals(false, info.connection.isClosed());

  Thread.sleep(7000);

  assertEquals(true, info.connection.isClosed());
  UTIL.shutdownMiniCluster();
}
 
源代码8 项目: hbase   文件: TableMapReduceUtil.java
public static void initCredentials(JobConf job) throws IOException {
  UserProvider userProvider = UserProvider.instantiate(job);
  if (userProvider.isHadoopSecurityEnabled()) {
    // propagate delegation related props from launcher job to MR job
    if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
      job.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
    }
  }

  if (userProvider.isHBaseSecurityEnabled()) {
    Connection conn = ConnectionFactory.createConnection(job);
    try {
      // login the server principal (if using secure Hadoop)
      User user = userProvider.getCurrent();
      TokenUtil.addTokenForJob(conn, job, user);
    } catch (InterruptedException ie) {
      LOG.error("Interrupted obtaining user authentication token", ie);
      Thread.currentThread().interrupt();
    } finally {
      conn.close();
    }
  }
}
 
源代码9 项目: hbase   文件: TableMapReduceUtil.java
/**
 * Obtain an authentication token, for the specified cluster, on behalf of the current user
 * and add it to the credentials for the given map reduce job.
 *
 * @param job The job that requires the permission.
 * @param conf The configuration to use in connecting to the peer cluster
 * @throws IOException When the authentication token cannot be obtained.
 */
public static void initCredentialsForCluster(Job job, Configuration conf)
    throws IOException {
  UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
  if (userProvider.isHBaseSecurityEnabled()) {
    try {
      Connection peerConn = ConnectionFactory.createConnection(conf);
      try {
        TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
      } finally {
        peerConn.close();
      }
    } catch (InterruptedException e) {
      LOG.info("Interrupted obtaining user authentication token");
      Thread.interrupted();
    }
  }
}
 
源代码10 项目: hbase   文件: TestMobSecureExportSnapshot.java
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  setUpBaseConf(TEST_UTIL.getConfiguration());
  // Setup separate test-data directory for MR cluster and set corresponding configurations.
  // Otherwise, different test classes running MR cluster can step on each other.
  TEST_UTIL.getDataTestDir();

  // set the always on security provider
  UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
    HadoopSecurityEnabledUserProviderForTesting.class);

  // setup configuration
  SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());

  TEST_UTIL.startMiniCluster(3);
  TEST_UTIL.startMiniMapReduceCluster();

  // Wait for the ACL table to become available
  TEST_UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);
}
 
源代码11 项目: hbase   文件: TestSecureExportSnapshot.java
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  setUpBaseConf(TEST_UTIL.getConfiguration());
  // Setup separate test-data directory for MR cluster and set corresponding configurations.
  // Otherwise, different test classes running MR cluster can step on each other.
  TEST_UTIL.getDataTestDir();

  // set the always on security provider
  UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
    HadoopSecurityEnabledUserProviderForTesting.class);

  // setup configuration
  SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());

  TEST_UTIL.startMiniCluster(3);
  TEST_UTIL.startMiniMapReduceCluster();

  // Wait for the ACL table to become available
  TEST_UTIL.waitTableEnabled(PermissionStorage.ACL_TABLE_NAME);
}
 
源代码12 项目: hbase   文件: RESTServlet.java
/**
 * Constructor with existing configuration
 * @param conf existing configuration
 * @param userProvider the login user provider
 * @throws IOException
 */
RESTServlet(final Configuration conf,
    final UserProvider userProvider) throws IOException {
  this.realUser = userProvider.getCurrent().getUGI();
  this.conf = conf;
  registerCustomFilter(conf);

  int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
  int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
  connectionCache = new ConnectionCache(
    conf, userProvider, cleanInterval, maxIdleTime);
  if (supportsProxyuser()) {
    ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
  }

  metrics = new MetricsREST();

  pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
  pauseMonitor.start();
}
 
源代码13 项目: hbase   文件: TestGetAndPutResource.java
@Test
public void testMetrics() throws IOException {
  final String path = "/" + TABLE + "/" + ROW_4 + "/" + COLUMN_1;
  Response response = client.put(path, Constants.MIMETYPE_BINARY,
      Bytes.toBytes(VALUE_4));
  assertEquals(200, response.getCode());
  Thread.yield();
  response = client.get(path, Constants.MIMETYPE_JSON);
  assertEquals(200, response.getCode());
  assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
  response = deleteRow(TABLE, ROW_4);
  assertEquals(200, response.getCode());

  UserProvider userProvider = UserProvider.instantiate(conf);
  METRICS_ASSERT.assertCounterGt("requests", 2L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());

  METRICS_ASSERT.assertCounterGt("successfulGet", 0L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());

  METRICS_ASSERT.assertCounterGt("successfulPut", 0L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());

  METRICS_ASSERT.assertCounterGt("successfulDelete", 0L,
    RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
}
 
源代码14 项目: storm-hbase   文件: HBaseSecurityUtil.java
public static UserProvider login(Map conf, Configuration hbaseConfig) throws IOException {
    UserProvider provider = UserProvider.instantiate(hbaseConfig);
    if (UserGroupInformation.isSecurityEnabled()) {
        String keytab = (String) conf.get(STORM_KEYTAB_FILE_KEY);
        if (keytab != null) {
            hbaseConfig.set(STORM_KEYTAB_FILE_KEY, keytab);
        }
        String userName = (String) conf.get(STORM_USER_NAME_KEY);
        if (userName != null) {
            hbaseConfig.set(STORM_USER_NAME_KEY, userName);
        }
        provider.login(STORM_KEYTAB_FILE_KEY, STORM_USER_NAME_KEY, 
            InetAddress.getLocalHost().getCanonicalHostName());
    }
    return provider;
}
 
源代码15 项目: hbase   文件: HBaseServiceHandler.java
public HBaseServiceHandler(final Configuration c,
    final UserProvider userProvider) throws IOException {
  this.conf = c;
  int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
  int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
  connectionCache = new ConnectionCache(
      conf, userProvider, cleanInterval, maxIdleTime);
}
 
private ThriftHBaseServiceHandler createHandler() throws TException {
  try {
    Configuration conf = UTIL.getConfiguration();
    return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
  } catch (IOException ie) {
    throw new TException(ie);
  }
}
 
源代码17 项目: hbase   文件: TestThriftHBaseServiceHandler.java
private ThriftHBaseServiceHandler createHandler() throws TException {
  try {
    Configuration conf = UTIL.getConfiguration();
    return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
  } catch (IOException ie) {
    throw new TException(ie);
  }
}
 
源代码18 项目: hbase   文件: TestThriftHBaseServiceHandler.java
@Test
public void testSlowLogResponses() throws Exception {

  // start a thrift server
  HBaseThriftTestingUtility THRIFT_TEST_UTIL = new HBaseThriftTestingUtility();
  Configuration configuration = UTIL.getConfiguration();
  configuration.setBoolean("hbase.regionserver.slowlog.buffer.enabled", true);

  THRIFT_TEST_UTIL.startThriftServer(configuration, ThriftServerType.ONE);
  ThriftHBaseServiceHandler thriftHBaseServiceHandler =
    new ThriftHBaseServiceHandler(configuration,
      UserProvider.instantiate(configuration));
  Collection<ServerName> serverNames = UTIL.getAdmin().getRegionServers();
  Set<TServerName> tServerNames =
    ThriftUtilities.getServerNamesFromHBase(new HashSet<>(serverNames));
  List<Boolean> clearedResponses =
    thriftHBaseServiceHandler.clearSlowLogResponses(tServerNames);
  clearedResponses.forEach(Assert::assertTrue);
  TLogQueryFilter tLogQueryFilter = new TLogQueryFilter();
  tLogQueryFilter.setLimit(15);
  Assert.assertEquals(tLogQueryFilter.getFilterByOperator(), TFilterByOperator.OR);
  LogQueryFilter logQueryFilter = ThriftUtilities.getSlowLogQueryFromThrift(tLogQueryFilter);
  Assert.assertEquals(logQueryFilter.getFilterByOperator(), LogQueryFilter.FilterByOperator.OR);
  tLogQueryFilter.setFilterByOperator(TFilterByOperator.AND);
  logQueryFilter = ThriftUtilities.getSlowLogQueryFromThrift(tLogQueryFilter);
  Assert.assertEquals(logQueryFilter.getFilterByOperator(), LogQueryFilter.FilterByOperator.AND);
  List<TOnlineLogRecord> tLogRecords =
    thriftHBaseServiceHandler.getSlowLogResponses(tServerNames, tLogQueryFilter);
  assertEquals(tLogRecords.size(), 0);
}
 
源代码19 项目: hbase   文件: TestThriftServer.java
/**
 * Tests for creating, enabling, disabling, and deleting tables.  Also
 * tests that creating a table with an invalid column name yields an
 * IllegalArgument exception.
 */
public void doTestTableCreateDrop() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  doTestTableCreateDrop(handler);
}
 
源代码20 项目: hbase   文件: TestThriftServer.java
public void doTestIncrements() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  createTestTables(handler);
  doTestIncrements(handler);
  dropTestTables(handler);
}
 
源代码21 项目: hbase   文件: TestThriftServer.java
/**
 * Tests adding a series of Mutations and BatchMutations, including a
 * delete mutation.  Also tests data retrieval, and getting back multiple
 * versions.
 */
public void doTestTableMutations() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  doTestTableMutations(handler);
}
 
源代码22 项目: hbase   文件: TestThriftServer.java
/**
 * For HBASE-2556
 * Tests for GetTableRegions
 */
public void doTestGetTableRegions() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  doTestGetTableRegions(handler);
}
 
源代码23 项目: hbase   文件: TestThriftServer.java
/**
 * Appends the value to a cell and checks that the cell value is updated properly.
 */
public static void doTestAppend() throws Exception {
  ThriftHBaseServiceHandler handler =
    new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
      UserProvider.instantiate(UTIL.getConfiguration()));
  handler.createTable(tableAname, getColumnDescriptors());
  try {
    List<Mutation> mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, columnAname, valueAname, true));
    handler.mutateRow(tableAname, rowAname, mutations, null);

    List<ByteBuffer> columnList = new ArrayList<>(1);
    columnList.add(columnAname);
    List<ByteBuffer> valueList = new ArrayList<>(1);
    valueList.add(valueBname);

    TAppend append = new TAppend(tableAname, rowAname, columnList, valueList);
    handler.append(append);

    TRowResult rowResult = handler.getRow(tableAname, rowAname, null).get(0);
    assertEquals(rowAname, rowResult.row);
    assertArrayEquals(Bytes.add(valueAname.array(), valueBname.array()),
      rowResult.columns.get(columnAname).value.array());
  } finally {
    handler.disableTable(tableAname);
    handler.deleteTable(tableAname);
  }
}
 
源代码24 项目: hbase   文件: TestThriftServer.java
@Test
public void testGetThriftServerType() throws Exception {
  ThriftHBaseServiceHandler handler =
      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
          UserProvider.instantiate(UTIL.getConfiguration()));
  assertEquals(TThriftServerType.ONE, handler.getThriftServerType());
}
 
源代码25 项目: hbase   文件: Export.java
public static Map<byte[], Response> run(final Configuration conf, TableName tableName,
    Scan scan, Path dir) throws Throwable {
  FileSystem fs = dir.getFileSystem(conf);
  UserProvider userProvider = UserProvider.instantiate(conf);
  checkDir(fs, dir);
  FsDelegationToken fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
  fsDelegationToken.acquireDelegationToken(fs);
  try {
    final ExportProtos.ExportRequest request = getConfiguredRequest(conf, dir,
      scan, fsDelegationToken.getUserToken());
    try (Connection con = ConnectionFactory.createConnection(conf);
            Table table = con.getTable(tableName)) {
      Map<byte[], Response> result = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      table.coprocessorService(ExportProtos.ExportService.class,
        scan.getStartRow(),
        scan.getStopRow(),
        (ExportProtos.ExportService service) -> {
          ServerRpcController controller = new ServerRpcController();
          Map<byte[], ExportProtos.ExportResponse> rval = new TreeMap<>(Bytes.BYTES_COMPARATOR);
          CoprocessorRpcUtils.BlockingRpcCallback<ExportProtos.ExportResponse>
            rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
          service.export(controller, request, rpcCallback);
          if (controller.failedOnException()) {
            throw controller.getFailedOn();
          }
          return rpcCallback.get();
        }).forEach((k, v) -> result.put(k, new Response(v)));
      return result;
    } catch (Throwable e) {
      fs.delete(dir, true);
      throw e;
    }
  } finally {
    fsDelegationToken.releaseDelegationToken();
  }
}
 
源代码26 项目: hbase   文件: Export.java
@Override
public void start(CoprocessorEnvironment environment) throws IOException {
  if (environment instanceof RegionCoprocessorEnvironment) {
    env = (RegionCoprocessorEnvironment) environment;
    userProvider = UserProvider.instantiate(env.getConfiguration());
  } else {
    throw new CoprocessorException("Must be loaded on a table region!");
  }
}
 
源代码27 项目: hbase   文件: Export.java
SecureWriter(final Configuration conf, final UserProvider userProvider,
    final Token userToken, final List<SequenceFile.Writer.Option> opts)
    throws IOException {
  User user = getActiveUser(userProvider, userToken);
  try {
    SequenceFile.Writer sequenceFileWriter =
        user.runAs((PrivilegedExceptionAction<SequenceFile.Writer>) () ->
            SequenceFile.createWriter(conf,
                opts.toArray(new SequenceFile.Writer.Option[opts.size()])));
    privilegedWriter = new PrivilegedWriter(user, sequenceFileWriter);
  } catch (InterruptedException e) {
    throw new IOException(e);
  }
}
 
源代码28 项目: hbase   文件: Export.java
private static User getActiveUser(final UserProvider userProvider, final Token userToken)
    throws IOException {
  User user = RpcServer.getRequestUser().orElse(userProvider.getCurrent());
  if (user == null && userToken != null) {
    LOG.warn("No found of user credentials, but a token was got from user request");
  } else if (user != null && userToken != null) {
    user.addToken(userToken);
  }
  return user;
}
 
源代码29 项目: hbase   文件: HFileReplicator.java
public HFileReplicator(Configuration sourceClusterConf,
    String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
    Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
    AsyncClusterConnection connection, List<String> sourceClusterIds) throws IOException {
  this.sourceClusterConf = sourceClusterConf;
  this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
  this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
  this.bulkLoadHFileMap = tableQueueMap;
  this.conf = conf;
  this.connection = connection;
  this.sourceClusterIds = sourceClusterIds;

  userProvider = UserProvider.instantiate(conf);
  fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
  this.hbaseStagingDir =
    new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
  this.maxCopyThreads =
      this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
        REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
  this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setDaemon(true)
          .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath).
        build());
  this.copiesPerThread =
      conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
        REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

  sinkFs = FileSystem.get(conf);
}
 
源代码30 项目: hbase   文件: ReplicationSink.java
private AsyncClusterConnection getConnection() throws IOException {
  // See https://en.wikipedia.org/wiki/Double-checked_locking
  AsyncClusterConnection connection = sharedConn;
  if (connection == null) {
    synchronized (sharedConnLock) {
      connection = sharedConn;
      if (connection == null) {
        connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
          UserProvider.instantiate(conf).getCurrent());
        sharedConn = connection;
      }
    }
  }
  return connection;
}
 
 类所在包
 同包方法