下面列出了java.util.concurrent.atomic.AtomicLong#lazySet() 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
* Records a read in the buffer and return its write count.
*
* @param bufferIndex the index to the chosen read buffer
* @param node the entry in the page replacement policy
* @return the number of writes on the chosen read buffer
*/
long recordRead(int bufferIndex, Node<K, V> node) {
// The location in the buffer is chosen in a racy fashion as the increment
// is not atomic with the insertion. This means that concurrent reads can
// overlap and overwrite one another, resulting in a lossy buffer.
final AtomicLong counter = readBufferWriteCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);
final int index = (int) (writeCount & READ_BUFFER_INDEX_MASK);
readBuffers[bufferIndex][index].lazySet(node);
return writeCount;
}
/**
* get returns the last value lazySet in same thread
*/
public void testGetLazySet() {
AtomicLong ai = new AtomicLong(1);
assertEquals(1, ai.get());
ai.lazySet(2);
assertEquals(2, ai.get());
ai.lazySet(-3);
assertEquals(-3, ai.get());
}
@Override
public final boolean offer(final M value,
final long timeout,
final TimeUnit unit)
{
Objects.requireNonNull(value);
// completePoll();
final AtomicLong headRef = _headRef;
final AtomicLong tailRef = _tail;
final int capacity = _capacity;
while (true) {
// final AtomicReferenceArray<T> ring = _ring;
final long tail = tailRef.get();
final long head = headRef.get();
final long nextHead = head + 1;
if (nextHead - tail < capacity) {
_ring.setLazy(head, value);
headRef.lazySet(nextHead);
return true;
}
else {
long offerSequence = _blocker.nextOfferSequence();
if (capacity <= headRef.get() + 1 - tailRef.get()
&& ! _blocker.offerWait(offerSequence, timeout, unit)) {
return false;
}
}
}
}
/**
* Records a read in the buffer and return its write count.
*
* @param bufferIndex the index to the chosen read buffer
* @param node the entry in the page replacement policy
* @return the number of writes on the chosen read buffer
*/
long recordRead(int bufferIndex, Node<K, V> node) {
// The location in the buffer is chosen in a racy fashion as the increment
// is not atomic with the insertion. This means that concurrent reads can
// overlap and overwrite one another, resulting in a lossy buffer.
final AtomicLong counter = readBufferWriteCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);
final int index = (int) (writeCount & READ_BUFFER_INDEX_MASK);
readBuffers[bufferIndex][index].lazySet(node);
return writeCount;
}
@Test
public void shouldNotCorruptGuardedState() throws InterruptedException {
final AtomicLong sharedState = new AtomicLong(0);
final int producers = 2;
final int writesPerProducer = 10;
final long idleMillis = 1000;
final long millisToAcquireLock = writesPerProducer * (producers - 1) * idleMillis;
final LeaseLock.Pauser pauser = LeaseLock.Pauser.sleep(idleMillis, TimeUnit.MILLISECONDS);
final CountDownLatch finished = new CountDownLatch(producers);
final LeaseLock[] locks = new LeaseLock[producers];
final AtomicInteger lockIndex = new AtomicInteger(0);
final Runnable producerTask = () -> {
final LeaseLock lock = locks[lockIndex.getAndIncrement()];
try {
for (int i = 0; i < writesPerProducer; i++) {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(millisToAcquireLock, pauser, () -> true);
if (acquireResult != LeaseLock.AcquireResult.Done) {
throw new IllegalStateException(acquireResult + " from " + Thread.currentThread());
}
//avoid the atomic getAndIncrement operation on purpose
sharedState.lazySet(sharedState.get() + 1);
lock.release();
}
} finally {
finished.countDown();
}
};
final Thread[] producerThreads = new Thread[producers];
for (int i = 0; i < producers; i++) {
locks[i] = lock();
producerThreads[i] = new Thread(producerTask);
}
Stream.of(producerThreads).forEach(Thread::start);
final long maxTestTime = millisToAcquireLock * writesPerProducer * producers;
Assert.assertTrue("Each producers must complete the writes", finished.await(maxTestTime, TimeUnit.MILLISECONDS));
Assert.assertEquals("locks hasn't mutual excluded producers", writesPerProducer * producers, sharedState.get());
}
/**
* Records a read in the buffer and return its write count.
*
* @param bufferIndex the index to the chosen read buffer
* @param node the entry in the page replacement policy
* @return the number of writes on the chosen read buffer
*/
long recordRead(int bufferIndex, Node<K, V> node) {
// The location in the buffer is chosen in a racy fashion as the increment
// is not atomic with the insertion. This means that concurrent reads can
// overlap and overwrite one another, resulting in a lossy buffer.
final AtomicLong counter = readBufferWriteCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);
final int index = (int) (writeCount & READ_BUFFER_INDEX_MASK);
readBuffers[bufferIndex][index].lazySet(node);
return writeCount;
}
/**
* get returns the last value lazySet in same thread
*/
public void testGetLazySet() {
AtomicLong ai = new AtomicLong(1);
assertEquals(1, ai.get());
ai.lazySet(2);
assertEquals(2, ai.get());
ai.lazySet(-3);
assertEquals(-3, ai.get());
}
/**
* Records a read in the buffer and return its write count.
*
* @param bufferIndex the index to the chosen read buffer
* @param node the entry in the page replacement policy
* @return the number of writes on the chosen read buffer
*/
private long recordRead(int bufferIndex, Node<K, V> node) {
// The location in the buffer is chosen in a racy fashion as the increment
// is not atomic with the insertion. This means that concurrent reads can
// overlap and overwrite one another, resulting in a lossy buffer.
final AtomicLong counter = readBufferWriteCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);
final int index = (int) (writeCount & READ_BUFFER_INDEX_MASK);
readBuffers[bufferIndex][index].lazySet(node);
return writeCount;
}
/**
* Records a read in the buffer and return its write count.
*
* @param bufferIndex the index to the chosen read buffer
* @param node the entry in the page replacement policy
* @return the number of writes on the chosen read buffer
*/
private long recordRead(int bufferIndex, Node<V> node) {
// The location in the buffer is chosen in a racy fashion as the increment
// is not atomic with the insertion. This means that concurrent reads can
// overlap and overwrite one another, resulting in a lossy buffer.
final AtomicLong counter = readBufferWriteCount[bufferIndex];
final long writeCount = counter.get();
counter.lazySet(writeCount + 1);
final int index = (int) (writeCount & ConcurrentLinkedHashMap.READ_BUFFER_INDEX_MASK);
readBuffers[bufferIndex][index].lazySet(node);
return writeCount;
}
public void sendTrain(final int index) {
final AtomicLong atomicLong = trainNoLong[index];
atomicLong.lazySet(atomicLong.get() + 1);
}