下面列出了怎么用java.util.concurrent.atomic.AtomicLongArray的API类实例代码及写法,或者点击链接到github查看源代码。
public SystemResourcesCounter(Time probeInterval) {
probeIntervalMs = probeInterval.toMilliseconds();
checkState(this.probeIntervalMs > 0);
setName(SystemResourcesCounter.class.getSimpleName() + " probing thread");
cpuUsagePerProcessor = new AtomicReferenceArray<>(hardwareAbstractionLayer.getProcessor().getLogicalProcessorCount());
NetworkIF[] networkIFs = hardwareAbstractionLayer.getNetworkIFs();
bytesReceivedPerInterface = new long[networkIFs.length];
bytesSentPerInterface = new long[networkIFs.length];
receiveRatePerInterface = new AtomicLongArray(networkIFs.length);
sendRatePerInterface = new AtomicLongArray(networkIFs.length);
networkInterfaceNames = new String[networkIFs.length];
for (int i = 0; i < networkInterfaceNames.length; i++) {
networkInterfaceNames[i] = networkIFs[i].getName();
}
}
/**
* All Atomic getAndUpdate methods throw NullPointerException on
* null function argument
*/
public void testGetAndUpdateNPE() {
Runnable[] throwingActions = {
() -> new AtomicLong().getAndUpdate(null),
() -> new AtomicInteger().getAndUpdate(null),
() -> new AtomicReference().getAndUpdate(null),
() -> new AtomicLongArray(1).getAndUpdate(0, null),
() -> new AtomicIntegerArray(1).getAndUpdate(0, null),
() -> new AtomicReferenceArray(1).getAndUpdate(0, null),
() -> aLongFieldUpdater().getAndUpdate(this, null),
() -> anIntFieldUpdater().getAndUpdate(this, null),
() -> anIntegerFieldUpdater().getAndUpdate(this, null),
////() -> aLongFieldUpdater().getAndUpdate(null, Atomic8Test::addLong17),
////() -> anIntFieldUpdater().getAndUpdate(null, Atomic8Test::addInt17),
////() -> anIntegerFieldUpdater().getAndUpdate(null, Atomic8Test::addInteger17),
};
assertThrows(NullPointerException.class, throwingActions);
}
/**
* compareAndSet in one thread enables another waiting for value
* to succeed
*/
public void testCompareAndSetInMultipleThreads() throws InterruptedException {
final AtomicLongArray a = new AtomicLongArray(1);
a.set(0, 1);
Thread t = new Thread(new CheckedRunnable() {
public void realRun() {
while (!a.compareAndSet(0, 2, 3))
Thread.yield();
}});
t.start();
assertTrue(a.compareAndSet(0, 1, 2));
t.join(LONG_DELAY_MS);
assertFalse(t.isAlive());
assertEquals(3, a.get(0));
}
@Override
public void request(long n) {
if (Operators.validate(n)) {
AtomicLongArray ra = parent.requests;
for (;;) {
long r = ra.get(index);
if (r == Long.MAX_VALUE) {
return;
}
long u = Operators.addCap(r, n);
if (ra.compareAndSet(index, r, u)) {
break;
}
}
if (parent.subscriberCount == length) {
parent.drain();
}
}
}
@Override
public E relaxedPoll() {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
long cIndex;
int seqOffset;
long seq;
long expectedSeq;
do {
cIndex = lvConsumerIndex();
seqOffset = calcCircularLongElementOffset(cIndex, mask);
seq = lvLongElement(sBuffer, seqOffset);
expectedSeq = cIndex + 1;
if (seq < expectedSeq) {
return null;
}
} while (// another consumer beat us to it
seq > expectedSeq || // failed the CAS
!casConsumerIndex(cIndex, cIndex + 1));
final int offset = calcCircularRefElementOffset(cIndex, mask);
final E e = lpRefElement(buffer, offset);
spRefElement(buffer, offset, null);
soLongElement(sBuffer, seqOffset, cIndex + mask + 1);
return e;
}
/**
* get and set for out of bound indices throw IndexOutOfBoundsException
*/
public void testIndexing() {
AtomicLongArray aa = new AtomicLongArray(SIZE);
for (int index : new int[] { -1, SIZE }) {
final int j = index;
final Runnable[] tasks = {
() -> aa.getPlain(j),
() -> aa.getOpaque(j),
() -> aa.getAcquire(j),
() -> aa.setPlain(j, 1),
() -> aa.setOpaque(j, 1),
() -> aa.setRelease(j, 1),
() -> aa.compareAndExchange(j, 1, 2),
() -> aa.compareAndExchangeAcquire(j, 1, 2),
() -> aa.compareAndExchangeRelease(j, 1, 2),
() -> aa.weakCompareAndSetPlain(j, 1, 2),
() -> aa.weakCompareAndSetVolatile(j, 1, 2),
() -> aa.weakCompareAndSetAcquire(j, 1, 2),
() -> aa.weakCompareAndSetRelease(j, 1, 2),
};
assertThrows(IndexOutOfBoundsException.class, tasks);
}
}
/**
* Fill this state from the serialized data which exists in this ByteArrayOrdinalMap
*
* @param ordinalMap
*/
public void populateFromByteOrdinalMap(final ByteArrayOrdinalMap ordinalMap) {
ByteDataBuffer byteData = ordinalMap.getByteData();
AtomicLongArray pointersAndOrdinals = ordinalMap.getPointersAndOrdinals();
FastBlobDeserializationRecord rec = new FastBlobDeserializationRecord(getSchema(), byteData.getUnderlyingArray());
for (int i = 0; i < pointersAndOrdinals.length(); i++) {
long pointerAndOrdinal = pointersAndOrdinals.get(i);
if(!ByteArrayOrdinalMap.isPointerAndOrdinalEmpty(pointerAndOrdinal)) {
long pointer = ByteArrayOrdinalMap.getPointer(pointerAndOrdinal);
int ordinal = ByteArrayOrdinalMap.getOrdinal(pointerAndOrdinal);
int sizeOfData = VarInt.readVInt(byteData.getUnderlyingArray(), pointer);
pointer += VarInt.sizeOfVInt(sizeOfData);
rec.position(pointer);
add(ordinal, rec);
}
}
}
/**
* Get the segment at <code>segmentIndex</code>. If this segment does not yet exist, create it.
*
* @param segmentIndex the segment index
* @return the segment
*/
private AtomicLongArray getSegment(int segmentIndex) {
ThreadSafeBitSetSegments visibleSegments = segments.get();
while(visibleSegments.numSegments() <= segmentIndex) {
/// Thread safety: newVisibleSegments contains all of the segments from the currently visible segments, plus extra.
/// all of the segments in the currently visible segments are canonical and will not change.
ThreadSafeBitSetSegments newVisibleSegments = new ThreadSafeBitSetSegments(visibleSegments, segmentIndex + 1, numLongsPerSegment);
/// because we are using a compareAndSet, if this thread "wins the race" and successfully sets this variable, then the segments
/// which are newly defined in newVisibleSegments become canonical.
if(segments.compareAndSet(visibleSegments, newVisibleSegments)) {
visibleSegments = newVisibleSegments;
} else {
/// If we "lose the race" and are growing the ThreadSafeBitSet segments larger,
/// then we will gather the new canonical sets from the update which we missed on the next iteration of this loop.
/// Newly defined segments in newVisibleSegments will be discarded, they do not get to become canonical.
visibleSegments = segments.get();
}
}
return visibleSegments.getSegment(segmentIndex);
}
private ThreadSafeBitSetSegments(int numSegments, int segmentLength) {
AtomicLongArray segments[] = new AtomicLongArray[numSegments];
for(int i=0;i<numSegments;i++) {
segments[i] = new AtomicLongArray(segmentLength);
}
/// Thread safety: Because this.segments is final, the preceding operations in this constructor are guaranteed to be visible to any
/// other thread which accesses this.segments.
this.segments = segments;
}
/**
* decrementAndGet decrements and returns current value
*/
public void testDecrementAndGet() {
AtomicLongArray aa = new AtomicLongArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
assertEquals(0, aa.decrementAndGet(i));
assertEquals(-1, aa.decrementAndGet(i));
assertEquals(-2, aa.decrementAndGet(i));
assertEquals(-2, aa.get(i));
}
}
private static long[] toArray(AtomicLongArray a)
{
final long[] r = new long[a.length()];
for (int i = 0 ; i < r.length ; i++)
r[i] = a.get(i);
return r;
}
long getWord(int bitIndex) {
if (bitIndex < 0) {
throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
}
int arrayIndex = arrayIndex(bitIndex);
AtomicLongArray array = arrays.get(arrayIndex);
if (array == null) {
return 0;
}
int wordIndexInArray = wordIndexInArray(bitIndex);
return array.get(wordIndexInArray);
}
/**
* Returns the index of the first bit that is set to {@code true}
* that occurs on or after the specified starting index. If no such
* bit exists then {@code -1} is returned.
* <p/>
* <p>To iterate over the {@code true} bits,
* use the following loop:
* <p/>
* <pre> {@code
* for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
* // operate on index i here
* }}</pre>
*
* @param fromIndex the index to start checking from (inclusive)
* @return the index of the next set bit, or {@code -1} if there
* is no such bit
* @throws IndexOutOfBoundsException if the specified index is negative
*/
public int nextSetBit(int fromIndex) {
if (fromIndex < 0) {
throw new IndexOutOfBoundsException("bitIndex < 0: " + fromIndex);
}
int arrayIndex;
AtomicLongArray array = null;
for (arrayIndex = arrayIndex(fromIndex); arrayIndex < arrays.length() && (array = arrays.get(arrayIndex)) == null; arrayIndex++);
if (array == null) {
return -1;
}
int wordIndexInArray = wordIndexInArray(fromIndex);
long word = array.get(wordIndexInArray) & (WORD_MASK << fromIndex);
while (true) {
if (word != 0) {
return ((1<<arrayIndex)-1 + wordIndexInArray) * BITS_PER_WORD + Long.numberOfTrailingZeros(word);
}
if (++wordIndexInArray == array.length()) {
wordIndexInArray = 0;
for (++arrayIndex; arrayIndex != arrays.length() && (array = arrays.get(arrayIndex)) == null; arrayIndex++);
if (array == null) {
return -1;
}
}
word = array.get(wordIndexInArray);
}
}
private ByteArrayOrdinalMap(long keys[], ByteDataBuffer byteData, FreeOrdinalTracker freeOrdinalTracker, int keyArraySize) {
this.freeOrdinalTracker = freeOrdinalTracker;
this.byteData = byteData;
AtomicLongArray pointersAndOrdinals = emptyKeyArray(keyArraySize);
populateNewHashArray(pointersAndOrdinals, keys);
this.pointersAndOrdinals = pointersAndOrdinals;
this.size = keys.length;
this.sizeBeforeGrow = keyArraySize * 7 / 10; /// 70% load factor
}
/**
* get returns the last value lazySet at index by same thread
*/
public void testGetLazySet() {
AtomicLongArray aa = new AtomicLongArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.lazySet(i, 1);
assertEquals(1, aa.get(i));
aa.lazySet(i, 2);
assertEquals(2, aa.get(i));
aa.lazySet(i, -3);
assertEquals(-3, aa.get(i));
}
}
/**
* AtomicLongArray updateAndGet updates with supplied function and
* returns result.
*/
public void testLongArrayUpdateAndGet() {
AtomicLongArray a = new AtomicLongArray(1);
a.set(0, 1);
assertEquals(18L, a.updateAndGet(0, Atomic8Test::addLong17));
assertEquals(35L, a.updateAndGet(0, Atomic8Test::addLong17));
assertEquals(35L, a.get(0));
}
AggregateDao(DataSource dataSource, List<CappedDatabase> rollupCappedDatabases,
ConfigRepositoryImpl configRepository, TransactionTypeDao transactionTypeDao,
FullQueryTextDao fullQueryTextDao) throws Exception {
this.dataSource = dataSource;
this.rollupCappedDatabases = rollupCappedDatabases;
this.configRepository = configRepository;
this.transactionTypeDao = transactionTypeDao;
this.fullQueryTextDao = fullQueryTextDao;
List<RollupConfig> rollupConfigs = configRepository.getRollupConfigs();
for (int i = 0; i < rollupConfigs.size(); i++) {
String overallTableName = "aggregate_tt_rollup_" + castUntainted(i);
dataSource.syncTable(overallTableName, overallAggregatePointColumns);
dataSource.syncIndexes(overallTableName, ImmutableList.<Index>of(
ImmutableIndex.of(overallTableName + "_idx", overallAggregateIndexColumns)));
String transactionTableName = "aggregate_tn_rollup_" + castUntainted(i);
dataSource.syncTable(transactionTableName, transactionAggregateColumns);
dataSource.syncIndexes(transactionTableName, ImmutableList.<Index>of(ImmutableIndex
.of(transactionTableName + "_idx", transactionAggregateIndexColumns)));
}
// don't need last_rollup_times table like in GaugeValueDao since there is already index
// on capture_time so these queries are relatively fast
long[] lastRollupTimes = new long[rollupConfigs.size()];
lastRollupTimes[0] = 0;
for (int i = 1; i < lastRollupTimes.length; i++) {
lastRollupTimes[i] = dataSource.queryForLong("select ifnull(max(capture_time), 0)"
+ " from aggregate_tt_rollup_" + castUntainted(i));
}
this.lastRollupTimes = new AtomicLongArray(lastRollupTimes);
// TODO initial rollup in case store is not called in a reasonable time
}
void setupSubscribers() {
int m = subscribers.length;
for (int i = 0; i < m; i++) {
if (cancelled) {
return;
}
int j = i;
SUBSCRIBER_COUNT.lazySet(this, i + 1);
subscribers[i].onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
AtomicLongArray ra = requests;
for (;;) {
long r = ra.get(j);
if (r == Long.MAX_VALUE) {
return;
}
long u = BackpressureHelper.addCap(r, n);
if (ra.compareAndSet(j, r, u)) {
break;
}
}
if (subscriberCount == m) {
drain();
}
}
}
@Override
public void cancel() {
ParallelDispatcher.this.cancel();
}
});
}
}
/**
* AtomicLongArray getAndUpdate returns previous value and updates
* result of supplied function
*/
public void testLongArrayGetAndUpdate() {
AtomicLongArray a = new AtomicLongArray(1);
a.set(0, 1);
assertEquals(1L, a.getAndUpdate(0, Atomic8Test::addLong17));
assertEquals(18L, a.getAndUpdate(0, Atomic8Test::addLong17));
assertEquals(35L, a.get(0));
}
private ImmutablePercentileSampleWindow(
long minRtt,
int maxInFlight,
boolean didDrop,
AtomicLongArray observedRtts,
int sampleCount,
double percentile
) {
this.minRtt = minRtt;
this.maxInFlight = maxInFlight;
this.didDrop = didDrop;
this.observedRtts = observedRtts;
this.sampleCount = sampleCount;
this.percentile = percentile;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
// Read in array length and allocate array
int length = s.readInt();
this.longs = new AtomicLongArray(length);
// Read in all elements in the proper order.
for (int i = 0; i < length; i++) {
set(i, s.readDouble());
}
}
/**
* All Atomic updateAndGet methods throw NullPointerException on null function argument
*/
public void testUpdateAndGetNPE() {
Runnable[] throwingActions = {
() -> new AtomicLong().updateAndGet(null),
() -> new AtomicInteger().updateAndGet(null),
() -> new AtomicReference().updateAndGet(null),
() -> new AtomicLongArray(1).updateAndGet(0, null),
() -> new AtomicIntegerArray(1).updateAndGet(0, null),
() -> new AtomicReferenceArray(1).updateAndGet(0, null),
() -> aLongFieldUpdater().updateAndGet(this, null),
() -> anIntFieldUpdater().updateAndGet(this, null),
() -> anIntegerFieldUpdater().updateAndGet(this, null),
};
assertThrows(NullPointerException.class, throwingActions);
}
/**
* AtomicLongArray getAndUpdate returns previous value and updates
* result of supplied function
*/
public void testLongArrayGetAndUpdate() {
AtomicLongArray a = new AtomicLongArray(1);
a.set(0, 1);
assertEquals(1L, a.getAndUpdate(0, Atomic8Test::addLong17));
assertEquals(18L, a.getAndUpdate(0, Atomic8Test::addLong17));
assertEquals(35L, a.get(0));
}
/**
* AtomicLongArray updateAndGet updates with supplied function and
* returns result.
*/
public void testLongArrayUpdateAndGet() {
AtomicLongArray a = new AtomicLongArray(1);
a.set(0, 1);
assertEquals(18L, a.updateAndGet(0, Atomic8Test::addLong17));
assertEquals(35L, a.updateAndGet(0, Atomic8Test::addLong17));
assertEquals(35L, a.get(0));
}
/**
* AtomicLongArray getAndAccumulate returns previous value and updates
* with supplied function.
*/
public void testLongArrayGetAndAccumulate() {
AtomicLongArray a = new AtomicLongArray(1);
a.set(0, 1);
assertEquals(1L, a.getAndAccumulate(0, 2L, Long::sum));
assertEquals(3L, a.getAndAccumulate(0, 3L, Long::sum));
assertEquals(6L, a.get(0));
}
/**
* @return the number of bits which are set in this bit set.
*/
public int cardinality() {
ThreadSafeBitSetSegments segments = this.segments.get();
int numSetBits = 0;
for(int i=0;i<segments.numSegments();i++) {
AtomicLongArray segment = segments.getSegment(i);
for(int j=0;j<segment.length();j++) {
numSetBits += Long.bitCount(segment.get(j));
}
}
return numSetBits;
}
/**
* compareAndSet succeeds in changing value if equal to expected else fails
*/
public void testCompareAndSet() {
AtomicLongArray aa = new AtomicLongArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
assertTrue(aa.compareAndSet(i, 1, 2));
assertTrue(aa.compareAndSet(i, 2, -4));
assertEquals(-4, aa.get(i));
assertFalse(aa.compareAndSet(i, -5, 7));
assertEquals(-4, aa.get(i));
assertTrue(aa.compareAndSet(i, -4, 7));
assertEquals(7, aa.get(i));
}
}
private static JsonArray arrayForLogStats(AtomicLongArray data) {
final JsonArray result = new JsonArray();
for (int i = 0; i < data.length(); i++) {
result.add(data.get(i));
}
return result;
}
/**
* All Atomic getAndAccumulate methods throw NullPointerException
* on null function argument
*/
public void testGetAndAccumulateNPE() {
Runnable[] throwingActions = {
() -> new AtomicLong().getAndAccumulate(1L, null),
() -> new AtomicInteger().getAndAccumulate(1, null),
() -> new AtomicReference().getAndAccumulate(one, null),
() -> new AtomicLongArray(1).getAndAccumulate(0, 1L, null),
() -> new AtomicIntegerArray(1).getAndAccumulate(0, 1, null),
() -> new AtomicReferenceArray(1).getAndAccumulate(0, one, null),
() -> aLongFieldUpdater().getAndAccumulate(this, 1L, null),
() -> anIntFieldUpdater().getAndAccumulate(this, 1, null),
() -> anIntegerFieldUpdater().getAndAccumulate(this, one, null),
};
assertThrows(NullPointerException.class, throwingActions);
}
/**
* getOpaque returns the last value set
*/
public void testGetOpaqueSet() {
AtomicLongArray aa = new AtomicLongArray(SIZE);
for (int i = 0; i < SIZE; i++) {
aa.set(i, 1);
assertEquals(1, aa.getOpaque(i));
aa.set(i, 2);
assertEquals(2, aa.getOpaque(i));
aa.set(i, -3);
assertEquals(-3, aa.getOpaque(i));
}
}