下面列出了io.reactivex.SingleSource#io.reactivex.annotations.NonNull 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
/**
* 订阅请求
*/
public static <T> void toSubscribe(Observable<T> observable, BaseObserver<T> observer) {
// 指定subscribe()发生在IO线程
observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
private int mRetryCount;
@Override
public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) {
return throwableObservable.flatMap((Function<Throwable, ObservableSource<?>>) throwable -> {
boolean exceptionType = (throwable instanceof NetworkErrorException
|| throwable instanceof ConnectException
|| throwable instanceof SocketTimeoutException
|| throwable instanceof TimeoutException) && mRetryCount < 3;
if (exceptionType) {
mRetryCount++;
return Observable.timer(4000, TimeUnit.MILLISECONDS);
}
return Observable.error(throwable);
});
}
})
.subscribe(observer);
}
/**
* 插入联系人
*
* @param name
* @param accountName
* @param memo
*/
public void insertContact(@NonNull String name, @NonNull String accountName, @NonNull String memo) {
Cursor cursor = mDatabase.query(ContactEntry.TABLE_NAME, new String[]{ContactEntry.COLUMN_ACCOUNT_NAME}, ContactEntry.COLUMN_ACCOUNT_NAME + " = ?", new String[]{accountName}, null, null, null);
if (cursor.getCount() == 0) {
ContentValues values = new ContentValues();
values.put(ContactEntry.COLUMN_ACCOUNT_NAME, accountName);
values.put(ContactEntry.COLUMN_CONTACT_NAME, name);
values.put(ContactEntry.COLUMN_MEMO, memo);
mDatabase.insert(ContactEntry.TABLE_NAME, null, values);
} else {
updateContact(accountName, name, memo);
}
if (cursor != null && !cursor.isClosed()) {
cursor.close();
}
}
@SuppressWarnings("unchecked")
public void subscribe(@NonNull Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}
int n = subscribers.length;
Subscriber<? super T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
Subscriber<? super T> a = subscribers[i];
if (a instanceof ConditionalSubscriber) {
parents[i] = new LifeConditionalSubscriber<>((ConditionalSubscriber<? super T>) a, scope);
} else {
parents[i] = new LifeSubscriber<>(a, scope);
}
}
ParallelFlowable<T> upStream = this.upStream;
if (onMain) upStream = upStream.runOn(AndroidSchedulers.mainThread());
upStream.subscribe(parents);
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
@Override
public Flowable<ZeroFiveNewsDetail> getAcgNewsDetail(final String url) {
return Flowable.create(new FlowableOnSubscribe<ZeroFiveNewsDetail>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsDetail> e) throws Exception {
Element html = Jsoup.connect(url).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ZeroFiveNewsDetail zeroFiveNewsDetail = JP.from(html, ZeroFiveNewsDetail.class);
e.onNext(zeroFiveNewsDetail);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ZeroFiveNewsPage> getAcgNews(final String typeUrl) {
return Flowable.create(new FlowableOnSubscribe<ZeroFiveNewsPage>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsPage> e) throws Exception {
Element html = Jsoup.connect(typeUrl).get();
if(html == null){
e.onError(new Throwable("element html is null"));
}else {
ZeroFiveNewsPage zeroFiveNewsPage = JP.from(html, ZeroFiveNewsPage.class);
e.onNext(zeroFiveNewsPage);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
public void start2Share(RxPermissions rxPermissions) {
rxPermissions.request(
Manifest.permission.WRITE_EXTERNAL_STORAGE,
Manifest.permission.ACCESS_FINE_LOCATION,
Manifest.permission.WRITE_EXTERNAL_STORAGE)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
if (aBoolean) {
mView.showShareView();
} else {
mView.showError(R.string.msg_error_check_permission);
}
}
});
}
@Override
public Flowable<ScheduleNew> getScheduleNew(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleNew>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleNew> e) throws Exception {
Element html = Jsoup.connect(url).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ScheduleNew scheduleNew = JP.from(html, ScheduleNew.class);
e.onNext(scheduleNew);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ScheduleOtherPage> getScheduleOtherPage(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleOtherPage>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleOtherPage> e) throws Exception {
Element html = Jsoup.connect(url).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ScheduleOtherPage scheduleOtherPage = JP.from(html, ScheduleOtherPage.class);
e.onNext(scheduleOtherPage);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ScheduleVideo> getScheduleVideo(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleVideo>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleVideo> e) throws Exception {
Element html = Jsoup.connect(url).get();
if(html == null){
e.onError(new Throwable("element html is null"));
}else {
ScheduleVideo scheduleVideo = JP.from(html, ScheduleVideo.class);
if (!TextUtils.isEmpty(scheduleVideo.getVideoHtml())) {
scheduleVideo.setVideoUrl("http://tup.yhdm.tv/?m=1&vid=" + scheduleVideo.getVideoUrl());
}
/*StringBuilder scheduleVideoHtmlBuilder = new StringBuilder();
scheduleVideoHtmlBuilder.append(HtmlConstant.SCHEDULE_VIDEO_CSS);
scheduleVideoHtmlBuilder.append("<div class=\"player_main\" style=\"position: relative;\"> ");
scheduleVideoHtmlBuilder.append(scheduleVideo.getVideoHtml());
scheduleVideoHtmlBuilder.append("</div>");
scheduleVideo.setVideoHtml(scheduleVideoHtmlBuilder.toString());*/
e.onNext(scheduleVideo);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
/**
* 视频观看权限申请
*/
public void checkPermission2ScheduleVideo(RxPermissions rxPermissions, final String videoUrl) {
if (TextUtils.isEmpty(videoUrl)) {
mView.showError(R.string.msg_error_url_null);
return;
}
rxPermissions.request(permission.WRITE_EXTERNAL_STORAGE,
permission.READ_PHONE_STATE,
permission.ACCESS_NETWORK_STATE,
permission.ACCESS_WIFI_STATE)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(@NonNull Boolean aBoolean) throws Exception {
if (aBoolean) {
mView.start2ScheduleVideo(videoUrl);
} else {
mView.showError(R.string.msg_error_check_permission);
}
}
});
}
/**
* 搜索图片
*/
private void search(String key) {
swipeRefreshLayout.setRefreshing(true);
unsuscribe();
disposable = HttpUtil.getZhuangBiApi().search(key).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<ZhuangbiImage>>() {
@Override
public void accept(@NonNull List<ZhuangbiImage> zhuangbiImages) throws Exception {
Logger.e(LOG_TAG, "==网络请求成功==");
//进度条停止
swipeRefreshLayout.setRefreshing(false);
//给列表设置数据
zhuangBiListAdapter.setZhuangbiImages(zhuangbiImages);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
swipeRefreshLayout.setRefreshing(false);
Toast.makeText(LQBApp.getApp(), "请求失败,请刷新重试", Toast.LENGTH_SHORT).show();
Logger.e(LOG_TAG, "==网络请求错误==");
}
});
}
@Override
public List<Item> apply(@NonNull GankBeautyResult gankBeautyResult) throws Exception {
List<GankBeautyResult.ResultsBean> results = gankBeautyResult.getResults();
List<Item> itemList = new ArrayList<>();
//2018-01-29T07:40:56.269Z
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SS'Z'");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
for (GankBeautyResult.ResultsBean result : results) {
Date inDate = sdf1.parse(result.getCreatedAt());
String outDateStr = sdf2.format(inDate);
Item item = new Item();
item.description = outDateStr;
item.imageUrl = result.getUrl();
itemList.add(item);
}
return itemList;
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
view.showLoading();//显示进度条
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doFinally(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
public Observable<AVUser> dissociateWithAuthData(final String platform) {
if (StringUtil.isEmpty(platform)) {
return Observable.error(new IllegalArgumentException(String.format(ILLEGALARGUMENT_MSG_FORMAT, "platform")));
}
String objectId = getObjectId();
if (StringUtil.isEmpty(objectId) || !isAuthenticated()) {
return Observable.error(new AVException(AVException.SESSION_MISSING,
"the user object missing a valid session"));
}
this.remove(AUTHDATA_TAG + "." + platform);
return this.saveInBackground().map(new Function<AVObject, AVUser>() {
public AVUser apply(@NonNull AVObject var1) throws Exception {
Map<String, Object> authData = (Map<String, Object>) AVUser.this.get(AUTHDATA_TAG);
if (authData != null) {
authData.remove(platform);
}
return AVUser.this;
}
});
}
public CustomHttpClient addHeader(@NonNull Pair<String, String> header) {
if (headers == null) {
headers = new ArrayList<>();
}
headers.add(header);
return this;
}
private boolean validate(@NonNull Subscriber<?>[] subscribers) {
int p = parallelism();
if (subscribers.length != p) {
Throwable iae = new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length);
for (Subscriber<?> s : subscribers) {
EmptySubscription.error(iae, s);
}
return false;
}
return true;
}
@Override
public void getSubCategories(String parent) {
Observable<BaseResponse<XianDuSubCategory>> observable = dataManager.getXianDuSubCategories(parent);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<XianDuSubCategory>, List<XianDuSubCategory>>() {
@Override
public List<XianDuSubCategory> apply(@NonNull BaseResponse<XianDuSubCategory> xianDuSubCategoryBaseResponse)
throws Exception {
return xianDuSubCategoryBaseResponse.getResults();
}
}).subscribe(new Observer<List<XianDuSubCategory>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<XianDuSubCategory> xianDuSubCategories) {
getView().showList(xianDuSubCategories);
}
@Override
public void onError(@NonNull Throwable e) {
getView().showError(e.getMessage());
}
@Override
public void onComplete() {
// getView().showSuccessful();
}
});
}
@Override
public void getXianDu(String categoryId, final int page) {
Observable<BaseResponse<XianDu>> observable = dataManager.getXianDu(categoryId, page);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<XianDu>, List<XianDu>>() {
@Override
public List<XianDu> apply(@NonNull BaseResponse<XianDu> xianDuBaseResponse)
throws Exception {
return xianDuBaseResponse.getResults();
}
}).subscribe(new Observer<List<XianDu>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<XianDu> xianDuList) {
getView().showXianDu(xianDuList, page);
}
@Override
public void onError(@NonNull Throwable e) {
getView().showError(e.getMessage());
}
@Override
public void onComplete() {
// getView().showSuccessful();
}
});
}
@Override
public void getPictures(String category, final int page) {
Observable<BaseResponse<GanHuo>> observable = dataManager.getGanHuo(category, page);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<GanHuo>, List<GanHuo>>() {
@Override
public List<GanHuo> apply(@NonNull BaseResponse<GanHuo> ganHuoBaseResponse)
throws Exception {
return ganHuoBaseResponse.getResults();
}
}).subscribe(new Observer<List<GanHuo>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<GanHuo> ganHuoList) {
getView().showList(ganHuoList, page);
}
@Override
public void onError(@NonNull Throwable e) {
getView().showError(e.getMessage());
}
@Override
public void onComplete() {
// getView().showComplete();
}
});
}
@Override
public void getProjectArticles(final int page, int cid) {
Observable<BaseResponse<ArticleListResponse>> observable =
dataManager.getProjectArticles(page, cid);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<ArticleListResponse>, List<Article>>() {
@Override
public List<Article> apply(@NonNull BaseResponse<ArticleListResponse> response)
throws Exception {
return response.getData().getDatas();
}
}).subscribeWith(new Observer<List<Article>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<Article> articles) {
getView().showProjectArticles(page, articles);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void getHierarchyCategories() {
Observable<BaseResponse<List<Category>>> observable = dataManager.getHierarchyCategories();
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<List<Category>>,
List<Category>>() {
@Override
public List<Category> apply(@NonNull BaseResponse<List<Category>> response)
throws Exception {
return response.getData();
}
}).subscribeWith(new Observer<List<Category>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<Category> categories) {
getView().showHierarchyCategories(categories);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void getHierarchyArticles(final int page, int cid) {
Observable<BaseResponse<ArticleListResponse>> observable =
dataManager.getHierarchyArticles(page, cid);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<ArticleListResponse>, List<Article>>() {
@Override
public List<Article> apply(@NonNull BaseResponse<ArticleListResponse> response)
throws Exception {
return response.getData().getDatas();
}
}).subscribeWith(new Observer<List<Article>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<Article> articles) {
getView().showHierarchyArticles(page, articles);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void getArticles(final int page) {
Observable<BaseResponse<ArticleListResponse>> observable = dataManager.getArticles(page);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<ArticleListResponse>, List<Article>>() {
@Override
public List<Article> apply(
@NonNull BaseResponse<ArticleListResponse> response)
throws Exception {
return response.getData().getDatas();
}
}).subscribeWith(new Observer<List<Article>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable.add(d);
}
@Override
public void onNext(@NonNull List<Article> articles) {
getView().showArticles(page, articles);
}
@Override
public void onError(@NonNull Throwable e) {
getView().showError(e.getMessage());
}
@Override
public void onComplete() {
}
});
}
@Override
public void getBannerData() {
Observable<BaseResponse<List<Banner>>> observable = dataManager.getBannerData();
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<List<Banner>>, List<Banner>>() {
@Override
public List<Banner> apply(@NonNull BaseResponse<List<Banner>> response)
throws Exception {
return response.getData();
}
}).subscribeWith(new Observer<List<Banner>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable.add(d);
}
@Override
public void onNext(@NonNull List<Banner> banners) {
getView().showBannerData(banners);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void getNavCategories() {
Observable<BaseResponse<List<NavCategory>>> observable = dataManager.getNavCategories();
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<List<NavCategory>>, List<NavCategory>>() {
@Override
public List<NavCategory> apply(@NonNull BaseResponse<List<NavCategory>> response)
throws Exception {
return response.getData();
}
}).subscribeWith(new Observer<List<NavCategory>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<NavCategory> categories) {
getView().showNavCategories(categories);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void getHotKey() {
Observable<BaseResponse<List<HotKey>>> observable = dataManager.getHotKey();
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<List<HotKey>>, List<HotKey>>() {
@Override
public List<HotKey> apply(@NonNull BaseResponse<List<HotKey>> response)
throws Exception {
return response.getData();
}
}).subscribeWith(new Observer<List<HotKey>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<HotKey> hotKeys) {
getView().showHotKey(hotKeys);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void search(final int page, String keyword) {
Observable<BaseResponse<ArticleListResponse>> observable =
dataManager.searchArticles(page, keyword);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<ArticleListResponse>, List<Article>>() {
@Override
public List<Article> apply(@NonNull BaseResponse<ArticleListResponse> response)
throws Exception {
return response.getData().getDatas();
}
}).subscribeWith(new Observer<List<Article>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull List<Article> articles) {
getView().showSearchResult(page, articles);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
@Override
public void signUp(String username, String password, String repassword) {
Observable<BaseResponse<LoginResponse>> observable =
dataManager.signup(username, password, repassword);
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.map(new Function<BaseResponse<LoginResponse>, LoginResponse>() {
@Override
public LoginResponse apply(@NonNull BaseResponse<LoginResponse> response)
throws Exception {
return response.getData();
}
})
.subscribeWith(new Observer<LoginResponse>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
@Override
public void onNext(@NonNull LoginResponse loginResponse) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}