类io.reactivex.rxjava3.core.Scheduler源码实例Demo

下面列出了 io.reactivex.rxjava3.core.Scheduler 类的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看完整源代码。

源代码1 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that runs the given {@link Action} once for every effect
 * received from the upstream effects observable. Each run is wrapped in a {@link Completable} that
 * is flattened into the stream; when a {@link Scheduler} is supplied, that Completable is
 * subscribed on it. The effect value itself is not passed to the Action.
 *
 * @param doEffect the {@link Action} to run each time an effect is requested
 * @param scheduler the {@link Scheduler} the Completable is subscribed on, or {@code null} to
 *     skip the {@code subscribeOn} step
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the resulting stream; the stream is built from Completables, so
 *     the type only has to match the surrounding event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              ignoredEffect -> {
                // A fresh Completable is created for every effect, as before.
                final Completable runEffect = Completable.fromAction(doEffect);
                if (scheduler == null) {
                  return runEffect;
                }
                return runEffect.subscribeOn(scheduler);
              })
          .toObservable();
}
 
源代码2 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that hands every effect received from the upstream
 * effects observable to the given {@link Consumer}. Each call is wrapped in a {@link Completable}
 * that is flattened into the stream; when a {@link Scheduler} is supplied, that Completable is
 * subscribed on it.
 *
 * @param doEffect the {@link Consumer} that receives each requested effect
 * @param scheduler the {@link Scheduler} the Completable is subscribed on, or {@code null} to
 *     skip the {@code subscribeOn} step
 * @param <F> the type of Effect this transformer handles
 * @param <E> the Event type of the resulting stream; the stream is built from Completables, so
 *     the type only has to match the surrounding event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream
          .flatMapCompletable(
              effect -> {
                // Capture the current effect so the consumer sees it when the action runs.
                final Completable consumeEffect =
                    Completable.fromAction(() -> doEffect.accept(effect));
                if (scheduler == null) {
                  return consumeEffect;
                }
                return consumeEffect.subscribeOn(scheduler);
              })
          .toObservable();
}
 
源代码3 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that applies the given {@link Function} to every effect
 * received from the upstream effects observable and emits the value it returns. Each call is
 * wrapped in an {@link Observable} created from a supplier and flattened into the stream; when a
 * {@link Scheduler} is supplied, that Observable is subscribed on it.
 *
 * @param function the {@link Function} applied to each requested effect
 * @param scheduler the {@link Scheduler} the per-effect Observable is subscribed on, or {@code
 *     null} to skip the {@code subscribeOn} step
 * @param <F> the type of Effect this transformer handles
 * @param <E> the type of Event this transformer emits
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromFunction(
    final Function<F, E> function, @Nullable final Scheduler scheduler) {
  return effectStream ->
      effectStream.flatMap(
          effect -> {
            // The function is only invoked when the per-effect Observable is subscribed to.
            final Observable<E> eventObservable =
                Observable.fromSupplier(() -> function.apply(effect));
            if (scheduler == null) {
              return eventObservable;
            }
            return eventObservable.subscribeOn(scheduler);
          });
}
 
源代码4 项目: RxRelay   文件: ReplayRelay.java
/**
 * Validates and stores the buffer limits, then points both {@code head} and {@code tail} at a
 * single initial node created with a {@code null} value and {@code 0L}.
 *
 * @param maxSize the size limit; must be greater than zero
 * @param maxAge the age limit, expressed in {@code unit}; must be greater than zero
 * @param unit the time unit of {@code maxAge}; must not be null
 * @param scheduler the scheduler stored for later use by the buffer; must not be null
 * @throws IllegalArgumentException if {@code maxSize} or {@code maxAge} is not positive
 * @throws NullPointerException if {@code unit} or {@code scheduler} is null
 */
SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
    // Arguments are checked in declaration order, so the first bad one is the one reported.
    if (maxSize <= 0) {
        throw new IllegalArgumentException("maxSize > 0 required but it was " + maxSize);
    }
    if (maxAge <= 0) {
        throw new IllegalArgumentException("maxAge > 0 required but it was " + maxAge);
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (scheduler == null) {
        throw new NullPointerException("scheduler == null");
    }

    this.scheduler = scheduler;
    this.unit = unit;
    this.maxAge = maxAge;
    this.maxSize = maxSize;

    // head and tail share the same initial node.
    final TimedNode<T> initialNode = new TimedNode<>(null, 0L);
    this.head = initialNode;
    this.tail = initialNode;
}
 
源代码5 项目: RxRelay   文件: ReplayRelay.java
/**
 * Validates and stores the buffer limits, then points both {@code head} and {@code tail} at a
 * single initial node created with a {@code null} value and {@code 0L}.
 *
 * @param maxSize the size limit; must be greater than zero
 * @param maxAge the age limit, expressed in {@code unit}; must be greater than zero
 * @param unit the time unit of {@code maxAge}; must not be null
 * @param scheduler the scheduler stored for later use by the buffer; must not be null
 * @throws IllegalArgumentException if {@code maxSize} or {@code maxAge} is not positive
 * @throws NullPointerException if {@code unit} or {@code scheduler} is null
 */
SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) {
    // Arguments are checked in declaration order, so the first bad one is the one reported.
    if (maxSize <= 0) {
        throw new IllegalArgumentException("maxSize > 0 required but it was " + maxSize);
    }
    if (maxAge <= 0) {
        throw new IllegalArgumentException("maxAge > 0 required but it was " + maxAge);
    }
    if (unit == null) {
        throw new NullPointerException("unit == null");
    }
    if (scheduler == null) {
        throw new NullPointerException("scheduler == null");
    }

    this.scheduler = scheduler;
    this.unit = unit;
    this.maxAge = maxAge;
    this.maxSize = maxSize;

    // head and tail share the same initial node.
    final TimedNode<T> initialNode = new TimedNode<>(null, 0L);
    this.head = initialNode;
    this.tail = initialNode;
}
 
源代码6 项目: RxIdler   文件: Rx3Idler.java
/**
 * Returns a function that takes a {@link Scheduler} supplier, wraps the supplied scheduler in an
 * {@link IdlingResourceScheduler} carrying {@code name}, registers that wrapper with the
 * {@link IdlingRegistry} and returns it.
 * <p>
 * Note: Work scheduled in the future does not mark the idling resource as busy.
 *
 * @param name the name given to the wrapping scheduler; must not be null
 * @return a function mapping a scheduler supplier to the registered wrapping scheduler
 * @throws NullPointerException if {@code name} is null
 */
@SuppressWarnings("ConstantConditions") // Public API guarding.
@CheckResult @NonNull
public static Function<Supplier<Scheduler>, Scheduler> create(@NonNull final String name) {
  if (name == null) {
    throw new NullPointerException("name == null");
  }
  return schedulerSupplier -> {
    final Scheduler wrapped = schedulerSupplier.get();
    final IdlingResourceScheduler idlingScheduler =
        new DelegatingIdlingResourceScheduler(wrapped, name);
    // Registration happens each time the returned function is applied.
    IdlingRegistry.getInstance().register(idlingScheduler);
    return idlingScheduler;
  };
}
 
源代码7 项目: RxIdler   文件: Rx3Idler.java
/**
 * Wraps the supplied {@link Scheduler} into one which also implements {@link IdlingResource}.
 * This method does not register the result: you must
 * {@linkplain IdlingRegistry#register(IdlingResource...) register} the returned instance with
 * Espresso before it will be used. Only work scheduled on the returned instance directly will be
 * registered.
 *
 * @param scheduler the scheduler to wrap; must not be null
 * @param name the name given to the wrapping scheduler; must not be null
 * @return the new, not yet registered, wrapping scheduler
 * @throws NullPointerException if {@code scheduler} or {@code name} is null
 */
@SuppressWarnings("ConstantConditions") // Public API guarding.
@CheckResult @NonNull
public static IdlingResourceScheduler wrap(@NonNull Scheduler scheduler, @NonNull String name) {
  if (scheduler == null) {
    throw new NullPointerException("scheduler == null");
  }
  if (name == null) {
    throw new NullPointerException("name == null");
  }
  final IdlingResourceScheduler wrapped = new DelegatingIdlingResourceScheduler(scheduler, name);
  return wrapped;
}
 
// Schedules a periodic action (no initial delay, 1 second period) and checks for idle between
// runs: once 500 ms after the first run, and again after a further 1000 ms.
@Test public void betweenPeriodicSchedulesReportsIdle() {
  final Scheduler.Worker periodicWorker = scheduler.createWorker();
  final CountingRunnable counter = new CountingRunnable();
  periodicWorker.schedulePeriodically(counter, 0, 1, SECONDS);

  // Run the initial execution.
  delegate.triggerActions();
  assertEquals(1, counter.count());

  delegate.advanceTimeBy(500, MILLISECONDS);
  assertIdle(1);

  delegate.advanceTimeBy(1000, MILLISECONDS);
  assertIdle(2);
}
 
// Disposes the worker from inside its own running task and asserts busy right after the dispose.
@Test public void unsubscribingScheduledWorkWhileRunningWorkReportsBusy() {
  final Scheduler.Worker selfDisposingWorker = scheduler.createWorker();
  selfDisposingWorker.schedule(
      () -> {
        selfDisposingWorker.dispose();
        assertBusy();
      });
  // The scheduled task, and the assertion inside it, runs here.
  delegate.triggerActions();
}
 
源代码10 项目: code-examples   文件: ReactiveBatchProcessorV2.java
/**
 * Creates a {@link Scheduler} backed by a {@link ThreadPoolExecutor} whose core and maximum pool
 * size are both {@code poolSize} and whose work queue is bounded to {@code queueSize} entries.
 * No rejection handler is passed, so the executor's default one applies.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler that runs work on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(queueSize);
  final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, workQueue);
  return Schedulers.from(executor);
}
 
源代码11 项目: code-examples   文件: ReactiveBatchProcessorV3.java
/**
 * Builds and subscribes the batch pipeline. The source is subscribed on a new single-thread
 * executor; each batch is logged and flattened into its messages; each message is passed to
 * {@code messageHandler} inside a deferred {@link Single} that is subscribed on the thread-pool
 * scheduler.
 */
public void start() {
  // WARNING: this code doesn't work as expected
  final Scheduler handlerScheduler = threadPoolScheduler(threads, threadPoolQueueSize);

  messageSource.getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(messageBatch -> logger.log(messageBatch.toString()))
      .flatMap(messageBatch -> Flowable.fromIterable(messageBatch.getMessages()))
      .flatMapSingle(message ->
          // defer() postpones handleMessage until the Single is subscribed on handlerScheduler.
          Single.defer(() -> Single.just(messageHandler.handleMessage(message)))
              .subscribeOn(handlerScheduler))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码12 项目: code-examples   文件: ReactiveBatchProcessorV3.java
/**
 * Creates a {@link Scheduler} backed by a {@link ThreadPoolExecutor} whose core and maximum pool
 * size are both {@code poolSize} and whose work queue is bounded to {@code queueSize} entries.
 * No rejection handler is passed, so the executor's default one applies.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler that runs work on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final LinkedBlockingDeque<Runnable> boundedQueue = new LinkedBlockingDeque<>(queueSize);
  final ThreadPoolExecutor pool =
      new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, boundedQueue);
  return Schedulers.from(pool);
}
 
源代码13 项目: code-examples   文件: ReactiveBatchProcessor.java
/**
 * Builds and subscribes the batch pipeline. The source is subscribed on a new single-thread
 * executor; each batch is logged and flattened into its messages; each message is mapped through
 * {@code messageHandler::handleMessage} inside a deferred {@link Single} that is subscribed on
 * the thread-pool scheduler.
 */
void start() {
    final Scheduler handlerScheduler = threadPoolScheduler(threads, threadPoolQueueSize);

    messageSource.getMessageBatches()
        .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
        .doOnNext(messageBatch -> logger.log(messageBatch.toString()))
        .flatMap(messageBatch -> Flowable.fromIterable(messageBatch.getMessages()))
        .flatMapSingle(message ->
            // The mapping step runs only once the deferred Single is subscribed.
            Single.defer(() -> Single.just(message).map(messageHandler::handleMessage))
                .subscribeOn(handlerScheduler))
        .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码14 项目: code-examples   文件: ReactiveBatchProcessor.java
/**
 * Creates a {@link Scheduler} backed by a {@link ThreadPoolExecutor} whose core and maximum pool
 * size are both {@code poolSize}, whose work queue is bounded to {@code queueSize} entries, and
 * whose rejected tasks are passed to a {@code WaitForCapacityPolicy}.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler that runs work on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(queueSize);
  final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(
          poolSize, poolSize, 0L, TimeUnit.SECONDS, workQueue, new WaitForCapacityPolicy());
  return Schedulers.from(executor);
}
 
源代码15 项目: code-examples   文件: ReactiveBatchProcessorV1.java
/**
 * Creates a {@link Scheduler} backed by a {@link ThreadPoolExecutor} whose core and maximum pool
 * size are both {@code poolSize} and whose work queue is bounded to {@code queueSize} entries.
 * No rejection handler is passed, so the executor's default one applies.
 *
 * @param poolSize the core and maximum number of threads
 * @param queueSize the capacity of the work queue
 * @return a scheduler that runs work on the new executor
 */
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  final LinkedBlockingDeque<Runnable> pendingTasks = new LinkedBlockingDeque<>(queueSize);
  final ThreadPoolExecutor fixedPool =
      new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.SECONDS, pendingTasks);
  return Schedulers.from(fixedPool);
}
 
源代码16 项目: mobius   文件: SchedulerWorkRunner.java
public SchedulerWorkRunner(@NonNull Scheduler scheduler) {
  this.worker = checkNotNull(scheduler).createWorker();
}
 
源代码17 项目: catnip   文件: Catnip.java
/**
 * Returns the {@link Scheduler} reported by {@code options().rxScheduler()}.
 *
 * @return the scheduler obtained from this instance's options
 */
@Nonnull
@CheckReturnValue
default Scheduler rxScheduler() {
    return options().rxScheduler();
}
 
/**
 * Stores the scheduler to delegate to and the name of this wrapper. No validation is done here.
 *
 * @param delegate the scheduler stored as the delegate
 * @param name the name stored for this instance
 */
DelegatingIdlingResourceScheduler(Scheduler delegate, String name) {
  this.name = name;
  this.delegate = delegate;
}
 
// Schedules immediate work on a fresh worker and asserts busy before the delegate runs it.
@Test public void scheduledWorkReportsBusy() {
  scheduler.createWorker().schedule(new CountingRunnable());
  assertBusy();
}
 
// Schedules immediate work, disposes the returned handle straight away, then asserts idle.
@Test public void scheduledWorkUnsubscribedReportsIdle() {
  scheduler.createWorker()
      .schedule(new CountingRunnable())
      .dispose();
  assertIdle(1);
}
 
// Schedules work through the delayed overload with a delay of 0 seconds and asserts busy.
@Test public void scheduleWithZeroDelayReportsBusy() {
  scheduler.createWorker().schedule(new CountingRunnable(), 0, SECONDS);
  assertBusy();
}
 
// Schedules work with a 1 second delay and asserts idle without advancing time.
@Test public void scheduleWithNonZeroDelayReportsIdle() {
  scheduler.createWorker().schedule(new CountingRunnable(), 1, SECONDS);
  assertIdle(0);
}
 
// Schedules periodic work with no initial delay (1 second period) and asserts busy.
@Test public void schedulePeriodicallyWithZeroDelayReportsBusy() {
  scheduler.createWorker().schedulePeriodically(new CountingRunnable(), 0, 1, SECONDS);
  assertBusy();
}
 
// Schedules periodic work with a 1 second initial delay and asserts idle without advancing time.
@Test public void schedulePeriodicallyWithNonZeroDelayReportsIdle() {
  scheduler.createWorker().schedulePeriodically(new CountingRunnable(), 1, 1, SECONDS);
  assertIdle(0);
}
 
// Schedules assertBusy itself as the work, so the assertion runs while the task is executing.
@Test public void runningWorkReportsBusy() {
  scheduler.createWorker().schedule(this::assertBusy);
  // The scheduled assertion runs here.
  delegate.triggerActions();
}
 
// Schedules immediate work, disposes the whole worker, then asserts idle.
@Test public void unsubscribingScheduledWorksReportsIdle() {
  final Scheduler.Worker disposedAfterScheduling = scheduler.createWorker();
  disposedAfterScheduling.schedule(new CountingRunnable());

  disposedAfterScheduling.dispose();

  assertIdle(1);
}
 
// Disposes the worker first, then schedules work on it, and asserts idle.
@Test public void scheduleWorkAfterUnsubscribedReportsIdle() {
  final Scheduler.Worker alreadyDisposed = scheduler.createWorker();
  alreadyDisposed.dispose();

  alreadyDisposed.schedule(new CountingRunnable());

  assertIdle(0);
}
 
// Uses a freshly wrapped scheduler that this test sets up no further, schedules work on it and
// lets the delegate run that work; the test passes as long as nothing throws.
@Test public void finishingWorkWithoutRegisteredCallbackDoesNotCrash() {
  final IdlingResourceScheduler freshlyWrapped = Rx3Idler.wrap(delegate, "Bob");
  freshlyWrapped.createWorker().schedule(new CountingRunnable());
  delegate.triggerActions();
}
 
源代码29 项目: RxRelay   文件: BehaviorRelayTest.java
/**
 * Races an emission against a subscription 50000 times: a worker task waits on {@code start} and
 * then pushes {@code 1} into a fresh {@link BehaviorRelay}, while the test thread subscribes to
 * the relay through {@code subscribeOn}/{@code observeOn} and then releases {@code start}. Each
 * round must deliver {@code 1} to the observer within five seconds.
 *
 * <p>The relay never terminates, so the observer cancels itself after the first value. Without
 * that, every round would leave its subscription (and whatever scheduler resources it holds)
 * alive until the end of the test.
 *
 * <p>NOTE(review): the test is still ignored with reason "OOMs". The per-round cancellation is
 * presumably the remedy, but confirm with a real run before removing {@code @Ignore}.
 *
 * @throws Exception if waiting for a round to finish is interrupted
 */
@Test
@Ignore("OOMs")
public void testEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress output every 1000 rounds.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final BehaviorRelay<Object> rs = BehaviorRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            // Emitter side: blocks until the subscriber side has been set up.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {

                @Override
                public void onComplete() {
                    o.set(-1);
                    finish.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    o.set(e);
                    finish.countDown();
                }

                @Override
                public void onNext(Object t) {
                    o.set(t);
                    // Only the first value matters; cancel so this round's subscription is
                    // released instead of staying attached to a relay that never terminates.
                    cancel();
                    finish.countDown();
                }

            });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                // fail(...) aborts the test by throwing, so no break is needed after it.
                fail("Timeout @ " + i);
            } else {
                Assert.assertEquals(1, o.get());
            }
        }
    } finally {
        worker.dispose();
    }
}
 
源代码30 项目: RxRelay   文件: ReplayRelayConcurrencyTest.java
/**
 * Races an emission against a subscription 50000 times: a worker task waits on {@code start} and
 * then pushes {@code 1} into a fresh {@link ReplayRelay}, while the test thread subscribes to the
 * relay through {@code subscribeOn}/{@code observeOn} and then releases {@code start}. Each round
 * must deliver {@code 1} to the observer within five seconds.
 *
 * <p>The relay never terminates, so the observer cancels itself after the first value. Without
 * that, every round would leave its subscription (and whatever scheduler resources it holds)
 * alive until the end of the test.
 *
 * @throws Exception if waiting for a round to finish is interrupted
 */
@Test
public void testReplayRelayEmissionSubscriptionRace() throws Exception {
    Scheduler s = Schedulers.io();
    Scheduler.Worker worker = Schedulers.io().createWorker();
    try {
        for (int i = 0; i < 50000; i++) {
            // Progress output every 1000 rounds.
            if (i % 1000 == 0) {
                System.out.println(i);
            }
            final ReplayRelay<Object> rs = ReplayRelay.create();

            final CountDownLatch finish = new CountDownLatch(1);
            final CountDownLatch start = new CountDownLatch(1);

            // Emitter side: blocks until the subscriber side has been set up.
            worker.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        start.await();
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                    rs.accept(1);
                }
            });

            final AtomicReference<Object> o = new AtomicReference<Object>();

            rs.subscribeOn(s).observeOn(Schedulers.io())
            .subscribe(new DefaultObserver<Object>() {

                @Override
                public void onComplete() {
                    o.set(-1);
                    finish.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    o.set(e);
                    finish.countDown();
                }

                @Override
                public void onNext(Object t) {
                    o.set(t);
                    // Only the first value matters; cancel so this round's subscription is
                    // released instead of staying attached to a relay that never terminates.
                    cancel();
                    finish.countDown();
                }

            });
            start.countDown();

            if (!finish.await(5, TimeUnit.SECONDS)) {
                System.out.println(o.get());
                System.out.println(rs.hasObservers());
                // Assert.fail(...) always throws, so no break is needed after it.
                Assert.fail("Timeout @ " + i);
            } else {
                Assert.assertEquals(1, o.get());
            }
        }
    } finally {
        worker.dispose();
    }
}
 
 类所在包
 类方法
 同包方法