下面列出了java.util.concurrent.locks.StampedLock#readLock() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Region getRegionById(final long regionId) {
final StampedLock stampedLock = this.stampedLock;
long stamp = stampedLock.tryOptimisticRead();
// validate() emit a load-fence, but no store-fence. So you should only have
// load instructions inside a block of tryOptimisticRead() / validate(),
// because it is meant to the a read-only operation, and therefore, it is fine
// to use the loadFence() function to avoid re-ordering.
Region region = safeCopy(this.regionTable.get(regionId));
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
region = safeCopy(this.regionTable.get(regionId));
} finally {
stampedLock.unlockRead(stamp);
}
}
return region;
}
/**
* Returns the list of regions to which the keys belongs.
*/
public Map<Region, List<byte[]>> findRegionsByKeys(final List<byte[]> keys) {
Requires.requireNonNull(keys, "keys");
final Map<Region, List<byte[]>> regionMap = Maps.newHashMap();
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.readLock();
try {
for (final byte[] key : keys) {
final Region region = findRegionByKeyWithoutLock(key);
regionMap.computeIfAbsent(region, k -> Lists.newArrayList()).add(key);
}
return regionMap;
} finally {
stampedLock.unlockRead(stamp);
}
}
/**
* Returns the list of regions to which the keys belongs.
*/
public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries) {
Requires.requireNonNull(kvEntries, "kvEntries");
final Map<Region, List<KVEntry>> regionMap = Maps.newHashMap();
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.readLock();
try {
for (final KVEntry kvEntry : kvEntries) {
final Region region = findRegionByKeyWithoutLock(kvEntry.getKey());
regionMap.computeIfAbsent(region, k -> Lists.newArrayList()).add(kvEntry);
}
return regionMap;
} finally {
stampedLock.unlockRead(stamp);
}
}
/**
* Returns the startKey of next region.
*/
public byte[] findStartKeyOfNextRegion(final byte[] key) {
Requires.requireNonNull(key, "key");
final StampedLock stampedLock = this.stampedLock;
long stamp = stampedLock.tryOptimisticRead();
// get the least key strictly greater than the given key
byte[] nextStartKey = this.rangeTable.higherKey(key);
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
// get the least key strictly greater than the given key
nextStartKey = this.rangeTable.higherKey(key);
} finally {
stampedLock.unlockRead(stamp);
}
}
return nextStartKey;
}
/**
* Get the cached leader of the group, return it when found, null otherwise.
* Make sure calls {@link #refreshLeader(CliClientService, String, int)} already
* before invoke this method.
*
* @param groupId raft group id
* @return peer of leader
*/
public PeerId selectLeader(final String groupId) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
final GroupConf gc = this.groupConfTable.get(groupId);
if (gc == null) {
return null;
}
final StampedLock stampedLock = gc.stampedLock;
long stamp = stampedLock.tryOptimisticRead();
PeerId leader = gc.leader;
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
leader = gc.leader;
} finally {
stampedLock.unlockRead(stamp);
}
}
return leader;
}
/**
* Get the configuration by groupId, returns null when not found.
*
* @param groupId raft group id
* @return configuration of the group id
*/
public Configuration getConfiguration(final String groupId) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
final GroupConf gc = this.groupConfTable.get(groupId);
if (gc == null) {
return null;
}
final StampedLock stampedLock = gc.stampedLock;
long stamp = stampedLock.tryOptimisticRead();
Configuration conf = gc.conf;
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
conf = gc.conf;
} finally {
stampedLock.unlockRead(stamp);
}
}
return conf;
}
/**
* Multiple threads can hold a read lock when not write-locked
*/
public void testMultipleReadLocks() {
final StampedLock lock = new StampedLock();
final long s = lock.readLock();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() throws InterruptedException {
long s2 = lock.tryReadLock();
assertValid(lock, s2);
lock.unlockRead(s2);
long s3 = lock.tryReadLock(LONG_DELAY_MS, MILLISECONDS);
assertValid(lock, s3);
lock.unlockRead(s3);
long s4 = lock.readLock();
assertValid(lock, s4);
lock.unlockRead(s4);
lock.asReadLock().lock();
lock.asReadLock().unlock();
lock.asReadLock().lockInterruptibly();
lock.asReadLock().unlock();
lock.asReadLock().tryLock(Long.MIN_VALUE, DAYS);
lock.asReadLock().unlock();
}});
awaitTermination(t);
lock.unlockRead(s);
}
/**
* writeLock() succeeds only after a reading thread unlocks
*/
public void testWriteAfterReadLock() throws InterruptedException {
final CountDownLatch aboutToLock = new CountDownLatch(1);
final StampedLock lock = new StampedLock();
long rs = lock.readLock();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
aboutToLock.countDown();
long s = lock.writeLock();
assertTrue(lock.isWriteLocked());
assertFalse(lock.isReadLocked());
lock.unlockWrite(s);
}});
aboutToLock.await();
waitForThreadToEnterWaitState(t);
assertFalse(lock.isWriteLocked());
assertTrue(lock.isReadLocked());
lock.unlockRead(rs);
awaitTermination(t);
assertUnlocked(lock);
}
/**
* writeLock() succeeds only after reading threads unlock
*/
public void testWriteAfterMultipleReadLocks() {
final StampedLock lock = new StampedLock();
long s = lock.readLock();
Thread t1 = newStartedThread(new CheckedRunnable() {
public void realRun() {
long rs = lock.readLock();
lock.unlockRead(rs);
}});
awaitTermination(t1);
Thread t2 = newStartedThread(new CheckedRunnable() {
public void realRun() {
long ws = lock.writeLock();
lock.unlockWrite(ws);
}});
assertTrue(lock.isReadLocked());
assertFalse(lock.isWriteLocked());
lock.unlockRead(s);
awaitTermination(t2);
assertUnlocked(lock);
}
/**
* readLock() succeed only after a writing thread unlocks
*/
public void testReadAfterWriteLock() {
final StampedLock lock = new StampedLock();
final CountDownLatch threadsStarted = new CountDownLatch(2);
final long s = lock.writeLock();
final Runnable acquireReleaseReadLock = new CheckedRunnable() {
public void realRun() {
threadsStarted.countDown();
long rs = lock.readLock();
assertTrue(lock.isReadLocked());
assertFalse(lock.isWriteLocked());
lock.unlockRead(rs);
}};
Thread t1 = newStartedThread(acquireReleaseReadLock);
Thread t2 = newStartedThread(acquireReleaseReadLock);
await(threadsStarted);
waitForThreadToEnterWaitState(t1);
waitForThreadToEnterWaitState(t2);
assertTrue(lock.isWriteLocked());
assertFalse(lock.isReadLocked());
releaseWriteLock(lock, s);
awaitTermination(t1);
awaitTermination(t2);
assertUnlocked(lock);
}
@Override
public Collection<RegisterMeta> lookup(RegisterMeta.ServiceMeta serviceMeta) {
RegisterValue value = registries.get(serviceMeta);
if (value == null) {
return Collections.emptyList();
}
// do not try optimistic read
final StampedLock stampedLock = value.lock;
final long stamp = stampedLock.readLock();
try {
return Lists.newArrayList(value.metaSet);
} finally {
stampedLock.unlockRead(stamp);
}
}
@Override
public Map<ServiceMeta, Integer> consumers() {
Map<ServiceMeta, Integer> result = Maps.newHashMap();
for (Map.Entry<RegisterMeta.ServiceMeta, RegisterValue> entry : registries.entrySet()) {
RegisterValue value = entry.getValue();
final StampedLock stampedLock = value.lock;
long stamp = stampedLock.tryOptimisticRead();
int optimisticVal = value.metaSet.size();
if (stampedLock.validate(stamp)) {
result.put(entry.getKey(), optimisticVal);
continue;
}
stamp = stampedLock.readLock();
try {
result.put(entry.getKey(), value.metaSet.size());
} finally {
stampedLock.unlockRead(stamp);
}
}
return result;
}
/**
* Returns the region to which the key belongs.
*/
public Region findRegionByKey(final byte[] key) {
Requires.requireNonNull(key, "key");
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.readLock();
try {
return findRegionByKeyWithoutLock(key);
} finally {
stampedLock.unlockRead(stamp);
}
}
/**
* Returns the list of regions covered by startKey and endKey.
*/
public List<Region> findRegionsByKeyRange(final byte[] startKey, final byte[] endKey) {
final StampedLock stampedLock = this.stampedLock;
final long stamp = stampedLock.readLock();
try {
final byte[] realStartKey = BytesUtil.nullToEmpty(startKey);
final NavigableMap<byte[], Long> subRegionMap;
if (endKey == null) {
subRegionMap = this.rangeTable.tailMap(realStartKey, false);
} else {
subRegionMap = this.rangeTable.subMap(realStartKey, false, endKey, true);
}
final List<Region> regionList = Lists.newArrayListWithCapacity(subRegionMap.size() + 1);
final Map.Entry<byte[], Long> headEntry = this.rangeTable.floorEntry(realStartKey);
if (headEntry == null) {
reportFail(startKey);
throw reject(startKey, "fail to find region by startKey");
}
regionList.add(safeCopy(this.regionTable.get(headEntry.getValue())));
for (final Long regionId : subRegionMap.values()) {
regionList.add(safeCopy(this.regionTable.get(regionId)));
}
return regionList;
} finally {
stampedLock.unlockRead(stamp);
}
}
/**
* interruptible operations throw InterruptedException when read locked and interrupted
*/
public void testInterruptibleOperationsThrowInterruptedExceptionReadLockedInterrupted() {
final StampedLock lock = new StampedLock();
long s = lock.readLock();
Action[] interruptibleLockBlockingActions = {
() -> lock.writeLockInterruptibly(),
() -> lock.tryWriteLock(Long.MAX_VALUE, DAYS),
() -> lock.asWriteLock().lockInterruptibly(),
() -> lock.asWriteLock().tryLock(Long.MAX_VALUE, DAYS),
};
shuffle(interruptibleLockBlockingActions);
assertThrowInterruptedExceptionWhenInterrupted(interruptibleLockBlockingActions);
}
/**
* tryReadLock succeeds if read locked but not write locked
*/
public void testTryLockWhenReadLocked() {
final StampedLock lock = new StampedLock();
long s = lock.readLock();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
long rs = lock.tryReadLock();
assertValid(lock, rs);
lock.unlockRead(rs);
}});
awaitTermination(t);
lock.unlockRead(s);
}
/**
* tryWriteLock fails when read locked
*/
public void testTryWriteLockWhenReadLocked() {
final StampedLock lock = new StampedLock();
long s = lock.readLock();
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
threadAssertEquals(0L, lock.tryWriteLock());
}});
awaitTermination(t);
lock.unlockRead(s);
}
/**
* toString indicates current lock state
*/
public void testToString() {
StampedLock lock = new StampedLock();
assertTrue(lock.toString().contains("Unlocked"));
long s = lock.writeLock();
assertTrue(lock.toString().contains("Write-locked"));
lock.unlockWrite(s);
s = lock.readLock();
assertTrue(lock.toString().contains("Read-locks"));
}
/**
* tryConvertToReadLock succeeds for valid stamps
*/
public void testTryConvertToReadLock() throws InterruptedException {
StampedLock lock = new StampedLock();
long s, p;
assertEquals(0L, lock.tryConvertToReadLock(0L));
s = assertValid(lock, lock.tryOptimisticRead());
p = assertValid(lock, lock.tryConvertToReadLock(s));
assertTrue(lock.isReadLocked());
assertEquals(1, lock.getReadLockCount());
assertTrue(lock.validate(s));
lock.unlockRead(p);
s = assertValid(lock, lock.tryOptimisticRead());
lock.readLock();
p = assertValid(lock, lock.tryConvertToReadLock(s));
assertTrue(lock.isReadLocked());
assertEquals(2, lock.getReadLockCount());
lock.unlockRead(p);
lock.unlockRead(p);
assertUnlocked(lock);
for (BiConsumer<StampedLock, Long> readUnlocker : readUnlockers()) {
for (Function<StampedLock, Long> writeLocker : writeLockers()) {
s = assertValid(lock, writeLocker.apply(lock));
p = assertValid(lock, lock.tryConvertToReadLock(s));
assertFalse(lock.validate(s));
assertTrue(lock.isReadLocked());
assertEquals(1, lock.getReadLockCount());
readUnlocker.accept(lock, p);
}
for (Function<StampedLock, Long> readLocker : readLockers()) {
s = assertValid(lock, readLocker.apply(lock));
assertEquals(s, lock.tryConvertToReadLock(s));
assertTrue(lock.validate(s));
assertTrue(lock.isReadLocked());
assertEquals(1, lock.getReadLockCount());
readUnlocker.accept(lock, s);
}
}
}
/**
* Locks all segments in specific order to prevent deadlocks
*/
void lockAllBucketsRead() {
for (StampedLock lock : lockAry) {
lock.readLock();
}
}