android.os.DeadObjectException#io.reactivex.ObservableOnSubscribe源码实例Demo

下面列出了android.os.DeadObjectException#io.reactivex.ObservableOnSubscribe 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: Tok-Android   文件: SettingPresenter.java
/**
 * Clears the chat history on the IO scheduler and reports the outcome on the main thread.
 *
 * <p>The loading indicator is shown before the work is scheduled and hidden again when the work
 * emits a value or fails.
 */
@Override
public void clearMsgHistory() {
    mSettingView.showLoading();
    mCleanHisDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean result = mUserModel.clearChatHistory();
            emitter.onNext(result);
            // One-shot source: terminate the stream once the single value has been delivered.
            emitter.onComplete();
        }
    })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean o) throws Exception {
                // NOTE(review): the emitted flag is not inspected, so the success message is
                // shown for either value - confirm this is intended.
                mSettingView.hideLoading();
                mSettingView.showMsg(StringUtils.getTextFromResId(R.string.successful));
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // Without an error consumer a failure in clearChatHistory() would surface as
                // OnErrorNotImplementedException and the loading indicator would never be hidden.
                mSettingView.hideLoading();
            }
        });
}
 
源代码2 项目: Tok-Android   文件: SettingPresenter.java
/**
 * Destroys the account on the IO scheduler; once the result arrives on the main thread the
 * success message is shown and {@link #logout()} is invoked.
 *
 * <p>If the background work fails, only the loading indicator is dismissed and no logout happens.
 */
@Override
public void delProfile() {
    mSettingView.showLoading();

    mDelProfileDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean result = mUserModel.destroyAccount();
            emitter.onNext(result);
            // One-shot source: terminate the stream once the single value has been delivered.
            emitter.onComplete();
        }
    })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                // NOTE(review): the emitted flag is not inspected before reporting success and
                // logging out - confirm this is intended.
                mSettingView.hideLoading();
                mSettingView.showMsg(StringUtils.getTextFromResId(R.string.successful));
                SettingPresenter.this.logout();
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // Without an error consumer a failure in destroyAccount() would surface as
                // OnErrorNotImplementedException and the loading indicator would never be hidden.
                mSettingView.hideLoading();
            }
        });
}
 
源代码3 项目: bcm-android   文件: IdentityUtil.java
/**
 * Loads the identity record stored for {@code recipient} on the IO scheduler and publishes it
 * through the returned future on the main thread.
 *
 * <p>If the lookup fails, the future is resolved with {@code null}. The {@code context}
 * argument is not referenced by this method.
 *
 * @param context unused here
 * @param recipient the recipient whose address selects the repository and the record
 * @return a future that is set once the lookup has finished
 */
@UiThread
public static ListenableFuture<IdentityRecord> getRemoteIdentityKey(final Context context, final Recipient recipient) {
    final SettableFuture<IdentityRecord> pending = new SettableFuture<>();

    Observable.create((ObservableOnSubscribe<IdentityRecord>) emitter -> {
        IdentityRecord found = Repository
                .getIdentityRepo(recipient.getAddress().context())
                .getIdentityRecord(recipient.getAddress().serialize());
        emitter.onNext(found);
        emitter.onComplete();
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    record -> pending.set(record),
                    error -> pending.set(null));

    return pending;
}
 
源代码4 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
/**
 * Builds an Observable that emits the integers 1 through 6 and then completes.
 *
 * <p>Every emission, including the completion signal, is skipped once the emitter has been
 * disposed.
 */
private Observable<Integer> getOriginalObservable() {
    final List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);

    final ObservableOnSubscribe<Integer> source = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            for (int index = 0; index < values.size(); index++) {
                if (emitter.isDisposed()) {
                    continue;
                }
                emitter.onNext(values.get(index));
            }

            if (emitter.isDisposed()) {
                return;
            }
            emitter.onComplete();
        }
    };

    return Observable.create(source);
}
 
源代码5 项目: a   文件: PageLoaderEpub.java
/**
 * Makes sure a chapter list is available for {@code collBook}.
 *
 * <p>When the book has no pending update and the callback already holds chapters, the book is
 * emitted as-is. Otherwise the chapters are loaded via {@code loadChapters()}, handed to the
 * callback, and the book is marked as up to date before being emitted.
 *
 * @param collBook the book whose chapter list is checked
 * @return an Observable emitting {@code collBook}, or an error when no chapters could be loaded
 */
private Observable<BookShelfBean> checkChapterList(BookShelfBean collBook) {
    if (!collBook.getHasUpdate() && !callback.getChapterList().isEmpty()) {
        return Observable.just(collBook);
    }

    return Observable.create((ObservableOnSubscribe<List<BookChapterBean>>) e -> {
        List<BookChapterBean> chapterList = loadChapters();
        if (chapterList.isEmpty()) {
            // onError is terminal: return so that no second terminal signal is sent.
            e.onError(new IllegalAccessException("epubBook sub-chapter failed!"));
            return;
        }
        e.onNext(chapterList);
        e.onComplete();
    })
            // A plain mapping is enough here; no inner Observable is required.
            .map(chapterList -> {
                collBook.setChapterListSize(chapterList.size());
                callback.onCategoryFinish(chapterList);
                return collBook;
            })
            .doOnNext(bookShelfBean -> {
                // Clear the update flag and stamp the refresh time. Nothing is written to a
                // database in this step.
                bookShelfBean.setHasUpdate(false);
                bookShelfBean.setFinalRefreshData(System.currentTimeMillis());
            });
}
 
源代码6 项目: AndroidWallet   文件: RxBus.java
/**
 * Returns an Observable of events of the given {@code eventType}.
 *
 * <p>If a sticky event is stored for that type, it is merged into the returned stream so that a
 * new subscriber receives it in addition to events coming from the bus.
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Observable<T> busEvents = mBus.ofType(eventType);
        final Object sticky = mStickyEventMap.get(eventType);

        if (sticky == null) {
            return busEvents;
        }

        // Emits the stored sticky event; this source never completes on its own.
        final ObservableOnSubscribe<T> stickySource = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                emitter.onNext(eventType.cast(sticky));
            }
        };
        return Observable.merge(busEvents, Observable.create(stickySource));
    }
}
 
源代码7 项目: AndroidWallet   文件: RxBus.java
/**
 * Provides an Observable restricted to events of {@code eventType}.
 *
 * <p>When the sticky-event map holds an entry for the type, that entry is emitted to the
 * subscriber alongside the events from the bus.
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Object stored = mStickyEventMap.get(eventType) == null
                ? null
                : mStickyEventMap.get(eventType);
        final Observable<T> typed = mBus.ofType(eventType);

        if (stored == null) {
            return typed;
        }

        final Observable<T> replay = Observable.create(new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                // Hand the stored sticky event to the new subscriber.
                emitter.onNext(eventType.cast(stored));
            }
        });
        return Observable.merge(typed, replay);
    }
}
 
源代码8 项目: SmartEventBus   文件: RxBus.java
/**
 * Returns the bus events of type {@code eventType}, merged with the sticky event stored for
 * that type when one exists.
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (stickyEventMap) {
        final Observable<T> typedEvents = bus.ofType(eventType);
        final Object sticky = stickyEventMap.get(eventType);
        if (sticky == null) {
            return typedEvents;
        }

        final ObservableOnSubscribe<T> stickySource = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> subscriber) throws Exception {
                // Deliver the stored sticky event to the new subscriber.
                subscriber.onNext(eventType.cast(sticky));
            }
        };
        return typedEvents.mergeWith(Observable.create(stickySource));
    }
}
 
源代码9 项目: mobius   文件: RxEventSources.java
/**
 * Wraps the given event source in an Observable.
 *
 * <p>Each subscription to the Observable subscribes to the event source and forwards every
 * event to the subscriber; disposing the Observable subscription disposes the event-source
 * subscription.
 *
 * @param eventSource the eventSource you want to convert to an observable
 * @param <E> the event type
 * @return an Observable based on the provided event source
 */
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  final ObservableOnSubscribe<E> bridge =
      new ObservableOnSubscribe<E>() {
        @Override
        public void subscribe(final ObservableEmitter<E> emitter) throws Exception {
          // Forwards every event from the source to the Rx subscriber.
          final Consumer<E> forwarder =
              new Consumer<E>() {
                @Override
                public void accept(E value) {
                  emitter.onNext(value);
                }
              };
          final Disposable subscription = eventSource.subscribe(forwarder);

          // Ties the lifetime of the source subscription to the emitter.
          final Cancellable onCancel =
              new Cancellable() {
                @Override
                public void cancel() throws Exception {
                  subscription.dispose();
                }
              };
          emitter.setCancellable(onCancel);
        }
      };

  return Observable.create(bridge);
}
 
源代码10 项目: AndroidQuick   文件: RxJavaFragment.java
/**
 * Demonstrates binding a slow Observable to the lifecycle: the stream is composed with
 * {@code bindUntilEvent(Lifecycle.Event.ON_DESTROY)} and a toast is shown on success or error.
 */
private void testLifeCycle3() {
    Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
                try {
                    Thread.sleep(2000); // stands in for a long-running operation
                } catch (Exception e) {
                    e.printStackTrace();
                    // Calling onError on an already disposed emitter would be routed to the
                    // global RxJava error handler as an undeliverable exception.
                    if (!emitter.isDisposed()) {
                        emitter.onError(new RuntimeException(e));
                    }
                    // onError is terminal: do not fall through to onNext below.
                    return;
                }
                emitter.onNext(true);
            }
    )
            .compose(RxUtil.applySchedulers())
            .compose(lifecycleProvider.bindUntilEvent(Lifecycle.Event.ON_DESTROY))
            .subscribe(new BaseObserver<Boolean>() {
                @Override
                public void onError(ApiException exception) {
                    ToastUtil.showToast("LifeCycle Error");
                }

                @Override
                public void onSuccess(Boolean b) {
                    ToastUtil.showToast("LifeCycle");
                }
            });
}
 
源代码11 项目: AndroidQuick   文件: RxJavaFragment.java
/**
 * Downloads a fixed image through Glide, saves it under a timestamp-based ".jpg" name via
 * {@code savePicture}, then shows a toast and loads the saved path into {@code mExample}.
 *
 * <p>The resulting subscription is added to {@code disposables}.
 */
private void testExample() {
    // Step 1: fetch the image file through Glide.
    final ObservableOnSubscribe<File> download = new ObservableOnSubscribe<File>() {
        @Override
        public void subscribe(ObservableEmitter<File> emitter) throws Exception {
            File fetched = Glide.with(getContext())
                    .load("https://hbimg.huabanimg.com/85966739547072c95d2ecd2ff60248e1d09be657c005-Dd5NrU_fw658")
                    .downloadOnly(Target.SIZE_ORIGINAL, Target.SIZE_ORIGINAL)
                    .get();
            emitter.onNext(fetched);
        }
    };

    // Step 2: save the file under a name derived from the current time.
    final Function<File, String> saveToDisk = new Function<File, String>() {
        @Override
        public String apply(File file) throws Exception {
            String stamp = new SimpleDateFormat("yyyyMMddHHmmss", Locale.getDefault())
                    .format(new Date());
            return savePicture(getContext(), file, stamp + ".jpg");
        }
    };

    // Step 3: report the outcome and display the saved image.
    final Consumer<String> showResult = new Consumer<String>() {
        @Override
        public void accept(String path) throws Exception {
            ToastUtil.showToast(StringUtil.isEmpty(path) ? "保存失败" : "保存成功");
            GlideUtils.loadImageView(path, mExample);
        }
    };

    Observable<String> savedPath = Observable.create(download).map(saveToDisk);
    disposables.add(savedPath.compose(RxUtil.applySchedulers()).subscribe(showResult));
}
 
源代码12 项目: AndroidQuick   文件: UserManager.java
/**
 * Emits the string "Start" and, when the observer receives it, creates the user "Mike" through
 * {@code UserDao}.
 */
public void searchAndInsert() {
    final Observer<String> insertOnStart = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Nothing to do.
        }

        @Override
        public void onNext(String s) {
            if (!"Start".equals(s)) {
                return;
            }
            UserDao.getInstance().createUser("Mike");
        }

        @Override
        public void onError(Throwable e) {
            // Nothing to do.
        }

        @Override
        public void onComplete() {
            // Nothing to do.
        }
    };

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        emitter.onNext("Start");
    }).subscribe(insertOnStart);
}
 
源代码13 项目: MyBookshelf   文件: PageLoaderEpub.java
/**
 * Makes sure a chapter list is available for {@code collBook}.
 *
 * <p>When the book has no pending update and the callback already holds chapters, the book is
 * emitted as-is. Otherwise the chapters are loaded via {@code loadChapters()}, handed to the
 * callback, and the book is marked as up to date before being emitted.
 *
 * @param collBook the book whose chapter list is checked
 * @return an Observable emitting {@code collBook}, or an error when no chapters could be loaded
 */
private Observable<BookShelfBean> checkChapterList(BookShelfBean collBook) {
    if (!collBook.getHasUpdate() && !callback.getChapterList().isEmpty()) {
        return Observable.just(collBook);
    }

    return Observable.create((ObservableOnSubscribe<List<BookChapterBean>>) e -> {
        List<BookChapterBean> chapterList = loadChapters();
        if (chapterList.isEmpty()) {
            // onError is terminal: return so that no second terminal signal is sent.
            e.onError(new IllegalAccessException("epubBook sub-chapter failed!"));
            return;
        }
        e.onNext(chapterList);
        e.onComplete();
    })
            // A plain mapping is enough here; no inner Observable is required.
            .map(chapterList -> {
                collBook.setChapterListSize(chapterList.size());
                callback.onCategoryFinish(chapterList);
                return collBook;
            })
            .doOnNext(bookShelfBean -> {
                // Clear the update flag and stamp the refresh time. Nothing is written to a
                // database in this step.
                bookShelfBean.setHasUpdate(false);
                bookShelfBean.setFinalRefreshData(System.currentTimeMillis());
            });
}
 
/**
 * Demo of {@code Observable.create}: emits "One" and "Two", keeps only the values containing
 * "w", prints them, and prints "Completed" when the stream completes.
 */
private static void create(){
    System.out.println();

    Observable.create((ObservableOnSubscribe<String>) emitter -> {
                emitter.onNext("One");
                emitter.onNext("Two");
                emitter.onComplete();
            })
            .filter(word -> word.contains("w"))
            .subscribe(System.out::println,
                       Throwable::printStackTrace,
                       () -> System.out.println("Completed"));

    pauseMs(100);
}
 
源代码15 项目: Dainty   文件: MainModel.java
/**
 * Requests weather data for {@code city} and emits it as a {@link WeatherInfoBean}.
 *
 * <p>The request runs on the IO scheduler and results are observed on the main thread. The
 * bean is built from the "wendu" and "ganmao" fields of the response's "data" object.
 *
 * @param city appended verbatim to the request URL's {@code city} query parameter
 * @return an Observable that emits one bean and completes, or signals an error
 */
@Override
public Observable<WeatherInfoBean> getWeatherInfo(final String city) {
    return Observable.create(new ObservableOnSubscribe<WeatherInfoBean>() {
        @Override
        public void subscribe(ObservableEmitter<WeatherInfoBean> emitter) throws Exception {
            // NOTE(review): city is concatenated without explicit URL encoding - confirm that
            // callers only pass values that are safe inside a query string.
            String path = "http://wthrcdn.etouch.cn/weather_mini?city=" + city;
            Request request = new Request.Builder()
                    .url(path)
                    .build();
            Response response = okHttpClient.newCall(request).execute();
            ResponseBody responseBody = response.body();
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }

            JSONObject jsonObject = new JSONObject(responseBody.string());
            JSONObject weatherInfo = jsonObject.getJSONObject("data");
            String temperature = weatherInfo.getString("wendu") + "°";
            String climate = weatherInfo.getString("ganmao");
            emitter.onNext(new WeatherInfoBean(city, temperature, climate));
            // One-shot request: complete the stream after the single result.
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码16 项目: Dainty   文件: LoginRegisterModel.java
/**
 * Posts the account name and password to the login endpoint and emits the response text with
 * all carriage returns and line feeds removed.
 *
 * <p>The request runs on the IO scheduler and results are observed on the main thread.
 *
 * @param account sent as the {@code username} parameter
 * @param oldPassword sent as the {@code password} parameter
 * @return an Observable that emits the response text and completes, or signals an error when
 *     no response body is available
 */
@Override
public Observable<String> login(final String account, final String oldPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/login";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);

            ResponseBody responseBody=executeHttp(path,params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            try {
                // The pattern matches CR and LF in any combination, so every line break is removed.
                String result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
                emitter.onNext(result);
                // One-shot request: complete the stream after the single result.
                emitter.onComplete();
            } finally {
                // Release the body even when reading it or a downstream consumer throws.
                responseBody.close();
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码17 项目: Dainty   文件: LoginRegisterModel.java
/**
 * Posts the registration data to the register endpoint and emits the response text with all
 * carriage returns and line feeds removed.
 *
 * <p>The request runs on the IO scheduler and results are observed on the main thread.
 *
 * @param account sent as the {@code username} parameter
 * @param oldPassword sent as the {@code password} parameter
 * @param newPassword sent as the {@code newPassword} parameter
 * @return an Observable that emits the response text and completes, or signals an error when
 *     no response body is available
 */
@Override
public Observable<String> register(final String account, final String oldPassword, final String newPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/register";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);
            params.put("newPassword",newPassword);

            ResponseBody responseBody=executeHttp(path,params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            try {
                // The pattern matches CR and LF in any combination, so every line break is removed.
                String result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
                emitter.onNext(result);
                // One-shot request: complete the stream after the single result.
                emitter.onComplete();
            } finally {
                // Release the body even when reading it or a downstream consumer throws.
                responseBody.close();
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码18 项目: Dainty   文件: LoginRegisterModel.java
/**
 * Posts the password-change data to the modify endpoint and emits the response text with all
 * carriage returns and line feeds removed.
 *
 * <p>The request runs on the IO scheduler and results are observed on the main thread.
 *
 * @param account sent as the {@code username} parameter
 * @param oldPassword sent as the {@code password} parameter
 * @param newPassword sent as the {@code newPassword} parameter
 * @return an Observable that emits the response text and completes, or signals an error when
 *     no response body is available
 */
@Override
public Observable<String> modifyPassword(final String account, final String oldPassword, final String newPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/modify";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);
            params.put("newPassword",newPassword);

            ResponseBody responseBody=executeHttp(path,params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            try {
                // The pattern matches CR and LF in any combination, so every line break is removed.
                String result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
                emitter.onNext(result);
                // One-shot request: complete the stream after the single result.
                emitter.onComplete();
            } finally {
                // Release the body even when reading it or a downstream consumer throws.
                responseBody.close();
            }
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码19 项目: Dainty   文件: DecompressionModel.java
/**
 * Returns an Observable that unpacks {@code original} into {@code purpose} when subscribed.
 *
 * <p>"zip" archives are delegated to {@code unzip} and "rar" archives to {@code unRar}; any
 * other type signals an error. {@code mProgress} is reset to zero when this method is called.
 * The {@code deleteZip} argument is not referenced by this method.
 *
 * @param type archive type, "zip" or "rar"
 * @param original path of the archive
 * @param purpose path of the output location
 * @param deleteZip unused here
 * @return an Observable subscribed on the IO scheduler and observed on the main thread
 */
@Override
public Observable<Long> decompression(final String type, String original, String purpose, boolean deleteZip) {
    final File archive = new File(original);
    final File target = new File(purpose);
    mProgress = 0L;

    final ObservableOnSubscribe<Long> extractor = new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            if (type.equals("zip")) {
                unzip(archive, target, emitter);
            } else if (type.equals("rar")) {
                unRar(archive, target, emitter);
            } else {
                emitter.onError(new Throwable("无法解压该类型的压缩文件"));
            }
        }
    };

    return Observable.create(extractor)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码20 项目: HaoReader   文件: AudioBookPlayModelImpl.java
/**
 * Stores the playback position of the current chapter on a new thread.
 *
 * <p>The chapter is only updated and saved when {@code inBookShelf()} returns true.
 *
 * @param progress written to the chapter's start field
 * @param duration written to the chapter's end field
 */
@Override
public void saveProgress(int progress, int duration) {
    // The emitted value is irrelevant; both callbacks intentionally do nothing.
    final SimpleObserver<Boolean> ignoreOutcome = new SimpleObserver<Boolean>() {
        @Override
        public void onNext(Boolean bool) {
        }

        @Override
        public void onError(Throwable e) {

        }
    };

    Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
        if (inBookShelf()) {
            ChapterBean current = bookShelf.getChapter(mPlayIndex);
            current.setStart(progress);
            current.setEnd(duration);
            BookshelfHelp.saveChapter(current);
        }
        emitter.onNext(true);
        emitter.onComplete();
    })
            .subscribeOn(Schedulers.newThread())
            .subscribe(ignoreOutcome);
}
 
源代码21 项目: HaoReader   文件: AudioBookPlayModelImpl.java
/**
 * Refreshes the bookkeeping fields of {@code bookShelfBean}, saves it when requested or when it
 * is already on the shelf, and then posts an RxBus event.
 *
 * <p>{@code HAD_ADD_BOOK} is posted when the save was forced for a book that was not on the
 * shelf; otherwise {@code UPDATE_BOOK_SHELF} is posted.
 *
 * @param bookShelfBean the book to update and possibly save
 * @param forceSave save the book even when it is not on the shelf
 */
private void saveBookShelf(BookShelfBean bookShelfBean, boolean forceSave) {
    final SimpleObserver<Boolean> notifyBus = new SimpleObserver<Boolean>() {
        @Override
        public void onNext(Boolean value) {
            if (value) {
                RxBus.get().post(RxBusTag.HAD_ADD_BOOK, bookShelfBean);
            } else {
                RxBus.get().post(RxBusTag.UPDATE_BOOK_SHELF, bookShelfBean);
            }
        }

        @Override
        public void onError(Throwable e) {

        }
    };

    Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
        bookShelfBean.setFinalDate(System.currentTimeMillis());
        bookShelfBean.setHasUpdate(false);
        bookShelfBean.setNewChapters(0);

        final boolean alreadyOnShelf = inBookShelf();
        if (forceSave || alreadyOnShelf) {
            BookshelfHelp.saveBookToShelf(bookShelfBean);
        }

        // True only when the book was saved because of forceSave alone.
        emitter.onNext(forceSave && !alreadyOnShelf);
        emitter.onComplete();
    })
            .subscribeOn(RxExecutors.getDefault())
            .subscribe(notifyBus);
}
 
源代码22 项目: HaoReader   文件: BookSourcePresenterImpl.java
/**
 * Reads {@code file} as a string, imports book sources from that JSON through
 * {@code BookSourceManager}, and passes the result to the import observer.
 *
 * <p>A successful import emits the value of {@code getAllBookSource()}; an unsuccessful one
 * signals an error.
 *
 * @param file the file containing the book-source JSON
 */
@Override
public void importBookSource(File file) {
    mView.showLoading("正在导入书源");
    Observable.create((ObservableOnSubscribe<String>) emitter -> {
        String json = DocumentHelper.readString(DocumentFile.fromFile(file));
        emitter.onNext(json);
        emitter.onComplete();
    })
            .flatMap(json -> BookSourceManager.importFromJson(json)
                    .flatMap(imported -> {
                        if (!imported) {
                            return Observable.error(new Exception("import source failed"));
                        }
                        return Observable.just(getAllBookSource());
                    }))
            .subscribe(getImportObserver());
}
 
源代码23 项目: Tok-Android   文件: MsgHelper.java
/**
 * Splits {@code msg} via {@code splitMessage} and sends every part to {@code receiverKey}.
 *
 * <p>Each part is first recorded through {@code infoRepo.addMessage} with the
 * {@code GlobalParams.SEND_ING} state; the actual send then runs on the IO scheduler. When the
 * send yields a positive id the message is marked as sending and a timer is started for that
 * id; otherwise the stored message is marked as failed.
 *
 * @param receiverKey key of the contact the message is addressed to
 * @param onLine selects the direct send path (true) or the offline-bot path (false)
 * @param msg full message text; split into parts before sending
 * @param msgType message type stored with the record and converted for the direct send
 * @param isFriend when false nothing is sent, so the id stays 0 and the part is marked as failed
 */
private static void sendMessage(final ContactsKey receiverKey, final boolean onLine,
    final String msg, final MessageType msgType, final boolean isFriend) {
    final ToxManager manager = ToxManager.getManager();
    final InfoRepository infoRepo = State.infoRepo();
    for (final String splitMsg : splitMessage(msg)) {
        ContactsKey senderKey = manager.toxBase.getSelfKey();
        ToxNickname senderName = manager.toxBase.getName();
        // Record the part before sending; the returned row id is used to update its state later.
        final long dbId = infoRepo.addMessage(receiverKey, senderKey, senderName, splitMsg,
            GlobalParams.SEND_ING, true, msgType, -2);// the last argument must not be -1
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                // Stays 0 when nothing is sent, which selects the failure branch below.
                long id = 0;
                if (isFriend) {
                    if (onLine) {
                        // NOTE(review): getBytes() uses the platform default charset - confirm
                        // that this matches what the receiving side expects.
                        id = manager.toxBase.friendSendMessage(receiverKey,
                            ToxFriendMessage.unsafeFromValue(splitMsg.getBytes()),
                            MessageType.toToxMessageType(msgType));
                    } else {
                        // Offline path: generate an id locally and hand the message to the
                        // offline bot addressed by GlobalParams.OFFLINE_BOT_TOK_ID.
                        id = manager.toxBase.generateUniqueId(receiverKey);
                        ContactsKey botKey =
                            new ToxAddress(GlobalParams.OFFLINE_BOT_TOK_ID).getKey();
                        // The return value of friendSendMessageOffline is ignored.
                        LogUtil.i(TAG, "botKey:" + botKey.getKey() + ",offlineMsgId:" + id);
                        manager.toxBase.friendSendMessageOffline(botKey,
                            OfflineBuilder.offlineMsgSend(id, receiverKey.getKey(), splitMsg));
                    }
                }

                if (id > 0) {
                    infoRepo.setMessageSending(id, dbId);
                    MsgTimer.startTimer(id);
                } else {
                    infoRepo.setMessageFailByDbId(dbId);
                }
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }
}
 
源代码24 项目: Tok-Android   文件: SettingPresenter.java
/**
 * Logs the user out.
 *
 * <p>On the IO scheduler the chat history is cleared when the
 * {@code PreferenceUtils.CLEAR_MSG_LOGOUT} preference is set, and {@code State.logout()} is
 * called when a user is logged in. Back on the main thread the Tox service is stopped, opened
 * activities are finished and the login page is opened. If the background work fails, only the
 * loading indicator is dismissed.
 */
@Override
public void logout() {
    mSettingView.showLoading();

    mLogoutDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean result = true;
            if (PreferenceUtils.getBoolean(PreferenceUtils.CLEAR_MSG_LOGOUT, false)) {
                result = mUserModel.clearChatHistory();
            }
            if (State.isLoggedIn()) {
                State.logout();
            }
            emitter.onNext(result);
            // One-shot source: terminate the stream once the single value has been delivered.
            emitter.onComplete();
        }
    })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                mSettingView.hideLoading();
                ServiceManager.stopToxService();
                TokApplication.getInstance().finishOpenedActivities();
                PageJumpIn.jumpLoginPage(mSettingView.getActivity());
                mSettingView.viewDestroy();
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                // Without an error consumer a failure in the background work would surface as
                // OnErrorNotImplementedException and the loading indicator would never be hidden.
                mSettingView.hideLoading();
            }
        });
}
 
源代码25 项目: FlowHelper   文件: RxUtils.java
/**
 * Wraps a single value in an Observable that emits it and then completes.
 *
 * @param t the value to emit
 * @param <T> the type of the value
 * @return an Observable emitting {@code t}
 */
private static <T> Observable<T> createData(final T t) {
    final ObservableOnSubscribe<T> singleValue = new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(ObservableEmitter<T> emitter) throws Exception {
            emitter.onNext(t);
            emitter.onComplete();
        }
    };
    return Observable.create(singleValue);
}
 
源代码26 项目: a   文件: BookmarkAdapter.java
/**
 * Filters the bookmarks by {@code key} and refreshes the adapter.
 *
 * <p>An empty key leaves search mode. Otherwise every bookmark whose chapter name or content
 * contains the key is collected on the IO scheduler, and the displayed list is replaced with
 * the matches on the main thread.
 *
 * @param key the text to look for
 */
public void search(final String key) {
    bookmarkBeans.clear();
    if (Objects.equals(key, "")) {
        isSearch = false;
        notifyDataSetChanged();
        return;
    }

    Observable.create((ObservableOnSubscribe<java.util.List<BookmarkBean>>) emitter -> {
        // Collect into a private list: the adapter's list must only be touched on the
        // main thread.
        java.util.List<BookmarkBean> matches = new java.util.ArrayList<>();
        for (BookmarkBean bookmarkBean : allBookmark) {
            if (bookmarkBean.getChapterName().contains(key)
                    || bookmarkBean.getContent().contains(key)) {
                matches.add(bookmarkBean);
            }
        }
        emitter.onNext(matches);
        emitter.onComplete();
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new MyObserver<java.util.List<BookmarkBean>>() {
                @Override
                public void onNext(java.util.List<BookmarkBean> matches) {
                    // Replace instead of append so that overlapping searches cannot leave
                    // results of an earlier key in the list.
                    bookmarkBeans.clear();
                    bookmarkBeans.addAll(matches);
                    isSearch = true;
                    notifyDataSetChanged();
                }

                @Override
                public void onError(Throwable e) {

                }
            });
}
 
源代码27 项目: a   文件: ReplaceRulePresenter.java
/**
 * Renumbers the given replace rules in list order and stores them through
 * {@code ReplaceRuleManager} on the IO scheduler.
 *
 * @param replaceRuleBeans the rules to renumber and save
 */
@Override
public void saveData(List<ReplaceRuleBean> replaceRuleBeans) {
    Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
        // Serial numbers follow the list order; the first rule receives 2.
        int serial = 2;
        for (ReplaceRuleBean rule : replaceRuleBeans) {
            rule.setSerialNumber(serial);
            serial++;
        }
        ReplaceRuleManager.addDataS(replaceRuleBeans);
        emitter.onNext(true);
        emitter.onComplete();
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
 
源代码28 项目: a   文件: SourceEditPresenter.java
/**
 * Saves {@code bookSource}, deleting {@code bookSourceOld} first when the old source has a
 * non-empty URL that differs from the new one.
 *
 * @param bookSource the source to add
 * @param bookSourceOld the previous version of the source
 * @return an Observable that emits {@code true} after the source has been added
 */
@Override
public Observable<Boolean> saveSource(BookSourceBean bookSource, BookSourceBean bookSourceOld) {
    return Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
        if (!TextUtils.isEmpty(bookSourceOld.getBookSourceUrl())) {
            if (!Objects.equals(bookSource.getBookSourceUrl(), bookSourceOld.getBookSourceUrl())) {
                // The URL changed, so remove the entry stored for the old source.
                DbHelper.getDaoSession().getBookSourceBeanDao().delete(bookSourceOld);
            }
        }
        BookSourceManager.addBookSource(bookSource);
        emitter.onNext(true);
    }).compose(RxUtils::toSimpleSingle);
}
 
源代码29 项目: a   文件: TxtChapterRulePresenter.java
/**
 * Renumbers the given chapter rules in list order and writes them to the database in one
 * {@code insertOrReplaceInTx} call on the IO scheduler.
 *
 * @param txtChapterRuleBeans the rules to renumber and save
 */
@Override
public void saveData(List<TxtChapterRuleBean> txtChapterRuleBeans) {
    Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
        // Serial numbers follow the list order; the first rule receives 2.
        int serial = 2;
        for (TxtChapterRuleBean rule : txtChapterRuleBeans) {
            rule.setSerialNumber(serial);
            serial++;
        }
        DbHelper.getDaoSession().getTxtChapterRuleBeanDao().insertOrReplaceInTx(txtChapterRuleBeans);
        emitter.onNext(true);
        emitter.onComplete();
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
 
源代码30 项目: a   文件: ReadBookPresenter.java
/**
 * Saves {@code bookmarkBean} through {@code BookshelfHelp} on the IO scheduler.
 *
 * @param bookmarkBean the bookmark to save
 */
@Override
public void saveBookmark(BookmarkBean bookmarkBean) {
    final ObservableOnSubscribe<BookmarkBean> persist = emitter -> {
        BookshelfHelp.saveBookmark(bookmarkBean);
        emitter.onNext(bookmarkBean);
        emitter.onComplete();
    };

    Observable.create(persist)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}