下面列出了 org.apache.hadoop.hbase.HConstants#NO_SEQNUM 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Drops a region from this server's online set and clears the per-region bookkeeping kept for it.
 * When a destination is supplied, the region is also recorded as moved, together with the
 * sequence number it was closed at.
 * @param r the region being removed
 * @param destination where the region is going, or {@code null} if it is not being moved
 * @return {@code true} if the region was present in the online set
 */
@Override
public boolean removeRegion(final HRegion r, ServerName destination) {
  final String encodedName = r.getRegionInfo().getEncodedName();
  final boolean wasOnline = this.onlineRegions.remove(encodedName) != null;
  metricsRegionServerImpl.requestsCountCache.remove(encodedName);

  if (destination != null) {
    long closeSeqNum = r.getMaxFlushedSeqId();
    if (closeSeqNum == HConstants.NO_SEQNUM) {
      // Nothing was flushed for this region: fall back to the sequence number it was opened
      // with, and to zero when even that one is unset.
      final long openSeqNum = r.getOpenSeqNum();
      closeSeqNum = (openSeqNum == HConstants.NO_SEQNUM) ? 0 : openSeqNum;
    }
    final boolean selfMove = ServerName.isSameAddress(destination, this.getServerName());
    addToMovedRegions(encodedName, destination, closeSeqNum, selfMove);
    if (selfMove) {
      // Keep the read/write request counters around when the region moves to this same address.
      this.regionServerAccounting.getRetainedRegionRWRequestsCnt().put(encodedName,
        new Pair<>(r.getReadRequestsCount(), r.getWriteRequestsCount()));
    }
  }

  this.regionFavoredNodesMap.remove(encodedName);
  configurationManager.deregisterObserver(r);
  return wasOnline;
}
/**
 * Reads the sequence number that was recorded in the catalog row when the region was opened,
 * e.g. the seqNum written alongside the result of {@link #getServerName(Result, int)}.
 * (Copied from MetaTableAccessor)
 * @param r Result to pull the seqNum from
 * @param replicaId replica whose seqNum column should be read
 * @return SeqNum, or HConstants.NO_SEQNUM if the column is missing or holds an empty value.
 */
private static long getSeqNumDuringOpen(final Result r, final int replicaId) {
  final Cell seqNumCell = r.getColumnLatestCell(CATALOG_FAMILY, getSeqNumColumn(replicaId));
  final boolean hasValue = seqNumCell != null && seqNumCell.getValueLength() != 0;
  if (hasValue) {
    return Bytes.toLong(seqNumCell.getValueArray(), seqNumCell.getValueOffset(),
      seqNumCell.getValueLength());
  }
  return HConstants.NO_SEQNUM;
}
/**
 * Handles a region state transition reported by a region server for this procedure.
 * <p>
 * The report is ignored unless the procedure is still in the DISPATCH state, and rejected when it
 * comes from a server other than {@code targetServer}. Otherwise the reported transition code and
 * sequence id are recorded on this procedure, persisted via {@code persistAndWake}, and then
 * applied through {@code updateTransitionWithoutPersistingToMeta}.
 * @param env procedure environment handed through to the persist and update steps
 * @param regionNode state node of the region the report is about
 * @param serverName server that sent the report
 * @param transitionCode transition being reported
 * @param seqId sequence id accompanying the report
 * @throws IOException if the reporting server is not the expected one
 *           ({@code UnexpectedStateException}); NOTE(review): {@code checkTransition} and
 *           {@code persistAndWake} presumably can throw as well - confirm
 */
void reportTransition(MasterProcedureEnv env, RegionStateNode regionNode, ServerName serverName,
TransitionCode transitionCode, long seqId) throws IOException {
if (state != RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH) {
// should be a retry
return;
}
if (!targetServer.equals(serverName)) {
throw new UnexpectedStateException("Received report from " + serverName + ", expected " +
targetServer + ", " + regionNode + ", proc=" + this);
}
checkTransition(regionNode, transitionCode, seqId);
// this state means we have received the report from RS, does not mean the result is fine, as we
// may received a FAILED_OPEN.
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_REPORT_SUCCEED;
this.transitionCode = transitionCode;
this.seqId = seqId;
// Persist the transition code and openSeqNum(if provided).
// We should not update the hbase:meta directly as this may cause races when master restarts,
// as the old active master may incorrectly report back to RS and cause the new master to hang
// on a OpenRegionProcedure forever. See HBASE-22060 and HBASE-22074 for more details.
boolean succ = false;
try {
persistAndWake(env, regionNode);
succ = true;
} finally {
// If persisting did not complete, undo the three field updates made above so the procedure is
// back in the DISPATCH state with no recorded transition.
if (!succ) {
this.state = RegionRemoteProcedureBaseState.REGION_REMOTE_PROCEDURE_DISPATCH;
this.transitionCode = null;
this.seqId = HConstants.NO_SEQNUM;
}
}
// Apply the transition in memory; an IOException here is treated as a programming error and
// surfaces as an AssertionError rather than being propagated to the caller.
try {
updateTransitionWithoutPersistingToMeta(env, regionNode, transitionCode, seqId);
} catch (IOException e) {
throw new AssertionError("should not happen", e);
}
}
/**
 * Stores the location and state carried by the given region state node. The meta region goes
 * through {@code updateMetaLocation}; every other region goes through
 * {@code updateUserRegionLocation}.
 * @param regionStateNode node whose region info, state and location should be written
 * @throws IOException if the underlying update fails
 */
void updateRegionLocation(RegionStateNode regionStateNode) throws IOException {
  if (regionStateNode.getRegionInfo().isMetaRegion()) {
    updateMetaLocation(regionStateNode.getRegionInfo(), regionStateNode.getRegionLocation(),
      regionStateNode.getState());
    return;
  }

  // The open sequence number is only passed along while the node is in the OPEN state.
  long openSeqNum = HConstants.NO_SEQNUM;
  if (regionStateNode.getState() == State.OPEN) {
    openSeqNum = regionStateNode.getOpenSeqNum();
  }

  updateUserRegionLocation(regionStateNode.getRegionInfo(), regionStateNode.getState(),
    regionStateNode.getRegionLocation(), openSeqNum,
    // The regionStateNode may have no procedure in a test scenario; allow for this.
    regionStateNode.getProcedure() == null ? Procedure.NO_PROC_ID
      : regionStateNode.getProcedure().getProcId());
}
/**
 * Records the split of {@code parent} into {@code hriA} and {@code hriB} through
 * {@code MetaTableAccessor.splitRegion}.
 * @param parent region that was split
 * @param hriA first daughter region
 * @param hriB second daughter region
 * @param serverName server passed along with the split record
 * @throws IOException if the table descriptor, the parent sequence number or the meta update
 *           cannot be obtained or applied
 */
public void splitRegion(RegionInfo parent, RegionInfo hriA, RegionInfo hriB,
    ServerName serverName) throws IOException {
  final TableDescriptor tableDescriptor = getDescriptor(parent.getTable());
  // The parent's open sequence number is only looked up for tables with global replication scope.
  final long parentOpenSeqNum = tableDescriptor.hasGlobalReplicationScope()
    ? getOpenSeqNumForParentRegion(parent)
    : HConstants.NO_SEQNUM;
  MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
    serverName, getRegionReplication(tableDescriptor));
}
private RegionReplicaInfo(final Result result, final HRegionLocation location) {
this.row = result != null ? result.getRow() : null;
this.regionInfo = location != null ? location.getRegion() : null;
this.regionState = (result != null && regionInfo != null)
? RegionStateStore.getRegionState(result, regionInfo)
: null;
this.serverName = location != null ? location.getServerName() : null;
this.seqNum = (location != null) ? location.getSeqNum() : HConstants.NO_SEQNUM;
this.targetServerName = (result != null && regionInfo != null)
? MetaTableAccessor.getTargetServerName(result, regionInfo.getReplicaId())
: null;
this.mergeRegionInfo = (result != null)
? MetaTableAccessor.getMergeRegionsWithName(result.rawCells())
: null;
if (result != null) {
PairOfSameType<RegionInfo> daughterRegions = MetaTableAccessor.getDaughterRegions(result);
this.splitRegionInfo = new LinkedHashMap<>();
if (daughterRegions.getFirst() != null) {
splitRegionInfo.put(HConstants.SPLITA_QUALIFIER_STR, daughterRegions.getFirst());
}
if (daughterRegions.getSecond() != null) {
splitRegionInfo.put(HConstants.SPLITB_QUALIFIER_STR, daughterRegions.getSecond());
}
} else {
this.splitRegionInfo = null;
}
}
/**
 * Folds the sequence ids reported by a server into the per-region and per-store maps of last
 * flushed sequence ids. A reported id only replaces a stored one when it is larger; the
 * {@code HConstants.NO_SEQNUM} marker never replaces a stored id.
 * @param sn server the metrics came from; only used in the warning message
 * @param hsl metrics reported by that server, providing the per-region sequence ids
 */
private void updateLastFlushedSequenceIds(ServerName sn, ServerMetrics hsl) {
  for (Entry<byte[], RegionMetrics> regionEntry : hsl.getRegionMetrics().entrySet()) {
    final byte[] encodedRegionName =
      Bytes.toBytes(RegionInfo.encodeRegionName(regionEntry.getKey()));
    final RegionMetrics regionMetrics = regionEntry.getValue();

    // Region level.
    final Long knownRegionSeqId = flushedSequenceIdByRegion.get(encodedRegionName);
    final long reportedRegionSeqId = regionMetrics.getCompletedSequenceId();
    if (LOG.isTraceEnabled()) {
      LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + knownRegionSeqId +
        ", completeSequenceId=" + reportedRegionSeqId);
    }
    // Never let a smaller sequence id overwrite a greater one.
    final boolean regionSeqIdReported = reportedRegionSeqId != HConstants.NO_SEQNUM;
    if (knownRegionSeqId == null
        || (regionSeqIdReported && reportedRegionSeqId > knownRegionSeqId)) {
      flushedSequenceIdByRegion.put(encodedRegionName, reportedRegionSeqId);
    } else if (regionSeqIdReported && reportedRegionSeqId < knownRegionSeqId) {
      LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
        + reportedRegionSeqId + ") that is less than the previous last flushed sequence id ("
        + knownRegionSeqId + ") for region " + Bytes.toString(regionEntry.getKey())
        + " Ignoring.");
    }

    // Store (column family) level.
    final ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
      computeIfAbsent(storeFlushedSequenceIdsByRegion, encodedRegionName,
        () -> new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
    for (Entry<byte[], Long> storeEntry : regionMetrics.getStoreSequenceId().entrySet()) {
      final byte[] family = storeEntry.getKey();
      final Long knownStoreSeqId = storeFlushedSequenceId.get(family);
      final long reportedStoreSeqId = storeEntry.getValue();
      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
          ", existingValue=" + knownStoreSeqId + ", completeSequenceId=" + reportedStoreSeqId);
      }
      // Same rule as above: only move forward.
      if (knownStoreSeqId == null || (reportedStoreSeqId != HConstants.NO_SEQNUM
          && reportedStoreSeqId > knownStoreSeqId.longValue())) {
        storeFlushedSequenceId.put(family, reportedStoreSeqId);
      }
    }
  }
}
/**
 * Runs the follow-up work for a region that has just been opened: requests system compactions
 * for stores that need one, reports the OPENED transition to the master and triggers a flush in
 * the primary region.
 * @param context supplies the opened region, the open procedure id and the master system time
 * @throws IOException if the master could not be told about the opened region
 */
@Override
public void postOpenDeployTasks(final PostOpenDeployContext context) throws IOException {
  HRegion r = context.getRegion();
  long openProcId = context.getOpenProcId();
  long masterSystemTime = context.getMasterSystemTime();
  rpcServices.checkOpen();
  // Resolved once and reused by every log and error message below.
  final String regionName = r.getRegionInfo().getRegionNameAsString();
  LOG.info("Post open deploy tasks for {}, pid={}, masterSystemTime={}",
    regionName, openProcId, masterSystemTime);
  // Do checks to see if we need to compact (references or too many files)
  for (HStore s : r.stores.values()) {
    if (s.hasReferences() || s.needsCompaction()) {
      this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
    }
  }
  long openSeqNum = r.getOpenSeqNum();
  if (openSeqNum == HConstants.NO_SEQNUM) {
    // If we opened a region, we should have read some sequence number from it.
    LOG.error("No sequence number found when opening {}", regionName);
    openSeqNum = 0;
  }
  // Notify master
  if (!reportRegionStateTransition(new RegionStateTransitionContext(TransitionCode.OPENED,
    openSeqNum, openProcId, masterSystemTime, r.getRegionInfo()))) {
    throw new IOException("Failed to report opened region to master: " + regionName);
  }
  triggerFlushInPrimaryRegion(r);
  LOG.debug("Finished post open deploy task for {}", regionName);
}
/**
 * Called before a flush is executed.
 * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
 *         is executed: the minimum sequence id of the last segment. Returns
 *         {@code HConstants.NO_SEQNUM} for a composite snapshot or when there is no last
 *         segment, i.e. when the memstore will be cleared.
 */
@Override
public long preFlushSeqIDEstimation() {
  if (!compositeSnapshot) {
    final Segment last = getLastSegment();
    if (last != null) {
      return last.getMinSequenceId();
    }
  }
  return HConstants.NO_SEQNUM;
}
/**
 * Looks up the lowest outstanding sequence id for a region across both the flushing and the
 * not-yet-flushed bookkeeping, under {@code tieLock}.
 * @param encodedRegionName region to look up
 * @return the smaller of the lowest flushing id and the lowest unflushed id for
 *         <code>encodedRegionName</code>. A region without flushing entries contributes
 *         {@code Long.MAX_VALUE}; a region without unflushed entries contributes
 *         {@link HConstants#NO_SEQNUM}.
 */
long getLowestSequenceId(final byte[] encodedRegionName) {
  synchronized (this.tieLock) {
    final Map<?, Long> flushing = this.flushingSequenceIds.get(encodedRegionName);
    final long flushingLowest;
    if (flushing == null) {
      flushingLowest = Long.MAX_VALUE;
    } else {
      flushingLowest = getLowestSequenceId(flushing);
    }

    final Map<?, Long> unflushed = this.lowestUnflushedSequenceIds.get(encodedRegionName);
    final long unflushedLowest;
    if (unflushed == null) {
      unflushedLowest = HConstants.NO_SEQNUM;
    } else {
      unflushedLowest = getLowestSequenceId(unflushed);
    }

    return Math.min(flushingLowest, unflushedLowest);
  }
}
/**
 * Scans the values of a map for the smallest sequence id.
 * @param sequenceids Map to search for lowest value.
 * @return Lowest value found in <code>sequenceids</code>, or {@code HConstants.NO_SEQNUM} when
 *         the map has no values.
 */
private static long getLowestSequenceId(Map<?, Long> sequenceids) {
  long lowest = HConstants.NO_SEQNUM;
  for (long candidate : sequenceids.values()) {
    // While nothing has been picked yet, the first candidate always wins.
    final boolean nothingPickedYet = lowest == HConstants.NO_SEQNUM;
    if (nothingPickedYet || candidate < lowest) {
      lowest = candidate;
    }
  }
  return lowest;
}
/**
 * Collapses a map of per-key sequence id maps into a map of per-key lowest sequence ids.
 * @param src map whose values are themselves maps of sequence ids; may be {@code null}
 * @return New Map that has same keys as <code>src</code> but, instead of a Map for a value, holds
 *         the smallest sequence id found in that Map. Keys whose Map yields
 *         {@code HConstants.NO_SEQNUM} are left out. Returns {@code null} when <code>src</code>
 *         is {@code null} or empty.
 */
private <T extends Map<?, Long>> Map<byte[], Long> flattenToLowestSequenceId(Map<byte[], T> src) {
  final boolean nothingToFlatten = src == null || src.isEmpty();
  if (nothingToFlatten) {
    return null;
  }
  final Map<byte[], Long> flattened = new HashMap<>();
  for (Map.Entry<byte[], T> perKey : src.entrySet()) {
    final long lowest = getLowestSequenceId(perKey.getValue());
    if (lowest == HConstants.NO_SEQNUM) {
      continue;
    }
    flattened.put(perKey.getKey(), lowest);
  }
  return flattened;
}
/**
 * For hadoop.ipc internal call. Do NOT use.
 * Rebuilds the exception from its textual form by parsing hostname, port, start code and
 * location sequence number back out of the string.
 * The input is the one generated by {@link #getMessage()}.
 * Parsing is best effort: values read before a parse failure are kept, the rest stay at their
 * defaults ({@code null}, {@code -1}, {@code -1} and {@code HConstants.NO_SEQNUM}).
 * @param s message text to parse
 */
public RegionMovedException(String s) {
  // Each offset points just past the corresponding field label.
  final int hostStart = s.indexOf(HOST_FIELD) + HOST_FIELD.length();
  final int portStart = s.indexOf(PORT_FIELD) + PORT_FIELD.length();
  final int startCodeStart = s.indexOf(STARTCODE_FIELD) + STARTCODE_FIELD.length();
  final int seqNumStart = s.indexOf(LOCATIONSEQNUM_FIELD) + LOCATIONSEQNUM_FIELD.length();

  String parsedHostname = null;
  int parsedPort = -1;
  long parsedStartCode = -1;
  long parsedSeqNum = HConstants.NO_SEQNUM;
  try {
    // TODO: this whole thing is extremely brittle.
    final int hostEnd = s.indexOf(' ', hostStart);
    parsedHostname = s.substring(hostStart, hostEnd);

    final int portEnd = s.indexOf(' ', portStart);
    parsedPort = Integer.parseInt(s.substring(portStart, portEnd));

    final int startCodeEnd = s.indexOf('.', startCodeStart);
    parsedStartCode = Long.parseLong(s.substring(startCodeStart, startCodeEnd));

    final int seqNumEnd = s.indexOf('.', seqNumStart);
    parsedSeqNum = Long.parseLong(s.substring(seqNumStart, seqNumEnd));
  } catch (Exception ignored) {
    LOG.warn("Can't parse the hostname, port and startCode from this string: " +
      s + ", continuing");
  }

  hostname = parsedHostname;
  port = parsedPort;
  startCode = parsedStartCode;
  locationSeqNum = parsedSeqNum;
}
/**
 * Creates a new {@code MutableLong} initialized to {@code HConstants.NO_SEQNUM}.
 * The key is not consulted; every call returns a fresh instance with the same starting value.
 * NOTE(review): presumably a cache-loader callback supplying the initial value for a missing
 * key - confirm against the enclosing class.
 * @param key key being loaded (unused)
 * @return a new holder containing {@code HConstants.NO_SEQNUM}
 */
@Override
public MutableLong load(String key) throws Exception {
return new MutableLong(HConstants.NO_SEQNUM);
}
/**
 * Signals the start of a cache flush. This implementation keeps no sequence id bookkeeping;
 * neither argument is consulted.
 * @param encodedRegionName region being flushed (unused)
 * @param flushedFamilyNames families being flushed (unused)
 * @return {@code null} once {@code closed} is set, otherwise {@code HConstants.NO_SEQNUM}
 */
@Override
public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
  if (!closed.get()) {
    return HConstants.NO_SEQNUM;
  }
  return null;
}
/**
 * Always reports {@code HConstants.NO_SEQNUM}; this implementation does not look up any state
 * for the given region.
 * @param encodedRegionName region being asked about (unused)
 * @return {@code HConstants.NO_SEQNUM}
 */
@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
return HConstants.NO_SEQNUM;
}
/**
 * Always reports {@code HConstants.NO_SEQNUM}; this implementation does not look up any state
 * for the given region or family.
 * @param encodedRegionName region being asked about (unused)
 * @param familyName column family being asked about (unused)
 * @return {@code HConstants.NO_SEQNUM}
 */
@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
return HConstants.NO_SEQNUM;
}
/**
 * Derives the open sequence number to record for a parent region from the maximum region
 * sequence id reported by {@code WALSplitUtil.getMaxRegionSequenceId}.
 * @param region parent region to look up
 * @return the maximum sequence id plus one when that maximum is positive, otherwise
 *         {@code HConstants.NO_SEQNUM}
 * @throws IOException if the maximum sequence id cannot be read
 */
private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
  final MasterFileSystem masterFs = master.getMasterFileSystem();
  final long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(master.getConfiguration(), region,
    masterFs::getFileSystem, masterFs::getWALFileSystem);
  if (maxSeqId <= 0) {
    return HConstants.NO_SEQNUM;
  }
  return maxSeqId + 1;
}
/**
 * Starts a cache flush for a region by moving the affected families' entries out of
 * {@code lowestUnflushedSequenceIds} and into {@code flushingSequenceIds}.
 * <p>
 * For every entry of {@code familyToSeq}, under {@code tieLock}: a value of
 * {@code HConstants.NO_SEQNUM} removes the family from the region's unflushed map, any other
 * value replaces the family's existing entry with that value. The previous values are collected
 * and, if there are any, stored for the region in {@code flushingSequenceIds}. Nothing is
 * changed when the region has no entry in {@code lowestUnflushedSequenceIds}.
 * @param encodedRegionName region whose flush is starting
 * @param familyToSeq per family, the new lowest unflushed sequence id, or
 *          {@code HConstants.NO_SEQNUM} to drop the family's entry
 * @return {@code HConstants.NO_SEQNUM} when the region has no unflushed entries (before or after
 *         this call), otherwise the smallest sequence id still left in the region's unflushed map
 */
Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
// Previous per-family values taken out of the unflushed map; created lazily on first hit.
Map<ImmutableByteArray, Long> oldSequenceIds = null;
Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
synchronized (tieLock) {
Map<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
if (m != null) {
// NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled
// circumstance because another concurrent thread now may add sequenceids for this family
// (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
// is fine because updates are blocked when this method is called. Make sure!!!
for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey());
Long seqId = null;
if(entry.getValue() == HConstants.NO_SEQNUM) {
seqId = m.remove(familyNameWrapper);
} else {
// replace() only takes effect when the family already has an entry; it never adds one.
seqId = m.replace(familyNameWrapper, entry.getValue());
}
if (seqId != null) {
if (oldSequenceIds == null) {
oldSequenceIds = new HashMap<>();
}
oldSequenceIds.put(familyNameWrapper, seqId);
}
}
if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) {
// A non-null previous value means an earlier flush left its entry behind for this region.
if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {
LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) +
", sequenceid=" + oldSequenceIds);
}
}
if (m.isEmpty()) {
// Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever
// even if the region is already moved to other server.
// Do not worry about data racing, we held write lock of region when calling
// startCacheFlush, so no one can add value to the map we removed.
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
} else {
// Flushing a subset of the region families. Return the sequence id of the oldest entry.
lowestUnflushedInRegion = Collections.min(m.values());
}
}
}
// Do this check outside lock.
// NOTE(review): oldSequenceIds is only ever created right before an entry is put into it, so
// it looks like it can never be non-null and empty here, which would make this warning
// unreachable - confirm whether "== null" was intended.
if (oldSequenceIds != null && oldSequenceIds.isEmpty()) {
// TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
// the region is already flushing (which would make this call invalid), or there
// were no appends after last flush, so why are we starting flush? Maybe we should
// assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
// For now preserve old logic.
LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));
}
return lowestUnflushedInRegion;
}
/**
 * This implementation makes no estimate and always returns {@code HConstants.NO_SEQNUM}.
 * @return {@code HConstants.NO_SEQNUM}
 */
@Override
public long preFlushSeqIDEstimation() {
return HConstants.NO_SEQNUM;
}