下面列出了 io.reactivex.Scheduler#createWorker() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Blocks the calling thread until a marker task has run on each of
 * {@code numThreads} newly created workers of {@code scheduler}, or until the
 * timeout elapses.
 *
 * <p>One worker is created per iteration; the task scheduled on it disposes
 * that worker and then counts down a shared latch. The caller waits on the
 * latch for at most {@code timeout} {@code unit}s.
 *
 * <p>NOTE(review): presumably meant for schedulers whose workers share a
 * bounded set of threads, so the marker tasks queue behind outstanding work;
 * confirm against the scheduler actually passed in.
 *
 * @param scheduler  scheduler from which the workers are created
 * @param numThreads number of workers to create and marker tasks to await
 * @param timeout    maximum time to wait for all marker tasks to run
 * @param unit       time unit of {@code timeout}
 * @throws IllegalArgumentException if {@code numThreads} is negative (raised
 *         by the {@link CountDownLatch} constructor)
 * @throws RuntimeException if the wait times out, or if the calling thread is
 *         interrupted while waiting (the {@link InterruptedException} is the
 *         cause and the thread's interrupt status is restored)
 */
public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) {
    final CountDownLatch latch = new CountDownLatch(numThreads);
    for (int i = 1; i <= numThreads; i++) {
        final Worker worker = scheduler.createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                // Release the worker before signalling completion.
                worker.dispose();
                latch.countDown();
            }
        });
    }
    boolean finished;
    try {
        finished = latch.await(timeout, unit);
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still
        // observe that this thread was interrupted.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    if (!finished) {
        // Message text kept exactly as before in case callers match on it.
        throw new RuntimeException("timeout occured waiting for work to finish");
    }
}
/**
 * Creates the subscriber: stores the collaborators passed in, allocates fresh
 * internal state, and obtains a worker from {@code scheduler}.
 *
 * @param downstream subscriber stored as this instance's downstream
 * @param timeout    function from an item to a {@code Long}; NOTE(review):
 *                   presumably the per-item timeout duration — confirm at the
 *                   point of use
 * @param unit       time unit stored alongside {@code timeout}
 * @param value      function from an item to another item; NOTE(review):
 *                   presumably produces the value to insert on timeout —
 *                   confirm at the point of use
 * @param scheduler  scheduler from which this instance's worker is created
 */
InsertTimeoutSubscriber(Subscriber<? super T> downstream, Function<? super T, ? extends Long> timeout,
        TimeUnit unit, Function<? super T, ? extends T> value, Scheduler scheduler) {
    // Collaborators supplied by the caller.
    this.downstream = downstream;
    this.value = value;
    this.timeout = timeout;
    this.unit = unit;

    // Freshly allocated internal state.
    this.requested = new AtomicLong();
    this.inserted = new AtomicLong();
    this.scheduled = new AtomicReference<Disposable>();
    this.terminated = new AtomicReference<Object>();
    this.queue = new MpscLinkedQueue<T>();

    // Obtained last, as in the original ordering, since it calls into the scheduler.
    this.worker = scheduler.createWorker();
}
/**
 * Checks that a worker created from the scheduler returned by
 * {@code SchedulerHelper.withThreadId(Schedulers.trampoline(), "boo")}
 * reports not-disposed initially and disposed after {@code dispose()}.
 */
@Test
public void testDispose() {
    final Worker worker = SchedulerHelper
            .withThreadId(Schedulers.trampoline(), "boo")
            .createWorker();

    // A fresh worker must not be disposed yet.
    Assert.assertFalse(worker.isDisposed());

    worker.dispose();

    // After dispose() the worker must report the disposed state.
    Assert.assertTrue(worker.isDisposed());
}