io.reactivex.Flowable#filter() 源码实例 Demo

下面列出了 io.reactivex.Flowable#filter() 的实例代码,您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。

源代码1 项目: RxCache   文件: FirstCacheTimeoutStrategy.java
/**
 * Emits the cached result while it is still within the timeout window, otherwise falls back to
 * the remote flowable.
 *
 * <p>The cache flowable is filtered so that only results whose age (current time in milliseconds
 * minus {@code getTimestamp()}) does not exceed {@code milliSecond} pass through. When nothing
 * passes, {@code switchIfEmpty} switches to the remote flowable, which is built by
 * {@code loadRemoteSyncFlowable} when {@code isSync} is set and by {@code loadRemoteFlowable}
 * otherwise, both targeting {@code CacheTarget.MemoryAndDisk}.
 *
 * @param rxCache cache instance handed to the {@code RxCacheHelper} loaders
 * @param key     key handed to the {@code RxCacheHelper} loaders
 * @param source  upstream flowable handed to the remote loader
 * @param type    type handed to the cache loader
 * @return the filtered cache flowable, switching to the remote flowable when it is empty
 */
@Override
public <T> Publisher<CacheResult<T>> flow(RxCache rxCache, String key, Flowable<T> source, Type type) {
    // Kept as its own statement so that T is inferred from the declared variable type.
    final Flowable<CacheResult<T>> cached = RxCacheHelper.loadCacheFlowable(rxCache, key, type, true);

    // Drop cache entries that are older than the configured timeout.
    final Flowable<CacheResult<T>> unexpired = cached.filter(new Predicate<CacheResult<T>>() {
        @Override
        public boolean test(CacheResult<T> entry) throws Exception {
            return milliSecond >= System.currentTimeMillis() - entry.getTimestamp();
        }
    });

    final Flowable<CacheResult<T>> fallback = isSync
            ? RxCacheHelper.loadRemoteSyncFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false)
            : RxCacheHelper.loadRemoteFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);

    return unexpired.switchIfEmpty(fallback);
}
 
源代码2 项目: brave   文件: NotYetSupportedTest.java
/**
 * This is an example of "conditional micro fusion" where we use a source that supports fusion:
 * {@link Flowable#range(int, int)} with an intermediate operator which supports transitive
 * fusion: {@link Flowable#filter(Predicate)}.
 *
 * <p>We are looking for the assembly trace context to be visible, but specifically inside
 * {@link ConditionalSubscriber#tryOnNext(Object)}, as if we wired things correctly, this will be
 * called instead of {@link Subscriber#onNext(Object)}.
 *
 * <p>NOTE(review): the text above names {@code Flowable#range}, but the body below builds the
 * source with {@code Flowable.just(1)} - confirm which source is intended.
 *
 * <p>The test is declared with {@code expected = AssertionError.class}, so it passes only when
 * an {@link AssertionError} is thrown somewhere in the body.
 */
@Test(expected = AssertionError.class)
public void conditionalMicroFusion() {
  Flowable<Integer> fuseable;
  try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
    // we want the filtering to occur in the assembly context
    fuseable = Flowable.just(1);
    assertThat(fuseable).isInstanceOf(ScalarCallable.class);
  }

  // proves the assembly context is retained even after it is no longer in scope
  // TODO: this lies as if you debug this you'll notice it isn't fusing with upstream
  // Note: the filter is attached here, after the assembly scope above has been closed.
  fuseable = fuseable.filter(i -> {
    assertInAssemblyContext();
    return i < 3;
  });

  ConditionalTestSubscriber<Integer> testSubscriber = new ConditionalTestSubscriber<>();
  try (Scope scope2 = currentTraceContext.newScope(subscribeContext)) {
    // subscribing in a different scope shouldn't affect the assembly context
    fuseable.subscribe(testSubscriber);
  }

  // The single source value 1 satisfies i < 3, so it is the only value expected downstream.
  testSubscriber.assertValues(1).assertNoErrors();
}
 
源代码3 项目: rxjava2-jdbc   文件: TransactedSelectBuilder.java
/**
 * Creates the transacted flowable for {@code selectBuilder}, mapping rows with the given
 * function.
 *
 * @param function row mapper passed through to {@code createFlowable}
 * @return the created flowable; when {@code valuesOnly} is set, only elements for which
 *     {@code Tx.isValue()} returns true are emitted
 */
@Override
public <T> Flowable<Tx<T>> get(ResultSetMapper<? extends T> function) {
    final Flowable<Tx<T>> all = createFlowable(selectBuilder, function, db);
    return valuesOnly ? all.filter(Tx::isValue) : all;
}
 
/**
 * Creates the transacted flowable for {@code selectBuilder}.
 *
 * @return the created flowable; when {@code valuesOnly} is set, only elements for which
 *     {@code Tx.isValue()} returns true are emitted
 */
public Flowable<Tx<T>> get() {
    final Flowable<Tx<T>> all = createFlowable(selectBuilder, db);
    return valuesOnly ? all.filter(Tx::isValue) : all;
}
 
源代码5 项目: rxjava2-jdbc   文件: TransactedUpdateBuilder.java
/**
 * Creates the transacted flowable of integer results for {@code updateBuilder}.
 *
 * @return the created flowable; when {@code valuesOnly} is set, only elements for which
 *     {@code Tx.isValue()} returns true are emitted
 */
public Flowable<Tx<Integer>> counts() {
    final Flowable<Tx<Integer>> all = createFlowable(updateBuilder, db);
    return valuesOnly ? all.filter(Tx::isValue) : all;
}
 
源代码6 项目: storio   文件: ChangesFilter.java
/**
 * Filters the given stream of changes with a {@code ChangesFilter} built from both the tables
 * and the tags.
 *
 * <p>Both sets are validated with {@code checkNotNull} before the filter is created.
 *
 * @param changes stream of changes to filter
 * @param tables  set of tables handed to the filter; must not be null
 * @param tags    set of tags handed to the filter; must not be null
 * @return {@code changes} with the filter applied
 */
@NonNull
public static Flowable<Changes> applyForTablesAndTags(
        @NonNull Flowable<Changes> changes,
        @NonNull Set<String> tables,
        @NonNull Set<String> tags
) {
    checkNotNull(tables, "Set of tables can not be null");
    checkNotNull(tags, "Set of tags can not be null");

    final ChangesFilter tablesAndTagsFilter = new ChangesFilter(tables, tags);
    return changes.filter(tablesAndTagsFilter);
}
 
源代码7 项目: storio   文件: ChangesFilter.java
/**
 * Filters the given stream of changes with a {@code ChangesFilter} built from the tables only;
 * {@code null} is passed in place of the tags.
 *
 * @param changes stream of changes to filter
 * @param tables  set of tables handed to the filter; validated with {@code checkNotNull}
 * @return {@code changes} with the filter applied
 */
@NonNull
public static Flowable<Changes> applyForTables(@NonNull Flowable<Changes> changes, @NonNull Set<String> tables) {
    checkNotNull(tables, "Set of tables can not be null");

    final ChangesFilter tablesOnlyFilter = new ChangesFilter(tables, null);
    return changes.filter(tablesOnlyFilter);
}
 
源代码8 项目: storio   文件: ChangesFilter.java
/**
 * Filters the given stream of changes with a {@code ChangesFilter} built from the tags only;
 * {@code null} is passed in place of the tables.
 *
 * @param changes stream of changes to filter
 * @param tags    set of tags handed to the filter; validated with {@code checkNotNull}
 * @return {@code changes} with the filter applied
 */
@NonNull
public static Flowable<Changes> applyForTags(@NonNull Flowable<Changes> changes, @NonNull Set<String> tags) {
    checkNotNull(tags, "Set of tags can not be null");

    final ChangesFilter tagsOnlyFilter = new ChangesFilter(null, tags);
    return changes.filter(tagsOnlyFilter);
}