下面列出了 io.reactivex.Observable#concatMap() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Exercises context propagation when a scalar {@link Observable} is fused through an XMap-style
 * operator, for example {@code Observable.just(1).concatMap(...)}.
 *
 * <p>In that case the scalar callable source is not handed to the next operator (such as {@code
 * ObservableScalarXMap.ScalarXMapObservable}) as its input; only the result of {@link
 * ScalarCallable#call()} is. That would normally lose track of the context captured at assembly
 * time, so a thread local is used to stash the context between {@link ScalarCallable#call()} and
 * the following {@link RxJavaPlugins#onAssembly assembly hook}.
 *
 * <p>The test is declared with {@code expected = AssertionError.class}, so it passes only when
 * an {@link AssertionError} is thrown somewhere in the body. NOTE(review): presumably this marks
 * the scenario as not yet supported; confirm against the enclosing test class.
 *
 * @see ObservableScalarXMap#scalarXMap - references to this are operators which require stashing
 */
@Test(expected = AssertionError.class)
public void observable_scalarCallable_propagatesContextOnXMap() {
  Observable<Integer> scalarSource;
  // Assemble the scalar source while the assembly context is in scope.
  try (Scope ignored = currentTraceContext.newScope(assemblyContext)) {
    scalarSource = Observable.just(1);
    assertThat(scalarSource).isInstanceOf(ScalarCallable.class);
  }

  // Even though the XMap operator is applied outside the scope above, the fused context is
  // still expected to be inherited.
  Observable<Integer> mapped = scalarSource.concatMap(Observable::just);

  assertXMapFusion(mapped)
      .test()
      .assertValues(1)
      .assertNoErrors();
}
/**
 * Intended as the Flowable counterpart of {@link
 * #observable_scalarCallable_propagatesContextOnXMap()}.
 *
 * <p>NOTE(review): despite its name, the body below only uses {@link Observable} ({@code
 * Observable.just} and {@code Observable.concatMap}), so it currently repeats the Observable
 * scenario rather than covering Flowable. Confirm whether it should be switched to the Flowable
 * equivalents.
 *
 * <p>The test is declared with {@code expected = AssertionError.class}, so it passes only when
 * an {@link AssertionError} is thrown somewhere in the body.
 *
 * @see FlowableScalarXMap#scalarXMap - references of this will break when assembly
 */
@Test(expected = AssertionError.class)
public void flowable_scalarCallable_propagatesContextOnXMap() {
  Observable<Integer> scalarSource;
  // Assemble the scalar source while the assembly context is in scope.
  try (Scope ignored = currentTraceContext.newScope(assemblyContext)) {
    scalarSource = Observable.just(1);
    assertThat(scalarSource).isInstanceOf(ScalarCallable.class);
  }

  // Even though the XMap operator is applied outside the scope above, the fused context is
  // still expected to be inherited.
  Observable<Integer> mapped = scalarSource.concatMap(Observable::just);

  assertXMapFusion(mapped)
      .test()
      .assertValues(1)
      .assertNoErrors();
}