下面列出了怎么用rx.android.schedulers.AndroidSchedulers的API类实例代码及写法,或者点击链接到github查看源代码。
@Override
public void getBestComments(String disscussionId) {
Subscription rxSubscription = bookApi.getBestComments(disscussionId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<CommentList>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LogUtils.e("getBestComments:" + e.toString());
}
@Override
public void onNext(CommentList list) {
mView.showBestComments(list);
}
});
addSubscrebe(rxSubscription);
}
@Override
public void getRecommendBookList(String bookId, String limit) {
Subscription rxSubscription = bookApi.getRecommendBookList(bookId, limit).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<RecommendBookList>() {
@Override
public void onNext(RecommendBookList data) {
LogUtils.i(data.booklists);
List<RecommendBookList.RecommendBook> list = data.booklists;
if (list != null && !list.isEmpty() && mView != null) {
mView.showRecommendBookList(list);
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LogUtils.e("+++" + e.toString());
}
});
addSubscrebe(rxSubscription);
}
private void search() {
RxView.clicks(mSearch)
.throttleFirst(2, TimeUnit.SECONDS)
.map(aVoid -> mEditText.getText().toString())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
if (!types.contains(s)) {
SnackbarUtil.showMessage(mRecyclerView, "请输入正确的干货类型");
}
type = s;
mSwipeRefreshLayout.postDelayed(() -> {
mSwipeRefreshLayout.setRefreshing(true);
page = 1;
ganks.clear();
mIsRefresh = true;
searchGank();
}, 500);
});
}
public static void saveNoteTxtAsync(final SaveNoteListener l) {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String txtFilePath = NotePersistenceController.saveNoteTxtFile();
subscriber.onNext(txtFilePath);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Action1<String>() {
@Override
public void call(String path) {
Toast.makeText(MyApp.getInstance().getApplicationContext(),"TXT备份文件保存在"+path,Toast.LENGTH_LONG).show();
l.onZipPath(path);
}
});
}
@Override
protected void onActivityResult(int requestCode, int resultCode, Intent data) {
super.onActivityResult(requestCode, resultCode, data);
if (resultCode == RESULT_OK && requestCode == REQUEST_TEAMBITION_CODE) {
String code = data.getStringExtra(UnionsActivity.CODE);
TalkClient.getInstance().getAccountApi()
.signInByTeambition(code)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<User>() {
@Override
public void call(User user) {
if (StringUtil.isNotBlank(user.getAccountToken())) {
initUserData(RegisterActivity.this, user);
getStrikeToken();
TransactionUtil.goTo(RegisterActivity.this, ChooseTeamActivity.class, true);
}
}
}, new ApiErrorAction());
} else if (resultCode == RESULT_OK && requestCode == REQUEST_COUNTRY_CODE) {
if (data != null) {
CountryModel model = Parcels.unwrap(data.getParcelableExtra(PickCountryCodeActivity.COUNTRY_CODE_DATA));
tvCountryCode.setText("+ " + String.valueOf(model.callingCode));
}
}
}
/**
* 提交教练认证申请
*
* @return
*/
public Observable<String> authentication() {
//1.创建Request
HttpGsonRequest<String> mRefreshRequest = RequestBuilder.create(String.class)
.requestMethod(Request.Method.POST)
.url(ApiConstant.API_COACH_AUTH)
.put("driverLicense", mCoachCardUrl)
.put("idCard", mIDCardUrl)
.putHeaders("Authorization", LoginManager.getInstance().getLoginUser().getToken())
.build();
//2.进行数据的处理
return gRequestPool.request(mRefreshRequest)
.filter((resp) -> null != resp)
.map(HttpResponse::getData)
.observeOn(AndroidSchedulers.mainThread());
}
/**
* 获取教练信息
*
* @return
*/
public Observable<Coach> getCoachInfo() {
//1.创建Request
HttpGsonRequest<Coach> mRefreshRequest = RequestBuilder.create(Coach.class)
.requestMethod(Request.Method.GET)
.url(ApiConstant.API_GET_COACH_INFO)
.putHeaders("Authorization", getUser().getToken())
.build();
//2.进行数据的处理
return gRequestPool.request(mRefreshRequest)
.filter((resp) -> null != resp && null != resp.data)
.map(HttpResponse::getData)
.doOnNext(coach -> {
PreferenceUtil.putString(SPConstant.KEY_COACH, GsonUtil.toJson(coach));
LoginManager.getInstance().setCoach(coach);
//更新占位符基本参数
UrlParserManager.getInstance().updateBaseParams();
})
.observeOn(AndroidSchedulers.mainThread());
}
/**
* 将邀请设置为已过期
*/
public void setExpire(Invitation invitation)
{
mInviteBll.setExpire(invitation)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Void>()
{
@Override
public void onCompleted()
{
mActivity.setExpireSuccess();
}
@Override
public void onError(Throwable e)
{
e.printStackTrace();
}
@Override
public void onNext(Void aVoid)
{
}
});
}
@Override
public Observable<GetStoryExtraResponse> getStoryExtraById(int id) {
return Observable.just(new GetStoryExtraRequest(id))
.flatMap(new Func1<GetStoryExtraRequest, Observable<GetStoryExtraResponse>>() {
@Override
public Observable<GetStoryExtraResponse> call(GetStoryExtraRequest request) {
try {
GetStoryExtraResponse response = zhihuApi.getStoryExtraResponse(request);
return Observable.just(response);
} catch (AppException e) {
return Observable.error(e);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
public void getMoreNewPublicMessages(final String roomId, final String minId, Date minCreateTime) {
MessageRealm.getInstance()
.getLocalMessage(roomId, teamId, minId, minCreateTime.getTime(), 0)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<List<Message>>() {
@Override
public void call(List<Message> messages) {
if (messages.isEmpty()) {
getMoreNewPublicMessages(roomId, minId);
} else {
callback.showMoreNewMessages(messages, true);
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
}
public void getThemesList() {
Subscription subscription = mBiz.getThemesList()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Themes>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
mView.onRequestError("getThemesList 数据加载失败ヽ(≧Д≦)ノ");
}
@Override
public void onNext(Themes themes) {
mView.loadThemeList(themes);
}
});
addSubscription(subscription);
}
protected void subscribe(String deviceAddress) {
Timber.d(getClass().getSimpleName());
deviceSubscriber = onDevice();
deviceMonitor = bleManager.selectedDeviceMonitor;
deviceMonitor
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(deviceSubscriber);
wifiMonitor = cloudManager.wifiMonitor;
wifiMonitor
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(onWifi());
firebaseSubscriber = onFirebaseStatus();
cloudManager.firebaseMonitor
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(firebaseSubscriber);
}
@NonNull private Single<Notification> buildNewFeatureNotification(Context context, String title,
String body, @StringRes int actionButtonString, PendingIntent pressIntentAction,
PendingIntent onDismissAction) {
return Single.fromCallable(() -> {
Notification notification =
new NotificationCompat.Builder(context).setContentIntent(pressIntentAction)
.setSmallIcon(R.drawable.ic_stat_aptoide_notification)
.setColor(ContextCompat.getColor(context, R.color.default_orange_gradient_end))
.setContentTitle(title)
.setContentText(body)
.addAction(0, context.getResources()
.getString(R.string.updates_notification_dismiss_button), onDismissAction)
.addAction(0, context.getResources()
.getString(actionButtonString), pressIntentAction)
.build();
notification.flags =
android.app.Notification.DEFAULT_LIGHTS | android.app.Notification.FLAG_AUTO_CANCEL;
return notification;
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());
}
@Override
public Observable<T> call(Observable<R> rObservable) {
return rObservable.flatMap(new Func1<R, Observable<T>>() {
@Override
public Observable<T> call(R r) {
Log.d(TAG, r.isError() ? "HttpResult is error" : "HttpResult is right");
if (r.isError()) {
return Observable.error(new ApiException("网络出错"));
} else {
return createData(r.getResults());
}
}
}).subscribeOn(Schedulers.io()).unsubscribeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread());
}
public static void saveNoteZipAsync(final SaveNoteListener l){
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String zipFilePath = NotePersistenceController.saveNoteFiles();
subscriber.onNext(zipFilePath);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Action1<String>() {
@Override
public void call(String path) {
l.onZipPath(path);
}
});
}
public void start(final DownInfo info) {
// 如果消息实体为空,或正在下载,则返回
if (info == null || mDownInfoMap.get(info.getUrl()) != null) return;
DownLoadSubscriber<DownInfo> downLoadSubscriber = new DownLoadSubscriber<>(info);
mDownInfoMap.put(info.getUrl(), downLoadSubscriber);
// HttpService service;
// 增加一个拦截器,用于获取数据的进度回调
DownLoadInterceptor interceptor = new DownLoadInterceptor(downLoadSubscriber);
OkHttpClient.Builder builder = new OkHttpClient.Builder();
builder.connectTimeout(info.getConnectedTime(), TimeUnit.SECONDS).addInterceptor(interceptor);
Retrofit retrofit = new Retrofit.Builder().
addConverterFactory(ScalarsConverterFactory.create()).addCallAdapterFactory(RxJavaCallAdapterFactory.create()).
baseUrl(CommonUtils.getBaseUrl(info.getUrl())).client(builder.build()).build();
HttpService service = retrofit.create(HttpService.class);
info.setHttpService(service);
service.download("bytes=" + info.getReadLength() + "-", info.getUrl()).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io()).retryWhen(new RetryWhenNetWorkException()).map(new Func1<ResponseBody, DownInfo>() {
@Override
public DownInfo call(ResponseBody responseBody) {
FileUtil.writeToCache(responseBody, info.getSavedFilePath(), info.getReadLength(), info.getTotalLength());
// 这里进行转化
return info;
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(downLoadSubscriber);
}
@Override
public void loadData() {
RetrofitHelper.getBiliAppAPI()
.getVideoDetails(av)
.compose(this.bindToLifecycle())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(videoDetails -> {
mVideoDetailsInfo = videoDetails.getData();
LogUtil.test(" VideoDetails finishTask" + mVideoDetailsInfo.getTitle());
finishTask();
}, throwable -> {
mFAB.setClickable(false);
mFAB.setBackgroundTintList(ColorStateList.valueOf(
getResources().getColor(R.color.gray_20)));
});
}
public void queryMoreNews() {
RxCenter.INSTANCE.getCompositeSubscription(TaskIds.newsTaskId).add(AppNetWork.INSTANCE.getNewsApi().loadMoreNews(mLastNewsNid)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<NewsList>() {
@Override
public void call(NewsList newsList) {
int size = newsList.getNews().size();
if (size > 0) {
mLastNewsNid = newsList.getNews().get(size - 1).getNid();
}
mCallback.onUpdateSuccessed(newsList.getNews(), true);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
mCallback.onUpdateFailed(true);
}
}));
}
public void onStarClick(View view) {
if (starred.get()) {
unstarUseCase.run(repo.get().owner.login, repo.get().name)
.compose(bindToLifecycle().forCompletable())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
eventBus.post(new StarEvent());
Snackbar.make(view, "Unstarred!", Snackbar.LENGTH_SHORT).show();
});
} else {
starUseCase.run(repo.get().owner.login, repo.get().name)
.compose(bindToLifecycle().forCompletable())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
eventBus.post(new StarEvent());
Snackbar.make(view, "Starred!", Snackbar.LENGTH_SHORT).show();
});
}
}
public void startNotificationWithChatActivity(final String messageType, final String _targetId) {
final Bundle bundle = new Bundle();
bundle.putBoolean(NOTIFICATION, true);
if (XiaomiPushReceiver.DMS.equalsIgnoreCase(messageType)) {
final Member member = MainApp.globalMembers.get(_targetId);
bundle.putParcelable(ChatActivity.EXTRA_MEMBER, Parcels.wrap(member));
TransactionUtil.goTo(this, ChatActivity.class, bundle);
} else if (XiaomiPushReceiver.ROOM.equalsIgnoreCase(messageType)) {
final Room room = MainApp.globalRooms.get(_targetId);
bundle.putParcelable(ChatActivity.EXTRA_ROOM, Parcels.wrap(room));
TransactionUtil.goTo(this, ChatActivity.class, bundle);
} else if (XiaomiPushReceiver.STORY.equalsIgnoreCase(messageType)) {
StoryRealm.getInstance().getSingleStory(_targetId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Story>() {
@Override
public void call(Story story) {
if (story != null) {
bundle.putParcelable(ChatActivity.EXTRA_STORY, Parcels.wrap(story));
TransactionUtil.goTo(HomeActivity.this, ChatActivity.class, bundle);
}
}
}, new RealmErrorAction());
}
}
public void getBookDetail(String bookId) {
Subscription rxSubscription = bookApi.getBookDetail(bookId).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<BookDetail>() {
@Override
public void onNext(BookDetail data) {
if (data != null && mView != null) {
mView.showBookDetail(data);
}
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e);
}
});
addSubscrebe(rxSubscription);
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_auth);
SampleApplication
.authorize()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
Timber.d("auth complete: token=%s token_secret=%s",
SampleApplication.getToken(), SampleApplication.getSecret());
Toast.makeText(this, "Auth complete", Toast.LENGTH_LONG).show();
startActivity(new Intent(this, MainActivity.class));
finish();
}, error -> {
Timber.e(error, "Failed to complete auth");
Toast.makeText(this, "Auth failed", Toast.LENGTH_LONG).show();
finish();
});
}
public void searchMessages(String keyword) {
callback.showProgressBar();
talkApi.searchMessages(BizLogic.getTeamId(), keyword, 20)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<SearchResponseData>() {
@Override
public void call(SearchResponseData searchResponseData) {
callback.dismissProgressBar();
if (searchResponseData.messages != null) {
callback.onSearchFinish(searchResponseData.messages);
}
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
callback.dismissProgressBar();
callback.onSearchFinish(new ArrayList<Message>());
}
});
}
@Override
public void updateMemoryInfo() {
mCategoryManager.getAvaliableMemoryUsingObservable()
.zipWith(mCategoryManager.getTotalMemoryUsingObservable(),
(avaliable, total) -> {
long usedMemory = total - avaliable;
mView.setMemoryProgress(usedMemory * 100 / total);
return FormatUtil.formatFileSize(usedMemory).toString() + "/" +
FormatUtil.formatFileSize(total).toString();
})
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(memory -> {
mView.setMemoryText(memory);
}, Throwable::printStackTrace);
}
@Override
public void showLatestMessages(List<Message> messages) {
showMessages(messages, false);
if (StringUtil.isNotBlank(messageToSend)) {
sendTextMessage(messageToSend);
}
if (imageToSend != null) {
Observable.from(imageToSend)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
sendImageMessage(s);
}
});
}
checkShareData();
}
/**
* 获取优秀的个人网站
*
* @return
*/
public Observable<List<WebSite>> getExcellentWebSite() {
//1.创建Request
HttpGsonRequest<List<WebSite>> mRefreshRequest = RequestBuilder.<List<WebSite>>create()
.requestMethod(Request.Method.POST)
.url(ApiConstant.API_EXCELLENT_WEBSITE)
.parser(new WebSiteParser())
.build();
//2.进行数据的处理
return gRequestPool.request(mRefreshRequest)
.filter((resp) -> null != resp && null != resp.data)
.map(HttpResponse::getData)
.filter(ValidateUtil::isValidate)
.doOnNext(data -> mWebSites = data)
.observeOn(AndroidSchedulers.mainThread());
}
@Override
public void onClick(View v) {
super.onClick(v);
switch (v.getId()) {
case R.id.btn_operator:
tvLogs.setText("");
userApi.getUserInfoNoToken()
.retryWhen(new RetryWithDelay(3, 3000))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Action1<Response>() {
@Override
public void call(Response response) {
String content = new String(((TypedByteArray) response.getBody()).getBytes());
printLog(tvLogs, "", content);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
}
}
@Override
public void onViewCreated(View view, @Nullable Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
unbinder = ButterKnife.bind(this, view);
if (mCategoryAdapter == null) {
mCategoryAdapter = new CategoryAdapter();
}
recyclerView.setLayoutManager(new GridLayoutManager(getContext(), 2));
recyclerView.setAdapter(mCategoryAdapter);
recyclerView.addItemDecoration(new GridItemDecoration(getContext()));
DataManager.getInstance(getContext())
.loadCategory()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(onLoad, onError);
}
@Override
public void getBookReviewDetail(String id) {
Subscription rxSubscription = bookApi.getBookReviewDetail(id).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<BookReview>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
LogUtils.e("getBookReviewDetail:" + e.toString());
}
@Override
public void onNext(BookReview data) {
mView.showBookReviewDetail(data);
}
});
addSubscrebe(rxSubscription);
}
public static <T> Observable.Transformer<T, T> rxCacheBeanHelper(final String key) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable
.subscribeOn(Schedulers.io())//指定doOnNext执行线程是新线程
.doOnNext(new Action1<T>() {
@Override
public void call(final T data) {
Schedulers.io().createWorker().schedule(new Action0() {
@Override
public void call() {
LogUtils.d("get data from network finish ,start cache...");
ACache.get(ReaderApplication.getsInstance())
.put(key, new Gson().toJson(data, data.getClass()));
LogUtils.d("cache finish");
}
});
}
})
.observeOn(AndroidSchedulers.mainThread());
}
};
}