类io.reactivex.functions.BiFunction源码实例Demo

下面列出了怎么用io.reactivex.functions.BiFunction的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: AndroidQuick   文件: Network3Fragment.java
/**
 * Requests both {@link TestApis} endpoints, zips the two responses into the sum of their result
 * sizes and logs that sum (or the error message) through {@code LogUtil}.
 */
private void clickAsyncZip() {
    TestApis api = RetrofitManager.INSTANCE.getRetrofit(Constants.GANK_API_URL).create(TestApis.class);
    Observable<TSSCRes<TSSCResult>> firstSource = api.getTangShiSongCi();
    Observable<XHYRes<XHYResult>> secondSource = api.getXHY();

    // Combines one response from each endpoint into the total number of results.
    BiFunction<TSSCRes<TSSCResult>, XHYRes<XHYResult>, Integer> sumOfSizes =
            new BiFunction<TSSCRes<TSSCResult>, XHYRes<XHYResult>, Integer>() {
                @Override
                public Integer apply(TSSCRes<TSSCResult> first, XHYRes<XHYResult> second) throws Exception {
                    int firstSize = first.getResult().size();
                    int secondSize = second.getResult().size();
                    return firstSize + secondSize;
                }
            };

    // Both the subscription and the observer callbacks are scheduled on the io scheduler.
    Observable<Integer> total = Observable.zip(firstSource, secondSource, sumOfSizes)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io());

    total.subscribe(new BaseObserver<Integer>() {
        @Override
        public void onError(ApiException exception) {
            LogUtil.e(TAG, "error:" + exception.getMessage());
        }

        @Override
        public void onSuccess(Integer count) {
            LogUtil.e(TAG, "count = " + count);
        }
    });
}
 
源代码2 项目: symbol-sdk-java   文件: BlockServiceImpl.java
/**
 * Zips the expected root hash with a merkle proof and emits whether folding the proof path over
 * {@code leaf} reproduces that root (compared case-insensitively).
 *
 * @param rootObservable emits the expected root hash.
 * @param leaf the leaf hash the fold starts from.
 * @param merkleTransactionObservable emits the merkle proof whose path is folded.
 * @return an observable of the comparison result; any error is printed and mapped to {@code false}.
 */
private Observable<Boolean> getBooleanObservable(Observable<String> rootObservable, String leaf,
    Observable<MerkleProofInfo> merkleTransactionObservable) {

    BiFunction<String, MerkleProofInfo, Boolean> proofMatchesRoot = (expectedRoot, proof) -> {
        List<MerklePathItem> path = proof.getMerklePath();
        if (path.isEmpty()) {
            // Single item tree, so leaf = HRoot0
            return leaf.equalsIgnoreCase(expectedRoot);
        }

        // Walk the path in order, hashing the running value together with each sibling hash.
        String runningHash = leaf;
        for (MerklePathItem step : path) {
            // A LEFT item is prepended to the running hash, any other position is appended.
            String joined = step.getPosition() == Position.LEFT
                ? step.getHash() + runningHash
                : runningHash + step.getHash();
            runningHash = ConvertUtils.toHex(Hashes.sha3_256(ConvertUtils.fromHexToBytes(joined)));
        }
        return expectedRoot.equalsIgnoreCase(runningHash);
    };

    return Observable.zip(rootObservable, merkleTransactionObservable, proofMatchesRoot)
        .onErrorReturn((e) -> {
            e.printStackTrace();
            return false;
        });
}
 
源代码3 项目: symbol-sdk-java   文件: TransactionServiceImpl.java
/**
 * Builds an observable factory for a transfer transaction from its resolved recipient and
 * resolved mosaics.
 *
 * @param transaction the transfer transaction whose recipient and mosaics are resolved.
 * @param expectedReceiptSource passed through to the resolution helpers.
 * @return an observable combining the latest resolved recipient and mosaic list into a factory.
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    TransferTransaction transaction, ReceiptSource expectedReceiptSource) {
    Observable<Statement> statements = getStatement(transaction);

    // Resolve every mosaic individually, then gather them back into a single list emission.
    Observable<List<Mosaic>> mosaicsSource = Observable
        .fromIterable(transaction.getMosaics())
        .flatMap(unresolved ->
            getResolvedMosaic(transaction, unresolved, statements, expectedReceiptSource))
        .toList()
        .toObservable();

    Observable<Address> recipientSource = getResolvedAddress(
        transaction, transaction.getRecipient(), statements, expectedReceiptSource);

    BiFunction<Address, List<Mosaic>, TransferTransactionFactory> toFactory =
        (recipient, mosaics) -> TransferTransactionFactory.create(
            transaction.getNetworkType(), recipient, mosaics, transaction.getMessage());

    return Observable.combineLatest(recipientSource, mosaicsSource, toFactory);
}
 
源代码4 项目: symbol-sdk-java   文件: TransactionServiceImpl.java
/**
 * Builds an observable factory for a mosaic address restriction transaction from its resolved
 * mosaic id and resolved target address.
 *
 * @param transaction the restriction transaction supplying the unresolved values.
 * @param expectedReceiptSource passed through to the resolution helpers.
 * @return an observable combining the latest resolved mosaic id and target address into a factory
 *     that also carries the transaction's previous restriction value.
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    MosaicAddressRestrictionTransaction transaction,
    ReceiptSource expectedReceiptSource) {
    Observable<Statement> statements = getStatement(transaction);

    Observable<MosaicId> mosaicIdSource = getResolvedMosaicId(
        transaction, transaction.getMosaicId(), statements, expectedReceiptSource);

    // The address lookup is wrapped in just(...).flatMap(...), so it only runs on subscription.
    Observable<Address> targetAddressSource = Observable
        .just(transaction.getTargetAddress())
        .flatMap(unresolved ->
            getResolvedAddress(transaction, unresolved, statements, expectedReceiptSource));

    BiFunction<? super MosaicId, ? super Address, MosaicAddressRestrictionTransactionFactory> toFactory =
        (mosaicId, targetAddress) -> MosaicAddressRestrictionTransactionFactory
            .create(
                transaction.getNetworkType(),
                mosaicId,
                transaction.getRestrictionKey(),
                targetAddress,
                transaction.getNewRestrictionValue())
            .previousRestrictionValue(transaction.getPreviousRestrictionValue());

    return Observable.combineLatest(mosaicIdSource, targetAddressSource, toFactory);
}
 
源代码5 项目: symbol-sdk-java   文件: TransactionServiceImpl.java
/**
 * Builds an observable factory for an account mosaic restriction transaction from the resolved
 * lists of restriction additions and deletions.
 *
 * @param transaction the restriction transaction supplying the additions and deletions.
 * @param expectedReceiptSource passed through to {@code getResolvedMosaicIds}.
 * @return an observable combining the latest additions and deletions into a factory.
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    AccountMosaicRestrictionTransaction transaction,
    ReceiptSource expectedReceiptSource) {
    Observable<Statement> statements = getStatement(transaction);

    Observable<List<UnresolvedMosaicId>> additionsSource = getResolvedMosaicIds(
        transaction, transaction.getRestrictionAdditions(), statements, expectedReceiptSource);

    Observable<List<UnresolvedMosaicId>> deletionsSource = getResolvedMosaicIds(
        transaction, transaction.getRestrictionDeletions(), statements, expectedReceiptSource);

    BiFunction<List<UnresolvedMosaicId>, List<UnresolvedMosaicId>, TransactionFactory<AccountMosaicRestrictionTransaction>> toFactory =
        (additions, deletions) -> AccountMosaicRestrictionTransactionFactory.create(
            transaction.getNetworkType(), transaction.getRestrictionFlags(), additions, deletions);

    return Observable.combineLatest(additionsSource, deletionsSource, toFactory);
}
 
源代码6 项目: symbol-sdk-java   文件: TransactionServiceImpl.java
/**
 * Builds an observable factory for an account address restriction transaction from the resolved
 * lists of restriction additions and deletions.
 *
 * @param transaction the restriction transaction supplying the additions and deletions.
 * @param expectedReceiptSource passed through to {@code getResolvedAddresses}.
 * @return an observable combining the latest additions and deletions into a factory.
 */
private Observable<TransactionFactory<? extends Transaction>> resolveTransactionFactory(
    AccountAddressRestrictionTransaction transaction,
    ReceiptSource expectedReceiptSource) {
    Observable<Statement> statements = getStatement(transaction);

    Observable<List<UnresolvedAddress>> additionsSource = getResolvedAddresses(
        transaction, transaction.getRestrictionAdditions(), statements, expectedReceiptSource);

    Observable<List<UnresolvedAddress>> deletionsSource = getResolvedAddresses(
        transaction, transaction.getRestrictionDeletions(), statements, expectedReceiptSource);

    BiFunction<List<UnresolvedAddress>, List<UnresolvedAddress>, AccountAddressRestrictionTransactionFactory> toFactory =
        (additions, deletions) -> AccountAddressRestrictionTransactionFactory.create(
            transaction.getNetworkType(), transaction.getRestrictionFlags(), additions, deletions);

    return Observable.combineLatest(additionsSource, deletionsSource, toFactory);
}
 
源代码7 项目: RxRetroJsoup   文件: MainActivity.java
/**
 * Sets the content view, wires the RecyclerView to a fresh {@code Adapter}, starts
 * {@code loadWithRetroJsoup()} and finally assembles (without subscribing to) a sample zip.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    // List setup: linear layout backed by a new adapter.
    recyclerView = (RecyclerView) findViewById(R.id.recyclerview);
    recyclerView.setLayoutManager(new LinearLayoutManager(getBaseContext()));
    adapter = new Adapter();
    recyclerView.setAdapter(adapter);

    loadWithRetroJsoup();

    // The zip below is only assembled; its result is discarded and nothing subscribes to it.
    Observable<String> first = Observable.just("");
    Observable<String> second = Observable.just("&");
    BiFunction<String, String, String> zipper = new BiFunction<String, String, String>() {
        @Override
        public String apply(@NonNull String left, @NonNull String right) throws Exception {
            return null;
        }
    };
    Observable.zip(first, second, zipper);
}
 
源代码8 项目: Tangram-Android   文件: LifeCycleHelper.java
/**
 * Creates a {@link LifecycleTransformer} driven by an observable that emits {@code true} whenever
 * a later lifecycle event equals the mapped first event.
 *
 * @param lifecycle the stream of lifecycle events.
 * @param correspondingEvents maps the first lifecycle event to the event to wait for.
 * @return a transformer wrapping the filtered "events are equal" observable.
 */
public static <T, E> LifecycleTransformer<T> bindToLifeCycle(Observable<E> lifecycle,
    final Function<E, E> correspondingEvents) {
    Observable<E> sharedLifecycle = lifecycle.share();

    // NOTE(review): the first event is taken from the unshared source while the later events
    // come from the shared copy - confirm this asymmetry is intended.
    Observable<E> awaitedEvent = lifecycle.take(1).map(correspondingEvents);
    Observable<E> laterEvents = sharedLifecycle.skip(1);

    BiFunction<E, E, Boolean> isSameEvent = new BiFunction<E, E, Boolean>() {
        @Override
        public Boolean apply(E awaited, E current) throws Exception {
            return awaited.equals(current);
        }
    };
    // Lets only the "equal" (true) comparisons through.
    Predicate<Boolean> matchesOnly = new Predicate<Boolean>() {
        @Override
        public boolean test(Boolean matched) throws Exception {
            return matched;
        }
    };

    return new LifecycleTransformer<>(
        Observable.combineLatest(awaitedEvent, laterEvents, isSameEvent).filter(matchesOnly));
}
 
源代码9 项目: RIBs   文件: Step.java
/**
 * Chains a follow-up step that runs once this step has produced its result. When this step emits
 * an absent value (no data), {@code func} is not invoked and an absent value is passed along
 * instead.
 *
 * @param func produces the next step from this step's value and actionable item.
 * @param <TNewValueType> the value type returned by the next step.
 * @param <TNewActionableItem> the actionable item type returned by the next step.
 * @return a {@link Step} to chain more calls to.
 */
public <TNewValueType, TNewActionableItem extends ActionableItem>
    Step<TNewValueType, TNewActionableItem> onStep(
        final BiFunction<T, A, Step<TNewValueType, TNewActionableItem>> func) {
  final Function<
          Optional<Data<T, A>>, Observable<Optional<Data<TNewValueType, TNewActionableItem>>>>
      toNextStep =
          new Function<
              Optional<Data<T, A>>,
              Observable<Optional<Data<TNewValueType, TNewActionableItem>>>>() {
            @Override
            public Observable<Optional<Data<TNewValueType, TNewActionableItem>>> apply(
                Optional<Data<T, A>> current) throws Exception {
              // Nothing to act on: propagate "absent" without calling func.
              if (!current.isPresent()) {
                return Observable.just(
                    Optional.<Data<TNewValueType, TNewActionableItem>>absent());
              }
              Data<T, A> payload = current.get();
              return func.apply(payload.value, payload.actionableItem).asObservable();
            }
          };

  return new Step<>(asObservable().flatMap(toNextStep).singleOrError());
}
 
源代码10 项目: rxjava2-extras   文件: Transformers.java
/**
 * Returns a transformer that pairs every upstream item with the {@link Statistics} accumulated
 * so far, including that item.
 *
 * @param function extracts the number to add to the statistics from each item.
 * @return a transformer emitting one {@code Pair} of item and statistics per upstream item.
 */
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
        final Function<? super T, ? extends R> function) {
    return new FlowableTransformer<T, Pair<T, Statistics>>() {

        @Override
        public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
            BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>> accumulate =
                    new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                        @Override
                        public Pair<T, Statistics> apply(Pair<T, Statistics> previous, T item)
                                throws Exception {
                            Statistics soFar = previous.b();
                            return Pair.create(item, soFar.add(function.apply(item)));
                        }
                    };
            // The seed pair carries no item, so it is dropped with skip(1).
            Pair<T, Statistics> seed = Pair.create((T) null, Statistics.create());
            return source.scan(seed, accumulate).skip(1);
        }
    };
}
 
源代码11 项目: My-MVP   文件: RxJavaOperatorActivity.java
/**
 * Demonstrates what happens when {@code merge} and {@code zip} meet a source that signals an
 * error: each subscription logs the error it receives.
 */
private void mergeAndZipOnError() {
    Observable<String> fail = Observable.create(emitter -> emitter.onError(new Throwable("obviously error")));

    Observable<Integer> success = Observable.create(emitter -> emitter.onNext(1));

    // Values are intentionally ignored; only the error path is of interest here.
    Disposable mergeDisposable = Observable.merge(fail, success)
            .subscribe((Consumer<Serializable>) serializable -> { },
                    throwable -> Log.e(TAG, "accept: merge error " + throwable.getMessage()));

    // A separate variable keeps the merge subscription's handle from being overwritten.
    Disposable zipDisposable = Observable.zip(fail, success,
            (BiFunction<String, Integer, Object>) (s, integer) -> "success")
            .subscribe(o -> { },
                    throwable -> Log.e(TAG, "accept: zip error " + throwable.getMessage()));
}
 
源代码12 项目: GankGirl   文件: RetryWhenNetworkException.java
/**
 * Pairs each incoming error with an attempt number (1 to {@code count + 1}) and decides whether
 * to retry: connection and timeout errors within the first {@code count} attempts wait on a
 * timer, everything else is re-emitted as an error.
 */
@Override
public Observable<?> apply(Observable<? extends Throwable> flowable) throws Exception {
    BiFunction<Throwable, Integer, Wrapper> tagWithAttempt =
            (error, attempt) -> new Wrapper(error, attempt);

    return flowable
            .zipWith(Observable.range(1, count + 1), tagWithAttempt)
            .flatMap(attempted -> {
                boolean connectivityProblem = attempted.throwable instanceof ConnectException
                        || attempted.throwable instanceof SocketTimeoutException
                        || attempted.throwable instanceof TimeoutException;
                // Give up on other errors or once the attempt number reaches count + 1.
                if (!connectivityProblem || attempted.index >= count + 1) {
                    return Observable.error(attempted.throwable);
                }
                return Observable.timer(delay + (attempted.index - 1) * delay, TimeUnit.MILLISECONDS);
            });
}
 
源代码13 项目: SQLite   文件: RxSQLiteTest.java
/**
 * Inserts five objects, queries the table back through RxSQLite and checks that the queried list
 * matches the inserted list element by element.
 */
@Test
public void testElementsList() throws Exception {
    List<TestObject> elements = new ArrayList<>();
    elements.add(new TestObject(1, 9.5, "a"));
    elements.add(new TestObject(2, 6.7, "ab"));
    elements.add(new TestObject(3, 8.2, "abc"));
    elements.add(new TestObject(4, 3.4, "abcd"));
    elements.add(new TestObject(5, 6.5, "abcde"));
    SQLite.get().insert(TestTable.TABLE, elements);

    Observable.zip(RxSQLite.get().query(TestTable.TABLE),
            Observable.just(elements), new BiFunction<List<TestObject>, List<TestObject>, Object>() {
                @Override
                public Object apply(List<TestObject> testElements, List<TestObject> savedElements) throws Exception {
                    assertEquals(savedElements.size(), testElements.size());
                    // Compare the elements themselves; the loop used to re-check the sizes.
                    // NOTE(review): assumes TestObject implements equals() and that the query
                    // returns rows in insertion order - confirm.
                    for (int i = 0; i < testElements.size(); i++) {
                        assertEquals(savedElements.get(i), testElements.get(i));
                    }
                    // RxJava 2 rejects a null zipper result, so return a placeholder value.
                    return Boolean.TRUE;
                }
            })
            .test()
            // An assertion failure inside the zipper arrives as onError; surface it here.
            .assertNoErrors();
}
 
源代码14 项目: RxJava2Demo   文件: ChapterFour.java
/**
 * Requests the user's base and extra info on the io scheduler, zips both responses into a
 * {@code UserInfo} and delivers it on the Android main thread.
 */
public static void practice1(){
    final Api api = RetrofitProvider.get().create(Api.class);

    Observable<UserBaseInfoResponse> baseInfoSource =
            api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
    Observable<UserExtraInfoResponse> extraInfoSource =
            api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());

    BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo> toUserInfo =
            new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {
                @Override
                public UserInfo apply(UserBaseInfoResponse base, UserExtraInfoResponse extra)
                        throws Exception {
                    return new UserInfo(base, extra);
                }
            };
    Consumer<UserInfo> onUserInfo = new Consumer<UserInfo>() {
        @Override
        public void accept(UserInfo userInfo) throws Exception {
            // Placeholder: consume the combined user info here.
        }
    };

    Observable.zip(baseInfoSource, extraInfoSource, toUserInfo)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUserInfo);
}
 
源代码15 项目: sqlitemagic   文件: SynchronousColumnQueryTest.java
/**
 * Checks that the SQL {@code avg} of the {@code PRIMITIVE_SHORT} column equals the sum of the
 * inserted {@code primitiveShort} values divided by the number of inserted rows.
 */
@Test
public void avgFunction() {
  final double count = 8;

  Function<SimpleAllValuesMutable, Integer> toShortValue =
      new Function<SimpleAllValuesMutable, Integer>() {
        @Override
        public Integer apply(SimpleAllValuesMutable row) {
          return (int) row.primitiveShort;
        }
      };
  BiFunction<Integer, Integer, Integer> add =
      new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer left, Integer right) {
          return left + right;
        }
      };

  // Expected total, computed in memory from the rows that were just inserted.
  final Integer sum = Observable.fromIterable(insertSimpleAllValues((int) count))
      .map(toShortValue)
      .reduce(add)
      .blockingGet();

  final Double value = Select
      .column(avg(SIMPLE_ALL_VALUES_MUTABLE.PRIMITIVE_SHORT))
      .from(SIMPLE_ALL_VALUES_MUTABLE)
      .takeFirst()
      .execute();

  final double expectedAverage = sum.doubleValue() / count;
  assertThat(value).isEqualTo(expectedAverage);
}
 
源代码16 项目: AndroidQuick   文件: RxJavaFragment.java
/**
 * Zips {@code observable1} and {@code observable2} by concatenating each number with its paired
 * string, applies the schedulers from {@code RxUtil} and logs every combined value.
 */
private void testZip() {
    BiFunction<Integer, String, String> concat = new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer number, String text) throws Exception {
            return number + text;
        }
    };
    Consumer<String> logValue = new Consumer<String>() {
        @Override
        public void accept(String combined) throws Exception {
            Log.d(TAG, combined);
        }
    };

    Observable.zip(observable1, observable2, concat)
            .compose(RxUtil.applySchedulers())
            .subscribe(logValue);
}
 
源代码17 项目: AndroidQuick   文件: RxJavaFragment.java
/**
 * Same as the plain zip demo, but first re-assigns both source fields so that each one is
 * subscribed on the io scheduler.
 */
private void testZipAsync() {
    // Note: these assignments replace the fields themselves, not local copies.
    observable1 = observable1.subscribeOn(Schedulers.io());
    observable2 = observable2.subscribeOn(Schedulers.io());

    BiFunction<Integer, String, String> concat = new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer number, String text) throws Exception {
            return number + text;
        }
    };
    Consumer<String> logValue = new Consumer<String>() {
        @Override
        public void accept(String combined) throws Exception {
            Log.d(TAG, combined);
        }
    };

    Observable.zip(observable1, observable2, concat)
            .compose(RxUtil.applySchedulers())
            .subscribe(logValue);
}
 
源代码18 项目: NewFastFrame   文件: RetryWhenNetworkException.java
/**
 * Pairs each incoming error with an attempt number (1 to {@code count + 1}) and decides whether
 * to retry: connection and timeout errors within the first {@code count} attempts wait on a
 * timer that grows by {@code increaseDelay} per attempt, everything else is re-emitted as an
 * error.
 */
@Override
public Observable<?> apply(@NonNull Observable<? extends Throwable> observable) throws Exception {
    BiFunction<Throwable, Integer, Wrapper> toWrapper = Wrapper::new;

    Function<Wrapper, ObservableSource<?>> retryOrFail = attempted -> {
        boolean connectivityProblem = attempted.throwable instanceof ConnectException
                || attempted.throwable instanceof SocketTimeoutException
                || attempted.throwable instanceof TimeoutException;
        // Once the retry count is exceeded the error is thrown as well; otherwise the
        // stream would by default just end in onCompleted.
        if (!connectivityProblem || attempted.index >= count + 1) {
            return Observable.error(attempted.throwable);
        }
        return Observable.timer(delay + (attempted.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
    };

    return observable
            .zipWith(Observable.range(1, count + 1), toWrapper)
            .flatMap(retryOrFail);
}
 
源代码19 项目: RxEasyHttp   文件: RetryExceptionFunc.java
/**
 * Pairs each incoming error with an attempt number (1 to {@code count + 1}) and decides whether
 * to retry. Connection/timeout exceptions and {@code ApiException}s carrying a network or
 * timeout error code are retried after a timer while the attempt number is at most
 * {@code count}; everything else is re-emitted as an error.
 */
@Override
public Observable<?> apply(@NonNull Observable<? extends Throwable> observable) throws Exception {
    return observable.zipWith(Observable.range(1, count + 1), new BiFunction<Throwable, Integer, Wrapper>() {
        @Override
        public Wrapper apply(@NonNull Throwable throwable, @NonNull Integer integer) throws Exception {
            return new Wrapper(throwable, integer);
        }
    }).flatMap(new Function<Wrapper, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(@NonNull Wrapper wrapper) throws Exception {
            if (wrapper.index > 1) {
                HttpLog.i("重试次数:" + (wrapper.index));
            }

            // Only ApiException carries an error code; other throwables keep code 0.
            int errCode = 0;
            if (wrapper.throwable instanceof ApiException) {
                ApiException exception = (ApiException) wrapper.throwable;
                errCode = exception.getCode();
            }

            // The SocketTimeoutException check used to be listed twice; once is enough.
            boolean retryable = wrapper.throwable instanceof ConnectException
                    || wrapper.throwable instanceof SocketTimeoutException
                    || wrapper.throwable instanceof TimeoutException
                    || errCode == ApiException.ERROR.NETWORD_ERROR
                    || errCode == ApiException.ERROR.TIMEOUT_ERROR;

            // Once the retry count is exceeded the error is thrown as well; otherwise the
            // stream would by default just end in onCompleted.
            if (retryable && wrapper.index < count + 1) {
                return Observable.timer(delay + (wrapper.index - 1) * increaseDelay, TimeUnit.MILLISECONDS);
            }
            return Observable.error(wrapper.throwable);
        }
    });
}
 
源代码20 项目: NullAway   文件: NullAwayJava8PositiveCases.java
/**
 * Assigns {@code null}-returning lambdas to {@code Function} and {@code BiFunction}. Per the
 * marker comments below, each one is expected to be reported as "returning @Nullable expression
 * from method with @NonNull return".
 *
 * <p>NOTE(review): the {@code // BUG: Diagnostic contains:} markers look like expected-diagnostic
 * annotations read by the test harness - keep each one directly above the line it describes.
 */
static void testAnnoatedThirdParty() {
  // BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
  Function<String, Object> f1 = (x) -> null; // io.reactivex.(Bi)Function is annotated
  Function<String, Object> f2 =
      (x) -> {
        // BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull
        return null;
      };
  // BUG: Diagnostic contains: returning @Nullable expression from method with @NonNull return
  BiFunction<String, String, Object> f3 = (x, y) -> null;
}
 
源代码21 项目: RxBus   文件: RxBus.java
/**
 * Returns an observable of events of the given class, zipped with the result of posting an
 * {@code AskedEvent} for that class; only the event itself is passed downstream.
 *
 * @param theClass the event class to observe.
 * @return an observable emitting the zipped events of type {@code CLASS}.
 */
public <CLASS> Observable<CLASS> get(Class<CLASS> theClass) {
    // Keeps the event and drops whatever posting the AskedEvent produced.
    BiFunction<CLASS, Object, CLASS> keepEvent = new BiFunction<CLASS, Object, CLASS>() {
        @Override
        public CLASS apply(@NonNull CLASS event, @NonNull Object ignored) throws Exception {
            return event;
        }
    };

    return Observable.zip(
            onEvent(theClass),
            postAsObservable(new AskedEvent(theClass)),
            keepEvent);
}
 
源代码22 项目: RIBs   文件: StepTest.java
/**
 * When the first step emits a present value, the chained step must run and its value must be the
 * single item observed, followed by completion without errors.
 */
@Test
public void onStep_withASuccessFullFirstAction_shouldProperlyChainTheNextStep() {
  Object firstValue = new Object();
  final Object chainedValue = new Object();
  TestObserver<Optional<Step.Data<Object, ActionableItem>>> observer = new TestObserver<>();

  interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);

  // The chained step ignores the first value and emits chainedValue with the same item.
  BiFunction<Object, ActionableItem, Step<Object, ActionableItem>> nextStep =
      new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
        @Override
        public Step<Object, ActionableItem> apply(Object ignored, ActionableItem item) {
          return Step.from(
              Observable.just(new Step.Data<>(chainedValue, item)).singleOrError());
        }
      };
  step.onStep(nextStep).asObservable().subscribe(observer);

  ActionableItem firstItem =
      new ActionableItem() {
        @NonNull
        @Override
        public Observable<InteractorEvent> lifecycle() {
          return interactorLifecycleSubject;
        }
      };
  returnValueSubject.onNext(
      Optional.of(new Step.Data<Object, ActionableItem>(firstValue, firstItem)));
  returnValueSubject.onComplete();

  observer.assertValueCount(1);
  assertThat(observer.values().get(0).get().getValue()).isEqualTo(chainedValue);
  observer.assertComplete();
  observer.assertNoErrors();
}
 
源代码23 项目: RIBs   文件: StepTest.java
/**
 * When the first step emits an absent value, the single observed item must be absent as well,
 * followed by completion without errors.
 */
@Test
public void onStep_withAnUnsuccessfulFirstAction_shouldTerminateTheWholeChain() {
  TestObserver<Optional<Step.Data<Object, ActionableItem>>> observer = new TestObserver<>();
  final Object chainedValue = new Object();

  interactorLifecycleSubject.onNext(InteractorEvent.ACTIVE);

  BiFunction<Object, ActionableItem, Step<Object, ActionableItem>> nextStep =
      new BiFunction<Object, ActionableItem, Step<Object, ActionableItem>>() {
        @Override
        public Step<Object, ActionableItem> apply(Object ignored, ActionableItem item) {
          return Step.from(
              Observable.just(new Step.Data<>(chainedValue, item)).singleOrError());
        }
      };
  step.onStep(nextStep).asObservable().subscribe(observer);

  // The first step reports "no data".
  returnValueSubject.onNext(Optional.<Step.Data<Object, ActionableItem>>absent());
  returnValueSubject.onComplete();

  observer.assertValueCount(1);
  assertThat(observer.values().get(0).isPresent()).isFalse();
  observer.assertComplete();
  observer.assertNoErrors();
}
 
源代码24 项目: rxjava2-jdbc   文件: Call.java
/**
 * Prepares a callable statement, runs {@code single} once per parameter group and emits the
 * outcomes as {@link Notification}s; the statement and connection are released through
 * {@code Util.closeCallableStatementAndConnection}.
 *
 * @param con the connection the statement is prepared on.
 * @param sql the call to prepare.
 * @param parameterGroups one parameter list per execution.
 * @param parameterPlaceholders placeholders passed to {@code Util.prepareCall}.
 * @param single executes the statement for one parameter group.
 * @return a flowable of materialized results, built with eager resource disposal.
 */
private static <T> Flowable<Notification<T>> createWithParameters(Connection con, String sql,
        Flowable<List<Object>> parameterGroups, List<ParameterPlaceholder> parameterPlaceholders,
        BiFunction<NamedCallableStatement, List<Object>, Single<T>> single) {
    Callable<NamedCallableStatement> openStatement =
            () -> Util.prepareCall(con, sql, parameterPlaceholders);

    final Function<NamedCallableStatement, Flowable<Notification<T>>> runAllGroups = statement -> {
        Flowable<Notification<T>> outcomes = parameterGroups
                .flatMap(group -> single.apply(statement, group).toFlowable())
                .materialize();
        // NOTE(review): materialize() sits before doOnError, so upstream errors presumably
        // arrive here as notifications followed by completion - confirm rollback is reachable.
        return outcomes
                .doOnComplete(() -> Util.commit(statement.stmt))
                .doOnError(error -> Util.rollback(statement.stmt));
    };

    Consumer<NamedCallableStatement> closeStatement = Util::closeCallableStatementAndConnection;
    return Flowable.using(openStatement, runAllGroups, closeStatement, true);
}
 
@Override
public void run(SourceContext<TEventType> sourceContext) throws Exception {

    disposables = new CompositeDisposable();

    Observable
            // We zip two Streams: The Iterable and an Interval emitting data stream:
            .zip(
                    Observable.fromIterable(iterable()),
                    Observable.interval(interval().toMillis(), TimeUnit.MILLISECONDS),
                    new BiFunction<TEventType, Long, TEventType>() {
                        @Override
                        public TEventType apply(@NonNull TEventType obs, @NonNull Long timer) throws Exception {
                            return obs;
                        }
                    }
            )
            // When the Subscription happens, add it to the list of Disposables:
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    disposables.add(disposable);
                }
            })
            // We want this to be synchronous on the current thread, so do all this in a blocking subscribe:
            .blockingSubscribe(new Consumer<TEventType>() {
                @Override
                public void accept(TEventType event) throws Exception {
                    sourceContext.collect(event);
                }
            });
}
 
源代码26 项目: rxjava2-extras   文件: FlowableMatch.java
/**
 * Stores the two sources, their key functions, the combiner and the request size after
 * validating them with {@code Preconditions}.
 *
 * @param a first source, must not be null.
 * @param b second source, must not be null.
 * @param aKey key function for items of {@code a}, must not be null.
 * @param bKey key function for items of {@code b}, must not be null.
 * @param combiner combines an item of {@code a} with an item of {@code b}, must not be null.
 * @param requestSize must be at least 1.
 */
public FlowableMatch(Flowable<A> a, Flowable<B> b, Function<? super A, ? extends K> aKey,
        Function<? super B, ? extends K> bKey, BiFunction<? super A, ? super B, C> combiner, long requestSize) {
    // Validation first, in parameter order, so nothing is assigned from invalid input.
    Preconditions.checkNotNull(a, "a should not be null");
    Preconditions.checkNotNull(b, "b should not be null");
    Preconditions.checkNotNull(aKey, "aKey cannot be null");
    Preconditions.checkNotNull(bKey, "bKey cannot be null");
    Preconditions.checkNotNull(combiner, "combiner cannot be null");
    Preconditions.checkArgument(requestSize >= 1, "requestSize must be >=1");

    // Sources and their key functions.
    this.a = a;
    this.aKey = aKey;
    this.b = b;
    this.bKey = bKey;

    // Combination settings.
    this.combiner = combiner;
    this.requestSize = requestSize;
}
 
源代码27 项目: rxjava2-extras   文件: FlowableMatch.java
/**
 * Stores the key functions, combiner, request size and downstream subscriber, and creates a
 * fresh {@code MpscLinkedQueue} for this coordinator.
 */
MatchCoordinator(Function<? super A, ? extends K> aKey, Function<? super B, ? extends K> bKey,
        BiFunction<? super A, ? super B, C> combiner, long requestSize, Subscriber<? super C> child) {
    // Downstream and its request size.
    this.child = child;
    this.requestSize = requestSize;

    // Matching configuration.
    this.aKey = aKey;
    this.bKey = bKey;
    this.combiner = combiner;

    // Each coordinator gets its own queue instance.
    this.queue = new MpscLinkedQueue<Object>();
}
 
源代码28 项目: rxjava2-extras   文件: FlowableCollectWhile.java
/**
 * Stores the source and the collection settings; no validation is performed here.
 *
 * @param source the upstream flowable.
 * @param collectionFactory creates a collection.
 * @param add adds an item to a collection and returns the resulting collection.
 * @param condition tested with a collection and an item.
 * @param emitRemainder stored as-is for later use.
 */
public FlowableCollectWhile(Flowable<T> source, Callable<R> collectionFactory,
        BiFunction<? super R, ? super T, ? extends R> add,
        BiPredicate<? super R, ? super T> condition, boolean emitRemainder) {
    super();
    this.source = source;

    // Collection handling.
    this.collectionFactory = collectionFactory;
    this.add = add;

    // Emission control.
    this.condition = condition;
    this.emitRemainder = emitRemainder;
}
 
源代码29 项目: rxjava2-extras   文件: FlowableCollectWhile.java
/**
 * Stores the collection settings and the downstream subscriber; no validation is performed here.
 */
CollectWhileSubscriber(Callable<R> collectionFactory,
        BiFunction<? super R, ? super T, ? extends R> add,
        BiPredicate<? super R, ? super T> condition, Subscriber<? super R> child,
        boolean emitRemainder) {
    // Downstream.
    this.child = child;

    // Collection handling.
    this.collectionFactory = collectionFactory;
    this.add = add;

    // Emission control.
    this.condition = condition;
    this.emitRemainder = emitRemainder;
}
 
源代码30 项目: rxjava2-extras   文件: Transformers.java
/**
 * Returns a transformer that matches upstream items with items of {@code b} by key and combines
 * each matched pair.
 *
 * @param b the flowable to match the upstream against.
 * @param aKey key function for upstream items.
 * @param bKey key function for items of {@code b}.
 * @param combiner combines a matched pair into the emitted value.
 * @param requestSize request size forwarded to the match operator; it used to be accepted but
 *     silently ignored.
 * @return a transformer applying {@code Flowables.match} to the upstream.
 */
public static <A, B, C, K> FlowableTransformer<A, C> matchWith(final Flowable<B> b,
        final Function<? super A, K> aKey, final Function<? super B, K> bKey,
        final BiFunction<? super A, ? super B, C> combiner, final int requestSize) {
    return new FlowableTransformer<A, C>() {

        @Override
        public Publisher<C> apply(Flowable<A> upstream) {
            // NOTE(review): relies on the Flowables.match overload that takes a request size.
            return Flowables.match(upstream, b, aKey, bKey, combiner, requestSize);
        }
    };
}
 
 类所在包
 类方法
 同包方法