下面列出了 java.util.concurrent.BrokenBarrierException 的 API 用法示例代码;也可以点击链接到 GitHub 查看源代码。
/**
 * Checks that ids generated concurrently by several threads are all distinct.
 *
 * <p>Starts {@code threads} {@code GenerateRunner} tasks on a fixed thread pool, meets them
 * twice at a barrier sized for the workers plus this thread, and then expects the shared sets
 * to hold {@code threads * generations} trace ids and span ids respectively.
 *
 * <p>NOTE(review): presumably each runner awaits the same barrier once before and once after
 * generating its ids -- confirm against {@code GenerateRunner}.
 *
 * @throws BrokenBarrierException if the barrier is broken while this thread waits on it
 * @throws InterruptedException if this thread is interrupted while waiting on the barrier
 */
@Test
public void shouldGenerateUniqueIdsInMultithreadedEnvironment()
    throws BrokenBarrierException, InterruptedException {
  AwsXRayIdsGenerator generator = new AwsXRayIdsGenerator();
  Set<TraceId> traceIds = new CopyOnWriteArraySet<>();
  Set<SpanId> spanIds = new CopyOnWriteArraySet<>();
  int threads = 8;
  int generations = 128;
  // One extra party for the test thread itself.
  CyclicBarrier barrier = new CyclicBarrier(threads + 1);
  // Held as an ExecutorService (fully qualified so no new import is needed) so that the pool
  // can be shut down; otherwise its idle worker threads outlive the test.
  java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(threads);
  try {
    for (int i = 0; i < threads; i++) {
      executor.execute(new GenerateRunner(generations, generator, barrier, traceIds, spanIds));
    }
    barrier.await();
    barrier.await();
  } finally {
    // shutdown() lets already submitted tasks finish and then releases the worker threads.
    executor.shutdown();
  }
  assertThat(traceIds).hasSize(threads * generations);
  assertThat(spanIds).hasSize(threads * generations);
}
/**
 * Repeats {@code align()} followed by a wait on {@code barrier} until
 * {@code aligningDone()} returns true.
 *
 * <p>Any exception from the barrier wait ends the loop silently. When the wait was
 * interrupted, the thread's interrupt status is restored first so that whoever owns the
 * thread can still observe the interruption.
 */
@Override
public void run() {
    while (!aligningDone()) {
        align();
        // Exceptions cause wrong results but do not report
        // that in any way
        try {
            barrier.await();
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again before leaving.
            Thread.currentThread().interrupt();
            return;
        } catch (BrokenBarrierException e2) {
            return;
        } catch (CancellationException e3) {
            return;
        }
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm2 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm2\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Meets the other party at {@code barrier}, then tries to enter {@code monitor} through
 * {@code UNSAFE.tryMonitorEnter}.
 *
 * <p>If the monitor is obtained, the method waits at the barrier a second time, sleeps for
 * {@code timeout} and always exits the monitor afterwards. If it is not obtained, a
 * {@link RuntimeException} is thrown.
 *
 * @throws RuntimeException if the monitor cannot be entered, or wrapping an interruption or a
 *     broken barrier
 */
@Override
public void run() {
    try {
        // first rendezvous: wait until forceAbort has left the monitor
        barrier.await();
        if (!UNSAFE.tryMonitorEnter(monitor)) {
            throw new RuntimeException("Monitor should be entered by " +
                    "::run() first.");
        }
        try {
            barrier.await();
            Thread.sleep(timeout);
        } finally {
            UNSAFE.monitorExit(monitor);
        }
    } catch (InterruptedException | BrokenBarrierException syncFailure) {
        throw new RuntimeException("Synchronization error happened.", syncFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm1 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then on {@code t1cond2latch}, then calls
 *       {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the waits
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm1\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            t1cond2latch.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm1 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm1\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm2 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm2\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Demo: ten tasks on a cached thread pool each sleep for a random 0-900 ms and then wait at a
 * ten-party {@link CyclicBarrier}; once all have arrived the barrier action prints
 * "Well done, guys!". The pool is shut down after the tasks are submitted.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final int parties = 10;
    var pool = Executors.newCachedThreadPool();
    var rendezvous = new CyclicBarrier(parties, () -> System.out.println("Well done, guys!"));
    Runnable worker = () -> {
        final Thread self = Thread.currentThread();
        try {
            System.out.println("Doing task for " + self.getName());
            // simulated work: a multiple of 100 ms, strictly below one second
            final long pauseMillis = new Random().nextInt(10) * 100;
            Thread.sleep(pauseMillis);
            System.out.println("Done for " + self.getName());
            rendezvous.await();
        } catch (InterruptedException | BrokenBarrierException failure) {
            failure.printStackTrace();
        }
    };
    int submitted = 0;
    while (submitted < parties) {
        pool.execute(worker);
        submitted++;
    }
    pool.shutdown();
}
/**
 * Demo: submits 18 tasks to a cached thread pool. Each task prints a message, waits at a
 * six-party {@link CyclicBarrier}, then prints, sleeps 500 ms and prints again. The pool is
 * shut down after submission.
 *
 * @param args unused
 */
public static void main(String[] args) {
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
    final ExecutorService executorService = Executors.newCachedThreadPool();
    final Runnable worker = () -> {
        final Thread self = Thread.currentThread();
        System.out.println(self.getName() + "开始等待其他线程");
        try {
            cyclicBarrier.await();
            System.out.println(self.getName() + "开始执行业务逻辑,耗时0.5秒");
            // The worker starts processing; Thread.sleep() stands in for the business logic
            Thread.sleep(500);
            System.out.println(self.getName() + "业务逻辑执行完毕");
        } catch (InterruptedException | BrokenBarrierException failure) {
            failure.printStackTrace();
        }
    };
    int remaining = 18;
    while (remaining > 0) {
        executorService.execute(worker);
        remaining--;
    }
    executorService.shutdown();
}
/**
 * Waits on one of the two barriers of the current {@code Sync}, if there is one.
 *
 * <p>Does nothing when {@code syncBarriers.get()} returns {@code null}. Otherwise it awaits
 * {@code processingStart} or {@code processingEnd}, resets that barrier whether or not the wait
 * succeeded, and logs how long the wait took.
 *
 * @param isStart {@code true} to use the "start" barrier, {@code false} for the "end" barrier
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
private void syncWait(boolean isStart) {
    final Sync sync = syncBarriers.get();
    if (sync == null) {
        return;
    }
    final String step;
    if (isStart) {
        step = "start";
    } else {
        step = "end";
    }
    logVerbose("server waiting sync on " + step);
    final CyclicBarrier barrier;
    if (isStart) {
        barrier = sync.processingStart;
    } else {
        barrier = sync.processingEnd;
    }
    final long startedAt = System.currentTimeMillis();
    try {
        barrier.await();
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    } finally {
        barrier.reset();
    }
    final long waitedMillis = System.currentTimeMillis() - startedAt;
    logVerbose("waited for {} ms at processing {}", waitedMillis, step);
}
/**
 * Subscribes to {@code cbos.connect()} and, from the {@code afterOnSubscribe} callback, submits
 * a task that closes {@code cbos}. The callback and the task meet at a two-party barrier before
 * the close happens. The test then waits for the task and expects the subscriber to have
 * terminated with completion.
 */
@Test
public void subscribeCloseSynchronously() throws Exception {
    final AtomicReference<Future<?>> closeTaskRef = new AtomicReference<>();
    toSource(cbos.connect().afterOnSubscribe(subscription -> {
        // The rendezvous raises the odds that the writer thread ends up waiting for the
        // Subscriber to become available rather than for requestN demand.
        final CyclicBarrier rendezvous = new CyclicBarrier(2);
        final Future<?> submitted = executorService.submit(toRunnable(() -> {
            rendezvous.await();
            cbos.close();
        }));
        closeTaskRef.compareAndSet(null, submitted);
        try {
            rendezvous.await();
        } catch (InterruptedException | BrokenBarrierException waitFailure) {
            throw new RuntimeException(waitFailure);
        }
    })).subscribe(subscriber);
    final Future<?> closeTask = closeTaskRef.get();
    assertNotNull(closeTask);
    closeTask.get();
    assertThat(subscriber.takeTerminal(), is(complete()));
}
/**
 * Subscribes to {@code cpw.connect()} and, from the {@code afterOnSubscribe} callback, submits
 * a task that closes {@code cpw}. The callback and the task meet at a two-party barrier before
 * the close happens. The test then waits for the task and expects the subscriber to have
 * terminated with completion.
 */
@Test
public void subscribeCloseSynchronously() throws Exception {
    final AtomicReference<Future<?>> closeTaskRef = new AtomicReference<>();
    toSource(cpw.connect().afterOnSubscribe(subscription -> {
        // The rendezvous raises the odds that the writer thread ends up waiting for the
        // Subscriber to become available rather than for requestN demand.
        final CyclicBarrier rendezvous = new CyclicBarrier(2);
        final Future<?> submitted = executorService.submit(toRunnable(() -> {
            rendezvous.await();
            cpw.close();
        }));
        closeTaskRef.compareAndSet(null, submitted);
        try {
            rendezvous.await();
        } catch (InterruptedException | BrokenBarrierException waitFailure) {
            throw new RuntimeException(waitFailure);
        }
    })).subscribe(subscriber);
    final Future<?> closeTask = closeTaskRef.get();
    assertNotNull(closeTask);
    closeTask.get();
    assertThat(subscriber.takeTerminal(), is(complete()));
}
/**
 * Meets the other party at {@code barrier}, then tries to enter {@code monitor} through
 * {@code UNSAFE.tryMonitorEnter}.
 *
 * <p>If the monitor is obtained, the method waits at the barrier a second time, sleeps for
 * {@code timeout} and always exits the monitor afterwards. If it is not obtained, a
 * {@link RuntimeException} is thrown.
 *
 * @throws RuntimeException if the monitor cannot be entered, or wrapping an interruption or a
 *     broken barrier
 */
@Override
public void run() {
    try {
        // first rendezvous: wait until forceAbort has left the monitor
        barrier.await();
        if (!UNSAFE.tryMonitorEnter(monitor)) {
            throw new RuntimeException("Monitor should be entered by " +
                    "::run() first.");
        }
        try {
            barrier.await();
            Thread.sleep(timeout);
        } finally {
            UNSAFE.monitorExit(monitor);
        }
    } catch (InterruptedException | BrokenBarrierException syncFailure) {
        throw new RuntimeException("Synchronization error happened.", syncFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm1 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then on {@code t1cond2latch}, then calls
 *       {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the waits
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm1\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            t1cond2latch.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm2 string.</li>
 *   <li>1: waits on {@code t1Cond1}, counts down {@code t1cond1latch}, waits on
 *       {@code t1cond2latch}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the waits
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm2\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            t1cond1latch.countDown();
            t1cond2latch.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm1 string, then waits on
 *       {@code t1Cond2}.</li>
 *   <li>1: calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    if (requestNumber == 0) {
        AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm1\"");
        try {
            t1Cond2.await();
        } catch (InterruptedException | BrokenBarrierException waitFailure) {
            throw new RuntimeException(waitFailure);
        }
    } else if (requestNumber == 1) {
        AuthenticationHandler.okReply(exchange);
    } else {
        System.out.println("Unexpected request");
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm2 string, waits on
 *       {@code t1Cond2}, then counts down {@code t1cond2latch}.</li>
 *   <li>1: calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    if (requestNumber == 0) {
        AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm2\"");
        try {
            t1Cond2.await();
        } catch (InterruptedException | BrokenBarrierException waitFailure) {
            throw new RuntimeException(waitFailure);
        }
        // only reached when the barrier wait completed normally
        t1cond2latch.countDown();
    } else if (requestNumber == 1) {
        AuthenticationHandler.okReply(exchange);
    } else {
        System.out.println("Unexpected request");
    }
}
/**
 * Waits once at {@code barrier} (printing the stack trace if that wait fails), then loops until
 * {@code stop} is set. Each iteration picks a random key below {@code NUM_OF_ENTRIES} and a
 * random number in [0, 100): if the number is below {@code getPercents} it calls
 * {@code oak.zc().get(key)}, otherwise {@code oak.zc().put(key, 8)}.
 */
@Override
public void run() {
    try {
        barrier.await();
    } catch (InterruptedException | BrokenBarrierException failure) {
        failure.printStackTrace();
    }
    final Random random = new Random();
    while (!stop.get()) {
        final Integer key = random.nextInt(NUM_OF_ENTRIES);
        final int roll = random.nextInt(100);
        if (roll < getPercents) {
            oak.zc().get(key);
            continue;
        }
        oak.zc().put(key, 8);
    }
}
/**
 * Meets the other party at {@code barrier}, then tries to enter {@code monitor} through
 * {@code UNSAFE.tryMonitorEnter}.
 *
 * <p>If the monitor is obtained, the method waits at the barrier a second time, sleeps for
 * {@code timeout} and always exits the monitor afterwards. If it is not obtained, a
 * {@link RuntimeException} is thrown.
 *
 * @throws RuntimeException if the monitor cannot be entered, or wrapping an interruption or a
 *     broken barrier
 */
@Override
public void run() {
    try {
        // first rendezvous: wait until forceAbort has left the monitor
        barrier.await();
        if (!UNSAFE.tryMonitorEnter(monitor)) {
            throw new RuntimeException("Monitor should be entered by " +
                    "::run() first.");
        }
        try {
            barrier.await();
            Thread.sleep(timeout);
        } finally {
            UNSAFE.monitorExit(monitor);
        }
    } catch (InterruptedException | BrokenBarrierException syncFailure) {
        throw new RuntimeException("Synchronization error happened.", syncFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm1 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm1\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm2 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm2\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Sleeps one second, announces and waits at {@code barrier1}, sleeps another second, announces
 * and waits at {@code barrier2}, then prints a "done" line. An interruption or a broken
 * barrier at any point ends the sequence and prints the stack trace.
 */
@Override
public void run() {
    final Thread self = Thread.currentThread();
    try {
        Thread.sleep(1000);
        System.out.println(self.getName() + " waiting at barrier 1");
        this.barrier1.await();

        Thread.sleep(1000);
        System.out.println(self.getName() + " waiting at barrier 2");
        this.barrier2.await();

        System.out.println(self.getName() + " done!");
    } catch (InterruptedException | BrokenBarrierException failure) {
        failure.printStackTrace();
    }
}
/**
 * Interrupting one party that waits in await makes the other waiting
 * party throw BrokenBarrierException.
 *
 * Two threads count down a latch and then wait on a three-party barrier;
 * once the latch has opened, the first thread is interrupted.
 */
public void testAwait1_Interrupted_BrokenBarrier() {
    final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    final Thread interruptedParty = new ThreadShouldThrow(InterruptedException.class) {
        public void realRun() throws Exception {
            pleaseInterrupt.countDown();
            cyclicBarrier.await();
        }};
    final Thread brokenParty = new ThreadShouldThrow(BrokenBarrierException.class) {
        public void realRun() throws Exception {
            pleaseInterrupt.countDown();
            cyclicBarrier.await();
        }};

    interruptedParty.start();
    brokenParty.start();
    await(pleaseInterrupt);
    interruptedParty.interrupt();
    awaitTermination(interruptedParty);
    awaitTermination(brokenParty);
}
/**
 * Resetting a barrier that has parties waiting on it makes those
 * parties throw BrokenBarrierException.
 *
 * Two threads count down a latch and then wait on a three-party barrier;
 * the barrier is reset after awaitNumberWaiting(barrier, 2) returns.
 */
public void testReset_BrokenBarrier() throws InterruptedException {
    final CountDownLatch pleaseReset = new CountDownLatch(2);
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    final Thread firstWaiter = new ThreadShouldThrow(BrokenBarrierException.class) {
        public void realRun() throws Exception {
            pleaseReset.countDown();
            cyclicBarrier.await();
        }};
    final Thread secondWaiter = new ThreadShouldThrow(BrokenBarrierException.class) {
        public void realRun() throws Exception {
            pleaseReset.countDown();
            cyclicBarrier.await();
        }};

    firstWaiter.start();
    secondWaiter.start();
    await(pleaseReset);
    awaitNumberWaiting(cyclicBarrier, 2);
    cyclicBarrier.reset();
    awaitTermination(firstWaiter);
    awaitTermination(secondWaiter);
}
/**
 * Increments {@code count} and dispatches on its new value.
 *
 * <ul>
 *   <li>0: calls {@code AuthenticationHandler.errorReply} with the realm1 string.</li>
 *   <li>1: waits on {@code t1Cond1}, then calls {@code AuthenticationHandler.okReply}.</li>
 *   <li>anything else: prints "Unexpected request".</li>
 * </ul>
 *
 * @param exchange the exchange handed on to the reply helpers
 * @throws IOException declared by the handler signature
 * @throws RuntimeException wrapping an interruption or a broken barrier during the wait
 */
@Override
public void handle(HttpExchange exchange) throws IOException {
    count++;
    final int requestNumber = count;
    try {
        if (requestNumber == 0) {
            AuthenticationHandler.errorReply(exchange, "Basic realm=\"realm1\"");
        } else if (requestNumber == 1) {
            t1Cond1.await();
            AuthenticationHandler.okReply(exchange);
        } else {
            System.out.println("Unexpected request");
        }
    } catch (InterruptedException | BrokenBarrierException waitFailure) {
        throw new RuntimeException(waitFailure);
    }
}
/**
 * Interrupting one party that waits in timed await makes the other
 * waiting party throw BrokenBarrierException.
 *
 * Two threads count down a latch and then do a timed wait of
 * LONG_DELAY_MS on a three-party barrier; once the latch has opened,
 * the first thread is interrupted.
 */
public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
    final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
    final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    final Thread interruptedParty = new ThreadShouldThrow(InterruptedException.class) {
        public void realRun() throws Exception {
            pleaseInterrupt.countDown();
            cyclicBarrier.await(LONG_DELAY_MS, MILLISECONDS);
        }};
    final Thread brokenParty = new ThreadShouldThrow(BrokenBarrierException.class) {
        public void realRun() throws Exception {
            pleaseInterrupt.countDown();
            cyclicBarrier.await(LONG_DELAY_MS, MILLISECONDS);
        }};

    interruptedParty.start();
    brokenParty.start();
    await(pleaseInterrupt);
    interruptedParty.interrupt();
    awaitTermination(interruptedParty);
    awaitTermination(brokenParty);
}
/**
 * Blocks the calling thread until the pause timer task meets it at a two-party barrier.
 *
 * <p>Returns immediately, after logging, when {@code delay} is zero. Otherwise a
 * {@code PauserTimerTask} is scheduled to run right away and then every second, and this
 * thread waits at the barrier handed to that task.
 *
 * <p>The timer is cancelled on every exit path so its background thread does not linger once
 * the pause is over or has been aborted. If the wait is interrupted, the interrupt status of
 * the calling thread is restored after logging.
 */
public void await() {
    if(0 == delay) {
        log.info("No pause between retry");
        return;
    }
    final Timer wakeup = new Timer();
    final CyclicBarrier wait = new CyclicBarrier(2);
    try {
        // Schedule for immediate execution with an interval of 1s
        wakeup.scheduleAtFixedRate(new PauserTimerTask(wait), 0, 1000);
        // Wait for notify from wakeup timer
        wait.await();
    }
    catch(InterruptedException e) {
        log.error(e.getMessage(), e);
        // Catching InterruptedException clears the flag; set it again for the caller.
        Thread.currentThread().interrupt();
    }
    catch(BrokenBarrierException e) {
        log.error(e.getMessage(), e);
    }
    finally {
        // Timer.cancel() drops pending executions without disturbing a task that is
        // currently running, and lets the timer thread terminate.
        wakeup.cancel();
    }
}
/**
 * Adds one transfer to a {@code TransferQueue} created with argument 1, then starts a thread
 * that adds a second transfer, sets a flag and waits at a two-party barrier. The test asserts
 * the flag is still unset, removes the first transfer, meets the thread at the barrier and
 * asserts the flag is set.
 */
@Test
public void testConcurrent() throws Exception {
    final TransferQueue queue = new TransferQueue(1);
    final DownloadTransfer transfer = new DownloadTransfer(new Host(new TestProtocol()), new Path("/t", EnumSet.of(Path.Type.directory)), null);
    queue.add(transfer, new DisabledProgressListener());

    final CyclicBarrier rendezvous = new CyclicBarrier(2);
    final AtomicBoolean secondAdded = new AtomicBoolean();
    final Runnable adder = new Runnable() {
        @Override
        public void run() {
            final DownloadTransfer second = new DownloadTransfer(new Host(new TestProtocol()), new Path("/t", EnumSet.of(Path.Type.directory)), null);
            queue.add(second, new DisabledProgressListener());
            secondAdded.set(true);
            try {
                rendezvous.await();
            }
            catch(InterruptedException | BrokenBarrierException failure) {
                fail();
            }
        }
    };
    new Thread(adder).start();

    assertFalse(secondAdded.get());
    queue.remove(transfer);
    rendezvous.await();
    assertTrue(secondAdded.get());
}
/**
 * Waits on {@code tickBarrier} twice in a row; the {@code millis} argument is not used.
 *
 * @param millis ignored by this override
 * @throws InterruptedException if the thread is interrupted while waiting on the barrier
 * @throws RuntimeException wrapping a {@link BrokenBarrierException} from either wait
 */
@Override
protected void sleep(long millis) throws InterruptedException {
    try {
        for (int pass = 0; pass < 2; pass++) {
            this.tickBarrier.await();
        }
    } catch (BrokenBarrierException broken) {
        throw new RuntimeException(broken);
    }
}
/**
 * Waits once on {@code tickBarrier}, rethrowing an interruption or a broken barrier as a
 * {@code TestException}.
 */
void halfTick() {
    try {
        tickBarrier.await();
    } catch (InterruptedException | BrokenBarrierException failure) {
        throw new TestException(failure);
    }
}