下面列出了io.reactivex.Flowable#defer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Flowable<State> dataSetState(){
return Flowable.defer(() ->{
State state;
DataSetInstance dataSetInstance = d2.dataSetModule().dataSetInstances()
.byDataSetUid().eq(dataSetUid)
.byAttributeOptionComboUid().eq(catOptCombo)
.byOrganisationUnitUid().eq(orgUnitUid)
.byPeriod().eq(periodId).one().blockingGet();
state = dataSetInstance.state();
DataSetCompleteRegistration dscr = d2.dataSetModule().dataSetCompleteRegistrations()
.byDataSetUid().eq(dataSetUid)
.byAttributeOptionComboUid().eq(catOptCombo)
.byOrganisationUnitUid().eq(orgUnitUid)
.byPeriod().eq(periodId).one().blockingGet();
if(state == State.SYNCED && dscr!=null){
state = dscr.state();
}
return state != null ? Flowable.just(state) : Flowable.empty();
});
}
private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
Function<Single<Connection>, Flowable<Notification<T>>> f) {
return Flowable.defer(() -> {
AtomicReference<Connection> con = new AtomicReference<Connection>();
// set the atomic reference when transactedConnection emits
Single<Connection> transactedConnection = b.connection //
.map(c -> Util.toTransactedConnection(con, c));
return f.apply(transactedConnection) //
.<Tx<T>>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<T>) tx).connection().commit();
}
});
});
}
/**
* Transforms the results using the given function.
*
* @param mapper
* maps the query results to an object
* @return the results of the query as an Observable
*/
@Override
public <T> Flowable<Tx<T>> get(@Nonnull ResultSetMapper<? extends T> mapper) {
Preconditions.checkNotNull(mapper, "mapper cannot be null");
return Flowable.defer(() -> {
AtomicReference<Connection> connection = new AtomicReference<Connection>();
Flowable<T> o = Update.<T>createReturnGeneratedKeys( //
update.updateBuilder.connections //
.map(c -> Util.toTransactedConnection(connection, c)),
update.parameterGroupsToFlowable(), update.updateBuilder.sql, mapper, false);
return o.materialize() //
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<T>) tx).connection().commit();
}
});
});
}
@SuppressWarnings("unchecked")
private static <T> Flowable<Tx<T>> createFlowable(SelectBuilder sb,
ResultSetMapper<? extends T> mapper, Database db) {
return (Flowable<Tx<T>>) (Flowable<?>) Flowable.defer(() -> {
//Select.<T>create(connection, pg, sql, fetchSize, mapper, true);
AtomicReference<Connection> connection = new AtomicReference<Connection>();
Single<Connection> con = sb.connection //
.map(c -> Util.toTransactedConnection(connection, c));
return Select.create(con, //
sb.parameterGroupsToFlowable(), //
sb.sql, //
sb.fetchSize, //
mapper, //
false, //
sb.queryTimeoutSec) //
.materialize() //
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<T>) tx).connection().commit();
}
});
});
}
private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb, Database db) {
return Flowable.defer(() -> {
AtomicReference<Connection> connection = new AtomicReference<Connection>();
return Select.create(sb.selectBuilder.connection //
.map(c -> Util.toTransactedConnection(connection, c)), //
sb.selectBuilder.parameterGroupsToFlowable(), //
sb.selectBuilder.sql, //
sb.selectBuilder.fetchSize, //
Util.autoMap(sb.cls), //
false, //
sb.selectBuilder.queryTimeoutSec) //
.materialize() //
.flatMap(n -> Tx.toTx(n, connection.get(), db)) //
.doOnNext(tx -> {
if (tx.isComplete()) {
((TxImpl<T>) tx).connection().commit();
}
});
});
}
public static Flowable<Boolean> addPost(final String boardName, final long threadId, final long postId) {
return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
long val = 0;
BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
BriteDatabase.Transaction transaction = db.newTransaction();
try {
UserPost userPost = new UserPost();
userPost.boardName = boardName;
userPost.threadId = threadId;
userPost.postId = postId;
userPost.postTime = System.currentTimeMillis();
val = DatabaseUtils.insert(db, userPost);
transaction.markSuccessful();
} catch (Exception e) {
Log.e(LOG_TAG, "Error saving user post data", e);
} finally {
transaction.end();
}
return Flowable.just(val > 0);
});
}
public static Flowable<Boolean> removePost(final String boardName, final long threadId, final long postId) {
return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
long val = 0;
BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
BriteDatabase.Transaction transaction = db.newTransaction();
try {
UserPost userPost = new UserPost();
userPost.boardName = boardName;
userPost.threadId = threadId;
userPost.postId = postId;
userPost.postTime = System.currentTimeMillis();
val = DatabaseUtils.delete(db, userPost);
transaction.markSuccessful();
} catch (Exception e) {
Log.e(LOG_TAG, "Error saving user post data", e);
} finally {
transaction.end();
}
return Flowable.just(val > 0);
});
}
public Flowable<File> compressToFileAsFlowable(final File imageFile, final String compressedFileName) {
return Flowable.defer((Callable<Flowable<File>>) () -> {
try {
return Flowable.just(compressToFile(imageFile, compressedFileName));
} catch (IOException e) {
return Flowable.error(e);
}
});
}
public Flowable<Bitmap> compressToBitmapAsFlowable(final File imageFile) {
return Flowable.defer((Callable<Flowable<Bitmap>>) () -> {
try {
return Flowable.just(compressToBitmap(imageFile));
} catch (IOException e) {
return Flowable.error(e);
}
});
}
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
Processor<I, O> processor = Casts.cast(Objects.requireNonNull(
Objects.requireNonNull(stage).getRsProcessor()));
return source -> Flowable.defer(() -> {
Flowable<O> flowable = Flowable.fromPublisher(processor);
source.safeSubscribe(processor);
return flowable;
});
}
/**
* Get the resized image file as RxJava Flowable.
* @return A Flowable that emits the resized image file or error.
*/
public Flowable<File> getResizedFileAsFlowable() {
return Flowable.defer(new Callable<Flowable<File>>() {
@Override
public Flowable<File> call() {
try {
return Flowable.just(getResizedFile());
} catch (IOException e) {
return Flowable.error(e);
}
}
});
}
/**
* Get the resized image bitmap as RxJava Flowable.
* @return A Flowable that emits the resized image bitmap or error.
*/
public Flowable<Bitmap> getResizedBitmapAsFlowable() {
return Flowable.defer(new Callable<Flowable<Bitmap>>() {
@Override
public Flowable<Bitmap> call() {
try {
return Flowable.just(getResizedBitmap());
} catch (IOException e) {
return Flowable.error(e);
}
}
});
}
public Flowable<File> compressToFileAsFlowable(final File imageFile, final String compressedFileName) {
return Flowable.defer(() -> {
try {
return Flowable.just(compressToFile(imageFile, compressedFileName));
} catch (IOException e) {
return Flowable.error(e);
}
});
}
public Flowable<Bitmap> compressToBitmapAsFlowable(final File imageFile) {
return Flowable.defer(() -> {
try {
return Flowable.just(compressToBitmap(imageFile));
} catch (IOException e) {
return Flowable.error(e);
}
});
}
private static Flowable<Integer> createExecuteBatch(NamedPreparedStatement ps,
List<Object> parameters) {
return Flowable.defer(() -> {
Util.convertAndSetParameters(ps.ps, parameters, ps.names);
ps.ps.addBatch();
log.debug("batch added with {}", parameters);
Flowable<Integer> o = toFlowable(ps.ps.executeBatch());
log.debug("batch executed");
return o;
});
}
public static Flowable<Boolean> setHistoryRemovedStatus(final String boardName, final long threadId, final boolean removed) {
return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
BriteDatabase.Transaction transaction = db.newTransaction();
int val = 0;
try {
ContentValues values = new ContentValues();
values.put(History.KEY_THREAD_REMOVED, removed);
val = db.update(History.TABLE_NAME, SQLiteDatabase.CONFLICT_REPLACE, values, History.KEY_THREAD_ID + "=?", String.valueOf(threadId));
transaction.markSuccessful();
} catch (Exception e) {
Log.e(LOG_TAG, "Error deleting history: name=" + boardName + ", thread=" + threadId, e);
} finally {
transaction.end();
}
return Flowable.just(val > 0)
.onErrorReturn(throwable -> {
Log.e(LOG_TAG, "Error deleting history: name=" + boardName + ", thread=" + threadId, throwable);
return false;
})
.compose(DatabaseUtils.<Boolean>applySchedulers());
});
}
public static <K extends BaseModel> Flowable<Boolean> insert(final List<K> models) {
return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
long val = insertModels(models);
return Flowable.just(val > 0);
});
}
public static <K extends BaseModel> Flowable<Boolean> remove(final K model, WhereArg wheres) {
return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
long val = db.delete(model.getTableName(), wheres.where, wheres.where);
return Flowable.just(val >= 0);
});
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
return Flowable.defer(() -> {
Worker worker = signalScheduler.createWorker();
Flowable<Signal<?, Id>> o0 = subject //
.toSerialized() //
.toFlowable(BackpressureStrategy.BUFFER) //
.mergeWith(signals) //
.doOnCancel(() -> worker.dispose()) //
.compose(preGroupBy);
Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
if (mapFactory != null) {
o = o0.groupBy(signal -> new ClassId(signal.cls(),
signal.id()), x -> x, true, 16, mapFactory);
} else {
o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
Functions.identity());
}
return o.flatMap(g -> {
Flowable<EntityStateMachine<?, Id>> obs = g //
.flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
.doOnNext(m -> stateMachines.put(g.getKey(), m)) //
.subscribeOn(processingScheduler); //
Flowable<EntityStateMachine<?, Id>> res = entityTransform
.apply(grouped(g.getKey(), obs));
return res;
});
});
}
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
final long start, final int maxConcurrency) {
return Flowable.defer(new Callable<Flowable<T>>() {
@Override
public Flowable<T> call() throws Exception {
// need a ReplaySubject because multiple requests can come
// through before concatEager has established subscriptions to
// the subject
final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
final AtomicLong position = new AtomicLong(start);
LongConsumer request = new LongConsumer() {
@Override
public void accept(final long n) throws Exception {
final long pos = position.getAndAdd(n);
if (SubscriptionHelper.validate(n)) {
Flowable<T> flowable;
try {
flowable = fetch.apply(pos, n);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
subject.onError(e);
return;
}
// reduce allocations by incorporating the onNext
// and onComplete actions into the mutable count
// object
final Count count = new Count(subject, n);
flowable = flowable //
.doOnNext(count) //
.doOnComplete(count);
subject.onNext(flowable);
}
}
};
return Flowable //
.concatEager(subject.serialize() //
.toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
.doOnRequest(request);
}
});
}