The following examples show how to use the java.util.concurrent.atomic.AtomicReferenceArray API in practice; follow the accompanying links to view the full source code on GitHub.
/**
 * Drains the pending callbacks sequentially: wraps {@code callbacksList} in an
 * {@link AtomicReferenceArray}, then for each task sleeps {@code sleepTime} ms
 * and, if the task has not already completed, submits it to a fixed pool and
 * waits for it to finish. The pool is always shut down on exit.
 */
public void run() {
    FutureTask[] callbackArray = new FutureTask[callbacksList.size()];
    AtomicReferenceArray<FutureTask> bufferList =
            new AtomicReferenceArray<FutureTask>(callbacksList.toArray(callbackArray));
    ExecutorService executor = Executors.newFixedThreadPool(10);
    try {
        for (int i = 0; i < bufferList.length(); i++) {
            Thread.sleep(sleepTime);
            FutureTask f = bufferList.get(i);
            if (!f.isDone()) {
                // Block until the submitted task has run to completion.
                executor.submit(f).get();
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers/owners of this thread can
        // observe the interruption (it must not be silently swallowed).
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException ee) {
        ee.printStackTrace();
    } finally {
        executor.shutdownNow();
    }
}
// Initializes the probing thread: validates the probe interval, sizes the
// per-CPU usage array from the logical processor count, and sizes the
// per-network-interface counters and rate arrays from the discovered interfaces.
public SystemResourcesCounter(Time probeInterval) {
probeIntervalMs = probeInterval.toMilliseconds();
// A non-positive interval would make the probing loop spin or never fire.
checkState(this.probeIntervalMs > 0);
setName(SystemResourcesCounter.class.getSimpleName() + " probing thread");
// One slot per logical processor for the sampled CPU usage values.
cpuUsagePerProcessor = new AtomicReferenceArray<>(hardwareAbstractionLayer.getProcessor().getLogicalProcessorCount());
NetworkIF[] networkIFs = hardwareAbstractionLayer.getNetworkIFs();
bytesReceivedPerInterface = new long[networkIFs.length];
bytesSentPerInterface = new long[networkIFs.length];
receiveRatePerInterface = new AtomicLongArray(networkIFs.length);
sendRatePerInterface = new AtomicLongArray(networkIFs.length);
networkInterfaceNames = new String[networkIFs.length];
// Cache the interface names so later probes need not re-query the hardware layer.
for (int i = 0; i < networkInterfaceNames.length; i++) {
networkInterfaceNames[i] = networkIFs[i].getName();
}
}
@VisibleForTesting
@GuardedBy("this")
// Unlinks the given entry from its hash-bucket chain, reporting the supplied
// removal cause. Returns true if the exact entry was found and removed.
boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
// Table length is a power of two, so masking the hash yields the bucket index.
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Walk the chain looking for this exact entry (identity, not key equality).
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst = removeValueFromChain(first, e, e.getKey(), hash, e.getValueReference().get(), e.getValueReference(), cause);
// Re-derive the new count after the chain rewrite.
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
}
// Visits every key/value pair stored in slots [start, end) of the given table
// segment, invoking the procedure for each entry. Fails fast if a concurrent
// resize is observed in any visited slot.
private void sequentialForEachKeyValue(IntObjectProcedure<V> block, AtomicReferenceArray currentArray, int start, int end)
{
    for (int slot = start; slot < end; slot++)
    {
        Object bucket = currentArray.get(slot);
        if (bucket == RESIZED || bucket == RESIZING)
        {
            throw new ConcurrentModificationException("can't iterate while resizing!");
        }
        // Walk the collision chain rooted at this slot.
        for (Entry<V> cursor = (Entry<V>) bucket; cursor != null; cursor = cursor.getNext())
        {
            block.value(cursor.getKey(), (V) cursor.getValue());
        }
    }
}
@Override
// Drains up to {@code limit} elements into the consumer, stopping early when
// the queue becomes empty. Returns the number of elements actually consumed.
public int drain(final Consumer<E> c, final int limit) {
if (null == c)
throw new IllegalArgumentException("c is null");
if (limit < 0)
throw new IllegalArgumentException("limit is negative: " + limit);
if (limit == 0)
return 0;
final AtomicReferenceArray<E> buffer = this.buffer;
final int mask = this.mask;
final long cIndex = lpConsumerIndex();
for (int i = 0; i < limit; i++) {
final long index = cIndex + i;
final int offset = calcCircularRefElementOffset(index, mask);
// Volatile load: a null slot means the producer has not filled it yet.
final E e = lvRefElement(buffer, offset);
if (null == e) {
return i;
}
// Plain store suffices: only the consumer nulls out consumed slots.
spRefElement(buffer, offset, null);
// ordered store -> atomic and ordered for size()
soConsumerIndex(index + 1);
c.accept(e);
}
return limit;
}
/** Implementation for get and containsKey */
// Lock-free bucket scan: entries with hash == MOVED forward either to a TreeBin
// (searched directly) or to the next table during a resize (scan restarts there).
private final Object internalGet(Object k) {
int h = spread(k.hashCode());
retry: for (AtomicReferenceArray<Node> tab = table; tab != null;) {
Node e, p; Object ek, ev; int eh; // locals to read fields once
for (e = tabAt(tab, (tab.length() - 1) & h); e != null; e = e.next) {
if ((eh = e.hash) == MOVED) {
if ((ek = e.key) instanceof TreeBin) // search TreeBin
return ((TreeBin)ek).getValue(h, k);
else { // restart with new table
tab = (AtomicReferenceArray<Node>)ek;
continue retry;
}
}
else if ((eh & HASH_BITS) == h && (ev = e.val) != null &&
((ek = e.key) == k || k.equals(ek)))
return ev;
}
break;
}
// Key not present.
return null;
}
@SuppressWarnings("unchecked")
// Returns a binding identical to this one except that, whenever {@code dependency}
// is resolved, the produced instance is first passed through {@code fn}.
public <K> Binding<T> mapDependency(@NotNull Key<K> dependency, @NotNull Function<? super K, ? extends K> fn) {
return new Binding<>(dependencies,
(compiledBindings, threadsafe, scope, slot) ->
compiler.compile(new CompiledBindingLocator() {
@Override
public @NotNull <Q> CompiledBinding<Q> get(Key<Q> key) {
CompiledBinding<Q> originalBinding = compiledBindings.get(key);
// Only the targeted dependency key is wrapped; all others pass through.
if (!key.equals(dependency)) return originalBinding;
return new CompiledBinding<Q>() {
@Nullable
@Override
public Q getInstance(AtomicReferenceArray[] scopedInstances, int synchronizedScope) {
Q instance = originalBinding.getInstance(scopedInstances, synchronizedScope);
// Apply the mapping function to the freshly obtained instance.
return (Q) fn.apply((K) instance);
}
};
}
}, threadsafe, scope, slot), location);
}
/**
 * This method is a convenience for testing. Code should call {@link LocalCache#containsValue}
 * directly.
 */
@VisibleForTesting
boolean containsValue(Object value) {
try {
if (count != 0) { // read-volatile
// Snapshot the time once so liveness checks across the scan are consistent.
long now = map.ticker.read();
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int length = table.length();
// Scan every bucket chain in this segment.
for (int i = 0; i < length; ++i) {
for (ReferenceEntry<K, V> e = table.get(i); e != null; e = e.getNext()) {
V entryValue = getLiveValue(e, now);
// Skip entries that are expired or partially collected.
if (entryValue == null) {
continue;
}
if (map.valueEquivalence.equivalent(value, entryValue)) {
return true;
}
}
}
}
return false;
} finally {
// Read-side bookkeeping must run whether or not a match was found.
postReadCleanup();
}
}
/**
 * Removes an entry whose key has been garbage collected.
 */
@CanIgnoreReturnValue
boolean reclaimKey(ReferenceEntry<K, V> entry, int hash) {
lock();
try {
int newCount = count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
// Table length is a power of two; masking the hash gives the bucket index.
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
// Identity comparison: we must remove this exact entry, not an equal key.
if (e == entry) {
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
}
return false;
} finally {
unlock();
}
}
// Builds the holder's backing table. The requested capacity is clamped to
// MAXIMUM_CAPACITY, grown by 50% headroom (equivalent to a 0.75 load factor),
// and rounded up to a power of two; one extra slot is appended. Very large
// tables additionally get a striped size-counter array.
public ConcurrentOffHeapWeakHolder(int initialCapacity)
{
    if (initialCapacity < 0)
    {
        throw new IllegalArgumentException("Illegal Initial Capacity: " + initialCapacity);
    }
    if (initialCapacity > MAXIMUM_CAPACITY)
    {
        initialCapacity = MAXIMUM_CAPACITY;
    }
    // Resize threshold is 1.5x the requested capacity (load factor 0.75
    // relative to the final power-of-two table length).
    int sizeWithHeadroom = initialCapacity + (initialCapacity >> 1);
    // Round up to the smallest power of two covering the threshold.
    int tableSize = 1;
    while (tableSize < sizeWithHeadroom)
    {
        tableSize <<= 1;
    }
    if (tableSize >= PARTITIONED_SIZE_THRESHOLD)
    {
        // we want 7 extra slots and 64 bytes for each slot. int is 4 bytes, so 64 bytes is 16 ints.
        this.partitionedSize = new AtomicIntegerArray(SIZE_BUCKETS * 16);
    }
    // One extra slot beyond the power-of-two capacity.
    this.table = new AtomicReferenceArray(tableSize + 1);
}
@Override
// Aggregates per-shard DFS results into a single response: inactive shards
// (null slots) are skipped, failed shards are collected into shardFailures,
// and successful results are merged via the search phase controller.
protected DfsOnlyResponse newResponse(DfsOnlyRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
AtomicArray<DfsSearchResult> dfsResults = new AtomicArray<>(shardsResponses.length());
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// simply ignore non active shards
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
// Lazily allocated: most responses contain no failures.
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
dfsResults.set(i, ((ShardDfsOnlyResponse) shardResponse).getDfsSearchResult());
successfulShards++;
}
}
AggregatedDfs dfs = searchPhaseController.aggregateDfs(dfsResults);
return new DfsOnlyResponse(dfs, shardsResponses.length(), successfulShards, failedShards, shardFailures, buildTookInMillis(request));
}
// Builds the initial ring buffer shared by producer and consumer. Capacity is
// rounded up to a power of two (minimum 8); the buffer has one extra slot,
// used to link to the next buffer when the queue grows.
public SpscLinkedArrayQueue(final int bufferSize) {
int p2capacity = SpscArrayQueue.roundToPowerOfTwo(Math.max(8, bufferSize));
int mask = p2capacity - 1;
AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<>(p2capacity + 1);
producerBuffer = buffer;
producerMask = mask;
adjustLookAheadStep(p2capacity);
consumerBuffer = buffer;
consumerMask = mask;
producerLookAhead = mask - 1L; // we know it's all empty to start with
soProducerIndex(0L);
}
@Override
public Iterator<E> iterator() {
    // Iterates the backing array in index order, skipping null slots.
    // Removal through the iterator is not supported.
    return new Iterator<E>() {
        private final AtomicReferenceArray<E> snapshot = data;
        private int cursor = 0;
        private E pending = null;

        @Override
        public boolean hasNext() {
            // Advance past null slots until an element or the end is reached.
            E candidate = null;
            while (cursor < snapshot.length()
                    && (candidate = snapshot.get(cursor)) == null) {
                cursor++;
            }
            if (cursor >= snapshot.length()) {
                candidate = null;
            }
            pending = candidate;
            return pending != null;
        }

        @Override
        public E next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            // Step over the element located by hasNext() and hand it out.
            cursor++;
            return pending;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
// Removes the mapping for {@code key} only if it is currently mapped to
// {@code value}. A collected entry is cleaned up as a side effect but reported
// as false; only an explicit value-matched removal returns true.
boolean remove(Object key, int hash, Object value) {
lock();
try {
long now = map.ticker.read();
// Expire and clean up stale entries before mutating the segment.
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
RemovalCause cause;
if (map.valueEquivalence.equivalent(value, entryValue)) {
cause = RemovalCause.EXPLICIT;
} else if (entryValue == null && valueReference.isActive()) {
// The value was garbage collected; remove the stale entry anyway.
cause = RemovalCause.COLLECTED;
} else {
// currently loading
return false;
}
++modCount;
ReferenceEntry<K, V> newFirst = removeValueFromChain(first, e, entryKey, hash, entryValue, valueReference, cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return (cause == RemovalCause.EXPLICIT);
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
/**
 * Migrates the data from the atomic array to a {@link List} for easier
 * consumption.
 *
 * @param data the per-thread results from the test
 * @return the per-thread results as a standard collection
 */
private static <T> List<T> toList(AtomicReferenceArray<T> data) {
    // Hoist the (constant) length and presize the destination list.
    final int size = data.length();
    List<T> copied = new ArrayList<>(size);
    for (int index = 0; index < size; index++) {
        copied.add(data.get(index));
    }
    return copied;
}
@Nonnull
// Returns the map cached at {@code index} in the physical or non-physical
// cache, lazily installing one via CAS so concurrent callers converge on a
// single instance.
private <TRef extends PsiReference, TResult> Map<TRef, TResult> getMap(boolean physical, int index) {
    AtomicReferenceArray<Map> maps = physical ? myPhysicalMaps : myNonPhysicalMaps;
    Map result = maps.get(index);
    while (result == null) {
        Map fresh = createWeakMap();
        // If our CAS wins, use the freshly created map; otherwise adopt the
        // map the winning thread installed.
        result = maps.compareAndSet(index, null, fresh) ? fresh : maps.get(index);
    }
    //noinspection unchecked
    return result;
}
// Conditional removal: unmaps {@code key} only if its current value matches
// {@code value}. Returns true only for the explicit match; a garbage-collected
// value is unlinked but reported as false.
boolean remove(Object key, int hash, Object value) {
lock();
try {
long now = map.ticker.read();
// Run expiration/cleanup under the segment lock before the mutation.
preWriteCleanup(now);
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
RemovalCause cause;
if (map.valueEquivalence.equivalent(value, entryValue)) {
cause = RemovalCause.EXPLICIT;
} else if (entryValue == null && valueReference.isActive()) {
// Value was collected: remove the stale entry with cause COLLECTED.
cause = RemovalCause.COLLECTED;
} else {
// currently loading
return false;
}
++modCount;
ReferenceEntry<K, V> newFirst = removeValueFromChain(first, e, entryKey, hash, entryValue, valueReference, cause);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return (cause == RemovalCause.EXPLICIT);
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
// Copies the first {@code lengthToClone} buckets of {@code sparseBitSet} into
// a fresh bucket array of {@code newLength} slots and wraps it in a new
// SparseBitSet with the given maximum value.
private static SparseBitSet copyWithNewLength(SparseBitSet sparseBitSet, int newLength, int lengthToClone, int newMaxValue) {
    AtomicReferenceArray<Bucket> compactBuckets = new AtomicReferenceArray<Bucket>(newLength);
    for (int i = 0; i < lengthToClone; i++) {
        // Read each slot exactly once: the source is an AtomicReferenceArray
        // that may be concurrently updated, so two reads of the same index
        // (as in the original null-check-then-get) could observe different values.
        Bucket bucket = sparseBitSet.buckets.get(i);
        if (bucket != null) {
            compactBuckets.set(i, bucket);
        }
    }
    return new SparseBitSet(newMaxValue, compactBuckets);
}
// Replaces the value for {@code key} only if a live entry already exists.
// Returns the previous value, or null if the key was absent or its value
// had been collected.
V replace(K key, int hash, V newValue) {
lock();
try {
preWriteCleanup();
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// If the value disappeared, this entry is partially collected,
// and we should pretend like it doesn't exist.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
if (isCollected(valueReference)) {
// Unlink the collected entry while holding the segment lock.
int newCount = this.count - 1;
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
}
return null;
}
++modCount;
setValue(e, newValue);
// Hand back the value that was replaced.
return entryValue;
}
}
return null;
} finally {
unlock();
}
}
// Wraps a native eager-mode op: records identifying metadata, attaches the
// native handles to the session (so they are released with it), and allocates
// one lazily-filled tensor slot per output handle.
EagerOperation(
EagerSession session,
TFE_Op opNativeHandle,
TFE_TensorHandle[] outputNativeHandles,
String type,
String name) {
this.session = session;
this.type = type;
this.name = name;
this.opHandle = opNativeHandle;
this.outputHandles = outputNativeHandles;
// Register the native handles for cleanup when the session closes.
session.attach(opNativeHandle);
session.attach(outputNativeHandles);
// Output tensors are materialized on demand, one slot per output handle.
this.outputTensors = new AtomicReferenceArray<>(outputNativeHandles.length);
}
// Removes or unwinds a loading placeholder for {@code key}. If the loading
// reference still wraps a previous value, the entry is restored to that value;
// otherwise the entry is unlinked entirely. Returns true iff
// {@code valueReference} was the entry's current value reference.
boolean removeLoadingValue(K key, int hash, LoadingValueReference<K, V> valueReference) {
lock();
try {
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> v = e.getValueReference();
// Identity check: only act if this exact loading reference is still installed.
if (v == valueReference) {
if (valueReference.isActive()) {
// Roll back to the value that was present before loading started.
e.setValueReference(valueReference.getOldValue());
} else {
ReferenceEntry<K, V> newFirst = removeEntryFromChain(first, e);
table.set(index, newFirst);
}
return true;
}
return false;
}
}
return false;
} finally {
unlock();
postWriteCleanup();
}
}
/**
 * getAcquire returns the last value set
 */
public void testGetAcquireSet() {
    AtomicReferenceArray<Integer> aa = new AtomicReferenceArray<>(SIZE);
    Integer[] expected = { one, two, m3 };
    for (int i = 0; i < SIZE; i++) {
        // For each slot, plain-set each value in turn and verify that
        // getAcquire observes it.
        for (Integer v : expected) {
            aa.set(i, v);
            assertEquals(v, aa.getAcquire(i));
        }
    }
}
// Returns the chunk at {@code addr} to the free list in bucket {@code idx},
// lazily creating the per-bucket stack on first use.
private void basicFree(long addr, int idx, AtomicReferenceArray<SyncChunkStack> freeLists) {
SyncChunkStack clq = freeLists.get(idx);
if (clq != null) {
clq.offer(addr);
} else {
clq = new SyncChunkStack();
clq.offer(addr);
// CAS lost: another thread installed a stack first. Our private stack (and
// the offer made to it) is discarded, so re-offer addr to the published stack.
if (!freeLists.compareAndSet(idx, null, clq)) {
clq = freeLists.get(idx);
clq.offer(addr);
}
}
}
/**
 * get returns the last value set at index
 */
public void testGetSet() {
    // Use a parameterized array (consistent with testGetAcquireSet) instead of
    // the raw AtomicReferenceArray type, avoiding unchecked warnings.
    AtomicReferenceArray<Integer> aa = new AtomicReferenceArray<>(SIZE);
    for (int i = 0; i < SIZE; i++) {
        aa.set(i, one);
        assertSame(one, aa.get(i));
        aa.set(i, two);
        assertSame(two, aa.get(i));
        aa.set(i, m3);
        assertSame(m3, aa.get(i));
    }
}
// Conditional replace: installs {@code newValue} for {@code key} only if a
// live mapping already exists, returning the old value; returns null when the
// key is absent or its value was collected.
V replace(K key, int hash, V newValue) {
lock();
try {
preWriteCleanup();
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
// If the value disappeared, this entry is partially collected,
// and we should pretend like it doesn't exist.
ValueReference<K, V> valueReference = e.getValueReference();
V entryValue = valueReference.get();
if (entryValue == null) {
if (isCollected(valueReference)) {
// Remove the collected entry while we hold the lock.
int newCount = this.count - 1;
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
}
return null;
}
++modCount;
setValue(e, newValue);
// Return the replaced value to the caller.
return entryValue;
}
}
return null;
} finally {
unlock();
}
}
@Override
// Lock-free peek: the element load is sandwiched between two consumer-index
// loads and retried until both index loads agree, so the returned element
// corresponds to a stable consumer position.
public E relaxedPeek() {
final AtomicReferenceArray<E> buffer = this.buffer;
final int mask = this.mask;
long currentConsumerIndex;
long nextConsumerIndex = lvConsumerIndex();
E e;
do {
currentConsumerIndex = nextConsumerIndex;
e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));
// sandwich the element load between 2 consumer index loads
nextConsumerIndex = lvConsumerIndex();
} while (nextConsumerIndex != currentConsumerIndex);
return e;
}
/**
 * getAndSet returns previous value and sets to given value at given index
 */
public void testGetAndSet() {
    // Use a parameterized array (consistent with testGetAcquireSet) instead of
    // the raw AtomicReferenceArray type, avoiding unchecked warnings.
    AtomicReferenceArray<Integer> aa = new AtomicReferenceArray<>(SIZE);
    for (int i = 0; i < SIZE; i++) {
        aa.set(i, one);
        assertSame(one, aa.getAndSet(i, zero));
        assertSame(zero, aa.getAndSet(i, m10));
        assertSame(m10, aa.getAndSet(i, one));
    }
}
// Creates a publisher that multicasts {@code original} to a fixed number of
// subscribers, each with a bounded queue.
MulticastPublisher(Publisher<T> original, int expectedSubscribers, int maxQueueSize, Executor executor) {
super(executor);
// Multicasting to fewer than two subscribers is pointless; fail fast.
if (expectedSubscribers < 2) {
throw new IllegalArgumentException("expectedSubscribers: " + expectedSubscribers + " (expected >=2)");
}
if (maxQueueSize < 1) {
throw new IllegalArgumentException("maxQueueSize: " + maxQueueSize + " (expected >=1)");
}
this.original = original;
// Tracks how many subscribers have not yet cancelled.
notCancelledCount = expectedSubscribers;
this.maxQueueSize = maxQueueSize;
// One slot per expected subscriber.
subscribers = new AtomicReferenceArray<>(expectedSubscribers);
}
/**
 * This method returns a copy of the injector cache - a map of all already created non-transient instances at the current scope.
 */
public Map<Key<?>, Object> peekInstances() {
    Map<Key<?>, Object> snapshot = new HashMap<>();
    // The current (innermost) scope cache is the last element of scopeCaches.
    AtomicReferenceArray currentScopeCache = scopeCaches[scopeCaches.length - 1];
    for (Entry<Key<?>, Integer> slotBinding : localSlotMapping.entrySet()) {
        Object instance = currentScopeCache.get(slotBinding.getValue());
        // Only materialized instances are reported; empty slots are skipped.
        if (instance != null) {
            snapshot.put(slotBinding.getKey(), instance);
        }
    }
    return snapshot;
}
/**
 * Removes an entry whose value has been garbage collected.
 */
@CanIgnoreReturnValue
boolean reclaimValue(K key, int hash, ValueReference<K, V> valueReference) {
lock();
try {
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
ValueReference<K, V> v = e.getValueReference();
// Only reclaim if the collected reference is still the entry's current
// one; a concurrent write may have already replaced it.
if (v == valueReference) {
++modCount;
ReferenceEntry<K, V> newFirst = removeFromChain(first, e);
newCount = this.count - 1;
table.set(index, newFirst);
this.count = newCount; // write-volatile
return true;
}
return false;
}
}
return false;
} finally {
unlock();
}
}