下面列出了 io.reactivex.Scheduler#Worker() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Schedules a periodic task, disposes it once it has run four times, and checks that the run
 * count is still four after a further 60 ms.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @throws Exception propagated from {@code awaitLatch} or {@code Thread.sleep}
 */
private void testUnsubscribeBetweenActions(Supplier<ContextScheduler> scheduler) throws Exception {
  final Scheduler.Worker worker = scheduler.get().createWorker();
  final AtomicInteger runs = new AtomicInteger();
  final CountDownLatch fourthRun = new CountDownLatch(1);
  final Runnable tick = () -> {
    if (runs.incrementAndGet() == 4) {
      fourthRun.countDown();
    }
  };
  // No initial delay, then one run every 20 ms.
  final Disposable periodic = worker.schedulePeriodically(tick, 0, 20, MILLISECONDS);
  awaitLatch(fourthRun);
  periodic.dispose();
  // Leave room for several more periods; none of them may fire after the dispose.
  Thread.sleep(60);
  assertEquals(4, runs.get());
}
/**
 * Swaps the worker held in {@code workerRef} for a freshly created one and schedules a
 * {@code reset()} of the cached observable on the new worker after the given delay.
 *
 * <p>A {@code null} value in {@code workerRef} means "finished": the method returns without
 * scheduling anything. When the swap succeeds, the previously stored worker (if present) is
 * disposed. When the swap loses a race against a concurrent update, the worker created for
 * that attempt is disposed before retrying so that it is not leaked.
 *
 * @param duration  delay before the reset action runs
 * @param unit      time unit of {@code duration}
 * @param scheduler scheduler used to create the replacement worker
 * @param cacheRef  holder of the cached observable whose {@code reset()} is invoked
 * @param workerRef holder of the current worker; {@code null} content signals completion
 * @param <T>       element type of the cached observable
 */
private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
    final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
    final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
  Runnable action = new Runnable() {
    @Override
    public void run() {
      cacheRef.get().reset();
    }
  };
  // CAS loop to cancel the current worker and create a new one
  while (true) {
    Optional<Scheduler.Worker> wOld = workerRef.get();
    if (wOld == null) {
      // we are finished
      return;
    }
    Scheduler.Worker created = scheduler.createWorker();
    Optional<Scheduler.Worker> w = Optional.of(created);
    if (workerRef.compareAndSet(wOld, w)) {
      if (wOld.isPresent()) {
        wOld.get().dispose();
      }
      created.schedule(action, duration, unit);
      break;
    }
    // Lost the race: this worker was never published, so release it before retrying.
    created.dispose();
  }
}
/**
 * Swaps the worker held in {@code workerRef} for a freshly created one and schedules a
 * {@code reset()} of the cached flowable on the new worker after the given delay.
 *
 * <p>A {@code null} value in {@code workerRef} means "finished": the method returns without
 * scheduling anything. When the swap succeeds, the previously stored worker (if present) is
 * disposed. When the swap loses a race against a concurrent update, the worker created for
 * that attempt is disposed before retrying so that it is not leaked.
 *
 * @param duration  delay before the reset action runs
 * @param unit      time unit of {@code duration}
 * @param scheduler scheduler used to create the replacement worker
 * @param cacheRef  holder of the cached flowable whose {@code reset()} is invoked
 * @param workerRef holder of the current worker; {@code null} content signals completion
 * @param <T>       element type of the cached flowable
 */
private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
    final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
    final AtomicReference<Optional<Scheduler.Worker>> workerRef) {
  Runnable action = new Runnable() {
    @Override
    public void run() {
      cacheRef.get().reset();
    }
  };
  // CAS loop to cancel the current worker and create a new one
  while (true) {
    Optional<Scheduler.Worker> wOld = workerRef.get();
    if (wOld == null) {
      // we are finished
      return;
    }
    Scheduler.Worker created = scheduler.createWorker();
    Optional<Scheduler.Worker> w = Optional.of(created);
    if (workerRef.compareAndSet(wOld, w)) {
      if (wOld.isPresent()) {
        wOld.get().dispose();
      }
      created.schedule(action, duration, unit);
      break;
    }
    // Lost the race: this worker was never published, so release it before retrying.
    created.dispose();
  }
}
/**
 * Installs a single scheduler, whose workers execute tasks through a direct
 * {@code Runnable::run} executor, as the init handler result for the io, computation,
 * new-thread, single and Android main-thread schedulers.
 */
@BeforeClass
public static void setUpRxSchedulers() {
  final Scheduler sameThread = new Scheduler() {
    @Override
    public Scheduler.Worker createWorker() {
      // The executor simply invokes run(), so work happens on the caller's thread.
      return new ExecutorScheduler.ExecutorWorker(Runnable::run, false);
    }

    @Override
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
      // Ignoring the requested delay prevents StackOverflowErrors when scheduling with a delay.
      return super.scheduleDirect(run, 0, unit);
    }
  };

  // Every hook ignores the default scheduler it is offered and hands back the same instance.
  RxJavaPlugins.setInitIoSchedulerHandler(defaultScheduler -> sameThread);
  RxJavaPlugins.setInitComputationSchedulerHandler(defaultScheduler -> sameThread);
  RxJavaPlugins.setInitNewThreadSchedulerHandler(defaultScheduler -> sameThread);
  RxJavaPlugins.setInitSingleSchedulerHandler(defaultScheduler -> sameThread);
  RxAndroidPlugins.setInitMainThreadSchedulerHandler(defaultScheduler -> sameThread);
}
/**
 * Schedules a task with a 40 ms delay, then checks which thread ran it and that at least
 * 40 ms elapsed between the start of the test and the execution.
 *
 * @param scheduler    supplies the {@code ContextScheduler} under test
 * @param threadAssert assertion applied to the thread that executed the task
 * @throws Exception propagated from {@code awaitLatch}
 */
private void testScheduleDelayed(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) throws Exception {
  final Scheduler.Worker worker = scheduler.get().createWorker();
  final long startedAt = System.currentTimeMillis();
  final AtomicReference<Thread> runner = new AtomicReference<>();
  final AtomicLong elapsedMillis = new AtomicLong();
  final CountDownLatch done = new CountDownLatch(1);
  final Runnable task = () -> {
    runner.set(Thread.currentThread());
    elapsedMillis.set(System.currentTimeMillis() - startedAt);
    done.countDown();
  };
  worker.schedule(task, 40, MILLISECONDS);
  awaitLatch(done);
  threadAssert.accept(runner.get());
  final long elapsed = elapsedMillis.get();
  assertTrue(elapsed >= 40);
}
/**
 * Runs a periodic task every 40 ms. The first two runs assert that at least 40 ms passed
 * since the previous run; the third run disposes the task and completes the test.
 *
 * @param scheduler    supplies the {@code ContextScheduler} under test
 * @param threadAssert assertion applied to the executing thread on every run
 */
private void testSchedulePeriodic(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) {
  disableThreadChecks();
  final Scheduler.Worker worker = scheduler.get().createWorker();
  // Back-date the "previous run" by one period so the zero-delay first run passes the gap check.
  final AtomicLong previousRun = new AtomicLong(System.currentTimeMillis() - 40);
  final AtomicInteger runs = new AtomicInteger();
  final AtomicReference<Disposable> handle = new AtomicReference<>();
  final Runnable tick = () -> {
    threadAssert.accept(Thread.currentThread());
    if (runs.incrementAndGet() <= 2) {
      final long now = System.currentTimeMillis();
      final long delta = now - previousRun.get();
      assertTrue("" + delta, delta >= 40);
      previousRun.set(now);
      return;
    }
    // Third run: stop the periodic task and finish.
    handle.get().dispose();
    testComplete();
  };
  handle.set(worker.schedulePeriodically(tick, 0, 40, MILLISECONDS));
  await();
}
/**
 * Schedules a periodic task (20 ms initial delay, 20 ms period) whose first run sleeps for
 * 10 ms, and asserts on the second run that more than 20 + 10 + 20 ms have elapsed since the
 * test started.
 */
@Test
public void testPeriodicRescheduleAfterActionBlocking() {
  final Scheduler.Worker worker = new ContextScheduler(vertx, true).createWorker();
  final AtomicBoolean firstRunSeen = new AtomicBoolean();
  final long startedAt = System.nanoTime();
  // initial delay + sleep inside the first run + one period
  final long minElapsedNanos = NANOSECONDS.convert(20 + 10 + 20, MILLISECONDS);
  worker.schedulePeriodically(() -> {
    if (!firstRunSeen.compareAndSet(false, true)) {
      // Second run: verify the elapsed time, then stop.
      assertTrue(System.nanoTime() - startedAt > minElapsedNanos);
      worker.dispose();
      testComplete();
      return;
    }
    // First run: hold the action for a while.
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      fail();
    }
  }, 20, 20, MILLISECONDS);
  await();
}
/**
 * Schedules two delayed tasks, disposes the worker right away, and checks that both tasks
 * report disposed and that neither has counted the latch down after 40 ms.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @throws Exception propagated from {@code CountDownLatch.await}
 */
private void testWorkerUnsubscribe(Supplier<ContextScheduler> scheduler) throws Exception {
  final Scheduler.Worker worker = scheduler.get().createWorker();
  final CountDownLatch pending = new CountDownLatch(2);
  final Disposable first = worker.schedule(pending::countDown, 40, MILLISECONDS);
  final Disposable second = worker.schedule(pending::countDown, 40, MILLISECONDS);

  worker.dispose();

  assertTrue(first.isDisposed());
  assertTrue(second.isDisposed());
  // Wait out the original delay; the latch must not reach zero.
  final boolean reachedZero = pending.await(40, MILLISECONDS);
  assertFalse(reachedZero);
  assertEquals(2, pending.getCount());
}
/** Schedules an action that calls {@code assertBusy()} from inside its own run, then triggers the delegate. */
@Test public void runningWorkReportsBusy() {
  Runnable checkBusyWhileRunning = new Runnable() {
    @Override public void run() {
      assertBusy();
    }
  };
  scheduler.createWorker().schedule(checkBusyWhileRunning);
  // The assertion only executes if triggering the delegate runs the scheduled action.
  delegate.triggerActions();
}
/**
 * Schedules an action that disposes its own worker and then calls {@code assertBusy()},
 * and triggers the delegate afterwards.
 */
@Test public void unsubscribingScheduledWorkWhileRunningWorkReportsBusy() {
  final Scheduler.Worker worker = scheduler.createWorker();
  Runnable disposeThenCheckBusy = new Runnable() {
    @Override public void run() {
      // Dispose from inside the running action, then check the busy state.
      worker.dispose();
      assertBusy();
    }
  };
  worker.schedule(disposeThenCheckBusy);
  delegate.triggerActions();
}
/**
 * Schedules a task with zero delay, waits for it to run, and applies the given assertion to
 * the thread that executed it.
 *
 * @param scheduler    supplies the {@code ContextScheduler} under test
 * @param threadAssert assertion applied to the thread that executed the task
 * @throws Exception propagated from {@code awaitLatch}
 */
private void testScheduleImmediatly(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) throws Exception {
  final CountDownLatch done = new CountDownLatch(1);
  final Scheduler.Worker worker = scheduler.get().createWorker();
  final AtomicReference<Thread> runner = new AtomicReference<>();
  final Runnable task = () -> {
    runner.set(Thread.currentThread());
    done.countDown();
  };
  worker.schedule(task, 0, MILLISECONDS);
  awaitLatch(done);
  threadAssert.accept(runner.get());
}
/**
 * Starts a periodic task (no initial delay, 5 ms period) that disposes itself during its
 * first run, then checks after 60 ms that it ran exactly once.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @throws Exception propagated from {@code Thread.sleep}
 */
private void testUnsubscribeDuringExecute(Supplier<ContextScheduler> scheduler) throws Exception {
  final Scheduler.Worker worker = scheduler.get().createWorker();
  final AtomicInteger runs = new AtomicInteger();
  final AtomicReference<Disposable> handle = new AtomicReference<>();
  final Runnable tick = () -> {
    final boolean firstRun = runs.getAndIncrement() == 0;
    if (firstRun) {
      // NOTE(review): assumes handle was already set before the first run executes; confirm.
      handle.get().dispose();
    }
  };
  handle.set(worker.schedulePeriodically(tick, 0, 5, MILLISECONDS));
  Thread.sleep(60);
  assertEquals(1, runs.get());
}
/** Schedules work, disposes the returned handle immediately, and then calls {@code assertIdle(1)}. */
@Test public void scheduledWorkUnsubscribedReportsIdle() {
  CountingRunnable work = new CountingRunnable();
  scheduler.createWorker().schedule(work).dispose();
  assertIdle(1);
}
/** Schedules work with an explicit delay of zero seconds and then calls {@code assertBusy()}. */
@Test public void scheduleWithZeroDelayReportsBusy() {
  CountingRunnable work = new CountingRunnable();
  scheduler.createWorker().schedule(work, 0, SECONDS);
  assertBusy();
}
/** Schedules work with a one-second delay and then calls {@code assertIdle(0)}. */
@Test public void scheduleWithNonZeroDelayReportsIdle() {
  CountingRunnable work = new CountingRunnable();
  scheduler.createWorker().schedule(work, 1, SECONDS);
  assertIdle(0);
}
/** Schedules periodic work (zero initial delay, one-second period) and then calls {@code assertBusy()}. */
@Test public void schedulePeriodicallyWithZeroDelayReportsBusy() {
  CountingRunnable work = new CountingRunnable();
  scheduler.createWorker().schedulePeriodically(work, 0, 1, SECONDS);
  assertBusy();
}
/** Schedules periodic work (one-second initial delay and period) and then calls {@code assertIdle(0)}. */
@Test public void schedulePeriodicallyWithNonZeroDelayReportsIdle() {
  CountingRunnable work = new CountingRunnable();
  scheduler.createWorker().schedulePeriodically(work, 1, 1, SECONDS);
  assertIdle(0);
}
/** Schedules work, disposes the whole worker, and then calls {@code assertIdle(1)}. */
@Test public void unsubscribingScheduledWorksReportsIdle() {
  Scheduler.Worker disposedAfterScheduling = scheduler.createWorker();
  CountingRunnable work = new CountingRunnable();
  disposedAfterScheduling.schedule(work);
  disposedAfterScheduling.dispose();
  assertIdle(1);
}
/** Disposes the worker first, schedules work on it afterwards, and then calls {@code assertIdle(0)}. */
@Test public void scheduleWorkAfterUnsubscribedReportsIdle() {
  Scheduler.Worker alreadyDisposed = scheduler.createWorker();
  alreadyDisposed.dispose();
  CountingRunnable work = new CountingRunnable();
  alreadyDisposed.schedule(work);
  assertIdle(0);
}
/**
 * Wraps the delegate in a fresh idling scheduler, schedules work on it and triggers the
 * delegate. The test body makes no assertions and registers nothing on the wrapper, so it
 * passes as long as no exception is thrown.
 */
@Test public void finishingWorkWithoutRegisteredCallbackDoesNotCrash() {
  IdlingResourceScheduler wrapped = Rx2Idler.wrap(delegate, "Bob");
  CountingRunnable work = new CountingRunnable();
  wrapped.createWorker().schedule(work);
  delegate.triggerActions();
}