类io.reactivex.parallel.ParallelFlowable源码实例Demo

下面列出了 io.reactivex.parallel.ParallelFlowable 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: rxjava-RxLife   文件: ParallelFlowableLife.java
/**
 * Wraps each incoming subscriber in a scope-bound subscriber and subscribes the wrappers to
 * the upstream rails.
 *
 * <p>Does nothing when {@code validate(subscribers)} returns {@code false}. When {@code onMain}
 * is set, the upstream is first moved onto {@code AndroidSchedulers.mainThread()} via
 * {@code runOn}.
 *
 * @param subscribers the downstream subscribers, one per rail
 */
@SuppressWarnings("unchecked")
public void subscribe(@NonNull Subscriber<? super T>[] subscribers) {
    if (!validate(subscribers)) {
        return;
    }

    // Generic array creation is not allowed, hence the raw array and the unchecked suppression.
    final Subscriber<? super T>[] wrapped = new Subscriber[subscribers.length];

    int rail = 0;
    for (Subscriber<? super T> downstream : subscribers) {
        if (downstream instanceof ConditionalSubscriber) {
            // Keep the conditional fast path available to the upstream.
            wrapped[rail] = new LifeConditionalSubscriber<>(
                (ConditionalSubscriber<? super T>) downstream, scope);
        } else {
            wrapped[rail] = new LifeSubscriber<>(downstream, scope);
        }
        rail++;
    }

    final ParallelFlowable<T> origin = this.upStream;
    final ParallelFlowable<T> target =
        onMain ? origin.runOn(AndroidSchedulers.mainThread()) : origin;
    target.subscribe(wrapped);
}
 
/**
 * Builds a filtered source and a failing source while {@code assemblyContext} is in scope,
 * then subscribes to both through {@code subscribeInNoContext} and expects the values 1 and 2.
 */
@Test public void parallelFlowable_conditional_assembleInScope_subscribeNoScope() {
  ParallelFlowable<Integer> source;
  ParallelFlowable<Integer> errorSource;
  try (Scope assemblyScope = currentTraceContext.newScope(assemblyContext)) {
    ParallelFlowable<Integer> filteredRange =
      Flowable.range(1, 3).parallel().filter(lessThanThreeInAssemblyContext);
    source = filteredRange
      .doOnNext(value -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);

    Flowable<Integer> twoFailures = Flowable.<Integer>concat(
      Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()));
    errorSource = twoFailures.parallel()
      .filter(lessThanThreeInAssemblyContext)
      .doOnError(error -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);
  }

  // Subscription happens after the assembly scope has been closed.
  subscribeInNoContext(source, errorSource).assertResult(1, 2);
}
 
/**
 * Builds a filtered source and a failing source while {@code assemblyContext} is in scope,
 * then subscribes to both through {@code subscribeInDifferentContext} and expects the
 * values 1 and 2.
 */
@Test public void parallelFlowable_conditional_assembleInScope_subscribeInScope() {
  ParallelFlowable<Integer> source;
  ParallelFlowable<Integer> errorSource;
  try (Scope assemblyScope = currentTraceContext.newScope(assemblyContext)) {
    ParallelFlowable<Integer> filteredRange =
      Flowable.range(1, 3).parallel().filter(lessThanThreeInAssemblyContext);
    source = filteredRange
      .doOnNext(value -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);

    Flowable<Integer> twoFailures = Flowable.<Integer>concat(
      Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()));
    errorSource = twoFailures.parallel()
      .filter(lessThanThreeInAssemblyContext)
      .doOnError(error -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);
  }

  // Callbacks are still expected to observe the assembly context, not the subscribe one.
  subscribeInDifferentContext(source, errorSource).assertResult(1, 2);
}
 
源代码4 项目: Learn-Java-12-Programming   文件: Scheduler.java
/**
 * Demonstrates a two-stage parallel pipeline: three words are split across rails, each word is
 * flat-mapped into its single-character strings, and the rails are merged back with
 * {@code sequential()}. Every stage prints the current thread name and the item it saw.
 */
private static void parallel3(){

        // Typed rails instead of the raw ParallelFlowable: the lambdas receive String directly,
        // so no cast is needed and the chain is checked by the compiler.
        ParallelFlowable<String> src = Flowable.fromArray("one","two","three").parallel();
        src.runOn(Schedulers.computation())
           .doAfterNext(s -> System.out.println("1: " + Thread.currentThread().getName() + " => " + s))
           .flatMap(w -> Flowable.fromArray(w.split("")))
           .runOn(Schedulers.computation())
           .doAfterNext(s -> System.out.println("2: " + Thread.currentThread().getName() + " => " + s))
           .sequential()
           .subscribe(s -> System.out.println("3: " + s));

        // NOTE(review): presumably pauses the caller for 100 ms so the asynchronous pipeline
        // can finish printing; pauseMs is defined elsewhere — confirm.
        pauseMs(100);

    }
 
源代码5 项目: rxjava2   文件: ParallelFlowableRange.java
/**
 * Squares the integers 1..10 on four parallel rails, keeps the squares divisible by three and
 * prints them after merging the rails back into a single sequence.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {

        int numberOfRails = 4; // can query #processors with parallelism()

        ParallelFlowable
            .from(Flowable.range(1, 10), numberOfRails)
            .runOn(Schedulers.computation())
            .map(i -> i * i)
            .filter(i -> i % 3 == 0)
            .sequential()
            // Block until the pipeline terminates: with a plain subscribe() main could return
            // (and the JVM exit) before the computation threads have emitted anything.
            .blockingSubscribe(System.out::println);
    }
 
源代码6 项目: akarnokd-misc   文件: ParallelPerf.java
/**
 * Prepares the two benchmarked pipelines. Both run {@code count} integers through a filter
 * that burns {@code cost} CPU tokens per item and rejects every item; they differ only in the
 * scheduler the rails run on.
 */
@Setup
public void setup() {
    // Variant running on the computation scheduler.
    flowable = ParallelFlowable.from(Flowable.range(0, count))
            .runOn(Schedulers.computation())
            .filter(item -> {
                Blackhole.consumeCPU(cost);
                return false;
            })
            .sequential();

    // Variant running on a scheduler backed by the common fork-join pool.
    flowableFJ = ParallelFlowable.from(Flowable.range(0, count))
            .runOn(Schedulers.from(ForkJoinPool.commonPool()))
            .filter(item -> {
                Blackhole.consumeCPU(cost);
                return false;
            })
            .sequential();
}
 
/**
 * Builds a source and a failing source while {@code assemblyContext} is in scope, then
 * subscribes to both through {@code subscribeInNoContext} and expects the values 1, 2 and 3.
 */
@Test public void parallelFlowable_assembleInScope_subscribeNoScope() {
  ParallelFlowable<Integer> source;
  ParallelFlowable<Integer> errorSource;
  try (Scope assemblyScope = currentTraceContext.newScope(assemblyContext)) {
    ParallelFlowable<Integer> rails = Flowable.range(1, 3).parallel();
    source = rails
      .doOnNext(value -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);

    Flowable<Integer> twoFailures = Flowable.<Integer>concat(
      Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()));
    errorSource = twoFailures.parallel()
      .doOnError(error -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);
  }

  // Subscription happens after the assembly scope has been closed.
  subscribeInNoContext(source, errorSource).assertResult(1, 2, 3);
}
 
/**
 * Builds a source and a failing source while {@code assemblyContext} is in scope, then
 * subscribes to both through {@code subscribeInDifferentContext} and expects the values
 * 1, 2 and 3.
 */
@Test public void parallelFlowable_assembleInScope_subscribeInScope() {
  ParallelFlowable<Integer> source;
  ParallelFlowable<Integer> errorSource;
  try (Scope assemblyScope = currentTraceContext.newScope(assemblyContext)) {
    ParallelFlowable<Integer> rails = Flowable.range(1, 3).parallel();
    source = rails
      .doOnNext(value -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);

    Flowable<Integer> twoFailures = Flowable.<Integer>concat(
      Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()));
    errorSource = twoFailures.parallel()
      .doOnError(error -> assertInAssemblyContext())
      .doOnComplete(this::assertInAssemblyContext);
  }

  // Callbacks are still expected to observe the assembly context, not the subscribe one.
  subscribeInDifferentContext(source, errorSource).assertResult(1, 2, 3);
}
 
/**
 * Builds a source and a failing source without opening any scope, then subscribes to both
 * through {@code subscribeInDifferentContext}; the callbacks assert the subscribe context and
 * the values 1, 2 and 3 are expected.
 */
@Test public void parallelFlowable_assembleNoScope_subscribeInScope() {
  ParallelFlowable<Integer> rails = Flowable.range(1, 3).parallel();
  ParallelFlowable<Integer> source = rails
    .doOnNext(value -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext);

  Flowable<Integer> twoFailures = Flowable.<Integer>concat(
    Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()));
  ParallelFlowable<Integer> errorSource = twoFailures.parallel()
    .doOnError(error -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext);

  subscribeInDifferentContext(source, errorSource).assertResult(1, 2, 3);
}
 
/**
 * Builds a filtered source and a failing source without opening any scope, then subscribes to
 * both through {@code subscribeInDifferentContext}; the callbacks assert the subscribe context
 * and the values 1 and 2 are expected.
 */
@Test public void parallelFlowable_conditional_assembleNoScope_subscribeInScope() {
  ParallelFlowable<Integer> filteredRange =
    Flowable.range(1, 3).parallel().filter(lessThanThreeInSubscribeContext);
  ParallelFlowable<Integer> source = filteredRange
    .doOnNext(value -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext);

  Flowable<Integer> twoFailures = Flowable.<Integer>concat(
    Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()));
  ParallelFlowable<Integer> errorSource = twoFailures.parallel()
    .filter(lessThanThreeInSubscribeContext)
    .doOnError(error -> assertInSubscribeContext())
    .doOnComplete(this::assertInSubscribeContext);

  subscribeInDifferentContext(source, errorSource).assertResult(1, 2);
}
 
/**
 * Subscribes to both sources without opening a scope. Each source gets an extra
 * {@code doOnSubscribe} hook that calls {@code assertInNoContext()}.
 *
 * @param source the source whose values the caller wants to verify
 * @param errorSource the source asserted here to fail with {@link IllegalStateException}
 * @return a test observer subscribed to {@code source} after merging its rails
 */
TestObserver<Integer> subscribeInNoContext(ParallelFlowable<Integer> source,
  ParallelFlowable<Integer> errorSource) {
  ParallelFlowable<Integer> checkedSource =
    source.doOnSubscribe(subscription -> assertInNoContext());
  ParallelFlowable<Integer> checkedErrorSource =
    errorSource.doOnSubscribe(subscription -> assertInNoContext());

  // The failing source is verified first, then the value source is handed back to the caller.
  checkedErrorSource.sequential().test().assertFailure(IllegalStateException.class);
  return checkedSource.sequential().toObservable().test();
}
 
/**
 * Subscribes to both sources while {@code subscribeContext} is in scope. Each source gets an
 * extra {@code doOnSubscribe} hook that calls {@code assertInSubscribeContext()}.
 *
 * @param source the source whose values the caller wants to verify
 * @param errorSource the source asserted here to fail with {@link IllegalStateException}
 * @return a test observer subscribed to {@code source} after merging its rails
 */
TestObserver<Integer> subscribeInDifferentContext(ParallelFlowable<Integer> source,
  ParallelFlowable<Integer> errorSource) {
  ParallelFlowable<Integer> checkedSource =
    source.doOnSubscribe(subscription -> assertInSubscribeContext());
  ParallelFlowable<Integer> checkedErrorSource =
    errorSource.doOnSubscribe(subscription -> assertInSubscribeContext());

  TestObserver<Integer> observer;
  try (Scope subscribeScope = currentTraceContext.newScope(subscribeContext)) {
    // Both subscriptions are made while the subscribe scope is open.
    checkedErrorSource.sequential().test().assertFailure(IllegalStateException.class);
    observer = checkedSource.sequential().toObservable().test();
  }
  return observer;
}
 
源代码13 项目: rxjava-RxLife   文件: ParallelFlowableLife.java
/**
 * Stores the collaborators used when subscribing.
 *
 * @param upStream the parallel source to wrap
 * @param scope the scope stored for later use by this instance
 * @param onMain flag stored for later use by this instance
 */
ParallelFlowableLife(ParallelFlowable<T> upStream, Scope scope, boolean onMain) {
    this.onMain = onMain;
    this.scope = scope;
    this.upStream = upStream;
}
 
源代码14 项目: RxJava2Debug   文件: ParallelFlowableOnAssembly.java
/**
 * Wraps the given source and creates a new {@code RxJavaAssemblyException} at construction
 * time, stored in {@code assembled}.
 *
 * @param source the parallel source to wrap
 */
ParallelFlowableOnAssembly(ParallelFlowable<T> source) {
    this.assembled = new RxJavaAssemblyException();
    this.source = source;
}
 
源代码15 项目: RxJava2Debug   文件: RxJava2AssemblyTrackingTest.java
/**
 * Creates a parallel flow built from the range 1..5 concatenated with an {@link IOException}
 * error.
 *
 * @return the parallel view of the values-then-error sequence
 */
static ParallelFlowable<Integer> createParallelFlowable() {
    Flowable<Integer> values = Flowable.range(1, 5);
    Flowable<Integer> failure = Flowable.<Integer>error(new IOException());
    Flowable<Integer> valuesThenFailure = values.concatWith(failure);
    return valuesThenFailure.parallel();
}
 
源代码16 项目: armeria   文件: RequestContextParallelFlowable.java
/**
 * Stores the wrapped source together with the request context passed in by the caller.
 *
 * @param source the parallel source to wrap
 * @param assemblyContext the request context to remember for this instance
 */
RequestContextParallelFlowable(ParallelFlowable<T> source, RequestContext assemblyContext) {
    this.assemblyContext = assemblyContext;
    this.source = source;
}
 
源代码17 项目: brave   文件: TraceContextParallelFlowable.java
/**
 * Stores the wrapped source, the context scoper and the trace context passed in by the caller.
 *
 * @param source the parallel source to wrap
 * @param contextScoper the {@code CurrentTraceContext} stored for later use
 * @param assembled the trace context stored for later use
 */
TraceContextParallelFlowable(
  ParallelFlowable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
  this.assembled = assembled;
  this.contextScoper = contextScoper;
  this.source = source;
}
 
源代码18 项目: brave   文件: Wrappers.java
/**
 * Wraps the given source in a {@code TraceContextParallelFlowable}.
 *
 * @param source the parallel source to wrap
 * @param contextScoper passed through to the wrapper
 * @param assembled passed through to the wrapper
 * @param <T> the element type
 * @return the newly created wrapper
 */
public static <T> ParallelFlowable<T> wrap(
  ParallelFlowable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
  ParallelFlowable<T> traced =
    new TraceContextParallelFlowable<>(source, contextScoper, assembled);
  return traced;
}
 
 类所在包
 类方法
 同包方法