io.reactivex.Flowable#defer ( )源码实例Demo

下面列出了io.reactivex.Flowable#defer ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

public Flowable<State> dataSetState(){
    return Flowable.defer(() ->{
        State state;
        DataSetInstance dataSetInstance = d2.dataSetModule().dataSetInstances()
                .byDataSetUid().eq(dataSetUid)
                .byAttributeOptionComboUid().eq(catOptCombo)
                .byOrganisationUnitUid().eq(orgUnitUid)
                .byPeriod().eq(periodId).one().blockingGet();

        state = dataSetInstance.state();

        DataSetCompleteRegistration dscr = d2.dataSetModule().dataSetCompleteRegistrations()
                .byDataSetUid().eq(dataSetUid)
                .byAttributeOptionComboUid().eq(catOptCombo)
                .byOrganisationUnitUid().eq(orgUnitUid)
                .byPeriod().eq(periodId).one().blockingGet();

        if(state == State.SYNCED && dscr!=null){
            state = dscr.state();
        }

        return state != null ? Flowable.just(state) : Flowable.empty();
    });
}
 
源代码2 项目: rxjava2-jdbc   文件: TransactedCallableBuilder.java
private static <T> Flowable<Tx<T>> inTransaction(CallableBuilder b,
        Function<Single<Connection>, Flowable<Notification<T>>> f) {
    return Flowable.defer(() -> {
        AtomicReference<Connection> con = new AtomicReference<Connection>();
        // set the atomic reference when transactedConnection emits
        Single<Connection> transactedConnection = b.connection //
                .map(c -> Util.toTransactedConnection(con, c));
        return f.apply(transactedConnection) //
                .<Tx<T>>flatMap(n -> Tx.toTx(n, con.get(), b.db)) //
                .doOnNext(tx -> {
                    if (tx.isComplete()) {
                        ((TxImpl<T>) tx).connection().commit();
                    }
                });
    });
}
 
/**
 * Transforms the results using the given function.
 *
 * @param mapper
 *            maps the query results to an object
 * @return the results of the query as an Observable
 */
@Override
public <T> Flowable<Tx<T>> get(@Nonnull ResultSetMapper<? extends T> mapper) {
    Preconditions.checkNotNull(mapper, "mapper cannot be null");
    return Flowable.defer(() -> {
        AtomicReference<Connection> connection = new AtomicReference<Connection>();
        Flowable<T> o = Update.<T>createReturnGeneratedKeys( //
                update.updateBuilder.connections //
                        .map(c -> Util.toTransactedConnection(connection, c)),
                update.parameterGroupsToFlowable(), update.updateBuilder.sql, mapper, false);
        return o.materialize() //
                .flatMap(n -> Tx.toTx(n, connection.get(), db)) //
                .doOnNext(tx -> {
                    if (tx.isComplete()) {
                        ((TxImpl<T>) tx).connection().commit();
                    }
                });
    });
}
 
源代码4 项目: rxjava2-jdbc   文件: TransactedSelectBuilder.java
@SuppressWarnings("unchecked")
private static <T> Flowable<Tx<T>> createFlowable(SelectBuilder sb,
        ResultSetMapper<? extends T> mapper, Database db) {
    return (Flowable<Tx<T>>) (Flowable<?>) Flowable.defer(() -> {
        //Select.<T>create(connection, pg, sql, fetchSize, mapper, true);
        AtomicReference<Connection> connection = new AtomicReference<Connection>();
        Single<Connection> con = sb.connection //
                .map(c -> Util.toTransactedConnection(connection, c));
        return Select.create(con, //
                sb.parameterGroupsToFlowable(), //
                sb.sql, //
                sb.fetchSize, //
                mapper, //
                false, //
                sb.queryTimeoutSec) //
                .materialize() //
                .flatMap(n -> Tx.toTx(n, connection.get(), db)) //
                .doOnNext(tx -> {
                    if (tx.isComplete()) {
                        ((TxImpl<T>) tx).connection().commit();
                    }
                });
    });
}
 
private static <T> Flowable<Tx<T>> createFlowable(SelectAutomappedBuilder<T> sb, Database db) {
    return Flowable.defer(() -> {
        AtomicReference<Connection> connection = new AtomicReference<Connection>();
        return Select.create(sb.selectBuilder.connection //
                .map(c -> Util.toTransactedConnection(connection, c)), //
                sb.selectBuilder.parameterGroupsToFlowable(), //
                sb.selectBuilder.sql, //
                sb.selectBuilder.fetchSize, //
                Util.autoMap(sb.cls), //
                false, //
                sb.selectBuilder.queryTimeoutSec) //
                .materialize() //
                .flatMap(n -> Tx.toTx(n, connection.get(), db)) //
                .doOnNext(tx -> {
                    if (tx.isComplete()) {
                        ((TxImpl<T>) tx).connection().commit();
                    }
                });
    });
}
 
源代码6 项目: mimi-reader   文件: UserPostTableConnection.java
public static Flowable<Boolean> addPost(final String boardName, final long threadId, final long postId) {

        return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
            long val = 0;
            BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
            BriteDatabase.Transaction transaction = db.newTransaction();

            try {
                UserPost userPost = new UserPost();
                userPost.boardName = boardName;
                userPost.threadId = threadId;
                userPost.postId = postId;
                userPost.postTime = System.currentTimeMillis();

                val = DatabaseUtils.insert(db, userPost);

                transaction.markSuccessful();
            } catch (Exception e) {
                Log.e(LOG_TAG, "Error saving user post data", e);
            } finally {
                transaction.end();
            }
            return Flowable.just(val > 0);
        });
    }
 
源代码7 项目: mimi-reader   文件: UserPostTableConnection.java
public static Flowable<Boolean> removePost(final String boardName, final long threadId, final long postId) {

        return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
            long val = 0;
            BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
            BriteDatabase.Transaction transaction = db.newTransaction();

            try {
                UserPost userPost = new UserPost();
                userPost.boardName = boardName;
                userPost.threadId = threadId;
                userPost.postId = postId;
                userPost.postTime = System.currentTimeMillis();

                val = DatabaseUtils.delete(db, userPost);

                transaction.markSuccessful();
            } catch (Exception e) {
                Log.e(LOG_TAG, "Error saving user post data", e);
            } finally {
                transaction.end();
            }
            return Flowable.just(val > 0);
        });
    }
 
源代码8 项目: HaoReader   文件: Compressor.java
public Flowable<File> compressToFileAsFlowable(final File imageFile, final String compressedFileName) {
    return Flowable.defer((Callable<Flowable<File>>) () -> {
        try {
            return Flowable.just(compressToFile(imageFile, compressedFileName));
        } catch (IOException e) {
            return Flowable.error(e);
        }
    });
}
 
源代码9 项目: HaoReader   文件: Compressor.java
public Flowable<Bitmap> compressToBitmapAsFlowable(final File imageFile) {
    return Flowable.defer((Callable<Flowable<Bitmap>>) () -> {
        try {
            return Flowable.just(compressToBitmap(imageFile));
        } catch (IOException e) {
            return Flowable.error(e);
        }
    });
}
 
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.ProcessorStage stage) {
    Processor<I, O> processor = Casts.cast(Objects.requireNonNull(
            Objects.requireNonNull(stage).getRsProcessor()));

    return source -> Flowable.defer(() -> {
        Flowable<O> flowable = Flowable.fromPublisher(processor);
        source.safeSubscribe(processor);
        return flowable;
    });
}
 
源代码11 项目: Resizer   文件: Resizer.java
/**
 * Get the resized image file as RxJava Flowable.
 * @return A Flowable that emits the resized image file or error.
 */
public Flowable<File> getResizedFileAsFlowable() {
    return Flowable.defer(new Callable<Flowable<File>>() {
        @Override
        public Flowable<File> call() {
            try {
                return Flowable.just(getResizedFile());
            } catch (IOException e) {
                return Flowable.error(e);
            }
        }
    });
}
 
源代码12 项目: Resizer   文件: Resizer.java
/**
 * Get the resized image bitmap as RxJava Flowable.
 * @return A Flowable that emits the resized image bitmap or error.
 */
public Flowable<Bitmap> getResizedBitmapAsFlowable() {
    return Flowable.defer(new Callable<Flowable<Bitmap>>() {
        @Override
        public Flowable<Bitmap> call() {
            try {
                return Flowable.just(getResizedBitmap());
            } catch (IOException e) {
                return Flowable.error(e);
            }
        }
    });
}
 
源代码13 项目: RetroMusicPlayer   文件: Compressor.java
public Flowable<File> compressToFileAsFlowable(final File imageFile, final String compressedFileName) {
    return Flowable.defer(() -> {
        try {
            return Flowable.just(compressToFile(imageFile, compressedFileName));
        } catch (IOException e) {
            return Flowable.error(e);
        }
    });
}
 
源代码14 项目: RetroMusicPlayer   文件: Compressor.java
public Flowable<Bitmap> compressToBitmapAsFlowable(final File imageFile) {
    return Flowable.defer(() -> {
        try {
            return Flowable.just(compressToBitmap(imageFile));
        } catch (IOException e) {
            return Flowable.error(e);
        }
    });
}
 
源代码15 项目: rxjava2-jdbc   文件: Update.java
private static Flowable<Integer> createExecuteBatch(NamedPreparedStatement ps,
        List<Object> parameters) {
    return Flowable.defer(() -> {
        Util.convertAndSetParameters(ps.ps, parameters, ps.names);
        ps.ps.addBatch();
        log.debug("batch added with {}", parameters);
        Flowable<Integer> o = toFlowable(ps.ps.executeBatch());
        log.debug("batch executed");
        return o;
    });
}
 
源代码16 项目: mimi-reader   文件: HistoryTableConnection.java
public static Flowable<Boolean> setHistoryRemovedStatus(final String boardName, final long threadId, final boolean removed) {
    return Flowable.defer((Callable<Flowable<Boolean>>) () -> {

        BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
        BriteDatabase.Transaction transaction = db.newTransaction();

        int val = 0;

        try {
            ContentValues values = new ContentValues();
            values.put(History.KEY_THREAD_REMOVED, removed);

            val = db.update(History.TABLE_NAME, SQLiteDatabase.CONFLICT_REPLACE, values, History.KEY_THREAD_ID + "=?", String.valueOf(threadId));
            transaction.markSuccessful();
        } catch (Exception e) {
            Log.e(LOG_TAG, "Error deleting history: name=" + boardName + ", thread=" + threadId, e);
        } finally {
            transaction.end();
        }

        return Flowable.just(val > 0)
                .onErrorReturn(throwable -> {
                    Log.e(LOG_TAG, "Error deleting history: name=" + boardName + ", thread=" + threadId, throwable);
                    return false;
                })
                .compose(DatabaseUtils.<Boolean>applySchedulers());
    });
}
 
源代码17 项目: mimi-reader   文件: DatabaseUtils.java
public static <K extends BaseModel> Flowable<Boolean> insert(final List<K> models) {

        return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
            long val = insertModels(models);
            return Flowable.just(val > 0);
        });
    }
 
源代码18 项目: mimi-reader   文件: DatabaseUtils.java
public static <K extends BaseModel> Flowable<Boolean> remove(final K model, WhereArg wheres) {

        return Flowable.defer((Callable<Flowable<Boolean>>) () -> {
            BriteDatabase db = MimiApplication.getInstance().getBriteDatabase();
            long val = db.delete(model.getTableName(), wheres.where, wheres.where);

            return Flowable.just(val >= 0);
        });
    }
 
源代码19 项目: state-machine   文件: Processor.java
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
    return Flowable.defer(() -> {
        Worker worker = signalScheduler.createWorker();
        Flowable<Signal<?, Id>> o0 = subject //
                .toSerialized() //
                .toFlowable(BackpressureStrategy.BUFFER) //
                .mergeWith(signals) //
                .doOnCancel(() -> worker.dispose()) //
                .compose(preGroupBy);
        Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
        if (mapFactory != null) {
            o = o0.groupBy(signal -> new ClassId(signal.cls(),
             signal.id()), x -> x, true, 16, mapFactory);
        } else {
            o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
                    Functions.identity());
        }
        return o.flatMap(g -> {
            Flowable<EntityStateMachine<?, Id>> obs = g //
                    .flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
                    .doOnNext(m -> stateMachines.put(g.getKey(), m)) //
                    .subscribeOn(processingScheduler); //

            Flowable<EntityStateMachine<?, Id>> res = entityTransform
                    .apply(grouped(g.getKey(), obs));
            return res;
        });
    });
}
 
public static <T> Flowable<T> create(final BiFunction<? super Long, ? super Long, ? extends Flowable<T>> fetch,
        final long start, final int maxConcurrency) {
    return Flowable.defer(new Callable<Flowable<T>>() {
        @Override
        public Flowable<T> call() throws Exception {
            // need a ReplaySubject because multiple requests can come
            // through before concatEager has established subscriptions to
            // the subject
            final ReplaySubject<Flowable<T>> subject = ReplaySubject.create();
            final AtomicLong position = new AtomicLong(start);
            LongConsumer request = new LongConsumer() {
                @Override
                public void accept(final long n) throws Exception {
                    final long pos = position.getAndAdd(n);
                    if (SubscriptionHelper.validate(n)) {
                        Flowable<T> flowable;
                        try {
                            flowable = fetch.apply(pos, n);
                        } catch (Throwable e) {
                            Exceptions.throwIfFatal(e);
                            subject.onError(e);
                            return;
                        }
                        // reduce allocations by incorporating the onNext
                        // and onComplete actions into the mutable count
                        // object
                        final Count count = new Count(subject, n);
                        flowable = flowable //
                                .doOnNext(count) //
                                .doOnComplete(count);
                        subject.onNext(flowable);
                    }
                }
            };
            return Flowable //
                    .concatEager(subject.serialize() //
                            .toFlowable(BackpressureStrategy.BUFFER), maxConcurrency, 128) //
                    .doOnRequest(request);
        }
    });
}