类org.apache.hadoop.hbase.replication.ReplicationPeerConfig源码实例Demo

下面列出了如何使用 org.apache.hadoop.hbase.replication.ReplicationPeerConfig 类的 API 示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: hbase   文件: ReplicationPeerConfigUpgrader.java
/**
 * Clears the "replicate all user tables" flag on every replication peer that already has a
 * non-empty namespace list or a non-empty table/column-family map, and pushes the updated
 * config back through the admin.
 * <p>
 * A failure to update one peer is logged and does not stop the remaining peers from being
 * processed.
 * @throws Exception propagated from creating the connection, obtaining the admin, listing the
 *           peers, or closing either resource
 */
public void upgrade() throws Exception {
  // Admin is a closeable resource too; manage it together with the connection so it is
  // released even when listing the peers fails.
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
    admin.listReplicationPeers().forEach((peerDesc) -> {
      String peerId = peerDesc.getPeerId();
      ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
          || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
        peerConfig.setReplicateAllUserTables(false);
        try {
          admin.updateReplicationPeerConfig(peerId, peerConfig);
        } catch (Exception e) {
          // Best effort: keep upgrading the other peers.
          LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
        }
      }
    });
  }
}
 
源代码2 项目: hbase   文件: DumpReplicationQueues.java
/**
 * Renders a multi-line, human-readable description of the given replication peers.
 * <p>
 * For each peer the id, enabled state, cluster key, endpoint implementation, table CFs and
 * namespaces are written; the peer configuration map is only written when it holds more than
 * one entry.
 * @param peers the peer descriptions to dump
 * @return the formatted text, empty when {@code peers} is empty
 */
public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
  final String indent = "    ";
  StringBuilder out = new StringBuilder();
  for (ReplicationPeerDescription peer : peers) {
    ReplicationPeerConfig peerConfig = peer.getPeerConfig();
    out.append("Peer: ").append(peer.getPeerId()).append("\n");
    String state = peer.isEnabled() ? "ENABLED" : "DISABLED";
    out.append(indent).append("State: ").append(state).append("\n");
    out.append(indent).append("Cluster Name: ").append(peerConfig.getClusterKey()).append("\n");
    out.append(indent).append("Replication Endpoint: ")
      .append(peerConfig.getReplicationEndpointImpl()).append("\n");
    Map<String, String> customConf = peerConfig.getConfiguration();
    // Only show when we have a custom configuration for the peer
    if (customConf.size() > 1) {
      out.append(indent).append("Peer Configuration: ").append(customConf).append("\n");
    }
    out.append(indent).append("Peer Table CFs: ").append(peerConfig.getTableCFsMap())
      .append("\n");
    out.append(indent).append("Peer Namespaces: ").append(peerConfig.getNamespaces())
      .append("\n");
  }
  return out.toString();
}
 
源代码3 项目: hbase   文件: RawAsyncHBaseAdmin.java
/**
 * Asynchronously appends the given table/column-family mappings to the config of a replication
 * peer: fetches the current config, merges {@code tableCfs} into it, and submits the update.
 * @param id the replication peer id
 * @param tableCfs map from table name to column family names to append
 * @return a future that completes when the update finishes; it is failed with a
 *         {@link ReplicationException} when {@code tableCfs} is null, and completed
 *         exceptionally when either the fetch or the update reports an error
 */
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
    Map<TableName, List<String>> tableCfs) {
  if (tableCfs == null) {
    return failedFuture(new ReplicationException("tableCfs is null"));
  }

  CompletableFuture<Void> future = new CompletableFuture<Void>();
  addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
    if (!completeExceptionally(future, error)) {
      ReplicationPeerConfig newPeerConfig =
        ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
      addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
        // Inspect the update's own error. The original checked the outer fetch error here,
        // so a failed update was reported to the caller as a success.
        if (!completeExceptionally(future, err)) {
          future.complete(result);
        }
      });
    }
  });
  return future;
}
 
源代码4 项目: hbase   文件: ModifyPeerProcedure.java
/**
 * Adds a {@code ReopenTableRegionsProcedure} child for each table that has global replication
 * scope, is replicated by the new peer config, was not already replicated by an old serial
 * peer config, and for which {@code needReopen} returns true.
 * @param env the master procedure environment used to look up table descriptors and states
 * @throws IOException propagated from the table descriptor / table state lookups
 */
@VisibleForTesting
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
  ReplicationPeerConfig updatedConfig = getNewPeerConfig();
  ReplicationPeerConfig previousConfig = getOldPeerConfig();
  TableStateManager stateManager = env.getMasterServices().getTableStateManager();
  for (TableDescriptor descriptor : env.getMasterServices().getTableDescriptors().getAll()
    .values()) {
    if (!descriptor.hasGlobalReplicationScope()) {
      continue;
    }
    TableName table = descriptor.getTableName();
    // Conditions are evaluated left to right, exactly like the chain of guards they replace.
    if (updatedConfig.needToReplicate(table)
      && !(previousConfig != null && previousConfig.isSerial()
        && previousConfig.needToReplicate(table))
      && needReopen(stateManager, table)) {
      addChildProcedure(new ReopenTableRegionsProcedure(table));
    }
  }
}
 
源代码5 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Validates a request to add a replication peer before the peer is actually created.
 * <p>
 * Checks, in order: the peer id contains no {@code '-'}, the peer config passes
 * {@code checkPeerConfig}, a sync replication config does not conflict with existing peers, the
 * peer id is not already registered, and no replication queues remain for that id.
 * @param peerId the id of the peer to add
 * @param peerConfig the config of the peer to add
 * @throws DoNotRetryIOException if the peer id is invalid or the peer already exists
 * @throws ReplicationException declared for the delegated checks
 */
void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException, ReplicationException {
  if (peerId.indexOf('-') >= 0) {
    throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
  }
  checkPeerConfig(peerConfig);
  if (peerConfig.isSyncReplication()) {
    checkSyncReplicationPeerConfigConflict(peerConfig);
  }
  if (!peers.containsKey(peerId)) {
    // A new peer may reuse the id of an old, deleted peer. If the old peer's replication
    // queues have not been cleaned up yet, creating the new peer must be refused, otherwise
    // the old WAL files could be replicated as well.
    checkQueuesDeleted(peerId);
    return;
  }
  throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
}
 
源代码6 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Replaces the stored config of an existing peer with {@code peerConfig}, keeping any
 * configuration entries of the old config that the new one does not override.
 * <p>
 * The merged config is written to the peer storage and then cached in {@code peers}, preserving
 * the peer's enabled flag and sync replication state.
 * @param peerId the id of the peer to update
 * @param peerConfig the new peer config
 * @throws ReplicationException declared for the peer storage update
 */
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
  // the checking rules are too complicated here so we give up checking whether this is a retry.
  // NOTE(review): the description is dereferenced without a null check; presumably callers
  // verify that the peer exists before calling this method - confirm.
  ReplicationPeerDescription desc = peers.get(peerId);
  ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
  ReplicationPeerConfigBuilder newPeerConfigBuilder =
    ReplicationPeerConfig.newBuilder(peerConfig);
  // Apply the old entries first and the new ones second so that the new conf overwrites the
  // old one. (The original repeated this pair of calls twice, which had no additional effect.)
  newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
  newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
  ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
  peerStorage.updatePeerConfig(peerId, newPeerConfig);
  peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
    desc.getSyncReplicationState()));
}
 
源代码7 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Verifies that every WAL entry filter class named in the peer configuration can be loaded and
 * instantiated through its no-argument constructor.
 * <p>
 * The class names are read as a comma-separated value under
 * {@code BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY}; a missing or empty
 * value means there is nothing to check.
 * @param peerConfig the peer config whose configured filters are checked
 * @throws DoNotRetryIOException if any configured filter cannot be created
 */
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
  String filterCSV = peerConfig.getConfiguration()
    .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
  if (filterCSV == null || filterCSV.isEmpty()) {
    return;
  }
  for (String filterClassName : filterCSV.split(",")) {
    try {
      Class.forName(filterClassName).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new DoNotRetryIOException("Configured WALEntryFilter " + filterClassName +
        " could not be created. Failing add/update peer operation.", e);
    }
  }
}
 
源代码8 项目: hbase   文件: AbstractPeerProcedure.java
/**
 * Collects the last pushed sequence ids for every table that has global replication scope and
 * is replicated by {@code peerConfig}, then stores them in the replication queue storage for
 * this procedure's peer. Nothing is stored when no ids were collected.
 * @param env the master procedure environment
 * @param peerConfig the peer config deciding which tables are replicated
 */
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
    ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
  Map<String, Long> collectedSeqIds = new HashMap<>();
  for (TableDescriptor descriptor : env.getMasterServices().getTableDescriptors().getAll()
    .values()) {
    if (descriptor.hasGlobalReplicationScope()) {
      TableName table = descriptor.getTableName();
      if (peerConfig.needToReplicate(table)) {
        setLastPushedSequenceIdForTable(env, table, collectedSeqIds);
      }
    }
  }
  if (collectedSeqIds.isEmpty()) {
    return;
  }
  env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, collectedSeqIds);
}
 
源代码9 项目: hbase   文件: TestReplicationSourceManager.java
/**
 * Registers a peer in the peer storage (enabled, with sync replication state NONE), adds it to
 * the replication source manager and waits for it; when a second manager
 * ({@code managerOfCluster}) is present the peer is added there and waited for as well.
 * @param peerId id of the peer to add
 * @param peerConfig config of the peer to add
 * @param waitForSource Whether to wait for replication source to initialize
 */
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
    final boolean waitForSource) throws Exception {
  manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, true,
    SyncReplicationState.NONE);
  try {
    manager.addPeer(peerId);
  } catch (Exception ignored) {
    // Deliberately ignored: callers exercise both the success and the failure path.
  }
  waitPeer(peerId, manager, waitForSource);
  if (managerOfCluster == null) {
    return;
  }
  managerOfCluster.addPeer(peerId);
  waitPeer(peerId, managerOfCluster, waitForSource);
}
 
源代码10 项目: hbase   文件: TestBulkLoadReplication.java
/**
 * Extends the base setup into the topology "1 <-> 2 <-> 3": per the original author's note,
 * {@code super.setUpBase()} already configures replication 1->2, and this method adds 2->1,
 * 2->3 and 3->2. It then installs the coprocessor on all three clusters and resets the bulk
 * load counter.
 */
@Before
@Override
public void setUpBase() throws Exception {
  super.setUpBase();
  ReplicationPeerConfig cluster1AsPeer = getPeerConfigForCluster(UTIL1);
  ReplicationPeerConfig cluster2AsPeer = getPeerConfigForCluster(UTIL2);
  ReplicationPeerConfig cluster3AsPeer = getPeerConfigForCluster(UTIL3);

  // 2 -> 1
  UTIL2.getAdmin().addReplicationPeer(PEER_ID1, cluster1AsPeer);
  // 2 -> 3
  UTIL2.getAdmin().addReplicationPeer(PEER_ID3, cluster3AsPeer);
  // 3 -> 2
  UTIL3.getAdmin().addReplicationPeer(PEER_ID2, cluster2AsPeer);

  setupCoprocessor(UTIL1, "cluster1");
  setupCoprocessor(UTIL2, "cluster2");
  setupCoprocessor(UTIL3, "cluster3");
  // AtomicInteger's no-arg constructor starts at zero.
  BULK_LOADS_COUNT = new AtomicInteger();
}
 
/**
 * Starts two mini clusters (the second one under ZooKeeper parent znode "/2"), opens async
 * connections/admins to them, and registers the second cluster as replication peer
 * {@code ID_SECOND} on the first.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Client settings for the first cluster.
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
  TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
  TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
  TEST_UTIL.startMiniCluster();
  ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();

  // Second cluster: same configuration, different znode parent.
  conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
  conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
  TEST_UTIL2 = new HBaseTestingUtility(conf2);
  TEST_UTIL2.startMiniCluster();
  admin2 =
      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();

  // Make the second cluster a replication peer of the first and wait for completion.
  ReplicationPeerConfig secondClusterPeer = new ReplicationPeerConfig();
  secondClusterPeer.setClusterKey(TEST_UTIL2.getClusterKey());
  ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, secondClusterPeer).join();
}
 
源代码12 项目: hbase   文件: TestAsyncReplicationAdminApi.java
/**
 * Adds a peer carrying two custom configuration entries, lists the peers, and checks that the
 * single returned peer still exposes both entries; the peer is removed afterwards.
 */
@Test
public void testPeerConfig() throws Exception {
  ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
  peerConfig.setClusterKey(KEY_ONE);
  peerConfig.getConfiguration().put("key1", "value1");
  peerConfig.getConfiguration().put("key2", "value2");
  admin.addReplicationPeer(ID_ONE, peerConfig).join();

  List<ReplicationPeerDescription> listedPeers = admin.listReplicationPeers().get();
  assertEquals(1, listedPeers.size());
  ReplicationPeerDescription onlyPeer = listedPeers.get(0);
  assertNotNull(onlyPeer);
  assertEquals("value1", onlyPeer.getPeerConfig().getConfiguration().get("key1"));
  assertEquals("value2", onlyPeer.getPeerConfig().getConfiguration().get("key2"));

  admin.removeReplicationPeer(ID_ONE).join();
}
 
源代码13 项目: hbase-connect-kafka   文件: BaseTest.java
/**
 * Adds a replication peer that uses {@code HbaseEndpoint} as its replication endpoint and the
 * ZooKeeper cluster key derived from the given configuration as its cluster key.
 *
 * @param configuration configuration used to create the replication admin and to derive the
 *          cluster key
 * @param peerName id of the peer to add
 * @param tableCFs map from table name to column families passed through to the peer
 * @throws ReplicationException declared for the underlying add-peer call
 * @throws IOException declared for creating or closing the replication admin
 */
  protected void addPeer(final Configuration configuration, String peerName,
      Map<TableName, List<String>> tableCFs) throws ReplicationException, IOException {
    try (ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration)) {
      String clusterKey = ZKConfig.getZooKeeperClusterKey(configuration);
      String endpointClassName = HbaseEndpoint.class.getName();
      ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
          .setClusterKey(clusterKey)
          .setReplicationEndpointImpl(endpointClassName);

      replicationAdmin.addPeer(peerName, peerConfig, tableCFs);
    }
  }
 
源代码14 项目: hbase   文件: RawAsyncHBaseAdmin.java
/**
 * Asynchronously fetches the replication peer config for the given peer.
 * <p>
 * Builds a {@code GetReplicationPeerConfigRequest} for {@code peerId}, sends it through a
 * master caller, and converts the peer config carried by the response with
 * {@code ReplicationPeerConfigUtil.convert}.
 * @param peerId id of the peer whose config is requested
 * @return a future holding the converted {@link ReplicationPeerConfig}
 */
@Override
public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
  // Type witnesses: <request, response, converted result>.
  return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
    .<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
        call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
          (s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
          (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
}
 
源代码15 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Persists a new replication peer and caches its description.
 * <p>
 * When the peer id is already cached the call is treated as a retry and does nothing. The
 * config is copied through its builder before use; a sync replication peer starts in state
 * {@code DOWNGRADE_ACTIVE}, any other peer in {@code NONE}.
 * @param peerId id of the peer to add
 * @param peerConfig config of the peer to add
 * @param enabled whether the peer is created enabled
 * @throws ReplicationException declared for the peer storage write
 */
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
    throws ReplicationException {
  if (!peers.containsKey(peerId)) {
    ReplicationPeerConfig configCopy = ReplicationPeerConfig.newBuilder(peerConfig).build();
    SyncReplicationState initialState;
    if (configCopy.isSyncReplication()) {
      initialState = SyncReplicationState.DOWNGRADE_ACTIVE;
    } else {
      initialState = SyncReplicationState.NONE;
    }
    peerStorage.addPeer(peerId, configCopy, enabled, initialState);
    peers.put(peerId,
      new ReplicationPeerDescription(peerId, enabled, configCopy, initialState));
  }
  // Otherwise this should be a retry: nothing to do.
}
 
源代码16 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Validates a peer config that is meant to be used for synchronous replication.
 * <p>
 * The config must not replicate all user tables, must not list namespaces, must list at least
 * one table, and must not restrict any table to specific column families. Its remote WAL
 * directory must be an absolute path with both a scheme and an authority.
 * @param peerConfig the peer config to validate
 * @throws DoNotRetryIOException if any of the rules above is violated
 */
private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
  // Only plain table lists are accepted: this keeps the sync replication state transition
  // simpler to implement, since all the related regions need to be reopened.
  // TODO: Add namespace, replicate_all flag back
  if (peerConfig.replicateAllUserTables()
    || (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())) {
    throw new DoNotRetryIOException(
      "Only support replicated table config for sync replication peer");
  }
  if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
    throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
  }
  for (List<String> families : peerConfig.getTableCFsMap().values()) {
    if (families == null || families.isEmpty()) {
      continue;
    }
    throw new DoNotRetryIOException(
      "Only support replicated table config for sync replication peer");
  }

  Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
  if (!remoteWALDir.isAbsolute()) {
    throw new DoNotRetryIOException(
      "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
  }
  URI remoteWALDirUri = remoteWALDir.toUri();
  boolean fullyQualified =
    remoteWALDirUri.getScheme() != null && remoteWALDirUri.getAuthority() != null;
  if (!fullyQualified) {
    throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() +
      " is not qualified, you must provide scheme and authority");
  }
}
 
源代码17 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Rejects a peer config when any of its tables is already listed in the table CFs map of an
 * existing sync replication peer.
 * @param peerConfig the peer config whose tables are checked against the known peers
 * @throws DoNotRetryIOException naming the first conflicting table and peer found
 */
private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
    throws DoNotRetryIOException {
  for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
    for (Map.Entry<String, ReplicationPeerDescription> existing : peers.entrySet()) {
      ReplicationPeerConfig existingConfig = existing.getValue().getPeerConfig();
      if (!existingConfig.isSyncReplication()
        || !existingConfig.getTableCFsMap().containsKey(tableName)) {
        continue;
      }
      throw new DoNotRetryIOException(
          "Table " + tableName + " has been replicated by peer " + existing.getKey());
    }
  }
}
 
源代码18 项目: hbase   文件: ReplicationPeerManager.java
/**
 * Builds a {@code ReplicationPeerManager} whose peer cache is pre-populated from the peer
 * storage: for every stored peer id the config, enabled flag and sync replication state are
 * loaded and cached as a {@code ReplicationPeerDescription}.
 * @param zk the ZooKeeper watcher handed to the storage factory
 * @param conf the configuration handed to the storage factory
 * @return the populated manager
 * @throws ReplicationException declared for the peer storage reads
 */
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
    throws ReplicationException {
  ReplicationPeerStorage storage = ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
  ConcurrentMap<String, ReplicationPeerDescription> loadedPeers = new ConcurrentHashMap<>();
  for (String id : storage.listPeerIds()) {
    // Read order (config, enabled flag, sync state) is kept as in the original.
    ReplicationPeerConfig storedConfig = storage.getPeerConfig(id);
    boolean storedEnabled = storage.isPeerEnabled(id);
    SyncReplicationState storedState = storage.getPeerSyncReplicationState(id);
    ReplicationPeerDescription description =
      new ReplicationPeerDescription(id, storedEnabled, storedConfig, storedState);
    loadedPeers.put(id, description);
  }
  return new ReplicationPeerManager(storage,
    ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), loadedPeers);
}
 
源代码19 项目: hbase   文件: RawAsyncHBaseAdmin.java
/**
 * Asynchronously adds a replication peer by submitting an {@code AddReplicationPeerRequest} as
 * a procedure call.
 * <p>
 * The procedure id is read from the response via {@code getProcId()}, and a
 * {@code ReplicationProcedureBiConsumer} labelled "ADD_REPLICATION_PEER" is passed as the final
 * argument of {@code procedureCall}.
 * @param peerId id of the peer to add
 * @param peerConfig config of the peer to add
 * @param enabled whether the peer is created enabled
 * @return the future returned by {@code procedureCall}
 */
@Override
public CompletableFuture<Void> addReplicationPeer(String peerId,
    ReplicationPeerConfig peerConfig, boolean enabled) {
  return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
    RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
    (s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
    new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
}
 
源代码20 项目: hbase   文件: MasterCoprocessorHost.java
/**
 * Invokes {@code postAddReplicationPeer} on the master observers; when there are no
 * coprocessor environments, {@code null} is handed to {@code execOperation} instead of an
 * operation.
 * @param peerId id of the peer that was added
 * @param peerConfig config of the peer that was added
 */
public void postAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig)
    throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.postAddReplicationPeer(this, peerId, peerConfig);
      }
    };
  }
  execOperation(operation);
}
 
源代码21 项目: hbase   文件: MasterCoprocessorHost.java
/**
 * Invokes {@code preUpdateReplicationPeerConfig} on the master observers; when there are no
 * coprocessor environments, {@code null} is handed to {@code execOperation} instead of an
 * operation.
 * @param peerId id of the peer about to be updated
 * @param peerConfig the new config for the peer
 */
public void preUpdateReplicationPeerConfig(final String peerId,
    final ReplicationPeerConfig peerConfig) throws IOException {
  MasterObserverOperation operation = null;
  if (!coprocEnvironments.isEmpty()) {
    operation = new MasterObserverOperation() {
      @Override
      public void call(MasterObserver observer) throws IOException {
        observer.preUpdateReplicationPeerConfig(this, peerId, peerConfig);
      }
    };
  }
  execOperation(operation);
}
 
源代码22 项目: hbase   文件: RequestConverter.java
/**
 * Creates an {@code UpdateReplicationPeerConfigRequest} carrying the given peer id and the
 * peer config converted by {@code ReplicationPeerConfigUtil.convert}.
 * @param peerId id of the peer to update
 * @param peerConfig the new peer config
 * @return the built request
 */
public static UpdateReplicationPeerConfigRequest buildUpdateReplicationPeerConfigRequest(
    String peerId, ReplicationPeerConfig peerConfig) {
  return UpdateReplicationPeerConfigRequest.newBuilder()
      .setPeerId(peerId)
      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig))
      .build();
}
 
源代码23 项目: hbase   文件: HMaster.java
/**
 * Returns the config of the given replication peer, running the coprocessor pre/post hooks
 * around the lookup when a coprocessor host is present and logging the request.
 * @param peerId id of the peer to look up
 * @return the peer config held by the replication peer manager
 * @throws ReplicationException declared by this method; a
 *           {@code ReplicationPeerNotFoundException} is thrown when the manager has no config
 *           for {@code peerId}
 * @throws IOException declared for the coprocessor hooks
 */
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
    throws ReplicationException, IOException {
  if (cpHost != null) {
    cpHost.preGetReplicationPeerConfig(peerId);
  }
  LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
  ReplicationPeerConfig found = replicationPeerManager.getPeerConfig(peerId)
      .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
  if (cpHost == null) {
    return found;
  }
  cpHost.postGetReplicationPeerConfig(peerId);
  return found;
}
 
源代码24 项目: hbase   文件: HMaster.java
/**
 * Logs the request and runs an {@code UpdatePeerConfigProcedure} for the given peer.
 * @param peerId id of the peer to update
 * @param peerConfig the new peer config
 * @return the value returned by {@code executePeerProcedure}
 */
@Override
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
    throws ReplicationException, IOException {
  LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
    ", config=" + peerConfig);
  UpdatePeerConfigProcedure procedure = new UpdatePeerConfigProcedure(peerId, peerConfig);
  return executePeerProcedure(procedure);
}
 
源代码25 项目: hbase   文件: MasterRpcServices.java
/**
 * RPC entry point that returns the config of a replication peer.
 * <p>
 * Reads the peer id from the request, asks the master for the peer config, and answers with
 * the peer id plus the config converted by {@code ReplicationPeerConfigUtil.convert}.
 * @param controller the RPC controller (not used by this method)
 * @param request the request carrying the peer id
 * @return the response holding the peer id and its config
 * @throws ServiceException wrapping any {@code ReplicationException} or {@code IOException}
 */
@Override
public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
    GetReplicationPeerConfigRequest request) throws ServiceException {
  try {
    String peerId = request.getPeerId();
    ReplicationPeerConfig found = master.getReplicationPeerConfig(peerId);
    return GetReplicationPeerConfigResponse.newBuilder()
        .setPeerId(peerId)
        .setPeerConfig(ReplicationPeerConfigUtil.convert(found))
        .build();
  } catch (ReplicationException | IOException e) {
    throw new ServiceException(e);
  }
}
 
源代码26 项目: hbase   文件: Admin.java
/**
 * Appends table/column-family mappings to the config of the specified replication peer: the
 * current config is fetched, merged with {@code tableCfs}, and written back.
 * @param id a short that identifies the cluster
 * @param tableCfs A map from tableName to column family names
 * @throws ReplicationException if {@code tableCfs} is null or has conflict with existing config
 * @throws IOException if a remote or network exception occurs
 */
default void appendReplicationPeerTableCFs(String id, Map<TableName, List<String>> tableCfs)
    throws ReplicationException, IOException {
  if (tableCfs == null) {
    throw new ReplicationException("tableCfs is null");
  }
  ReplicationPeerConfig merged = ReplicationPeerConfigUtil
    .appendTableCFsToReplicationPeerConfig(tableCfs, getReplicationPeerConfig(id));
  updateReplicationPeerConfig(id, merged);
}
 
源代码27 项目: hbase   文件: TestReplicationSourceManager.java
/**
 * Checks how the log-queue-size metrics react to adding, removing and re-adding a peer:
 * the global metric is compared against the per-source metric plus the initial global value
 * after each step. The peer is always removed again in the {@code finally} block.
 */
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
  final String peerId = "DummyPeer";
  final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
    .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
  try {
    // Record the baseline before the peer exists.
    MetricsReplicationSourceSource globalSource = getGlobalSource();
    final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
    final long sizeOfLatestPath = getSizeOfLatestPath();
    addPeerAndWait(peerId, peerConfig, true);
    assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
    ReplicationSourceInterface source = manager.getSource(peerId);
    // Sanity check
    assertNotNull(source);
    final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
    // Enqueue log and check if metrics updated
    source.enqueueLog(new Path("abc"));
    assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());

    // Removing the peer should reset the global metrics
    removePeerAndWait(peerId);
    assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

    // Adding the same peer back again should reset the single source metrics
    addPeerAndWait(peerId, peerConfig, true);
    source = manager.getSource(peerId);
    assertNotNull(source);
    assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
      globalSource.getSizeOfLogQueue());
  } finally {
    // Clean up regardless of which assertion failed.
    removePeerAndWait(peerId);
  }
}
 
源代码28 项目: hbase   文件: TestReplicationSourceManager.java
/**
 * Builds a mocked, non-recovered sync replication source whose peer id and queue id are both
 * {@code peerId}, backed by a mocked peer whose config reports the qualified
 * {@code remoteLogDir} as its remote WAL directory.
 * @param peerId the id returned as both peer id and queue id
 * @return the mocked replication source
 */
private ReplicationSourceInterface mockReplicationSource(String peerId) {
  // Peer config -> peer -> source, wired bottom-up.
  String qualifiedRemoteWALDir =
    remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
  ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
  when(peerConfig.getRemoteWALDir()).thenReturn(qualifiedRemoteWALDir);

  ReplicationPeer mockedPeer = mock(ReplicationPeer.class);
  when(mockedPeer.getPeerConfig()).thenReturn(peerConfig);

  ReplicationSourceInterface mockedSource = mock(ReplicationSourceInterface.class);
  when(mockedSource.getPeerId()).thenReturn(peerId);
  when(mockedSource.getQueueId()).thenReturn(peerId);
  when(mockedSource.isRecovered()).thenReturn(false);
  when(mockedSource.isSyncReplication()).thenReturn(true);
  when(mockedSource.getPeer()).thenReturn(mockedPeer);
  return mockedSource;
}
 
/**
 * Starts the mini cluster, initializes an {@code HBaseInterClusterReplicationEndpoint} with a
 * context built around a mocked non-serial replication peer, and creates the test table.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  UTIL.startMiniCluster();

  // Mocked peer whose config reports a non-serial peer.
  ReplicationPeerConfig nonSerialConfig = mock(ReplicationPeerConfig.class);
  when(nonSerialConfig.isSerial()).thenReturn(false);
  ReplicationPeer mockedPeer = mock(ReplicationPeer.class);
  when(mockedPeer.getPeerConfig()).thenReturn(nonSerialConfig);

  Context endpointContext = new Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(),
      null, null, null, mockedPeer, null, null, null);
  endpoint = new HBaseInterClusterReplicationEndpoint();
  endpoint.init(endpointContext);

  UTIL.createTable(TABLE1, FAMILY);
}
 
/**
 * Starts a three-node mini cluster using the "multiwal" WAL provider with 8 region groups,
 * opens a WAL writer on a "replicated" file under the test data directory, and adds an enabled
 * replication peer that uses {@code LocalReplicationEndpoint}.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal");
  // make sure that we will create a new group for the table
  UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8);
  UTIL.startMiniCluster(3);

  Path testDataDir = UTIL.getDataTestDirOnTestFS();
  FS = UTIL.getTestFileSystem();
  LOG_PATH = new Path(testDataDir, "replicated");
  WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration());

  ReplicationPeerConfig localEndpointPeer = ReplicationPeerConfig.newBuilder()
    .setClusterKey("127.0.0.1:2181:/hbase")
    .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName())
    .build();
  UTIL.getAdmin().addReplicationPeer(PEER_ID, localEndpointPeer, true);
}
 
 类所在包
 同包方法