下面列出了 java.util.concurrent.atomic.AtomicLongFieldUpdater#compareAndSet() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Attempts, without blocking, to take a lock whose state lives in a {@code long} field of {@code owner}.
 * Re-entrant and contended attempts are reported through the return value, and the value returned here is
 * meant to be handed to {@link #releaseReentrantLock(AtomicLongFieldUpdater, long, Object)}.
 * <p>
 * The field holds {@code REENTRANT_LOCK_ZERO_THREAD_ID} while the lock is free. Once acquired it holds the id
 * of the acquiring thread; a failed attempt by a different thread flips that id to its negation so that the
 * attempt can be detected later.
 * <p>
 * This lock <strong>must</strong> eventually be released by the same thread that acquired it. If the acquiring
 * thread terminates before releasing, the lock state is undefined.
 *
 * @param lockUpdater The {@link AtomicLongFieldUpdater} used to control the lock state.
 * @param owner The owner of the lock object.
 * @param <T> The type of object that owns the lock.
 * @return {@code REENTRANT_LOCK_ZERO_THREAD_ID} if another thread holds the lock (the acquire was unsuccessful);
 * the id of the calling thread if the lock was free and is now held by the caller; the negated id of the calling
 * thread if the caller already holds the lock. A non-zero result is the identifier that must be passed to a
 * subsequent call of {@link #releaseReentrantLock(AtomicLongFieldUpdater, long, Object)}.
 */
public static <T> long tryAcquireReentrantLock(final AtomicLongFieldUpdater<T> lockUpdater, final T owner) {
    final long callerId = Thread.currentThread().getId();
    while (true) {
        final long holderId = lockUpdater.get(owner);

        if (holderId == REENTRANT_LOCK_ZERO_THREAD_ID) {
            // Lock looks free: race to install our id, and re-read the state if we lose the race.
            if (lockUpdater.compareAndSet(owner, REENTRANT_LOCK_ZERO_THREAD_ID, callerId)) {
                return callerId;
            }
            continue;
        }

        if (holderId == callerId || holderId == -callerId) {
            // The calling thread is already the holder (contended or not): report re-entry, state untouched.
            return -callerId;
        }

        // Held by another thread: make sure the stored id is in its negated (contended) form before giving up.
        final long contendedId = holderId > REENTRANT_LOCK_ZERO_THREAD_ID ? -holderId : holderId;
        if (lockUpdater.compareAndSet(owner, holderId, contendedId)) {
            return REENTRANT_LOCK_ZERO_THREAD_ID;
        }
    }
}
/**
 * Atomically increases the demand stored in a {@code long} field by {@code n}.
 * The new value is computed by the two-argument {@code addCap} helper; a field that
 * already holds {@link Long#MAX_VALUE} is left untouched.
 *
 * @param n the amount of demand to add
 * @param instance the object that owns the demand field
 * @param updater the (raw) field updater giving access to the demand field
 * @return the demand observed before the update, or {@link Long#MAX_VALUE} if the field
 * already held that value
 */
@SuppressWarnings("unchecked")
public static long request(long n, Object instance, AtomicLongFieldUpdater updater) {
    long observedDemand;
    do {
        observedDemand = updater.get(instance);
        if (observedDemand == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        // Retry with a fresh read whenever another thread changed the field in between.
    } while (!updater.compareAndSet(instance, observedDemand, addCap(observedDemand, n)));
    return observedDemand;
}
/**
 * Adds {@code n} to the demand counter held in a {@code long} field, using a
 * compare-and-set retry loop. The sum is produced by the two-argument {@code addCap}
 * helper. Nothing is written when the counter already equals {@link Long#MAX_VALUE}.
 *
 * @param n the demand to add
 * @param instance the object bearing the demand field
 * @param updater the (raw) updater for the demand field
 * @return the counter value that was replaced, or {@link Long#MAX_VALUE} when the counter
 * was already at that value
 */
@SuppressWarnings("unchecked")
public static long request(long n, Object instance, AtomicLongFieldUpdater updater) {
    while (true) {
        final long demandBefore = updater.get(instance);
        if (demandBefore != Long.MAX_VALUE) {
            final long demandAfter = addCap(demandBefore, n);
            if (updater.compareAndSet(instance, demandBefore, demandAfter)) {
                return demandBefore;
            }
            // Lost the race against a concurrent writer: loop and read again.
        } else {
            return Long.MAX_VALUE;
        }
    }
}
/**
 * Atomically adds {@code toAdd} to a {@code long} field, saturating at {@link Long#MAX_VALUE}.
 * A field that already holds {@link Long#MAX_VALUE} is not written to. The update is applied
 * with a compare-and-set retry loop, so it is always based on the most recently read value.
 *
 * @param <T> the parent instance type
 * @param updater current field updater
 * @param instance current instance to update
 * @param toAdd delta to add
 * @return value before addition or Long.MAX_VALUE
 */
private static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
    while (true) {
        final long before = updater.get(instance);
        if (before == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        final long sum = before + toAdd;
        // A negative sum is treated as overflow and replaced by the cap.
        final long after = sum < 0L ? Long.MAX_VALUE : sum;
        if (updater.compareAndSet(instance, before, after)) {
            return before;
        }
    }
}
/**
 * A compareAndSet performed in one thread lets another thread, which is
 * spinning on a compareAndSet that expects the resulting value, succeed.
 */
public void testCompareAndSetInMultipleThreads() throws Exception {
    x = 1;
    final AtomicLongFieldUpdater<AtomicLongFieldUpdaterTest> updater = updaterFor("x");

    // The helper thread can only make progress once the field has become 2.
    Thread waiter = new Thread(new CheckedRunnable() {
        public void realRun() {
            while (!updater.compareAndSet(AtomicLongFieldUpdaterTest.this, 2, 3)) {
                Thread.yield();
            }
        }});
    waiter.start();

    // 1 -> 2 here unblocks the helper, which then performs 2 -> 3.
    assertTrue(updater.compareAndSet(this, 1, 2));
    waiter.join(LONG_DELAY_MS);
    assertFalse(waiter.isAlive());
    assertEquals(3, updater.get(this));
}
/**
 * Atomically subtracts {@code n} from a {@code long} field, flooring the result at zero.
 * The two values {@link Long#MIN_VALUE} and {@link Long#MAX_VALUE} are treated as sentinels:
 * when the field holds either of them it is left untouched and that value is returned.
 * <p>
 * When the subtraction yields a negative number, that number is handed to
 * {@code reportBadRequest} and zero is stored instead. The report is issued on every
 * attempt of the retry loop, i.e. before it is known whether the compare-and-set succeeds.
 *
 * @param <T> the type of the field-bearing instance
 * @param updater the updater for the field
 * @param instance the instance bearing the field
 * @param n the amount to subtract
 * @return the value stored by this call, or the sentinel found in the field
 */
static <T> long producedCancellable(AtomicLongFieldUpdater<T> updater, T instance, long n) {
    while (true) {
        final long before = updater.get(instance);
        if (before == Long.MIN_VALUE || before == Long.MAX_VALUE) {
            return before;
        }

        long after = before - n;
        if (after < 0L) {
            reportBadRequest(after);
            after = 0L;
        }

        if (updater.compareAndSet(instance, before, after)) {
            return after;
        }
    }
}
/**
 * Verifies that a thread spinning on compareAndSet(2, 3) completes once
 * the test thread has performed compareAndSet(1, 2) on the same field.
 */
public void testCompareAndSetInMultipleThreads() throws Exception {
    x = 1;
    final AtomicLongFieldUpdater<AtomicLongFieldUpdaterTest> fieldUpdater = updaterFor("x");

    final Thread spinner = new Thread(new CheckedRunnable() {
        public void realRun() {
            // Keep yielding until the field holds 2 and can be moved on to 3.
            boolean swapped = fieldUpdater.compareAndSet(AtomicLongFieldUpdaterTest.this, 2, 3);
            while (!swapped) {
                Thread.yield();
                swapped = fieldUpdater.compareAndSet(AtomicLongFieldUpdaterTest.this, 2, 3);
            }
        }});
    spinner.start();

    assertTrue(fieldUpdater.compareAndSet(this, 1, 2));
    spinner.join(LONG_DELAY_MS);
    assertFalse(spinner.isAlive());
    assertEquals(3, fieldUpdater.get(this));
}
/**
 * Object arguments for parameters of type T that are not
 * instances of the class passed to the newUpdater call will
 * result in a ClassCastException being thrown.
 * <p>
 * Every listed accessor of the long, int and reference field updaters is
 * exercised twice: once with an unrelated {@code Object} and once with
 * {@code null} as the target instance.
 */
public void testFieldUpdaters_ClassCastException() {
    // Use raw types to allow passing wrong object type, provoking CCE
    final AtomicLongFieldUpdater longUpdater = aLongFieldUpdater();
    final AtomicIntegerFieldUpdater intUpdater = anIntFieldUpdater();
    final AtomicReferenceFieldUpdater refUpdater = anIntegerFieldUpdater();
    // Neither element is an instance of the class the updaters were created for.
    for (Object x : new Object[]{ new Object(), null }) {
        Runnable[] throwingActions = {
            () -> longUpdater.get(x),
            () -> intUpdater.get(x),
            () -> refUpdater.get(x),

            () -> longUpdater.set(x, 17L),
            () -> intUpdater.set(x, 17),
            () -> refUpdater.set(x, (Integer) 17),

            () -> longUpdater.addAndGet(x, 17L),
            () -> intUpdater.addAndGet(x, 17),

            () -> longUpdater.getAndUpdate(x, y -> y),
            () -> intUpdater.getAndUpdate(x, y -> y),
            () -> refUpdater.getAndUpdate(x, y -> y),

            () -> longUpdater.compareAndSet(x, 17L, 42L),
            () -> intUpdater.compareAndSet(x, 17, 42),
            () -> refUpdater.compareAndSet(x, (Integer) 17, (Integer) 42),
        };
        assertThrows(ClassCastException.class, throwingActions);
    }
}
/**
 * Accounts for a request of {@code n} that may arrive after the source has completed.
 * <p>
 * The field packs a completed flag ({@code COMPLETED_MASK}) together with the requested
 * amount ({@code REQUESTED_MASK}). The requested part is increased by {@code n} through
 * {@code Operators.addCap} while the completed flag is carried over unchanged. If the
 * field was in the (complete, 0) state before the update, the queue is replayed through
 * {@code postCompleteDrain}.
 *
 * @param <T> the output value type
 * @param <F> the field type holding the requested amount
 * @param n the requested amount
 * @param actual the consumer of values
 * @param queue the queue holding the available values
 * @param field the field updater for the requested amount
 * @param instance the parent instance for the requested field
 * @param isCancelled callback to detect cancellation
 * @return true if the previous state was exactly (complete, 0) and a drain was started;
 * false otherwise, in which case the caller continues requesting from upstream
 */
static <T, F> boolean postCompleteRequest(long n,
        Subscriber<? super T> actual,
        Queue<T> queue,
        AtomicLongFieldUpdater<F> field,
        F instance,
        BooleanSupplier isCancelled) {
    while (true) {
        final long state = field.get(instance);

        // split off the amount requested so far
        final long requestedSoFar = state & REQUESTED_MASK;

        // keep the completed flag as it is and merge in the increased amount
        final long nextState = (state & COMPLETED_MASK) | Operators.addCap(requestedSoFar, n);

        if (!field.compareAndSet(instance, state, nextState)) {
            continue;
        }

        if (state != COMPLETED_MASK) {
            // (active, r) -> (active, r + n): nothing to replay here
            return false;
        }

        // (complete, 0) -> (complete, n): replay what is waiting in the queue
        postCompleteDrain(n | COMPLETED_MASK, actual, queue, field, instance, isCancelled);
        return true;
    }
}
/**
 * Signals completion to {@code actual}, draining the queue first when it still holds values.
 * <p>
 * An empty queue results in an immediate {@code onComplete()}. Otherwise a drain is tried
 * with the current field value; if {@code postCompleteDrain} reports {@code true} nothing
 * more is done. Failing that, the completed flag ({@code COMPLETED_MASK}) is set on the
 * field, and if the amount stored in the field was non-zero at that moment the queue is
 * drained once more.
 *
 * @param <T> the output value type
 * @param <F> the field type holding the requested amount
 * @param actual the consumer of values
 * @param queue the queue holding available values
 * @param field the field updater holding the requested amount
 * @param instance the parent instance of the requested field
 * @param isCancelled callback to detect cancellation
 */
public static <T, F> void postComplete(CoreSubscriber<? super T> actual,
        Queue<T> queue,
        AtomicLongFieldUpdater<F> field,
        F instance,
        BooleanSupplier isCancelled) {

    if (queue.isEmpty()) {
        actual.onComplete();
        return;
    }

    if (postCompleteDrain(field.get(instance), actual, queue, field, instance, isCancelled)) {
        return;
    }

    long state;
    do {
        state = field.get(instance);
        if ((state & COMPLETED_MASK) != 0L) {
            // somebody else already flagged completion
            return;
        }
        // (active, r) -> (complete, r) transition
    } while (!field.compareAndSet(instance, state, state | COMPLETED_MASK));

    // only a non-zero requested amount warrants draining the queue
    if (state != 0L) {
        postCompleteDrain(state | COMPLETED_MASK, actual, queue, field, instance, isCancelled);
    }
}
/**
 * Accounts for a request of {@code n} that may arrive after the source has terminated,
 * for operators that delay their error until the queue has been drained.
 * <p>
 * The requested part of the field ({@code REQUESTED_MASK}) is increased by {@code n}
 * through {@code Operators.addCap}; the completed flag ({@code COMPLETED_MASK}) is carried
 * over unchanged. If the field was in the (complete, 0) state before the update, the queue
 * is replayed through {@code postCompleteDrainDelayError}.
 *
 * @param <T> the output value type
 * @param <F> the field type holding the requested amount
 * @param n the request amount
 * @param actual the consumer of values
 * @param queue the queue of available values
 * @param field the field updater for the requested amount
 * @param instance the parent instance of the requested field
 * @param isCancelled callback to detect cancellation
 * @param error if not null, the error to signal after the queue has been drained
 * @return true if the previous state was exactly (complete, 0) and a drain was started;
 * false otherwise, in which case the caller continues requesting from upstream
 */
public static <T, F> boolean postCompleteRequestDelayError(long n,
        Subscriber<? super T> actual,
        Queue<T> queue,
        AtomicLongFieldUpdater<F> field,
        F instance,
        BooleanSupplier isCancelled, Throwable error) {
    while (true) {
        final long state = field.get(instance);

        // split off the amount requested so far
        final long requestedSoFar = state & REQUESTED_MASK;

        // keep the completed flag as it is and merge in the increased amount
        final long nextState = (state & COMPLETED_MASK) | Operators.addCap(requestedSoFar, n);

        if (!field.compareAndSet(instance, state, nextState)) {
            continue;
        }

        if (state != COMPLETED_MASK) {
            // (active, r) -> (active, r + n): nothing to replay here
            return false;
        }

        // (complete, 0) -> (complete, n): replay what is waiting in the queue
        postCompleteDrainDelayError(n | COMPLETED_MASK, actual, queue, field, instance, isCancelled, error);
        return true;
    }
}
/**
 * Terminates {@code actual}, draining the queue first when it still holds values.
 * <p>
 * With an empty queue the terminal signal is emitted right away: {@code onComplete()} when
 * {@code error} is null, {@code onError(error)} otherwise. With a non-empty queue a drain
 * is tried with the current field value; if {@code postCompleteDrainDelayError} reports
 * {@code true} nothing more is done. Failing that, the completed flag
 * ({@code COMPLETED_MASK}) is set on the field, and if the amount stored in the field was
 * non-zero at that moment the queue is drained once more.
 *
 * @param <T> the output value type
 * @param <F> the field type holding the requested amount
 * @param actual the consumer of values
 * @param queue the queue of available values
 * @param field the field updater for the requested amount
 * @param instance the parent instance of the requested field
 * @param isCancelled callback to detect cancellation
 * @param error if not null, the error to signal after the queue has been drained
 */
public static <T, F> void postCompleteDelayError(CoreSubscriber<? super T> actual,
        Queue<T> queue,
        AtomicLongFieldUpdater<F> field,
        F instance,
        BooleanSupplier isCancelled,
        @Nullable Throwable error) {

    if (queue.isEmpty()) {
        if (error != null) {
            actual.onError(error);
        } else {
            actual.onComplete();
        }
        return;
    }

    if (postCompleteDrainDelayError(field.get(instance), actual, queue, field, instance, isCancelled, error)) {
        return;
    }

    long state;
    do {
        state = field.get(instance);
        if ((state & COMPLETED_MASK) != 0L) {
            // somebody else already flagged completion
            return;
        }
        // (active, r) -> (complete, r) transition
    } while (!field.compareAndSet(instance, state, state | COMPLETED_MASK));

    // only a non-zero requested amount warrants draining the queue
    if (state != 0L) {
        postCompleteDrainDelayError(state | COMPLETED_MASK, actual, queue, field, instance, isCancelled, error);
    }
}
/**
 * Atomically adds {@code toAdd} to a {@code long} field, delegating the arithmetic to the
 * two-argument {@code addCap} helper. A field that already holds {@link Long#MAX_VALUE}
 * is not written to. The update is retried with a fresh read until the compare-and-set
 * succeeds.
 *
 * @param <T> the parent instance type
 * @param updater current field updater
 * @param instance current instance to update
 * @param toAdd delta to add
 * @return value before addition or Long.MAX_VALUE
 */
public static <T> long addCap(AtomicLongFieldUpdater<T> updater, T instance, long toAdd) {
    long before;
    do {
        before = updater.get(instance);
        if (before == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
    } while (!updater.compareAndSet(instance, before, addCap(before, toAdd)));
    return before;
}
/**
 * Atomically subtracts {@code toSub} from a {@code long} field, mostly used to decrement
 * a request tracker by the amount produced by the operator. The new value is computed by
 * the {@code subOrZero} helper. A field holding {@code 0} or {@link Long#MAX_VALUE} is
 * left untouched and that value is returned.
 *
 * @param <T> the parent instance type
 * @param updater current field updater
 * @param instance current instance to update
 * @param toSub delta to subtract
 * @return value after subtraction or zero
 */
public static <T> long produced(AtomicLongFieldUpdater<T> updater, T instance, long toSub) {
    for (;;) {
        final long before = updater.get(instance);
        if (before == 0 || before == Long.MAX_VALUE) {
            return before;
        }
        final long after = subOrZero(before, toSub);
        if (updater.compareAndSet(instance, before, after)) {
            return after;
        }
        // The field changed under us: start over with a fresh read.
    }
}
/**
 * Atomically adds {@code n} to the given field through the two-argument {@code addCap}
 * helper, unless the field currently holds {@link Long#MIN_VALUE} or
 * {@link Long#MAX_VALUE}, in which case nothing is written.
 * In every case the value read before the (possible) update is returned.
 *
 * @param updater the field to update
 * @param instance the instance bearing the field
 * @param n the value to add
 * @param <T> the type of the field-bearing instance
 *
 * @return the old value of the field, before update.
 */
static <T> long addCapCancellable(AtomicLongFieldUpdater<T> updater, T instance,
        long n) {
    long before;
    boolean isSentinel;
    do {
        before = updater.get(instance);
        isSentinel = before == Long.MIN_VALUE || before == Long.MAX_VALUE;
        // A sentinel ends the loop without touching the field (short-circuit below).
    } while (!isSentinel && !updater.compareAndSet(instance, before, addCap(before, n)));
    return before;
}
/**
 * Atomically adds {@code n} to the field and hands back the value it replaced. The sum is
 * computed by the two-argument {@code addCap} helper. A field already holding
 * {@link Long#MAX_VALUE} is not written to.
 * @param <T> the type of the parent class of the field
 * @param updater the field updater
 * @param instance the instance of the field to update
 * @param n the value to add, n > 0, not validated
 * @return the original value before the add
 */
public static <T> long getAndAddCap(AtomicLongFieldUpdater<T> updater, T instance, long n) {
    while (true) {
        final long original = updater.get(instance);
        if (original != Long.MAX_VALUE) {
            final long updated = addCap(original, n);
            if (updater.compareAndSet(instance, original, updated)) {
                return original;
            }
            // Lost the race against a concurrent writer: loop and read again.
        } else {
            return Long.MAX_VALUE;
        }
    }
}
/**
 * Atomically adds {@code n} to the {@code requested} field (CAS retry loop) and returns
 * the value the field held immediately before the successful addition. A sum that comes
 * out negative is treated as overflow and stored as {@code Long.MAX_VALUE}.
 *
 * @param requested
 *            atomic field updater for a request count
 * @param object
 *            contains the field updated by the updater
 * @param n
 *            the number of requests to add to the requested count
 * @param <T>
 *            the type of the object containing the updated field
 * @return requested value just prior to successful addition
 */
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
    long before;
    long after;
    do {
        before = requested.get(object);
        final long sum = before + n;
        // a negative sum means the addition overflowed
        after = sum < 0 ? Long.MAX_VALUE : sum;
    } while (!requested.compareAndSet(object, before, after));
    return before;
}
/**
 * Increases the {@code requested} field by {@code n} using compare-and-set, retrying until
 * the update is applied, and returns the value that was replaced. When the sum is negative
 * it is regarded as an overflow and {@code Long.MAX_VALUE} is stored instead.
 *
 * @param requested
 *            atomic field updater for a request count
 * @param object
 *            contains the field updated by the updater
 * @param n
 *            the number of requests to add to the requested count
 * @param <T>
 *            the type of the volatile being updated
 * @return requested value just prior to successful addition
 */
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
    for (;;) {
        final long previous = requested.get(object);
        long updated = previous + n;
        if (updated < 0) {
            // overflow: clamp to the largest representable request count
            updated = Long.MAX_VALUE;
        }
        if (!requested.compareAndSet(object, previous, updated)) {
            // another thread modified the field in the meantime; try again
            continue;
        }
        return previous;
    }
}