下面列出了怎么用org.apache.hadoop.hbase.replication.ReplicationPeerConfig的API类实例代码及写法,或者点击链接到github查看源代码。
public void upgrade() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(conf)) {
Admin admin = conn.getAdmin();
admin.listReplicationPeers().forEach((peerDesc) -> {
String peerId = peerDesc.getPeerId();
ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
peerConfig.setReplicateAllUserTables(false);
try {
admin.updateReplicationPeerConfig(peerId, peerConfig);
} catch (Exception e) {
LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
}
}
});
}
}
public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
Map<String, String> currentConf;
StringBuilder sb = new StringBuilder();
for (ReplicationPeerDescription peer : peers) {
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
sb.append("Peer: " + peer.getPeerId() + "\n");
sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
currentConf = peerConfig.getConfiguration();
// Only show when we have a custom configuration for the peer
if (currentConf.size() > 1) {
sb.append(" " + "Peer Configuration: " + currentConf + "\n");
}
sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
}
return sb.toString();
}
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
Map<TableName, List<String>> tableCfs) {
if (tableCfs == null) {
return failedFuture(new ReplicationException("tableCfs is null"));
}
CompletableFuture<Void> future = new CompletableFuture<Void>();
addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig =
ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
if (!completeExceptionally(future, error)) {
future.complete(result);
}
});
}
});
return future;
}
@VisibleForTesting
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!peerConfig.needToReplicate(tn)) {
continue;
}
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
oldPeerConfig.needToReplicate(tn)) {
continue;
}
if (needReopen(tsm, tn)) {
addChildProcedure(new ReopenTableRegionsProcedure(tn));
}
}
}
void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException, ReplicationException {
if (peerId.contains("-")) {
throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
}
checkPeerConfig(peerConfig);
if (peerConfig.isSyncReplication()) {
checkSyncReplicationPeerConfigConflict(peerConfig);
}
if (peers.containsKey(peerId)) {
throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
}
// make sure that there is no queues with the same peer id. This may happen when we create a
// peer with the same id with a old deleted peer. If the replication queues for the old peer
// have not been cleaned up yet then we should not create the new peer, otherwise the old wal
// file may also be replicated.
checkQueuesDeleted(peerId);
}
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException {
// the checking rules are too complicated here so we give up checking whether this is a retry.
ReplicationPeerDescription desc = peers.get(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
ReplicationPeerConfigBuilder newPeerConfigBuilder =
ReplicationPeerConfig.newBuilder(peerConfig);
// we need to use the new conf to overwrite the old one.
newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
peerStorage.updatePeerConfig(peerId, newPeerConfig);
peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig,
desc.getSyncReplicationState()));
}
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
String filterCSV = peerConfig.getConfiguration()
.get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY);
if (filterCSV != null && !filterCSV.isEmpty()) {
String[] filters = filterCSV.split(",");
for (String filter : filters) {
try {
Class.forName(filter).getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new DoNotRetryIOException("Configured WALEntryFilter " + filter +
" could not be created. Failing add/update peer operation.", e);
}
}
}
}
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!peerConfig.needToReplicate(tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
if (!lastSeqIds.isEmpty()) {
env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
}
}
/**
* Add a peer and wait for it to initialize
* @param waitForSource Whether to wait for replication source to initialize
*/
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE);
try {
manager.addPeer(peerId);
} catch (Exception e) {
// ignore the failed exception, because we'll test both success & failed case.
}
waitPeer(peerId, manager, waitForSource);
if (managerOfCluster != null) {
managerOfCluster.addPeer(peerId);
waitPeer(peerId, managerOfCluster, waitForSource);
}
}
@Before
@Override
public void setUpBase() throws Exception {
//"super.setUpBase()" already sets replication from 1->2,
//then on the subsequent lines, sets 2->1, 2->3 and 3->2.
//So we have following topology: "1 <-> 2 <->3"
super.setUpBase();
ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
//adds cluster1 as a remote peer on cluster2
UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
//adds cluster3 as a remote peer on cluster2
UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
//adds cluster2 as a remote peer on cluster3
UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
setupCoprocessor(UTIL1, "cluster1");
setupCoprocessor(UTIL2, "cluster2");
setupCoprocessor(UTIL3, "cluster3");
BULK_LOADS_COUNT = new AtomicInteger(0);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
TEST_UTIL2 = new HBaseTestingUtility(conf2);
TEST_UTIL2.startMiniCluster();
admin2 =
ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(TEST_UTIL2.getClusterKey());
ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
}
@Test
public void testPeerConfig() throws Exception {
ReplicationPeerConfig config = new ReplicationPeerConfig();
config.setClusterKey(KEY_ONE);
config.getConfiguration().put("key1", "value1");
config.getConfiguration().put("key2", "value2");
admin.addReplicationPeer(ID_ONE, config).join();
List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
assertEquals(1, peers.size());
ReplicationPeerDescription peerOne = peers.get(0);
assertNotNull(peerOne);
assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
admin.removeReplicationPeer(ID_ONE).join();
}
/**
*
* @param configuration
* @param peerName
* @param tableCFs
* @throws ReplicationException
* @throws IOException
*/
protected void addPeer(final Configuration configuration,String peerName, Map<TableName, List<String>> tableCFs)
throws ReplicationException, IOException {
try (ReplicationAdmin replicationAdmin = new ReplicationAdmin(configuration)) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey(ZKConfig.getZooKeeperClusterKey(configuration))
.setReplicationEndpointImpl(HbaseEndpoint.class.getName());
replicationAdmin.addPeer(peerName, peerConfig, tableCFs);
}
}
@Override
public CompletableFuture<ReplicationPeerConfig> getReplicationPeerConfig(String peerId) {
return this.<ReplicationPeerConfig> newMasterCaller().action((controller, stub) -> this
.<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig>
call(controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId),
(s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
(resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
if (peers.containsKey(peerId)) {
// this should be a retry, just return
return;
}
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
SyncReplicationState syncReplicationState =
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
: SyncReplicationState.NONE;
peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState);
peers.put(peerId,
new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState));
}
private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
// This is used to reduce the difficulty for implementing the sync replication state transition
// as we need to reopen all the related regions.
// TODO: Add namespace, replicat_all flag back
if (peerConfig.replicateAllUserTables()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
}
if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
}
if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) {
throw new DoNotRetryIOException("Need config replicated tables for sync replication peer");
}
for (List<String> cfs : peerConfig.getTableCFsMap().values()) {
if (cfs != null && !cfs.isEmpty()) {
throw new DoNotRetryIOException(
"Only support replicated table config for sync replication peer");
}
}
Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
if (!remoteWALDir.isAbsolute()) {
throw new DoNotRetryIOException(
"The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute");
}
URI remoteWALDirUri = remoteWALDir.toUri();
if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) {
throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() +
" is not qualified, you must provide scheme and authority");
}
}
private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
throws DoNotRetryIOException {
for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
throw new DoNotRetryIOException(
"Table " + tableName + " has been replicated by peer " + entry.getKey());
}
}
}
}
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
throws ReplicationException {
ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
}
return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
}
@Override
public CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig, boolean enabled) {
return this.<AddReplicationPeerRequest, AddReplicationPeerResponse> procedureCall(
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled),
(s, c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> resp.getProcId(),
new ReplicationProcedureBiConsumer(peerId, () -> "ADD_REPLICATION_PEER"));
}
public void postAddReplicationPeer(final String peerId, final ReplicationPeerConfig peerConfig)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.postAddReplicationPeer(this, peerId, peerConfig);
}
});
}
public void preUpdateReplicationPeerConfig(final String peerId,
final ReplicationPeerConfig peerConfig) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
@Override
public void call(MasterObserver observer) throws IOException {
observer.preUpdateReplicationPeerConfig(this, peerId, peerConfig);
}
});
}
public static UpdateReplicationPeerConfigRequest buildUpdateReplicationPeerConfigRequest(
String peerId, ReplicationPeerConfig peerConfig) {
UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest
.newBuilder();
builder.setPeerId(peerId);
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
return builder.build();
}
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preGetReplicationPeerConfig(peerId);
}
LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId)
.orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
if (cpHost != null) {
cpHost.postGetReplicationPeerConfig(peerId);
}
return peerConfig;
}
@Override
public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
", config=" + peerConfig);
return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
}
@Override
public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
GetReplicationPeerConfigRequest request) throws ServiceException {
GetReplicationPeerConfigResponse.Builder response = GetReplicationPeerConfigResponse
.newBuilder();
try {
String peerId = request.getPeerId();
ReplicationPeerConfig peerConfig = master.getReplicationPeerConfig(peerId);
response.setPeerId(peerId);
response.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);
}
return response.build();
}
/**
* Append the replicable table column family config from the specified peer.
* @param id a short that identifies the cluster
* @param tableCfs A map from tableName to column family names
* @throws ReplicationException if tableCfs has conflict with existing config
* @throws IOException if a remote or network exception occurs
*/
default void appendReplicationPeerTableCFs(String id, Map<TableName, List<String>> tableCfs)
throws ReplicationException, IOException {
if (tableCfs == null) {
throw new ReplicationException("tableCfs is null");
}
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
ReplicationPeerConfig newPeerConfig =
ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
updateReplicationPeerConfig(id, newPeerConfig);
}
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build();
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
final long sizeOfLatestPath = getSizeOfLatestPath();
addPeerAndWait(peerId, peerConfig, true);
assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
ReplicationSourceInterface source = manager.getSource(peerId);
// Sanity check
assertNotNull(source);
final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();
// Enqueue log and check if metrics updated
source.enqueueLog(new Path("abc"));
assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue());
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
// Removing the peer should reset the global metrics
removePeerAndWait(peerId);
assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());
// Adding the same peer back again should reset the single source metrics
addPeerAndWait(peerId, peerConfig, true);
source = manager.getSource(peerId);
assertNotNull(source);
assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
globalSource.getSizeOfLogQueue());
} finally {
removePeerAndWait(peerId);
}
}
private ReplicationSourceInterface mockReplicationSource(String peerId) {
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getPeerId()).thenReturn(peerId);
when(source.getQueueId()).thenReturn(peerId);
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(true);
ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
when(config.getRemoteWALDir())
.thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getPeerConfig()).thenReturn(config);
when(source.getPeer()).thenReturn(peer);
return source;
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster();
ReplicationPeer replicationPeer = mock(ReplicationPeer.class);
ReplicationPeerConfig rpc = mock(ReplicationPeerConfig.class);
when(rpc.isSerial()).thenReturn(false);
when(replicationPeer.getPeerConfig()).thenReturn(rpc);
Context context = new Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null,
null, null, replicationPeer, null, null, null);
endpoint = new HBaseInterClusterReplicationEndpoint();
endpoint.init(context);
UTIL.createTable(TABLE1, FAMILY);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal");
// make sure that we will create a new group for the table
UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8);
UTIL.startMiniCluster(3);
Path dir = UTIL.getDataTestDirOnTestFS();
FS = UTIL.getTestFileSystem();
LOG_PATH = new Path(dir, "replicated");
WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration());
UTIL.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
true);
}