下面列出了 java.util.concurrent.AbstractExecutorService#java.util.concurrent.Phaser 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Builds an endless iterator of {@code Reader} tasks for the given lock.
 *
 * <p>Successive calls to {@code next()} cycle through eight slots: slots 1, 4
 * and 7 yield {@code reader(...)}, slots 2 and 5 yield
 * {@code interruptibleReader(...)} with a timeout of -1 seconds, and every
 * other slot yields {@code interruptibleReader(...)} with a timeout of 30
 * seconds. The boolean passed as the last factory argument flips on every
 * call, starting with {@code true}.
 *
 * @param sl   the lock handed to every created reader
 * @param gate the phaser handed to every created reader
 * @return an iterator whose {@code hasNext()} is always {@code true} and whose
 *         {@code remove()} is unsupported
 */
static Iterator<Reader> readerIterator(final StampedLock sl,
                                       final Phaser gate) {
    return new Iterator<Reader>() {
        int calls = 0;
        boolean useView = false;

        public boolean hasNext() {
            return true;
        }

        public Reader next() {
            final int slot = calls++ & 7;
            // Flipped exactly once per call, before the factory sees it.
            useView = !useView;
            if (slot == 1 || slot == 4 || slot == 7) {
                return reader(sl, gate, useView);
            }
            if (slot == 2 || slot == 5) {
                return interruptibleReader(sl, -1, SECONDS, gate, useView);
            }
            return interruptibleReader(sl, 30, SECONDS, gate, useView);
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * Builds an endless iterator of {@code Writer} tasks for the given lock.
 *
 * <p>Successive calls to {@code next()} cycle through eight slots: slots 1, 4
 * and 7 yield {@code writer(...)}, slots 2 and 5 yield
 * {@code interruptibleWriter(...)} with a timeout of -1 seconds, and every
 * other slot yields {@code interruptibleWriter(...)} with a timeout of 30
 * seconds. The boolean passed as the last factory argument flips on every
 * call, starting with {@code true}.
 *
 * @param sl   the lock handed to every created writer
 * @param gate the phaser handed to every created writer
 * @return an iterator whose {@code hasNext()} is always {@code true} and whose
 *         {@code remove()} is unsupported
 */
static Iterator<Writer> writerIterator(final StampedLock sl,
                                       final Phaser gate) {
    return new Iterator<Writer>() {
        int calls = 0;
        boolean useView = false;

        public boolean hasNext() {
            return true;
        }

        public Writer next() {
            final int slot = calls++ & 7;
            // Flipped exactly once per call, before the factory sees it.
            useView = !useView;
            if (slot == 1 || slot == 4 || slot == 7) {
                return writer(sl, gate, useView);
            }
            if (slot == 2 || slot == 5) {
                return interruptibleWriter(sl, -1, SECONDS, gate, useView);
            }
            return interruptibleWriter(sl, 30, SECONDS, gate, useView);
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * arriveAndAwaitAdvance continues waiting if interrupted before waiting.
 *
 * <p>Both the helper thread and the calling thread set their own interrupt
 * status before calling {@code arriveAndAwaitAdvance()}. The test asserts that
 * each call still returns phase 1 and that the interrupt status is still set
 * afterwards.
 */
public void testArriveAndAwaitAdvanceAfterInterrupt() {
final Phaser phaser = new Phaser();
// First party: the calling thread. Registration is expected to report phase 0.
assertEquals(0, phaser.register());
final CountDownLatch pleaseArrive = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
// Set the interrupt status before this thread ever blocks on the phaser.
Thread.currentThread().interrupt();
// Second party: the helper thread.
assertEquals(0, phaser.register());
// Tell the calling thread that the helper is registered.
pleaseArrive.countDown();
assertTrue(Thread.currentThread().isInterrupted());
assertEquals(1, phaser.arriveAndAwaitAdvance());
// Thread.interrupted() reads and clears the status; it must still be set here.
assertTrue(Thread.interrupted());
}});
// Helper defined elsewhere; presumably blocks until the latch reaches zero.
await(pleaseArrive);
// Helper defined elsewhere; judging by its name it waits until t is blocked
// inside arriveAndAwaitAdvance() -- TODO confirm.
waitForThreadToEnterWaitState(t);
// The calling thread is interrupted before it arrives as well.
Thread.currentThread().interrupt();
assertEquals(1, phaser.arriveAndAwaitAdvance());
// Reads and clears the calling thread's interrupt status.
assertTrue(Thread.interrupted());
awaitTermination(t);
}
/**
 * Builds an endless iterator of {@code Reader} tasks for the given lock.
 *
 * <p>Successive calls to {@code next()} cycle through eight slots: slots 1, 4
 * and 7 yield {@code reader(...)}, slots 2 and 5 yield
 * {@code interruptibleReader(...)} with a timeout of -1 seconds, and every
 * other slot yields {@code interruptibleReader(...)} with a timeout of 30
 * seconds. The boolean passed as the last factory argument flips on every
 * call, starting with {@code true}.
 *
 * @param sl   the lock handed to every created reader
 * @param gate the phaser handed to every created reader
 * @return an iterator whose {@code hasNext()} is always {@code true} and whose
 *         {@code remove()} is unsupported
 */
static Iterator<Reader> readerIterator(final StampedLock sl,
                                       final Phaser gate) {
    return new Iterator<Reader>() {
        int calls = 0;
        boolean useView = false;

        public boolean hasNext() {
            return true;
        }

        public Reader next() {
            final int slot = calls++ & 7;
            // Flipped exactly once per call, before the factory sees it.
            useView = !useView;
            if (slot == 1 || slot == 4 || slot == 7) {
                return reader(sl, gate, useView);
            }
            if (slot == 2 || slot == 5) {
                return interruptibleReader(sl, -1, SECONDS, gate, useView);
            }
            return interruptibleReader(sl, 30, SECONDS, gate, useView);
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * Creates a {@code Reader} task named "InterruptibleReader" that takes the
 * read lock of {@code sl} and checks the resulting lock state.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code readLockInterruptibly()}; otherwise it calls
 * {@code tryReadLock(timeout, unit)}. The returned stamp is handed to
 * {@code stamp(long)}, and the read lock is released at the end only when
 * {@code stamp()} is non-zero. Any {@code Throwable} is passed to
 * {@code thrown}.
 *
 * @param sl      the lock to acquire
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the reader task
 */
static Reader interruptibleReader(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Reader("InterruptibleReader") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                stamp(timeout < 0
                      ? sl.readLockInterruptibly()
                      : sl.tryReadLock(timeout, unit));
                check(sl.validate(stamp()));
                check(sl.isReadLocked());
                check(!sl.isWriteLocked());
            } catch (Throwable failure) {
                thrown(failure);
            } finally {
                // A zero stamp means no lock is held, so there is nothing to release.
                if (stamp() != 0L) {
                    sl.unlockRead(stamp());
                }
            }
        }
    };
}
/**
 * Builds an endless iterator of {@code Writer} tasks for the given lock.
 *
 * <p>Successive calls to {@code next()} cycle through eight slots: slots 1, 4
 * and 7 yield {@code writer(...)}, slots 2 and 5 yield
 * {@code interruptibleWriter(...)} with a timeout of -1 seconds, and every
 * other slot yields {@code interruptibleWriter(...)} with a timeout of
 * {@code LONG_DELAY_MS} milliseconds. The boolean passed as the last factory
 * argument flips on every call, starting with {@code true}.
 *
 * @param sl   the lock handed to every created writer
 * @param gate the phaser handed to every created writer
 * @return an iterator whose {@code hasNext()} is always {@code true} and whose
 *         {@code remove()} is unsupported
 */
static Iterator<Writer> writerIterator(final StampedLock sl,
                                       final Phaser gate) {
    return new Iterator<Writer>() {
        int calls = 0;
        boolean useView = false;

        public boolean hasNext() {
            return true;
        }

        public Writer next() {
            final int slot = calls++ & 7;
            // Flipped exactly once per call, before the factory sees it.
            useView = !useView;
            if (slot == 1 || slot == 4 || slot == 7) {
                return writer(sl, gate, useView);
            }
            if (slot == 2 || slot == 5) {
                return interruptibleWriter(sl, -1, SECONDS, gate, useView);
            }
            return interruptibleWriter(sl, LONG_DELAY_MS, MILLISECONDS, gate, useView);
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * Creates a {@code Reader} task named "InterruptibleReader" that takes the
 * read lock of {@code sl} and checks the resulting lock state.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code readLockInterruptibly()}; otherwise it calls
 * {@code tryReadLock(timeout, unit)}. The returned stamp is handed to
 * {@code stamp(long)}, and the read lock is released at the end only when
 * {@code stamp()} is non-zero. Any {@code Throwable} is passed to
 * {@code thrown}.
 *
 * @param sl      the lock to acquire
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the reader task
 */
static Reader interruptibleReader(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Reader("InterruptibleReader") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                stamp(timeout < 0
                      ? sl.readLockInterruptibly()
                      : sl.tryReadLock(timeout, unit));
                check(sl.validate(stamp()));
                check(sl.isReadLocked());
                check(!sl.isWriteLocked());
            } catch (Throwable failure) {
                thrown(failure);
            } finally {
                // A zero stamp means no lock is held, so there is nothing to release.
                if (stamp() != 0L) {
                    sl.unlockRead(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} task named "InterruptibleWriter" that takes the
 * write lock of {@code sl} and checks the resulting lock state.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code writeLockInterruptibly()}; otherwise it calls
 * {@code tryWriteLock(timeout, unit)}. The returned stamp is handed to
 * {@code stamp(long)}, and the write lock is released at the end only when
 * {@code stamp()} is non-zero. Any {@code Throwable} is passed to
 * {@code thrown}.
 *
 * @param sl      the lock to acquire
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the writer task
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                stamp(timeout < 0
                      ? sl.writeLockInterruptibly()
                      : sl.tryWriteLock(timeout, unit));
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                thrown(failure);
            } finally {
                // A zero stamp means no lock is held, so there is nothing to release.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} task named "InterruptibleWriterView" that takes the
 * write lock of {@code sl} through its {@link Lock} view.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code lockInterruptibly()}; otherwise it calls
 * {@code tryLock(timeout, unit)}. The lock is marked as held (via
 * {@code stamp(1L)}) and later unlocked only when it was really acquired.
 * A timed-out {@code tryLock} is reported through {@code check(false)}
 * rather than being treated as a successful acquisition, which previously
 * led to {@code unlock()} being called on a lock this thread did not hold.
 * Any {@code Throwable} is passed to {@code thrown}.
 *
 * @param sl      the lock whose write-lock view is acquired
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the writer task
 */
static Writer interruptibleWriterView(final StampedLock sl,
                                      final long timeout,
                                      final TimeUnit unit,
                                      final Phaser gate) {
    return new Writer("InterruptibleWriterView") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            final Lock wl = sl.asWriteLock();
            try {
                final boolean acquired;
                if (timeout < 0) {
                    wl.lockInterruptibly();
                    acquired = true;
                } else {
                    // tryLock reports a timeout by returning false, not by throwing.
                    acquired = wl.tryLock(timeout, unit);
                }
                if (acquired) {
                    stamp(1L); // got the lock
                    check(!sl.isReadLocked());
                    check(sl.isWriteLocked());
                } else {
                    check(false); // timed out without getting the lock
                }
            } catch (Throwable x) {
                thrown(x);
            } finally {
                // stamp() is only non-zero here when the lock was acquired above.
                if (stamp() != 0L) {
                    wl.unlock();
                }
            }
        }
    };
}
/**
 * Starts two single-thread executors, runs one trivial task on each, then
 * drops the only references to the executors and checks that the active
 * thread count returns to its starting value.
 *
 * @param args unused
 * @throws Throwable if any step fails unexpectedly
 */
private static void realMain(String[] args) throws Throwable {
    // Three parties: the two executor tasks plus this thread.
    final Phaser phaser = new Phaser(3);
    final Runnable trivialRunnable = () -> phaser.arriveAndAwaitAdvance();

    final int baseline = Thread.activeCount();
    Executor e1 = newSingleThreadExecutor();
    Executor e2 = newSingleThreadExecutor(defaultThreadFactory());
    e1.execute(trivialRunnable);
    e2.execute(trivialRunnable);
    phaser.arriveAndAwaitAdvance();
    equal(Thread.activeCount(), baseline + 2);

    // Drop both references; presumably this makes the executors eligible
    // for the finalization the helper below waits for.
    e2 = null;
    e1 = null;

    int attempts = 0;
    while (attempts < 10 && Thread.activeCount() > baseline) {
        tryWaitForFinalizersToRun();
        attempts++;
    }

    // give JVM a chance to settle.
    int polls = 0;
    while (polls < 10) {
        if (Thread.activeCount() == baseline) {
            return;
        }
        Thread.sleep(1000);
        polls++;
    }
    equal(Thread.activeCount(), baseline);
}
/**
 * Starts two single-thread executors, runs one trivial task on each, then
 * drops the only references to the executors and checks that the active
 * thread count returns to its starting value.
 *
 * @param args unused
 * @throws Throwable if any step fails unexpectedly
 */
private static void realMain(String[] args) throws Throwable {
    // Three parties: the two executor tasks plus this thread.
    final Phaser phaser = new Phaser(3);
    final Runnable trivialRunnable = () -> phaser.arriveAndAwaitAdvance();

    final int baseline = Thread.activeCount();
    Executor e1 = newSingleThreadExecutor();
    Executor e2 = newSingleThreadExecutor(defaultThreadFactory());
    e1.execute(trivialRunnable);
    e2.execute(trivialRunnable);
    phaser.arriveAndAwaitAdvance();
    equal(Thread.activeCount(), baseline + 2);

    // Drop both references; presumably this makes the executors eligible
    // for the finalization the helper below waits for.
    e2 = null;
    e1 = null;

    int attempts = 0;
    while (attempts < 10 && Thread.activeCount() > baseline) {
        tryWaitForFinalizersToRun();
        attempts++;
    }

    // give JVM a chance to settle.
    int polls = 0;
    while (polls < 10) {
        if (Thread.activeCount() == baseline) {
            return;
        }
        Thread.sleep(1000);
        polls++;
    }
    equal(Thread.activeCount(), baseline);
}
/**
 * Builds an endless iterator of {@code Writer} tasks for the given lock.
 *
 * <p>Successive calls to {@code next()} cycle through eight slots: slots 1, 4
 * and 7 yield {@code writer(...)}, slots 2 and 5 yield
 * {@code interruptibleWriter(...)} with a timeout of -1 seconds, and every
 * other slot yields {@code interruptibleWriter(...)} with a timeout of 30
 * seconds. The boolean passed as the last factory argument flips on every
 * call, starting with {@code true}.
 *
 * @param sl   the lock handed to every created writer
 * @param gate the phaser handed to every created writer
 * @return an iterator whose {@code hasNext()} is always {@code true} and whose
 *         {@code remove()} is unsupported
 */
static Iterator<Writer> writerIterator(final StampedLock sl,
                                       final Phaser gate) {
    return new Iterator<Writer>() {
        int calls = 0;
        boolean useView = false;

        public boolean hasNext() {
            return true;
        }

        public Writer next() {
            final int slot = calls++ & 7;
            // Flipped exactly once per call, before the factory sees it.
            useView = !useView;
            if (slot == 1 || slot == 4 || slot == 7) {
                return writer(sl, gate, useView);
            }
            if (slot == 2 || slot == 5) {
                return interruptibleWriter(sl, -1, SECONDS, gate, useView);
            }
            return interruptibleWriter(sl, 30, SECONDS, gate, useView);
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * Creates a {@code Writer} task named "InterruptibleWriter" that takes the
 * write lock of {@code sl} and checks the resulting lock state.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code writeLockInterruptibly()}; otherwise it calls
 * {@code tryWriteLock(timeout, unit)}. The returned stamp is handed to
 * {@code stamp(long)}, and the write lock is released at the end only when
 * {@code stamp()} is non-zero. Any {@code Throwable} is passed to
 * {@code thrown}.
 *
 * @param sl      the lock to acquire
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the writer task
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                stamp(timeout < 0
                      ? sl.writeLockInterruptibly()
                      : sl.tryWriteLock(timeout, unit));
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                thrown(failure);
            } finally {
                // A zero stamp means no lock is held, so there is nothing to release.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Verifies the behaviour of a terminated phaser.
 *
 * <p>Checks that the phaser reports itself as terminated with a negative
 * phase, that every arrival, registration and await method returns that same
 * phase value, and that the unarrived and registered party counts are the
 * same after those calls as before them.
 *
 * @param phaser the phaser expected to be terminated
 */
private static void checkTerminated(final Phaser phaser) {
    check(phaser.isTerminated());

    // Snapshot the counts so they can be compared after the calls below.
    final int unarrivedBefore = phaser.getUnarrivedParties();
    final int registeredBefore = phaser.getRegisteredParties();
    final int terminalPhase = phaser.getPhase();
    check(terminalPhase < 0);

    equal(terminalPhase, phaser.arrive());
    equal(terminalPhase, phaser.arriveAndDeregister());
    equal(terminalPhase, phaser.arriveAndAwaitAdvance());
    equal(terminalPhase, phaser.bulkRegister(10));
    equal(terminalPhase, phaser.register());
    try {
        equal(terminalPhase, phaser.awaitAdvanceInterruptibly(0));
        equal(terminalPhase, phaser.awaitAdvanceInterruptibly(0, 10, SECONDS));
    } catch (Exception unexpectedFailure) {
        unexpected(unexpectedFailure);
    }

    equal(phaser.getUnarrivedParties(), unarrivedBefore);
    equal(phaser.getRegisteredParties(), registeredBefore);
}
/**
 * arriveAndAwaitAdvance continues waiting if interrupted before waiting.
 *
 * <p>Both the helper thread and the calling thread set their own interrupt
 * status before calling {@code arriveAndAwaitAdvance()}. The test asserts that
 * each call still returns phase 1 and that the interrupt status is still set
 * afterwards.
 */
public void testArriveAndAwaitAdvanceAfterInterrupt() {
final Phaser phaser = new Phaser();
// First party: the calling thread. Registration is expected to report phase 0.
assertEquals(0, phaser.register());
final CountDownLatch pleaseArrive = new CountDownLatch(1);
Thread t = newStartedThread(new CheckedRunnable() {
public void realRun() {
// Set the interrupt status before this thread ever blocks on the phaser.
Thread.currentThread().interrupt();
// Second party: the helper thread.
assertEquals(0, phaser.register());
// Tell the calling thread that the helper is registered.
pleaseArrive.countDown();
assertTrue(Thread.currentThread().isInterrupted());
assertEquals(1, phaser.arriveAndAwaitAdvance());
// Thread.interrupted() reads and clears the status; it must still be set here.
assertTrue(Thread.interrupted());
}});
// Helper defined elsewhere; presumably blocks until the latch reaches zero.
await(pleaseArrive);
// Helper defined elsewhere; judging by its name it waits until t is blocked
// inside arriveAndAwaitAdvance() -- TODO confirm.
waitForThreadToEnterWaitState(t);
// The calling thread is interrupted before it arrives as well.
Thread.currentThread().interrupt();
assertEquals(1, phaser.arriveAndAwaitAdvance());
// Reads and clears the calling thread's interrupt status.
assertTrue(Thread.interrupted());
awaitTermination(t);
}
/**
 * Creates a {@code Writer} task named "InterruptibleWriter" that takes the
 * write lock of {@code sl} and checks the resulting lock state.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code writeLockInterruptibly()}; otherwise it calls
 * {@code tryWriteLock(timeout, unit)}. The returned stamp is handed to
 * {@code stamp(long)}, and the write lock is released at the end only when
 * {@code stamp()} is non-zero. Any {@code Throwable} is passed to
 * {@code thrown}.
 *
 * @param sl      the lock to acquire
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the writer task
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                stamp(timeout < 0
                      ? sl.writeLockInterruptibly()
                      : sl.tryWriteLock(timeout, unit));
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                thrown(failure);
            } finally {
                // A zero stamp means no lock is held, so there is nothing to release.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
/**
 * Creates a {@code Writer} task named "InterruptibleWriter" that takes the
 * write lock of {@code sl} and checks the resulting lock state.
 *
 * <p>With a negative {@code timeout} the task calls
 * {@code writeLockInterruptibly()}; otherwise it calls
 * {@code tryWriteLock(timeout, unit)}. The returned stamp is handed to
 * {@code stamp(long)}, and the write lock is released at the end only when
 * {@code stamp()} is non-zero. Any {@code Throwable} is passed to
 * {@code thrown}.
 *
 * @param sl      the lock to acquire
 * @param timeout wait time for the timed attempt; negative selects the
 *                untimed interruptible acquisition
 * @param unit    unit of {@code timeout}
 * @param gate    if non-null, passed to {@code toTheStartingGate} before
 *                locking
 * @return the writer task
 */
static Writer interruptibleWriter(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate) {
    return new Writer("InterruptibleWriter") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            try {
                stamp(timeout < 0
                      ? sl.writeLockInterruptibly()
                      : sl.tryWriteLock(timeout, unit));
                check(sl.validate(stamp()));
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } catch (Throwable failure) {
                thrown(failure);
            } finally {
                // A zero stamp means no lock is held, so there is nothing to release.
                if (stamp() != 0L) {
                    sl.unlockWrite(stamp());
                }
            }
        }
    };
}
@Inject
public WriteCloseableDataStore(@ManagedDataStoreDelegate DataStore delegate,
@ManagedTableBackingStoreDelegate TableBackingStore tableBackingStore,
MetricRegistry metricRegistry) {
_delegate = requireNonNull(delegate);
_tableBackingStore = requireNonNull(tableBackingStore);
_writesAccepted = true;
_writerPhaser = new Phaser(1);
_writesRejectedCounter = metricRegistry.counter(MetricRegistry.name("bv.emodb.sor", "WriteCloseableDataStore",
"writesRejected"));
}
/**
 * Tests that blocking on a single monitor properly increases the
 * blocked count at least by 1. Also asserts that the correct lock name is provided.
 *
 * <p>A two-party {@code Phaser} keeps this thread and the locker thread in
 * step. This thread holds {@code lock1} while the locker thread tries to
 * enter it, reads the lock name reported for the locker thread, then releases
 * the monitor and hands the collected data to {@code testBlocked}.
 *
 * @throws Exception if any step of the test fails unexpectedly
 */
private static void testBlockingOnSimpleMonitor() throws Exception {
System.out.println("testBlockingOnSimpleMonitor");
final Object lock1 = new Object();
System.out.println("Lock1 = " + lock1);
// Two parties: this thread and the locker thread.
final Phaser p = new Phaser(2);
LockerThread lt = newLockerThread(new Runnable() {
@Override
public void run() {
p.arriveAndAwaitAdvance(); // phase[1]
// This thread holds lock1 when it arrives at phase[1], so entry is contended here.
synchronized(lock1) {
System.out.println("[LockerThread obtained Lock1]");
p.arriveAndAwaitAdvance(); // phase[2]
}
p.arriveAndAwaitAdvance(); // phase[3]
}
});
lt.start();
long tid = lt.getId();
// Thread info captured before the contention below; passed to testBlocked
// together with a supplier of fresh info (presumably as the baseline -- confirm).
ThreadInfo ti = mbean.getThreadInfo(tid);
String lockName = null;
synchronized(lock1) {
p.arriveAndAwaitAdvance(); // phase[1]
// Helper defined elsewhere; by its name it waits until lt is BLOCKED -- TODO confirm.
waitForThreadState(lt, Thread.State.BLOCKED);
// Poll until the thread info reports a non-null lock name for the locker thread.
do {
lockName = mbean.getThreadInfo(tid).getLockName();
} while (lockName == null);
}
// lock1 has been released above; the locker thread arrives at phase[2] from inside its synchronized block.
p.arriveAndAwaitAdvance(); // phase[2]
testBlocked(ti, () -> mbean.getThreadInfo(tid), lockName, lock1);
// Arrive without waiting and leave the phaser, then wait for the locker thread to end.
p.arriveAndDeregister(); // phase[3]
lt.join();
printok();
}
/**
 * Exercises a root phaser with two child phasers while the phase number
 * passes {@code Integer.MAX_VALUE}.
 *
 * <p>The root's state word is written directly so that its phase starts at
 * {@code Integer.MAX_VALUE - 1}. Five rounds of arrivals follow, each checked
 * with {@code checkState} on the root and both children, and the phase is
 * expected to end at 3.
 *
 * @throws Throwable if setting the state field or any check fails
 */
void testTiered() throws Throwable {
    final int startPhase = Integer.MAX_VALUE - 1;
    final Phaser root = new Phaser();
    // this is extremely dependent on internal representation
    stateField.setLong(root, ((long) startPhase << 32) | 1L);
    checkState(root, startPhase, 0, 0);

    final Phaser single = new Phaser(root, 1);
    checkState(root, startPhase, 1, 1);
    checkState(single, startPhase, 1, 1);

    final Phaser pair = new Phaser(root, 2);
    checkState(root, startPhase, 2, 2);
    checkState(pair, startPhase, 2, 2);

    int ph = startPhase;
    for (int round = 0; round < 5; round++) {
        checkState(root, ph, 2, 2);
        checkState(single, ph, 1, 1);
        checkState(pair, ph, 2, 2);

        single.arrive();
        checkState(root, ph, 2, 1);
        checkState(single, ph, 1, 0);
        checkState(pair, ph, 2, 2);

        pair.arrive();
        checkState(root, ph, 2, 1);
        checkState(single, ph, 1, 0);
        checkState(pair, ph, 2, 1);

        // Last outstanding arrival of the round: every phaser moves to the next phase.
        pair.arrive();
        ph = phaseInc(ph);
        checkState(root, ph, 2, 2);
        checkState(single, ph, 1, 1);
        checkState(pair, ph, 2, 2);
    }
    equal(3, ph);
}
/**
 * Creates a {@code Writer} task named "WriterView" that takes the write lock
 * of {@code sl} through its {@link Lock} view, checks the lock state and
 * always releases the lock again.
 *
 * @param sl   the lock whose write-lock view is acquired
 * @param gate if non-null, passed to {@code toTheStartingGate} before locking
 * @return the writer task
 */
static Writer writerView(final StampedLock sl, final Phaser gate) {
    return new Writer("WriterView") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            final Lock writeLock = sl.asWriteLock();
            writeLock.lock();
            try {
                stamp(1L); // got the lock
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                writeLock.unlock();
            }
        }
    };
}
/**
 * Creates an {@code Awaiter} that arrives at {@code phaser} and waits for the
 * phase to advance.
 *
 * <p>The shared counter {@code cycleArriveAwaitAdvance} alternates the two
 * ways of doing so: an even counter value uses {@code arrive()} followed by
 * {@code awaitAdvance()}, an odd one uses {@code arriveAndAwaitAdvance()}.
 * The resulting phase is passed to {@code phase}; any {@code Throwable} is
 * passed to {@code result}.
 *
 * @param phaser the phaser to arrive at
 * @return the awaiter task
 */
private static Awaiter awaiter(final Phaser phaser) {
    return new Awaiter() {
        public void run() {
            toTheStartingGate();
            try {
                final boolean twoStep =
                        cycleArriveAwaitAdvance.getAndIncrement() % 2 == 0;
                final int reached = twoStep
                        ? phaser.awaitAdvance(phaser.arrive())
                        : phaser.arriveAndAwaitAdvance();
                phase(reached);
            } catch (Throwable failure) {
                result(failure);
            }
        }
    };
}
/**
 * Runs the {@code LongAdder} measurement: submits {@code nthreads} adder
 * tasks to {@code pool}, times them through {@code timeTasks}, and reports
 * the timing together with the adder's final sum.
 *
 * @param nthreads number of tasks to submit
 * @param incs     value handed to each {@code AdderTask}; presumably the
 *                 number of increments per task
 */
static void adderTest(int nthreads, int incs) {
    System.out.print("LongAdder ");
    // One party per task plus one for the thread driving the measurement.
    final Phaser phaser = new Phaser(nthreads + 1);
    final LongAdder adder = new LongAdder();
    for (int remaining = nthreads; remaining > 0; remaining--) {
        pool.execute(new AdderTask(adder, phaser, incs));
    }
    report(nthreads, incs, timeTasks(phaser), adder.sum());
}
/**
 * Chooses between the two interruptible reader factories.
 *
 * @param sl      the lock to acquire
 * @param timeout timeout forwarded to the chosen factory
 * @param unit    unit of {@code timeout}
 * @param gate    phaser forwarded to the chosen factory
 * @param view    {@code true} to use {@code interruptibleReaderView},
 *                {@code false} to use the four-argument
 *                {@code interruptibleReader}
 * @return the reader produced by the chosen factory
 */
static Reader interruptibleReader(final StampedLock sl,
                                  final long timeout,
                                  final TimeUnit unit,
                                  final Phaser gate,
                                  final boolean view) {
    if (view) {
        return interruptibleReaderView(sl, timeout, unit, gate);
    }
    return interruptibleReader(sl, timeout, unit, gate);
}
/**
 * Creates an {@code Arriver} that arrives at {@code phaser} without waiting.
 *
 * <p>The value returned by {@code arrive()} is passed to {@code phase}; any
 * {@code Throwable} is passed to {@code result}.
 *
 * @param phaser the phaser to arrive at
 * @return the arriver task
 */
private static Arriver arriver(final Phaser phaser) {
    return new Arriver() {
        public void run() {
            toTheStartingGate();
            try {
                final int arrivedPhase = phaser.arrive();
                phase(arrivedPhase);
            } catch (Throwable failure) {
                result(failure);
            }
        }
    };
}
/**
 * Runs the {@code AtomicLong} measurement: submits {@code nthreads} CAS tasks
 * to {@code pool}, times them through {@code timeTasks}, and reports the
 * timing together with the counter's final value.
 *
 * @param nthreads number of tasks to submit
 * @param incs     value handed to each {@code CasTask}; presumably the number
 *                 of increments per task
 */
static void casTest(int nthreads, int incs) {
    System.out.print("AtomicLong ");
    // One party per task plus one for the thread driving the measurement.
    final Phaser phaser = new Phaser(nthreads + 1);
    final AtomicLong counter = new AtomicLong();
    for (int remaining = nthreads; remaining > 0; remaining--) {
        pool.execute(new CasTask(counter, phaser, incs));
    }
    report(nthreads, incs, timeTasks(phaser), counter.get());
}
/**
 * Exercises a single phaser while its phase number passes
 * {@code Integer.MAX_VALUE}.
 *
 * <p>The state word is written directly so that the phase starts at
 * {@code Integer.MAX_VALUE - 1}. One party is registered and arrives three
 * times; the phase is expected to go to {@code Integer.MAX_VALUE}, then 0,
 * then 1.
 *
 * @throws Throwable if setting the state field or any check fails
 */
void testLeaf() throws Throwable {
    final int startPhase = Integer.MAX_VALUE - 1;
    final Phaser leaf = new Phaser();
    // this is extremely dependent on internal representation
    stateField.setLong(leaf, ((long) startPhase << 32) | 1L);
    checkState(leaf, startPhase, 0, 0);

    leaf.register();
    checkState(leaf, startPhase, 1, 1);

    leaf.arrive();
    checkState(leaf, Integer.MAX_VALUE, 1, 1);

    // The next arrival is expected to take the phase from MAX_VALUE back to 0.
    leaf.arrive();
    checkState(leaf, 0, 1, 1);

    leaf.arrive();
    checkState(leaf, 1, 1, 1);
}
/**
 * Runs the {@code AtomicLong} measurement: submits {@code nthreads} CAS tasks
 * to {@code pool}, times them through {@code timeTasks}, and reports the
 * timing together with the counter's final value.
 *
 * @param nthreads number of tasks to submit
 * @param incs     value handed to each {@code CasTask}; presumably the number
 *                 of increments per task
 */
static void casTest(int nthreads, int incs) {
    System.out.print("AtomicLong ");
    // One party per task plus one for the thread driving the measurement.
    final Phaser phaser = new Phaser(nthreads + 1);
    final AtomicLong counter = new AtomicLong();
    for (int remaining = nthreads; remaining > 0; remaining--) {
        pool.execute(new CasTask(counter, phaser, incs));
    }
    report(nthreads, incs, timeTasks(phaser), counter.get());
}
/**
 * Creates a {@code Writer} task named "WriterView" that takes the write lock
 * of {@code sl} through its {@link Lock} view, checks the lock state and
 * always releases the lock again.
 *
 * @param sl   the lock whose write-lock view is acquired
 * @param gate if non-null, passed to {@code toTheStartingGate} before locking
 * @return the writer task
 */
static Writer writerView(final StampedLock sl, final Phaser gate) {
    return new Writer("WriterView") {
        public void run() {
            if (gate != null) {
                toTheStartingGate(gate);
            }
            final Lock writeLock = sl.asWriteLock();
            writeLock.lock();
            try {
                stamp(1L); // got the lock
                check(!sl.isReadLocked());
                check(sl.isWriteLocked());
            } finally {
                writeLock.unlock();
            }
        }
    };
}
/**
 * Runs the {@code AtomicLong} measurement: submits {@code nthreads} CAS tasks
 * to {@code pool}, times them through {@code timeTasks}, and reports the
 * timing together with the counter's final value.
 *
 * @param nthreads number of tasks to submit
 * @param incs     value handed to each {@code CasTask}; presumably the number
 *                 of increments per task
 */
static void casTest(int nthreads, int incs) {
    System.out.print("AtomicLong ");
    // One party per task plus one for the thread driving the measurement.
    final Phaser phaser = new Phaser(nthreads + 1);
    final AtomicLong counter = new AtomicLong();
    for (int remaining = nthreads; remaining > 0; remaining--) {
        pool.execute(new CasTask(counter, phaser, incs));
    }
    report(nthreads, incs, timeTasks(phaser), counter.get());
}