下面列出了 org.apache.hadoop.hbase.replication.ReplicationException 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Decides whether the given file may be deleted by checking its name against the hfile
 * references returned by the replication queue storage.
 *
 * @param fStat status of the file being considered for deletion
 * @return {@code true} if no configuration is set (replication disabled) or the file name is not
 *         among the hfile references; {@code false} if it is still referenced or the references
 *         could not be read
 */
@Override
public boolean isFileDeletable(FileStatus fStat) {
  // all members of this class are null if replication is disabled,
  // so do not stop from deleting the file
  if (getConf() == null) {
    return true;
  }
  Set<String> hfileRefsFromQueue;
  try {
    hfileRefsFromQueue = rqs.getAllHFileRefs();
  } catch (ReplicationException e) {
    // Hand the exception to the logger as well, so the root cause of the failed read is kept.
    LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
      + "file for " + fStat.getPath(), e);
    // Be conservative: when the references are unknown, keep the file.
    return false;
  }
  return !hfileRefsFromQueue.contains(fStat.getPath().getName());
}
/**
 * Reads the list of replicators from the queue storage and calls {@code transferQueues} for
 * every replicator that is not among the region servers reported by the replication tracker.
 * Aborts the server if the list of replicators cannot be read.
 */
private void adoptAbandonedQueues() {
  List<ServerName> replicators;
  try {
    replicators = queueStorage.getListOfReplicators();
  } catch (ReplicationException e) {
    server.abort("Failed to get all replicators", e);
    return;
  }
  if (replicators == null || replicators.isEmpty()) {
    // nothing recorded, nothing to adopt
    return;
  }
  List<ServerName> knownRegionServers = replicationTracker.getListOfRegionServers().stream()
    .map(ServerName::valueOf).collect(Collectors.toList());
  LOG.info(
    "Current list of replicators: " + replicators + " other RSs: " + knownRegionServers);
  // Look if there's anything to process after a restart
  for (ServerName candidate : replicators) {
    if (knownRegionServers.contains(candidate)) {
      continue;
    }
    transferQueues(candidate);
  }
}
/**
 * Runs the given replication queue operation.
 * <p>
 * Refresh replication source will terminate the old source first, then the source thread will be
 * interrupted. Need to handle it instead of abort the region server. Any other
 * {@link ReplicationException} aborts the server.
 *
 * @param op the operation to execute
 */
private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
  try {
    op.exec();
  } catch (ReplicationException e) {
    Throwable cause = e.getCause();
    // instanceof is false for null, so no separate null checks are needed
    if (cause instanceof KeeperException.SystemErrorException) {
      Throwable nested = cause.getCause();
      if (nested instanceof InterruptedException) {
        // ReplicationRuntimeException(a RuntimeException) is thrown out here. The reason is
        // that thread is interrupted deep down in the stack, it should pass the following
        // processing logic and propagate to the most top layer which can handle this exception
        // properly. In this specific case, the top layer is ReplicationSourceShipper#run().
        throw new ReplicationRuntimeException(
          "Thread is interrupted, the replication source may be terminated", nested);
      }
    }
    server.abort("Failed to operate on replication queue", e);
  }
}
/**
 * Records the new sync replication state ({@code toState}) for the peer, disabling the peer
 * first when the transition requires it.
 *
 * @param env the master procedure environment used to reach the replication peer manager
 * @throws ReplicationException if the replication peer manager fails
 */
@VisibleForTesting
protected void setPeerNewSyncReplicationState(MasterProcedureEnv env)
  throws ReplicationException {
  // Disable the peer if we are going to transit to STANDBY state, as we need to remove
  // all the pending replication files. If we do not disable the peer and delete the wal
  // queues on zk directly, RS will get NoNode exception when updating the wal position
  // and crash.
  // Disable the peer if we are going to transit from STANDBY to DOWNGRADE_ACTIVE, and the
  // replication is serial, as we need to update the lastPushedSequence id after we reopen all
  // the regions, and for performance reason here we will update in batch, without using CAS, if
  // we are still replicating at RS side, we may accidentally update the last pushed sequence id
  // to a less value and cause the replication to be stuck.
  // Note: because && binds tighter than ||, 'enabled' only qualifies the from-STANDBY clause.
  boolean disableFirst = toState.equals(SyncReplicationState.STANDBY)
    || (fromState.equals(SyncReplicationState.STANDBY) && serial && enabled);
  if (disableFirst) {
    env.getReplicationPeerManager().disablePeer(peerId);
  }
  env.getReplicationPeerManager().setPeerNewSyncReplicationState(peerId, toState);
}
/**
 * Verifies that nothing belonging to the given peer is left in the queue storage.
 *
 * @param peerId the peer id to look for
 * @throws ReplicationException if the queue storage cannot be read
 * @throws DoNotRetryIOException if a queue whose peer id equals {@code peerId} exists under any
 *         replicator, or if the peer is still listed in the hfile-refs queue
 */
private void checkQueuesDeleted(String peerId)
  throws ReplicationException, DoNotRetryIOException {
  for (ServerName replicator : queueStorage.getListOfReplicators()) {
    for (String queueId : queueStorage.getAllQueues(replicator)) {
      String owningPeerId = new ReplicationQueueInfo(queueId).getPeerId();
      if (!owningPeerId.equals(peerId)) {
        continue;
      }
      throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
        ", replicator: " + replicator + ", queueId: " + queueId);
    }
  }
  boolean stillInHFileRefs = queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId);
  if (stillInHFileRefs) {
    throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs");
  }
}
/**
 * Appends the given table/column-family mappings to the config of replication peer {@code id}.
 * <p>
 * The current peer config is fetched with {@code getReplicationPeerConfig}, combined with
 * {@code tableCfs} by {@code ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig},
 * and written back with {@code updateReplicationPeerConfig}.
 *
 * @param id the replication peer id
 * @param tableCfs table to column-family list mapping to append; must not be {@code null}
 * @return a future that is failed right away when {@code tableCfs} is {@code null}, and that
 *         otherwise reflects the outcome of fetching and then updating the peer config
 */
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
  Map<TableName, List<String>> tableCfs) {
  if (tableCfs == null) {
    return failedFuture(new ReplicationException("tableCfs is null"));
  }
  CompletableFuture<Void> future = new CompletableFuture<>();
  addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
    if (!completeExceptionally(future, error)) {
      ReplicationPeerConfig newPeerConfig =
        ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
      addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
        // Inspect the update's own error (err). The outer fetch error was already handled
        // above; checking it again here would report a failed update as a success.
        if (!completeExceptionally(future, err)) {
          future.complete(result);
        }
      });
    }
  });
  return future;
}
/**
 * Collects last pushed sequence ids for every table that has global replication scope and that
 * the given peer config wants to replicate, then stores them for this peer in one call.
 *
 * @param env the master procedure environment
 * @param peerConfig the peer config used to decide which tables are replicated
 * @throws IOException if reading table descriptors or per-table processing fails
 * @throws ReplicationException if the queue storage update fails
 */
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
  ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
  Map<String, Long> pushedSeqIds = new HashMap<>();
  for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
    if (td.hasGlobalReplicationScope()) {
      TableName tableName = td.getTableName();
      if (peerConfig.needToReplicate(tableName)) {
        setLastPushedSequenceIdForTable(env, tableName, pushedSeqIds);
      }
    }
  }
  if (pushedSeqIds.isEmpty()) {
    // nothing collected, skip the storage call
    return;
  }
  env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, pushedSeqIds);
}
/**
 * Removes every queue recorded in {@code undeletedQueueIds} from the queue storage, asks the
 * storage to remove each replicator if its queue is empty, and finally removes every peer in
 * {@code undeletedHFileRefsPeerIds} from the hfile refs.
 *
 * @throws ReplicationException if any queue storage operation fails
 */
public void fixUnDeletedQueues() throws ReplicationException {
  for (Map.Entry<ServerName, List<String>> entry : undeletedQueueIds.entrySet()) {
    ServerName server = entry.getKey();
    List<String> staleQueueIds = entry.getValue();
    for (String staleQueueId : staleQueueIds) {
      queueStorage.removeQueue(server, staleQueueId);
    }
    queueStorage.removeReplicatorIfQueueIsEmpty(server);
  }
  for (String stalePeerId : undeletedHFileRefsPeerIds) {
    queueStorage.removePeerFromHFileRefs(stalePeerId);
  }
}
/**
 * RPC entry point that asks the master to add a replication peer and returns the id of the
 * resulting procedure.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request carries the peer id, the peer config and the initial peer state
 * @return a response holding the procedure id returned by the master
 * @throws ServiceException wrapping any {@link ReplicationException} or {@link IOException}
 */
@Override
public AddReplicationPeerResponse addReplicationPeer(RpcController controller,
  AddReplicationPeerRequest request) throws ServiceException {
  final long procId;
  try {
    String peerId = request.getPeerId();
    procId = master.addReplicationPeer(peerId,
      ReplicationPeerConfigUtil.convert(request.getPeerConfig()),
      request.getPeerState().getState().equals(ReplicationState.State.ENABLED));
  } catch (ReplicationException | IOException e) {
    throw new ServiceException(e);
  }
  return AddReplicationPeerResponse.newBuilder().setProcId(procId).build();
}
/**
 * RPC entry point that looks up the config of the requested replication peer on the master.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request carries the peer id to look up
 * @return a response holding the peer id and the converted peer config
 * @throws ServiceException wrapping any {@link ReplicationException} or {@link IOException}
 */
@Override
public GetReplicationPeerConfigResponse getReplicationPeerConfig(RpcController controller,
  GetReplicationPeerConfigRequest request) throws ServiceException {
  try {
    String peerId = request.getPeerId();
    ReplicationPeerConfig peerConfig = master.getReplicationPeerConfig(peerId);
    return GetReplicationPeerConfigResponse.newBuilder()
      .setPeerId(peerId)
      .setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig))
      .build();
  } catch (ReplicationException | IOException e) {
    throw new ServiceException(e);
  }
}
/**
 * <ol>
 * <li>Add peer to replicationPeers</li>
 * <li>Add the normal source and related replication queue</li>
 * <li>Add HFile Refs</li>
 * </ol>
 * The last two steps only run when {@code replicationPeers.addPeer} reports that the peer was
 * added, and the HFile refs step only when replication of bulk loaded data is enabled.
 * @param peerId the id of replication peer
 * @throws IOException if any step fails; a {@link ReplicationException} is wrapped
 */
public void addPeer(String peerId) throws IOException {
  final boolean newlyAdded;
  try {
    newlyAdded = this.replicationPeers.addPeer(peerId);
  } catch (ReplicationException e) {
    throw new IOException(e);
  }
  if (!newlyAdded) {
    return;
  }
  addSource(peerId);
  if (replicationForBulkLoadDataEnabled) {
    throwIOExceptionWhenFail(() -> this.queueStorage.addPeerToHFileRefs(peerId));
  }
}
/**
 * Executes the given replication queue operation and aborts the server if it fails with a
 * {@link ReplicationException}. The exception is not rethrown.
 *
 * @param op the operation to execute
 */
private void abortWhenFail(ReplicationQueueOperation op) {
  try {
    op.exec();
  } catch (ReplicationException failure) {
    server.abort("Failed to operate on replication queue", failure);
  }
}
/**
 * Executes the given replication queue operation, translating a {@link ReplicationException}
 * into an {@link IOException} that carries it as the cause.
 *
 * @param op the operation to execute
 * @throws IOException if the operation fails
 */
private void throwIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
  try {
    op.exec();
  } catch (ReplicationException failure) {
    throw new IOException(failure);
  }
}
/**
 * Executes the given replication queue operation. On a {@link ReplicationException} the server
 * is aborted first and the failure is then rethrown wrapped in an {@link IOException}.
 *
 * @param op the operation to execute
 * @throws IOException if the operation fails
 */
private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation op) throws IOException {
  try {
    op.exec();
  } catch (ReplicationException failure) {
    server.abort("Failed to operate on replication queue", failure);
    throw new IOException(failure);
  }
}
/**
 * Returns all queues that the queue storage holds for this server's name.
 *
 * @return the queue ids reported by the queue storage
 * @throws IOException wrapping a {@link ReplicationException} raised by the queue storage
 */
@VisibleForTesting
List<String> getAllQueues() throws IOException {
  try {
    return queueStorage.getAllQueues(server.getServerName());
  } catch (ReplicationException e) {
    throw new IOException(e);
  }
}
/**
 * Tells whether the last sequence id stored for the region and this peer has reached the end of
 * the range delimited by {@code endBarrier}.
 *
 * @param endBarrier the barrier that ends the range
 * @param encodedRegionName the encoded name of the region
 * @return {@code true} if the stored sequence id is at least {@code endBarrier - 1}
 * @throws IOException if the sequence id cannot be read from the storage
 */
private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
  final long lastPushed;
  try {
    lastPushed = storage.getLastSequenceId(encodedRegionName, peerId);
  } catch (ReplicationException e) {
    throw new IOException(
      "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
  }
  // endBarrier is the open sequence number. When opening a region, the open sequence number will
  // be set to the old max sequence id plus one, so here we need to minus one.
  long lastSeqIdInRange = endBarrier - 1;
  return lastPushed >= lastSeqIdInRange;
}
/**
 * Adds the given hfile pairs to the queue storage for this source's peer when the peer's
 * table-CFs / namespaces settings cover the table and family; otherwise only logs at debug.
 *
 * @param tableName the table the hfiles belong to
 * @param family the column family the hfiles belong to
 * @param pairs the hfile path pairs to record
 * @throws ReplicationException if the queue storage fails to record the references
 */
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
  throws ReplicationException {
  String peerId = replicationPeer.getId();
  Set<String> namespaces = replicationPeer.getNamespaces();
  Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();

  final boolean shouldReplicate;
  if (tableCFMap != null) {
    // All peers with TableCFs: the table must be listed, with either no family restriction
    // (null list) or a list containing this family
    List<String> tableCfs = tableCFMap.get(tableName);
    shouldReplicate = tableCFMap.containsKey(tableName)
      && (tableCfs == null || tableCfs.contains(Bytes.toString(family)));
  } else if (namespaces != null) {
    // Only for set NAMESPACES peers
    shouldReplicate = namespaces.contains(tableName.getNamespaceAsString());
  } else {
    // user has explicitly not defined any table cfs for replication, means replicate all the
    // data
    shouldReplicate = true;
  }

  if (shouldReplicate) {
    this.queueStorage.addHFileRefs(peerId, pairs);
    metrics.incrSizeOfHFileRefsQueue(pairs.size());
  } else {
    LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
      tableName, Bytes.toString(family), peerId);
  }
}
/**
 * Looks up the stored WAL position for the log at the head of this queue.
 *
 * @return the stored position, or 0 if it could not be read (in which case {@code terminate} is
 *         called with the failure)
 */
private long getRecoveredQueueStartPos() {
  String queueId = source.getQueueId();
  try {
    long position = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
      queueId, this.queue.peek().getName());
    LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
      position);
    return position;
  } catch (ReplicationException e) {
    terminate("Couldn't get the position of this recovered queue " + queueId, e);
    return 0;
  }
}
/**
 * Override that does no directory work: it only calls {@code tryFail()} and converts a
 * {@link ReplicationException} from it into an {@link IOException}.
 *
 * @param env the master procedure environment (not used by this override)
 * @throws IOException wrapping the exception thrown by {@code tryFail()}
 */
@Override
protected void createDirForRemoteWAL(MasterProcedureEnv env) throws IOException {
  try {
    tryFail();
  } catch (ReplicationException injected) {
    throw new IOException(injected);
  }
}
/**
 * Removes the given table/column-family mappings from the config of replication peer
 * {@code id}.
 * <p>
 * The current peer config is fetched with {@code getReplicationPeerConfig}, reduced by
 * {@code ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig}, and written back
 * with {@code updateReplicationPeerConfig}.
 *
 * @param id the replication peer id
 * @param tableCfs table to column-family list mapping to remove; must not be {@code null}
 * @return a future that is failed right away when {@code tableCfs} is {@code null}, and that
 *         otherwise reflects the outcome of fetching, recomputing and then updating the peer
 *         config
 */
@Override
public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
  Map<TableName, List<String>> tableCfs) {
  if (tableCfs == null) {
    return failedFuture(new ReplicationException("tableCfs is null"));
  }
  CompletableFuture<Void> future = new CompletableFuture<>();
  addListener(getReplicationPeerConfig(id), (peerConfig, error) -> {
    if (!completeExceptionally(future, error)) {
      ReplicationPeerConfig newPeerConfig = null;
      try {
        newPeerConfig = ReplicationPeerConfigUtil
          .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
      } catch (ReplicationException e) {
        future.completeExceptionally(e);
        return;
      }
      addListener(updateReplicationPeerConfig(id, newPeerConfig), (result, err) -> {
        // Inspect the update's own error (err). The outer fetch error was already handled
        // above; checking it again here would report a failed update as a success.
        if (!completeExceptionally(future, err)) {
          future.complete(result);
        }
      });
    }
  });
  return future;
}
/**
 * Cleans up what was recorded as undeleted: each queue in {@code undeletedQueueIds} is removed
 * from the queue storage (followed by a remove-replicator-if-empty call per replicator), and
 * each peer in {@code undeletedHFileRefsPeerIds} is removed from the hfile refs.
 *
 * @throws ReplicationException if any queue storage operation fails
 */
public void fixUnDeletedQueues() throws ReplicationException {
  for (Map.Entry<ServerName, List<String>> leftover : undeletedQueueIds.entrySet()) {
    ServerName owner = leftover.getKey();
    List<String> queueIds = leftover.getValue();
    for (String id : queueIds) {
      queueStorage.removeQueue(owner, id);
    }
    queueStorage.removeReplicatorIfQueueIsEmpty(owner);
  }
  for (String leftoverPeerId : undeletedHFileRefsPeerIds) {
    queueStorage.removePeerFromHFileRefs(leftoverPeerId);
  }
}
/**
 * RPC entry point that lists replication peers known to the master, optionally restricted by
 * the regex carried in the request.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request may carry a regex; {@code null} is passed to the master when it does not
 * @return a response with one peer description per peer returned by the master
 * @throws ServiceException wrapping any {@link ReplicationException} or {@link IOException}
 */
@Override
public ListReplicationPeersResponse listReplicationPeers(RpcController controller,
  ListReplicationPeersRequest request) throws ServiceException {
  ListReplicationPeersResponse.Builder builder = ListReplicationPeersResponse.newBuilder();
  try {
    String regex = request.hasRegex() ? request.getRegex() : null;
    for (ReplicationPeerDescription description : master.listReplicationPeers(regex)) {
      builder.addPeerDesc(
        ReplicationPeerConfigUtil.toProtoReplicationPeerDescription(description));
    }
  } catch (ReplicationException | IOException e) {
    throw new ServiceException(e);
  }
  return builder.build();
}
/**
 * Builds a mocked {@link ReplicationQueueStorage} whose {@code getLastSequenceId} is stubbed to
 * return the given value(s) for any arguments.
 *
 * @param lastPushedSeqId the first value to return
 * @param lastPushedSeqIds further values passed on to the stubbing, if any
 * @return the configured mock
 * @throws ReplicationException declared because the stubbed method declares it
 */
private ReplicationQueueStorage create(Long lastPushedSeqId, Long... lastPushedSeqIds)
  throws ReplicationException {
  ReplicationQueueStorage mockStorage = mock(ReplicationQueueStorage.class);
  boolean hasFollowUps = lastPushedSeqIds.length != 0;
  if (hasFollowUps) {
    when(mockStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId,
      lastPushedSeqIds);
  } else {
    when(mockStorage.getLastSequenceId(anyString(), anyString())).thenReturn(lastPushedSeqId);
  }
  return mockStorage;
}
/**
 * Error injection hook: returns normally while {@code FAIL} is still false (and sets it to
 * true), and throws whenever {@code FAIL} is already true. The check and the update happen
 * under the class lock.
 *
 * @throws ReplicationException with message "Inject error" when {@code FAIL} is already set
 */
private void tryFail() throws ReplicationException {
  synchronized (TestModifyPeerProcedureRetryBackoff.class) {
    if (!FAIL) {
      FAIL = true;
      return;
    }
    throw new ReplicationException("Inject error");
  }
}
/**
 * Removes the peer from the peer storage and then from the {@code peers} map. Does nothing when
 * the map has no entry for the peer.
 *
 * @param peerId the id of the peer to remove
 * @throws ReplicationException if the peer storage fails to remove the peer
 */
public void removePeer(String peerId) throws ReplicationException {
  // a missing entry should be a retry, nothing left to do in that case
  if (peers.containsKey(peerId)) {
    peerStorage.removePeer(peerId);
    peers.remove(peerId);
  }
}
/**
 * RPC entry point that asks the master to disable a replication peer and returns the id of the
 * resulting procedure.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request carries the id of the peer to disable
 * @return a response holding the procedure id returned by the master
 * @throws ServiceException wrapping any {@link ReplicationException} or {@link IOException}
 */
@Override
public DisableReplicationPeerResponse disableReplicationPeer(RpcController controller,
  DisableReplicationPeerRequest request) throws ServiceException {
  final long procId;
  try {
    procId = master.disableReplicationPeer(request.getPeerId());
  } catch (ReplicationException | IOException e) {
    throw new ServiceException(e);
  }
  return DisableReplicationPeerResponse.newBuilder().setProcId(procId).build();
}
/**
 * Runs the barrier cleaner twice over a region with barriers 40, 50 and 60: first with the
 * catalog family filled (barriers 50 and 60 must remain, no last sequence ids removed), then
 * with the catalog family cleared (the barrier row must be gone and the last sequence ids of the
 * region removed exactly once).
 */
@Test
public void testDeleteRowForDeletedRegion() throws IOException, ReplicationException {
  final String peerId = "1";
  TableName tn = TableName.valueOf(name.getMethodName());
  RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tn).build();
  addBarrier(regionInfo, 40, 50, 60);
  fillCatalogFamily(regionInfo);

  ReplicationQueueStorage storage = create(59L);
  @SuppressWarnings("unchecked")
  ReplicationPeerManager manager = create(storage, Lists.newArrayList(peerId));
  ReplicationBarrierCleaner barrierCleaner = create(manager);

  // we have something in catalog family, so only delete 40
  barrierCleaner.chore();
  long[] expectedBarriers = { 50, 60 };
  assertArrayEquals(expectedBarriers,
    MetaTableAccessor.getReplicationBarrier(UTIL.getConnection(), regionInfo.getRegionName()));
  verify(storage, never()).removeLastSequenceIds(anyString(), anyList());

  // No catalog family, then we should remove the whole row
  clearCatalogFamily(regionInfo);
  barrierCleaner.chore();
  try (Table meta = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
    Get barrierGet =
      new Get(regionInfo.getRegionName()).addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
    assertFalse(meta.exists(barrierGet));
  }
  verify(storage, times(1)).removeLastSequenceIds(peerId,
    Arrays.asList(regionInfo.getEncodedName()));
}
/**
 * Completes a sync replication state transition for the peer: transits the state in the peer
 * storage and replaces the entry in {@code peers} with a description carrying {@code newState}.
 * Each of the two steps is skipped when it appears to have been done already.
 *
 * @param peerId the id of the peer
 * @param newState the state the peer should end up in
 * @throws ReplicationException if the peer storage fails
 */
public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
  throws ReplicationException {
  boolean transitionPending =
    peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE;
  if (transitionPending) {
    // Only transit if this is not a retry
    peerStorage.transitPeerSyncReplicationState(peerId);
  }
  ReplicationPeerDescription current = peers.get(peerId);
  if (current.getSyncReplicationState() == newState) {
    // already up to date, so this is a retry
    return;
  }
  // Only recreate the desc if this is not a retry
  ReplicationPeerDescription updated = new ReplicationPeerDescription(peerId,
    current.isEnabled(), current.getPeerConfig(), newState);
  peers.put(peerId, updated);
}
/**
 * Removes all queues of the given peer by running {@code ReplicationUtils.removeAllQueues}
 * twice in a row.
 *
 * @param peerId the id of the peer whose queues are removed
 * @throws ReplicationException if either pass fails
 */
public void removeAllQueues(String peerId) throws ReplicationException {
  // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
  // on-going when the refresh peer config procedure is done, if a RS which has already been
  // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
  // the scan here, and if the RS who has claimed the queue crashed before creating recovered
  // source, then the queue will leave there until the another RS detects the crash and helps
  // removing the queue.
  // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
  // claiming, it will either under the old RS or under the new RS, and a queue can only be
  // claimed once after the refresh peer procedure done(as the next claim queue will just delete
  // it), so we can make sure that a two pass scan will finally find the queue and remove it,
  // unless it has already been removed by others.
  for (int pass = 0; pass < 2; pass++) {
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
  }
}
/**
 * Cleanup that runs after the peer modification: removes remote WALs for a sync replication
 * peer, removes all queues and hfile refs of the peer, removes the last pushed sequence ids for
 * a serial peer, and finally notifies the master coprocessor host if there is one.
 *
 * @param env the master procedure environment
 * @throws IOException if removing remote WALs or the coprocessor hook fails
 * @throws ReplicationException if the replication peer manager fails
 */
@Override
protected void postPeerModification(MasterProcedureEnv env)
  throws IOException, ReplicationException {
  if (peerConfig.isSyncReplication()) {
    removeRemoteWALs(env);
  }
  env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
  if (peerConfig.isSerial()) {
    env.getReplicationPeerManager().removeAllLastPushedSeqIds(peerId);
  }
  LOG.info("Successfully removed peer {}", peerId);
  MasterCoprocessorHost coprocessorHost = env.getMasterCoprocessorHost();
  if (coprocessorHost == null) {
    // no coprocessor host, so there is no post hook to call
    return;
  }
  coprocessorHost.postRemoveReplicationPeer(peerId);
}