类io.reactivex.Emitter源码实例Demo

下面列出了怎么用io.reactivex.Emitter的API类实例代码及写法,或者点击链接到github查看源代码。

public static <T> Flowable<T> create(Callable<Connection> connectionFactory, List<Object> parameters, String sql,
                                     Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> initialState = () -> {
        Connection con = connectionFactory.call();
        PreparedStatement ps = con.prepareStatement(sql);
        // TODO set parameters
        ResultSet rs = ps.executeQuery();
        return rs;
    };
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        if (rs.next()) {
            emitter.onNext(mapper.apply(rs));
        } else {
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposeState = FlowableSelect::closeSilently;
    return Flowable.generate(initialState, generator, disposeState);
}
 
@Override
public Flowable<String> getRepos(String user) {
    Callable<Iterator<String>> initialState =
            () -> gitHbubRepos.getRepos(user)
                              .stream()
                              .map(Repository::getName).iterator();
    BiConsumer<Iterator<String>, Emitter<String>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next() + "  ");
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
 
源代码3 项目: Java-9-Spring-Webflux   文件: GitHubServiceImpl.java
@Override
public Flowable<String> getRepos(String user) {
    Callable<Iterator<String>> initialState =
            () -> gitHbubRepos.getRepos(user)
                              .stream()
                              .map(Repository::getName).iterator();
    BiConsumer<Iterator<String>, Emitter<String>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next() + "  ");
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
 
源代码4 项目: rxjava2-jdbc   文件: Update.java
private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters,
        Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> initialState = () -> {
        Util.convertAndSetParameters(ps.ps, parameters, ps.names);
        ps.ps.execute();
        return ps.ps.getGeneratedKeys();
    };
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        if (rs.next()) {
            emitter.onNext(mapper.apply(rs));
        } else {
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposer = Util::closeSilently;
    return Flowable.generate(initialState, generator, disposer);
}
 
源代码5 项目: rxjava2-jdbc   文件: Call.java
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
        Function<? super ResultSet, ? extends T> f) throws SQLException {
    ResultSet rsActual = stmt.stmt.getResultSet();
    Callable<ResultSet> initialState = () -> rsActual;
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
        if (rs.next()) {
            T v = f.apply(rs);
            log.debug("emitting {}", v);
            emitter.onNext(v);
        } else {
            log.debug("completed");
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposeState = Util::closeSilently;
    return Flowable.generate(initialState, generator, disposeState);
}
 
源代码6 项目: RxAndroidBle   文件: ByteArrayBatchObservable.java
@Override
protected void subscribeActual(Subscriber<? super byte[]> subscriber) {
    Flowable.generate(new Consumer<Emitter<byte[]>>() {

        @Override
        public void accept(Emitter<byte[]> emitter) {
            int nextBatchSize = Math.min(byteBuffer.remaining(), maxBatchSize);
            if (nextBatchSize == 0) {
                emitter.onComplete();
                return;
            }
            final byte[] nextBatch = new byte[nextBatchSize];
            byteBuffer.get(nextBatch);
            emitter.onNext(nextBatch);
        }
    }).subscribe(subscriber);
}
 
源代码7 项目: rxjava2-extras   文件: Bytes.java
/**
 * Returns a Flowable stream of byte arrays from the given
 * {@link InputStream} between 1 and {@code bufferSize} bytes.
 * 
 * @param is
 *            input stream of bytes
 * @param bufferSize
 *            max emitted byte array size
 * @return a stream of byte arrays
 */
public static Flowable<byte[]> from(final InputStream is, final int bufferSize) {
    return Flowable.generate(new Consumer<Emitter<byte[]>>() {
        @Override
        public void accept(Emitter<byte[]> emitter) throws Exception {
            byte[] buffer = new byte[bufferSize];
            int count = is.read(buffer);
            if (count == -1) {
                emitter.onComplete();
            } else if (count < bufferSize) {
                emitter.onNext(Arrays.copyOf(buffer, count));
            } else {
                emitter.onNext(buffer);
            }
        }
    });
}
 
源代码8 项目: rxjava2-extras   文件: Bytes.java
public static Flowable<ZippedEntry> unzip(final ZipInputStream zis) {

        return Flowable.generate(new Consumer<Emitter<ZippedEntry>>() {
            @Override
            public void accept(Emitter<ZippedEntry> emitter) throws IOException {
                ZipEntry zipEntry = zis.getNextEntry();
                if (zipEntry != null) {
                    emitter.onNext(new ZippedEntry(zipEntry, zis));
                } else {
                    // end of stream so eagerly close the stream (might not be a
                    // good idea since this method did not create the zis
                    zis.close();
                    emitter.onComplete();
                }
            }
        });

    }
 
源代码9 项目: rxjava2-extras   文件: Serialized.java
public <T> Flowable<T> read(final Class<T> cls, final Input input) {

            return Flowable.generate(new Consumer<Emitter<T>>() {

                @Override
                public void accept(Emitter<T> emitter) throws Exception {
                    if (input.eof()) {
                        emitter.onComplete();
                    } else {
                        T t = kryo.readObject(input, cls);
                        emitter.onNext(t);
                    }
                }

            });
        }
 
源代码10 项目: Mysplash   文件: IntroduceActivity.java
@Override
public void handleBackPressed() {
    // double click to exit.
    if (backPressed) {
        finishSelf(true);
    } else {
        backPressed = true;
        NotificationHelper.showSnackbar(this, getString(R.string.feedback_click_again_to_exit));

        Observable.create(Emitter::onComplete)
                .compose(RxLifecycle.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
                .delay(2, TimeUnit.SECONDS)
                .doOnComplete(() -> backPressed = false)
                .subscribe();
    }
}
 
源代码11 项目: Mysplash   文件: CustomApiActivity.java
@Override
public void handleBackPressed() {
    // double click to exit.
    if (backPressed) {
        finishSelf(true);
    } else {
        backPressed = true;
        NotificationHelper.showSnackbar(
                this, getString(R.string.feedback_click_again_to_exit));

        Observable.create(Emitter::onComplete)
                .compose(RxLifecycle.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
                .delay(2, TimeUnit.SECONDS)
                .doOnComplete(() -> backPressed = false)
                .subscribe();
    }
}
 
源代码12 项目: Mysplash   文件: ProfileDialog.java
@Override
public void onFailed() {
    if (getActivity() == null) {
        return;
    }

    if (!TextUtils.isEmpty(username)) {
        Observable.create(Emitter::onComplete)
                .compose(
                        RxLifecycleCompact.bind(ProfileDialog.this)
                                .disposeObservableWhen(LifecycleEvent.DESTROY)
                ).delay(2, TimeUnit.SECONDS)
                .doOnComplete(() -> service.requestUserProfile(username, new ProfileCallback()))
                .subscribe();
    }
}
 
源代码13 项目: a   文件: BookChapterList.java
private void finish(List<BookChapterBean> chapterList, Emitter<List<BookChapterBean>> emitter) {
    //去除重复,保留后面的,先倒序,从后面往前判断
    if (!dx) {
        Collections.reverse(chapterList);
    }
    LinkedHashSet<BookChapterBean> lh = new LinkedHashSet<>(chapterList);
    chapterList = new ArrayList<>(lh);
    Collections.reverse(chapterList);
    Debug.printLog(tag, 1, "-目录解析完成", analyzeNextUrl);
    emitter.onNext(chapterList);
    emitter.onComplete();
}
 
private static <T> Flowable<T> create(PreparedStatement ps, Function<? super ResultSet, T> mapper) {
    Callable<ResultSet> initialState = () -> {
        ps.execute();
        return ps.getGeneratedKeys();
    };
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        if (rs.next()) {
            emitter.onNext(mapper.apply(rs));
        } else {
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposer = Update::closeAll;
    return Flowable.generate(initialState, generator, disposer);
}
 
@Override
public Flowable<Repository> getRepos0(String user) {
    Callable<Iterator<Repository>> initialState =
            gitHbubRepos.getRepos(user)
                        .stream()::iterator;
    BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
 
源代码16 项目: MyBookshelf   文件: BookChapterList.java
private void finish(List<BookChapterBean> chapterList, Emitter<List<BookChapterBean>> emitter) {
    //去除重复,保留后面的,先倒序,从后面往前判断
    if (!dx) {
        Collections.reverse(chapterList);
    }
    LinkedHashSet<BookChapterBean> lh = new LinkedHashSet<>(chapterList);
    chapterList = new ArrayList<>(lh);
    Collections.reverse(chapterList);
    Debug.printLog(tag, 1, "-目录解析完成", analyzeNextUrl);
    emitter.onNext(chapterList);
    emitter.onComplete();
}
 
static <X> void toStreamEvents(CompletionStage<X> cs, Emitter<Object> emitter) {
    cs.whenComplete((X res, Throwable err) -> {
        if (res != null) {
            emitter.onNext(res);
            emitter.onComplete();
        } else {
            if (err != null) {
                emitter.onError(err instanceof CompletionException ? err.getCause() : err);
            } else {
                emitter.onComplete();
            }
        }
    });
}
 
源代码18 项目: Java-9-Spring-Webflux   文件: GitHubServiceImpl.java
@Override
public Flowable<Repository> getRepos0(String user) {
    Callable<Iterator<Repository>> initialState =
            gitHbubRepos.getRepos(user)
                        .stream()::iterator;
    BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
            (iterator, emitter) -> {
                if (iterator.hasNext()) {
                    emitter.onNext(iterator.next());
                } else {
                    emitter.onComplete();
                }
            };
    return Flowable.generate(initialState, generator);
}
 
源代码19 项目: RxAndroidBle   文件: DisconnectOperation.java
@SuppressWarnings("WeakerAccess")
@RestrictTo(RestrictTo.Scope.SUBCLASSES)
void considerGattDisconnected(
        final Emitter<Void> emitter,
        final QueueReleaseInterface queueReleaseInterface
) {
    connectionStateChangeListener.onConnectionStateChange(DISCONNECTED);
    queueReleaseInterface.release();
    emitter.onComplete();
}
 
源代码20 项目: rxjava2-extras   文件: FlowableServerSocket.java
private static Flowable<Flowable<byte[]>> createServerSocketFlowable(final ServerSocket serverSocket,
        final long timeoutMs, final int bufferSize, final Action preAcceptAction,
        final Predicate<? super Socket> acceptSocket) {
    return Flowable.generate( //
            new Consumer<Emitter<Flowable<byte[]>>>() {
                @Override
                public void accept(Emitter<Flowable<byte[]>> emitter) throws Exception {
                    acceptConnection(timeoutMs, bufferSize, serverSocket, emitter, preAcceptAction, acceptSocket);
                }
            });
}
 
源代码21 项目: rxjava2-extras   文件: Strings.java
public static Flowable<String> from(final Reader reader, final int bufferSize) {
    return Flowable.generate(new Consumer<Emitter<String>>() {
        final char[] buffer = new char[bufferSize];

        @Override
        public void accept(Emitter<String> emitter) throws Exception {
            int count = reader.read(buffer);
            if (count == -1) {
                emitter.onComplete();
            } else {
                emitter.onNext(String.valueOf(buffer, 0, count));
            }
        }
    });
}
 
源代码22 项目: rxjava2-extras   文件: Benchmarks.java
@Override
public Publisher<? extends Integer> call() throws Exception {
    return Flowable.generate(new Consumer<Emitter<Integer>>() {
        final int[] count = new int[1];

        @Override
        public void accept(Emitter<Integer> emitter) throws Exception {
            count[0]++;
            emitter.onNext(count[0]);
            if (count[0] == 1000) {
                emitter.onComplete();
            }
        }
    });
}
 
源代码23 项目: state-machine   文件: Processor.java
@Override
public void accept(Signals<Id> signals, Emitter<EntityStateMachine<?, Id>> observer)
        throws Exception {
    @SuppressWarnings("unchecked")
    Event<Object> event = (Event<Object>) signals.signalsToSelf.pollLast();
    if (event != null) {
        applySignalToSelf(signals, observer, event);
    } else {
        applySignalsToOthers(classId, worker, signals);
        observer.onComplete();
    }
}
 
源代码24 项目: Mysplash   文件: MeActivityModel.java
@Override
public void onUpdateFailed() {
    if (AuthManager.getInstance().getUser() == null) {
        timerDisposable = Observable.create(Emitter::onComplete)
                .delay(DEFAULT_REQUEST_INTERVAL_SECOND, TimeUnit.SECONDS)
                .doOnComplete(() -> AuthManager.getInstance().requestPersonalProfile())
                .subscribe();
    } else {
        setResource(Resource.success(AuthManager.getInstance().getUser()));
    }
}
 
源代码25 项目: Mysplash   文件: UpdateMeActivity.java
@Override
public void handleBackPressed() {
    if (state == INPUT_STATE && backPressed) {
        finishSelf(true);
    } else if (state == INPUT_STATE) {
        backPressed = true;
        NotificationHelper.showSnackbar(this, getString(R.string.feedback_click_again_to_exit));

        Observable.create(Emitter::onComplete)
                .compose(RxLifecycle.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
                .delay(2, TimeUnit.SECONDS)
                .doOnComplete(() -> backPressed = false)
                .subscribe();
    }
}
 
源代码26 项目: Mysplash   文件: SelectCollectionDialog.java
private void requestCollections(String username) {
    collectionService.cancel();
    setLoading(true);

    int perPage = ListPager.DEFAULT_PER_PAGE;
    collectionService.requestUserCollections(username, page + 1, perPage, new BaseObserver<List<Collection>>() {
        @Override
        public void onSucceed(List<Collection> collectionList) {
            page ++;

            setLoading(false);

            if (collectionList.size() > 0) {
                AuthManager.getInstance()
                        .getCollectionsManager()
                        .addCollections(collectionList);
                updatePhotoAdapter();
            }
            if (collectionList.size() < ListPager.DEFAULT_PER_PAGE) {
                AuthManager.getInstance().getCollectionsManager().setLoadFinish(true);
            }
        }

        @Override
        public void onFailed() {
            setLoading(false);
            Observable.create(Emitter::onComplete)
                    .compose(
                            RxLifecycleCompact.bind(SelectCollectionDialog.this)
                                    .disposeObservableWhen(LifecycleEvent.DESTROY)
                    ).delay(DEFAULT_REQUEST_INTERVAL_SECOND, TimeUnit.SECONDS)
                    .doOnComplete(() -> requestCollections(AuthManager.getInstance().getUser().username))
                    .subscribe();
        }
    });
}
 
源代码27 项目: Mysplash   文件: SelectCollectionDialog.java
@Override
public void onUpdateFailed() {
    if (AuthManager.getInstance().getUser() == null) {
        Observable.create(Emitter::onComplete)
                .compose(RxLifecycleCompact.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
                .delay(DEFAULT_REQUEST_INTERVAL_SECOND, TimeUnit.SECONDS)
                .doOnComplete(this::requestProfile)
                .subscribe();
    }
}
 
源代码28 项目: cxf   文件: StatsRestServiceImpl.java
@Context 
public void setSse(Sse sse) {
    this.broadcaster = sse.newBroadcaster();
    this.builder = sse.newEventBuilder();
    
    Flowable
        .interval(500, TimeUnit.MILLISECONDS)
        .zipWith(
            Flowable.generate((Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats"))),
            (id, bldr) -> createStatsEvent(bldr, id)
        )
        .subscribeOn(Schedulers.single())
        .subscribe(broadcaster::broadcast);
}
 
源代码29 项目: rxjava2-jdbc   文件: Select.java
private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters,
                                                Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize, int queryTimeoutSec) {
    log.debug("parameters={}", parameters);
    log.debug("names={}", names);

    Callable<ResultSet> initialState = () -> {
        List<Parameter> params = Util.toParameters(parameters);
        boolean hasCollection = params.stream().anyMatch(x -> x.isCollection());
        final PreparedStatement ps2;
        if (hasCollection) {
            // create a new prepared statement with the collection ? substituted with
            // ?s to match the size of the collection parameter
            ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params, queryTimeoutSec);
            // now wrap the rs to auto close ps2 because it is single use (the next
            // collection parameter may have a different ordinality so we need to build
            // a new PreparedStatement with a different number of question marks
            // substituted
            return new ResultSetAutoClosesStatement(Util //
                    .setParameters(ps2, params, names) //
                    .executeQuery(), ps2);
        } else {
            // use the current prepared statement (normal re-use)
            ps2 = ps;
            return Util //
                    .setParameters(ps2, params, names) //
                    .executeQuery();
        }
    };
    BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
        log.debug("getting row from ps={}, rs={}", rs.getStatement(), rs);
        if (rs.next()) {
            T v = mapper.apply(rs);
            log.debug("emitting {}", v);
            emitter.onNext(v);
        } else {
            log.debug("completed");
            emitter.onComplete();
        }
    };
    Consumer<ResultSet> disposeState = Util::closeSilently;
    return Flowable.generate(initialState, generator, disposeState);
}
 
源代码30 项目: jadx   文件: SearchDialog.java
private void saveEmitter(Emitter<String> emitter) {
	this.emitter = emitter;
}
 
 类所在包
 类方法
 同包方法