io.reactivex.Observable#concatMap() Source Code Examples

The following are code examples of io.reactivex.Observable#concatMap().

Example 1   Project: brave   File: NotYetSupportedTest.java
/**
 * On XMap (e.g. {@code just(1).concatMap(..)}), the source scalar callable is not passed as an input
 * to the subsequent operator like {@code ObservableScalarXMap.ScalarXMapObservable}. What is
 * passed is the result of {@link ScalarCallable#call()}.
 *
 * <p>Usually, this would result in lost tracking of the assembled context. However, we use a
 * thread local to stash the context between {@link ScalarCallable#call()} and the next {@link
 * RxJavaPlugins#onAssembly assembly hook}.
 *
 * @see ObservableScalarXMap#scalarXMap - references to this are operators which require stashing
 */
@Test(expected = AssertionError.class)
public void observable_scalarCallable_propagatesContextOnXMap() {
  Observable<Integer> fuseable;
  try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
    fuseable = Observable.just(1);
    assertThat(fuseable).isInstanceOf(ScalarCallable.class);
  }

  // even though upstream is assembled with XMap, we still inherit the fused context.
  fuseable = fuseable.concatMap(Observable::just);

  assertXMapFusion(fuseable).test().assertValues(1).assertNoErrors();
}
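
The Javadoc above describes stashing a context in a thread local between the scalar source's call() and the next assembly hook. The following is a minimal, self-contained sketch of that general idea in plain Java, not Brave's or RxJava's actual implementation. Every name in it (StashSketch, Context, ContextScalar, STASH, onNextAssembly) is hypothetical and introduced only for illustration.

```java
import java.util.concurrent.Callable;

class StashSketch {
  // Hypothetical stand-in for a trace context; this is not Brave's TraceContext.
  static final class Context {
    final String name;
    Context(String name) { this.name = name; }
  }

  // Thread-local slot that carries the context from call() to the next hook.
  static final ThreadLocal<Context> STASH = new ThreadLocal<>();

  // Hypothetical wrapper around a scalar source: it remembers the context seen
  // at assembly time and stashes it whenever the scalar value is read.
  static final class ContextScalar<T> implements Callable<T> {
    final T value;
    final Context assembled;

    ContextScalar(T value, Context assembled) {
      this.value = value;
      this.assembled = assembled;
    }

    @Override public T call() {
      STASH.set(assembled); // stash before handing out the bare value
      return value;
    }
  }

  // Hypothetical assembly hook: reads the stashed context, if any, and clears
  // the slot so the context does not leak into unrelated assemblies.
  static Context onNextAssembly() {
    Context stashed = STASH.get();
    STASH.remove();
    return stashed;
  }

  public static void main(String[] args) {
    ContextScalar<Integer> scalar = new ContextScalar<>(1, new Context("assembly"));
    Integer value = scalar.call();        // only the value reaches the next operator
    Context recovered = onNextAssembly(); // the hook recovers the context from the stash
    System.out.println(value + " " + recovered.name); // prints "1 assembly"
  }
}
```

Clearing the slot inside the hook is a design choice of this sketch: a second call to onNextAssembly() on the same thread returns null, so a stale context is not picked up by a later, unrelated assembly.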
 
Example 2   Project: brave   File: NotYetSupportedTest.java
/**
 * Same as {@link #observable_scalarCallable_propagatesContextOnXMap()}, except for Flowable.
 *
 * @see FlowableScalarXMap#scalarXMap - references of this will break when assembly
 */
@Test(expected = AssertionError.class)
public void flowable_scalarCallable_propagatesContextOnXMap() {
  Observable<Integer> fuseable;
  try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
    fuseable = Observable.just(1);
    assertThat(fuseable).isInstanceOf(ScalarCallable.class);
  }

  // even though upstream is assembled with XMap, we still inherit the fused context.
  fuseable = fuseable.concatMap(Observable::just);

  assertXMapFusion(fuseable).test().assertValues(1).assertNoErrors();
}