下面列出了怎么用io.reactivex.functions.Predicate的API类实例代码及写法,或者点击链接到github查看源代码。
SelectionCheckerBase(Location location, String selectionString, String[] selectionArgs)
{
_location = location;
if (selectionString != null)
{
String[] filtNames = selectionString.split(" ");
int i = 0;
for (String filtName : filtNames)
{
if (selectionArgs == null || i >= selectionArgs.length)
break;
Predicate<CachedPathInfo> f = getFilter(filtName, selectionArgs[i++]);
//if (f == null)
// throw new IllegalArgumentException("Unsupported search filter: " + filtName);
//else
if(f!=null)
_filters.add(f);
}
}
}
@SuppressLint("CheckResult")
public static Disposable loop(long period, final OnRxLoopListener listener){
return Observable.interval(period, TimeUnit.MILLISECONDS)
.takeWhile(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return listener.takeWhile();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<Long>() {
@Override
public void onNext(Long l) {
listener.onExecute();
}
@Override
public void onComplete() {
listener.onFinish();
}
@Override
public void onError(Throwable e) {
listener.onError(e);
}
});
}
@Override
public @NonNull
Predicate<PageLifecycleEventInfo> pageloadPredicate() {
return new Predicate<PageLifecycleEventInfo>() {
@Override
public boolean test(PageLifecycleEventInfo info) throws Exception {
if (info.currentEvent.lifecycleEvent.isSystemLifecycle()) {
return getPageLifecycleEventCostTime(info) > 2000;
}
if ((info.currentEvent.lifecycleEvent == ActivityLifecycleEvent.ON_DRAW
|| info.currentEvent.lifecycleEvent == FragmentLifecycleEvent.ON_DRAW)) {
return getPageLifecycleEventCostTime(info) > 2000;
}
if ((info.currentEvent.lifecycleEvent == ActivityLifecycleEvent.ON_LOAD
|| info.currentEvent.lifecycleEvent == FragmentLifecycleEvent.ON_LOAD)) {
return getPageLifecycleEventCostTime(info) > 8000;
}
return false;
}
};
}
private Observable<Integer> filterWithIfThenMapNullableContainer(
Observable<NullableContainer<String>> observable) {
return observable
.filter(
new Predicate<NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> container) throws Exception {
if (container.get() != null) {
return true;
} else {
return false;
}
}
})
.map(
new Function<NullableContainer<String>, Integer>() {
@Override
public Integer apply(NullableContainer<String> c) throws Exception {
return c.get().length();
}
});
}
private Observable<Integer> filterThenMapNullableContainerMergesReturns(
Observable<NullableContainer<String>> observable) {
return observable
.filter(
new Predicate<NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> container) throws Exception {
if (perhaps() && container.get() != null) {
return true;
} else {
return (container.get() != null);
}
}
})
.map(
new Function<NullableContainer<String>, Integer>() {
@Override
public Integer apply(NullableContainer<String> c) throws Exception {
return c.get().length();
}
});
}
private Observable<NullableContainer<String>> filterThenDistinctUntilChanged(
Observable<NullableContainer<String>> observable) {
return observable
.filter(
new Predicate<NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> container) throws Exception {
return container.get() != null;
}
})
.distinctUntilChanged(
new BiPredicate<NullableContainer<String>, NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> nc1, NullableContainer<String> nc2) {
return nc1.get().length() == nc2.get().length()
&& nc1.get().contains(nc2.get())
&& nc2.get().contains(nc1.get());
}
});
}
private Maybe<Integer> testMaybe(Maybe<NullableContainer<String>> maybe) {
return maybe
.filter(
new Predicate<NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> container) throws Exception {
return container.get() != null;
}
})
.map(
new Function<NullableContainer<String>, Integer>() {
@Override
public Integer apply(NullableContainer<String> c) throws Exception {
return c.get().length();
}
});
}
private Observable<Integer> filterWithIfThenMapNullableContainerNullableOnSomeBranch(
Observable<NullableContainer<String>> observable) {
return observable
.filter(
new Predicate<NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> container) throws Exception {
if (container.get() != null) {
return true;
} else {
return perhaps();
}
}
})
.map(
new Function<NullableContainer<String>, Integer>() {
@Override
public Integer apply(NullableContainer<String> c) throws Exception {
// BUG: Diagnostic contains: dereferenced expression
return c.get().length();
}
});
}
private Observable<Integer> filterWithIfThenMapNullableContainerNullableOnSomeBranchAnyOrder(
Observable<NullableContainer<String>> observable) {
return observable
.filter(
new Predicate<NullableContainer<String>>() {
@Override
public boolean test(NullableContainer<String> container) throws Exception {
if (container.get() == null) {
return perhaps();
} else {
return true;
}
}
})
.map(
new Function<NullableContainer<String>, Integer>() {
@Override
public Integer apply(NullableContainer<String> c1) throws Exception {
// BUG: Diagnostic contains: dereferenced expression
return c1.get().length();
}
});
}
public Disposable observeResultsForFragment(final String fragmentId, Consumer<ResultResponse> consumer) {
return publishSubject
.filter(new Predicate<ResultResponse>() {
@Override
public boolean test(ResultResponse resultResponse) throws Exception {
return resultResponse.fragmentId.equals(fragmentId);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
});
}
@Test
public void onAppStartEndSuccess() {
try {
GodEye.instance().uninstall();
GodEye.instance().install(GodEyeConfig.noneConfigBuilder().withStartupConfig(new StartupConfig()).build());
StartupInfo startupInfo = new StartupInfo(StartupInfo.StartUpType.COLD, 3000);
GodEyeHelper.onAppStartEnd(startupInfo);
TestObserver testObserver = GodEye.instance().<Startup, StartupInfo>moduleObservable(GodEye.ModuleName.STARTUP).test();
testObserver.assertValue(new Predicate<StartupInfo>() {
@Override
public boolean test(StartupInfo o) throws Exception {
return startupInfo.startupType.equals(o.startupType) && startupInfo.startupTime == o.startupTime;
}
});
} catch (Throwable e) {
fail();
}
}
public Maybe<Trip> getTrip(final String tripId) {
return getTrips()
.toObservable()
.flatMap(new Function<List<Trip>, ObservableSource<? extends Trip>>() {
@Override
public ObservableSource<? extends Trip> apply(List<Trip> tripList) throws Exception {
return Observable.fromIterable(tripList);
}
})
.filter(new Predicate<Trip>() {
@Override
public boolean test(Trip trip) throws Exception {
return trip.getId().equals(tripId);
}
})
.singleElement();
}
public <CLASS> Observable<Getter<CLASS>> onGet(final Class<CLASS> theClass) {
return onEvent(AskedEvent.class)//I wait for an event (askevent) of CLASS
.filter(new Predicate<AskedEvent>() {
@Override
public boolean test(@NonNull AskedEvent askedEvent) throws Exception {
return askedEvent.askedObject.equals(theClass);
}
})
.map(new Function<AskedEvent, Getter<CLASS>>() {
@Override
public Getter<CLASS> apply(@NonNull AskedEvent o) throws Exception {
return new Getter<CLASS>() {
//then I send to the listener a Getter (interface)
//when the getter is notified, the value is sent to the first subscrier
//who called the method `get`
@Override
public void get(CLASS value) {
post(value); //the value is published on the bus
}
};
}
});
}
public static <T, E> LifecycleTransformer<T> bindToLifeCycle(Observable<E> lifecycle,
final Function<E, E> correspondingEvents) {
Observable<E> lifecycleCopy = lifecycle.share();
return new LifecycleTransformer<>(Observable.combineLatest(lifecycle.take(1).map(correspondingEvents),
lifecycleCopy.skip(1),
new BiFunction<E, E, Boolean>() {
@Override
public Boolean apply(E e, E e2) throws Exception {
return e.equals(e2);
}
}).filter(new Predicate<Boolean>() {
@Override
public boolean test(Boolean cmpResult) throws Exception {
return cmpResult;
}
}));
}
public void register(final int interval, @NonNull final OnTickListener onTickListener, boolean intermediate) {
Disposable disposable = this.mIntervalObservable.delaySubscription(intermediate ? 0 : interval * mInterval, TimeUnit.MILLISECONDS).skipWhile(
new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return pause;
}
}).observeOn(
AndroidSchedulers.mainThread()).subscribe(new Consumer<Long>() {
@Override
public void accept(Long count) throws Exception {
Log.d("RxTimerSupportTest", "accept " + " value " + count);
if (count % interval == 0) {
if (onTickListener != null) {
onTickListener.onTick();
}
}
}
});
tickCache.put(onTickListener, disposable);
}
@Test
public void registerCustomSubscriberWithFilter() throws Exception {
CustomSubscriber<TestEvent> subscriber = bus
.obtainSubscriber(TestEvent.class, testEventConsumer)
.withFilter(new Predicate<TestEvent>() {
@Override
public boolean test(TestEvent testEvent) throws Exception {
return testEvent == TestEvent.TWO;
}
});
bus.registerSubscriber(observer, subscriber);
bus.post(TestEvent.ONE);
verify(testEventConsumer, never()).accept(TestEvent.ONE);
bus.post(TestEvent.TWO);
verify(testEventConsumer).accept(TestEvent.TWO);
bus.unregister(observer);
}
@Test
public void work2() {
((TestScheduler) ThreadUtil.computationScheduler()).advanceTimeBy(5, TimeUnit.SECONDS);
try {
TestObserver<List<ThreadInfo>> subscriber = new TestObserver<>();
GodEye.instance().<ThreadDump, List<ThreadInfo>>moduleObservable(GodEye.ModuleName.THREAD).subscribe(subscriber);
subscriber.assertValue(new Predicate<List<ThreadInfo>>() {
@Override
public boolean test(List<ThreadInfo> threadInfos) throws Exception {
return threadInfos != null && !threadInfos.isEmpty();
}
});
} catch (UninstallException e) {
Assert.fail();
}
}
@Override
public void subscribeEvent() {
super.subscribeEvent();
addSubscriber(
RxBus.getInstance().toObservable(StatusBarEvent.class)
.compose(RxUtils.switchSchedulers())
.subscribe(statusBarEvent -> mView.setStatusBarColor(statusBarEvent.isSet()))
);
addSubscriber(
RxBus.getInstance().toObservable(UpdataEvent.class)
.filter(new Predicate<UpdataEvent>() {
@Override
public boolean test(UpdataEvent updataEvent) throws Exception {
return updataEvent.isMain();
}
})
.subscribe(updataEvent -> mView.upDataVersion())
);
addSubscriber(
RxBus.getInstance().toObservable(OpenBrowseEvent.class)
.subscribe(openBrowseEvent -> mView.showOpenBrowseDialog())
);
addSubscriber(
RxBus.getInstance().toObservable(InstallApkEvent.class)
.subscribe(installApkEvent -> mView.installApk())
);
}
@Override
public void subscribeEvent() {
super.subscribeEvent();
addSubscriber(
RxBus.getInstance().toObservable(StatusBarEvent.class)
.compose(RxUtils.switchSchedulers())
.subscribe(statusBarEvent -> mView.setStatusBarColor(statusBarEvent.isSet()))
);
addSubscriber(
RxBus.getInstance().toObservable(UpdataEvent.class)
.filter(new Predicate<UpdataEvent>() {
@Override
public boolean test(UpdataEvent updataEvent) throws Exception {
return updataEvent.isMain() == false;
}
})
.subscribe(updataEvent -> mView.upDataVersion())
);
addSubscriber(
RxBus.getInstance().toObservable(ClearCacheEvent.class)
.subscribe(clearCacheEvent -> mView.clearCache())
);
addSubscriber(
RxBus.getInstance().toObservable(LanguageEvent.class)
.subscribe(languageEvent -> {
mModel.setSelectedLanguage(languageEvent.getLanguage());
mView.handleLanguage();
})
);
}
@Override
public boolean test(CachedPathInfo cachedPathInfo) throws Exception
{
for (Predicate<CachedPathInfo> pc : _filters)
if (!pc.test(cachedPathInfo))
return false;
return true;
}
private Predicate<CachedPathInfo> getFilter(String filtName, String arg)
{
for (SearchFilter f : getAllFilters())
if (f.getName().equals(filtName))
return f.getChecker(_location, arg);
return null;
}
@Override
public @NonNull
Predicate<List<CrashInfo>> crashPredicate() {
return new Predicate<List<CrashInfo>>() {
@Override
public boolean test(List<CrashInfo> info) throws Exception {
return info != null && !info.isEmpty();
}
};
}
@Override
public <T> Observable<CacheResult<T>> execute(RxCache rxCache, String key, long time, Observable<T> source, Type type) {
Observable<CacheResult<T>> cache = loadCache(rxCache, type, key, time, true);
Observable<CacheResult<T>> remote = loadRemote(rxCache, key, source, false);
return Observable.concat(cache, remote)
.filter(new Predicate<CacheResult<T>>() {
@Override
public boolean test(@NonNull CacheResult<T> tCacheResult) throws Exception {
return tCacheResult != null && tCacheResult.data != null;
}
});
}
/**
* @param <T> The type of {@link ActivityLifecycleEvent} subclass you want.
* @param clazz The {@link ActivityLifecycleEvent} subclass you want.
* @return an observable of this activity's lifecycle events.
*/
public <T extends ActivityLifecycleEvent> Observable<T> lifecycle(final Class<T> clazz) {
return lifecycle()
.filter(
new Predicate<ActivityLifecycleEvent>() {
@Override
public boolean test(ActivityLifecycleEvent activityEvent) throws Exception {
return clazz.isAssignableFrom(activityEvent.getClass());
}
})
.cast(clazz);
}
private static <T> boolean predtest(Predicate<T> f, T val) {
try {
return f.test(val);
} catch (Exception e) {
return false;
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
// new thread for operator
Observable.fromCallable(new Callable() {
@Override
public Object call() throws Exception {
// TODO Auto-generated method stub
System.out.println("Observable thread is:-" + Thread.currentThread().getName());
return 5;
}
}).observeOn(Schedulers.newThread()).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer value) throws Exception {
// TODO Auto-generated method stub
System.out.println("operator thread is:-" + Thread.currentThread().getName());
return value > 10;
}
}).defaultIfEmpty(1).observeOn(Schedulers.newThread()).subscribe(item -> {
System.out.println("Subscriber thread is:-" + Thread.currentThread().getName());
System.out.println("items emitted :-"+item);
});
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
// TODO Auto-generated method stub
// new thread for operator
Observable.fromCallable(new Callable() {
@Override
public Object call() throws Exception {
// TODO Auto-generated method stub
System.out.println("Observable thread is:-" + Thread.currentThread().getName());
return 5;
}
}).observeOn(Schedulers.newThread()).filter(new Predicate<Integer>() {
@Override
public boolean test(Integer value) throws Exception {
// TODO Auto-generated method stub
System.out.println("operator thread is:-" + Thread.currentThread().getName());
return value > 10;
}
}).defaultIfEmpty(1).subscribe(item -> {
System.out.println("Subscriber thread is:-" + Thread.currentThread().getName());
System.out.println("items emitted :-"+item);
});
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public @NonNull
Predicate<ImageIssue> imageCanaryPredicate() {
return new Predicate<ImageIssue>() {
@Override
public boolean test(ImageIssue info) throws Exception {
return info.issueType != ImageIssue.IssueType.NONE;
}
};
}
/**
* @param <T> The type of {@link ActivityCallbackEvent} subclass you want.
* @param clazz The {@link ActivityCallbackEvent} subclass you want.
* @return an observable of this activity's callbacks events.
*/
public <T extends ActivityCallbackEvent> Observable<T> callbacks(final Class<T> clazz) {
return callbacks()
.filter(
new Predicate<ActivityCallbackEvent>() {
@Override
public boolean test(ActivityCallbackEvent activityCallbackEvent) throws Exception {
return clazz.isAssignableFrom(activityCallbackEvent.getClass());
}
})
.cast(clazz);
}
@NonNull
private Predicate<StockUpdate> containsAnyOfKeywords(String[] trackingKeywords) {
return stockUpdate -> {
for (String keyword : trackingKeywords) {
if (stockUpdate.getTwitterStatus().contains(keyword)) {
return true;
}
}
return false;
};
}