下面列出了 java.util.concurrent.locks.StampedLock#tryConvertToWriteLock() 的实例代码;您也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Looks for a fork chain that is ahead of the master chain by at least the configured
 * {@code chainSwtichThreshold} and, if one exists, switches the master chain to it.
 *
 * <p>Locking: the fork-chain set is first read under an optimistic stamp. Every retry
 * ({@code continue}) re-enters the loop holding the write lock, and the stamp is upgraded
 * to a write stamp before any state is changed. The {@code finally} block releases the
 * write lock only if one is actually held.
 *
 * <p>The context status is set back to {@code RUNNING} on every exit path, including the
 * early exits that never changed it.
 *
 * @param chainId   id of the chain being checked
 * @param context   supplies the lock, the parameters and the status for that chain
 * @param commonLog logger for the debug/info messages emitted here
 */
@Override
protected void process(int chainId, ChainContext context, NulsLogger commonLog) {
    StampedLock lock = context.getLock();
    long stamp = lock.tryOptimisticRead();
    try {
        for (; ; stamp = lock.writeLock()) {
            if (stamp == 0L) {
                // The lock was write-held when the optimistic stamp was requested.
                continue;
            }
            // possibly racy reads
            SortedSet<Chain> forkChains = BlockChainManager.getForkChains(chainId);
            if (!lock.validate(stamp)) {
                continue;
            }
            if (forkChains.isEmpty()) {
                break;
            }
            context.printChains();
            // Walk the fork chains, compare each one with the master chain, find the largest
            // height difference and compare it with chainSwtichThreshold to pick the fork
            // chain to switch to.
            Chain masterChain = BlockChainManager.getMasterChain(chainId);
            ChainParameters parameters = context.getParameters();
            int chainSwtichThreshold = parameters.getChainSwtichThreshold();
            // Placeholder; it is replaced as soon as any fork chain is ahead of the master chain.
            Chain switchChain = new Chain();
            // Kept as long: narrowing the difference to int could silently wrap.
            long maxHeightDifference = 0L;
            for (Chain forkChain : forkChains) {
                long heightDifference = forkChain.getEndHeight() - masterChain.getEndHeight();
                if (heightDifference > maxHeightDifference) {
                    maxHeightDifference = heightDifference;
                    switchChain = forkChain;
                }
            }
            commonLog.debug("chainId-" + chainId + ", maxHeightDifference:" + maxHeightDifference + ", chainSwtichThreshold:" + chainSwtichThreshold);
            // Height difference is too small to justify a switch.
            if (maxHeightDifference < chainSwtichThreshold) {
                break;
            }
            stamp = lock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                // Upgrade failed; retry from the top holding the write lock.
                continue;
            }
            // exclusive access
            // Change the module status before switching.
            context.setStatus(StatusEnum.SWITCHING);
            try {
                // NOTE(review): presumably these notices pause the consensus and
                // transaction modules for the duration of the switch - confirm.
                ConsensusCall.notice(chainId, MODULE_WAITING);
                TransactionCall.notice(chainId, MODULE_WAITING);
                if (BlockChainManager.switchChain(chainId, masterChain, switchChain)) {
                    commonLog.info("chainId-" + chainId + ", switchChain success");
                } else {
                    commonLog.info("chainId-" + chainId + ", switchChain fail, auto rollback success");
                }
                context.printChains();
            } finally {
                // Always send MODULE_WORKING, even when the switch throws; otherwise the
                // other modules stay in MODULE_WAITING while this context is reset to RUNNING.
                try {
                    ConsensusCall.notice(chainId, MODULE_WORKING);
                } finally {
                    TransactionCall.notice(chainId, MODULE_WORKING);
                }
            }
            break;
        }
    } finally {
        context.setStatus(StatusEnum.RUNNING);
        if (StampedLock.isWriteLockStamp(stamp)) {
            lock.unlockWrite(stamp);
        }
    }
}
/**
 * Maintains the orphan chains of one chain while holding the context's write lock.
 *
 * <p>The orphan-chain set is read under an optimistic stamp first. If that read cannot be
 * validated, or the upgrade to a write stamp fails, the loop starts over holding the write
 * lock. Nothing is done when there are no orphan chains. The context status is set back to
 * {@code RUNNING} on every exit path, and the write lock is released only if it is held.
 *
 * @param chainId   id of the chain whose orphan chains are maintained
 * @param context   supplies the parameters, the lock and the status for that chain
 * @param commonLog not used by this implementation
 */
@Override
protected void process(int chainId, ChainContext context, NulsLogger commonLog) {
    final ChainParameters chainParameters = context.getParameters();
    final int maxAge = chainParameters.getOrphanChainMaxAge();
    final StampedLock chainLock = context.getLock();
    long stamp = chainLock.tryOptimisticRead();
    try {
        retryHoldingLock:
        for (; ; stamp = chainLock.writeLock()) {
            if (stamp == 0L) {
                continue retryHoldingLock;
            }
            // Possibly racy read; only trusted once the stamp validates.
            final SortedSet<Chain> orphans = BlockChainManager.getOrphanChains(chainId);
            if (!chainLock.validate(stamp)) {
                continue retryHoldingLock;
            }
            if (orphans.isEmpty()) {
                break retryHoldingLock;
            }
            stamp = chainLock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                continue retryHoldingLock;
            }
            // Exclusive access from here on.
            final List<Node> nodes = NetworkCall.getAvailableNodes(chainId);
            context.setStatus(UPDATE_ORPHAN_CHAINS);
            maintainOrphanChainsWithinBudget(chainId, orphans, nodes, maxAge);
            break retryHoldingLock;
        }
    } finally {
        context.setStatus(RUNNING);
        if (StampedLock.isWriteLockStamp(stamp)) {
            chainLock.unlockWrite(stamp);
        }
    }
}

/**
 * Runs {@code maintainOrphanChain} on each orphan chain in iteration order and stops early
 * once more than ten seconds have elapsed since the first call started. According to the
 * original comment, maintenance tries to add blocks at the head of each orphan chain.
 */
private void maintainOrphanChainsWithinBudget(int chainId, SortedSet<Chain> orphans, List<Node> nodes, int maxAge) {
    // Ten seconds, expressed in nanoseconds.
    final long budgetNanos = 10_000_000_000L;
    final long startedAt = System.nanoTime();
    for (Chain orphan : orphans) {
        maintainOrphanChain(chainId, orphan, nodes, maxAge);
        if (System.nanoTime() - startedAt > budgetNanos) {
            return;
        }
    }
}
/**
 * Cleans up fork chains.
 *
 * <p>A fork chain is deleted when its start height is more than {@code heightRange} below
 * the master chain's end height, or when its end hash already appears in the master chain's
 * hash list. The fork-chain set is read under an optimistic stamp first; the actual cleanup
 * runs only after the stamp has been upgraded to (or re-acquired as) a write lock.
 *
 * @param chainId     id of the chain whose fork chains are inspected
 * @param heightRange largest tolerated gap between the master chain's end height and a fork
 *                    chain's start height
 * @param context     supplies the lock and the logger for that chain
 */
private void forkChainsCleaner(int chainId, int heightRange, ChainContext context) {
    final StampedLock chainLock = context.getLock();
    long stamp = chainLock.tryOptimisticRead();
    final NulsLogger log = context.getLogger();
    try {
        retryHoldingLock:
        for (; ; stamp = chainLock.writeLock()) {
            if (stamp == 0L) {
                continue retryHoldingLock;
            }
            // Possibly racy read; only trusted once the stamp validates.
            final SortedSet<Chain> candidates = BlockChainManager.getForkChains(chainId);
            if (!chainLock.validate(stamp)) {
                continue retryHoldingLock;
            }
            if (candidates.isEmpty()) {
                break retryHoldingLock;
            }
            stamp = chainLock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                continue retryHoldingLock;
            }
            // Exclusive access from here on.
            final Chain master = BlockChainManager.getMasterChain(chainId);
            final long masterEndHeight = master.getEndHeight();
            // Step 1: mark. Deletion is deferred to a second pass, presumably so that the
            // set being iterated is not modified mid-iteration (TODO confirm).
            final SortedSet<Chain> doomed = new TreeSet<>(Chain.COMPARATOR);
            for (Chain fork : candidates) {
                final boolean outOfRange = masterEndHeight - fork.getStartHeight() > heightRange;
                if (outOfRange || master.getHashList().contains(fork.getEndHash())) {
                    doomed.add(fork);
                }
            }
            // Step 2: sweep.
            for (Chain fork : doomed) {
                BlockChainManager.deleteForkChain(chainId, fork, true);
                log.info("remove fork chain, chain:" + fork);
            }
            break retryHoldingLock;
        }
    } finally {
        if (StampedLock.isWriteLockStamp(stamp)) {
            chainLock.unlockWrite(stamp);
        }
    }
}
/**
 * Cleans up orphan chains.
 *
 * <p>An orphan chain is deleted when its start height differs from the master chain's end
 * height by more than {@code heightRange} in either direction, or when its age counter
 * exceeds {@code orphanChainMaxAge}. The orphan-chain set is read under an optimistic stamp
 * first; the actual cleanup runs only after the stamp has been upgraded to (or re-acquired
 * as) a write lock.
 *
 * @param chainId           id of the chain whose orphan chains are inspected
 * @param heightRange       largest tolerated absolute distance between an orphan chain's
 *                          start height and the master chain's end height
 * @param context           supplies the lock and the logger for that chain
 * @param orphanChainMaxAge largest tolerated value of an orphan chain's age counter
 */
private void orphanChainsCleaner(int chainId, int heightRange, ChainContext context, int orphanChainMaxAge) {
    final StampedLock chainLock = context.getLock();
    long stamp = chainLock.tryOptimisticRead();
    final NulsLogger log = context.getLogger();
    try {
        retryHoldingLock:
        for (; ; stamp = chainLock.writeLock()) {
            if (stamp == 0L) {
                continue retryHoldingLock;
            }
            // Possibly racy read; only trusted once the stamp validates.
            final SortedSet<Chain> candidates = BlockChainManager.getOrphanChains(chainId);
            if (!chainLock.validate(stamp)) {
                continue retryHoldingLock;
            }
            if (candidates.isEmpty()) {
                break retryHoldingLock;
            }
            stamp = chainLock.tryConvertToWriteLock(stamp);
            if (stamp == 0L) {
                continue retryHoldingLock;
            }
            // Exclusive access from here on.
            final Chain master = BlockChainManager.getMasterChain(chainId);
            final long masterEndHeight = master.getEndHeight();
            // Step 1: mark. Deletion is deferred to a second pass, presumably so that the
            // set being iterated is not modified mid-iteration (TODO confirm).
            final SortedSet<Chain> doomed = new TreeSet<>(Chain.COMPARATOR);
            for (Chain orphan : candidates) {
                final boolean outOfRange = Math.abs(orphan.getStartHeight() - masterEndHeight) > heightRange;
                if (outOfRange || orphan.getAge().get() > orphanChainMaxAge) {
                    doomed.add(orphan);
                }
            }
            // Step 2: sweep.
            for (Chain orphan : doomed) {
                BlockChainManager.deleteOrphanChain(chainId, orphan);
                log.info("remove orphan chain, chain:" + orphan);
            }
            break retryHoldingLock;
        }
    } finally {
        if (StampedLock.isWriteLockStamp(stamp)) {
            chainLock.unlockWrite(stamp);
        }
    }
}