下面列出了 io.reactivex.functions.BiConsumer 的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。
/**
 * Builds a {@link Flowable} that executes {@code sql} on a connection obtained from
 * {@code connectionFactory} and emits one mapped value per row of the result set.
 *
 * <p>The initial-state callable obtains the connection, prepares the statement and runs the
 * query. If preparing or executing fails, no {@link ResultSet} exists for the dispose callback
 * to clean up, so the statement and connection are closed here before the failure is rethrown.
 *
 * @param connectionFactory supplies the connection used for the query
 * @param parameters        query parameters; NOTE(review): still not bound to the statement
 *                          (see the TODO below)
 * @param sql               the query text
 * @param mapper            converts the current row of the result set into an emitted value
 * @param <T>               emitted value type
 * @return a flowable emitting the mapped rows and completing when the result set is exhausted
 */
public static <T> Flowable<T> create(Callable<Connection> connectionFactory, List<Object> parameters, String sql,
        Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> initialState = () -> {
        Connection con = connectionFactory.call();
        PreparedStatement ps = null;
        try {
            ps = con.prepareStatement(sql);
            // TODO set parameters
            return ps.executeQuery();
        } catch (Exception e) {
            // Nothing downstream will ever see these resources, so release them now and
            // keep any close failure attached to the original error.
            if (ps != null) {
                try {
                    ps.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            try {
                con.close();
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    };
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        if (rs.next()) {
            emitter.onNext(mapper.apply(rs));
        } else {
            emitter.onComplete();
        }
    };
    // presumably closeSilently(ResultSet) also releases the owning statement/connection — verify
    Consumer<ResultSet> disposeState = FlowableSelect::closeSilently;
    return Flowable.generate(initialState, generator, disposeState);
}
/**
 * Emits the name of every repository returned by {@code gitHbubRepos.getRepos(user)},
 * each name followed by a single space, then completes.
 *
 * @param user the user whose repositories are listed
 * @return a flowable of repository names
 */
@Override
public Flowable<String> getRepos(String user) {
    // The repositories are fetched only when the generator asks for its initial state.
    Callable<Iterator<String>> nameSource = () ->
            gitHbubRepos.getRepos(user)
                    .stream()
                    .map(Repository::getName)
                    .iterator();

    BiConsumer<Iterator<String>, Emitter<String>> emitNext = (names, out) -> {
        if (!names.hasNext()) {
            out.onComplete();
            return;
        }
        out.onNext(names.next() + " ");
    };

    return Flowable.generate(nameSource, emitNext);
}
/**
 * Creates a callback that reports an asynchronous outcome on the given routing context.
 *
 * <ul>
 *   <li>No error: responds with {@code status} and the result encoded as pretty-printed JSON.</li>
 *   <li>{@link NoSuchElementException}: responds with 404 and the exception message.</li>
 *   <li>Any other error: fails the routing context with that error.</li>
 * </ul>
 *
 * @param context the routing context to respond on
 * @param status  the HTTP status used for a successful result
 * @param <T>     type of the successful result
 * @return the bi-consumer receiving {@code (result, error)}
 */
private static <T> BiConsumer<T, Throwable> writeJsonResponse(RoutingContext context, int status) {
    return (result, failure) -> {
        if (failure == null) {
            context.response()
                    .setStatusCode(status)
                    .putHeader("content-type", "application/json; charset=utf-8")
                    .end(Json.encodePrettily(result));
            return;
        }
        if (failure instanceof NoSuchElementException) {
            context.response().setStatusCode(404).end(failure.getMessage());
            return;
        }
        context.fail(failure);
    };
}
/**
 * Produces the repository names of {@code user} one at a time, appending a trailing
 * space to each emitted name, and completes once the names run out.
 *
 * @param user the user whose repositories are listed
 * @return a flowable of repository names
 */
@Override
public Flowable<String> getRepos(String user) {
    // Lazy: the lookup runs when the generator requests its state, not when this method is called.
    Callable<Iterator<String>> repoNames = () ->
            gitHbubRepos.getRepos(user).stream().map(Repository::getName).iterator();

    BiConsumer<Iterator<String>, Emitter<String>> step = (cursor, sink) -> {
        boolean exhausted = !cursor.hasNext();
        if (exhausted) {
            sink.onComplete();
        } else {
            sink.onNext(cursor.next() + " ");
        }
    };

    return Flowable.generate(repoNames, step);
}
/**
 * Builds a flowable that binds {@code parameters} to the wrapped statement, executes it and
 * emits one mapped value per row of the statement's generated-keys result set.
 *
 * @param ps         named prepared statement wrapper (its {@code ps} and {@code names} are used)
 * @param parameters values passed to {@code Util.convertAndSetParameters}
 * @param mapper     converts the current generated-keys row into an emitted value
 * @param <T>        emitted value type
 * @return a flowable of mapped generated keys
 */
private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters,
        Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> generatedKeys = () -> {
        Util.convertAndSetParameters(ps.ps, parameters, ps.names);
        ps.ps.execute();
        return ps.ps.getGeneratedKeys();
    };

    BiConsumer<ResultSet, Emitter<T>> emitRow = (keys, out) -> {
        if (!keys.next()) {
            out.onComplete();
            return;
        }
        out.onNext(mapper.apply(keys));
    };

    Consumer<ResultSet> release = Util::closeSilently;
    return Flowable.generate(generatedKeys, emitRow, release);
}
/**
 * Wraps the current result set of an already-executed callable statement in a flowable that
 * emits one mapped value per row.
 *
 * <p>The result set is read from the statement immediately, when this method is called; the
 * generator then hands out that same instance as its state.
 *
 * @param stmt named callable statement wrapper whose {@code stmt.getResultSet()} is consumed
 * @param f    converts the current row into an emitted value
 * @param <T>  emitted value type
 * @return a flowable of mapped rows
 * @throws SQLException if the result set cannot be obtained from the statement
 */
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
        Function<? super ResultSet, ? extends T> f) throws SQLException {
    ResultSet current = stmt.stmt.getResultSet();
    Callable<ResultSet> stateSupplier = () -> current;

    BiConsumer<ResultSet, Emitter<T>> emitRow = (rows, out) -> {
        log.debug("getting row from ps={}, rs={}", stmt.stmt, rows);
        if (!rows.next()) {
            log.debug("completed");
            out.onComplete();
            return;
        }
        T mapped = f.apply(rows);
        log.debug("emitting {}", mapped);
        out.onNext(mapped);
    };

    Consumer<ResultSet> release = Util::closeSilently;
    return Flowable.generate(stateSupplier, emitRow, release);
}
/**
 * Stores the configuration of the state machine.
 *
 * <p>{@code initialState}, {@code transition} and {@code backpressureStrategy} must be
 * non-null and {@code requestBatchSize} must be positive; {@code source},
 * {@code completionAction} and {@code errorAction} are stored without validation.
 *
 * @param source               upstream flowable
 * @param initialState         supplies the starting state
 * @param transition           maps (state, input, emitter) to the next state
 * @param completionAction     invoked with (state, emitter); not null-checked here
 * @param errorAction          invoked with (state, error, emitter); not null-checked here
 * @param backpressureStrategy backpressure strategy to apply
 * @param requestBatchSize     request batch size, must be greater than zero
 * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument}, when
 *                                  {@code requestBatchSize} is not positive
 */
public FlowableStateMachine(Flowable<In> source, //
        Callable<? extends State> initialState, //
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition, //
        BiConsumer<? super State, ? super Emitter<Out>> completionAction, //
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction, //
        BackpressureStrategy backpressureStrategy, //
        int requestBatchSize) {
    Preconditions.checkNotNull(initialState);
    Preconditions.checkNotNull(transition);
    Preconditions.checkNotNull(backpressureStrategy);
    // The message names the parameter that is actually being validated.
    Preconditions.checkArgument(requestBatchSize > 0,
            "requestBatchSize must be greater than zero");
    this.source = source;
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
}
/**
 * Captures everything the subscriber needs; no validation is performed here.
 *
 * @param initialState         supplies the starting state
 * @param transition           maps (state, input, emitter) to the next state
 * @param completionAction     invoked with (state, emitter)
 * @param errorAction          invoked with (state, error, emitter)
 * @param backpressureStrategy backpressure strategy to apply
 * @param requestBatchSize     request batch size; also the starting value of {@code count}
 * @param child                downstream subscriber
 */
StateMachineSubscriber(
        Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super Emitter<Out>, ? extends State> transition,
        BiConsumer<? super State, ? super Emitter<Out>> completionAction,
        Consumer3<? super State, ? super Throwable, ? super Emitter<Out>> errorAction,
        BackpressureStrategy backpressureStrategy,
        int requestBatchSize,
        Subscriber<? super Out> child) {
    // Downstream and flow-control settings.
    this.child = child;
    this.backpressureStrategy = backpressureStrategy;
    this.requestBatchSize = requestBatchSize;
    this.count = requestBatchSize;

    // State-machine callbacks.
    this.initialState = initialState;
    this.transition = transition;
    this.completionAction = completionAction;
    this.errorAction = errorAction;
}
/**
 * Registers the restartable request {@code REQUEST_READ_TYPE}: the type list is loaded through
 * the cached provider and handed to {@code ReadTadPageFragment.onData}.
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    // Factory producing the request observable each time the restartable runs.
    final Function0<Observable<List<ReadTypeBean>>> typeListRequest =
            new Function0<Observable<List<ReadTypeBean>>>() {
                @Override
                public Observable<List<ReadTypeBean>> apply() {
                    return HttpRetrofit.getInstance().providers
                            .getTypeList(observable, new DynamicKey("闲读分类"), new EvictDynamicKey(false))
                            .map(new HttpRetrofit.HttpResultFuncCcche<List<ReadTypeBean>>())
                            .compose(HttpRetrofit.toSubscribe());
                }
            };

    // Delivers the loaded types to the view.
    final BiConsumer<ReadTadPageFragment, List<ReadTypeBean>> showTypes =
            new BiConsumer<ReadTadPageFragment, List<ReadTypeBean>>() {
                @Override
                public void accept(@NonNull ReadTadPageFragment fragment, @NonNull List<ReadTypeBean> types)
                        throws Exception {
                    fragment.onData(types);
                }
            };

    restartableFirst(RequestCommand.REQUEST_READ_TYPE, typeListRequest, showTypes);
}
/**
 * Registers the restartable request {@code REQUEST_READ_CHILD_LIST}: loads the entries for
 * {@code requestContext.getType()} and forwards the list and the page URL to the activity
 * when they are present.
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    // Factory producing the request observable each time the restartable runs.
    final Function0<Observable<ReadTypeBean>> childListRequest = new Function0<Observable<ReadTypeBean>>() {
        @Override
        public Observable<ReadTypeBean> apply() {
            return HttpRetrofit.getInstance().providers
                    .getStackTypeList(observable, new DynamicKey(requestContext.getType()), new EvictDynamicKey(true))
                    .map(new HttpRetrofit.HttpResultFuncCcche<ReadTypeBean>())
                    .compose(HttpRetrofit.toSubscribe());
        }
    };

    // Pushes whichever parts of the response are available to the view.
    final BiConsumer<ReadMoreActivity, ReadTypeBean> showResult = new BiConsumer<ReadMoreActivity, ReadTypeBean>() {
        @Override
        public void accept(@NonNull ReadMoreActivity activity, @NonNull ReadTypeBean result) throws Exception {
            if (result.getReadListBeanList() != null) {
                activity.onDataList(result.getReadListBeanList());
            }
            if (!TextUtils.isEmpty(result.getPage())) {
                activity.setUrl_page(result.getPage());
            }
        }
    };

    restartableFirst(RequestCommand.REQUEST_READ_CHILD_LIST, childListRequest, showResult);
}
/**
 * Joins the {@code name} of every permission into one string separated by {@code ", "}.
 * The pipeline is evaluated synchronously via {@code blockingGet()}.
 *
 * @param permissions permissions whose names are joined
 * @return the joined names; empty when there are no permissions
 */
private String combineName(List<Permission> permissions) {
    final Function<Permission, String> toName = new Function<Permission, String>() {
        @Override
        public String apply(Permission permission) throws Exception {
            return permission.name;
        }
    };

    final BiConsumer<StringBuilder, String> appendName = new BiConsumer<StringBuilder, String>() {
        @Override
        public void accept(StringBuilder joined, String name) throws Exception {
            // A separator is needed only once something has already been written.
            if (joined.length() != 0) {
                joined.append(", ");
            }
            joined.append(name);
        }
    };

    StringBuilder joined = Observable.fromIterable(permissions)
            .map(toName)
            .collectInto(new StringBuilder(), appendName)
            .blockingGet();
    return joined.toString();
}
/**
 * Subscribes with a single callback that receives both outcomes as {@code (value, error)}.
 *
 * @param onCallback the callback to notify; must not be null
 * @return the observer that was subscribed, returned as the {@link Disposable} for this subscription
 */
public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable> onCallback) {
    ObjectHelper.requireNonNull(onCallback, "onCallback is null");

    // The observer doubles as the Disposable handed back to the caller.
    final BiConsumerSingleObserver<T> callbackObserver = new BiConsumerSingleObserver<T>(onCallback);
    this.subscribe(callbackObserver);
    return callbackObserver;
}
/**
 * Builds a flowable that executes {@code ps} and emits one mapped value per row of its
 * generated-keys result set; the result set is passed to {@code Update.closeAll} on disposal.
 *
 * @param ps     the statement to execute
 * @param mapper converts the current generated-keys row into an emitted value
 * @param <T>    emitted value type
 * @return a flowable of mapped generated keys
 */
private static <T> Flowable<T> create(PreparedStatement ps, Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> generatedKeys = () -> {
        ps.execute();
        return ps.getGeneratedKeys();
    };

    BiConsumer<ResultSet, Emitter<T>> emitRow = (keys, out) -> {
        if (!keys.next()) {
            out.onComplete();
            return;
        }
        out.onNext(mapper.apply(keys));
    };

    Consumer<ResultSet> release = Update::closeAll;
    return Flowable.generate(generatedKeys, emitRow, release);
}
/**
 * Emits every repository returned by {@code gitHbubRepos.getRepos(user)}, then completes.
 *
 * <p>The initial state is a lambda rather than the bound method reference
 * {@code getRepos(user).stream()::iterator}. A bound method reference evaluates its receiver
 * immediately, so the repositories would be fetched when this method is called, and a single
 * one-shot {@code Stream} would be shared by every subscription (the second {@code iterator()}
 * call on it throws {@link IllegalStateException}). With the lambda each request for the
 * initial state fetches the repositories and opens a fresh stream.
 *
 * @param user the user whose repositories are listed
 * @return a flowable of repositories
 */
@Override
public Flowable<Repository> getRepos0(String user) {
    Callable<Iterator<Repository>> initialState =
            () -> gitHbubRepos.getRepos(user)
                    .stream()
                    .iterator();
    BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
/**
 * Concatenates the strings {@code "i=0"} … {@code "i=(count-1)"} asynchronously.
 *
 * <p>The values are split into windows of 10; each window is collected into its own
 * {@code StringBuilder} on the computation scheduler, and the partial builders are then
 * collected into one final builder.
 *
 * @param count number of values to concatenate
 * @return a future holding the concatenated text
 */
@Override
public Future<String> doParallelStringConcatAsync(int count) {
    // One collector serves both levels: it appends whatever it is given to the builder.
    BiConsumer<StringBuilder, Object> appendTo = (buffer, piece) -> {
        buffer.append(piece);
    };

    Observable<String> labels = Observable.range(0, count).map(i -> "i=" + i);

    // Each window of 10 labels is concatenated on the computation scheduler.
    Observable<StringBuilder> partials = labels
            .window(10)
            .flatMap(chunk -> chunk.subscribeOn(Schedulers.computation())
                    .collectInto(new StringBuilder(), appendTo)
                    .toObservable());

    // Merge the partial builders and expose the final text as a Future.
    return partials
            .collectInto(new StringBuilder(), appendTo)
            .map(StringBuilder::toString)
            .toFuture();
}
/**
 * Emits every repository returned by {@code gitHbubRepos.getRepos(user)}, then completes.
 *
 * <p>The initial state is supplied by a lambda instead of the bound method reference
 * {@code getRepos(user).stream()::iterator}: the receiver of a bound method reference is
 * evaluated as soon as the reference is created, which would fetch the repositories at call
 * time and hand the same single-use {@code Stream} to every subscription (a second
 * {@code iterator()} call on it fails with {@link IllegalStateException}). The lambda defers
 * the fetch and creates a new stream each time the initial state is requested.
 *
 * @param user the user whose repositories are listed
 * @return a flowable of repositories
 */
@Override
public Flowable<Repository> getRepos0(String user) {
    Callable<Iterator<Repository>> initialState =
            () -> gitHbubRepos.getRepos(user)
                    .stream()
                    .iterator();
    BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
/**
 * Registers the restartable request {@code REQUEST_READ_LIST}: loads the entries for
 * {@code requestContext.getType()} and forwards the child types, the list and the page URL
 * to the fragment, each only when present.
 */
@Override
protected void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    // Factory producing the request observable each time the restartable runs.
    final Function0<Observable<ReadTypeBean>> listRequest = new Function0<Observable<ReadTypeBean>>() {
        @Override
        public Observable<ReadTypeBean> apply() {
            return HttpRetrofit.getInstance().providers
                    .getStackTypeList(observable, new DynamicKey(requestContext.getType()), new EvictDynamicKey(true))
                    .map(new HttpRetrofit.HttpResultFuncCcche<ReadTypeBean>())
                    .compose(HttpRetrofit.toSubscribe());
        }
    };

    // Pushes whichever parts of the response are available to the view.
    final BiConsumer<ReadFragment, ReadTypeBean> showResult = new BiConsumer<ReadFragment, ReadTypeBean>() {
        @Override
        public void accept(@NonNull ReadFragment fragment, @NonNull ReadTypeBean result) throws Exception {
            if (result.getReadChildTypeBeanList() != null) {
                fragment.onDataChild(result.getReadChildTypeBeanList());
            }
            if (result.getReadListBeanList() != null) {
                fragment.onDataList(result.getReadListBeanList());
            }
            if (!TextUtils.isEmpty(result.getPage())) {
                fragment.setUrl_page(result.getPage());
            }
        }
    };

    restartableFirst(RequestCommand.REQUEST_READ_LIST, listRequest, showResult);
}
/**
 * Registers a restartable whose observable is passed through {@code deliverFirst()} and
 * whose notifications are dispatched with {@code split(onNext, onError)}.
 *
 * @param restartableId     id of the restartable request
 * @param observableFactory factory returning the observable each time the restartable runs
 * @param onNext            callback invoked when received data should be delivered to the view
 * @param onError           callback invoked for errors; may be null
 * @param <T>               the type of the observable's values
 */
public <T> void restartableFirst(int restartableId, final Function0<Observable<T>> observableFactory,
        final BiConsumer<View, T> onNext, @Nullable final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) {
    final Function0<Disposable> subscriptionFactory = new Function0<Disposable>() {
        @Override
        public Disposable apply() {
            return observableFactory.apply()
                    .compose(BaseRxPresenter.this.deliverFirst())
                    .subscribe(split(onNext, onError));
        }
    };
    restartable(restartableId, subscriptionFactory);
}
/**
 * Registers a restartable whose observable is passed through {@code deliverLatestCache()}
 * and whose notifications are dispatched with {@code split(onNext, onError)}.
 *
 * @param restartableId     id of the restartable request
 * @param observableFactory factory returning the observable each time the restartable runs
 * @param onNext            callback invoked when received data should be delivered to the view
 * @param onError           callback invoked for errors; may be null
 * @param <T>               the type of the observable's values
 */
public <T> void restartableLatestCache(int restartableId, final Function0<Observable<T>> observableFactory,
        final BiConsumer<View, T> onNext, @Nullable final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) {
    final Function0<Disposable> subscriptionFactory = new Function0<Disposable>() {
        @Override
        public Disposable apply() {
            return observableFactory.apply()
                    .compose(BaseRxPresenter.this.deliverLatestCache())
                    .subscribe(split(onNext, onError));
        }
    };
    restartable(restartableId, subscriptionFactory);
}
/**
 * Registers a restartable whose observable is passed through {@code deliverReplay()} and
 * whose notifications are dispatched with {@code split(onNext, onError)}.
 *
 * @param restartableId     id of the restartable request
 * @param observableFactory factory returning the observable each time the restartable runs
 * @param onNext            callback invoked when received data should be delivered to the view
 * @param onError           callback invoked for errors; may be null
 * @param <T>               the type of the observable's values
 */
public <T> void restartableReplay(int restartableId, final Function0<Observable<T>> observableFactory,
        final BiConsumer<View, T> onNext, @Nullable final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) {
    final Function0<Disposable> subscriptionFactory = new Function0<Disposable>() {
        @Override
        public Disposable apply() {
            return observableFactory.apply()
                    .compose(BaseRxPresenter.this.deliverReplay())
                    .subscribe(split(onNext, onError));
        }
    };
    restartable(restartableId, subscriptionFactory);
}
/**
 * Dispatches the held notification to the matching callback together with the view.
 *
 * <p>An onNext notification goes to {@code onNext}. An onError notification goes to
 * {@code onError} when one was supplied; because {@code onError} is declared nullable, a
 * missing handler means the error is skipped instead of failing with an
 * {@code AssertionError} or {@code NullPointerException}. Other notifications are ignored.
 *
 * @param onNext  receives the view and the notification value
 * @param onError receives the view and the notification error cast to
 *                {@code HttpExceptionHandle.ResponeThrowable}; may be null
 * @throws Exception whatever the invoked callback throws
 */
public void split(BiConsumer<View, T> onNext, @Nullable BiConsumer<View, HttpExceptionHandle.ResponeThrowable> onError) throws Exception {
    if (notification.isOnNext()) {
        onNext.accept(view, notification.getValue());
    } else if (onError != null && notification.isOnError()) {
        // NOTE(review): the cast assumes every upstream error is a ResponeThrowable — confirm.
        onError.accept(view, (HttpExceptionHandle.ResponeThrowable) notification.getError());
    }
}
/**
 * Walks the {@code "curve"} array of the given JSON object and reports the integer
 * {@code "x"} and {@code "y"} members of each element to {@code onEntry}, in array order.
 *
 * @param file    JSON object containing a {@code "curve"} array
 * @param onEntry receives {@code (x, y)} for every array element
 * @throws Exception whatever {@code onEntry} throws
 */
static void processCurveTable(JsonElement file, BiConsumer<Integer, Integer> onEntry) throws Exception {
    for (JsonElement point : file.getAsJsonObject().get("curve").getAsJsonArray()) {
        JsonObject entry = point.getAsJsonObject();
        int x = entry.get("x").getAsInt();
        int y = entry.get("y").getAsInt();
        onEntry.accept(x, y);
    }
}
/**
 * Creates the callback for a repository load.
 *
 * <p>Whatever the outcome, the view is first refreshed with {@code mUser} when it is set;
 * then the repositories are handled on success, or the failure is handled on error.
 *
 * @return a bi-consumer receiving {@code (repos, error)}
 */
private BiConsumer<List<Repo>, Throwable> reposHandler() {
    return (loaded, failure) -> {
        if (mUser != null) {
            getView().updateUserData(mUser);
        }
        if (failure != null) {
            handleLoadingFailed(failure);
            return;
        }
        handleReposLoaded(loaded);
    };
}
/**
 * Shortcut for {@code writeJsonResponse(rc, 200)}.
 *
 * @param rc  the routing context to respond on
 * @param <T> type of the successful result
 * @return the bi-consumer produced by {@code writeJsonResponse}
 */
static <T> BiConsumer<T, Throwable> ok(RoutingContext rc) {
    final int okStatus = 200;
    return writeJsonResponse(rc, okStatus);
}
/**
 * Shortcut for {@code writeJsonResponse(rc, 201)}.
 *
 * @param rc  the routing context to respond on
 * @param <T> type of the successful result
 * @return the bi-consumer produced by {@code writeJsonResponse}
 */
static <T> BiConsumer<T, Throwable> created(RoutingContext rc) {
    final int createdStatus = 201;
    return writeJsonResponse(rc, createdStatus);
}
/**
 * Loads all reminders on the IO scheduler, wraps each in a {@code TaskCard} appended to
 * {@code showDatas}, then refreshes the list on the main thread and, when {@code id} is
 * positive, scrolls to the card whose reminder has that id.
 *
 * @param id id of the reminder to scroll to; values {@code <= 0} skip the scroll
 */
@Override
public void initDatas(final long id) {
    remindView.showProgressBar();

    final SingleOnSubscribe<Integer> loadReminds = new SingleOnSubscribe<Integer>() {
        @Override
        public void subscribe(SingleEmitter<Integer> e) throws Exception {
            todayCount = mAssistDao.findRemindToday();
            for (Remind remind : mAssistDao.findAllRemindDesc()) {
                TaskCard<Remind> card = new TaskCard<>(remind, TaskCard.TaskState.ACTIVE);
                // A reminder with frequency 0 whose date has already passed is marked invalid.
                boolean zeroFrequency = remind.getFrequency() == 0;
                if (zeroFrequency && remind.getRdate().before(new Date())) {
                    card.taskState = TaskCard.TaskState.INVALID;
                }
                showDatas.add(card);
            }
            e.onSuccess(0);
        }
    };

    // Called for both success and failure; the throwable is not inspected.
    final BiConsumer<Integer, Throwable> refreshView = new BiConsumer<Integer, Throwable>() {
        @Override
        public void accept(Integer integer, Throwable throwable) throws Exception {
            remindView.hideProgressBar();
            remindView.notifyListView();
            if (id <= 0) {
                return;
            }
            for (int index = 0; index < showDatas.size(); index++) {
                if (id == showDatas.get(index).t.getId()) {
                    remindView.moveToPosition(index + 1);
                    break;
                }
            }
        }
    };

    Single.create(loadReminds)
            .observeOn(AndroidSchedulers.mainThread()) // thread on which the subscriber is notified
            .subscribeOn(Schedulers.io()) // thread on which subscribe() is executed
            .subscribe(refreshView);
}
/**
 * Builds a flowable that binds {@code parameters}, runs the query and emits one mapped value
 * per result-set row.
 *
 * <p>When any parameter is a collection, a new statement is prepared from {@code sql} via
 * {@code Util.prepare} and the result set is wrapped in {@code ResultSetAutoClosesStatement}
 * together with that statement; otherwise {@code ps} itself is re-used.
 *
 * @param ps              statement re-used when no parameter is a collection; also the source
 *                        of the connection for the single-use statement
 * @param parameters      raw parameter values, converted with {@code Util.toParameters}
 * @param mapper          converts the current row into an emitted value
 * @param names           parameter names passed to {@code Util.setParameters}
 * @param sql             query text used to prepare the single-use statement
 * @param fetchSize       fetch size passed to {@code Util.prepare}
 * @param queryTimeoutSec query timeout passed to {@code Util.prepare}
 * @param <T>             emitted value type
 * @return a flowable of mapped rows
 */
private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters,
        Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize, int queryTimeoutSec) {
    log.debug("parameters={}", parameters);
    log.debug("names={}", names);

    Callable<ResultSet> openResultSet = () -> {
        List<Parameter> params = Util.toParameters(parameters);
        boolean anyCollection = params.stream().anyMatch(p -> p.isCollection());
        if (!anyCollection) {
            // Normal case: the shared prepared statement is re-used as is.
            return Util.setParameters(ps, params, names).executeQuery();
        }
        // A collection parameter needs its "?" expanded to one "?" per element, and the next
        // collection may have a different size, so a fresh statement is prepared for this
        // execution only and handed to the wrapper together with the result set.
        final PreparedStatement singleUse =
                Util.prepare(ps.getConnection(), fetchSize, sql, params, queryTimeoutSec);
        return new ResultSetAutoClosesStatement(
                Util.setParameters(singleUse, params, names).executeQuery(), singleUse);
    };

    BiConsumer<ResultSet, Emitter<T>> emitRow = (rows, out) -> {
        log.debug("getting row from ps={}, rs={}", rows.getStatement(), rows);
        if (!rows.next()) {
            log.debug("completed");
            out.onComplete();
            return;
        }
        T mapped = mapper.apply(rows);
        log.debug("emitting {}", mapped);
        out.onNext(mapped);
    };

    Consumer<ResultSet> release = Util::closeSilently;
    return Flowable.generate(openResultSet, emitRow, release);
}
/**
 * Shortcut for {@link #restartableFirst(int, Function0, BiConsumer, BiConsumer)} with a
 * {@code null} error callback.
 *
 * @param restartableId     id of the restartable request
 * @param observableFactory factory returning the observable each time the restartable runs
 * @param onNext            callback receiving the view and each delivered value
 * @param <T>               the type of the observable's values
 */
public <T> void restartableFirst(int restartableId, final Function0<Observable<T>> observableFactory, final BiConsumer<View, T> onNext) {
    final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> noErrorHandler = null;
    restartableFirst(restartableId, observableFactory, onNext, noErrorHandler);
}
/**
 * Shortcut for {@link #restartableLatestCache(int, Function0, BiConsumer, BiConsumer)} with a
 * {@code null} error callback.
 *
 * @param restartableId     id of the restartable request
 * @param observableFactory factory returning the observable each time the restartable runs
 * @param onNext            callback receiving the view and each delivered value
 * @param <T>               the type of the observable's values
 */
public <T> void restartableLatestCache(int restartableId, final Function0<Observable<T>> observableFactory, final BiConsumer<View, T> onNext) {
    final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> noErrorHandler = null;
    restartableLatestCache(restartableId, observableFactory, onNext, noErrorHandler);
}
/**
 * Shortcut for {@link #restartableReplay(int, Function0, BiConsumer, BiConsumer)} with a
 * {@code null} error callback.
 *
 * @param restartableId     id of the restartable request
 * @param observableFactory factory returning the observable each time the restartable runs
 * @param onNext            callback receiving the view and each delivered value
 * @param <T>               the type of the observable's values
 */
public <T> void restartableReplay(int restartableId, final Function0<Observable<T>> observableFactory, final BiConsumer<View, T> onNext) {
    final BiConsumer<View, HttpExceptionHandle.ResponeThrowable> noErrorHandler = null;
    restartableReplay(restartableId, observableFactory, onNext, noErrorHandler);
}