下面列出了java.util.concurrent.ForkJoinPool#managedBlock ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void run() {
int oldStatus;
int newStatus;
do {
oldStatus = this.status;
newStatus = (oldStatus | RUNNING) & ~CUED;
} while (!STATUS.compareAndSet(this, oldStatus, newStatus));
if ((oldStatus & CUED) != 0) {
this.theater.taskWillRun(this.task);
try {
if (this.task instanceof Task && ((Task) this.task).taskWillBlock()) {
ForkJoinPool.managedBlock(this);
} else {
this.task.runTask();
}
this.theater.taskDidRun(this.task);
} catch (Throwable error) {
this.theater.taskDidFail(this.task, error);
}
}
do {
oldStatus = this.status;
newStatus = oldStatus & ~RUNNING;
} while (!STATUS.compareAndSet(this, oldStatus, newStatus));
if ((newStatus & CUED) != 0) {
this.theater.taskWillCue(this.task);
if (this.task instanceof Task) {
((Task) this.task).taskWillCue();
}
this.theater.execute(this);
}
}
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
@Override
public void run() {
try {
ForkJoinPool.managedBlock(this);
} catch (InterruptedException ex) {
LOG.log(Level.WARNING, "interrupted wait", ex);
}
}
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
/**
* Wait for the process to exit by calling {@code waitFor}.
* If the thread is interrupted, remember the interrupted state to
* be restored before returning. Use ForkJoinPool.ManagedBlocker
* so that the number of workers in case ForkJoinPool is used is
* compensated when the thread blocks in waitFor().
*
* @return the Process
*/
private Process waitForInternal() {
boolean interrupted = false;
while (true) {
try {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override
public boolean block() throws InterruptedException {
waitFor();
return true;
}
@Override
public boolean isReleasable() {
return !isAlive();
}
});
break;
} catch (InterruptedException x) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
/**
* Implements interruptible condition wait.
* <ol>
* <li>If current thread is interrupted, throw InterruptedException.
* <li>Save lock state returned by {@link #getState}.
* <li>Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li>Block until signalled or interrupted.
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li>If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false;
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
public void awaitOnLocalBarrier() {
boolean completed = false;
final IKernelBarrier barrier = localBarrier.get();
while (!completed && barrier != null) {
try {
ForkJoinPool.managedBlock(barrier); //ManagedBlocker already has to be reentrant
completed = true;
} catch (InterruptedException ex) {
//Empty on purpose, either barrier is disabled on InterruptedException or lock will have to complete
}
}
}
/**
* Returns raw result after waiting, or null if interrupted, or
* throws TimeoutException on timeout.
*/
private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted())
return null;
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
boolean queued = false;
Object r;
// We intentionally don't spin here (as waitingGet does) because
// the call to nanoTime() above acts much like a spin.
while ((r = result) == null) {
if (!queued)
queued = tryPushStack(q);
else if (q.interruptControl < 0 || q.nanos <= 0L) {
q.thread = null;
cleanStack();
if (q.interruptControl < 0)
return null;
throw new TimeoutException();
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q.interruptControl < 0)
r = null;
q.thread = null;
postComplete();
return r;
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
public static <T> T execute(RpcCall<? extends T> call) throws InterruptedException, IOException, OncRpcException, RuntimeException {
final BlockingWrapper<T> blocker = new BlockingWrapper<>(call);
ForkJoinPool.managedBlock(blocker);
return blocker.get();
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = SPINS;
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
WaitNode q = null;
boolean queued = false;
int spins = SPINS;
for (Object r;;) {
if ((r = result) != null) {
if (q != null) { // suppress unpark
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible) {
removeWaiter(q);
return null;
}
Thread.currentThread().interrupt();
}
}
postComplete(); // help release others
return r;
}
else if (spins > 0) {
int rnd = ThreadLocalRandom.nextSecondarySeed();
if (rnd == 0)
rnd = ThreadLocalRandom.current().nextInt();
if (rnd >= 0)
--spins;
}
else if (q == null)
q = new WaitNode(interruptible, 0L, 0L);
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (interruptible && q.interruptControl < 0) {
removeWaiter(q);
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ex) {
q.interruptControl = -1;
}
}
}
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = SPINS;
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param nanos time to wait
* @return raw result
*/
private Object timedAwaitDone(long nanos)
throws InterruptedException, TimeoutException {
WaitNode q = null;
boolean queued = false;
for (Object r;;) {
if ((r = result) != null) {
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
removeWaiter(q);
throw new InterruptedException();
}
}
postComplete();
return r;
}
else if (q == null) {
if (nanos <= 0L)
throw new TimeoutException();
long d = System.nanoTime() + nanos;
q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
}
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (q.interruptControl < 0) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q.nanos <= 0L) {
if (result == null) {
removeWaiter(q);
throw new TimeoutException();
}
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ex) {
q.interruptControl = -1;
}
}
}
}
/**
* Returns raw result after waiting, or null if interruptible and
* interrupted.
*/
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {
q.thread = null;
cleanStack();
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
postComplete();
return r;
}
/**
* Convenience method that creates and executes the blocker on the current thread, returning the result.
*
* @param queue the queue whose next item should be retrieved
* @param <T> the type of item in the queue
* @return the dequeued item
* @throws InterruptedException if the thread is interrupted while blocking
*/
public static <T> T dequeue(SequentialQueue<T> queue) throws InterruptedException {
ManagedSequentialDequeueBlocker<T> blocker = new ManagedSequentialDequeueBlocker<>(queue);
ForkJoinPool.managedBlock(blocker);
return blocker.getItem();
}