下面列出了怎么用io.reactivex.parallel.ParallelFlowable的API类实例代码及写法,或者点击链接到github查看源代码。
@SuppressWarnings("unchecked")
public void subscribe(@NonNull Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}
int n = subscribers.length;
Subscriber<? super T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
Subscriber<? super T> a = subscribers[i];
if (a instanceof ConditionalSubscriber) {
parents[i] = new LifeConditionalSubscriber<>((ConditionalSubscriber<? super T>) a, scope);
} else {
parents[i] = new LifeSubscriber<>(a, scope);
}
}
ParallelFlowable<T> upStream = this.upStream;
if (onMain) upStream = upStream.runOn(AndroidSchedulers.mainThread());
upStream.subscribe(parents);
}
@Test public void parallelFlowable_conditional_assembleInScope_subscribeNoScope() {
ParallelFlowable<Integer> source, errorSource;
try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
source = Flowable.range(1, 3).parallel()
.filter(lessThanThreeInAssemblyContext)
.doOnNext(e -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
errorSource = Flowable.<Integer>concat(
Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()))
.parallel()
.filter(lessThanThreeInAssemblyContext)
.doOnError(t -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
}
subscribeInNoContext(source, errorSource).assertResult(1, 2);
}
@Test public void parallelFlowable_conditional_assembleInScope_subscribeInScope() {
ParallelFlowable<Integer> source, errorSource;
try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
source = Flowable.range(1, 3).parallel()
.filter(lessThanThreeInAssemblyContext)
.doOnNext(e -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
errorSource = Flowable.<Integer>concat(
Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()))
.parallel()
.filter(lessThanThreeInAssemblyContext)
.doOnError(t -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
}
subscribeInDifferentContext(source, errorSource).assertResult(1, 2);
}
private static void parallel3(){
ParallelFlowable src = Flowable.fromArray("one","two","three").parallel();
src.runOn(Schedulers.computation())
.doAfterNext(s -> System.out.println("1: " + Thread.currentThread().getName() + " => " + s))
.flatMap(w -> Flowable.fromArray(((String)w).split("")))
.runOn(Schedulers.computation())
.doAfterNext(s -> System.out.println("2: " + Thread.currentThread().getName() + " => " + s))
.sequential()
.subscribe(s -> System.out.println("3: " + s));
pauseMs(100);
}
public static void main(String[] args) {
int numberOfRails = 4; // can query #processors with parallelism()
ParallelFlowable
.from(Flowable.range(1, 10), numberOfRails)
.runOn(Schedulers.computation())
.map(i -> i * i)
.filter(i -> i % 3 == 0)
.sequential()
.subscribe(System.out::println);
}
@Setup
public void setup() {
flowable = ParallelFlowable.from(Flowable.range(0, count)).runOn(Schedulers.computation())
.filter(v -> { Blackhole.consumeCPU(cost); return false; })
.sequential();
flowableFJ = ParallelFlowable.from(Flowable.range(0, count))
.runOn(Schedulers.from(ForkJoinPool.commonPool()))
.filter(v -> { Blackhole.consumeCPU(cost); return false; })
.sequential();
}
@Test public void parallelFlowable_assembleInScope_subscribeNoScope() {
ParallelFlowable<Integer> source, errorSource;
try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
source = Flowable.range(1, 3).parallel()
.doOnNext(e -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
errorSource = Flowable.<Integer>concat(
Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()))
.parallel()
.doOnError(t -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
}
subscribeInNoContext(source, errorSource).assertResult(1, 2, 3);
}
@Test public void parallelFlowable_assembleInScope_subscribeInScope() {
ParallelFlowable<Integer> source, errorSource;
try (Scope scope = currentTraceContext.newScope(assemblyContext)) {
source = Flowable.range(1, 3).parallel()
.doOnNext(e -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
errorSource = Flowable.<Integer>concat(
Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()))
.parallel()
.doOnError(t -> assertInAssemblyContext())
.doOnComplete(this::assertInAssemblyContext);
}
subscribeInDifferentContext(source, errorSource).assertResult(1, 2, 3);
}
@Test public void parallelFlowable_assembleNoScope_subscribeInScope() {
ParallelFlowable<Integer> source = Flowable.range(1, 3).parallel()
.doOnNext(e -> assertInSubscribeContext())
.doOnComplete(this::assertInSubscribeContext);
ParallelFlowable<Integer> errorSource = Flowable.<Integer>concat(
Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()))
.parallel()
.doOnError(t -> assertInSubscribeContext())
.doOnComplete(this::assertInSubscribeContext);
subscribeInDifferentContext(source, errorSource).assertResult(1, 2, 3);
}
@Test public void parallelFlowable_conditional_assembleNoScope_subscribeInScope() {
ParallelFlowable<Integer> source = Flowable.range(1, 3).parallel()
.filter(lessThanThreeInSubscribeContext)
.doOnNext(e -> assertInSubscribeContext())
.doOnComplete(this::assertInSubscribeContext);
ParallelFlowable<Integer> errorSource = Flowable.<Integer>concat(
Flowable.error(new IllegalStateException()), Flowable.error(new IllegalStateException()))
.parallel()
.filter(lessThanThreeInSubscribeContext)
.doOnError(t -> assertInSubscribeContext())
.doOnComplete(this::assertInSubscribeContext);
subscribeInDifferentContext(source, errorSource).assertResult(1, 2);
}
TestObserver<Integer> subscribeInNoContext(ParallelFlowable<Integer> source,
ParallelFlowable<Integer> errorSource) {
source = source.doOnSubscribe(s -> assertInNoContext());
errorSource = errorSource.doOnSubscribe(s -> assertInNoContext());
errorSource.sequential().test().assertFailure(IllegalStateException.class);
return source.sequential().toObservable().test();
}
TestObserver<Integer> subscribeInDifferentContext(ParallelFlowable<Integer> source,
ParallelFlowable<Integer> errorSource) {
source = source.doOnSubscribe(s -> assertInSubscribeContext());
errorSource = errorSource.doOnSubscribe(s -> assertInSubscribeContext());
try (Scope scope2 = currentTraceContext.newScope(subscribeContext)) {
errorSource.sequential().test().assertFailure(IllegalStateException.class);
return source.sequential().toObservable().test();
}
}
ParallelFlowableLife(ParallelFlowable<T> upStream, Scope scope, boolean onMain) {
this.upStream = upStream;
this.scope = scope;
this.onMain = onMain;
}
ParallelFlowableOnAssembly(ParallelFlowable<T> source) {
this.source = source;
this.assembled = new RxJavaAssemblyException();
}
static ParallelFlowable<Integer> createParallelFlowable() {
return Flowable.range(1, 5).concatWith(Flowable.<Integer>error(new IOException())).parallel();
}
RequestContextParallelFlowable(ParallelFlowable<T> source, RequestContext assemblyContext) {
this.source = source;
this.assemblyContext = assemblyContext;
}
TraceContextParallelFlowable(
ParallelFlowable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
this.source = source;
this.contextScoper = contextScoper;
this.assembled = assembled;
}
public static <T> ParallelFlowable<T> wrap(
ParallelFlowable<T> source, CurrentTraceContext contextScoper, TraceContext assembled) {
return new TraceContextParallelFlowable<>(source, contextScoper, assembled);
}