类io.reactivex.functions.BiConsumer源码实例Demo

下面列出了 io.reactivex.functions.BiConsumer 的 API 使用实例代码及写法,也可以点击链接到 GitHub 查看源代码。

/**
 * Builds a {@link Flowable} that runs {@code sql} on a connection obtained from
 * {@code connectionFactory} and emits one mapped value per row of the result set.
 *
 * <p>The {@code parameters} argument is currently not applied to the statement
 * (see the TODO below).
 *
 * <p>NOTE(review): only the {@link ResultSet} is handed to
 * {@code FlowableSelect::closeSilently}; whether the statement and connection are
 * released as well cannot be told from this method - confirm.
 *
 * @param connectionFactory supplies the connection the query runs on
 * @param parameters        query parameters (not yet used)
 * @param sql               the query text
 * @param mapper            converts the current row of the result set into an item
 * @param <T>               type of the emitted items
 * @return a flowable generated from the rows of the query result
 */
public static <T> Flowable<T> create(Callable<Connection> connectionFactory, List<Object> parameters, String sql,
                                     Function<? super ResultSet, T> mapper) {
    // State factory: obtain a connection, prepare the statement and run the query.
    Callable<ResultSet> openResultSet = () -> {
        PreparedStatement statement = connectionFactory.call().prepareStatement(sql);
        // TODO set parameters
        return statement.executeQuery();
    };
    // Generator step: emit the next row, or complete once the cursor is exhausted.
    BiConsumer<ResultSet, Emitter<T>> emitNextRow = (rows, out) -> {
        if (!rows.next()) {
            out.onComplete();
            return;
        }
        out.onNext(mapper.apply(rows));
    };
    Consumer<ResultSet> closeResultSet = FlowableSelect::closeSilently;
    return Flowable.generate(openResultSet, emitNextRow, closeResultSet);
}
 
/**
 * Returns a {@link Flowable} of the repository names of {@code user}, each name
 * suffixed with two spaces.
 *
 * <p>The repositories are fetched inside the state callable passed to
 * {@code Flowable.generate}, not when this method is called.
 *
 * @param user the user whose repositories are listed
 * @return a flowable emitting one padded repository name per repository
 */
@Override
public Flowable<String> getRepos(String user) {
    Callable<Iterator<String>> nameIteratorFactory = () ->
            gitHbubRepos.getRepos(user).stream().map(Repository::getName).iterator();
    // Emit one name per generator call; complete when the iterator runs dry.
    BiConsumer<Iterator<String>, Emitter<String>> emitNextName = (names, out) -> {
        if (!names.hasNext()) {
            out.onComplete();
            return;
        }
        out.onNext(names.next() + "  ");
    };
    return Flowable.generate(nameIteratorFactory, emitNextName);
}
 
/**
 * Returns a bi-consumer that turns a (result, error) pair into an HTTP response on
 * the given routing context.
 *
 * <ul>
 *   <li>No error: the result is written as pretty-printed JSON with the given status.</li>
 *   <li>{@link NoSuchElementException}: a 404 response carrying the exception message.</li>
 *   <li>Any other error: the routing context is failed with that error.</li>
 * </ul>
 *
 * @param context the routing context
 * @param status  the HTTP status used for a successful response
 * @param <T>     type of the result being encoded
 * @return the bi-consumer
 */
private static <T> BiConsumer<T, Throwable> writeJsonResponse(RoutingContext context, int status) {
    return (result, failure) -> {
        if (failure == null) {
            context.response().setStatusCode(status)
                .putHeader("content-type", "application/json; charset=utf-8")
                .end(Json.encodePrettily(result));
            return;
        }
        if (failure instanceof NoSuchElementException) {
            context.response().setStatusCode(404).end(failure.getMessage());
            return;
        }
        context.fail(failure);
    };
}
 
源代码4 项目: Java-9-Spring-Webflux   文件: GitHubServiceImpl.java
/**
 * Returns a {@link Flowable} of the repository names of {@code user}, each name
 * suffixed with two spaces.
 *
 * <p>The repositories are fetched inside the state callable passed to
 * {@code Flowable.generate}, not when this method is called.
 *
 * @param user the user whose repositories are listed
 * @return a flowable emitting one padded repository name per repository
 */
@Override
public Flowable<String> getRepos(String user) {
    Callable<Iterator<String>> nameIteratorFactory = () ->
            gitHbubRepos.getRepos(user).stream().map(Repository::getName).iterator();
    // Emit one name per generator call; complete when the iterator runs dry.
    BiConsumer<Iterator<String>, Emitter<String>> emitNextName = (names, out) -> {
        if (!names.hasNext()) {
            out.onComplete();
            return;
        }
        out.onNext(names.next() + "  ");
    };
    return Flowable.generate(nameIteratorFactory, emitNextName);
}
 
源代码5 项目: rxjava2-jdbc   文件: Update.java
/**
 * Builds a {@link Flowable} that executes the given statement and emits one mapped
 * value per row of the statement's generated-keys result set.
 *
 * @param ps         the named prepared statement to execute
 * @param parameters values converted and bound to the statement before execution
 * @param mapper     converts the current generated-keys row into an item
 * @param <T>        type of the emitted items
 * @return a flowable generated from the generated-keys rows
 */
private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters,
        Function<? super ResultSet, T> mapper) {
    // State factory: bind parameters, execute, then expose the generated keys.
    Callable<ResultSet> executeAndGetKeys = () -> {
        Util.convertAndSetParameters(ps.ps, parameters, ps.names);
        ps.ps.execute();
        return ps.ps.getGeneratedKeys();
    };
    // Generator step: emit the next key row, or complete when none is left.
    BiConsumer<ResultSet, Emitter<T>> emitNextRow = (keys, out) -> {
        if (!keys.next()) {
            out.onComplete();
            return;
        }
        out.onNext(mapper.apply(keys));
    };
    Consumer<ResultSet> closeKeys = Util::closeSilently;
    return Flowable.generate(executeAndGetKeys, emitNextRow, closeKeys);
}
 
源代码6 项目: rxjava2-jdbc   文件: Call.java
/**
 * Wraps the current result set of an already executed callable statement in a
 * {@link Flowable} that emits one mapped value per row.
 *
 * <p>The result set is read from the statement immediately, when this method is
 * called; the state callable merely hands that same instance to the generator.
 *
 * @param stmt the named callable statement whose result set is read
 * @param f    converts the current row into an item
 * @param <T>  type of the emitted items
 * @return a flowable generated from the rows of the result set
 * @throws SQLException if the result set cannot be obtained from the statement
 */
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
        Function<? super ResultSet, ? extends T> f) throws SQLException {
    final ResultSet currentResultSet = stmt.stmt.getResultSet();
    Callable<ResultSet> stateFactory = () -> currentResultSet;
    BiConsumer<ResultSet, Emitter<T>> emitNextRow = (rows, out) -> {
        log.debug("getting row from ps={}, rs={}", stmt.stmt, rows);
        if (!rows.next()) {
            log.debug("completed");
            out.onComplete();
            return;
        }
        T item = f.apply(rows);
        log.debug("emitting {}", item);
        out.onNext(item);
    };
    Consumer<ResultSet> closeRows = Util::closeSilently;
    return Flowable.generate(stateFactory, emitNextRow, closeRows);
}
 
源代码7 项目: rxjava2-extras   文件: FlowableStateMachine.java
/**
 * Creates a state-machine flowable over {@code source}.
 *
 * <p>{@code initialState}, {@code transition} and {@code backpressureStrategy} are
 * null-checked and {@code requestBatchSize} must be positive. {@code source},
 * {@code completionAction} and {@code errorAction} are stored without validation.
 *
 * @param source               the upstream flowable
 * @param initialState         supplies the initial state; must not be null
 * @param transition           maps (state, input, emitter) to the next state; must not be null
 * @param completionAction     receives the state and emitter; stored as given
 * @param errorAction          receives the state, error and emitter; stored as given
 * @param backpressureStrategy the backpressure strategy; must not be null
 * @param requestBatchSize     request batch size; must be greater than zero
 */
public FlowableStateMachine(Flowable<In> source, //
        Callable<? extends State> initialState, //
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the argument that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0,
            "requestBatchSize must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
 
源代码8 项目: rxjava2-extras   文件: FlowableStateMachine.java
/**
 * Creates the subscriber, storing every argument in the field of the same name.
 * The {@code count} field starts at {@code requestBatchSize}.
 *
 * @param initialState         supplies the initial state
 * @param transition           maps (state, input, emitter) to the next state
 * @param completionAction     receives the state and emitter
 * @param errorAction          receives the state, error and emitter
 * @param backpressureStrategy the backpressure strategy
 * @param requestBatchSize     request batch size, also the initial value of {@code count}
 * @param child                the downstream subscriber
 */
StateMachineSubscriber( //
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize, //
        Subscriber<? super Out> child) {
    // Downstream and flow-control settings.
    this.child = child;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    this.count = requestBatchSize;

    // State-machine callbacks.
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
}
 
源代码9 项目: GankGirl   文件: ReadTadPagePresenter.java
/**
 * Registers the {@code REQUEST_READ_TYPE} restartable: the request calls
 * {@code providers.getTypeList(...)} and each result is passed to
 * {@code ReadTadPageFragment.onData}.
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    // Request factory: type list, mapped and composed through the HttpRetrofit helpers.
    final Function0<Observable<List<ReadTypeBean>>> typeListRequest =
            new Function0<Observable<List<ReadTypeBean>>>() {
                @Override
                public Observable<List<ReadTypeBean>> apply() {
                    return HttpRetrofit.getInstance().providers
                            .getTypeList(observable, new DynamicKey("闲读分类"), new EvictDynamicKey(false))
                            .map(new HttpRetrofit.HttpResultFuncCcche<List<ReadTypeBean>>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            };

    // Result callback: hand the loaded types to the fragment.
    final BiConsumer<ReadTadPageFragment, List<ReadTypeBean>> deliverTypes =
            new BiConsumer<ReadTadPageFragment, List<ReadTypeBean>>() {
                @Override
                public void accept(@NonNull ReadTadPageFragment fragment, @NonNull List<ReadTypeBean> types) throws Exception {
                    fragment.onData(types);
                }
            };

    restartableFirst(RequestCommand.REQUEST_READ_TYPE, typeListRequest, deliverTypes);
}
 
源代码10 项目: GankGirl   文件: ReadMorePresenter.java
/**
 * Registers the {@code REQUEST_READ_CHILD_LIST} restartable: the request calls
 * {@code providers.getStackTypeList(...)} keyed by the request context's type, and
 * the result's list and page (when present) are forwarded to the activity.
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    // Request factory: stack type list, mapped and composed through the HttpRetrofit helpers.
    final Function0<Observable<ReadTypeBean>> childListRequest =
            new Function0<Observable<ReadTypeBean>>() {
                @Override
                public Observable<ReadTypeBean> apply() {
                    return HttpRetrofit.getInstance().providers
                            .getStackTypeList(observable, new DynamicKey(requestContext.getType()), new EvictDynamicKey(true))
                            .map(new HttpRetrofit.HttpResultFuncCcche<ReadTypeBean>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            };

    // Result callback: forward only the parts of the bean that are present.
    final BiConsumer<ReadMoreActivity, ReadTypeBean> deliverResult =
            new BiConsumer<ReadMoreActivity, ReadTypeBean>() {
                @Override
                public void accept(@NonNull ReadMoreActivity activity, @NonNull ReadTypeBean bean) throws Exception {
                    if (bean.getReadListBeanList() != null) {
                        activity.onDataList(bean.getReadListBeanList());
                    }
                    if (!TextUtils.isEmpty(bean.getPage())) {
                        activity.setUrl_page(bean.getPage());
                    }
                }
            };

    restartableFirst(RequestCommand.REQUEST_READ_CHILD_LIST, childListRequest, deliverResult);
}
 
源代码11 项目: RxPermissions   文件: Permission.java
/**
 * Joins the names of the given permissions into a single string, separated by
 * {@code ", "}.
 *
 * <p>The separator is added only when the accumulated text is non-empty.
 *
 * @param permissions the permissions whose names are combined
 * @return the combined names
 */
private String combineName(List<Permission> permissions) {
    final Function<Permission, String> toName = new Function<Permission, String>() {
        @Override
        public String apply(Permission permission) throws Exception {
            return permission.name;
        }
    };
    final BiConsumer<StringBuilder, String> appendName = new BiConsumer<StringBuilder, String>() {
        @Override
        public void accept(StringBuilder joined, String name) throws Exception {
            if (joined.length() != 0) {
                joined.append(", ");
            }
            joined.append(name);
        }
    };
    return Observable.fromIterable(permissions)
            .map(toName)
            .collectInto(new StringBuilder(), appendName)
            .blockingGet()
            .toString();
}
 
源代码12 项目: rxjava-RxLife   文件: SingleLife.java
/**
 * Subscribes with a single callback that receives either the success value or the
 * error.
 *
 * @param onCallback the callback; must not be null
 * @return the observer created for the callback, usable as a {@link Disposable}
 */
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) {
    ObjectHelper.requireNonNull(onCallback, "onCallback is null");

    // The observer wrapping the callback is itself the Disposable handed back.
    final BiConsumerSingleObserver<T> callbackObserver = new BiConsumerSingleObserver<T>(onCallback);
    subscribe(callbackObserver);
    return callbackObserver;
}
 
/**
 * Builds a {@link Flowable} that executes the given statement and emits one mapped
 * value per row of its generated-keys result set. The result set is released
 * through {@code Update::closeAll}.
 *
 * @param ps     the prepared statement to execute
 * @param mapper converts the current generated-keys row into an item
 * @param <T>    type of the emitted items
 * @return a flowable generated from the generated-keys rows
 */
private static <T> Flowable<T> create(PreparedStatement ps, Function<? super ResultSet, T> mapper) {
    // State factory: execute, then expose the generated keys.
    Callable<ResultSet> executeAndGetKeys = () -> {
        ps.execute();
        return ps.getGeneratedKeys();
    };
    // Generator step: emit the next key row, or complete when none is left.
    BiConsumer<ResultSet, Emitter<T>> emitNextRow = (keys, out) -> {
        if (!keys.next()) {
            out.onComplete();
            return;
        }
        out.onNext(mapper.apply(keys));
    };
    Consumer<ResultSet> closeKeys = Update::closeAll;
    return Flowable.generate(executeAndGetKeys, emitNextRow, closeKeys);
}
 
/**
 * Returns a {@link Flowable} emitting the repositories of {@code user}.
 *
 * <p>The state callable is a lambda rather than a bound method reference
 * ({@code stream()::iterator}). A bound method reference evaluates its receiver
 * when the reference is created, which would fetch the repositories as soon as
 * this method is called and share one {@code Stream} between all subscribers;
 * a second subscription would then fail because a stream can be traversed only
 * once. With the lambda the repositories are fetched when the state callable is
 * invoked, each time yielding a fresh iterator.
 *
 * @param user the user whose repositories are listed
 * @return a flowable emitting one item per repository
 */
@Override
public Flowable<Repository> getRepos0(String user) {
    Callable<Iterator<Repository>> initialState =
            () -> gitHbubRepos.getRepos(user)
                              .stream()
                              .iterator();
    BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
 
源代码15 项目: reactive-streams-in-java   文件: RxJavaDemo.java
/**
 * Concatenates the strings {@code "i=0"} through {@code "i=" + (count - 1)} and
 * exposes the result as a {@link Future}.
 *
 * <p>The values are split into windows of 10; each window is subscribed on the
 * computation scheduler and collected into its own {@link StringBuilder}, and the
 * partial builders are then collected into a final one.
 *
 * <p>NOTE(review): because windows are merged with {@code flatMap}, the order of
 * the partial results in the final string is presumably not guaranteed - confirm.
 *
 * @param count number of values to concatenate
 * @return a future holding the concatenated string
 */
@Override
public Future<String> doParallelStringConcatAsync(int count) {
    // Shared collector: appends whatever it receives to the target builder.
    final BiConsumer<StringBuilder, Object> append =
            (target, piece) -> target.append(piece);
    return Observable.range(0, count)
            .map(index -> "i=" + index)
            // Split into windows of 10 items.
            .window(10)
            // Collect every window separately on the computation scheduler.
            .flatMap(chunk -> chunk
                    .subscribeOn(Schedulers.computation())
                    .collectInto(new StringBuilder(), append)
                    .toObservable())
            // Merge the partial builders into one.
            .collectInto(new StringBuilder(), append)
            .map(StringBuilder::toString)
            .toFuture();
}
 
源代码16 项目: Java-9-Spring-Webflux   文件: GitHubServiceImpl.java
/**
 * Returns a {@link Flowable} emitting the repositories of {@code user}.
 *
 * <p>The state callable is a lambda rather than a bound method reference
 * ({@code stream()::iterator}). A bound method reference evaluates its receiver
 * when the reference is created, which would fetch the repositories as soon as
 * this method is called and share one {@code Stream} between all subscribers;
 * a second subscription would then fail because a stream can be traversed only
 * once. With the lambda the repositories are fetched when the state callable is
 * invoked, each time yielding a fresh iterator.
 *
 * @param user the user whose repositories are listed
 * @return a flowable emitting one item per repository
 */
@Override
public Flowable<Repository> getRepos0(String user) {
    Callable<Iterator<Repository>> initialState =
            () -> gitHbubRepos.getRepos(user)
                              .stream()
                              .iterator();
    BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
 
源代码17 项目: GankGirl   文件: ReadPresenter.java
/**
 * Registers the {@code REQUEST_READ_LIST} restartable: the request calls
 * {@code providers.getStackTypeList(...)} keyed by the request context's type, and
 * the result's child types, list and page (when present) are forwarded to the
 * fragment.
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    // Request factory: stack type list, mapped and composed through the HttpRetrofit helpers.
    final Function0<Observable<ReadTypeBean>> readListRequest =
            new Function0<Observable<ReadTypeBean>>() {
                @Override
                public Observable<ReadTypeBean> apply() {
                    return HttpRetrofit.getInstance().providers
                            .getStackTypeList(observable, new DynamicKey(requestContext.getType()), new EvictDynamicKey(true))
                            .map(new HttpRetrofit.HttpResultFuncCcche<ReadTypeBean>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            };

    // Result callback: forward only the parts of the bean that are present.
    final BiConsumer<ReadFragment, ReadTypeBean> deliverResult =
            new BiConsumer<ReadFragment, ReadTypeBean>() {
                @Override
                public void accept(@NonNull ReadFragment fragment, @NonNull ReadTypeBean bean) throws Exception {
                    if (bean.getReadChildTypeBeanList() != null) {
                        fragment.onDataChild(bean.getReadChildTypeBeanList());
                    }
                    if (bean.getReadListBeanList() != null) {
                        fragment.onDataList(bean.getReadListBeanList());
                    }
                    if (!TextUtils.isEmpty(bean.getPage())) {
                        fragment.setUrl_page(bean.getPage());
                    }
                }
            };

    restartableFirst(RequestCommand.REQUEST_READ_LIST, readListRequest, deliverResult);
}
 
源代码18 项目: GankGirl   文件: BaseRxPresenter.java
/**
 * Registers a restartable whose subscription is built from the observable returned
 * by {@code observableFactory}, composed with {@code deliverFirst()} and subscribed
 * with {@code split(onNext, onError)}.
 *
 * @param restartableId     id of the restartable
 * @param observableFactory factory returning the observable to run when the restartable is started
 * @param onNext            callback that receives the view and the delivered data
 * @param onError           callback that receives the view and the error; may be null
 * @param <T>               type of the observable's items
 */
public <T> void restartableFirst(int restartableId, final Function0<Observable<T>> observableFactory,
                                      final BiConsumer<View, T> onNext, @Nullable final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) {

    final Function0<Disposable> subscriptionFactory = new Function0<Disposable>() {
        @Override
        public Disposable apply() {
            return observableFactory.apply()
                    .compose(BaseRxPresenter.this.deliverFirst())
                    .subscribe(split(onNext, onError));
        }
    };
    restartable(restartableId, subscriptionFactory);
}
 
源代码19 项目: GankGirl   文件: BaseRxPresenter.java
/**
 * Registers a restartable whose subscription is built from the observable returned
 * by {@code observableFactory}, composed with {@code deliverLatestCache()} and
 * subscribed with {@code split(onNext, onError)}.
 *
 * @param restartableId     id of the restartable
 * @param observableFactory factory returning the observable to run when the restartable is started
 * @param onNext            callback that receives the view and the delivered data
 * @param onError           callback that receives the view and the error; may be null
 * @param <T>               type of the observable's items
 */
public <T> void restartableLatestCache(int restartableId, final Function0<Observable<T>> observableFactory,
                                       final BiConsumer<View, T> onNext, @Nullable final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) {

    final Function0<Disposable> subscriptionFactory = new Function0<Disposable>() {
        @Override
        public Disposable apply() {
            return observableFactory.apply()
                    .compose(BaseRxPresenter.this.deliverLatestCache())
                    .subscribe(split(onNext, onError));
        }
    };
    restartable(restartableId, subscriptionFactory);
}
 
源代码20 项目: GankGirl   文件: BaseRxPresenter.java
/**
 * Registers a restartable whose subscription is built from the observable returned
 * by {@code observableFactory}, composed with {@code deliverReplay()} and
 * subscribed with {@code split(onNext, onError)}.
 *
 * @param restartableId     id of the restartable
 * @param observableFactory factory returning the observable to run when the restartable is started
 * @param onNext            callback that receives the view and the delivered data
 * @param onError           callback that receives the view and the error; may be null
 * @param <T>               type of the observable's items
 */
public <T> void restartableReplay(int restartableId, final Function0<Observable<T>> observableFactory,
                                  final BiConsumer<View, T> onNext, @Nullable final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) {

    final Function0<Disposable> subscriptionFactory = new Function0<Disposable>() {
        @Override
        public Disposable apply() {
            return observableFactory.apply()
                    .compose(BaseRxPresenter.this.deliverReplay())
                    .subscribe(split(onNext, onError));
        }
    };
    restartable(restartableId, subscriptionFactory);
}
 
源代码21 项目: GankGirl   文件: Delivery.java
/**
 * Dispatches the held notification to the matching callback together with the view.
 *
 * <p>An onNext notification is passed to {@code onNext}. An onError notification is
 * passed to {@code onError} when one was supplied; when {@code onError} is
 * {@code null} the error notification is skipped instead of failing on an
 * {@code assert} (or on a null dereference when assertions are disabled). Any
 * other notification is ignored.
 *
 * <p>NOTE(review): the error is cast to {@code HttpExceptionHandle.ResponeThrowable};
 * this assumes upstream always produces that type - confirm.
 *
 * @param onNext  callback that receives the view and the notification value
 * @param onError callback that receives the view and the error; may be null
 * @throws Exception whatever the invoked callback throws
 */
public void split(BiConsumer<View, T> onNext, @Nullable BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) throws Exception {
    if (notification.isOnNext()) {
        onNext.accept(view, notification.getValue());
    } else if (onError != null && notification.isOnError()) {
        onError.accept(view, (HttpExceptionHandle.ResponeThrowable) notification.getError());
    }
}
 
源代码22 项目: akarnokd-misc   文件: Fallout76EnemyInfo.java
/**
 * Walks the {@code "curve"} array of the given JSON document and reports the
 * integer {@code "x"} and {@code "y"} members of every entry to {@code onEntry}.
 *
 * @param file    JSON object containing a {@code "curve"} array
 * @param onEntry receives the x and y value of each entry, in array order
 * @throws Exception whatever {@code onEntry} throws
 */
static void processCurveTable(JsonElement file, BiConsumer<Integer, Integer> onEntry) throws Exception {
    for (JsonElement point : file.getAsJsonObject().get("curve").getAsJsonArray()) {
        JsonObject entry = point.getAsJsonObject();
        int x = entry.get("x").getAsInt();
        int y = entry.get("y").getAsInt();
        onEntry.accept(x, y);
    }
}
 
源代码23 项目: Varis-Android   文件: RepositoriesPresenter.java
/**
 * Creates the callback for a repositories load.
 *
 * <p>The callback first pushes the user data to the view when {@code mUser} is
 * set, then reports either the loaded repositories or the failure.
 *
 * @return a bi-consumer receiving the repositories or the error
 */
private BiConsumer<List<Repo>, Throwable> reposHandler() {
    return (loadedRepos, error) -> {
        if (mUser != null) {
            getView().updateUserData(mUser);
        }

        if (error != null) {
            handleLoadingFailed(error);
            return;
        }
        handleReposLoaded(loadedRepos);
    };
}
 
/**
 * Returns the bi-consumer produced by {@code writeJsonResponse} for status 200.
 *
 * @param rc  the routing context
 * @param <T> type of the result
 * @return the bi-consumer
 */
static <T> BiConsumer<T, Throwable> ok(RoutingContext rc) {
    final int okStatus = 200;
    return writeJsonResponse(rc, okStatus);
}
 
/**
 * Returns the bi-consumer produced by {@code writeJsonResponse} for status 201.
 *
 * @param rc  the routing context
 * @param <T> type of the result
 * @return the bi-consumer
 */
static <T> BiConsumer<T, Throwable> created(RoutingContext rc) {
    final int createdStatus = 201;
    return writeJsonResponse(rc, createdStatus);
}
 
源代码26 项目: AssistantBySDK   文件: RemindPresenter.java
/**
 * Loads all reminds into {@code showDatas} and refreshes the view.
 *
 * <p>The loading step is subscribed on the io scheduler and its callback is
 * observed on the Android main thread. Once it finishes, the progress bar is
 * hidden, the list is notified and, for a positive {@code id}, the view is moved
 * to the position after the first card whose remind has that id.
 *
 * <p>NOTE(review): the callback does not inspect its {@code Throwable} argument,
 * so a failed load refreshes the view exactly like a successful one - confirm
 * this is intended.
 *
 * @param id id of the remind to scroll to; values not greater than zero skip scrolling
 */
@Override
public void initDatas(final long id) {
    remindView.showProgressBar();

    // Loading step: read today's count and wrap every remind in a task card.
    final SingleOnSubscribe<Integer> loadReminds = new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> emitter) throws Exception {
            todayCount = mAssistDao.findRemindToday();
            for (Remind remind : mAssistDao.findAllRemindDesc()) {
                TaskCard<Remind> card = new TaskCard<>(remind, TaskCard.TaskState.ACTIVE);
                /*if (remind.getRdate().before(TimeUtils.getTodayDate())) {
                    taskCard.taskState = TaskCard.TaskState.INVALID;
                } else if (remind.getRdate().before(TimeUtils.getTomorrow())) {
                    if (new SimpleDate(remind.getRtime()).toValue() < TimeUtils.getTimeValue(new Date())) {
                        taskCard.taskState = TaskCard.TaskState.INVALID;
                    }
                }*/
                // A remind with frequency 0 whose date is already past is marked invalid.
                boolean zeroFrequency = remind.getFrequency() == 0;
                if (zeroFrequency && remind.getRdate().before(new Date())) {
                    card.taskState = TaskCard.TaskState.INVALID;
                }
                showDatas.add(card);
            }
            emitter.onSuccess(0);
        }
    };

    // Completion step: refresh the list and optionally scroll to the requested remind.
    final BiConsumer<Integer, Throwable> refreshView = new BiConsumer<Integer, Throwable>() {
        @Override
        public void accept(Integer result, Throwable error) throws Exception {
            remindView.hideProgressBar();
            remindView.notifyListView();
            if (id <= 0) {
                return;
            }
            for (int index = 0; index < showDatas.size(); index++) {
                if (id == showDatas.get(index).t.getId()) {
                    remindView.moveToPosition(index + 1);
                    return;
                }
            }
        }
    };

    Single.create(loadReminds)
            .observeOn(AndroidSchedulers.mainThread())  // thread on which the subscriber is notified
            .subscribeOn(Schedulers.io())   // thread on which subscribe() is executed
            .subscribe(refreshView);


}
 
源代码27 项目: rxjava2-jdbc   文件: Select.java
/**
 * Builds a {@link Flowable} that runs a select and emits one mapped value per row.
 *
 * <p>When any parameter is a collection, a new single-use statement is prepared
 * from {@code sql} on the connection of {@code ps} and the returned result set is
 * wrapped so that the statement is closed with it. Otherwise {@code ps} itself is
 * reused.
 *
 * @param ps              the prepared statement to reuse when no collection parameter is present
 * @param parameters      raw parameter values
 * @param mapper          converts the current row into an item
 * @param names           parameter names passed on when binding
 * @param sql             query text used when a new statement has to be prepared
 * @param fetchSize       fetch size passed to {@code Util.prepare}
 * @param queryTimeoutSec query timeout passed to {@code Util.prepare}
 * @param <T>             type of the emitted items
 * @return a flowable generated from the rows of the query result
 */
private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters,
                                                Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize, int queryTimeoutSec) {
    log.debug("parameters={}", parameters);
    log.debug("names={}", names);

    Callable<ResultSet> runQuery = () -> {
        List<Parameter> params = Util.toParameters(parameters);
        if (!params.stream().anyMatch(p -> p.isCollection())) {
            // use the current prepared statement (normal re-use)
            return Util.setParameters(ps, params, names).executeQuery();
        }
        // create a new prepared statement with the collection ? substituted with
        // ?s to match the size of the collection parameter
        final PreparedStatement singleUse =
                Util.prepare(ps.getConnection(), fetchSize, sql, params, queryTimeoutSec);
        // now wrap the rs to auto close the statement because it is single use (the
        // next collection parameter may have a different ordinality so we need to
        // build a new PreparedStatement with a different number of question marks
        // substituted
        ResultSet rows = Util.setParameters(singleUse, params, names).executeQuery();
        return new ResultSetAutoClosesStatement(rows, singleUse);
    };
    BiConsumer<ResultSet, Emitter<T>> emitNextRow = (rows, out) -> {
        log.debug("getting row from ps={}, rs={}", rows.getStatement(), rows);
        if (!rows.next()) {
            log.debug("completed");
            out.onComplete();
            return;
        }
        T item = mapper.apply(rows);
        log.debug("emitting {}", item);
        out.onNext(item);
    };
    Consumer<ResultSet> closeRows = Util::closeSilently;
    return Flowable.generate(runQuery, emitNextRow, closeRows);
}
 
源代码28 项目: GankGirl   文件: BaseRxPresenter.java
/**
 * Shortcut for {@link #restartableFirst(int, Function0, BiConsumer, BiConsumer)}
 * with {@code null} as the error callback.
 *
 * @param restartableId     id of the restartable
 * @param observableFactory factory returning the observable to run
 * @param onNext            callback that receives the view and the delivered data
 * @param <T>               type of the observable's items
 */
public <T> void restartableFirst(int restartableId, final Function0<Observable<T>> observableFactory, final BiConsumer<View, T> onNext) {
    final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> noErrorCallback = null;
    restartableFirst(restartableId, observableFactory, onNext, noErrorCallback);
}
 
源代码29 项目: GankGirl   文件: BaseRxPresenter.java
/**
 * Shortcut for {@link #restartableLatestCache(int, Function0, BiConsumer, BiConsumer)}
 * with {@code null} as the error callback.
 *
 * @param restartableId     id of the restartable
 * @param observableFactory factory returning the observable to run
 * @param onNext            callback that receives the view and the delivered data
 * @param <T>               type of the observable's items
 */
public <T> void restartableLatestCache(int restartableId, final Function0<Observable<T>> observableFactory, final BiConsumer<View, T> onNext) {
    final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> noErrorCallback = null;
    restartableLatestCache(restartableId, observableFactory, onNext, noErrorCallback);
}
 
源代码30 项目: GankGirl   文件: BaseRxPresenter.java
/**
 * Shortcut for {@link #restartableReplay(int, Function0, BiConsumer, BiConsumer)}
 * with {@code null} as the error callback.
 *
 * @param restartableId     id of the restartable
 * @param observableFactory factory returning the observable to run
 * @param onNext            callback that receives the view and the delivered data
 * @param <T>               type of the observable's items
 */
public <T> void restartableReplay(int restartableId, final Function0<Observable<T>> observableFactory, final BiConsumer<View, T> onNext) {
    final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> noErrorCallback = null;
    restartableReplay(restartableId, observableFactory, onNext, noErrorCallback);
}
 
 类所在包
 类方法
 同包方法