下面列出了怎么用io.reactivex.Emitter的API类实例代码及写法,或者点击链接到github查看源代码。
public static <T> Flowable<T> create(Callable<Connection> connectionFactory, List<Object> parameters, String sql,
Function<? super ResultSet, T> mapper) {
Callable<ResultSet> initialState = () -> {
Connection con = connectionFactory.call();
PreparedStatement ps = con.prepareStatement(sql);
// TODO set parameters
ResultSet rs = ps.executeQuery();
return rs;
};
BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
if (rs.next()) {
emitter.onNext(mapper.apply(rs));
} else {
emitter.onComplete();
}
};
Consumer<ResultSet> disposeState = FlowableSelect::closeSilently;
return Flowable.generate(initialState, generator, disposeState);
}
@Override
public Flowable<String> getRepos(String user) {
Callable<Iterator<String>> initialState =
() -> gitHbubRepos.getRepos(user)
.stream()
.map(Repository::getName).iterator();
BiConsumer<Iterator<String>, Emitter<String>> generator =
(iterator, emitter) -> {
if (iterator.hasNext()) {
emitter.onNext(iterator.next() + " ");
} else {
emitter.onComplete();
}
};
return Flowable.generate(initialState, generator);
}
@Override
public Flowable<String> getRepos(String user) {
Callable<Iterator<String>> initialState =
() -> gitHbubRepos.getRepos(user)
.stream()
.map(Repository::getName).iterator();
BiConsumer<Iterator<String>, Emitter<String>> generator =
(iterator, emitter) -> {
if (iterator.hasNext()) {
emitter.onNext(iterator.next() + " ");
} else {
emitter.onComplete();
}
};
return Flowable.generate(initialState, generator);
}
private static <T> Flowable<T> create(NamedPreparedStatement ps, List<Object> parameters,
Function<? super ResultSet, T> mapper) {
Callable<ResultSet> initialState = () -> {
Util.convertAndSetParameters(ps.ps, parameters, ps.names);
ps.ps.execute();
return ps.ps.getGeneratedKeys();
};
BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
if (rs.next()) {
emitter.onNext(mapper.apply(rs));
} else {
emitter.onComplete();
}
};
Consumer<ResultSet> disposer = Util::closeSilently;
return Flowable.generate(initialState, generator, disposer);
}
private static <T> Flowable<T> createFlowable(NamedCallableStatement stmt,
Function<? super ResultSet, ? extends T> f) throws SQLException {
ResultSet rsActual = stmt.stmt.getResultSet();
Callable<ResultSet> initialState = () -> rsActual;
BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
log.debug("getting row from ps={}, rs={}", stmt.stmt, rs);
if (rs.next()) {
T v = f.apply(rs);
log.debug("emitting {}", v);
emitter.onNext(v);
} else {
log.debug("completed");
emitter.onComplete();
}
};
Consumer<ResultSet> disposeState = Util::closeSilently;
return Flowable.generate(initialState, generator, disposeState);
}
@Override
protected void subscribeActual(Subscriber<? super byte[]> subscriber) {
Flowable.generate(new Consumer<Emitter<byte[]>>() {
@Override
public void accept(Emitter<byte[]> emitter) {
int nextBatchSize = Math.min(byteBuffer.remaining(), maxBatchSize);
if (nextBatchSize == 0) {
emitter.onComplete();
return;
}
final byte[] nextBatch = new byte[nextBatchSize];
byteBuffer.get(nextBatch);
emitter.onNext(nextBatch);
}
}).subscribe(subscriber);
}
/**
* Returns a Flowable stream of byte arrays from the given
* {@link InputStream} between 1 and {@code bufferSize} bytes.
*
* @param is
* input stream of bytes
* @param bufferSize
* max emitted byte array size
* @return a stream of byte arrays
*/
public static Flowable<byte[]> from(final InputStream is, final int bufferSize) {
return Flowable.generate(new Consumer<Emitter<byte[]>>() {
@Override
public void accept(Emitter<byte[]> emitter) throws Exception {
byte[] buffer = new byte[bufferSize];
int count = is.read(buffer);
if (count == -1) {
emitter.onComplete();
} else if (count < bufferSize) {
emitter.onNext(Arrays.copyOf(buffer, count));
} else {
emitter.onNext(buffer);
}
}
});
}
public static Flowable<ZippedEntry> unzip(final ZipInputStream zis) {
return Flowable.generate(new Consumer<Emitter<ZippedEntry>>() {
@Override
public void accept(Emitter<ZippedEntry> emitter) throws IOException {
ZipEntry zipEntry = zis.getNextEntry();
if (zipEntry != null) {
emitter.onNext(new ZippedEntry(zipEntry, zis));
} else {
// end of stream so eagerly close the stream (might not be a
// good idea since this method did not create the zis
zis.close();
emitter.onComplete();
}
}
});
}
public <T> Flowable<T> read(final Class<T> cls, final Input input) {
return Flowable.generate(new Consumer<Emitter<T>>() {
@Override
public void accept(Emitter<T> emitter) throws Exception {
if (input.eof()) {
emitter.onComplete();
} else {
T t = kryo.readObject(input, cls);
emitter.onNext(t);
}
}
});
}
@Override
public void handleBackPressed() {
// double click to exit.
if (backPressed) {
finishSelf(true);
} else {
backPressed = true;
NotificationHelper.showSnackbar(this, getString(R.string.feedback_click_again_to_exit));
Observable.create(Emitter::onComplete)
.compose(RxLifecycle.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
.delay(2, TimeUnit.SECONDS)
.doOnComplete(() -> backPressed = false)
.subscribe();
}
}
@Override
public void handleBackPressed() {
// double click to exit.
if (backPressed) {
finishSelf(true);
} else {
backPressed = true;
NotificationHelper.showSnackbar(
this, getString(R.string.feedback_click_again_to_exit));
Observable.create(Emitter::onComplete)
.compose(RxLifecycle.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
.delay(2, TimeUnit.SECONDS)
.doOnComplete(() -> backPressed = false)
.subscribe();
}
}
@Override
public void onFailed() {
if (getActivity() == null) {
return;
}
if (!TextUtils.isEmpty(username)) {
Observable.create(Emitter::onComplete)
.compose(
RxLifecycleCompact.bind(ProfileDialog.this)
.disposeObservableWhen(LifecycleEvent.DESTROY)
).delay(2, TimeUnit.SECONDS)
.doOnComplete(() -> service.requestUserProfile(username, new ProfileCallback()))
.subscribe();
}
}
private void finish(List<BookChapterBean> chapterList, Emitter<List<BookChapterBean>> emitter) {
//去除重复,保留后面的,先倒序,从后面往前判断
if (!dx) {
Collections.reverse(chapterList);
}
LinkedHashSet<BookChapterBean> lh = new LinkedHashSet<>(chapterList);
chapterList = new ArrayList<>(lh);
Collections.reverse(chapterList);
Debug.printLog(tag, 1, "-目录解析完成", analyzeNextUrl);
emitter.onNext(chapterList);
emitter.onComplete();
}
private static <T> Flowable<T> create(PreparedStatement ps, Function<? super ResultSet, T> mapper) {
Callable<ResultSet> initialState = () -> {
ps.execute();
return ps.getGeneratedKeys();
};
BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
if (rs.next()) {
emitter.onNext(mapper.apply(rs));
} else {
emitter.onComplete();
}
};
Consumer<ResultSet> disposer = Update::closeAll;
return Flowable.generate(initialState, generator, disposer);
}
@Override
public Flowable<Repository> getRepos0(String user) {
Callable<Iterator<Repository>> initialState =
gitHbubRepos.getRepos(user)
.stream()::iterator;
BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
(iterator, emitter) -> {
if (iterator.hasNext()) {
emitter.onNext(iterator.next());
} else {
emitter.onComplete();
}
};
return Flowable.generate(initialState, generator);
}
private void finish(List<BookChapterBean> chapterList, Emitter<List<BookChapterBean>> emitter) {
//去除重复,保留后面的,先倒序,从后面往前判断
if (!dx) {
Collections.reverse(chapterList);
}
LinkedHashSet<BookChapterBean> lh = new LinkedHashSet<>(chapterList);
chapterList = new ArrayList<>(lh);
Collections.reverse(chapterList);
Debug.printLog(tag, 1, "-目录解析完成", analyzeNextUrl);
emitter.onNext(chapterList);
emitter.onComplete();
}
static <X> void toStreamEvents(CompletionStage<X> cs, Emitter<Object> emitter) {
cs.whenComplete((X res, Throwable err) -> {
if (res != null) {
emitter.onNext(res);
emitter.onComplete();
} else {
if (err != null) {
emitter.onError(err instanceof CompletionException ? err.getCause() : err);
} else {
emitter.onComplete();
}
}
});
}
@Override
public Flowable<Repository> getRepos0(String user) {
Callable<Iterator<Repository>> initialState =
gitHbubRepos.getRepos(user)
.stream()::iterator;
BiConsumer<Iterator<Repository>, Emitter<Repository>> generator =
(iterator, emitter) -> {
if (iterator.hasNext()) {
emitter.onNext(iterator.next());
} else {
emitter.onComplete();
}
};
return Flowable.generate(initialState, generator);
}
@SuppressWarnings("WeakerAccess")
@RestrictTo(RestrictTo.Scope.SUBCLASSES)
void considerGattDisconnected(
final Emitter<Void> emitter,
final QueueReleaseInterface queueReleaseInterface
) {
connectionStateChangeListener.onConnectionStateChange(DISCONNECTED);
queueReleaseInterface.release();
emitter.onComplete();
}
private static Flowable<Flowable<byte[]>> createServerSocketFlowable(final ServerSocket serverSocket,
final long timeoutMs, final int bufferSize, final Action preAcceptAction,
final Predicate<? super Socket> acceptSocket) {
return Flowable.generate( //
new Consumer<Emitter<Flowable<byte[]>>>() {
@Override
public void accept(Emitter<Flowable<byte[]>> emitter) throws Exception {
acceptConnection(timeoutMs, bufferSize, serverSocket, emitter, preAcceptAction, acceptSocket);
}
});
}
public static Flowable<String> from(final Reader reader, final int bufferSize) {
return Flowable.generate(new Consumer<Emitter<String>>() {
final char[] buffer = new char[bufferSize];
@Override
public void accept(Emitter<String> emitter) throws Exception {
int count = reader.read(buffer);
if (count == -1) {
emitter.onComplete();
} else {
emitter.onNext(String.valueOf(buffer, 0, count));
}
}
});
}
@Override
public Publisher<? extends Integer> call() throws Exception {
return Flowable.generate(new Consumer<Emitter<Integer>>() {
final int[] count = new int[1];
@Override
public void accept(Emitter<Integer> emitter) throws Exception {
count[0]++;
emitter.onNext(count[0]);
if (count[0] == 1000) {
emitter.onComplete();
}
}
});
}
@Override
public void accept(Signals<Id> signals, Emitter<EntityStateMachine<?, Id>> observer)
throws Exception {
@SuppressWarnings("unchecked")
Event<Object> event = (Event<Object>) signals.signalsToSelf.pollLast();
if (event != null) {
applySignalToSelf(signals, observer, event);
} else {
applySignalsToOthers(classId, worker, signals);
observer.onComplete();
}
}
@Override
public void onUpdateFailed() {
if (AuthManager.getInstance().getUser() == null) {
timerDisposable = Observable.create(Emitter::onComplete)
.delay(DEFAULT_REQUEST_INTERVAL_SECOND, TimeUnit.SECONDS)
.doOnComplete(() -> AuthManager.getInstance().requestPersonalProfile())
.subscribe();
} else {
setResource(Resource.success(AuthManager.getInstance().getUser()));
}
}
@Override
public void handleBackPressed() {
if (state == INPUT_STATE && backPressed) {
finishSelf(true);
} else if (state == INPUT_STATE) {
backPressed = true;
NotificationHelper.showSnackbar(this, getString(R.string.feedback_click_again_to_exit));
Observable.create(Emitter::onComplete)
.compose(RxLifecycle.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
.delay(2, TimeUnit.SECONDS)
.doOnComplete(() -> backPressed = false)
.subscribe();
}
}
private void requestCollections(String username) {
collectionService.cancel();
setLoading(true);
int perPage = ListPager.DEFAULT_PER_PAGE;
collectionService.requestUserCollections(username, page + 1, perPage, new BaseObserver<List<Collection>>() {
@Override
public void onSucceed(List<Collection> collectionList) {
page ++;
setLoading(false);
if (collectionList.size() > 0) {
AuthManager.getInstance()
.getCollectionsManager()
.addCollections(collectionList);
updatePhotoAdapter();
}
if (collectionList.size() < ListPager.DEFAULT_PER_PAGE) {
AuthManager.getInstance().getCollectionsManager().setLoadFinish(true);
}
}
@Override
public void onFailed() {
setLoading(false);
Observable.create(Emitter::onComplete)
.compose(
RxLifecycleCompact.bind(SelectCollectionDialog.this)
.disposeObservableWhen(LifecycleEvent.DESTROY)
).delay(DEFAULT_REQUEST_INTERVAL_SECOND, TimeUnit.SECONDS)
.doOnComplete(() -> requestCollections(AuthManager.getInstance().getUser().username))
.subscribe();
}
});
}
@Override
public void onUpdateFailed() {
if (AuthManager.getInstance().getUser() == null) {
Observable.create(Emitter::onComplete)
.compose(RxLifecycleCompact.bind(this).disposeObservableWhen(LifecycleEvent.DESTROY))
.delay(DEFAULT_REQUEST_INTERVAL_SECOND, TimeUnit.SECONDS)
.doOnComplete(this::requestProfile)
.subscribe();
}
}
@Context
public void setSse(Sse sse) {
this.broadcaster = sse.newBroadcaster();
this.builder = sse.newEventBuilder();
Flowable
.interval(500, TimeUnit.MILLISECONDS)
.zipWith(
Flowable.generate((Emitter<OutboundSseEvent.Builder> emitter) -> emitter.onNext(builder.name("stats"))),
(id, bldr) -> createStatsEvent(bldr, id)
)
.subscribeOn(Schedulers.single())
.subscribe(broadcaster::broadcast);
}
private static <T> Flowable<? extends T> create(PreparedStatement ps, List<Object> parameters,
Function<? super ResultSet, T> mapper, List<String> names, String sql, int fetchSize, int queryTimeoutSec) {
log.debug("parameters={}", parameters);
log.debug("names={}", names);
Callable<ResultSet> initialState = () -> {
List<Parameter> params = Util.toParameters(parameters);
boolean hasCollection = params.stream().anyMatch(x -> x.isCollection());
final PreparedStatement ps2;
if (hasCollection) {
// create a new prepared statement with the collection ? substituted with
// ?s to match the size of the collection parameter
ps2 = Util.prepare(ps.getConnection(), fetchSize, sql, params, queryTimeoutSec);
// now wrap the rs to auto close ps2 because it is single use (the next
// collection parameter may have a different ordinality so we need to build
// a new PreparedStatement with a different number of question marks
// substituted
return new ResultSetAutoClosesStatement(Util //
.setParameters(ps2, params, names) //
.executeQuery(), ps2);
} else {
// use the current prepared statement (normal re-use)
ps2 = ps;
return Util //
.setParameters(ps2, params, names) //
.executeQuery();
}
};
BiConsumer<ResultSet, Emitter<T>> generator = (rs, emitter) -> {
log.debug("getting row from ps={}, rs={}", rs.getStatement(), rs);
if (rs.next()) {
T v = mapper.apply(rs);
log.debug("emitting {}", v);
emitter.onNext(v);
} else {
log.debug("completed");
emitter.onComplete();
}
};
Consumer<ResultSet> disposeState = Util::closeSilently;
return Flowable.generate(initialState, generator, disposeState);
}
private void saveEmitter(Emitter<String> emitter) {
this.emitter = emitter;
}