java.util.concurrent.atomic.AtomicReference#compareAndSet()源码实例Demo

下面列出了 java.util.concurrent.atomic.AtomicReference#compareAndSet() 的实例代码。您可以点击链接到 GitHub 查看完整源代码,也可以在右侧发表评论。

源代码1 项目: gemfirexd-oss   文件: StatsAggregator.java
/**
 * Atomically subtracts {@code oldVal} from the integer aggregate stored under
 * {@code attributeName}.
 *
 * <p>Nothing happens when {@code oldVal} is zero or when the aggregate currently
 * reads as zero. Otherwise the subtraction is retried until the compare-and-set
 * on the aggregate's reference succeeds.
 *
 * @param attributeName key of the aggregate in {@code aggregateMap}
 * @param oldVal amount to subtract from the aggregate
 */
public void decInt(String attributeName,Integer oldVal) {
  if (oldVal == 0) {
    return;
  }
  final AtomicReference<Number> ref = aggregateMap.get(attributeName);
  while (true) {
    final Number seen = ref.get();
    final int seenInt = seen.intValue();
    if (seenInt == 0) {
      // An aggregate of zero is left untouched.
      return;
    }
    final Number next = seenInt - oldVal;
    // The CAS only wins if no other thread replaced the reference since the read above.
    if (ref.compareAndSet(seen, next)) {
      return;
    }
  }
}
 
源代码2 项目: openjdk-jdk8u-backup   文件: Phaser.java
/**
 * Pops nodes off the wait queue selected by the parity of {@code phase}
 * (even phases use {@code evenQ}, odd phases use {@code oddQ}) and unparks
 * the thread recorded on each popped node.
 *
 * <p>Popping stops when the queue is empty or when the node at the head
 * carries the phase currently held in {@code root.state}.
 */
private void releaseWaiters(int phase) {
    final AtomicReference<QNode> queue = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        final QNode first = queue.get();
        if (first == null) {
            return;
        }
        if (first.phase == (int)(root.state >>> PHASE_SHIFT)) {
            return;
        }
        if (!queue.compareAndSet(first, first.next)) {
            // Another thread changed the head; re-read it.
            continue;
        }
        final Thread waiter = first.thread;
        if (waiter != null) {
            first.thread = null;
            LockSupport.unpark(waiter);
        }
    }
}
 
源代码3 项目: jdk-1.7-annotated   文件: Phaser.java
/**
 * Like releaseWaiters, but also drops head nodes whose thread field has
 * already been cleared (waiters that gave up because of a timeout or an
 * interrupt). Only nodes at the head of the queue are removed, which is
 * enough to keep the memory footprint small in most usages.
 *
 * <p>The loop ends once the queue is empty or the head node still has a
 * thread and belongs to the current phase.
 *
 * @return current phase on exit
 */
private int abortWait(int phase) {
    final AtomicReference<QNode> queue = (phase & 1) == 0 ? evenQ : oddQ;
    while (true) {
        final QNode first = queue.get();
        final int current = (int)(root.state >>> PHASE_SHIFT);
        if (first == null) {
            return current;
        }
        final Thread waiter = first.thread;
        if (waiter != null && first.phase == current) {
            // Head is a live waiter for the current phase: leave it queued.
            return current;
        }
        if (queue.compareAndSet(first, first.next) && waiter != null) {
            first.thread = null;
            LockSupport.unpark(waiter);
        }
    }
}
 
源代码4 项目: baratine   文件: WorkerDeliverBase.java
/**
 * Moves the worker state to its "wake" successor, retrying until the
 * compare-and-set succeeds.
 *
 * @return true when the state that was replaced was idle and the state
 * installed in its place is active
 */
private boolean wakeSelf()
{
  final AtomicReference<State> ref = _stateRef;

  for (;;) {
    final State before = ref.get();
    final State after = before.toWake();

    if (ref.compareAndSet(before, after)) {
      return before.isIdle() && after.isActive();
    }
  }
}
 
源代码5 项目: jdk8u_jdk   文件: Phaser.java
/**
 * Like releaseWaiters, but also drops head nodes whose thread field has
 * already been cleared (waiters that gave up because of a timeout or an
 * interrupt). Only nodes at the head of the queue are removed, which is
 * enough to keep the memory footprint small in most usages.
 *
 * <p>The loop ends once the queue is empty or the head node still has a
 * thread and belongs to the current phase.
 *
 * @return current phase on exit
 */
private int abortWait(int phase) {
    final AtomicReference<QNode> queue = (phase & 1) == 0 ? evenQ : oddQ;
    while (true) {
        final QNode first = queue.get();
        final int current = (int)(root.state >>> PHASE_SHIFT);
        if (first == null) {
            return current;
        }
        final Thread waiter = first.thread;
        if (waiter != null && first.phase == current) {
            // Head is a live waiter for the current phase: leave it queued.
            return current;
        }
        if (queue.compareAndSet(first, first.next) && waiter != null) {
            first.thread = null;
            LockSupport.unpark(waiter);
        }
    }
}
 
源代码6 项目: baratine   文件: ChannelHttp2.java
/**
 * Tries to move the channel from {@code CLOSE_WRITE} to {@code CLOSE}.
 * If the reference does not hold {@code CLOSE_WRITE}, the request is
 * delegated to whatever state the reference holds at that moment.
 *
 * @param stateRef shared reference holding the channel state
 * @return true if the compare-and-set succeeded, otherwise the result of
 * the delegated call
 */
@Override
boolean toCloseRead(AtomicReference<StateChannel> stateRef)
{
  return stateRef.compareAndSet(CLOSE_WRITE, CLOSE)
         || stateRef.get().toCloseRead(stateRef);
}
 
源代码7 项目: multiway-pool   文件: EliminationStack.java
/**
 * Walks the arena once, beginning at {@code start} and wrapping around via
 * {@code ARENA_MASK}, looking for a slot that holds the {@code WAITER} marker
 * and trying to swap the element into it.
 *
 * @param e the element to try to exchange
 * @param start the arena index at which the scan begins
 * @return true if the element was handed to a waiting slot, false if no
 * slot accepted it
 */
boolean scanAndTransferToWaiter(E e, int start) {
  int probes = 0;
  while (probes < ARENA_LENGTH) {
    final AtomicReference<Object> candidate = arena[(start + probes) & ARENA_MASK];
    probes++;
    // Cheap read first; only attempt the CAS when the slot shows the WAITER marker.
    final boolean handedOff = (candidate.get() == WAITER) && candidate.compareAndSet(WAITER, e);
    if (handedOff) {
      return true;
    }
  }
  return false;
}
 
源代码8 项目: bazel   文件: LegacyDynamicSpawnStrategy.java
/**
 * Builds the callback that claims {@code outputWriteBarrier} on behalf of {@code token}.
 *
 * <p>When invoked, the callback succeeds if the barrier already holds {@code token} or if it
 * could be swapped from {@code null} to {@code token}; otherwise it throws
 * {@link DynamicInterruptedException}.
 *
 * @param token the strategy that wants to own the barrier
 * @param outputWriteBarrier the shared barrier, or null when no barrier is in use
 * @return the claiming callback, or null when {@code outputWriteBarrier} is null
 */
private static SandboxedSpawnStrategy.StopConcurrentSpawns lockOutputFiles(
    SandboxedSpawnStrategy token, @Nullable AtomicReference<SpawnStrategy> outputWriteBarrier) {
  if (outputWriteBarrier == null) {
    return null;
  }
  return () -> {
    final boolean alreadyHeld = outputWriteBarrier.get() == token;
    if (alreadyHeld || outputWriteBarrier.compareAndSet(null, token)) {
      return;
    }
    throw new DynamicInterruptedException(
        "Execution stopped because other strategy finished first");
  };
}
 
源代码9 项目: reactor-core   文件: Schedulers.java
/**
 * Returns the {@link CachedScheduler} held by {@code reference}, creating and installing one
 * from the {@link Supplier} when the reference is empty, so that a single instance is reused
 * as the default scheduler for the given {@code key} category.
 *
 * @param reference the cache reference that holds the scheduler
 * @param key the "name" for the Scheduler's category/type
 * @param supplier the {@link Scheduler} generator to use and wrap into a {@link CachedScheduler}.
 * If another thread fills the reference first, the scheduler created here is extraneous and
 * is disposed right away.
 * @return a {@link CachedScheduler} to be reused, either pre-existing or created
 */
static CachedScheduler cache(AtomicReference<CachedScheduler> reference, String key, Supplier<Scheduler> supplier) {
	final CachedScheduler existing = reference.get();
	if (existing == null) {
		final CachedScheduler created = new CachedScheduler(key, supplier.get());
		if (!reference.compareAndSet(null, created)) {
			// lost the race: drop our instance and use whatever the reference holds now
			created._dispose();
			return reference.get();
		}
		return created;
	}
	return existing;
}
 
源代码10 项目: protonpack   文件: CollectorUtils.java
/**
 * Stores {@code t} in {@code a} if the reference is still empty.
 *
 * <p>A null {@code t} is ignored. If the reference already holds a value, a
 * {@link NonUniqueValueException} carrying the held value and {@code t} is thrown.
 *
 * @param a accumulator holding at most one value
 * @param t candidate value, may be null
 */
private static <T> void uniqueAccumulate(AtomicReference<T> a, T t) {
    if (t != null && !a.compareAndSet(null, t)) {
        throw new NonUniqueValueException(a.get(), t);
    }
}
 
源代码11 项目: baratine   文件: WorkerDeliverBase.java
/**
 * Runs the delivery loop on the current thread.
 *
 * <p>The outbox context and, in debug mode, the thread name are swapped in for
 * the duration of the call and restored in the finally block, which also calls
 * {@code toIdle()}. Each iteration calls {@code runImpl}; the loop returns when
 * the observed state is closed or its {@code toIdle()} successor is idle,
 * otherwise it tries to set the state to {@code State.ACTIVE} and goes around
 * again. Any Throwable is logged at FINER and ends the loop.
 *
 * @param outbox the outbox whose context is swapped during the run
 * @param tailMsg message passed to the first {@code runImpl} call only
 */
private void runStarted(Outbox outbox, M tailMsg)
{
  final ClassLoader loader = _classLoader;
  final Thread self = Thread.currentThread();
  boolean debug = false;
  String savedThreadName = null;

  final Object savedContext = outbox.getAndSetContext(context());

  try {
    self.setContextClassLoader(loader);
    debug = isDebug();

    if (debug) {
      savedThreadName = self.getName();
      self.setName(_deliver.getName());
    }

    final AtomicReference<State> ref = _stateRef;

    for (;;) {
      runImpl(outbox, tailMsg);

      // Only the first pass carries the tail message.
      tailMsg = null;

      final State observed = ref.get();
      final State idleSuccessor = observed.toIdle();

      if (observed.isClosed() || idleSuccessor.isIdle()) {
        return;
      }

      // Result intentionally ignored: the next pass re-reads the state either way.
      ref.compareAndSet(observed, State.ACTIVE);
    }
  } catch (Throwable e) {
    log.log(Level.FINER, e.toString(), e);
    return;
  } finally {
    outbox.getAndSetContext(savedContext);

    toIdle();

    if (debug) {
      self.setName(savedThreadName);
    }
  }
}
 
源代码12 项目: TencentKona-8   文件: Phaser.java
/**
 * Possibly blocks and waits for phase to advance unless aborted.
 * Call only on root phaser.
 *
 * <p>The main loop runs for as long as the phase read from {@code state}
 * equals {@code phase}. Without a node it spins, polling the interrupt
 * status, and allocates a node once interrupted or out of spins. With a
 * node it first pushes the node onto the even or odd queue (chosen by the
 * parity of {@code phase}) and then blocks through
 * {@link ForkJoinPool#managedBlock}.
 *
 * @param phase current phase
 * @param node if non-null, the wait node to track interrupt and timeout;
 * if null, denotes noninterruptible wait
 * @return current phase
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // Each pass re-reads state into s and derives the phase p from it.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int)s & UNARRIVED_MASK;
            // The unarrived count changed since the last pass: remember it and,
            // if it is below NCPU, grant another batch of spins.
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            // Thread.interrupted() clears the interrupt status; it is kept on
            // the node and re-asserted after the loop.
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            // Link the node in front of the current head before trying the CAS.
            QNode q = node.next = head.get();
            // Enqueue only if the head is empty or for the same phase, and the
            // phase re-read from state is still the one being waited on.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                // Record the interrupt; the loop re-checks the node next pass.
                node.wasInterrupted = true;
            }
        }
    }

    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        // Restore the interrupt status for nodes not marked interruptible.
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        // p == phase means the loop was left via break; if a fresh read of
        // state still shows the same phase, the wait ended without an advance.
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}
 
源代码13 项目: qpid-broker-j   文件: LastValueQueueList.java
/**
 * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
 *
 * <p>Messages whose header has no value for {@code _conflationKey} are only added via super.add; no
 * conflation bookkeeping is done for them.
 *
 * @param message the message being enqueued
 * @param enqueueRecord passed through unchanged to super.add
 * @return the entry created by super.add (returned even if it was discarded during conflation)
 */
@Override
public ConflationQueueEntry add(final ServerMessage message, final MessageEnqueueRecord enqueueRecord)
{
    final ConflationQueueEntry addedEntry = (ConflationQueueEntry) super.add(message, enqueueRecord);

    final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
    if (keyValue != null)
    {
        if(LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
        }

        // Candidate reference, offered to getOrPutIfAbsent for this key value.
        final AtomicReference<ConflationQueueEntry> referenceToEntry = new AtomicReference<ConflationQueueEntry>(addedEntry);
        AtomicReference<ConflationQueueEntry> entryReferenceFromMap;
        ConflationQueueEntry entryFromMap;

        // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
        // entry, or the current entry has replaced it in the reference. Note that _deleteInProgress is a special value
        // indicating that the reference object is no longer valid (it is being removed from the map).
        boolean keepTryingToUpdateEntryReference;
        do
        {
            // Inner loop: re-fetch the reference for as long as it holds the _deleteInProgress marker.
            do
            {
                entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);

                // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)
                entryFromMap = entryReferenceFromMap.get();
            }
            while(entryFromMap == _deleteInProgress);

            // The _newerEntryAlreadyBeenAndGone marker is never treated as older than addedEntry.
            boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;

            // Retry only when an older entry was seen and the CAS replacing it with addedEntry failed.
            keepTryingToUpdateEntryReference = entryFromMapIsOlder
                    && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
        }
        while(keepTryingToUpdateEntryReference);

        // entryFromMap is the value observed on the final pass of the loop above.
        if (entryFromMap == _newerEntryAlreadyBeenAndGone)
        {
            discardEntry(addedEntry);
        }
        else if (entryFromMap.compareTo(addedEntry) > 0)
        {
            // A newer entry is already in the reference: the entry just added is discarded.
            if(LOGGER.isDebugEnabled())
            {
                LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
            }
            discardEntry(addedEntry);
        }
        else if (entryFromMap.compareTo(addedEntry) < 0)
        {
            // The older entry was replaced by addedEntry in the loop above: discard the older one.
            if(LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
            }
            discardEntry(entryFromMap);
        }
        // NOTE(review): when compareTo returns 0 (presumably entryFromMap is addedEntry itself, i.e. a new
        // key value) nothing is discarded - confirm against ConflationQueueEntry.compareTo.

        // Runs on every path through this branch, including those where addedEntry was discarded.
        addedEntry.setLatestValueReference(entryReferenceFromMap);
    }

    return addedEntry;
}
 
源代码14 项目: smallrye-mutiny   文件: UniBlockingAwait.java
/**
 * Subscribes to {@code upstream} and blocks the calling thread until it emits an item or a
 * failure, or until {@code duration} elapses.
 *
 * <p>The first failure recorded wins, whether it comes from the upstream, from a timeout
 * ({@link TimeoutException}) or from an interrupt of the waiting thread. An interrupt also
 * re-asserts the thread's interrupt status. A recorded {@link RuntimeException} is rethrown
 * as is; any other failure is wrapped in a {@link CompletionException}.
 *
 * @param upstream the Uni to wait for; validated with {@code nonNull}
 * @param duration maximum time to wait, or null to wait without a time limit
 * @param <T> the item type
 * @return the item stored by {@code onItem}, or null if none was stored
 */
public static <T> T await(Uni<T> upstream, Duration duration) {
    nonNull(upstream, "upstream");
    validate(duration);

    final AtomicReference<T> itemHolder = new AtomicReference<>();
    final AtomicReference<Throwable> failureHolder = new AtomicReference<>();
    final CountDownLatch done = new CountDownLatch(1);

    UniSubscriber<T> waiter = new UniSubscriber<T>() {
        @Override
        public void onSubscribe(UniSubscription subscription) {
            // Nothing to do on subscription.
        }

        @Override
        public void onItem(T item) {
            itemHolder.set(item);
            done.countDown();
        }

        @Override
        public void onFailure(Throwable failure) {
            failureHolder.compareAndSet(null, failure);
            done.countDown();
        }
    };
    AbstractUni.subscribe(upstream, waiter);

    try {
        if (duration == null) {
            done.await();
        } else if (!done.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
            failureHolder.compareAndSet(null, new TimeoutException());
        }
    } catch (InterruptedException e) {
        failureHolder.compareAndSet(null, e);
        Thread.currentThread().interrupt();
    }

    final Throwable failure = failureHolder.get();
    if (failure == null) {
        return itemHolder.get();
    }
    if (failure instanceof RuntimeException) {
        throw (RuntimeException) failure;
    }
    throw new CompletionException(failure);
}
 
源代码15 项目: jdk8u60   文件: Phaser.java
/**
 * Possibly blocks and waits for phase to advance unless aborted.
 * Call only on root phaser.
 *
 * <p>The main loop runs for as long as the phase read from {@code state}
 * equals {@code phase}. Without a node it spins, polling the interrupt
 * status, and allocates a node once interrupted or out of spins. With a
 * node it first pushes the node onto the even or odd queue (chosen by the
 * parity of {@code phase}) and then blocks through
 * {@link ForkJoinPool#managedBlock}.
 *
 * @param phase current phase
 * @param node if non-null, the wait node to track interrupt and timeout;
 * if null, denotes noninterruptible wait
 * @return current phase
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // Each pass re-reads state into s and derives the phase p from it.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int)s & UNARRIVED_MASK;
            // The unarrived count changed since the last pass: remember it and,
            // if it is below NCPU, grant another batch of spins.
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            // Thread.interrupted() clears the interrupt status; it is kept on
            // the node and re-asserted after the loop.
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            // Link the node in front of the current head before trying the CAS.
            QNode q = node.next = head.get();
            // Enqueue only if the head is empty or for the same phase, and the
            // phase re-read from state is still the one being waited on.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                // Record the interrupt; the loop re-checks the node next pass.
                node.wasInterrupted = true;
            }
        }
    }

    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        // Restore the interrupt status for nodes not marked interruptible.
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        // p == phase means the loop was left via break; if a fresh read of
        // state still shows the same phase, the wait ended without an advance.
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}
 
源代码16 项目: jdk1.8-source-analysis   文件: Phaser.java
/**
 * Possibly blocks and waits for phase to advance unless aborted.
 * Call only on root phaser.
 *
 * <p>The main loop runs for as long as the phase read from {@code state}
 * equals {@code phase}. Without a node it spins, polling the interrupt
 * status, and allocates a node once interrupted or out of spins. With a
 * node it first pushes the node onto the even or odd queue (chosen by the
 * parity of {@code phase}) and then blocks through
 * {@link ForkJoinPool#managedBlock}.
 *
 * @param phase current phase
 * @param node if non-null, the wait node to track interrupt and timeout;
 * if null, denotes noninterruptible wait
 * @return current phase
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // Each pass re-reads state into s and derives the phase p from it.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int)s & UNARRIVED_MASK;
            // The unarrived count changed since the last pass: remember it and,
            // if it is below NCPU, grant another batch of spins.
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            // Thread.interrupted() clears the interrupt status; it is kept on
            // the node and re-asserted after the loop.
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            // Link the node in front of the current head before trying the CAS.
            QNode q = node.next = head.get();
            // Enqueue only if the head is empty or for the same phase, and the
            // phase re-read from state is still the one being waited on.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                // Record the interrupt; the loop re-checks the node next pass.
                node.wasInterrupted = true;
            }
        }
    }

    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        // Restore the interrupt status for nodes not marked interruptible.
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        // p == phase means the loop was left via break; if a fresh read of
        // state still shows the same phase, the wait ended without an advance.
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}
 
源代码17 项目: dubbox   文件: DubboMonitor.java
/**
 * Reports the statistics accumulated for every key in {@code statisticsMap} to the monitor
 * service, then subtracts the reported values from the live counters.
 *
 * <p>For each entry a snapshot of the counter array is read and sent as URL parameters.
 * Afterwards a compare-and-set loop replaces the array with one in which slots 0-5 hold
 * "current minus reported" and slots 6-9 (the max values) are zero, so updates that arrive
 * between the snapshot and the reset are not lost.
 *
 * <p>Entries whose reference does not hold an array yet are skipped: there is nothing to
 * report for them, and dereferencing the missing array would otherwise abort the whole
 * send with a {@link NullPointerException}.
 */
public void send() {
    if (logger.isInfoEnabled()) {
        logger.info("Send statistics to monitor " + getUrl());
    }
    String timestamp = String.valueOf(System.currentTimeMillis());
    for (Map.Entry<Statistics, AtomicReference<long[]>> entry : statisticsMap.entrySet()) {
        // Snapshot the statistics collected so far.
        Statistics statistics = entry.getKey();
        AtomicReference<long[]> reference = entry.getValue();
        long[] numbers = reference.get();
        if (numbers == null) {
            // No counters have been published for this key yet; nothing to send or reset.
            continue;
        }
        long success = numbers[0];
        long failure = numbers[1];
        long input = numbers[2];
        long output = numbers[3];
        long elapsed = numbers[4];
        long concurrent = numbers[5];
        long maxInput = numbers[6];
        long maxOutput = numbers[7];
        long maxElapsed = numbers[8];
        long maxConcurrent = numbers[9];

        // Send the aggregated values.
        URL url = statistics.getUrl()
                .addParameters(MonitorService.TIMESTAMP, timestamp,
                        MonitorService.SUCCESS, String.valueOf(success),
                        MonitorService.FAILURE, String.valueOf(failure),
                        MonitorService.INPUT, String.valueOf(input),
                        MonitorService.OUTPUT, String.valueOf(output),
                        MonitorService.ELAPSED, String.valueOf(elapsed),
                        MonitorService.CONCURRENT, String.valueOf(concurrent),
                        MonitorService.MAX_INPUT, String.valueOf(maxInput),
                        MonitorService.MAX_OUTPUT, String.valueOf(maxOutput),
                        MonitorService.MAX_ELAPSED, String.valueOf(maxElapsed),
                        MonitorService.MAX_CONCURRENT, String.valueOf(maxConcurrent)
                        );
        monitorService.collect(url);

        // Subtract what was just reported from the live counters.
        long[] current;
        long[] update = new long[LENGTH];
        do {
            current = reference.get();
            if (current == null) {
                update[0] = 0;
                update[1] = 0;
                update[2] = 0;
                update[3] = 0;
                update[4] = 0;
                update[5] = 0;
            } else {
                update[0] = current[0] - success;
                update[1] = current[1] - failure;
                update[2] = current[2] - input;
                update[3] = current[3] - output;
                update[4] = current[4] - elapsed;
                update[5] = current[5] - concurrent;
            }
            // Retry if another thread swapped in a new array since the read above.
        } while (! reference.compareAndSet(current, update));
    }
}
 
源代码18 项目: dragonwell8_jdk   文件: Phaser.java
/**
 * Possibly blocks and waits for phase to advance unless aborted.
 * Call only on root phaser.
 *
 * <p>The main loop runs for as long as the phase read from {@code state}
 * equals {@code phase}. Without a node it spins, polling the interrupt
 * status, and allocates a node once interrupted or out of spins. With a
 * node it first pushes the node onto the even or odd queue (chosen by the
 * parity of {@code phase}) and then blocks through
 * {@link ForkJoinPool#managedBlock}.
 *
 * @param phase current phase
 * @param node if non-null, the wait node to track interrupt and timeout;
 * if null, denotes noninterruptible wait
 * @return current phase
 */
private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // Each pass re-reads state into s and derives the phase p from it.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        if (node == null) {           // spinning in noninterruptible mode
            int unarrived = (int)s & UNARRIVED_MASK;
            // The unarrived count changed since the last pass: remember it and,
            // if it is below NCPU, grant another batch of spins.
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                spins += SPINS_PER_ARRIVAL;
            // Thread.interrupted() clears the interrupt status; it is kept on
            // the node and re-asserted after the loop.
            boolean interrupted = Thread.interrupted();
            if (interrupted || --spins < 0) { // need node to record intr
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        else if (node.isReleasable()) // done or aborted
            break;
        else if (!queued) {           // push onto queue
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            // Link the node in front of the current head before trying the CAS.
            QNode q = node.next = head.get();
            // Enqueue only if the head is empty or for the same phase, and the
            // phase re-read from state is still the one being waited on.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
                queued = head.compareAndSet(q, node);
        }
        else {
            try {
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                // Record the interrupt; the loop re-checks the node next pass.
                node.wasInterrupted = true;
            }
        }
    }

    if (node != null) {
        if (node.thread != null)
            node.thread = null;       // avoid need for unpark()
        // Restore the interrupt status for nodes not marked interruptible.
        if (node.wasInterrupted && !node.interruptible)
            Thread.currentThread().interrupt();
        // p == phase means the loop was left via break; if a fresh read of
        // state still shows the same phase, the wait ended without an advance.
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
}
 
源代码19 项目: failsafe   文件: TimeoutExecutor.java
/**
 * Wraps {@code supplier} so that a timeout task is scheduled alongside each execution. Whichever
 * of the two stores its result first wins: the timeout task stores a failure carrying
 * {@link TimeoutExceededException}, the execution stores the supplier's result.
 *
 * <p>When the timeout wins and the policy allows it, the execution is marked cancelled and,
 * if interruption is also allowed and still possible, the executing thread is interrupted.
 * When the execution wins, the timeout future is cancelled without interruption.
 */
@Override
@SuppressWarnings("unchecked")
protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, Scheduler scheduler) {
  return () -> {
    // Shared slot through which the timeout task and the executing thread agree on one result
    final AtomicReference<ExecutionResult> result = new AtomicReference<>();
    final Future<Object> timeoutFuture;
    final Thread executionThread = Thread.currentThread();

    try {
      // Arm the timeout
      timeoutFuture = (Future) scheduler.schedule(() -> {
        final ExecutionResult previous = result.getAndUpdate(
          v -> v != null ? v : ExecutionResult.failure(new TimeoutExceededException(policy)));
        if (previous != null || !policy.canCancel()) {
          // Either the execution already finished, or the policy forbids cancelling
          return null;
        }

        execution.cancelled = true;
        if (!policy.canInterrupt()) {
          return null;
        }

        // The lock guards against racing with the execution completing
        synchronized (execution) {
          if (execution.canInterrupt) {
            execution.record(result.get());
            execution.interrupted = true;
            executionThread.interrupt();
          }
        }
        return null;
      }, policy.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
    } catch (Throwable t) {
      // Scheduling itself failed
      return postExecute(ExecutionResult.failure(t));
    }

    // Run the execution; if its result is stored first, the timeout is no longer needed
    final ExecutionResult executed = supplier.get();
    if (result.compareAndSet(null, executed)) {
      timeoutFuture.cancel(false);
    }
    return postExecute(result.get());
  };
}
 
源代码20 项目: cyclops   文件: ReactiveSeq.java
/**
 * Pushes the elements of this Stream onto a queue and applies {@code fn} to a parallel JDK
 * Stream reading from that queue.
 *
 * <p>The queue is filled by a continuation registered on it. Only the first invocation of the
 * continuation that wins the compare-and-set drains this Stream into the queue, and it closes
 * the queue afterwards even if draining fails.
 *
 * @param fn function applied to the parallel Stream backed by the queue
 * @param <R> result type
 * @return the value produced by {@code fn}
 */
default <R> R foldParallel(Function<? super Stream<T>,? extends R> fn){
    final Queue<T> queue = QueueFactories.<T>unboundedNonBlockingQueue().build().withTimeout(1);

    final AtomicReference<Continuation> ref = new AtomicReference<>(null);
    final Continuation cont = new Continuation(() -> {
        // Cheap read first; the CAS decides which caller becomes the single producer.
        final boolean firstConsumer = ref.get() == null && ref.compareAndSet(null, Continuation.empty());
        if (firstConsumer) {
            try {
                // The first consuming thread moves this Stream onto the Queue.
                this.spliterator().forEachRemaining(queue::offer);
            } finally {
                queue.close();
            }
        }
        return Continuation.empty();
    });

    queue.addContinuation(cont);
    return fn.apply(queue.jdkStream().parallel());
}