Listed below are example usages of java.util.concurrent.atomic.AtomicReference#compareAndSet() taken from open-source projects; the original source for each example can be viewed on GitHub.
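As a primer, here is a minimal, self-contained sketch of the retry loop that most of the examples below build on: read the current value, compute a replacement off to the side, and call compareAndSet(), retrying if another thread updated the reference first. The CasRetryLoopExample class and its names are illustrative only and do not come from any of the projects listed below.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Minimal sketch of the typical AtomicReference#compareAndSet() retry loop:
// snapshot the current value, compute a replacement, and retry if another
// thread changed the reference in the meantime. Names are illustrative only.
public class CasRetryLoopExample {
    private final AtomicReference<Integer> counter = new AtomicReference<>(0);

    public int update(UnaryOperator<Integer> fn) {
        for (;;) {
            Integer expected = counter.get();     // snapshot the current value
            Integer next = fn.apply(expected);    // compute the new value without holding any lock
            if (counter.compareAndSet(expected, next)) {
                return next;                      // CAS succeeded: no concurrent change happened
            }
            // CAS failed: another thread won the race; loop and try again
        }
    }

    public static void main(String[] args) {
        CasRetryLoopExample example = new CasRetryLoopExample();
        System.out.println(example.update(v -> v + 1)); // prints 1
    }
}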
public void decInt(String attributeName, Integer oldVal) {
    if (oldVal == 0) {
        return;
    }
    AtomicReference<Number> ar = aggregateMap.get(attributeName);
    Number curVal;
    for (;;) {
        Number expectedVal = ar.get();
        if (expectedVal.intValue() != 0) {
            curVal = expectedVal.intValue() - oldVal;
        } else {
            return;
        }
        if (ar.compareAndSet(expectedVal, curVal)) {
            return;
        }
        // CAS failed: another thread updated the value; retry with a fresh snapshot
    }
}
/**
 * Removes and signals threads from queue for phase.
 */
private void releaseWaiters(int phase) {
    QNode q;   // first element of queue
    Thread t;  // its thread
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    while ((q = head.get()) != null &&
           q.phase != (int)(root.state >>> PHASE_SHIFT)) {
        if (head.compareAndSet(q, q.next) &&
            (t = q.thread) != null) {
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}
/**
 * Variant of releaseWaiters that additionally tries to remove any
 * nodes no longer waiting for advance due to timeout or
 * interrupt. Currently, nodes are removed only if they are at
 * head of queue, which suffices to reduce memory footprint in
 * most usages.
 *
 * @return current phase on exit
 */
private int abortWait(int phase) {
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        Thread t;
        QNode q = head.get();
        int p = (int)(root.state >>> PHASE_SHIFT);
        if (q == null || ((t = q.thread) != null && q.phase == p))
            return p;
        if (head.compareAndSet(q, q.next) && t != null) {
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}
private boolean wakeSelf()
{
    AtomicReference<State> stateRef = _stateRef;
    State oldState;
    State newState;

    do {
        oldState = stateRef.get();
        newState = oldState.toWake();
    } while (! stateRef.compareAndSet(oldState, newState));

    return (oldState.isIdle() && newState.isActive());
}
@Override
boolean toCloseRead(AtomicReference<StateChannel> stateRef)
{
    if (stateRef.compareAndSet(CLOSE_WRITE, CLOSE)) {
        return true;
    }
    else {
        return stateRef.get().toCloseRead(stateRef);
    }
}
/**
 * Scans the arena searching for a waiting consumer to exchange with.
 *
 * @param e the element to try to exchange
 * @param start the arena index at which to start scanning
 * @return whether the element was successfully transferred
 */
boolean scanAndTransferToWaiter(E e, int start) {
    for (int i = 0; i < ARENA_LENGTH; i++) {
        int index = (start + i) & ARENA_MASK;
        AtomicReference<Object> slot = arena[index];
        // if some thread is waiting to receive an element then attempt to provide it
        if ((slot.get() == WAITER) && slot.compareAndSet(WAITER, e)) {
            return true;
        }
    }
    return false;
}
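For context, a hypothetical counterpart to scanAndTransferToWaiter is sketched below: the consumer advertises itself by CAS-ing a WAITER marker into a slot and then spins until a producer replaces the marker with an element. The SlotHandoffSketch class is an illustrative assumption, not the project's actual consumer-side code.

import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of a slot-based handoff: a consumer claims a slot with a
// WAITER marker and spins; a producer swaps the marker for an element.
// Names and structure are illustrative only.
final class SlotHandoffSketch<E> {
    private static final Object WAITER = new Object();
    private final AtomicReference<Object> slot = new AtomicReference<>();

    /** Producer side: hand off an element if a consumer is parked in the slot. */
    boolean tryTransfer(E e) {
        return slot.get() == WAITER && slot.compareAndSet(WAITER, e);
    }

    /** Consumer side: advertise interest, then spin briefly for an element. */
    @SuppressWarnings("unchecked")
    E tryReceive(int maxSpins) {
        if (!slot.compareAndSet(null, WAITER)) {
            return null;                        // slot busy; caller can retry elsewhere
        }
        for (int i = 0; i < maxSpins; i++) {
            Object v = slot.get();
            if (v != WAITER) {                  // a producer installed an element
                slot.set(null);                 // free the slot for the next exchange
                return (E) v;
            }
            Thread.onSpinWait();
        }
        // Give up: only reset the slot if no producer raced in meanwhile.
        return slot.compareAndSet(WAITER, null) ? null : (E) slot.getAndSet(null);
    }
}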
private static SandboxedSpawnStrategy.StopConcurrentSpawns lockOutputFiles(
        SandboxedSpawnStrategy token, @Nullable AtomicReference<SpawnStrategy> outputWriteBarrier) {
    if (outputWriteBarrier == null) {
        return null;
    } else {
        return () -> {
            if (outputWriteBarrier.get() != token && !outputWriteBarrier.compareAndSet(null, token)) {
                throw new DynamicInterruptedException(
                    "Execution stopped because other strategy finished first");
            }
        };
    }
}
/**
 * Get a {@link CachedScheduler} out of the {@code reference} or create one using the
 * {@link Supplier} if the reference is empty, effectively creating a single instance
 * to be reused as a default scheduler for the given {@code key} category.
 *
 * @param reference the cache reference that holds the scheduler
 * @param key the "name" for the Scheduler's category/type
 * @param supplier the {@link Scheduler} generator to use and wrap into a {@link CachedScheduler}.
 *                 Note that in case of a race, an extraneous Scheduler can be created, but it'll get
 *                 immediately {@link Scheduler#dispose() disposed}.
 * @return a {@link CachedScheduler} to be reused, either pre-existing or created
 */
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {
    CachedScheduler s = reference.get();
    if (s != null) {
        return s;
    }
    s = new CachedScheduler(key, supplier.get());
    if (reference.compareAndSet(null, s)) {
        return s;
    }
    // the reference was updated in the meantime with a cached scheduler:
    // fall back to it and dispose the extraneous one
    s._dispose();
    return reference.get();
}
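The publish-or-fall-back shape of cache() (create a candidate, compareAndSet(null, candidate), and dispose the candidate if the CAS loses) is not specific to schedulers. Below is a small, self-contained sketch of the same idea for an arbitrary Closeable resource; the LazySingleton class and its names are illustrative, not part of Reactor.

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Sketch of lazy, race-tolerant single-instance creation with AtomicReference:
// every caller may build a candidate, but only the first compareAndSet wins;
// losers close their extraneous instance and reuse the published one.
public final class LazySingleton<T extends Closeable> {
    private final AtomicReference<T> ref = new AtomicReference<>();
    private final Supplier<T> factory;

    public LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    public T get() {
        T existing = ref.get();
        if (existing != null) {
            return existing;                 // fast path: already published
        }
        T candidate = factory.get();         // may race with other callers
        if (ref.compareAndSet(null, candidate)) {
            return candidate;                // we won the race: our instance is the shared one
        }
        try {
            candidate.close();               // we lost: discard the extraneous instance
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ref.get();                    // reuse the winner's instance
    }
}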
private static <T> void uniqueAccumulate(AtomicReference<T> a, T t) {
    if (t == null) {
        return;
    }
    if (!a.compareAndSet(null, t)) {
        throw new NonUniqueValueException(a.get(), t);
    }
}
private void runStarted(Outbox outbox, M tailMsg)
{
    ClassLoader classLoader = _classLoader;
    Thread thread = Thread.currentThread();
    boolean isDebug = false;
    String oldThreadName = null;
    Object oldContext = outbox.getAndSetContext(context());

    try {
        thread.setContextClassLoader(classLoader);
        isDebug = isDebug();

        if (isDebug) {
            oldThreadName = thread.getName();
            thread.setName(_deliver.getName());
        }

        AtomicReference<State> stateRef = _stateRef;

        while (true) {
            runImpl(outbox, tailMsg);
            tailMsg = null;
            // tailMsg = outbox.flushAfterTask();

            State state = stateRef.get();
            State stateIdle = state.toIdle();

            if (state.isClosed() || stateIdle.isIdle()) {
                return;
            }

            stateRef.compareAndSet(state, State.ACTIVE);
            //thread.setContextClassLoader(classLoader);
        }
    } catch (Throwable e) {
        log.log(Level.FINER, e.toString(), e);
        return;
    } finally {
        outbox.getAndSetContext(oldContext);
        // ContextOutbox.setCurrent(null);
        toIdle();

        if (isDebug) {
            thread.setName(oldThreadName);
        }
    }
}
/**
 * Possibly blocks and waits for phase to advance unless aborted.
 * Call only on root phaser.
 *
 * @param phase current phase
 * @param node if non-null, the wait node to track interrupt and timeout;
 *             if null, denotes noninterruptible wait
 * @return current phase
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int)s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                node.wasInterrupted = true;
            }
        }
    }

    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase);  // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}
/**
 * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
 */
@Override
public ConflationQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
    final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message, enqueueRecord);

    final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
    if (keyValue != null)
    {
        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
        }

        final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
        AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
        ConflationQueueEntry entryFromMap;

        // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
        // entry, or the current entry has replaced it in the reference. Note that the _deletedEntryPlaceholder is a special value
        // indicating that the reference object is no longer valid (it is being removed from the map).
        boolean keepTryingToUpdateEntryReference;
        do
        {
            do
            {
                entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);

                // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)
                entryFromMap = entryReferenceFromMap.get();
            }
            while (entryFromMap == _deleteInProgress);

            boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;

            keepTryingToUpdateEntryReference = entryFromMapIsOlder
                    && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
        }
        while (keepTryingToUpdateEntryReference);

        if (entryFromMap == _newerEntryAlreadyBeenAndGone)
        {
            discardEntry(addedEntry);
        }
        else if (entryFromMap.compareTo(addedEntry) > 0)
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
            }
            discardEntry(addedEntry);
        }
        else if (entryFromMap.compareTo(addedEntry) < 0)
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
            }
            discardEntry(entryFromMap);
        }

        addedEntry.setLatestValueReference(entryReferenceFromMap);
    }
    return addedEntry;
}
public static <T> T await(Uni<T> upstream, Duration duration) {
    nonNull(upstream, "upstream");
    validate(duration);

    CountDownLatch latch = new CountDownLatch(1);
    AtomicReference<T> reference = new AtomicReference<>();
    AtomicReference<Throwable> referenceToFailure = new AtomicReference<>();

    UniSubscriber<T> subscriber = new UniSubscriber<T>() {
        @Override
        public void onSubscribe(UniSubscription subscription) {
            // Do nothing.
        }

        @Override
        public void onItem(T item) {
            reference.set(item);
            latch.countDown();
        }

        @Override
        public void onFailure(Throwable failure) {
            referenceToFailure.compareAndSet(null, failure);
            latch.countDown();
        }
    };

    AbstractUni.subscribe(upstream, subscriber);

    try {
        if (duration != null) {
            if (!latch.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                referenceToFailure.compareAndSet(null, new TimeoutException());
            }
        } else {
            latch.await();
        }
    } catch (InterruptedException e) {
        referenceToFailure.compareAndSet(null, e);
        Thread.currentThread().interrupt();
    }

    Throwable throwable = referenceToFailure.get();
    if (throwable != null) {
        if (throwable instanceof RuntimeException) {
            throw (RuntimeException) throwable;
        }
        throw new CompletionException(throwable);
    } else {
        return reference.get();
    }
}
public void send() {
    if (logger.isInfoEnabled()) {
        logger.info("Send statistics to monitor " + getUrl());
    }
    String timestamp = String.valueOf(System.currentTimeMillis());

    for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
        // read the statistics collected so far
        Statistics statistics = entry.getKey();
        AtomicReference<long[]> reference = entry.getValue();
        long[] numbers = reference.get();
        long success = numbers[0];
        long failure = numbers[1];
        long input = numbers[2];
        long output = numbers[3];
        long elapsed = numbers[4];
        long concurrent = numbers[5];
        long maxInput = numbers[6];
        long maxOutput = numbers[7];
        long maxElapsed = numbers[8];
        long maxConcurrent = numbers[9];

        // send the aggregated metrics
        URL url = statistics.getUrl()
                .addParameters(MonitorService.TIMESTAMP, timestamp,
                        MonitorService.SUCCESS, String.valueOf(success),
                        MonitorService.FAILURE, String.valueOf(failure),
                        MonitorService.INPUT, String.valueOf(input),
                        MonitorService.OUTPUT, String.valueOf(output),
                        MonitorService.ELAPSED, String.valueOf(elapsed),
                        MonitorService.CONCURRENT, String.valueOf(concurrent),
                        MonitorService.MAX_INPUT, String.valueOf(maxInput),
                        MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
                        MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
                        MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)
                );
        monitorService.collect(url);

        // subtract the values that were just reported from the running counters
        long[] current;
        long[] update = new long[LENGTH];
        do {
            current = reference.get();
            if (current == null) {
                update[0] = 0;
                update[1] = 0;
                update[2] = 0;
                update[3] = 0;
                update[4] = 0;
                update[5] = 0;
            } else {
                update[0] = current[0] - success;
                update[1] = current[1] - failure;
                update[2] = current[2] - input;
                update[3] = current[3] - output;
                update[4] = current[4] - elapsed;
                update[5] = current[5] - concurrent;
            }
        } while (!reference.compareAndSet(current, update));
    }
}
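The subtract-and-CAS loop above works because the collecting side publishes each update as a fresh long[] through the same AtomicReference. A minimal, self-contained sketch of that accumulate side is shown below; the StatsAccumulator class and its field layout are illustrative assumptions, not the monitor's actual collect code.

import java.util.concurrent.atomic.AtomicReference;

// Sketch of the accumulate side of the pattern above: counters live in an
// immutable long[] snapshot held by an AtomicReference, and every update
// publishes a brand-new array via compareAndSet. Names are illustrative only.
public final class StatsAccumulator {
    private static final int SUCCESS = 0, FAILURE = 1, ELAPSED = 2, LENGTH = 3;
    private final AtomicReference<long[]> reference = new AtomicReference<>(new long[LENGTH]);

    public void record(boolean success, long elapsedMillis) {
        long[] current;
        long[] update = new long[LENGTH];
        do {
            current = reference.get();
            update[SUCCESS] = current[SUCCESS] + (success ? 1 : 0);
            update[FAILURE] = current[FAILURE] + (success ? 0 : 1);
            update[ELAPSED] = current[ELAPSED] + elapsedMillis;
        } while (!reference.compareAndSet(current, update)); // retry if another thread published first
    }

    public long[] snapshot() {
        return reference.get().clone(); // defensive copy of the current snapshot
    }
}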
/**
 * Schedules a separate timeout call that fails with {@link TimeoutExceededException} if the policy's timeout is
 * exceeded.
 */
@Override
@SuppressWarnings("unchecked")
protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, Scheduler scheduler) {
    return () -> {
        // Coordinates a result between the timeout and execution threads
        AtomicReference<ExecutionResult> result = new AtomicReference<>();
        Future<Object> timeoutFuture;
        Thread executionThread = Thread.currentThread();

        try {
            // Schedule timeout check
            timeoutFuture = (Future) scheduler.schedule(() -> {
                if (result.getAndUpdate(v -> v != null ? v : ExecutionResult.failure(new TimeoutExceededException(policy)))
                        == null) {
                    if (policy.canCancel()) {
                        // Cancel and interrupt
                        execution.cancelled = true;
                        if (policy.canInterrupt()) {
                            // Guard against race with the execution completing
                            synchronized (execution) {
                                if (execution.canInterrupt) {
                                    execution.record(result.get());
                                    execution.interrupted = true;
                                    executionThread.interrupt();
                                }
                            }
                        }
                    }
                }
                return null;
            }, policy.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable t) {
            // Hard scheduling failure
            return postExecute(ExecutionResult.failure(t));
        }

        // Propagate execution, cancel timeout future if not done, and handle result
        if (result.compareAndSet(null, supplier.get()))
            timeoutFuture.cancel(false);
        return postExecute(result.get());
    };
}
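The getAndUpdate/compareAndSet pair above is a first-writer-wins handshake: whichever of the timeout task and the execution thread publishes a result first owns it. A stripped-down, self-contained sketch of that handshake follows; the FirstWriterWins class, runWithTimeout, and the executor setup are illustrative and not part of Failsafe.

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of first-writer-wins coordination between a worker and a timeout task:
// both sides try to publish into the same AtomicReference with compareAndSet,
// and exactly one of them succeeds. Names are illustrative only.
public final class FirstWriterWins {

    public static <T> T runWithTimeout(Callable<T> task, long timeoutMillis) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicReference<Object> result = new AtomicReference<>();
        Object timedOut = new Object();      // sentinel published by the timeout task
        Thread worker = Thread.currentThread();
        try {
            ScheduledFuture<?> timeout = scheduler.schedule(() -> {
                if (result.compareAndSet(null, timedOut)) {
                    worker.interrupt();      // timeout won: interrupt the worker
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);

            T value = task.call();
            if (result.compareAndSet(null, value)) {
                timeout.cancel(false);       // worker won: timeout no longer needed
                return value;
            }
            throw new TimeoutException("timed out after " + timeoutMillis + " ms");
        } finally {
            Thread.interrupted();            // clear a late interrupt, if any
            scheduler.shutdownNow();
        }
    }
}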
default <R> R foldParallel(Function<? super Stream<T>, ? extends R> fn) {
    Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue().build().withTimeout(1);

    AtomicReference<Continuation> ref = new AtomicReference<>(null);
    Continuation cont = new Continuation(() -> {
        if (ref.get() == null && ref.compareAndSet(null, Continuation.empty())) {
            try {
                // use the first consuming thread to feed this Stream onto the Queue
                this.spliterator().forEachRemaining(queue::offer);
            } finally {
                queue.close();
            }
        }
        return Continuation.empty();
    });

    queue.addContinuation(cont);
    return fn.apply(queue.jdkStream().parallel());
}