下面列出了 java.util.concurrent.atomic.AtomicReferenceArray#lazySet() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Takes the element stored at the current consumer position, if any.
 * <p>
 * When the slot is occupied it is cleared, the consumer position is advanced by one and the
 * element is returned. When the slot reads {@code null} the decision is delegated to
 * {@code pollMaybeEmpty}.
 *
 * @return the element taken from the slot, or whatever {@code pollMaybeEmpty} returns
 */
@Override
public E poll()
{
    final long position = lvConsumerPosition();
    final int slot = calcElementOffset(position, this.mask);
    final AtomicReferenceArray<E> elements = this.buffer;
    final E head = elements.get(slot);
    if (head == null)
    {
        // slot looks empty: let the slow path decide between "empty" and "not yet published"
        return pollMaybeEmpty(elements, slot, position);
    }
    // original author's note: a relaxed store would be enough here, because the
    // consumer position write below releases the cleared slot
    elements.lazySet(slot, null);
    // the consumer position lets producers move their claim limit (i.e. reduces backpressure),
    // so it must only be written after the slot has been released
    soConsumerPosition(position + 1);
    return head;
}
/**
 * Takes the element stored at the current consumer position without any further emptiness check.
 * <p>
 * A {@code null} slot is reported as {@code null} immediately; an occupied slot is cleared and
 * the consumer position is advanced by one.
 *
 * @return the element taken from the slot, or {@code null} if the slot read {@code null}
 */
@Override
public E relaxedPoll()
{
    final long position = lvConsumerPosition();
    final int slot = calcElementOffset(position, this.mask);
    final AtomicReferenceArray<E> elements = this.buffer;
    final E head = elements.get(slot);
    if (head == null)
    {
        return null;
    }
    // original author's note: a relaxed store would be enough here, because the
    // consumer position write below releases the cleared slot
    elements.lazySet(slot, null);
    // the consumer position lets producers move their claim limit (i.e. reduces backpressure),
    // so it must only be written after the slot has been released
    soConsumerPosition(position + 1);
    return head;
}
/**
 * Hands up to {@code limit} elements to {@code c}, stopping at the first slot that reads
 * {@code null}.
 * <p>
 * For each element the slot is cleared and the consumer position advanced before the
 * consumer callback is invoked.
 *
 * @param c     receives each drained element
 * @param limit maximum number of elements to drain
 * @return the number of elements passed to {@code c} when a {@code null} slot stops the loop,
 *         otherwise {@code limit}
 */
@Override
public int drain(Consumer<E> c, int limit)
{
    final long ringMask = this.mask;
    final AtomicReferenceArray<E> elements = this.buffer;
    for (int drained = 0; drained < limit; drained++)
    {
        final long position = lvConsumerPosition();
        final int slot = calcElementOffset(position, ringMask);
        final E next = elements.get(slot);
        if (next == null)
        {
            return drained;
        }
        // original author's note: a relaxed store would be enough here, because the
        // consumer position write below releases the cleared slot
        elements.lazySet(slot, null);
        // the consumer position lets producers move their claim limit (i.e. reduces backpressure),
        // so it must only be written after the slot has been released
        soConsumerPosition(position + 1);
        c.accept(next);
    }
    return limit;
}
/**
 * A value written with lazySet is returned by a following get at the same index
 * when both calls are made by the same thread.
 */
public void testGetLazySet() {
    AtomicReferenceArray<Object> array = new AtomicReferenceArray<Object>(SIZE);
    // same three fixtures, written and read back in the same order for every index
    Object[] values = {one, two, m3};
    for (int index = 0; index < SIZE; index++) {
        for (Object value : values) {
            array.lazySet(index, value);
            assertSame(value, array.get(index));
        }
    }
}
/**
 * Replaces the backing array with one twice as long, moving every non-null element
 * to the front of the new array and resetting {@code size} to the number moved.
 * <p>
 * Does nothing when the current array length no longer equals {@code oldDataLength}.
 *
 * @param oldDataLength the array length the caller observed before requesting growth
 */
private synchronized void grow(int oldDataLength) {
    if (this.data.length() != oldDataLength) {
        return;
    }
    // Polls may run concurrently with grow (per the original author's note), so each
    // element is taken out of the old array with an atomic getAndSet.
    AtomicReferenceArray<E> replacement = new AtomicReferenceArray<E>(2 * oldDataLength);
    int moved = 0;
    int source = 0;
    while (source < oldDataLength) {
        E taken = this.data.getAndSet(source, null);
        if (taken != null) {
            replacement.lazySet(moved, taken);
            moved++;
        }
        source++;
    }
    this.data = replacement;
    this.size.set(moved);
}
/**
 * Nulls out the ring slots for every sequence in {@code [start, end)}.
 *
 * @param start first sequence to clear (inclusive)
 * @param end   sequence at which to stop (exclusive)
 */
@Override
public final void clear(long start, long end)
{
    final AtomicReferenceArray<T> slots = _ring;
    long sequence = start;
    while (sequence < end)
    {
        slots.lazySet(getIndex(sequence), null);
        sequence++;
    }
}
/**
 * Replaces the backing array with one twice as long, moving every non-null element
 * to the front of the new array and resetting {@code size} to the number moved.
 * <p>
 * Does nothing when the current array length no longer equals {@code oldDataLength}.
 *
 * @param oldDataLength the array length the caller observed before requesting growth
 */
private synchronized void grow(int oldDataLength) {
    if (this.data.length() != oldDataLength) {
        return;
    }
    // Polls may run concurrently with grow (per the original author's note), so each
    // element is taken out of the old array with an atomic getAndSet.
    AtomicReferenceArray<E> replacement = new AtomicReferenceArray<E>(2 * oldDataLength);
    int moved = 0;
    int source = 0;
    while (source < oldDataLength) {
        E taken = this.data.getAndSet(source, null);
        if (taken != null) {
            replacement.lazySet(moved, taken);
            moved++;
        }
        source++;
    }
    this.data = replacement;
    this.size.set(moved);
}
/**
 * Appends {@code e} at the current producer index.
 * <p>
 * If the slot one past the target is still occupied, a new array of {@code mask + 2}
 * slots is allocated, the element is stored there, the new array is linked from slot
 * {@code mask + 1} of the current array, and the target slot of the current array is
 * set to the {@code NEXT} marker. Otherwise the element is stored directly in the
 * current array.
 *
 * @param e the element to add, must not be null
 * @return always {@code true}
 * @throws NullPointerException if {@code e} is null
 */
@Override
public boolean offer(T e) {
    Objects.requireNonNull(e);
    final long index = producerIndex;
    final AtomicReferenceArray<Object> current = producerArray;
    final int ringMask = mask;
    final int slot = (int) index & ringMask;
    final int lookAhead = (int) (index + 1) & ringMask;

    if (current.get(lookAhead) == null) {
        // fast path: room in the current array
        current.lazySet(slot, e);
        PRODUCER_INDEX.lazySet(this, index + 1);
        return true;
    }

    // look-ahead slot still occupied: continue in a freshly linked array
    final AtomicReferenceArray<Object> next = new AtomicReferenceArray<>(ringMask + 2);
    producerArray = next;
    next.lazySet(slot, e);
    current.lazySet(ringMask + 1, next);
    // the marker is written only after the element and the link are in place
    current.lazySet(slot, NEXT);
    PRODUCER_INDEX.lazySet(this, index + 1);
    return true;
}
/**
* Offer two elements at the same time.
* <p>Don't use the regular offer() with this at all!
* <p>Both elements are written into two adjacent slots and the producer index is
* advanced by two. If the slot two positions ahead is still occupied, the pair is
* written into a newly allocated array that is linked from the last slot of the
* current array. The arguments are not null-checked in this method.
*
* @param first the first value, not null
* @param second the second value, not null
*
* @return true if the queue accepted the two new values (this implementation always
* returns true)
*/
@Override
public boolean test(T first, T second) {
final AtomicReferenceArray<Object> buffer = producerArray;
final long p = producerIndex;
final int m = mask;
// probe the slot two positions ahead of the current producer index
int pi = (int) (p + 2) & m;
if (null != buffer.get(pi)) {
// look-ahead slot still occupied: continue in a new array of m + 2 slots
final AtomicReferenceArray<Object> newBuffer =
new AtomicReferenceArray<>(m + 2);
producerArray = newBuffer;
pi = (int) p & m;
// NOTE(review): second is stored before first, presumably so that a consumer
// which observes first also finds second - confirm against the consumer side
newBuffer.lazySet(pi + 1, second);// StoreStore
newBuffer.lazySet(pi, first);
// link the new array from the last slot of the current one
buffer.lazySet(buffer.length() - 1, newBuffer);
buffer.lazySet(pi, NEXT); // the NEXT marker is written only after the new buffer has been linked
PRODUCER_INDEX.lazySet(this, p + 2);// this ensures correctness on 32bit
// platforms
}
else {
// room in the current array: store the pair in place, second before first
pi = (int) p & m;
buffer.lazySet(pi + 1, second);
buffer.lazySet(pi, first);
PRODUCER_INDEX.lazySet(this, p + 2);
}
return true;
}
/**
 * Removes and returns the element at the current consumer index.
 * <p>
 * When the slot holds the {@code NEXT} marker, the linked array stored in slot
 * {@code mask + 1} becomes the consumer array and the element is read from the same
 * offset in that array.
 *
 * @return the removed element, or {@code null} if the slot at the consumer index is empty
 */
@SuppressWarnings("unchecked")
@Override
@Nullable
public T poll() {
    final long index = consumerIndex;
    AtomicReferenceArray<Object> current = consumerArray;
    final int ringMask = mask;
    final int slot = (int) index & ringMask;

    Object value = current.get(slot);
    if (value == null) {
        return null;
    }

    if (value == NEXT) {
        // follow the link held in the extra slot, then drop it from the old array
        final int linkSlot = ringMask + 1;
        final AtomicReferenceArray<Object> next =
                (AtomicReferenceArray<Object>) current.get(linkSlot);
        current.lazySet(linkSlot, null);
        value = next.get(slot);
        current = next;
        consumerArray = next;
    }

    current.lazySet(slot, null);
    CONSUMER_INDEX.lazySet(this, index + 1);
    return (T) value;
}
/**
 * Returns the instance cached at {@code index} in the array for this binding's
 * {@code scope}, creating and caching it first when the slot is empty.
 * <p>
 * NOTE(review): the check and the store are not guarded by a lock in this method;
 * whether {@code doCreateInstance} synchronizes is not visible here.
 *
 * @param scopedInstances   per-scope instance arrays
 * @param synchronizedScope passed through unchanged to {@code doCreateInstance}
 * @return the cached or newly created instance
 */
@SuppressWarnings("unchecked")
@Override
public final R getInstance(AtomicReferenceArray[] scopedInstances, int synchronizedScope) {
    final AtomicReferenceArray cache = scopedInstances[scope];
    final R cached = (R) cache.get(index);
    if (cached != null) {
        return cached;
    }
    final R created = doCreateInstance(scopedInstances, synchronizedScope);
    cache.lazySet(index, created);
    return created;
}
/**
 * A value written with lazySet is returned by a following get at the same index
 * when both calls are made by the same thread.
 */
public void testGetLazySet() {
    AtomicReferenceArray<Object> array = new AtomicReferenceArray<Object>(SIZE);
    // same three fixtures, written and read back in the same order for every index
    Object[] values = {one, two, m3};
    for (int index = 0; index < SIZE; index++) {
        for (Object value : values) {
            array.lazySet(index, value);
            assertSame(value, array.get(index));
        }
    }
}
/**
 * Slow path of {@code poll()} for a slot that read {@code null}.
 * <p>
 * The producer position is recomputed from the claim of the active cycle. If it equals
 * {@code consumerPosition}, {@code null} is returned. Otherwise the method spins until
 * the slot becomes non-null, then clears it, advances the consumer position and returns
 * the element.
 *
 * @param buffer           the element array to read from
 * @param offset           the slot index within {@code buffer}
 * @param consumerPosition the consumer position that {@code offset} was derived from
 * @return the element taken from the slot, or {@code null} when the producer position
 *         equals {@code consumerPosition}
 */
private E pollMaybeEmpty(AtomicReferenceArray<E> buffer, final int offset, final long consumerPosition)
{
    final int activeCycleIndex = activeCycleIndex(lvActiveCycleId());
    final long claim = AtomicLongArrayAccess.lvValue(this.producerCycleClaim, activeCycleIndex);
    final long producerPos = producerPosition(
        cycleId(claim, this.cycleIdBitShift),
        positionOnCycle(claim, this.positionOnCycleMask),
        this.cycleLengthLog2);
    if (producerPos == consumerPosition)
    {
        return null;
    }
    // the positions differ, so busy-spin until the slot reads non-null
    E taken;
    do
    {
        taken = buffer.get(offset);
    }
    while (taken == null);
    // original author's note: a relaxed store would be enough here, because the
    // consumer position write below releases the cleared slot
    buffer.lazySet(offset, null);
    // the consumer position lets producers move their claim limit (i.e. reduces backpressure),
    // so it must only be written after the slot has been released
    soConsumerPosition(consumerPosition + 1);
    return taken;
}
/**
 * Keeps draining elements into {@code c} for as long as {@code exit.keepRunning()} is true.
 * <p>
 * The exit condition is re-checked after every 4096 slot reads. A read that finds the
 * slot empty calls {@code w.idle} with the current idle counter; a successful read resets
 * the counter, clears the slot, publishes the new consumer position and then invokes
 * the consumer.
 *
 * @param c    receives each drained element
 * @param w    wait strategy consulted whenever the next slot reads {@code null}
 * @param exit checked between batches to decide whether to keep draining
 */
@Override
public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
{
    final AtomicReferenceArray<E> elements = this.buffer;
    final long ringMask = this.mask;
    long position = lvConsumerPosition();
    int idleCounter = 0;
    while (exit.keepRunning())
    {
        for (int attempt = 0; attempt < 4096; attempt++)
        {
            final int slot = calcElementOffset(position, ringMask);
            final E next = elements.get(slot);// LoadLoad
            if (next != null)
            {
                position++;
                idleCounter = 0;
                // a plain store would be enough
                elements.lazySet(slot, null);
                soConsumerPosition(position); // ordered store -> atomic and ordered for size()
                c.accept(next);
            }
            else
            {
                idleCounter = w.idle(idleCounter);
            }
        }
    }
}
/**
 * Stores {@code e} into {@code buffer} at {@code offset} using
 * {@link AtomicReferenceArray#lazySet(int, Object)}.
 *
 * @param buffer the array to write into
 * @param offset the index of the slot to write
 * @param e      the value to store (may be {@code null} to clear the slot)
 */
private static void soElement(
        final AtomicReferenceArray<Object> buffer,
        final int offset,
        final Object e) {
    buffer.lazySet(offset, e);
}
/**
 * Stores {@code value} into {@code buffer} at {@code offset}.
 * <p>
 * Implemented with {@link AtomicReferenceArray#lazySet(int, Object)}; the original
 * author notes that no weaker store was available to them.
 *
 * @param buffer the array to write into
 * @param offset the index of the slot to write
 * @param value  the value to store (may be {@code null} to clear the slot)
 */
static <E> void spRefElement(
    final AtomicReferenceArray<E> buffer,
    final int offset,
    final E value)
{
    // no weaker form available
    buffer.lazySet(offset, value);
}
/**
 * Stores {@code value} into {@code buffer} at {@code offset} using
 * {@link AtomicReferenceArray#lazySet(int, Object)}.
 *
 * @param buffer the array to write into (raw type, so the element type is not checked)
 * @param offset the index of the slot to write
 * @param value  the value to store (may be {@code null} to clear the slot)
 */
static void soRefElement(
    final AtomicReferenceArray buffer,
    final int offset,
    final Object value)
{
    buffer.lazySet(offset, value);
}