下面列出了 io.reactivex.functions.BiFunction 类的 API 使用示例代码及写法;也可以点击链接到 GitHub 查看源代码。
/**
 * Requests both API endpoints, zips the two responses into the combined size of their result
 * lists, and logs either that total or the error message.
 */
private void clickAsyncZip() {
    final TestApis api = RetrofitManager.INSTANCE
            .getRetrofit(Constants.GANK_API_URL)
            .create(TestApis.class);
    final Observable<TSSCRes<TSSCResult>> tsscRequest = api.getTangShiSongCi();
    final Observable<XHYRes<XHYResult>> xhyRequest = api.getXHY();

    // Adds up how many results each response carries.
    final BiFunction<TSSCRes<TSSCResult>, XHYRes<XHYResult>, Integer> totalResultCount =
            new BiFunction<TSSCRes<TSSCResult>, XHYRes<XHYResult>, Integer>() {
                @Override
                public Integer apply(TSSCRes<TSSCResult> tsscResponse,
                        XHYRes<XHYResult> xhyResponse) throws Exception {
                    final int tsscCount = tsscResponse.getResult().size();
                    final int xhyCount = xhyResponse.getResult().size();
                    return tsscCount + xhyCount;
                }
            };

    Observable.zip(tsscRequest, xhyRequest, totalResultCount)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(new BaseObserver<Integer>() {
                @Override
                public void onSuccess(Integer count) {
                    LogUtil.e(TAG, "count = " + count);
                }

                @Override
                public void onError(ApiException exception) {
                    LogUtil.e(TAG, "error:" + exception.getMessage());
                }
            });
}
/**
 * Zips the expected root with the merkle proof and emits whether hashing {@code leaf} along the
 * proof path reproduces that root (compared case-insensitively). Any error is printed and turned
 * into {@code false}.
 *
 * @param rootObservable emits the expected root hash
 * @param leaf the hash the proof starts from
 * @param merkleTransactionObservable emits the merkle proof whose path is walked
 * @return an observable emitting the verification result
 */
private Observable<Boolean> getBooleanObservable(Observable<String> rootObservable, String leaf,
    Observable<MerkleProofInfo> merkleTransactionObservable) {
    BiFunction<String, MerkleProofInfo, Boolean> verifier = (expectedRoot, proof) -> {
        List<MerklePathItem> path = proof.getMerklePath();
        if (path.isEmpty()) {
            // Single item tree, so leaf = HRoot0
            return leaf.equalsIgnoreCase(expectedRoot);
        }
        String computedRoot = leaf;
        for (MerklePathItem item : path) {
            // A LEFT item is prepended to the running hash; any other position is appended.
            String concatenated = item.getPosition() == Position.LEFT
                ? item.getHash() + computedRoot
                : computedRoot + item.getHash();
            computedRoot = ConvertUtils
                .toHex(Hashes.sha3_256(ConvertUtils.fromHexToBytes(concatenated)));
        }
        return expectedRoot.equalsIgnoreCase(computedRoot);
    };
    return Observable
        .zip(rootObservable, merkleTransactionObservable, verifier)
        .onErrorReturn((e) -> {
            e.printStackTrace();
            return false;
        });
}
/**
 * Builds a {@link TransferTransactionFactory} whose recipient and mosaics come from
 * {@code getResolvedAddress} and {@code getResolvedMosaic}.
 *
 * @param transaction the transfer transaction to rebuild
 * @param expectedReceiptSource passed through to the resolution helpers
 * @return an observable emitting the factory once the recipient and all mosaics are available
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    TransferTransaction transaction, ReceiptSource expectedReceiptSource) {
    Observable<Statement> statementObservable = getStatement(transaction);
    // concatMapEager subscribes to every lookup up front (like flatMap) but emits the results in
    // source order, so the collected list always lines up with transaction.getMosaics().
    // flatMap gives no such ordering guarantee when the lookups complete at different times.
    Observable<List<Mosaic>> resolvedMosaics = Observable
        .fromIterable(transaction.getMosaics())
        .concatMapEager(
            m -> getResolvedMosaic(transaction, m, statementObservable, expectedReceiptSource))
        .toList().toObservable();
    Observable<Address> resolvedRecipient = getResolvedAddress(transaction,
        transaction.getRecipient(),
        statementObservable, expectedReceiptSource);
    BiFunction<Address, List<Mosaic>, TransferTransactionFactory> mergeFunction = (address, mosaics) ->
        TransferTransactionFactory
            .create(transaction.getNetworkType(), address, mosaics, transaction.getMessage());
    return Observable.combineLatest(resolvedRecipient, resolvedMosaics, mergeFunction);
}
/**
 * Builds a {@link MosaicAddressRestrictionTransactionFactory} from the mosaic id and target
 * address returned by {@code getResolvedMosaicId} and {@code getResolvedAddress}, carrying over
 * the restriction key and the new and previous restriction values of the given transaction.
 *
 * @param transaction the transaction to rebuild
 * @param expectedReceiptSource passed through to the resolution helpers
 * @return an observable emitting the factory
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    MosaicAddressRestrictionTransaction transaction,
    ReceiptSource expectedReceiptSource) {
    final Observable<Statement> statements = getStatement(transaction);

    final Observable<MosaicId> mosaicIdLookup = getResolvedMosaicId(
        transaction, transaction.getMosaicId(), statements, expectedReceiptSource);

    final Observable<Address> targetAddressLookup = Observable
        .just(transaction.getTargetAddress())
        .flatMap(unresolved ->
            getResolvedAddress(transaction, unresolved, statements, expectedReceiptSource));

    final BiFunction<? super MosaicId, ? super Address, MosaicAddressRestrictionTransactionFactory> toFactory =
        (mosaicId, targetAddress) -> {
            MosaicAddressRestrictionTransactionFactory factory =
                MosaicAddressRestrictionTransactionFactory.create(
                    transaction.getNetworkType(),
                    mosaicId,
                    transaction.getRestrictionKey(),
                    targetAddress,
                    transaction.getNewRestrictionValue());
            return factory.previousRestrictionValue(transaction.getPreviousRestrictionValue());
        };

    return Observable.combineLatest(mosaicIdLookup, targetAddressLookup, toFactory);
}
/**
 * Builds an account mosaic restriction factory from the additions and deletions returned by
 * {@code getResolvedMosaicIds}, keeping the network type and restriction flags of the given
 * transaction.
 *
 * @param transaction the transaction to rebuild
 * @param expectedReceiptSource passed through to the resolution helper
 * @return an observable emitting the factory
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    AccountMosaicRestrictionTransaction transaction,
    ReceiptSource expectedReceiptSource) {
    final Observable<Statement> statements = getStatement(transaction);

    final Observable<List<UnresolvedMosaicId>> additionsLookup = getResolvedMosaicIds(
        transaction, transaction.getRestrictionAdditions(), statements, expectedReceiptSource);
    final Observable<List<UnresolvedMosaicId>> deletionsLookup = getResolvedMosaicIds(
        transaction, transaction.getRestrictionDeletions(), statements, expectedReceiptSource);

    final BiFunction<List<UnresolvedMosaicId>, List<UnresolvedMosaicId>, TransactionFactory<AccountMosaicRestrictionTransaction>> toFactory =
        (additions, deletions) -> AccountMosaicRestrictionTransactionFactory.create(
            transaction.getNetworkType(),
            transaction.getRestrictionFlags(),
            additions,
            deletions);

    return Observable.combineLatest(additionsLookup, deletionsLookup, toFactory);
}
/**
 * Builds an {@link AccountAddressRestrictionTransactionFactory} from the additions and deletions
 * returned by {@code getResolvedAddresses}, keeping the network type and restriction flags of
 * the given transaction.
 *
 * @param transaction the transaction to rebuild
 * @param expectedReceiptSource passed through to the resolution helper
 * @return an observable emitting the factory
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    AccountAddressRestrictionTransaction transaction,
    ReceiptSource expectedReceiptSource) {
    final Observable<Statement> statements = getStatement(transaction);

    final Observable<List<UnresolvedAddress>> additionsLookup = getResolvedAddresses(
        transaction, transaction.getRestrictionAdditions(), statements, expectedReceiptSource);
    final Observable<List<UnresolvedAddress>> deletionsLookup = getResolvedAddresses(
        transaction, transaction.getRestrictionDeletions(), statements, expectedReceiptSource);

    final BiFunction<List<UnresolvedAddress>, List<UnresolvedAddress>, AccountAddressRestrictionTransactionFactory> toFactory =
        (additions, deletions) -> AccountAddressRestrictionTransactionFactory.create(
            transaction.getNetworkType(),
            transaction.getRestrictionFlags(),
            additions,
            deletions);

    return Observable.combineLatest(additionsLookup, deletionsLookup, toFactory);
}
/**
 * Inflates the main layout, wires the RecyclerView to a linear layout manager and a fresh
 * adapter, then calls {@code loadWithRetroJsoup()}.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    recyclerView = (RecyclerView) findViewById(R.id.recyclerview);
    recyclerView.setLayoutManager(new LinearLayoutManager(getBaseContext()));
    adapter = new Adapter();
    recyclerView.setAdapter(adapter);

    loadWithRetroJsoup();
    // No Observable.zip(...) is built here: an observable that is assembled but never subscribed
    // does nothing, and a zipper returning null is rejected by RxJava 2 anyway.
}
/**
 * Creates a {@link LifecycleTransformer} driven by an observable that emits {@code true} each
 * time a later lifecycle event equals the event that {@code correspondingEvents} maps the first
 * lifecycle event to.
 *
 * @param lifecycle the stream of lifecycle events
 * @param correspondingEvents maps the first observed event to the event to wait for
 * @param <T> the value type of the stream the transformer is applied to
 * @param <E> the lifecycle event type
 * @return the transformer wrapping that observable
 */
public static <T, E> LifecycleTransformer<T> bindToLifeCycle(Observable<E> lifecycle,
        final Function<E, E> correspondingEvents) {
    final Observable<E> sharedLifecycle = lifecycle.share();

    // The awaited event is derived from the un-shared source; later events come from the shared copy.
    final Observable<E> awaitedEvent = lifecycle.take(1).map(correspondingEvents);
    final Observable<E> laterEvents = sharedLifecycle.skip(1);

    final BiFunction<E, E, Boolean> sameEvent = new BiFunction<E, E, Boolean>() {
        @Override
        public Boolean apply(E awaited, E current) throws Exception {
            return awaited.equals(current);
        }
    };
    final Predicate<Boolean> isTrue = new Predicate<Boolean>() {
        @Override
        public boolean test(Boolean matched) throws Exception {
            return matched;
        }
    };

    return new LifecycleTransformer<>(
            Observable.combineLatest(awaitedEvent, laterEvents, sameEvent).filter(isTrue));
}
/**
 * Chains a follow-up step that runs once this step has completed. When this step produced no
 * data (an absent optional), {@code func} is not invoked and an absent optional is passed along
 * instead.
 *
 * @param func produces the next step from the value and the actionable item of this step.
 * @param <TNewValueType> the value type returned by the next step.
 * @param <TNewActionableItem> the actionable item type returned by the next step.
 * @return a {@link Step} to chain more calls to.
 */
public <TNewValueType, TNewActionableItem extends ActionableItem>
    Step<TNewValueType, TNewActionableItem> onStep(
        final BiFunction<T, A, Step<TNewValueType, TNewActionableItem>> func) {
    final Function<
            Optional<Data<T, A>>,
            Observable<Optional<Data<TNewValueType, TNewActionableItem>>>> continueWith =
        new Function<
            Optional<Data<T, A>>,
            Observable<Optional<Data<TNewValueType, TNewActionableItem>>>>() {
            @Override
            public Observable<Optional<Data<TNewValueType, TNewActionableItem>>> apply(
                Optional<Data<T, A>> previous) throws Exception {
                if (!previous.isPresent()) {
                    // Nothing to act on: propagate "absent" without calling func.
                    return Observable.just(
                        Optional.<Data<TNewValueType, TNewActionableItem>>absent());
                }
                Data<T, A> result = previous.get();
                return func.apply(result.value, result.actionableItem).asObservable();
            }
        };

    return new Step<>(asObservable().flatMap(continueWith).singleOrError());
}
/**
 * Returns a transformer that pairs every upstream item with the running {@link Statistics}
 * obtained by adding {@code function.apply(item)} for that item and all items before it.
 *
 * @param function extracts the number to add to the statistics from each item
 * @param <T> the upstream item type
 * @param <R> the numeric type produced by {@code function}
 * @return the transformer
 */
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
        final Function<? super T, ? extends R> function) {
    return new FlowableTransformer<T, Pair<T, Statistics>>() {
        @Override
        public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
            // Seed with "no item, empty statistics"; scan emits the seed first, so it is skipped below.
            final Pair<T, Statistics> seed = Pair.create((T) null, Statistics.create());
            final BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>> accumulate =
                new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                    @Override
                    public Pair<T, Statistics> apply(Pair<T, Statistics> previous, T item)
                            throws Exception {
                        return Pair.create(item, previous.b().add(function.apply(item)));
                    }
                };
            return source.scan(seed, accumulate).skip(1);
        }
    };
}
/**
 * Demonstrates what happens when merge and zip are given a source that fails: both
 * subscriptions log the error message they receive.
 */
private void mergeAndZipOnError() {
    final Observable<String> failing =
            Observable.create(emitter -> emitter.onError(new Throwable("obviously error")));
    final Observable<Integer> succeeding = Observable.create(emitter -> emitter.onNext(1));

    // merge: items are ignored, only the error is logged.
    final Consumer<Serializable> onMergedItem = item -> {
        if (item instanceof String) {
        }
    };
    final Consumer<Throwable> onMergeError =
            error -> Log.e(TAG, "accept: merge error " + error.getMessage());
    Disposable mergeSubscription =
            Observable.merge(failing, succeeding).subscribe(onMergedItem, onMergeError);

    // zip: same idea, with a zipper that ignores both inputs.
    final BiFunction<String, Integer, Object> zipper = (text, number) -> new String("success");
    final Consumer<Throwable> onZipError =
            error -> Log.e(TAG, "accept: zip error " + error.getMessage());
    Disposable zipSubscription =
            Observable.zip(failing, succeeding, zipper).subscribe(ignored -> { }, onZipError);
}
/**
 * Retry policy: numbers each error starting from 1 and, for connection/timeout errors while the
 * number is below {@code count + 1}, waits {@code delay * number} milliseconds before emitting;
 * every other error is propagated.
 */
@Override
public Observable<?> apply(Observable<? extends Throwable> flowable) throws Exception {
    final BiFunction<Throwable, Integer, Wrapper> numberTheError =
            new BiFunction<Throwable, Integer, Wrapper>() {
                @Override
                public Wrapper apply(Throwable error, Integer attempt) throws Exception {
                    return new Wrapper(error, attempt);
                }
            };

    return flowable
            .zipWith(Observable.range(1, count + 1), numberTheError)
            .flatMap(wrapper -> {
                final boolean transientFailure = wrapper.throwable instanceof ConnectException
                        || wrapper.throwable instanceof SocketTimeoutException
                        || wrapper.throwable instanceof TimeoutException;
                if (!transientFailure || wrapper.index >= count + 1) {
                    return Observable.error(wrapper.throwable);
                }
                return Observable.timer(delay + (wrapper.index - 1) * delay, TimeUnit.MILLISECONDS);
            });
}
/**
 * Inserts five elements, queries the table back through RxSQLite and checks that the queried
 * list matches the inserted one element by element.
 */
@Test
public void testElementsList() throws Exception {
    List<TestObject> elements = new ArrayList<>();
    elements.add(new TestObject(1, 9.5, "a"));
    elements.add(new TestObject(2, 6.7, "ab"));
    elements.add(new TestObject(3, 8.2, "abc"));
    elements.add(new TestObject(4, 3.4, "abcd"));
    elements.add(new TestObject(5, 6.5, "abcde"));
    SQLite.get().insert(TestTable.TABLE, elements);
    Observable.zip(RxSQLite.get().query(TestTable.TABLE),
            Observable.just(elements), new BiFunction<List<TestObject>, List<TestObject>, Object>() {
                @Override
                public Object apply(List<TestObject> testElements, List<TestObject> savedElements) throws Exception {
                    // First argument is the query result, second is the list that was inserted.
                    assertEquals(savedElements.size(), testElements.size());
                    for (int i = 0; i < savedElements.size(); i++) {
                        // NOTE(review): relies on TestObject.equals comparing by value and on the
                        // query returning rows in insertion order - confirm.
                        assertEquals(savedElements.get(i), testElements.get(i));
                    }
                    // RxJava 2 forbids null results from a zipper, so return a real value.
                    return testElements;
                }
            })
            .test()
            // Assertion failures thrown inside the zipper arrive as onError; without this check
            // they would be swallowed and the test could never fail.
            .assertNoErrors();
}
/**
 * Requests the base and extra user info on the io scheduler, zips both responses into a
 * {@code UserInfo} and delivers it on the Android main thread.
 */
public static void practice1() {
    final Api api = RetrofitProvider.get().create(Api.class);

    final Observable<UserBaseInfoResponse> baseInfoCall = api
            .getUserBaseInfo(new UserBaseInfoRequest())
            .subscribeOn(Schedulers.io());
    final Observable<UserExtraInfoResponse> extraInfoCall = api
            .getUserExtraInfo(new UserExtraInfoRequest())
            .subscribeOn(Schedulers.io());

    final BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo> toUserInfo =
            new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
                @Override
                public UserInfo apply(UserBaseInfoResponse base,
                        UserExtraInfoResponse extra) throws Exception {
                    return new UserInfo(base, extra);
                }
            };
    final Consumer<UserInfo> onUserInfo = new Consumer<UserInfo>() {
        @Override
        public void accept(UserInfo userInfo) throws Exception {
            //do something;
        }
    };

    Observable.zip(baseInfoCall, extraInfoCall, toUserInfo)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUserInfo);
}
/**
 * Inserts eight rows, sums their {@code primitiveShort} values in memory and checks that the
 * database {@code avg} of that column equals the sum divided by the row count.
 */
@Test
public void avgFunction() {
    final int rowCount = 8;

    final Function<SimpleAllValuesMutable, Integer> toShortValue =
            new Function<SimpleAllValuesMutable, Integer>() {
                @Override
                public Integer apply(SimpleAllValuesMutable row) {
                    return (int) row.primitiveShort;
                }
            };
    final BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer left, Integer right) {
            return left + right;
        }
    };

    // The rows are inserted here, before the query below runs.
    final Integer expectedSum = Observable.fromIterable(insertSimpleAllValues(rowCount))
            .map(toShortValue)
            .reduce(add)
            .blockingGet();

    final Double actualAverage = Select
            .column(avg(SIMPLE_ALL_VALUES_MUTABLE.PRIMITIVE_SHORT))
            .from(SIMPLE_ALL_VALUES_MUTABLE)
            .takeFirst()
            .execute();

    assertThat(actualAverage).isEqualTo(expectedSum.doubleValue() / rowCount);
}
/**
 * Zips {@code observable1} with {@code observable2}, concatenating each pair into a string, and
 * logs every result after applying {@code RxUtil.applySchedulers()}.
 */
private void testZip() {
    final BiFunction<Integer, String, String> concatenate =
            new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer number, String text) throws Exception {
                    return number + text;
                }
            };
    final Consumer<String> logResult = new Consumer<String>() {
        @Override
        public void accept(String combined) throws Exception {
            Log.d(TAG, combined);
        }
    };

    Observable.zip(observable1, observable2, concatenate)
            .compose(RxUtil.applySchedulers())
            .subscribe(logResult);
}
/**
 * Same as the plain zip demo, except that both source fields are first replaced by versions
 * subscribed on the io scheduler.
 */
private void testZipAsync() {
    observable1 = observable1.subscribeOn(Schedulers.io());
    observable2 = observable2.subscribeOn(Schedulers.io());

    final BiFunction<Integer, String, String> concatenate =
            new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer number, String text) throws Exception {
                    return number + text;
                }
            };
    final Consumer<String> logResult = new Consumer<String>() {
        @Override
        public void accept(String combined) throws Exception {
            Log.d(TAG, combined);
        }
    };

    Observable.zip(observable1, observable2, concatenate)
            .compose(RxUtil.applySchedulers())
            .subscribe(logResult);
}
/**
 * Retry policy: numbers each error starting from 1 and, for connection/timeout errors while the
 * number is below {@code count + 1}, waits {@code delay + (number - 1) * increaseDelay}
 * milliseconds before emitting; every other error is propagated.
 */
@Override
public Observable<?> apply(@NonNull Observable<? extends Throwable> observable) throws Exception {
    final BiFunction<Throwable, Integer, Wrapper> numberTheError = Wrapper::new;

    final Function<Wrapper, ObservableSource<?>> retryOrFail = wrapper -> {
        final boolean transientFailure = wrapper.throwable instanceof ConnectException
                || wrapper.throwable instanceof SocketTimeoutException
                || wrapper.throwable instanceof TimeoutException;
        // Once the retry limit is exceeded the error is rethrown as well; otherwise the stream
        // would by default end up in onCompleted.
        if (transientFailure && wrapper.index < count + 1) {
            final long waitMillis = delay + (wrapper.index - 1) * increaseDelay;
            return Observable.timer(waitMillis, TimeUnit.MILLISECONDS);
        }
        return Observable.error(wrapper.throwable);
    };

    return observable
            .zipWith(Observable.range(1, count + 1), numberTheError)
            .flatMap(retryOrFail);
}
/**
 * Retry policy: numbers each error starting from 1 and logs the number once it exceeds 1.
 * Connection/timeout exceptions, and {@code ApiException}s whose code is the network or timeout
 * error code, are retried after {@code delay + (number - 1) * increaseDelay} milliseconds while
 * the number is below {@code count + 1}; every other error is propagated.
 */
@Override
public Observable<?> apply(@NonNull Observable<? extends Throwable> observable) throws Exception {
    return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
        @Override
        public Wrapper apply(@NonNull Throwable throwable, @NonNull Integer integer) throws Exception {
            return new Wrapper(throwable, integer);
        }
    }).flatMap(new Function<Wrapper, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(@NonNull Wrapper wrapper) throws Exception {
            if (wrapper.index > 1) {
                HttpLog.i("重试次数:" + (wrapper.index));
            }

            // 0 stands for "not an ApiException".
            int errCode = 0;
            if (wrapper.throwable instanceof ApiException) {
                ApiException exception = (ApiException) wrapper.throwable;
                errCode = exception.getCode();
            }

            // Each failure kind is listed once (the original repeated the SocketTimeoutException check).
            final boolean retryable = wrapper.throwable instanceof ConnectException
                    || wrapper.throwable instanceof SocketTimeoutException
                    || wrapper.throwable instanceof TimeoutException
                    || errCode == ApiException.ERROR.NETWORD_ERROR
                    || errCode == ApiException.ERROR.TIMEOUT_ERROR;

            // Once the retry limit is exceeded the error is rethrown as well; otherwise the
            // stream would by default end up in onCompleted.
            if (retryable && wrapper.index < count + 1) {
                return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
            }
            return Observable.error(wrapper.throwable);
        }
    });
}
/**
 * Assigns null-returning lambdas to {@code Function} and {@code BiFunction} variables.
 *
 * NOTE(review): the "BUG: Diagnostic contains:" comments look like expected-diagnostic markers
 * read by a compiler-plugin test harness. If so, each must stay directly above the line it refers
 * to and its text must not be edited - confirm before touching them.
 */
static void testAnnoatedThirdParty() {
// BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
Function<String, Object> f1 = (x) -> null; // io.reactivex.(Bi)Function is annotated
Function<String, Object> f2 =
(x) -> {
// BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull
return null;
};
// BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
BiFunction<String, String, Object> f3 = (x, y) -> null;
}
/**
 * Zips the events of the given class with the result of posting an {@code AskedEvent} for that
 * class, and emits only the event (the posting result is dropped).
 *
 * @param theClass the class of the object being asked for
 * @param <CLASS> the type of the object being asked for
 * @return an observable emitting the matching events
 */
public <CLASS> Observable<CLASS> get(Class<CLASS> theClass) {
    final BiFunction<CLASS, Object, CLASS> keepEventOnly = new BiFunction<CLASS, Object, CLASS>() {
        @Override
        public CLASS apply(@NonNull CLASS neededObject, @NonNull Object ignoredPostResult)
                throws Exception {
            return neededObject;
        }
    };
    return Observable.zip(
            onEvent(theClass),
            postAsObservable(new AskedEvent(theClass)),
            keepEventOnly);
}
/**
 * When the first step emits data, the chained step must run and its value must be what the
 * subscriber finally observes.
 */
@Test
public void onStep_withASuccessFullFirstAction_shouldProperlyChainTheNextStep() {
    Object firstValue = new Object();
    final Object chainedValue = new Object();
    TestObserver<Optional<Step.Data<Object, ActionableItem>>> observer = new TestObserver<>();

    interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);

    // The chained step ignores the first value and emits chainedValue with the same item.
    BiFunction<Object, ActionableItem, Step<Object, ActionableItem>> nextStep =
        new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
            @Override
            public Step<Object, ActionableItem> apply(Object o, ActionableItem actionableItem) {
                return Step.from(
                    Observable.just(new Step.Data<>(chainedValue, actionableItem))
                        .singleOrError());
            }
        };
    step.onStep(nextStep)
        .asObservable()
        .subscribe(observer);

    ActionableItem emittedItem =
        new ActionableItem() {
            @NonNull
            @Override
            public Observable<InteractorEvent> lifecycle() {
                return interactorLifecycleSubject;
            }
        };
    returnValueSubject.onNext(
        Optional.of(new Step.Data<Object, ActionableItem>(firstValue, emittedItem)));
    returnValueSubject.onComplete();

    observer.assertValueCount(1);
    assertThat(observer.values().get(0).get().getValue()).isEqualTo(chainedValue);
    observer.assertComplete();
    observer.assertNoErrors();
}
/**
 * When the first step emits an absent optional, the subscriber must observe exactly one absent
 * value followed by completion, without errors.
 */
@Test
public void onStep_withAnUnsuccessfulFirstAction_shouldTerminateTheWholeChain() {
    TestObserver<Optional<Step.Data<Object, ActionableItem>>> observer = new TestObserver<>();
    final Object chainedValue = new Object();

    interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);

    BiFunction<Object, ActionableItem, Step<Object, ActionableItem>> nextStep =
        new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
            @Override
            public Step<Object, ActionableItem> apply(Object o, ActionableItem actionableItem) {
                return Step.from(
                    Observable.just(new Step.Data<>(chainedValue, actionableItem))
                        .singleOrError());
            }
        };
    step.onStep(nextStep)
        .asObservable()
        .subscribe(observer);

    // The first step yields no data.
    returnValueSubject.onNext(Optional.<Step.Data<Object, ActionableItem>>absent());
    returnValueSubject.onComplete();

    observer.assertValueCount(1);
    assertThat(observer.values().get(0).isPresent()).isFalse();
    observer.assertComplete();
    observer.assertNoErrors();
}
/**
 * Prepares a callable statement for {@code sql}, runs {@code single} once per emitted parameter
 * group and emits each outcome as a {@link Notification}. The statement is committed when all
 * groups complete and rolled back when any of them fails; statement and connection are closed
 * through {@code Util.closeCallableStatementAndConnection}.
 *
 * @param con connection used to prepare the call
 * @param sql the call's SQL text
 * @param parameterGroups one list of parameter values per execution
 * @param parameterPlaceholders placeholders handed to {@code Util.prepareCall}
 * @param single executes the statement for one parameter group
 * @param <T> the result type of one execution
 * @return the materialized stream of results
 */
private static <T> Flowable<Notification<T>> createWithParameters(Connection con, String sql,
        Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        BiFunction<NamedCallableStatement, List<Object>, Single<T>> single) {
    Callable<NamedCallableStatement> resourceFactory = () -> Util.prepareCall(con, sql, parameterPlaceholders);
    final Function<NamedCallableStatement, Flowable<Notification<T>>> flowableFactory = //
            stmt -> parameterGroups //
                    .flatMap(parameters -> single.apply(stmt, parameters).toFlowable()) //
                    // The hooks must sit BEFORE materialize(): materialize turns onError into an
                    // onNext(notification) followed by onComplete, so hooks placed after it
                    // would commit on failure and never roll back.
                    .doOnComplete(() -> Util.commit(stmt.stmt)) //
                    .doOnError(e -> Util.rollback(stmt.stmt)) //
                    .materialize();
    Consumer<NamedCallableStatement> disposer = Util::closeCallableStatementAndConnection;
    return Flowable.using(resourceFactory, flowableFactory, disposer, true);
}
/**
 * Emits the elements of {@code iterable()} to the source context, one per tick of an interval of
 * {@code interval()} milliseconds, blocking the calling thread until the stream ends.
 */
@Override
public void run(SourceContext<TEventType> sourceContext) throws Exception {
    disposables = new CompositeDisposable();

    // Pairing each element with an interval tick paces the emission; the tick itself is dropped.
    final BiFunction<TEventType, Long, TEventType> dropTick = (event, tick) -> event;

    // Remember the subscription so that it can be disposed of from outside this method.
    final Consumer<Disposable> rememberSubscription = disposable -> disposables.add(disposable);

    final Consumer<TEventType> emit = event -> sourceContext.collect(event);

    Observable
            .zip(
                    Observable.fromIterable(iterable()),
                    Observable.interval(interval().toMillis(), TimeUnit.MILLISECONDS),
                    dropTick)
            .doOnSubscribe(rememberSubscription)
            // Synchronous on the current thread by design, hence the blocking subscribe.
            .blockingSubscribe(emit);
}
/**
 * Stores the two sources, their key functions, the combiner and the request size after
 * validating them.
 *
 * @param a first source, must not be null
 * @param b second source, must not be null
 * @param aKey key function for items of {@code a}, must not be null
 * @param bKey key function for items of {@code b}, must not be null
 * @param combiner combines an {@code A} and a {@code B} into a {@code C}, must not be null
 * @param requestSize must be at least 1
 */
public FlowableMatch(Flowable<A> a, Flowable<B> b, Function<? super A, ? extends K> aKey,
        Function<? super B, ? extends K> bKey, BiFunction<? super A, ? super B, C> combiner, long requestSize) {
    // Validate every argument first so that nothing is stored when any of them is rejected.
    Preconditions.checkNotNull(a, "a should not be null");
    Preconditions.checkNotNull(b, "b should not be null");
    Preconditions.checkNotNull(aKey, "aKey cannot be null");
    Preconditions.checkNotNull(bKey, "bKey cannot be null");
    Preconditions.checkNotNull(combiner, "combiner cannot be null");
    Preconditions.checkArgument(requestSize >= 1, "requestSize must be >=1");

    // Sources and their key functions, side by side.
    this.a = a;
    this.aKey = aKey;
    this.b = b;
    this.bKey = bKey;

    this.combiner = combiner;
    this.requestSize = requestSize;
}
/**
 * Stores the key functions, combiner, request size and downstream subscriber, and creates an
 * empty {@code MpscLinkedQueue}.
 *
 * @param aKey key function for {@code A} items
 * @param bKey key function for {@code B} items
 * @param combiner combines an {@code A} and a {@code B} into a {@code C}
 * @param requestSize stored as given
 * @param child the downstream subscriber
 */
MatchCoordinator(Function<? super A, ? extends K> aKey, Function<? super B, ? extends K> bKey,
        BiFunction<? super A, ? super B, C> combiner, long requestSize, Subscriber<? super C> child) {
    // Downstream and internal state.
    this.child = child;
    this.queue = new MpscLinkedQueue<Object>();

    // Matching configuration.
    this.aKey = aKey;
    this.bKey = bKey;
    this.combiner = combiner;
    this.requestSize = requestSize;
}
/**
 * Stores the source and the collection settings; nothing is validated or subscribed here.
 *
 * @param source the upstream flowable
 * @param collectionFactory supplies a collection of type {@code R}
 * @param add function taking a collection and an item and returning a collection
 * @param condition predicate over a collection and an item
 * @param emitRemainder stored as given
 */
public FlowableCollectWhile(Flowable<T> source, Callable<R> collectionFactory,
        BiFunction<? super R, ? super T, ? extends R> add,
        BiPredicate<? super R, ? super T> condition, boolean emitRemainder) {
    // The implicit super() call runs first.
    this.source = source;

    this.collectionFactory = collectionFactory;
    this.add = add;
    this.condition = condition;

    this.emitRemainder = emitRemainder;
}
/**
 * Stores the collection settings and the downstream subscriber.
 *
 * @param collectionFactory supplies a collection of type {@code R}
 * @param add function taking a collection and an item and returning a collection
 * @param condition predicate over a collection and an item
 * @param child the downstream subscriber
 * @param emitRemainder stored as given
 */
CollectWhileSubscriber(Callable<R> collectionFactory,
        BiFunction<? super R, ? super T, ? extends R> add,
        BiPredicate<? super R, ? super T> condition, Subscriber<? super R> child,
        boolean emitRemainder) {
    // Downstream first, then the collection settings.
    this.child = child;

    this.collectionFactory = collectionFactory;
    this.add = add;
    this.condition = condition;
    this.emitRemainder = emitRemainder;
}
/**
 * Returns a transformer that matches the upstream flowable against {@code b} via
 * {@code Flowables.match}, using the given key functions and combiner.
 *
 * @param b the flowable to match the upstream against
 * @param aKey key function for upstream items
 * @param bKey key function for items of {@code b}
 * @param combiner combines a matched pair into the emitted value
 * @param requestSize request size forwarded to the match operator
 * @param <A> upstream item type
 * @param <B> item type of {@code b}
 * @param <C> emitted type
 * @param <K> key type
 * @return the transformer
 */
public static <A, B, C, K> FlowableTransformer<A, C> matchWith(final Flowable<B> b,
        final Function<? super A, K> aKey, final Function<? super B, K> bKey,
        final BiFunction<? super A, ? super B, C> combiner, final int requestSize) {
    return new FlowableTransformer<A, C>() {
        @Override
        public Publisher<C> apply(Flowable<A> upstream) {
            // Forward requestSize: it used to be accepted but silently dropped.
            // NOTE(review): assumes Flowables.match has an overload taking the request size as
            // its last argument - confirm.
            return Flowables.match(upstream, b, aKey, bKey, combiner, requestSize);
        }
    };
}