Listed below are example usages of java.util.concurrent.atomic.AtomicReferenceArray#length(), collected from open-source projects.
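Before the project examples, here is a minimal, self-contained sketch of the method itself (class and variable names are illustrative): length() returns the fixed capacity the array was constructed with, regardless of how many slots currently hold non-null values.

import java.util.concurrent.atomic.AtomicReferenceArray;

public class LengthDemo {
    public static void main(String[] args) {
        AtomicReferenceArray<String> array = new AtomicReferenceArray<>(8);
        array.set(0, "a");
        // length() reports the fixed capacity (8), not the number of non-null slots.
        System.out.println(array.length()); // prints 8
        for (int i = 0; i < array.length(); i++) {
            System.out.println(i + " -> " + array.get(i)); // slots 1..7 print null
        }
    }
}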
/**
* This method is a convenience for testing. Code should call {@link
* MapMakerInternalMap#containsValue} directly.
*/
@VisibleForTesting
boolean containsValue(Object value) {
try {
if (count != 0) { // read-volatile
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int length = table.length();
for (int i = 0; i < length; ++i) {
for (ReferenceEntry<K, V> e = table.get(i); e != null; e = e.getNext()) {
V entryValue = getLiveValue(e);
if (entryValue == null) {
continue;
}
if (map.valueEquivalence.equivalent(value, entryValue)) {
return true;
}
}
}
}
return false;
} finally {
postReadCleanup();
}
}
/**
* Removes an entry whose key has been garbage collected.
*/
@CanIgnoreReturnValue
boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
lock();
try {
int newCount = count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
} finally {
unlock();
}
}
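Note that reclaimKey above, like the other map segments in this listing, indexes the table with hash & (table.length() - 1). This equals a non-negative hash % table.length() only because these implementations keep the table length a power of two. A standalone sketch of that equivalence (the hash value is an arbitrary illustration):

import java.util.concurrent.atomic.AtomicReferenceArray;

public class MaskIndexDemo {
    public static void main(String[] args) {
        AtomicReferenceArray<Object> table = new AtomicReferenceArray<>(16); // power of two
        int hash = 0x9E3779B9; // arbitrary (negative) example hash
        int index = hash & (table.length() - 1);
        // For a power-of-two length the mask keeps exactly the low bits,
        // matching the non-negative remainder even for negative hashes.
        System.out.println(index);                                        // 9
        System.out.println(index == Math.floorMod(hash, table.length())); // true
    }
}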
public V put(int key, V value)
{
int hash = this.hash(key);
AtomicReferenceArray currentArray = this.table;
int length = currentArray.length();
int index = ConcurrentIntObjectHashMap.indexFor(hash, length);
Object o = currentArray.get(index);
if (o == null)
{
Entry<V> newEntry = new Entry<V>(key, value, null);
if (currentArray.compareAndSet(index, null, newEntry))
{
this.addToSize(1);
return null;
}
}
return this.slowPut(key, value, hash, currentArray);
}
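The put above uses compareAndSet on a table slot as its lock-free fast path: the new entry is published only if the slot is still null, and any race falls through to slowPut. A minimal sketch of that slot-CAS pattern in isolation (the values are illustrative):

import java.util.concurrent.atomic.AtomicReferenceArray;

public class SlotCasDemo {
    public static void main(String[] args) {
        AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(4);
        // Publish atomically only if slot 2 is still null; a thread that
        // loses the race gets false and must take a fallback path.
        System.out.println(slots.compareAndSet(2, null, "first"));  // true
        System.out.println(slots.compareAndSet(2, null, "second")); // false, slot taken
        System.out.println(slots.get(2));                           // first
    }
}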
boolean remove(Object key, int hash, Object value) {
lock();
try {
preWriteCleanup();
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
boolean explicitRemoval = false;
if (map.valueEquivalence.equivalent(value, entryValue)) {
explicitRemoval = true;
} else if (isCollected(valueReference)) {
// TODO(kak): Remove this branch
} else {
return false;
}
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return explicitRemoval;
}
}
return false;
} finally {
unlock();
}
}
/** Removes an entry whose key has been garbage collected. */
boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
lock();
try {
int newCount = count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
e.getKey(),
hash,
e.getValueReference().get(),
e.getValueReference(),
RemovalCause.COLLECTED);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
public void forEachPrimitive(ObjIntConsumer<V> action) {
AtomicReferenceArray<V> values = this.values;
for (int index = 0; index < values.length(); index++) {
V value = values.get(index);
if (value != null) {
action.accept(value, index);
}
}
}
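A self-contained version of the traversal pattern used by forEachPrimitive above, together with a usage example; the standalone helper here is hypothetical:

import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.ObjIntConsumer;

public class ForEachDemo {
    // Same shape as forEachPrimitive: visit every non-null slot with its index.
    static <V> void forEachNonNull(AtomicReferenceArray<V> values, ObjIntConsumer<V> action) {
        for (int index = 0; index < values.length(); index++) {
            V value = values.get(index);
            if (value != null) {
                action.accept(value, index);
            }
        }
    }

    public static void main(String[] args) {
        AtomicReferenceArray<String> values = new AtomicReferenceArray<>(4);
        values.set(1, "b");
        values.set(3, "d");
        forEachNonNull(values, (v, i) -> System.out.println(i + " -> " + v)); // 1 -> b, 3 -> d
    }
}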
@Override
public boolean containsValue(@Nullable Object value) {
// does not impact recency ordering
if (value == null) {
return false;
}
// This implementation is patterned after ConcurrentHashMap, but without the locking. The only
// way for it to return a false negative would be for the target value to jump around in the map
// such that none of the subsequent iterations observed it, despite the fact that at every point
in time it was present somewhere in the map. This becomes increasingly unlikely as
// CONTAINS_VALUE_RETRIES increases, though without locking it is theoretically possible.
long now = ticker.read();
final Segment<K, V>[] segments = this.segments;
long last = -1L;
for (int i = 0; i < CONTAINS_VALUE_RETRIES; i++) {
long sum = 0L;
for (Segment<K, V> segment : segments) {
// ensure visibility of most recent completed write
int unused = segment.count; // read-volatile
AtomicReferenceArray<ReferenceEntry<K, V>> table = segment.table;
for (int j = 0; j < table.length(); j++) {
for (ReferenceEntry<K, V> e = table.get(j); e != null; e = e.getNext()) {
V v = segment.getLiveValue(e, now);
if (v != null && valueEquivalence.equivalent(value, v)) {
return true;
}
}
}
sum += segment.modCount;
}
if (sum == last) {
break;
}
last = sum;
}
return false;
}
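The retry loop above trusts a negative answer only once a full pass observes the same sum of per-segment modCounts as the previous pass, meaning no modification could have moved the value between scans. A toy illustration of that stability check, with plain ints standing in for real segments:

public class StableScanDemo {
    public static void main(String[] args) {
        int[] modCounts = {3, 7, 2}; // illustrative per-segment modification counts
        final int RETRIES = 3;       // plays the role of CONTAINS_VALUE_RETRIES
        long last = -1L;
        for (int attempt = 0; attempt < RETRIES; attempt++) {
            long sum = 0L;
            for (int modCount : modCounts) {
                // ... a real pass would scan the segment's table here ...
                sum += modCount;
            }
            if (sum == last) {
                // Two identical sums: nothing changed between passes,
                // so a miss can be reported without locking.
                System.out.println("stable after pass " + (attempt + 1));
                break;
            }
            last = sum;
        }
    }
}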
void clear() {
if (count != 0) { // read-volatile
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
for (int i = 0; i < table.length(); ++i) {
for (ReferenceEntry<K, V> e = table.get(i); e != null; e = e.getNext()) {
// Loading references aren't actually in the map yet.
if (e.getValueReference().isActive()) {
K key = e.getKey();
V value = e.getValueReference().get();
RemovalCause cause =
(key == null || value == null) ? RemovalCause.COLLECTED : RemovalCause.EXPLICIT;
enqueueNotification(
key, e.getHash(), value, e.getValueReference().getWeight(), cause);
}
}
}
for (int i = 0; i < table.length(); ++i) {
table.set(i, null);
}
clearReferenceQueues();
writeQueue.clear();
accessQueue.clear();
readCount.set(0);
++modCount;
count = 0; // write-volatile
} finally {
unlock();
postWriteCleanup();
}
}
}
@Override
protected ExistsResponse newResponse(ExistsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
boolean exists = false;
List<ShardOperationFailedException> shardFailures = null;
// if docs do exist, the last response will have exists = true (since we early terminate the shard requests)
for (int i = shardsResponses.length() - 1; i >= 0 ; i--) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// simply ignore non-active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
if ((exists = ((ShardExistsResponse) shardResponse).exists())) {
successfulShards = shardsResponses.length() - failedShards;
break;
}
}
}
return new ExistsResponse(exists, shardsResponses.length(), successfulShards, failedShards, shardFailures);
}
@CanIgnoreReturnValue
V remove(Object key, int hash) {
lock();
try {
preWriteCleanup();
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue != null) {
// TODO(kak): Remove this branch
} else if (isCollected(valueReference)) {
// TODO(kak): Remove this branch
} else {
return null;
}
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return entryValue;
}
}
return null;
} finally {
unlock();
}
}
@Nullable
V replace(K key, int hash, V newValue) {
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
if (valueReference.isActive()) {
// If the value disappeared, this entry is partially collected.
int newCount = this.count - 1;
++modCount;
ReferenceEntry<K, V> newFirst = removeValueFromChain(first, e, entryKey, hash, entryValue, valueReference, RemovalCause.COLLECTED);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
}
return null;
}
++modCount;
enqueueNotification(key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, newValue, now);
evictEntries(e);
return entryValue;
}
}
return null;
} finally {
unlock();
postWriteCleanup();
}
}
/**
* @param ignite Node.
* @param cacheName Cache name.
* @return Cache free-list data (partition number mapped to a pair: bucket-to-tail-IDs map and per-bucket sizes).
*/
private Map<Integer, T2<Map<Integer, long[]>, int[]>> getFreeListData(Ignite ignite, String cacheName) throws IgniteCheckedException {
GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache();
GridCacheContext ctx = cacheProc.cache(cacheName).context();
List<GridDhtLocalPartition> parts = ctx.topology().localPartitions();
assertTrue(!parts.isEmpty());
assertEquals(ctx.affinity().partitions(), parts.size());
Map<Integer, T2<Map<Integer, long[]>, int[]>> res = new HashMap<>();
boolean foundNonEmpty = false;
boolean foundTails = false;
cacheProc.context().database().checkpointReadLock();
try {
for (GridDhtLocalPartition part : parts) {
AbstractFreeList freeList = (AbstractFreeList)part.dataStore().rowStore().freeList();
if (freeList == null)
// Lazy store.
continue;
// Flush the free-list on-heap cache to page memory.
freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
AtomicReferenceArray<PagesList.Stripe[]> buckets = GridTestUtils.getFieldValue(freeList,
AbstractFreeList.class, "buckets");
AtomicLongArray bucketsSize = GridTestUtils.getFieldValue(freeList, PagesList.class, "bucketsSize");
assertNotNull(buckets);
assertNotNull(bucketsSize);
assertTrue(buckets.length() > 0);
assertEquals(bucketsSize.length(), buckets.length());
Map<Integer, long[]> tailsPerBucket = new HashMap<>();
for (int i = 0; i < buckets.length(); i++) {
PagesList.Stripe[] tails = buckets.get(i);
long[] ids = null;
if (tails != null) {
ids = new long[tails.length];
for (int j = 0; j < tails.length; j++)
ids[j] = tails[j].tailId;
}
tailsPerBucket.put(i, ids);
if (tails != null) {
assertTrue(tails.length > 0);
foundTails = true;
}
}
int[] cntsPerBucket = new int[bucketsSize.length()];
for (int i = 0; i < bucketsSize.length(); i++) {
cntsPerBucket[i] = (int)bucketsSize.get(i);
if (cntsPerBucket[i] > 0)
foundNonEmpty = true;
}
res.put(part.id(), new T2<>(tailsPerBucket, cntsPerBucket));
}
}
finally {
cacheProc.context().database().checkpointReadUnlock();
}
assertTrue(foundNonEmpty);
assertTrue(foundTails);
return res;
}
public static void verifyCapturedThreads(final AtomicReferenceArray<Thread> capturedThreads) {
for (int i = 0; i < capturedThreads.length(); i++) {
final Thread capturedThread = capturedThreads.get(i);
assertThat("Unexpected captured thread at index: " + i, capturedThread, notNullValue());
}
}
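A hypothetical harness for the assertion helper above: each worker thread records itself into its slot of an AtomicReferenceArray<Thread>, which is exactly the state verifyCapturedThreads then checks for nulls.

import java.util.concurrent.atomic.AtomicReferenceArray;

public class CaptureThreadsDemo {
    public static void main(String[] args) throws InterruptedException {
        final AtomicReferenceArray<Thread> captured = new AtomicReferenceArray<>(2);
        Thread[] workers = new Thread[captured.length()];
        for (int i = 0; i < captured.length(); i++) {
            final int slot = i;
            workers[i] = new Thread(() -> captured.set(slot, Thread.currentThread()));
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join(); // after join, every slot must be non-null
        }
        for (int i = 0; i < captured.length(); i++) {
            System.out.println(i + " -> " + captured.get(i).getName());
        }
    }
}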
private List<MemoryBlock> getTinyFreeBlocks() {
List<MemoryBlock> value = new ArrayList<MemoryBlock>();
AtomicReferenceArray<SyncChunkStack> chunkStacks = this.freeList.tinyFreeLists;
for (int i = 0; i < chunkStacks.length(); i++) {
if (chunkStacks.get(i) == null) continue;
long addr = chunkStacks.get(i).topAddr;
final int freeListId = i;
while (addr != 0L) {
// Capture this block's address and size on each iteration; hoisting them
// above the loop would make every MemoryBlock report the head block.
final long address = addr;
final int size = Chunk.getSize(addr);
value.add(new MemoryBlockNode(new MemoryBlock() {
@Override
public State getState() {
return State.DEALLOCATED;
}
@Override
public long getMemoryAddress() {
return address;
}
@Override
public int getBlockSize() {
return size;
}
@Override
public MemoryBlock getNextBlock() {
throw new UnsupportedOperationException();
}
@Override
public int getSlabId() {
throw new UnsupportedOperationException();
}
@Override
public int getFreeListId() {
return freeListId;
}
@Override
public int getRefCount() {
return 0;
}
@Override
public String getDataType() {
return "N/A";
}
@Override
public boolean isSerialized() {
return false;
}
@Override
public boolean isCompressed() {
return false;
}
@Override
public Object getDataValue() {
return null;
}
@Override
public ChunkType getChunkType() {
return null;
}
}));
addr = Chunk.getNext(addr);
}
}
return value;
}
/** Removes an entry whose value has been garbage collected. */
boolean reclaimValue(K key, int hash, ValueReference<K, V> valueReference) {
lock();
try {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> v = e.getValueReference();
if (v == valueReference) {
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(
first,
e,
entryKey,
hash,
valueReference.get(),
valueReference,
RemovalCause.COLLECTED);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
return false;
}
}
return false;
} finally {
unlock();
if (!isHeldByCurrentThread()) { // don't cleanup inside of put
postWriteCleanup();
}
}
}
ListenableFuture<V> lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
if (value == null) {
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accommodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return Futures.immediateFuture(value);
}
// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();
if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadAsync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}
/**
* Returns the index of the node in the given {@link AtomicReferenceArray} whose edge starts with the given
* first character.
* <p/>
* This method expects that some constraints are enforced on the {@link AtomicReferenceArray}:
* <ul>
* <li>
* The array must already be in ascending sorted order of the first character of the edge for each node
* </li>
* <li>
* No entries in the array can be null
* </li>
* <li>
* Any existing node in the array cannot be swapped concurrently for another unless the edge associated
* with the other node also starts with the same first character
* </li>
* </ul>
* If these constraints are enforced as expected, then this method will have deterministic behaviour even in the
* face of concurrent modification.
*
* @param childNodes An {@link AtomicReferenceArray} of {@link com.googlecode.concurrenttrees.radix.node.Node} objects, which is used in accordance with
* the constraints documented in this method
*
* @param edgeFirstCharacter The first character of the edge for which the associated node is required
* @return The index of the node representing the indicated edge, or a value < 0 if no such node exists in the
* array
*/
public static int binarySearchForEdge(AtomicReferenceArray<Node> childNodes, Character edgeFirstCharacter) {
// inspired by Collections#indexedBinarySearch()
int low = 0;
int high = childNodes.length() - 1;
while (low <= high) {
int mid = (low + high) >>> 1;
Node midVal = childNodes.get(mid);
int cmp = midVal.getIncomingEdgeFirstCharacter().compareTo(edgeFirstCharacter);
if (cmp < 0)
low = mid + 1;
else if (cmp > 0)
high = mid - 1;
else
return mid; // key found
}
return -(low + 1); // key not found
}
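As the javadoc notes, a miss returns -(insertionPoint + 1), following the Collections.binarySearch convention. The sketch below mirrors the algorithm over plain Characters (so it stays self-contained, without the concurrent-trees Node type) and decodes a negative result:

import java.util.concurrent.atomic.AtomicReferenceArray;

public class EdgeSearchDemo {
    // Same algorithm as binarySearchForEdge above, with Characters standing in for nodes.
    static int binarySearch(AtomicReferenceArray<Character> sorted, Character target) {
        int low = 0;
        int high = sorted.length() - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            int cmp = sorted.get(mid).compareTo(target);
            if (cmp < 0) low = mid + 1;
            else if (cmp > 0) high = mid - 1;
            else return mid; // key found
        }
        return -(low + 1); // key not found
    }

    public static void main(String[] args) {
        AtomicReferenceArray<Character> edges =
                new AtomicReferenceArray<>(new Character[] {'a', 'c', 'f'});
        System.out.println(binarySearch(edges, 'c')); // 1 (found)
        int miss = binarySearch(edges, 'd');          // -3 (not found)
        System.out.println(-miss - 1);                // 2: 'd' would be inserted before 'f'
    }
}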
boolean remove(Object key, int hash, Object value) {
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
RemovalCause cause;
if (map.valueEquivalence.equivalent(value, entryValue)) {
cause = RemovalCause.EXPLICIT;
} else if (entryValue == null && valueReference.isActive()) {
cause = RemovalCause.COLLECTED;
} else {
// currently loading
return false;
}
++modCount;
ReferenceEntry<K, V> newFirst =
removeValueFromChain(first, e, entryKey, hash, entryValue, valueReference, cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return (cause == RemovalCause.EXPLICIT);
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
@Nullable
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count + 1;
if (newCount > this.threshold) { // ensure capacity
expand();
newCount = this.count + 1;
}
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Look for an existing entry.
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// We found an existing entry.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
++modCount;
if (valueReference.isActive()) {
enqueueNotification(key, hash, entryValue, valueReference.getWeight(), RemovalCause.COLLECTED);
setValue(e, key, value, now);
newCount = this.count; // count remains unchanged
} else {
setValue(e, key, value, now);
newCount = this.count + 1;
}
this.count = newCount; // write-volatile
evictEntries(e);
return null;
}
else if (onlyIfAbsent) {
// Mimic
// "if (!map.containsKey(key)) ...
// else return map.get(key);"
recordLockedRead(e, now);
return entryValue;
} else {
// clobber existing entry, count remains unchanged
++modCount;
enqueueNotification(key, hash, entryValue, valueReference.getWeight(), RemovalCause.REPLACED);
setValue(e, key, value, now);
evictEntries(e);
return entryValue;
}
}
}
// Create a new entry.
++modCount;
ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
setValue(newEntry, key, value, now);
table.set(index, newEntry);
newCount = this.count + 1;
this.count = newCount; // write-volatile
evictEntries(newEntry);
return null;
} finally {
unlock();
postWriteCleanup();
}
}