下面列出了 io.reactivex.functions.BiPredicate 类的 API 使用示例代码及写法；也可以点击链接到 GitHub 查看源代码。
/**
 * Drops containers whose value is {@code null}, then suppresses consecutive
 * containers whose strings have the same length and each contain the other.
 *
 * @param observable
 *            upstream of nullable string containers
 * @return the filtered stream with consecutive duplicates removed
 */
private Observable<NullableContainer<String>> filterThenDistinctUntilChanged(
        Observable<NullableContainer<String>> observable) {
    // Only containers that actually hold a value pass this stage.
    final Predicate<NullableContainer<String>> hasValue =
            new Predicate<NullableContainer<String>>() {
                @Override
                public boolean test(NullableContainer<String> container) throws Exception {
                    return container.get() != null;
                }
            };

    // Two neighbours count as "unchanged" when they have equal length and
    // contain one another.
    final BiPredicate<NullableContainer<String>, NullableContainer<String>> sameContent =
            new BiPredicate<NullableContainer<String>, NullableContainer<String>>() {
                @Override
                public boolean test(NullableContainer<String> nc1, NullableContainer<String> nc2) {
                    final String previous = nc1.get();
                    final String current = nc2.get();
                    if (previous.length() != current.length()) {
                        return false;
                    }
                    return previous.contains(current) && current.contains(previous);
                }
            };

    return observable.filter(hasValue).distinctUntilChanged(sameContent);
}
/**
 * Creates a {@link BiPredicate} that accepts every pair of arguments.
 *
 * @param <T>
 *            type of the first argument
 * @param <R>
 *            type of the second argument
 * @return a new predicate whose {@code test} always returns {@code true}
 */
public static <T, R> BiPredicate<T, R> alwaysTrue() {
    // TODO make holder
    final BiPredicate<T, R> acceptAll = new BiPredicate<T, R>() {
        @Override
        public boolean test(T first, R second) throws Exception {
            return true;
        }
    };
    return acceptAll;
}
/**
 * Creates a {@link BiPredicate} that rejects every pair of arguments.
 *
 * @param <T>
 *            type of the first argument
 * @param <R>
 *            type of the second argument
 * @return a new predicate whose {@code test} always returns {@code false}
 */
public static <T, R> BiPredicate<T, R> alwaysFalse() {
    // TODO make holder
    final BiPredicate<T, R> rejectAll = new BiPredicate<T, R>() {
        @Override
        public boolean test(T first, R second) throws Exception {
            return false;
        }
    };
    return rejectAll;
}
/**
 * Creates a {@link BiPredicate} that fails on every invocation.
 *
 * @param <T>
 *            type of the first argument
 * @param <R>
 *            type of the second argument
 * @return a new predicate whose {@code test} always throws a
 *         {@code ThrowingException}
 */
public static <T, R> BiPredicate<T, R> throwing() {
    // TODO make holder
    final BiPredicate<T, R> alwaysFails = new BiPredicate<T, R>() {
        @Override
        public boolean test(T first, R second) throws Exception {
            throw new ThrowingException();
        }
    };
    return alwaysFails;
}
/**
 * Validates and stores the configuration of the state machine.
 *
 * @param initialState
 *            factory for the starting state; checked with
 *            {@code Preconditions.checkNotNull}
 * @param transition
 *            function applied to the current state, an upstream item and the
 *            downstream emitter to obtain the next state; checked for null
 * @param completion
 *            predicate consulted with the current state and the downstream
 *            emitter; checked for null
 * @param backpressureStrategy
 *            backpressure strategy to store; checked for null
 * @param requestBatchSize
 *            must be greater than zero
 */
private TransformerStateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(completion);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the parameter that is actually being validated
    // (it previously referred to a non-existent "initialRequest").
    Preconditions.checkArgument(requestBatchSize > 0, "requestBatchSize must be greater than zero");
    this.initialState = initialState;
    this.transition = transition;
    this.completion = completion;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
/**
 * Static factory that builds a {@code TransformerStateMachine} from the given
 * configuration and exposes it as a {@link FlowableTransformer}.
 *
 * @param initialState
 *            factory for the starting state
 * @param transition
 *            state transition function
 * @param completion
 *            completion predicate
 * @param backpressureStrategy
 *            backpressure strategy handed to the state machine
 * @param requestBatchSize
 *            request batch size handed to the state machine
 * @param <State>
 *            state type
 * @param <In>
 *            upstream item type
 * @param <Out>
 *            downstream item type
 * @return the newly constructed transformer
 */
public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final TransformerStateMachine<State, In, Out> machine = new TransformerStateMachine<State, In, Out>(
            initialState, transition, completion, backpressureStrategy, requestBatchSize);
    return machine;
}
/**
 * Builds the function that turns each upstream {@link Notification} into a
 * {@link Flowable} of downstream notifications, created through
 * {@code Flowable.create} with the supplied backpressure strategy.
 *
 * <ul>
 * <li>For an onNext notification the transition is applied and its result
 * stored in {@code state.value}; the emitter is then completed, or, if it was
 * cancelled meanwhile, the special "unsubscribed" marker is emitted.</li>
 * <li>For an onComplete notification the completion predicate is tested and,
 * when it returns true and the emitter is not cancelled, the wrapped emitter
 * is completed.</li>
 * <li>Otherwise the notification's error is forwarded to the wrapped emitter
 * unless the emitter is cancelled.</li>
 * </ul>
 *
 * @param transition
 *            state transition function
 * @param completion
 *            completion predicate
 * @param state
 *            mutable holder of the current state
 * @param backpressureStrategy
 *            strategy passed to {@code Flowable.create}
 * @return the notification-mapping function
 */
private static <State, Out, In> Function<Notification<In>, Flowable<Notification<Out>>> execute(
        final Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        final BiPredicate<? super State, ? super FlowableEmitter<Out>> completion, final Mutable<State> state,
        final BackpressureStrategy backpressureStrategy) {
    return new Function<Notification<In>, Flowable<Notification<Out>>>() {
        @Override
        public Flowable<Notification<Out>> apply(final Notification<In> in) {
            final FlowableOnSubscribe<Notification<Out>> onSubscribe = new FlowableOnSubscribe<Notification<Out>>() {
                @Override
                public void subscribe(FlowableEmitter<Notification<Out>> emitter) throws Exception {
                    final FlowableEmitter<Out> w = wrap(emitter);

                    if (in.isOnNext()) {
                        state.value = transition.apply(state.value, in.getValue(), w);
                        if (emitter.isCancelled()) {
                            // this is a special emission to indicate that
                            // the transition called unsubscribe. It will be
                            // filtered later.
                            emitter.onNext(UnsubscribedNotificationHolder.<Out>unsubscribedNotification());
                        } else {
                            emitter.onComplete();
                        }
                        return;
                    }

                    if (in.isOnComplete()) {
                        // The predicate is evaluated before the cancellation
                        // check, as it may itself emit through w.
                        if (completion.test(state.value, w) && !emitter.isCancelled()) {
                            w.onComplete();
                        }
                        return;
                    }

                    if (!emitter.isCancelled()) {
                        w.onError(in.getError());
                    }
                }
            };
            return Flowable.create(onSubscribe, backpressureStrategy);
        }
    };
}
/**
 * Captures the source and the collection callbacks for later use.
 *
 * @param source
 *            upstream {@link Flowable}
 * @param collectionFactory
 *            factory for a collection
 * @param add
 *            function combining a collection and an item into a collection
 * @param condition
 *            predicate over the current collection and an item
 * @param emitRemainder
 *            flag stored for later use
 */
public FlowableCollectWhile(Flowable<T> source, Callable<R> collectionFactory,
        BiFunction<? super R, ? super T, ? extends R> add,
        BiPredicate<? super R, ? super T> condition, boolean emitRemainder) {
    // The superclass no-arg constructor is invoked implicitly.
    this.emitRemainder = emitRemainder;
    this.condition = condition;
    this.add = add;
    this.collectionFactory = collectionFactory;
    this.source = source;
}
/**
 * Captures the collection callbacks and the downstream subscriber.
 *
 * @param collectionFactory
 *            factory for a collection
 * @param add
 *            function combining a collection and an item into a collection
 * @param condition
 *            predicate over the current collection and an item
 * @param child
 *            downstream subscriber
 * @param emitRemainder
 *            flag stored for later use
 */
CollectWhileSubscriber(Callable<R> collectionFactory,
        BiFunction<? super R, ? super T, ? extends R> add,
        BiPredicate<? super R, ? super T> condition, Subscriber<? super R> child,
        boolean emitRemainder) {
    this.child = child;
    this.emitRemainder = emitRemainder;
    this.condition = condition;
    this.add = add;
    this.collectionFactory = collectionFactory;
}
/**
 * Convenience entry point that delegates to
 * {@code TransformerStateMachine.create} with the same arguments.
 *
 * @param initialState
 *            factory for the starting state
 * @param transition
 *            state transition function
 * @param completion
 *            completion predicate
 * @param backpressureStrategy
 *            backpressure strategy
 * @param requestBatchSize
 *            request batch size
 * @param <State>
 *            state type
 * @param <In>
 *            upstream item type
 * @param <Out>
 *            downstream item type
 * @return the transformer produced by {@code TransformerStateMachine.create}
 */
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final FlowableTransformer<In, Out> transformer = TransformerStateMachine.create(
            initialState, transition, completion, backpressureStrategy, requestBatchSize);
    return transformer;
}
/**
 * Buffers the source {@link Flowable} into {@link List}s. A list is emitted
 * when it reaches {@code maxSize} items or when the time elapsed since the
 * last source emission reaches the duration computed for that emission.
 *
 * @param maxSize
 *            max size of emitted lists
 * @param duration
 *            function that, based on the last emission, calculates the
 *            elapsed time to wait before emitting a buffered list
 * @param unit
 *            unit of {@code duration}
 * @param scheduler
 *            scheduler used to schedule emission of a buffer (as a list) if
 *            the time since the last source emission reaches the duration
 * @param <T>
 *            type of the source stream items
 * @return source with operator applied
 */
public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
        final Function<? super T, ? extends Long> duration, final TimeUnit unit, final Scheduler scheduler) {
    // Computes the timeout for a wrapped item by unwrapping it first.
    final Function<MyOptional<T>, Long> timeoutForItem = new Function<MyOptional<T>, Long>() {
        @Override
        public Long apply(MyOptional<T> item) throws Exception {
            return duration.apply(item.get());
        }
    };

    // Keep collecting while the list has room and the incoming element is present.
    final BiPredicate<List<T>, MyOptional<T>> keepCollecting = new BiPredicate<List<T>, MyOptional<T>>() {
        @Override
        public boolean test(List<T> list, MyOptional<T> item) throws Exception {
            return list.size() < maxSize && item.isPresent();
        }
    };

    // Inserts an empty optional into the stream according to the timeout.
    final FlowableTransformer<MyOptional<T>, MyOptional<T>> insertEmpties = insert(timeoutForItem, unit,
            Functions.constant(MyOptional.<T>empty()), scheduler);

    final FlowableTransformer<MyOptional<T>, List<T>> collectIntoLists = collectWhile(
            ListFactoryHolder.<T>factory(), // factory
            MyOptional.<T>addIfPresent(), // add function
            keepCollecting); // condition

    return new FlowableTransformer<T, List<T>>() {
        @Override
        public Publisher<List<T>> apply(Flowable<T> source) {
            return source
                    .map(MyOptional.<T>of())
                    .compose(insertEmpties)
                    .compose(collectIntoLists)
                    // need this filter because sometimes nothing gets added to list
                    .filter(MyOptional.<T>listHasElements());
        }
    };
}
/**
 * Returns an observable of the tic tac toe board. First value is provided immediately,
 * succeeding values are guaranteed to be distinct from previous values. Values are
 * always provided on the main thread.
 *
 * <p>Boards are compared cell by cell with {@link Arrays#deepEquals(Object[], Object[])}.
 * {@code Arrays.equals} on a two-dimensional array only compares the row arrays by
 * reference, so boards with equal content but different row instances would not be
 * suppressed.
 *
 * @return the board stream with consecutive equal boards suppressed
 */
public Observable<Value[][]> grid() {
    return grid.distinctUntilChanged(new BiPredicate<Value[][], Value[][]>() {
        @Override public boolean test(Value[][] a, Value[][] b) throws Exception {
            // Deep comparison: a 2-D array needs its rows compared by content.
            return Arrays.deepEquals(a, b);
        }
    });
}
/**
 * Returns an observable of the tic tac toe board. First value is provided immediately,
 * succeeding values are guaranteed to be distinct from previous values. Values are
 * always provided on the main thread.
 *
 * <p>Boards are compared cell by cell with {@link Arrays#deepEquals(Object[], Object[])}.
 * {@code Arrays.equals} on a two-dimensional array only compares the row arrays by
 * reference, so boards with equal content but different row instances would not be
 * suppressed.
 *
 * @return the board stream with consecutive equal boards suppressed
 */
Observable<Value[][]> grid() {
    return grid.distinctUntilChanged(new BiPredicate<Value[][], Value[][]>() {
        @Override public boolean test(Value[][] a, Value[][] b) throws Exception {
            // Deep comparison: a 2-D array needs its rows compared by content.
            return Arrays.deepEquals(a, b);
        }
    });
}
/**
 * Overload of {@code collectWhile} that passes {@code true} for
 * {@code emitRemainder}.
 *
 * @param collectionFactory
 *            factory to create a new collection
 * @param add
 *            method to add an item to a collection
 * @param condition
 *            predicate over the current collection and the next item
 * @param <T>
 *            item type
 * @param <R>
 *            collection type
 * @return the transformer returned by the four-argument overload
 */
public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory,
        final BiFunction<? super R, ? super T, ? extends R> add,
        final BiPredicate<? super R, ? super T> condition) {
    final boolean emitRemainder = true;
    return collectWhile(collectionFactory, add, condition, emitRemainder);
}
/**
 * Collects items into {@link List}s by delegating to {@code collectWhile} with
 * the factory and add function supplied by {@code ListFactoryHolder}.
 *
 * @param condition
 *            predicate over the current list and the next item
 * @param emitRemainder
 *            passed through to {@code collectWhile}
 * @param <T>
 *            item type
 * @return transformer producing lists of items
 */
public static <T> FlowableTransformer<T, List<T>> toListWhile(
        final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) {
    final FlowableTransformer<T, List<T>> lists = collectWhile(
            ListFactoryHolder.<T>factory(),
            ListFactoryHolder.<T>add(),
            condition,
            emitRemainder);
    return lists;
}
/**
 * Overload of {@code toListWhile} that passes {@code true} for
 * {@code emitRemainder}.
 *
 * @param condition
 *            predicate over the current list and the next item
 * @param <T>
 *            item type
 * @return transformer producing lists of items
 */
public static <T> FlowableTransformer<T, List<T>> toListWhile(
        final BiPredicate<? super List<T>, ? super T> condition) {
    final boolean emitRemainder = true;
    return toListWhile(condition, emitRemainder);
}
/**
 * Alias that delegates to {@code toListWhile(condition, emitRemainder)}.
 *
 * @param condition
 *            predicate over the current list and the next item
 * @param emitRemainder
 *            passed through to {@code toListWhile}
 * @param <T>
 *            item type
 * @return transformer producing lists of items
 */
public static <T> FlowableTransformer<T, List<T>> bufferWhile(
        final BiPredicate<? super List<T>, ? super T> condition, boolean emitRemainder) {
    final FlowableTransformer<T, List<T>> lists = toListWhile(condition, emitRemainder);
    return lists;
}
/**
 * Alias that delegates to {@code toListWhile(condition)}.
 *
 * @param condition
 *            predicate over the current list and the next item
 * @param <T>
 *            item type
 * @return transformer producing lists of items
 */
public static <T> FlowableTransformer<T, List<T>> bufferWhile(
        final BiPredicate<? super List<T>, ? super T> condition) {
    final FlowableTransformer<T, List<T>> lists = toListWhile(condition);
    return lists;
}
/**
 * Returns a transformer that emits collections of items, where the boundaries
 * between collections are determined by the given {@link BiPredicate}.
 *
 * <p>
 * <img src=
 * "https://raw.githubusercontent.com/davidmoten/rxjava2-extras/master/src/docs/collectWhile.png"
 * alt="image">
 *
 * @param collectionFactory
 *            factory to create a new collection
 * @param add
 *            method to add an item to a collection
 * @param condition
 *            while true, items continue to be added to the current
 *            collection. Do not modify the given collection!
 * @param emitRemainder
 *            whether to emit the remainder as a collection
 * @param <T>
 *            item type
 * @param <R>
 *            collection type
 * @return transformer that collects while the condition holds and then starts
 *         a new collection
 */
public static <T, R> FlowableTransformer<T, R> collectWhile(final Callable<R> collectionFactory,
        final BiFunction<? super R, ? super T, ? extends R> add, final BiPredicate<? super R, ? super T> condition,
        final boolean emitRemainder) {
    final FlowableTransformer<T, R> transformer = new FlowableTransformer<T, R>() {
        @Override
        public Publisher<R> apply(Flowable<T> upstream) {
            // Each application wraps the upstream in a fresh operator instance.
            return new FlowableCollectWhile<T, R>(upstream, collectionFactory, add, condition, emitRemainder);
        }
    };
    return transformer;
}