类io.reactivex.internal.fuseable.ScalarCallable源码实例Demo

下面列出了怎么用io.reactivex.internal.fuseable.ScalarCallable的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: brave   文件: NotYetSupportedTest.java
/**
 * This is an example of "conditional micro fusion" where use use a source that supports fusion:
 * {@link Flowable#range(int, int)} with an intermediate operator which supports transitive
 * fusion: {@link Flowable#filter(Predicate)}.
 *
 * <p>We are looking for the assembly trace context to be visible, but specifically inside
 * {@link ConditionalSubscriber#tryOnNext(Object)}, as if we wired things correctly, this will be
 * called instead of {@link Subscriber#onNext(Object)}.
 */
@Test(expected = AssertionError.class)
public void conditionalMicroFusion() {
  Flowable<Integer> fuseable;
  try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
    // we want the fitering to occur in the assembly context
    fuseable = Flowable.just(1);
    assertThat(fuseable).isInstanceOf(ScalarCallable.class);
  }

  // proves the assembly context is retained even after it is no longer in scope
  // TODO: this lies as if you debug this you'll notice it isn't fusing with upstream
  fuseable = fuseable.filter(i -> {
    assertInAssemblyContext();
    return i < 3;
  });

  ConditionalTestSubscriber<Integer> testSubscriber = new ConditionalTestSubscriber<>();
  try (Scope scope2 = currentTraceContext.newScope(subscribeContext)) {
    // subscribing in a different scope shouldn't affect the assembly context
    fuseable.subscribe(testSubscriber);
  }

  testSubscriber.assertValues(1).assertNoErrors();
}
 
源代码2 项目: brave   文件: NotYetSupportedTest.java
/**
 * On XMap (ex {@code just(1).concatMap(..}, the source scalar callable is not passed as an input
 * to the subsequent operator like {@code ObservableScalarXMap.ScalarXMapObservable}. What is
 * passed is the result of {@link ScalarCallable#call()}.
 *
 * <p>Usually, this would result in lost tracking of the assembled context. However, we use a
 * thread local to stash the context between {@link ScalarCallable#call()} and the next {@link
 * RxJavaPlugins#onAssembly assembly hook}.
 *
 * @see ObservableScalarXMap#scalarXMap - references to this are operators which require stashing
 */
@Test(expected = AssertionError.class)
public void observable_scalarCallable_propagatesContextOnXMap() {
  Observable<Integer> fuseable;
  try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
    fuseable = Observable.just(1);
    assertThat(fuseable).isInstanceOf(ScalarCallable.class);
  }

  // eventhough upstream is assembled with XMap, we still inherit the fused context.
  fuseable = fuseable.concatMap(Observable::just);

  assertXMapFusion(fuseable).test().assertValues(1).assertNoErrors();
}
 
源代码3 项目: brave   文件: NotYetSupportedTest.java
/**
 * Same as {@link #observable_scalarCallable_propagatesContextOnXMap()}, except for Flowable.
 *
 * @see FlowableScalarXMap#scalarXMap - references of this will break when assembly
 */
@Test(expected = AssertionError.class)
public void flowable_scalarCallable_propagatesContextOnXMap() {
  Observable<Integer> fuseable;
  try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
    fuseable = Observable.just(1);
    assertThat(fuseable).isInstanceOf(ScalarCallable.class);
  }

  // eventhough upstream is assembled with XMap, we still inherit the fused context.
  fuseable = fuseable.concatMap(Observable::just);

  assertXMapFusion(fuseable).test().assertValues(1).assertNoErrors();
}
 
@SuppressWarnings("unchecked")
@Override
public Object call() {
    return ((ScalarCallable<Object>)source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>)source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>)source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>)source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>) source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>) source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>) source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>) source).call();
}
 
@SuppressWarnings("unchecked")
@Override
public T call() {
    return ((ScalarCallable<T>) source).call();
}
 
 类所在包
 同包方法