下面列出了 java.util.concurrent.locks.StampedLock#unlockWrite() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Creates a {@code Writer} named "InterruptibleWriter" that tries to take the
 * write lock of {@code sl}, checks the resulting lock state and releases it.
 *
 * @param sl the lock to write-lock
 * @param timeout a negative value selects {@code writeLockInterruptibly()};
 *        otherwise it is the wait time given to {@code tryWriteLock(timeout, unit)}
 * @param unit the unit of {@code timeout}
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = (timeout < 0)
                    ? sl.writeLockInterruptibly()
                    : sl.tryWriteLock(timeout, unit);
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                // Record the failure instead of letting it escape run().
                thrown(failure);
            } finally {
                // Only release when a non-zero stamp was recorded.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} named "InterruptibleWriter" that tries to take the
 * write lock of {@code sl}, checks the resulting lock state and releases it.
 *
 * @param sl the lock to write-lock
 * @param timeout a negative value selects {@code writeLockInterruptibly()};
 *        otherwise it is the wait time given to {@code tryWriteLock(timeout, unit)}
 * @param unit the unit of {@code timeout}
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = (timeout < 0)
                    ? sl.writeLockInterruptibly()
                    : sl.tryWriteLock(timeout, unit);
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                // Record the failure instead of letting it escape run().
                thrown(failure);
            } finally {
                // Only release when a non-zero stamp was recorded.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} named "InterruptibleWriter" that tries to take the
 * write lock of {@code sl}, checks the resulting lock state and releases it.
 *
 * @param sl the lock to write-lock
 * @param timeout a negative value selects {@code writeLockInterruptibly()};
 *        otherwise it is the wait time given to {@code tryWriteLock(timeout, unit)}
 * @param unit the unit of {@code timeout}
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = (timeout < 0)
                    ? sl.writeLockInterruptibly()
                    : sl.tryWriteLock(timeout, unit);
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                // Record the failure instead of letting it escape run().
                thrown(failure);
            } finally {
                // Only release when a non-zero stamp was recorded.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} named "InterruptibleWriter" that tries to take the
 * write lock of {@code sl}, checks the resulting lock state and releases it.
 *
 * @param sl the lock to write-lock
 * @param timeout a negative value selects {@code writeLockInterruptibly()};
 *        otherwise it is the wait time given to {@code tryWriteLock(timeout, unit)}
 * @param unit the unit of {@code timeout}
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = (timeout < 0)
                    ? sl.writeLockInterruptibly()
                    : sl.tryWriteLock(timeout, unit);
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                // Record the failure instead of letting it escape run().
                thrown(failure);
            } finally {
                // Only release when a non-zero stamp was recorded.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} named "InterruptibleWriter" that tries to take the
 * write lock of {@code sl}, checks the resulting lock state and releases it.
 *
 * @param sl the lock to write-lock
 * @param timeout a negative value selects {@code writeLockInterruptibly()};
 *        otherwise it is the wait time given to {@code tryWriteLock(timeout, unit)}
 * @param unit the unit of {@code timeout}
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = (timeout < 0)
                    ? sl.writeLockInterruptibly()
                    : sl.tryWriteLock(timeout, unit);
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                // Record the failure instead of letting it escape run().
                thrown(failure);
            } finally {
                // Only release when a non-zero stamp was recorded.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} named "InterruptibleWriter" that tries to take the
 * write lock of {@code sl}, checks the resulting lock state and releases it.
 *
 * @param sl the lock to write-lock
 * @param timeout a negative value selects {@code writeLockInterruptibly()};
 *        otherwise it is the wait time given to {@code tryWriteLock(timeout, unit)}
 * @param unit the unit of {@code timeout}
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = (timeout < 0)
                    ? sl.writeLockInterruptibly()
                    : sl.tryWriteLock(timeout, unit);
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                // Record the failure instead of letting it escape run().
                thrown(failure);
            } finally {
                // Only release when a non-zero stamp was recorded.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Stores a copy of {@code region} under its id in the region table and maps
 * its start key (a {@code null} start key is normalized by
 * {@code BytesUtil.nullToEmpty}) to that id in the range table. Both writes
 * happen while holding the write lock.
 *
 * @param region the region to store; it and its region epoch must be non-null
 */
public void addOrUpdateRegion(final Region region) {
    Requires.requireNonNull(region, "region");
    Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");

    final byte[] rangeStart = BytesUtil.nullToEmpty(region.getStartKey());
    final long id = region.getId();

    final StampedLock lock = this.stampedLock;
    final long writeStamp = lock.writeLock();
    try {
        this.regionTable.put(id, region.copy());
        this.rangeTable.put(rangeStart, id);
    } finally {
        lock.unlockWrite(writeStamp);
    }
}
/**
 * Records a split of the region stored under {@code leftId}: after validating
 * the key ranges, the left region's epoch version is incremented, its end key
 * becomes the start key of {@code right}, and a copy of {@code right} is added
 * to both tables. Everything after the argument checks runs under the write lock.
 *
 * @param leftId id of the already-registered region being split
 * @param right  the new right-hand region; it, its region epoch and its start
 *               key must be non-null
 */
public void splitRegion(final long leftId, final Region right) {
    Requires.requireNonNull(right, "right");
    Requires.requireNonNull(right.getRegionEpoch(), "right.regionEpoch");

    final StampedLock lock = this.stampedLock;
    final long writeStamp = lock.writeLock();
    try {
        final Region left = this.regionTable.get(leftId);
        Requires.requireNonNull(left, "left");

        final byte[] leftStart = BytesUtil.nullToEmpty(left.getStartKey());
        final byte[] leftEnd = left.getEndKey();
        final long rightId = right.getId();
        final byte[] rightStart = right.getStartKey();
        final byte[] rightEnd = right.getEndKey();

        Requires.requireNonNull(rightStart, "rightStartKey");
        final boolean startsInOrder = BytesUtil.compare(leftStart, rightStart) < 0;
        Requires.requireTrue(startsInOrder, "leftStartKey must < rightStartKey");

        if (leftEnd != null && rightEnd != null) {
            Requires.requireTrue(BytesUtil.compare(leftEnd, rightEnd) == 0, "leftEndKey must == rightEndKey");
            Requires.requireTrue(BytesUtil.compare(rightStart, rightEnd) < 0, "rightStartKey must < rightEndKey");
        } else {
            // At least one end key is null, so reference equality holds only when both are null.
            Requires.requireTrue(leftEnd == rightEnd, "leftEndKey must == rightEndKey");
        }

        final RegionEpoch epoch = left.getRegionEpoch();
        epoch.setVersion(epoch.getVersion() + 1);
        left.setEndKey(rightStart);
        this.regionTable.put(rightId, right.copy());
        this.rangeTable.put(rightStart, rightId);
    } finally {
        lock.unlockWrite(writeStamp);
    }
}
/**
 * A write-locked lock stays locked after being cloned through serialization,
 * while the clone starts out unlocked and can be locked and unlocked normally.
 */
public void testSerialization() {
    final StampedLock original = new StampedLock();
    original.writeLock();

    final StampedLock copy = serialClone(original);
    assertTrue(original.isWriteLocked());
    assertFalse(copy.isWriteLocked());

    final long copyStamp = copy.writeLock();
    assertTrue(copy.isWriteLocked());
    copy.unlockWrite(copyStamp);
    assertFalse(copy.isWriteLocked());
}
/**
 * While the write lock is held, stamps returned by further lock attempts
 * (write, read, timed or optimistic) do not validate.
 */
public void testValidate2() throws InterruptedException {
    final StampedLock lock = new StampedLock();
    final long held = assertNonZero(lock.writeLock());
    assertTrue(lock.validate(held));

    final long failedWrite = lock.tryWriteLock();
    assertFalse(lock.validate(failedWrite));

    final long failedTimedWrite = lock.tryWriteLock(0L, SECONDS);
    assertFalse(lock.validate(failedTimedWrite));

    final long failedRead = lock.tryReadLock();
    assertFalse(lock.validate(failedRead));

    final long failedTimedRead = lock.tryReadLock(0L, SECONDS);
    assertFalse(lock.validate(failedTimedRead));

    final long failedOptimistic = lock.tryOptimisticRead();
    assertFalse(lock.validate(failedOptimistic));

    lock.unlockWrite(held);
}
/**
 * Creates a {@code Writer} that takes the write lock of {@code sl}, checks the
 * resulting lock state and then releases the lock using the recorded stamp.
 *
 * @param sl the lock to write-lock
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer writer(final StampedLock sl, final Phaser gate) {
    return new Writer() {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = sl.writeLock();
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                sl.unlockWrite(stamp());
            }
        }
    };
}
/**
 * An optimistic-read stamp stops validating once the write lock is taken, and
 * no new optimistic stamp can be obtained while the write lock is held.
 */
public void testValidateOptimisticWriteLocked() {
    final StampedLock lock = new StampedLock();
    final long optimisticStamp = assertValid(lock, lock.tryOptimisticRead());
    final long writeStamp = assertValid(lock, lock.writeLock());

    assertFalse(lock.validate(optimisticStamp));
    final long optimisticWhileLocked = lock.tryOptimisticRead();
    assertEquals(0L, optimisticWhileLocked);
    assertTrue(lock.validate(writeStamp));

    lock.unlockWrite(writeStamp);
}
/**
 * Creates a {@code Writer} that takes the write lock of {@code sl}, checks the
 * resulting lock state and then releases the lock using the recorded stamp.
 *
 * @param sl the lock to write-lock
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer writer(final StampedLock sl, final Phaser gate) {
    return new Writer() {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = sl.writeLock();
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                sl.unlockWrite(stamp());
            }
        }
    };
}
/**
 * Creates a {@code Writer} that takes the write lock of {@code sl}, checks the
 * resulting lock state and then releases the lock using the recorded stamp.
 *
 * @param sl the lock to write-lock
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer writer(final StampedLock sl, final Phaser gate) {
    return new Writer() {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = sl.writeLock();
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                sl.unlockWrite(stamp());
            }
        }
    };
}
/**
 * Creates a {@code Writer} that takes the write lock of {@code sl}, checks the
 * resulting lock state and then releases the lock using the recorded stamp.
 *
 * @param sl the lock to write-lock
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer writer(final StampedLock sl, final Phaser gate) {
    return new Writer() {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = sl.writeLock();
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                sl.unlockWrite(stamp());
            }
        }
    };
}
/**
 * toString indicates current lock state: unlocked, write-locked, read-locked,
 * and unlocked again once the read lock has been released.
 */
public void testToString() {
    StampedLock lock = new StampedLock();
    assertTrue(lock.toString().contains("Unlocked"));

    long s = lock.writeLock();
    assertTrue(lock.toString().contains("Write-locked"));
    lock.unlockWrite(s);

    s = lock.readLock();
    assertTrue(lock.toString().contains("Read-locks"));
    // Release the read lock (previously left held, with its stamp unused) and
    // confirm the reported state goes back to unlocked.
    lock.unlockRead(s);
    assertTrue(lock.toString().contains("Unlocked"));
}
/**
 * Creates a {@code Writer} that takes the write lock of {@code sl}, checks the
 * resulting lock state and then releases the lock using the recorded stamp.
 *
 * @param sl the lock to write-lock
 * @param gate passed to {@code toTheStartingGate} before locking when non-null
 * @return the newly created writer
 */
static Writer writer(final StampedLock sl, final Phaser gate) {
    return new Writer() {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                final long acquired = sl.writeLock();
                stamp(acquired);
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                sl.unlockWrite(stamp());
            }
        }
    };
}
/**
 * Looks at the fork chains of {@code chainId} and, if one of them ends at
 * least {@code chainSwtichThreshold} blocks above the master chain, switches
 * the master chain to the fork chain with the largest height difference.
 *
 * The method starts with an optimistic read stamp. Whenever the stamp is zero,
 * fails validation, or cannot be converted to a write lock, the loop's update
 * expression acquires the write lock and the body runs again from the top.
 * The finally block always sets the context status back to RUNNING and
 * releases the lock only if the current stamp is a write-lock stamp.
 *
 * @param chainId   id of the chain to examine
 * @param context   supplies the lock, the parameters and the status for this chain
 * @param commonLog logger used for the debug/info messages below
 */
@Override
protected void process(int chainId, ChainContext context, NulsLogger commonLog) {
StampedLock lock = context.getLock();
long stamp = lock.tryOptimisticRead();
try {
// Each retry re-enters the body holding the write lock (see the update expression).
for (; ; stamp = lock.writeLock()) {
if (stamp == 0L) {
continue;
}
// possibly racy reads
SortedSet<Chain> forkChains = BlockChainManager.getForkChains(chainId);
if (!lock.validate(stamp)) {
continue;
}
if (forkChains.isEmpty()) {
break;
}
context.printChains();
// Walk the current fork chains, compare each one with the master chain to find the largest height difference,
// then compare that difference with the chainSwtichThreshold parameter to decide which fork chain to switch to.
// NOTE(review): on the first (optimistic) pass these reads are not re-validated until tryConvertToWriteLock below.
Chain masterChain = BlockChainManager.getMasterChain(chainId);
ChainParameters parameters = context.getParameters();
int chainSwtichThreshold = parameters.getChainSwtichThreshold();
// NOTE(review): placeholder instance; it is only replaced when some fork chain is higher than the master chain.
Chain switchChain = new Chain();
int maxHeightDifference = 0;
for (Chain forkChain : forkChains) {
// NOTE(review): the difference is narrowed to int by the cast — presumably heights never differ by more than Integer.MAX_VALUE; confirm.
int temp = (int) (forkChain.getEndHeight() - masterChain.getEndHeight());
if (temp > maxHeightDifference) {
maxHeightDifference = temp;
switchChain = forkChain;
}
}
commonLog.debug("chainId-" + chainId + ", maxHeightDifference:" + maxHeightDifference + ", chainSwtichThreshold:" + chainSwtichThreshold);
// Height difference is below the threshold: nothing to switch.
if (maxHeightDifference < chainSwtichThreshold) {
break;
}
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
continue;
}
// exclusive access
// Perform the switch; change the module run status before switching.
context.setStatus(StatusEnum.SWITCHING);
ConsensusCall.notice(chainId, MODULE_WAITING);
TransactionCall.notice(chainId, MODULE_WAITING);
if (BlockChainManager.switchChain(chainId, masterChain, switchChain)) {
commonLog.info("chainId-" + chainId + ", switchChain success");
} else {
commonLog.info("chainId-" + chainId + ", switchChain fail, auto rollback success");
}
context.printChains();
ConsensusCall.notice(chainId, MODULE_WORKING);
TransactionCall.notice(chainId, MODULE_WORKING);
break;
}
} finally {
// Status is reset on every exit path, including the ones that never set SWITCHING.
context.setStatus(StatusEnum.RUNNING);
// An optimistic-read stamp holds no lock, so only a write-lock stamp is released.
if (StampedLock.isWriteLockStamp(stamp)) {
lock.unlockWrite(stamp);
}
}
}
/**
 * Maintains the orphan chains of {@code chainId}: when at least one orphan
 * chain exists, takes the write lock and calls {@code maintainOrphanChain}
 * for each of them, stopping early once more than ten seconds have elapsed.
 *
 * The method starts with an optimistic read stamp. Whenever the stamp is zero,
 * fails validation, or cannot be converted to a write lock, the loop's update
 * expression acquires the write lock and the body runs again from the top.
 * The finally block always sets the context status back to RUNNING and
 * releases the lock only if the current stamp is a write-lock stamp.
 *
 * @param chainId   id of the chain whose orphan chains are maintained
 * @param context   supplies the lock, the parameters and the status for this chain
 * @param commonLog not used by this method body
 */
@Override
protected void process(int chainId, ChainContext context, NulsLogger commonLog) {
ChainParameters parameters = context.getParameters();
int orphanChainMaxAge = parameters.getOrphanChainMaxAge();
StampedLock lock = context.getLock();
long stamp = lock.tryOptimisticRead();
try {
// Each retry re-enters the body holding the write lock (see the update expression).
for (; ; stamp = lock.writeLock()) {
if (stamp == 0L) {
continue;
}
// possibly racy reads
SortedSet<Chain> orphanChains = BlockChainManager.getOrphanChains(chainId);
if (!lock.validate(stamp)) {
continue;
}
if (orphanChains.isEmpty()) {
break;
}
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
continue;
}
// exclusive access
List<Node> availableNodes = NetworkCall.getAvailableNodes(chainId);
// Maintain the existing orphan chains, trying to add blocks at the head of each chain.
context.setStatus(UPDATE_ORPHAN_CHAINS);
long l = System.nanoTime();
for (Chain orphanChain : orphanChains) {
maintainOrphanChain(chainId, orphanChain, availableNodes, orphanChainMaxAge);
// Stop once orphan chain maintenance has taken more than ten seconds (10,000,000,000 ns).
if (System.nanoTime() - l > 10000000000L) {
break;
}
}
break;
}
} finally {
// Status is reset on every exit path, including the ones that never set UPDATE_ORPHAN_CHAINS.
context.setStatus(RUNNING);
// An optimistic-read stamp holds no lock, so only a write-lock stamp is released.
if (StampedLock.isWriteLockStamp(stamp)) {
lock.unlockWrite(stamp);
}
}
}
/**
 * Cleans up orphan chains.
 *
 * An orphan chain is deleted when its start height is more than
 * {@code heightRange} away from the master chain's end height, or when its
 * age value exceeds {@code orphanChainMaxAge}. The method starts with an
 * optimistic read stamp; whenever the stamp is zero, fails validation, or
 * cannot be converted to a write lock, the loop's update expression acquires
 * the write lock and the body runs again. Deletion happens under the write lock.
 *
 * @param chainId           id of the chain whose orphan chains are examined
 * @param heightRange       maximum allowed absolute distance between an orphan chain's start height and the master chain's end height
 * @param context           supplies the lock and the logger
 * @param orphanChainMaxAge orphan chains whose age value is greater than this are deleted
 */
private void orphanChainsCleaner(int chainId, int heightRange, ChainContext context, int orphanChainMaxAge) {
StampedLock lock = context.getLock();
long stamp = lock.tryOptimisticRead();
NulsLogger logger = context.getLogger();
try {
// Each retry re-enters the body holding the write lock (see the update expression).
for (; ; stamp = lock.writeLock()) {
if (stamp == 0L) {
continue;
}
// possibly racy reads
// 1. Clean up orphan chains whose start height lies outside the master chain's latest height +/- 30 (configurable).
SortedSet<Chain> orphanChains = BlockChainManager.getOrphanChains(chainId);
if (!lock.validate(stamp)) {
continue;
}
if (orphanChains.isEmpty()) {
break;
}
stamp = lock.tryConvertToWriteLock(stamp);
if (stamp == 0L) {
continue;
}
// exclusive access
Chain masterChain = BlockChainManager.getMasterChain(chainId);
long latestHeight = masterChain.getEndHeight();
// Candidates are collected first and deleted in a separate pass below.
SortedSet<Chain> deleteSet = new TreeSet<>(Chain.COMPARATOR);
// 1. Mark
for (Chain orphanChain : orphanChains) {
if (Math.abs(orphanChain.getStartHeight() - latestHeight) > heightRange || orphanChain.getAge().get() > orphanChainMaxAge) {
// Mark orphanChain for removal; its child chains are to be removed recursively as well
// (presumably handled inside deleteOrphanChain — confirm).
deleteSet.add(orphanChain);
}
}
// 2. Delete
for (Chain chain : deleteSet) {
BlockChainManager.deleteOrphanChain(chainId, chain);
logger.info("remove orphan chain, chain:" + chain);
}
break;
}
} finally {
// An optimistic-read stamp holds no lock, so only a write-lock stamp is released.
if (StampedLock.isWriteLockStamp(stamp)) {
lock.unlockWrite(stamp);
}
}
}