下面列出了怎么用io.reactivex.SingleOnSubscribe的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
protected void firstRequest() {
super.firstRequest();
Single.create((SingleOnSubscribe<Boolean>) emitter -> {
if (bookShelf != null) {
bookmarkBeanList = BookshelfHelp.getBookmarkList(bookShelf.getBookInfoBean().getName());
emitter.onSuccess(true);
} else {
emitter.onSuccess(false);
}
}).compose(RxUtils::toSimpleSingle)
.subscribe(new MySingleObserver<Boolean>() {
@Override
public void onSuccess(Boolean aBoolean) {
if (aBoolean) {
adapter.setAllBookmark(bookmarkBeanList);
}
}
});
}
@Override
protected void firstRequest() {
super.firstRequest();
Single.create((SingleOnSubscribe<Boolean>) emitter -> {
if (bookShelf != null) {
bookmarkBeanList = BookshelfHelp.getBookmarkList(bookShelf.getBookInfoBean().getName());
emitter.onSuccess(true);
} else {
emitter.onSuccess(false);
}
}).compose(RxUtils::toSimpleSingle)
.subscribe(new MySingleObserver<Boolean>() {
@Override
public void onSuccess(Boolean aBoolean) {
if (aBoolean) {
adapter.setAllBookmark(bookmarkBeanList);
}
}
});
}
/**
* use only request with an empty array to request all manifest permissions
*/
public Single<PermissionResult> requestAsSingle(final List<String> permissions) {
return Single.create(new SingleOnSubscribe<PermissionResult>() {
@Override
public void subscribe(final SingleEmitter<PermissionResult> emitter) throws Exception {
runtimePermission
.request(permissions)
.onResponse(new ResponseCallback() {
@Override
public void onResponse(PermissionResult result) {
if (result.isAccepted()) {
emitter.onSuccess(result);
} else {
emitter.onError(new Error(result));
}
}
}).ask();
}
});
}
/**
* The returned Single emits one Query result as a List.
*/
public static <T> Single<List<T>> single(final Query<T> query) {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override
public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
query.subscribe().single().observer(new DataObserver<List<T>>() {
@Override
public void onData(List<T> data) {
if (!emitter.isDisposed()) {
emitter.onSuccess(data);
}
}
});
// no need to cancel, single never subscribes
}
});
}
public Session(Process process, Disposable processDisposable) {
this.process = process;
this.destroy = Completable
.create(e -> {
processDisposable.dispose();
e.onComplete();
})
.subscribeOn(Schedulers.io())
.doOnComplete(() -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("destroy():doOnComplete");})
.doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "destroy():doOnError");})
.cache();
this.waitFor = Single
.create((SingleOnSubscribe<Integer>) e -> {
if (RXSDebug.isDebug()) Timber.tag(TAG).d("Waiting for %s to exit.", process);
int exitCode = process.waitFor();
if (RXSDebug.isDebug()) Timber.tag(TAG).d("Exitcode: %d, Process: %s", exitCode, process);
e.onSuccess(exitCode);
})
.subscribeOn(Schedulers.io())
.doOnSuccess(s -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v("waitFor():doOnSuccess %s", s);})
.doOnError(t -> { if (RXSDebug.isDebug()) Timber.tag(TAG).v(t, "waitFor():doOnError");})
.cache();
}
public Single<Cmd.Result> submit(Cmd cmd) {
return Single.create((SingleOnSubscribe<Cmd.Result>) emitter -> {
QueueCmd item = new QueueCmd(cmd, emitter);
synchronized (CmdProcessor.this) {
if (dead) {
if (RXSDebug.isDebug()) Timber.tag(TAG).w("Processor wasn't running: %s", cmd);
item.exitCode(Cmd.ExitCode.SHELL_DIED);
item.emit();
} else {
if (RXSDebug.isDebug()) Timber.tag(TAG).d("Submitted: %s", cmd);
cmdQueue.add(item);
}
}
}).doOnSuccess(item -> {
if (RXSDebug.isDebug()) {
Timber.tag(TAG).log(item.getErrors() != null && item.getErrors().size() > 0 ? Log.WARN : Log.INFO, "Processed: %s", item);
}
});
}
/************************************************************/
public Single<Void> deleteCollBookInRx(CollBookBean bean) {
return Single.create(new SingleOnSubscribe<Void>() {
@Override
public void subscribe(SingleEmitter<Void> e) throws Exception {
//查看文本中是否存在删除的数据
deleteBook(bean.get_id());
//删除任务
deleteDownloadTask(bean.get_id());
//删除目录
deleteBookChapter(bean.get_id());
//删除CollBook
mCollBookDao.delete(bean);
e.onSuccess(new Void());
}
});
}
public Single<T> toSingle() {
return Single.create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(final @NonNull SingleEmitter<T> e) throws Exception {
enqueue(new Callback<T>() {
@Override
public void onResponse(@NonNull T data) {
e.onSuccess(data);
}
}
, new ErrorCallback() {
@Override
public void onError(@NonNull Throwable error) {
e.onError(error);
}
}
);
}
});
}
private Single<Integer> rename(RecordingItem recordingItem, int adapterPosition, String name) {
return Single.create((SingleOnSubscribe<Integer>) e -> {
File newFile = new File(
Environment.getExternalStorageDirectory().getAbsolutePath() + "/SoundRecorder/" + name);
if (newFile.exists() && !newFile.isDirectory()) {
e.onError(new Exception("File with same name already exists"));
} else {
File oldFilePath = new File(recordingItem.getFilePath());
if (oldFilePath.renameTo(newFile)) {
recordingItem.setName(name);
recordingItem.setFilePath(newFile.getPath());
recordItemDataSource.updateRecordItem(recordingItem);
e.onSuccess(adapterPosition);
} else {
e.onError(new Throwable("Cannot Rename file. Please try again"));
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
public static <T> Single<T> single(final OnSubscribeAction<T> subscribe) {
return Single.<T>create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(final SingleEmitter<T> emitter) throws Exception {
RxActionDelegate<T> delegate = new RxActionDelegate<>(new ActionDelegate<T>() {
@Override
public void onSuccess(T result) {
emitter.onSuccess(result);
}
@Override
public void onError(Exception e) {
emitter.onError(e);
}
});
emitter.setDisposable(delegate);
subscribe.subscribe(delegate);
}
});
}
public static <T> Single<Result<T>> singleWrapped(final OnSubscribeAction<T> subscribe) {
return Single.<Result<T>>create(new SingleOnSubscribe<Result<T>>() {
@Override
public void subscribe(final SingleEmitter<Result<T>> emitter) throws Exception {
final RxActionDelegate<T> delegate = new RxActionDelegate<>(new ActionDelegate<T>() {
@Override
public void onSuccess(T result) {
emitter.onSuccess(new Result<T>(result));
}
@Override
public void onError(Exception e) {
emitter.onSuccess(new Result<T>(e));
}
});
emitter.setDisposable(delegate);
subscribe.subscribe(delegate);
}
});
}
/**
* Given an operation that takes a {@link MaybeConsumer<T>}, create a JavaRX {@link Single<T>}
* that produces the value passed to the MaybeConsumer.
*
* <p>Example:
*
* <pre>
* // log the name of the experiment with a given id
* DataController dc = getDataController();
* MaybeConsumers.buildSingle(mc -> dc.getExperimentById(id, mc))
* .subscribe(experiment -> log("Name: " + experiment.getName()));
* </pre>
*/
public static <T> Single<T> buildSingle(io.reactivex.functions.Consumer<MaybeConsumer<T>> c) {
return Single.create(
new SingleOnSubscribe<T>() {
@Override
public void subscribe(SingleEmitter<T> emitter) throws Exception {
c.accept(
new MaybeConsumer<T>() {
@Override
public void success(T value) {
emitter.onSuccess(value);
}
@Override
public void fail(Exception e) {
emitter.onError(e);
}
});
}
});
}
public Single<Result<String>> loadData() {
return Single.create(new SingleOnSubscribe<Result<String>>() {
@Override
public void subscribe(SingleEmitter<Result<String>> e) throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
if (new Random().nextBoolean()) {
e.onSuccess(Result.success("Result from data service"));
} else {
e.onSuccess(Result.<String>error(new Throwable("Fake error")));
}
}
}).subscribeOn(Schedulers.computation()).observeOn(AndroidSchedulers.mainThread());
}
/**
* Converts an {@link ApolloStoreOperation} to a Single.
*
* @param operation the ApolloStoreOperation to convert
* @param <T> the value type
* @return the converted Single
*/
@NotNull
@CheckReturnValue
public static <T> Single<T> from(@NotNull final ApolloStoreOperation<T> operation) {
checkNotNull(operation, "operation == null");
return Single.create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(final SingleEmitter<T> emitter) {
operation.enqueue(new ApolloStoreOperation.Callback<T>() {
@Override
public void onSuccess(T result) {
emitter.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
emitter.onError(t);
}
});
}
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
return Single.create(new SingleOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = listener(emit);
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addListenerForSingleValueEvent(listener);
}
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
return Single.create(new SingleOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = listener(emit);
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addListenerForSingleValueEvent(listener);
}
});
}
void onTestAll() {
onView(View::showWorking);
Single.create((SingleOnSubscribe<List<TestResult>>) emitter -> {
Timber.d("loadInBackground start...");
long dur = System.currentTimeMillis();
ArrayList<TestResult> results = new ArrayList<>();
for (TestSuite suite : tests) {
results.addAll(suite.test().blockingGet());
}
Timber.d("loadInBackground done: %dms", (System.currentTimeMillis() - dur));
emitter.onSuccess(results);
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSuccess(testInfos -> ValidatorPresenter.this.testData = testInfos)
.subscribe(testInfos -> onView(v -> v.display(testInfos)));
}
@Override @NonNull public Single<List<T>> get() {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInReadLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists()) {
emitter.onSuccess(Collections.<T>emptyList());
return;
}
List<T> list = converter.read(file, type);
if (list == null) list = Collections.emptyList();
emitter.onSuccess(list);
}
});
}
});
}
@Override @NonNull public Single<List<T>> observePut(@NonNull final List<T> list) {
assertNotNull(list, "list");
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists() && !file.createNewFile()) {
throw new IOException("Could not create file for store.");
}
converterWrite(list, converter, type, file);
emitter.onSuccess(list);
updateSubject.onNext(list);
}
});
}
});
}
@Override @NonNull public Single<List<T>> observeClear() {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (file.exists() && !file.delete()) {
throw new IOException("Clear operation on store failed.");
}
emitter.onSuccess(Collections.<T>emptyList());
updateSubject.onNext(Collections.<T>emptyList());
}
});
}
});
}
@Override @NonNull public Single<List<T>> observeAdd(@NonNull final T value) {
assertNotNull(value, "value");
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists() && !file.createNewFile()) {
throw new IOException("Could not create file for store.");
}
List<T> originalList = converter.read(file, type);
if (originalList == null) originalList = Collections.emptyList();
List<T> result = new ArrayList<T>(originalList.size() + 1);
result.addAll(originalList);
result.add(value);
converterWrite(result, converter, type, file);
emitter.onSuccess(result);
updateSubject.onNext(result);
}
});
}
});
}
@Override @NonNull public Single<List<T>> observeRemove(final int position) {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
List<T> originalList = converter.read(file, type);
if (originalList == null) originalList = Collections.emptyList();
List<T> modifiedList = new ArrayList<T>(originalList);
modifiedList.remove(position);
converterWrite(modifiedList, converter, type, file);
emitter.onSuccess(modifiedList);
updateSubject.onNext(modifiedList);
}
});
}
});
}
@Override @NonNull public Single<T> observePut(@NonNull final T value) {
assertNotNull(value, "value");
return Single.create(new SingleOnSubscribe<T>() {
@Override public void subscribe(final SingleEmitter<T> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists() && !file.createNewFile()) {
throw new IOException("Could not create file for store.");
}
converterWrite(value, converter, type, file);
emitter.onSuccess(value);
updateSubject.onNext(new ValueUpdate<T>(value));
}
});
}
});
}
public void autoSave() {
Single.create((SingleOnSubscribe<Boolean>) e -> {
long currentTime = System.currentTimeMillis();
List<String> per = PermissionUtils.checkMorePermissions(MApplication.getInstance(), MApplication.PerList);
if (per.isEmpty() && !BuildConfig.DEBUG) {
File file = new File(FileUtils.getSdCardPath() + File.separator + "YueDu" + File.separator + "autoSave" + File.separator + "myBookShelf.json");
if (file.exists()) {
if (currentTime - file.lastModified() < TimeUnit.DAYS.toMillis(1)) {
e.onSuccess(false);
return;
}
}
DocumentHelper.createDirIfNotExist(FileUtils.getSdCardPath(), "YueDu");
String dirPath = FileUtils.getSdCardPath() + File.separator + "YueDu";
DocumentHelper.createDirIfNotExist(dirPath, "autoSave");
dirPath += File.separator + "autoSave";
backupConfig(dirPath);
backupBookShelf(dirPath);
backupBookSource(dirPath);
backupSearchHistory(dirPath);
backupReplaceRule(dirPath);
backupTxtChapterRule(dirPath);
upload(dirPath);
e.onSuccess(true);
}
e.onSuccess(false);
}).compose(RxUtils::toSimpleSingle)
.subscribe();
}
private void delSelect() {
Single.create((SingleOnSubscribe<Boolean>) emitter -> {
for (String noteUrl : bookShelfAdapter.getSelected()) {
BookshelfHelp.removeFromBookShelf(BookshelfHelp.getBook(noteUrl));
}
bookShelfAdapter.getSelected().clear();
emitter.onSuccess(true);
}).compose(RxUtils::toSimpleSingle)
.subscribe(new MySingleObserver<Boolean>() {
@Override
public void onSuccess(Boolean aBoolean) {
mPresenter.queryBookShelf(false, group);
}
});
}
public static Single<Boolean> toTop(BookSourceBean sourceBean) {
return Single.create((SingleOnSubscribe<Boolean>) e -> {
List<BookSourceBean> beanList = getAllBookSourceBySerialNumber();
for (int i = 0; i < beanList.size(); i++) {
beanList.get(i).setSerialNumber(i + 1);
}
sourceBean.setSerialNumber(0);
DbHelper.getDaoSession().getBookSourceBeanDao().insertOrReplaceInTx(beanList);
DbHelper.getDaoSession().getBookSourceBeanDao().insertOrReplace(sourceBean);
e.onSuccess(true);
}).compose(RxUtils::toSimpleSingle);
}
public static Single<Boolean> saveData(ReplaceRuleBean replaceRuleBean) {
return Single.create((SingleOnSubscribe<Boolean>) emitter -> {
if (replaceRuleBean.getSerialNumber() == 0) {
replaceRuleBean.setSerialNumber((int) (DbHelper.getDaoSession().getReplaceRuleBeanDao().queryBuilder().count() + 1));
}
DbHelper.getDaoSession().getReplaceRuleBeanDao().insertOrReplace(replaceRuleBean);
refreshDataS();
emitter.onSuccess(true);
}).compose(RxUtils::toSimpleSingle);
}
public static <T> Single<T> singleExec(final RealmConfiguration configuration,
final Consumer<Pair<SingleEmitter, Realm>> emitter) {
return Single.create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(SingleEmitter<T> e) throws Exception {
try (Realm realm = Realm.getInstance(configuration)) {
emitter.accept(new Pair<SingleEmitter, Realm>(e, realm));
}
}
});
}
private void delSelect() {
Single.create((SingleOnSubscribe<Boolean>) emitter -> {
for (String noteUrl : bookShelfAdapter.getSelected()) {
BookshelfHelp.removeFromBookShelf(BookshelfHelp.getBook(noteUrl));
}
bookShelfAdapter.getSelected().clear();
emitter.onSuccess(true);
}).compose(RxUtils::toSimpleSingle)
.subscribe(new MySingleObserver<Boolean>() {
@Override
public void onSuccess(Boolean aBoolean) {
mPresenter.queryBookShelf(false, group);
}
});
}
private void restore() {
Single.create((SingleOnSubscribe<ArrayList<String>>) emitter -> {
emitter.onSuccess(WebDavHelp.INSTANCE.getWebDavFileNames());
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySingleObserver<ArrayList<String>>() {
@Override
public void onSuccess(ArrayList<String> strings) {
if (!WebDavHelp.INSTANCE.showRestoreDialog(getActivity(), strings, BackupRestoreUi.INSTANCE)) {
Toast.makeText(getActivity(), "没有备份", Toast.LENGTH_SHORT).show();
}
}
});
}