io.reactivex.Scheduler.Worker 源码实例 Demo

下面列出了 io.reactivex.Scheduler.Worker 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Checks that disposing a periodic task between two of its runs stops further executions.
 *
 * <p>The task is scheduled with no initial delay and a 20 ms period. Once it has run four
 * times the returned {@code Disposable} is disposed; after a 60 ms pause the run count must
 * still be exactly 4.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @throws Exception propagated from {@code awaitLatch} or {@code Thread.sleep}
 */
private void testUnsubscribeBetweenActions(Supplier<ContextScheduler> scheduler) throws Exception {
  Scheduler.Worker worker = scheduler.get().createWorker();
  AtomicInteger runs = new AtomicInteger();
  CountDownLatch fourthRun = new CountDownLatch(1);
  Runnable tick = () -> {
    int soFar = runs.incrementAndGet();
    if (soFar == 4) {
      fourthRun.countDown();
    }
  };
  Disposable periodic = worker.schedulePeriodically(tick, 0, 20, MILLISECONDS);
  awaitLatch(fourthRun);
  periodic.dispose();
  // Three periods' worth of time: any run after dispose() would bump the counter past 4.
  Thread.sleep(60);
  assertEquals(4, runs.get());
}
 
源代码2 项目: rxjava2-extras   文件: Observables.java
/**
 * Swaps the worker held in {@code workerRef} for a freshly created one and schedules a
 * reset of the current cache on the new worker after the given delay.
 *
 * <p>A {@code null} value in {@code workerRef} means "finished": the method returns without
 * creating or scheduling anything. A worker that was created for a compare-and-set attempt
 * that lost the race is disposed before retrying, so no worker is leaked.
 *
 * @param duration delay before the cache is reset
 * @param unit time unit of {@code duration}
 * @param scheduler scheduler used to create the replacement worker
 * @param cacheRef holds the cache whose {@code reset()} is called when the delay elapses
 * @param workerRef holds the currently installed worker, if any
 */
private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
                                                 final Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef,
                                                 final AtomicReference<Optional<Scheduler.Worker>> workerRef) {

    Runnable action = new Runnable() {
        @Override
        public void run() {
            cacheRef.get().reset();
        }
    };
    // CAS loop to cancel the current worker and install a new one
    while (true) {
        Optional<Scheduler.Worker> wOld = workerRef.get();
        if (wOld == null) {
            // we are finished
            return;
        }
        Scheduler.Worker created = scheduler.createWorker();
        if (workerRef.compareAndSet(wOld, Optional.of(created))) {
            if (wOld.isPresent()) {
                wOld.get().dispose();
            }
            created.schedule(action, duration, unit);
            return;
        }
        // Another thread changed workerRef first: release the worker we just created,
        // otherwise it would never be disposed.
        created.dispose();
    }
}
 
源代码3 项目: rxjava2-extras   文件: Flowables.java
/**
 * Swaps the worker held in {@code workerRef} for a freshly created one and schedules a
 * reset of the current cache on the new worker after the given delay.
 *
 * <p>A {@code null} value in {@code workerRef} means "finished": the method returns without
 * creating or scheduling anything. A worker that was created for a compare-and-set attempt
 * that lost the race is disposed before retrying, so no worker is leaked.
 *
 * @param duration delay before the cache is reset
 * @param unit time unit of {@code duration}
 * @param scheduler scheduler used to create the replacement worker
 * @param cacheRef holds the cache whose {@code reset()} is called when the delay elapses
 * @param workerRef holds the currently installed worker, if any
 */
private static <T> void startScheduledResetAgain(final long duration, final TimeUnit unit,
        final Scheduler scheduler, final AtomicReference<CachedFlowable<T>> cacheRef,
        final AtomicReference<Optional<Scheduler.Worker>> workerRef) {

    Runnable action = new Runnable() {
        @Override
        public void run() {
            cacheRef.get().reset();
        }
    };
    // CAS loop to cancel the current worker and install a new one
    while (true) {
        Optional<Scheduler.Worker> wOld = workerRef.get();
        if (wOld == null) {
            // we are finished
            return;
        }
        Scheduler.Worker created = scheduler.createWorker();
        if (workerRef.compareAndSet(wOld, Optional.of(created))) {
            if (wOld.isPresent()) {
                wOld.get().dispose();
            }
            created.schedule(action, duration, unit);
            return;
        }
        // Another thread changed workerRef first: release the worker we just created,
        // otherwise it would never be disposed.
        created.dispose();
    }
}
 
源代码4 项目: Hentoid   文件: AbstractObjectBoxTest.java
/**
 * Points every RxJava / RxAndroid "init scheduler" hook at one shared scheduler whose workers
 * use {@code Runnable::run} as their executor, so each task is run directly by the thread
 * that submits it.
 */
@BeforeClass
public static void setUpRxSchedulers() {
    final Scheduler inline = new Scheduler() {
        @Override
        public Scheduler.Worker createWorker() {
            return new ExecutorScheduler.ExecutorWorker(Runnable::run, false);
        }

        @Override
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            // The requested delay is replaced by 0: this prevents StackOverflowErrors
            // when scheduling with a delay.
            return super.scheduleDirect(run, 0, unit);
        }
    };

    // The handler argument (the default scheduler supplier) is deliberately ignored.
    RxJavaPlugins.setInitIoSchedulerHandler(ignored -> inline);
    RxJavaPlugins.setInitComputationSchedulerHandler(ignored -> inline);
    RxJavaPlugins.setInitNewThreadSchedulerHandler(ignored -> inline);
    RxJavaPlugins.setInitSingleSchedulerHandler(ignored -> inline);
    RxAndroidPlugins.setInitMainThreadSchedulerHandler(ignored -> inline);
}
 
源代码5 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Schedules a single task with a 40 ms delay and checks both the thread it ran on and that at
 * least 40 ms (wall-clock, via {@code System.currentTimeMillis()}) elapsed before it ran.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @param threadAssert receives the thread that executed the task
 * @throws Exception propagated from {@code awaitLatch}
 */
private void testScheduleDelayed(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) throws Exception {
  Scheduler.Worker worker = scheduler.get().createWorker();
  final long startedAt = System.currentTimeMillis();
  CountDownLatch done = new CountDownLatch(1);
  AtomicReference<Thread> runner = new AtomicReference<>();
  AtomicLong elapsedMillis = new AtomicLong();
  Runnable task = () -> {
    runner.set(Thread.currentThread());
    elapsedMillis.set(System.currentTimeMillis() - startedAt);
    done.countDown();
  };
  worker.schedule(task, 40, MILLISECONDS);
  awaitLatch(done);
  threadAssert.accept(runner.get());
  assertTrue(elapsedMillis.get() >= 40);
}
 
源代码6 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Schedules a periodic task (no initial delay, 40 ms period) and checks, for the first two
 * runs, the executing thread and that at least 40 ms passed since the previous reference time.
 * On the third run the task disposes itself and completes the test.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @param threadAssert receives the thread of every run, including the final one
 */
private void testSchedulePeriodic(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) {
  disableThreadChecks();
  Scheduler.Worker worker = scheduler.get().createWorker();
  // Back-dated by one period so the very first run (zero initial delay) also passes the >= 40 check.
  AtomicLong lastRunAt = new AtomicLong(System.currentTimeMillis() - 40);
  AtomicInteger runs = new AtomicInteger();
  // Held in a reference because the task needs to dispose its own subscription.
  AtomicReference<Disposable> periodic = new AtomicReference<>();
  Runnable tick = () -> {
    threadAssert.accept(Thread.currentThread());
    if (runs.incrementAndGet() <= 2) {
      long now = System.currentTimeMillis();
      long elapsed = now - lastRunAt.get();
      assertTrue("" + elapsed, elapsed >= 40);
      lastRunAt.set(now);
      return;
    }
    periodic.get().dispose();
    testComplete();
  };
  periodic.set(worker.schedulePeriodically(tick, 0, 40, MILLISECONDS));
  await();
}
 
源代码7 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Schedules a periodic task (20 ms initial delay, 20 ms period) whose first run sleeps for
 * 10 ms. The second run asserts that more than 20 + 10 + 20 ms have elapsed since the test
 * started, then disposes the worker and completes the test.
 */
@Test
public void testPeriodicRescheduleAfterActionBlocking() {
  Scheduler.Worker worker = new ContextScheduler(vertx, true).createWorker();
  AtomicBoolean firstRunSeen = new AtomicBoolean();
  final long startNanos = System.nanoTime();
  Runnable tick = () -> {
    boolean isFirstRun = firstRunSeen.compareAndSet(false, true);
    if (!isFirstRun) {
      // initial delay + blocking time of the first run + one period
      long elapsedNanos = System.nanoTime() - startNanos;
      assertTrue(elapsedNanos > NANOSECONDS.convert(20 + 10 + 20, MILLISECONDS));
      worker.dispose();
      testComplete();
      return;
    }
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      fail();
    }
  };
  worker.schedulePeriodically(tick, 20, 20, MILLISECONDS);
  await();
}
 
源代码8 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Schedules two delayed tasks on one worker, disposes the worker immediately, and checks that
 * both task handles report disposed and that neither task counts the latch down within 40 ms.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @throws Exception if waiting on the latch is interrupted
 */
private void testWorkerUnsubscribe(Supplier<ContextScheduler> scheduler) throws Exception {
  Scheduler.Worker worker = scheduler.get().createWorker();
  CountDownLatch neverReached = new CountDownLatch(2);
  Disposable first = worker.schedule(() -> neverReached.countDown(), 40, MILLISECONDS);
  Disposable second = worker.schedule(() -> neverReached.countDown(), 40, MILLISECONDS);
  worker.dispose();
  assertTrue(first.isDisposed());
  assertTrue(second.isDisposed());
  boolean reachedZero = neverReached.await(40, MILLISECONDS);
  assertFalse(reachedZero);
  assertEquals(2, neverReached.getCount());
}
 
/**
 * Schedules a task that calls {@code assertBusy()} from inside its own {@code run()}, then
 * triggers the delegate's pending actions.
 */
@Test public void runningWorkReportsBusy() {
  Scheduler.Worker busyWorker = scheduler.createWorker();
  Runnable checkBusyWhileRunning = new Runnable() {
    @Override public void run() {
      assertBusy();
    }
  };
  busyWorker.schedule(checkBusyWhileRunning);
  delegate.triggerActions();
}
 
/**
 * Schedules a task that disposes its own worker and then calls {@code assertBusy()} while
 * still inside {@code run()}, then triggers the delegate's pending actions.
 */
@Test public void unsubscribingScheduledWorkWhileRunningWorkReportsBusy() {
  // final: captured by the anonymous Runnable below
  final Scheduler.Worker selfDisposing = scheduler.createWorker();
  Runnable disposeThenCheck = new Runnable() {
    @Override public void run() {
      selfDisposing.dispose();
      assertBusy();
    }
  };
  selfDisposing.schedule(disposeThenCheck);
  delegate.triggerActions();
}
 
源代码11 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Schedules a single task with a zero delay, waits for it to run, and hands the thread that
 * executed it to {@code threadAssert}.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @param threadAssert receives the thread that executed the task
 * @throws Exception propagated from {@code awaitLatch}
 */
private void testScheduleImmediatly(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) throws Exception {
  CountDownLatch done = new CountDownLatch(1);
  Scheduler.Worker worker = scheduler.get().createWorker();
  AtomicReference<Thread> runner = new AtomicReference<>();
  Runnable task = () -> {
    runner.set(Thread.currentThread());
    done.countDown();
  };
  worker.schedule(task, 0, MILLISECONDS);
  awaitLatch(done);
  threadAssert.accept(runner.get());
}
 
源代码12 项目: vertx-rx   文件: SchedulerTest.java
/**
 * Schedules a periodic task (no initial delay, 5 ms period) that disposes its own subscription
 * during its first run, then checks after 60 ms that the task ran exactly once.
 *
 * @param scheduler supplies the {@code ContextScheduler} under test
 * @throws Exception if the sleep is interrupted
 */
private void testUnsubscribeDuringExecute(Supplier<ContextScheduler> scheduler) throws Exception {
  Scheduler.Worker worker = scheduler.get().createWorker();
  AtomicInteger runs = new AtomicInteger();
  // Held in a reference because the task disposes its own subscription.
  AtomicReference<Disposable> periodic = new AtomicReference<>();
  Runnable tick = () -> {
    boolean firstRun = runs.getAndIncrement() == 0;
    if (firstRun) {
      periodic.get().dispose();
    }
  };
  periodic.set(worker.schedulePeriodically(tick, 0, 5, MILLISECONDS));
  Thread.sleep(60);
  assertEquals(1, runs.get());
}
 
/**
 * Schedules a task, disposes the returned handle right away, and expects {@code assertIdle(1)}
 * to pass.
 */
@Test public void scheduledWorkUnsubscribedReportsIdle() {
  Scheduler.Worker idleWorker = scheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  idleWorker.schedule(task).dispose();
  // NOTE(review): the argument is presumably the expected number of idle notifications — confirm.
  assertIdle(1);
}
 
/** Schedules a task with a delay of 0 seconds and expects {@code assertBusy()} to pass. */
@Test public void scheduleWithZeroDelayReportsBusy() {
  Scheduler.Worker busyWorker = scheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  busyWorker.schedule(task, 0, SECONDS);
  assertBusy();
}
 
/** Schedules a task with a delay of 1 second and expects {@code assertIdle(0)} to pass. */
@Test public void scheduleWithNonZeroDelayReportsIdle() {
  Scheduler.Worker idleWorker = scheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  idleWorker.schedule(task, 1, SECONDS);
  // NOTE(review): the argument is presumably the expected number of idle notifications — confirm.
  assertIdle(0);
}
 
/**
 * Schedules a periodic task with an initial delay of 0 seconds and a period of 1 second, and
 * expects {@code assertBusy()} to pass.
 */
@Test public void schedulePeriodicallyWithZeroDelayReportsBusy() {
  Scheduler.Worker busyWorker = scheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  busyWorker.schedulePeriodically(task, 0, 1, SECONDS);
  assertBusy();
}
 
/**
 * Schedules a periodic task with an initial delay of 1 second and a period of 1 second, and
 * expects {@code assertIdle(0)} to pass.
 */
@Test public void schedulePeriodicallyWithNonZeroDelayReportsIdle() {
  Scheduler.Worker idleWorker = scheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  idleWorker.schedulePeriodically(task, 1, 1, SECONDS);
  // NOTE(review): the argument is presumably the expected number of idle notifications — confirm.
  assertIdle(0);
}
 
/**
 * Schedules a task, disposes the whole worker afterwards, and expects {@code assertIdle(1)}
 * to pass.
 */
@Test public void unsubscribingScheduledWorksReportsIdle() {
  Scheduler.Worker disposedWorker = scheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  disposedWorker.schedule(task);
  disposedWorker.dispose();
  // NOTE(review): the argument is presumably the expected number of idle notifications — confirm.
  assertIdle(1);
}
 
/**
 * Disposes the worker first, then schedules a task on it, and expects {@code assertIdle(0)}
 * to pass.
 */
@Test public void scheduleWorkAfterUnsubscribedReportsIdle() {
  Scheduler.Worker disposedWorker = scheduler.createWorker();
  disposedWorker.dispose();
  CountingRunnable task = new CountingRunnable();
  disposedWorker.schedule(task);
  // NOTE(review): the argument is presumably the expected number of idle notifications — confirm.
  assertIdle(0);
}
 
/**
 * Wraps the delegate in a freshly created {@code IdlingResourceScheduler} (no callback is
 * registered on it in this test), schedules a task and triggers the delegate's actions.
 * The test passes if nothing throws.
 */
@Test public void finishingWorkWithoutRegisteredCallbackDoesNotCrash() {
  IdlingResourceScheduler freshScheduler = Rx2Idler.wrap(delegate, "Bob");
  Scheduler.Worker worker = freshScheduler.createWorker();
  CountingRunnable task = new CountingRunnable();
  worker.schedule(task);
  delegate.triggerActions();
}
 
 方法所在类