下面列出了android.os.DeadObjectException#io.reactivex.ObservableOnSubscribe 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Clears the chat history on the I/O scheduler and updates the settings view on the main thread.
 *
 * <p>The loading indicator is shown before the work starts and hidden again when the work
 * either emits a result or fails. The resulting subscription is stored in {@code mCleanHisDis}.
 *
 * <p>NOTE(review): the boolean returned by {@code clearChatHistory()} is not inspected; the
 * "successful" message is shown for any emitted value. Confirm whether {@code false} should be
 * reported differently.
 */
@Override
public void clearMsgHistory() {
    mSettingView.showLoading();
    mCleanHisDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean result = mUserModel.clearChatHistory();
            emitter.onNext(result);
            // Single-shot source: terminate the stream once the result has been delivered.
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean o) throws Exception {
                    mSettingView.hideLoading();
                    mSettingView.showMsg(StringUtils.getTextFromResId(R.string.successful));
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // Without an error consumer a failure would be rethrown as
                    // OnErrorNotImplementedException and the loading indicator would stay visible.
                    mSettingView.hideLoading();
                }
            });
}
/**
 * Destroys the account on the I/O scheduler, then reports success and logs out on the main thread.
 *
 * <p>The loading indicator is shown before the work starts and hidden again when the work
 * either emits a result or fails. The resulting subscription is stored in {@code mDelProfileDis}.
 *
 * <p>NOTE(review): the boolean returned by {@code destroyAccount()} is not inspected; the
 * success message and the logout happen for any emitted value. Confirm whether {@code false}
 * should skip the logout.
 */
@Override
public void delProfile() {
    mSettingView.showLoading();
    mDelProfileDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean result = mUserModel.destroyAccount();
            emitter.onNext(result);
            // Single-shot source: terminate the stream once the result has been delivered.
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    mSettingView.hideLoading();
                    mSettingView.showMsg(StringUtils.getTextFromResId(R.string.successful));
                    SettingPresenter.this.logout();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // A failure must not leave the loading indicator up or crash through
                    // OnErrorNotImplementedException; the account is left logged in.
                    mSettingView.hideLoading();
                }
            });
}
/**
 * Looks up the identity record for {@code recipient} on the I/O scheduler and exposes the
 * outcome as a future that is resolved on the main thread.
 *
 * <p>The {@code context} parameter is not read here; the repository is obtained from the
 * context carried by the recipient's address.
 *
 * @param context   unused by this method
 * @param recipient recipient whose address identifies the record to load
 * @return a future set to the loaded record, or to {@code null} if the lookup signals an error
 */
@UiThread
public static ListenableFuture<IdentityRecord> getRemoteIdentityKey(final Context context, final Recipient recipient) {
    final SettableFuture<IdentityRecord> future = new SettableFuture<>();
    final ObservableOnSubscribe<IdentityRecord> lookup = emitter -> {
        IdentityRecord record = Repository.getIdentityRepo(recipient.getAddress().context())
                .getIdentityRecord(recipient.getAddress().serialize());
        emitter.onNext(record);
        emitter.onComplete();
    };
    Observable.create(lookup)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    record -> future.set(record),
                    // Any failure is reported to the caller as a null record.
                    error -> future.set(null));
    return future;
}
/**
 * Builds an observable that emits the integers 1 through 6 and then completes.
 *
 * <p>Each emission, and the completion signal, is skipped once the downstream has disposed.
 *
 * @return a cold observable over the fixed list 1..6
 */
private Observable<Integer> getOriginalObservable() {
    final List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);
    final ObservableOnSubscribe<Integer> source = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            for (int index = 0; index < values.size(); index++) {
                if (emitter.isDisposed()) {
                    continue;
                }
                emitter.onNext(values.get(index));
            }
            if (emitter.isDisposed()) {
                return;
            }
            emitter.onComplete();
        }
    };
    return Observable.create(source);
}
/**
 * Ensures the chapter list for {@code collBook} is available.
 *
 * <p>If the book has no pending update and the callback already holds a non-empty chapter
 * list, the book is emitted as-is. Otherwise the chapters are loaded via {@code loadChapters()},
 * the book's chapter count is updated, the callback is notified, and the book is emitted with
 * its update flag cleared and its refresh timestamp set to the current time.
 *
 * @param collBook the book to check
 * @return an observable emitting {@code collBook}, or an error if no chapters could be loaded
 */
private Observable<BookShelfBean> checkChapterList(BookShelfBean collBook) {
    if (!collBook.getHasUpdate() && !callback.getChapterList().isEmpty()) {
        return Observable.just(collBook);
    }
    return Observable.create((ObservableOnSubscribe<List<BookChapterBean>>) e -> {
        List<BookChapterBean> chapterList = loadChapters();
        if (chapterList.isEmpty()) {
            // onError is terminal; do not signal onComplete afterwards.
            e.onError(new IllegalAccessException("epubBook sub-chapter failed!"));
            return;
        }
        e.onNext(chapterList);
        e.onComplete();
    })
            // A one-to-one transformation: map is sufficient, no inner observable needed.
            .map(chapterList -> {
                collBook.setChapterListSize(chapterList.size());
                callback.onCategoryFinish(chapterList);
                return collBook;
            })
            .doOnNext(bookShelfBean -> {
                // Originally annotated "store chapters to the database"; the code here only
                // clears the update flag and stamps the refresh time.
                bookShelfBean.setHasUpdate(false);
                bookShelfBean.setFinalRefreshData(System.currentTimeMillis());
            });
}
/**
 * Returns an observable of events of the given type, including the stored sticky event if any.
 *
 * <p>While holding the lock on {@code mStickyEventMap}, the bus is filtered to
 * {@code eventType}. If an event is stored under that type, the returned observable is the
 * filtered bus merged with a source that emits that stored event to each subscriber.
 *
 * @param eventType the class of events to observe
 * @param <T>       the event type
 * @return an observable of {@code eventType} events
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Observable<T> liveEvents = mBus.ofType(eventType);
        final Object stickyEvent = mStickyEventMap.get(eventType);
        if (stickyEvent == null) {
            return liveEvents;
        }
        final ObservableOnSubscribe<T> replay = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                // Deliberately never completes: only the stored event is pushed.
                emitter.onNext(eventType.cast(stickyEvent));
            }
        };
        return Observable.merge(liveEvents, Observable.create(replay));
    }
}
/**
 * Returns an observable for the requested event type, replaying the stored sticky event
 * of that type when one exists.
 *
 * <p>The lookup runs under the {@code mStickyEventMap} lock. When no event is stored for
 * {@code eventType}, the type-filtered bus is returned unchanged.
 *
 * @param eventType the class of events to observe
 * @param <T>       the event type
 * @return an observable of {@code eventType} events
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (mStickyEventMap) {
        final Observable<T> busEvents = mBus.ofType(eventType);
        final Object stored = mStickyEventMap.get(eventType);
        if (stored == null) {
            return busEvents;
        }
        final Observable<T> storedEvent = Observable.create(new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> emitter) throws Exception {
                // Emits the stored event only; no completion is signalled.
                emitter.onNext(eventType.cast(stored));
            }
        });
        return Observable.merge(busEvents, storedEvent);
    }
}
/**
 * Returns an observable of {@code eventType} events from the bus, merged with the sticky
 * event stored for that type when one is present.
 *
 * <p>The sticky lookup is performed while holding the lock on {@code stickyEventMap}.
 *
 * @param eventType the class of events to observe
 * @param <T>       the event type
 * @return an observable of {@code eventType} events
 */
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
    synchronized (stickyEventMap) {
        final Observable<T> typedBus = bus.ofType(eventType);
        final Object sticky = stickyEventMap.get(eventType);
        if (sticky == null) {
            return typedBus;
        }
        final ObservableOnSubscribe<T> stickySource = new ObservableOnSubscribe<T>() {
            @Override
            public void subscribe(ObservableEmitter<T> subscriber) throws Exception {
                // Pushes the stored event once per subscriber and leaves the stream open.
                subscriber.onNext(eventType.cast(sticky));
            }
        };
        return typedBus.mergeWith(Observable.create(stickySource));
    }
}
/**
 * Adapts an event source to an {@link Observable}.
 *
 * <p>Each subscriber causes a new subscription to {@code eventSource}; every value it
 * delivers is forwarded to the emitter, and cancelling the emitter disposes that subscription.
 *
 * @param eventSource the event source to wrap
 * @param <E> the event type
 * @return an Observable that relays the events of the provided event source
 */
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
    final ObservableOnSubscribe<E> bridge =
        new ObservableOnSubscribe<E>() {
            @Override
            public void subscribe(final ObservableEmitter<E> emitter) throws Exception {
                final Consumer<E> forwarder =
                    new Consumer<E>() {
                        @Override
                        public void accept(E value) {
                            emitter.onNext(value);
                        }
                    };
                final Disposable subscription = eventSource.subscribe(forwarder);
                final Cancellable teardown =
                    new Cancellable() {
                        @Override
                        public void cancel() throws Exception {
                            subscription.dispose();
                        }
                    };
                emitter.setCancellable(teardown);
            }
        };
    return Observable.create(bridge);
}
/**
 * Demo: runs a simulated two-second task, bound to the lifecycle until {@code ON_DESTROY},
 * and shows a toast for success or failure.
 *
 * <p>If the sleep is interrupted, the interrupt flag is restored and an error is signalled
 * only while the emitter is still subscribed; no success value is emitted afterwards.
 */
private void testLifeCycle3() {
    Observable.create((ObservableOnSubscribe<Boolean>) emitter -> {
                try {
                    Thread.sleep(2000); // stand-in for a long-running operation
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still see the interrupt.
                    Thread.currentThread().interrupt();
                    // Signalling onError on a disposed emitter would surface as an
                    // undeliverable exception, so only report while still subscribed.
                    if (!emitter.isDisposed()) {
                        emitter.onError(new RuntimeException(e));
                    }
                    // onError is terminal: do not fall through to onNext.
                    return;
                }
                emitter.onNext(true);
            }
    )
            .compose(RxUtil.applySchedulers())
            .compose(lifecycleProvider.bindUntilEvent(Lifecycle.Event.ON_DESTROY))
            .subscribe(new BaseObserver<Boolean>() {
                @Override
                public void onError(ApiException exception) {
                    ToastUtil.showToast("LifeCycle Error");
                }
                @Override
                public void onSuccess(Boolean b) {
                    ToastUtil.showToast("LifeCycle");
                }
            });
}
/**
 * Demo: downloads a fixed image with Glide, saves it under a timestamped {@code .jpg} name,
 * then shows a toast and loads the saved picture into {@code mExample}.
 *
 * <p>A failure anywhere in the pipeline (download or save) is reported with the same
 * "save failed" toast instead of being left unhandled. The subscription is added to
 * {@code disposables}.
 */
private void testExample() {
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<File>() {
        @Override
        public void subscribe(ObservableEmitter<File> emitter) throws Exception {
            // get() blocks until the download finishes; it throws if the download fails.
            emitter.onNext(Glide.with(getContext())
                    .load("https://hbimg.huabanimg.com/85966739547072c95d2ecd2ff60248e1d09be657c005-Dd5NrU_fw658")
                    .downloadOnly(Target.SIZE_ORIGINAL, Target.SIZE_ORIGINAL)
                    .get());
            // Single-shot source: terminate the stream once the file has been delivered.
            emitter.onComplete();
        }
    }).map(new Function<File, String>() {
        @Override
        public String apply(File file) throws Exception {
            return savePicture(getContext(), file, new SimpleDateFormat("yyyyMMddHHmmss",
                    Locale.getDefault()).format(new Date())
                    + ".jpg");
        }
    });
    Consumer<String> observer = new Consumer<String>() {
        @Override
        public void accept(String path) throws Exception {
            ToastUtil.showToast(StringUtil.isEmpty(path) ? "保存失败" : "保存成功");
            GlideUtils.loadImageView(path, mExample);
        }
    };
    Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            // Without this consumer an error would be rethrown as OnErrorNotImplementedException.
            ToastUtil.showToast("保存失败");
        }
    };
    disposables.add(observable.compose(RxUtil.applySchedulers()).subscribe(observer, onFailure));
}
/**
 * Emits a single "Start" signal and, on receiving it, creates the user "Mike" through
 * {@code UserDao}.
 *
 * <p>No scheduler is applied, and the subscription, error and completion callbacks are
 * intentionally empty.
 */
public void searchAndInsert() {
    final ObservableOnSubscribe<String> trigger = emitter -> emitter.onNext("Start");
    final Observer<String> inserter = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String signal) {
            if (!"Start".equals(signal)) {
                return;
            }
            UserDao.getInstance().createUser("Mike");
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };
    Observable.create(trigger).subscribe(inserter);
}
/**
 * Makes sure a chapter list exists for {@code collBook} before emitting it.
 *
 * <p>When the book has no pending update and the callback's chapter list is non-empty, the
 * book is emitted directly. Otherwise {@code loadChapters()} is called; an empty result is
 * signalled as an error, while a non-empty result updates the book's chapter count, notifies
 * the callback, clears the update flag and stamps the refresh time.
 *
 * @param collBook the book to check
 * @return an observable emitting {@code collBook}, or an error if no chapters could be loaded
 */
private Observable<BookShelfBean> checkChapterList(BookShelfBean collBook) {
    if (!collBook.getHasUpdate() && !callback.getChapterList().isEmpty()) {
        return Observable.just(collBook);
    }
    return Observable.create((ObservableOnSubscribe<List<BookChapterBean>>) e -> {
        List<BookChapterBean> chapterList = loadChapters();
        if (chapterList.isEmpty()) {
            // Terminal events are mutually exclusive: stop after onError.
            e.onError(new IllegalAccessException("epubBook sub-chapter failed!"));
            return;
        }
        e.onNext(chapterList);
        e.onComplete();
    })
            // One input yields exactly one output, so map replaces flatMap + just.
            .map(chapterList -> {
                collBook.setChapterListSize(chapterList.size());
                callback.onCategoryFinish(chapterList);
                return collBook;
            })
            .doOnNext(bookShelfBean -> {
                // Originally annotated "store chapters to the database"; the code here only
                // clears the update flag and stamps the refresh time.
                bookShelfBean.setHasUpdate(false);
                bookShelfBean.setFinalRefreshData(System.currentTimeMillis());
            });
}
/**
 * Demo of {@code Observable.create}: emits "One" and "Two", keeps only values containing
 * "w", prints each surviving value, then prints "Completed".
 *
 * <p>A blank line is printed first, and the method pauses for 100 ms before returning.
 */
private static void create(){
    System.out.println();
    Observable.create((ObservableOnSubscribe<String>) emitter -> {
                emitter.onNext("One");
                emitter.onNext("Two");
                emitter.onComplete();
            })
            .filter(value -> value.contains("w"))
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace,
                    () -> System.out.println("Completed"));
    pauseMs(100);
}
/**
 * Fetches weather information for {@code city} and emits it as a {@code WeatherInfoBean}.
 *
 * <p>The request is executed synchronously on the I/O scheduler and results are observed on
 * the Android main thread. From the response's {@code "data"} object, {@code "wendu"} (with a
 * degree sign appended) and {@code "ganmao"} are passed to the bean together with the city.
 * The stream completes after the single result.
 *
 * @param city city name appended to the request URL's {@code city} query parameter
 * @return an observable emitting one bean and completing, or an error if the request or
 *     parsing fails or the response has no body
 */
@Override
public Observable<WeatherInfoBean> getWeatherInfo(final String city) {
    return Observable.create(new ObservableOnSubscribe<WeatherInfoBean>() {
        @Override
        public void subscribe(ObservableEmitter<WeatherInfoBean> emitter) throws Exception {
            String path = "http://wthrcdn.etouch.cn/weather_mini?city=" + city;
            Request request = new Request.Builder()
                    .url(path)
                    .build();
            Response response = okHttpClient.newCall(request).execute();
            ResponseBody responseBody = response.body();
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            JSONObject jsonObject = new JSONObject(responseBody.string());
            JSONObject weatherInfo = jsonObject.getJSONObject("data");
            String temperature = weatherInfo.getString("wendu") + "°";
            String climate = weatherInfo.getString("ganmao");
            emitter.onNext(new WeatherInfoBean(city, temperature, climate));
            // Single-shot source: signal completion so subscribers are not left waiting.
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Sends a login request and emits the server's response text with line breaks removed.
 *
 * <p>The request runs on the I/O scheduler and the result is observed on the Android main
 * thread. The response body is always closed, and the stream completes after the single
 * result.
 *
 * <p>NOTE(review): the endpoint is plain {@code http://}, so the credentials are not
 * protected in transit — confirm whether an HTTPS endpoint is available.
 *
 * @param account     sent as the {@code username} form value
 * @param oldPassword sent as the {@code password} form value
 * @return an observable emitting the response text, or an error if the response has no body
 */
@Override
public Observable<String> login(final String account, final String oldPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/login";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);
            ResponseBody responseBody = executeHttp(path, params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            final String result;
            try {
                result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
            } finally {
                // Close even when reading the body throws.
                responseBody.close();
            }
            emitter.onNext(result);
            // Single-shot source: signal completion so subscribers are not left waiting.
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Sends a registration request and emits the server's response text with line breaks removed.
 *
 * <p>The request runs on the I/O scheduler and the result is observed on the Android main
 * thread. The response body is always closed, and the stream completes after the single
 * result.
 *
 * <p>NOTE(review): the endpoint is plain {@code http://}, so the credentials are not
 * protected in transit — confirm whether an HTTPS endpoint is available.
 *
 * @param account     sent as the {@code username} form value
 * @param oldPassword sent as the {@code password} form value
 * @param newPassword sent as the {@code newPassword} form value
 * @return an observable emitting the response text, or an error if the response has no body
 */
@Override
public Observable<String> register(final String account, final String oldPassword, final String newPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/register";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);
            params.put("newPassword", newPassword);
            ResponseBody responseBody = executeHttp(path, params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            final String result;
            try {
                result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
            } finally {
                // Close even when reading the body throws.
                responseBody.close();
            }
            emitter.onNext(result);
            // Single-shot source: signal completion so subscribers are not left waiting.
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Sends a password-change request and emits the server's response text with line breaks
 * removed.
 *
 * <p>The request runs on the I/O scheduler and the result is observed on the Android main
 * thread. The response body is always closed, and the stream completes after the single
 * result.
 *
 * <p>NOTE(review): the endpoint is plain {@code http://}, so the credentials are not
 * protected in transit — confirm whether an HTTPS endpoint is available.
 *
 * @param account     sent as the {@code username} form value
 * @param oldPassword sent as the {@code password} form value
 * @param newPassword sent as the {@code newPassword} form value
 * @return an observable emitting the response text, or an error if the response has no body
 */
@Override
public Observable<String> modifyPassword(final String account, final String oldPassword, final String newPassword) {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String path = "http://36078d58.nat123.cc/AndroidRegisterAndLogin_war/modify";
            Map<String, String> params = new HashMap<>();
            params.put("username", account);
            params.put("password", oldPassword);
            params.put("newPassword", newPassword);
            ResponseBody responseBody = executeHttp(path, params);
            if (responseBody == null) {
                emitter.onError(new Throwable("响应体为空"));
                return;
            }
            final String result;
            try {
                result = responseBody.string().replaceAll("(\\\r\\\n|\\\r|\\\n|\\\n\\\r)", "");
            } finally {
                // Close even when reading the body throws.
                responseBody.close();
            }
            emitter.onNext(result);
            // Single-shot source: signal completion so subscribers are not left waiting.
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Extracts an archive on the I/O scheduler and delivers emitted values on the main thread.
 *
 * <p>{@code mProgress} is reset to zero when this method is called. On subscription the
 * work is dispatched by {@code type}: {@code "zip"} goes to {@code unzip}, {@code "rar"} to
 * {@code unRar}, and any other value signals an error. The {@code deleteZip} parameter is
 * not read in this method.
 *
 * @param type      archive type, {@code "zip"} or {@code "rar"}
 * @param original  path of the archive to read
 * @param purpose   path to extract into
 * @param deleteZip not used by this method
 * @return an observable of {@code Long} values produced by the extraction helpers
 */
@Override
public Observable<Long> decompression(final String type, String original, String purpose, boolean deleteZip) {
    final File archive = new File(original);
    final File destination = new File(purpose);
    mProgress = 0L;
    final ObservableOnSubscribe<Long> extractor = new ObservableOnSubscribe<Long>() {
        @Override
        public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
            switch (type) {
                case "rar":
                    unRar(archive, destination, emitter);
                    return;
                case "zip":
                    unzip(archive, destination, emitter);
                    return;
                default:
                    emitter.onError(new Throwable("无法解压该类型的压缩文件"));
            }
        }
    };
    return Observable.create(extractor)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
/**
 * Persists the playback position of the current chapter on a new thread.
 *
 * <p>When the book is on the shelf, the chapter at {@code mPlayIndex} gets {@code progress}
 * as its start and {@code duration} as its end and is then saved. The result and any error
 * are ignored by the observer.
 *
 * @param progress value stored as the chapter's start
 * @param duration value stored as the chapter's end
 */
@Override
public void saveProgress(int progress, int duration) {
    final ObservableOnSubscribe<Boolean> persist = emitter -> {
        if (inBookShelf()) {
            ChapterBean current = bookShelf.getChapter(mPlayIndex);
            current.setStart(progress);
            current.setEnd(duration);
            BookshelfHelp.saveChapter(current);
        }
        emitter.onNext(true);
        emitter.onComplete();
    };
    Observable.create(persist)
            .subscribeOn(Schedulers.newThread())
            .subscribe(new SimpleObserver<Boolean>() {
                @Override
                public void onNext(Boolean saved) {
                }

                @Override
                public void onError(Throwable e) {
                }
            });
}
/**
 * Updates the book's bookkeeping fields and saves it to the shelf off the calling thread.
 *
 * <p>The final date is set to now, the update flag is cleared and the new-chapter count is
 * reset. The book is saved when it is already on the shelf or when {@code forceSave} is set.
 * Afterwards an event is posted on the bus: {@code HAD_ADD_BOOK} when the book was force-saved
 * while not yet on the shelf, otherwise {@code UPDATE_BOOK_SHELF}.
 *
 * @param bookShelfBean the book to update and save
 * @param forceSave     save even if the book is not on the shelf
 */
private void saveBookShelf(BookShelfBean bookShelfBean, boolean forceSave) {
    final ObservableOnSubscribe<Boolean> persist = emitter -> {
        bookShelfBean.setFinalDate(System.currentTimeMillis());
        bookShelfBean.setHasUpdate(false);
        bookShelfBean.setNewChapters(0);
        final boolean alreadyOnShelf = inBookShelf();
        if (forceSave || alreadyOnShelf) {
            BookshelfHelp.saveBookToShelf(bookShelfBean);
        }
        final boolean newlyAdded = forceSave && !alreadyOnShelf;
        emitter.onNext(newlyAdded);
        emitter.onComplete();
    };
    Observable.create(persist)
            .subscribeOn(RxExecutors.getDefault())
            .subscribe(new SimpleObserver<Boolean>() {
                @Override
                public void onNext(Boolean newlyAdded) {
                    RxBus.get().post(newlyAdded ? RxBusTag.HAD_ADD_BOOK : RxBusTag.UPDATE_BOOK_SHELF, bookShelfBean);
                }

                @Override
                public void onError(Throwable e) {
                }
            });
}
/**
 * Imports book sources from a JSON file.
 *
 * <p>Shows a loading message, reads the file's text, passes it to
 * {@code BookSourceManager.importFromJson}, and on success emits the full list from
 * {@code getAllBookSource()} to the import observer; an unsuccessful import is signalled as
 * an error. No scheduler is applied in this method.
 *
 * @param file the file containing the book-source JSON
 */
@Override
public void importBookSource(File file) {
    mView.showLoading("正在导入书源");
    final Observable<String> rawJson = Observable.create((ObservableOnSubscribe<String>) emitter -> {
        String json = DocumentHelper.readString(DocumentFile.fromFile(file));
        emitter.onNext(json);
        emitter.onComplete();
    });
    rawJson.flatMap(json -> BookSourceManager.importFromJson(json)
            .flatMap(imported -> {
                if (imported) {
                    return Observable.just(getAllBookSource());
                }
                return Observable.error(new Exception("import source failed"));
            })).subscribe(getImportObserver());
}
/**
 * Splits {@code msg} into parts, records each part as a sending message, and sends each part
 * on the I/O scheduler.
 *
 * <p>For friends, an online receiver gets the message directly; an offline receiver gets it
 * through the offline bot with a generated id. A positive send id marks the stored message as
 * sending and starts its timer; otherwise the stored message is marked failed. If sending
 * throws, the failure is logged and the stored message is marked failed, so it does not stay
 * in the sending state.
 *
 * @param receiverKey key of the contact receiving the message
 * @param onLine      whether the receiver is currently online
 * @param msg         full message text; split by {@code splitMessage}
 * @param msgType     message type stored and sent with each part
 * @param isFriend    whether the receiver is a friend; nothing is sent otherwise
 */
private static void sendMessage(final ContactsKey receiverKey, final boolean onLine,
        final String msg, final MessageType msgType, final boolean isFriend) {
    final ToxManager manager = ToxManager.getManager();
    final InfoRepository infoRepo = State.infoRepo();
    for (final String splitMsg : splitMessage(msg)) {
        ContactsKey senderKey = manager.toxBase.getSelfKey();
        ToxNickname senderName = manager.toxBase.getName();
        final long dbId = infoRepo.addMessage(receiverKey, senderKey, senderName, splitMsg,
                GlobalParams.SEND_ING, true, msgType, -2);//can't -1
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                long id = 0;
                try {
                    if (isFriend) {
                        if (onLine) {
                            id = manager.toxBase.friendSendMessage(receiverKey,
                                    ToxFriendMessage.unsafeFromValue(splitMsg.getBytes()),
                                    MessageType.toToxMessageType(msgType));
                        } else {
                            id = manager.toxBase.generateUniqueId(receiverKey);
                            ContactsKey botKey =
                                    new ToxAddress(GlobalParams.OFFLINE_BOT_TOK_ID).getKey();
                            //ignore the id
                            LogUtil.i(TAG, "botKey:" + botKey.getKey() + ",offlineMsgId:" + id);
                            manager.toxBase.friendSendMessageOffline(botKey,
                                    OfflineBuilder.offlineMsgSend(id, receiverKey.getKey(), splitMsg));
                        }
                    }
                } catch (Exception e) {
                    // The subscriber has no error handler, so an exception here would go
                    // unhandled and the stored message would stay in the sending state.
                    // Treat it as a failed send instead.
                    LogUtil.i(TAG, "send failed, dbId:" + dbId + ",error:" + e);
                    id = 0;
                }
                if (id > 0) {
                    infoRepo.setMessageSending(id, dbId);
                    MsgTimer.startTimer(id);
                } else {
                    infoRepo.setMessageFailByDbId(dbId);
                }
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }
}
/**
 * Logs the user out on the I/O scheduler and navigates to the login page on the main thread.
 *
 * <p>If the {@code CLEAR_MSG_LOGOUT} preference is set, the chat history is cleared first.
 * On success the loading indicator is hidden, the Tox service is stopped, open activities are
 * finished, the login page is opened and the view is destroyed. On failure only the loading
 * indicator is hidden. The resulting subscription is stored in {@code mLogoutDis}.
 */
@Override
public void logout() {
    mSettingView.showLoading();
    mLogoutDis = Observable.create(new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(ObservableEmitter<Boolean> emitter) throws Exception {
            boolean result = true;
            if (PreferenceUtils.getBoolean(PreferenceUtils.CLEAR_MSG_LOGOUT, false)) {
                result = mUserModel.clearChatHistory();
            }
            if (State.isLoggedIn()) {
                State.logout();
            }
            emitter.onNext(result);
            // Single-shot source: terminate the stream once the result has been delivered.
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    mSettingView.hideLoading();
                    ServiceManager.stopToxService();
                    TokApplication.getInstance().finishOpenedActivities();
                    PageJumpIn.jumpLoginPage(mSettingView.getActivity());
                    mSettingView.viewDestroy();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // Without an error consumer a failure would be rethrown as
                    // OnErrorNotImplementedException and the loading indicator would stay visible.
                    // NOTE(review): confirm whether navigation to the login page should still
                    // happen when the background work fails.
                    mSettingView.hideLoading();
                }
            });
}
/**
 * Wraps a single value in an Observable that emits it and then completes.
 *
 * @param t   the value to emit
 * @param <T> the value's type
 * @return an Observable emitting {@code t} once, followed by completion
 */
private static <T> Observable<T> createData(final T t) {
    final ObservableOnSubscribe<T> singleValue = new ObservableOnSubscribe<T>() {
        @Override
        public void subscribe(ObservableEmitter<T> emitter) throws Exception {
            emitter.onNext(t);
            emitter.onComplete();
        }
    };
    return Observable.create(singleValue);
}
/**
 * Filters the bookmarks by {@code key} and refreshes the adapter.
 *
 * <p>An empty key clears the search results and leaves search mode. Otherwise every bookmark
 * whose chapter name or content contains {@code key} is collected on the I/O scheduler into a
 * private list; the adapter's {@code bookmarkBeans} list is only replaced on the main thread,
 * immediately before {@code notifyDataSetChanged()}. This keeps the adapter's data from being
 * modified on a background thread and keeps overlapping searches from mixing their results.
 *
 * @param key text to look for in each bookmark's chapter name and content
 */
public void search(final String key) {
    if (Objects.equals(key, "")) {
        bookmarkBeans.clear();
        isSearch = false;
        notifyDataSetChanged();
        return;
    }
    Observable.create((ObservableOnSubscribe<java.util.List<BookmarkBean>>) emitter -> {
        // Collect into a thread-confined list; the shared adapter list is not touched here.
        java.util.List<BookmarkBean> matches = new java.util.ArrayList<>();
        for (BookmarkBean bookmarkBean : allBookmark) {
            if (bookmarkBean.getChapterName().contains(key)
                    || bookmarkBean.getContent().contains(key)) {
                matches.add(bookmarkBean);
            }
        }
        emitter.onNext(matches);
        emitter.onComplete();
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new MyObserver<java.util.List<BookmarkBean>>() {
                @Override
                public void onNext(java.util.List<BookmarkBean> matches) {
                    // Swap the results in on the main thread, together with the notification.
                    bookmarkBeans.clear();
                    bookmarkBeans.addAll(matches);
                    isSearch = true;
                    notifyDataSetChanged();
                }

                @Override
                public void onError(Throwable e) {
                }
            });
}
/**
 * Renumbers the given replace rules in list order and stores them on the I/O scheduler.
 *
 * <p>NOTE(review): numbering starts at 2 (the first rule gets serial number 2, the next 3,
 * and so on). This is preserved as-is; confirm whether starting at 1 was intended.
 *
 * @param replaceRuleBeans rules to renumber and persist
 */
@Override
public void saveData(List<ReplaceRuleBean> replaceRuleBeans) {
    final ObservableOnSubscribe<Boolean> persist = emitter -> {
        int nextSerial = 2;
        for (ReplaceRuleBean rule : replaceRuleBeans) {
            rule.setSerialNumber(nextSerial++);
        }
        ReplaceRuleManager.addDataS(replaceRuleBeans);
        emitter.onNext(true);
        emitter.onComplete();
    };
    Observable.create(persist)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
/**
 * Saves a book source, removing the previous entry when its URL has changed.
 *
 * <p>If the old source has a non-empty URL that differs from the new source's URL, the old
 * source is deleted before the new one is added. The resulting observable emits {@code true}
 * and is passed through {@code RxUtils::toSimpleSingle}.
 *
 * @param bookSource    the source to store
 * @param bookSourceOld the source being replaced
 * @return an observable emitting {@code true} once the source has been added
 */
@Override
public Observable<Boolean> saveSource(BookSourceBean bookSource, BookSourceBean bookSourceOld) {
    final ObservableOnSubscribe<Boolean> persist = emitter -> {
        final boolean hadUrl = !TextUtils.isEmpty(bookSourceOld.getBookSourceUrl());
        if (hadUrl) {
            final boolean sameUrl =
                    Objects.equals(bookSource.getBookSourceUrl(), bookSourceOld.getBookSourceUrl());
            if (!sameUrl) {
                DbHelper.getDaoSession().getBookSourceBeanDao().delete(bookSourceOld);
            }
        }
        BookSourceManager.addBookSource(bookSource);
        emitter.onNext(true);
    };
    return Observable.create(persist).compose(RxUtils::toSimpleSingle);
}
/**
 * Renumbers the given TXT chapter rules in list order and writes them to the database on the
 * I/O scheduler.
 *
 * <p>NOTE(review): numbering starts at 2 (the first rule gets serial number 2, the next 3,
 * and so on). This is preserved as-is; confirm whether starting at 1 was intended.
 *
 * @param txtChapterRuleBeans rules to renumber and persist
 */
@Override
public void saveData(List<TxtChapterRuleBean> txtChapterRuleBeans) {
    final ObservableOnSubscribe<Boolean> persist = emitter -> {
        int nextSerial = 2;
        for (TxtChapterRuleBean rule : txtChapterRuleBeans) {
            rule.setSerialNumber(nextSerial++);
        }
        DbHelper.getDaoSession().getTxtChapterRuleBeanDao().insertOrReplaceInTx(txtChapterRuleBeans);
        emitter.onNext(true);
        emitter.onComplete();
    };
    Observable.create(persist)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
/**
 * Saves the bookmark on the I/O scheduler; the emitted bookmark is not consumed.
 *
 * @param bookmarkBean the bookmark to persist
 */
@Override
public void saveBookmark(BookmarkBean bookmarkBean) {
    final ObservableOnSubscribe<BookmarkBean> persist = emitter -> {
        BookshelfHelp.saveBookmark(bookmarkBean);
        emitter.onNext(bookmarkBean);
        emitter.onComplete();
    };
    Observable.create(persist)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}