下面列出了 io.reactivex.Flowable#filter() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Serves the cached entry for {@code key} while it is recent enough, otherwise falls back
 * to the remote stream.
 *
 * <p>The cache stream is filtered so that only an entry whose timestamp lies within
 * {@code milliSecond} of the current time passes. When that filtered stream completes
 * without emitting, the returned publisher switches to the remote stream built from
 * {@code source}.
 *
 * @param rxCache cache instance handed to the {@code RxCacheHelper} loaders
 * @param key     cache key
 * @param source  upstream used to build the remote stream
 * @param type    type token handed to the cache loader
 * @return the filtered cache stream, switching to the remote stream when it is empty
 */
@Override
public <T> Publisher<CacheResult<T>> flow(RxCache rxCache, String key, Flowable<T> source, Type type) {
    // Only let a cached entry through while its age is within the configured window.
    final Predicate<CacheResult<T>> withinWindow = new Predicate<CacheResult<T>>() {
        @Override
        public boolean test(CacheResult<T> entry) throws Exception {
            return milliSecond >= System.currentTimeMillis() - entry.getTimestamp();
        }
    };

    final Flowable<CacheResult<T>> cached = RxCacheHelper.loadCacheFlowable(rxCache, key, type, true);
    final Flowable<CacheResult<T>> freshCached = cached.filter(withinWindow);

    // Pick the remote loader variant according to the isSync flag.
    final Flowable<CacheResult<T>> fallback;
    if (!isSync) {
        fallback = RxCacheHelper.loadRemoteFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
    } else {
        fallback = RxCacheHelper.loadRemoteSyncFlowable(rxCache, key, source, CacheTarget.MemoryAndDisk, false);
    }

    return freshCached.switchIfEmpty(fallback);
}
/**
 * This is an example of "conditional micro fusion" where we use a source that supports fusion:
 * {@link Flowable#range(int, int)} with an intermediate operator which supports transitive
 * fusion: {@link Flowable#filter(Predicate)}.
 *
 * <p>We are looking for the assembly trace context to be visible, but specifically inside
 * {@link ConditionalSubscriber#tryOnNext(Object)}, as if we wired things correctly, this will be
 * called instead of {@link Subscriber#onNext(Object)}.
 *
 * <p>NOTE(review): the body below builds the source with {@code Flowable.just(1)} and asserts
 * it is a {@code ScalarCallable}, not {@code Flowable.range} as described above. Confirm which
 * source was intended.
 *
 * <p>The test is declared with {@code expected = AssertionError.class}, so it passes only when
 * an {@link AssertionError} is thrown; see the TODO in the body.
 */
@Test(expected = AssertionError.class)
public void conditionalMicroFusion() {
Flowable<Integer> fuseable;
try (Scope scope1 = currentTraceContext.newScope(assemblyContext)) {
// we want the filtering to occur in the assembly context
fuseable = Flowable.just(1);
assertThat(fuseable).isInstanceOf(ScalarCallable.class);
}
// proves the assembly context is retained even after it is no longer in scope
// TODO: this lies as if you debug this you'll notice it isn't fusing with upstream
// Note that filter(...) is applied here, after scope1 has already been closed.
fuseable = fuseable.filter(i -> {
assertInAssemblyContext();
return i < 3;
});
ConditionalTestSubscriber<Integer> testSubscriber = new ConditionalTestSubscriber<>();
try (Scope scope2 = currentTraceContext.newScope(subscribeContext)) {
// subscribing in a different scope shouldn't affect the assembly context
fuseable.subscribe(testSubscriber);
}
// The single value 1 satisfies the predicate (1 < 3), so exactly one value is expected.
testSubscriber.assertValues(1).assertNoErrors();
}
/**
 * Builds the transacted select stream, mapping rows with {@code function}.
 *
 * @param function mapper applied to the result set rows
 * @return the stream from {@code createFlowable}; when {@code valuesOnly} is set, only the
 *         items for which {@code isValue()} returns true are kept
 */
@Override
public <T> Flowable<Tx<T>> get(ResultSetMapper<? extends T> function) {
    Flowable<Tx<T>> all = createFlowable(selectBuilder, function, db);
    if (!valuesOnly) {
        return all;
    }
    return all.filter(item -> item.isValue());
}
/**
 * Builds the transacted select stream.
 *
 * @return the stream from {@code createFlowable}; when {@code valuesOnly} is set, only the
 *         items for which {@code isValue()} returns true are kept
 */
public Flowable<Tx<T>> get() {
    Flowable<Tx<T>> all = createFlowable(selectBuilder, db);
    if (!valuesOnly) {
        return all;
    }
    return all.filter(item -> item.isValue());
}
/**
 * Builds the transacted update stream.
 *
 * @return the stream from {@code createFlowable}; when {@code valuesOnly} is set, only the
 *         items for which {@code isValue()} returns true are kept
 */
public Flowable<Tx<Integer>> counts() {
    Flowable<Tx<Integer>> all = createFlowable(updateBuilder, db);
    if (!valuesOnly) {
        return all;
    }
    return all.filter(item -> item.isValue());
}
/**
 * Restricts {@code changes} with a {@code ChangesFilter} built from both the given tables
 * and the given tags.
 *
 * @param changes stream of change notifications to filter
 * @param tables  set of tables, must not be null
 * @param tags    set of tags, must not be null
 * @return {@code changes} filtered by the {@code ChangesFilter}
 */
@NonNull
public static Flowable<Changes> applyForTablesAndTags(
        @NonNull Flowable<Changes> changes,
        @NonNull Set<String> tables,
        @NonNull Set<String> tags
) {
    // Validate both sets up front, tables first, before the filter is constructed.
    checkNotNull(tables, "Set of tables can not be null");
    checkNotNull(tags, "Set of tags can not be null");

    final ChangesFilter byTablesAndTags = new ChangesFilter(tables, tags);
    return changes.filter(byTablesAndTags);
}
/**
 * Restricts {@code changes} with a {@code ChangesFilter} built from the given tables only;
 * the filter's tags argument is passed as {@code null}.
 *
 * @param changes stream of change notifications to filter
 * @param tables  set of tables, must not be null
 * @return {@code changes} filtered by the {@code ChangesFilter}
 */
@NonNull
public static Flowable<Changes> applyForTables(@NonNull Flowable<Changes> changes, @NonNull Set<String> tables) {
    checkNotNull(tables, "Set of tables can not be null");

    final ChangesFilter byTables = new ChangesFilter(tables, null);
    return changes.filter(byTables);
}
/**
 * Restricts {@code changes} with a {@code ChangesFilter} built from the given tags only;
 * the filter's tables argument is passed as {@code null}.
 *
 * @param changes stream of change notifications to filter
 * @param tags    set of tags, must not be null
 * @return {@code changes} filtered by the {@code ChangesFilter}
 */
@NonNull
public static Flowable<Changes> applyForTags(@NonNull Flowable<Changes> changes, @NonNull Set<String> tags) {
    checkNotNull(tags, "Set of tags can not be null");

    final ChangesFilter byTags = new ChangesFilter(null, tags);
    return changes.filter(byTags);
}