下面列出了io.reactivex.CompletableOnSubscribe#io.reactivex.FlowableOnSubscribe 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public Flowable<ZeroFiveNewsDetail> getAcgNewsDetail(final String url) {
return Flowable.create(new FlowableOnSubscribe<ZeroFiveNewsDetail>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsDetail> e) throws Exception {
Element html = Jsoup.connect(url).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ZeroFiveNewsDetail zeroFiveNewsDetail = JP.from(html, ZeroFiveNewsDetail.class);
e.onNext(zeroFiveNewsDetail);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ZeroFiveNewsPage> getAcgNews(final String typeUrl) {
return Flowable.create(new FlowableOnSubscribe<ZeroFiveNewsPage>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsPage> e) throws Exception {
Element html = Jsoup.connect(typeUrl).get();
if(html == null){
e.onError(new Throwable("element html is null"));
}else {
ZeroFiveNewsPage zeroFiveNewsPage = JP.from(html, ZeroFiveNewsPage.class);
e.onNext(zeroFiveNewsPage);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ScheduleDetail> getScheduleDetail(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleDetail>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleDetail> e) throws Exception {
String scheduleLink = url;
if (!url.contains("http")) {
scheduleLink = HtmlConstant.YHDM_M_URL + url;
}
Element html = Jsoup.connect(scheduleLink).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ScheduleDetail scheduleDetail = JP.from(html, ScheduleDetail.class);
e.onNext(scheduleDetail);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ScheduleNew> getScheduleNew(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleNew>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleNew> e) throws Exception {
Element html = Jsoup.connect(url).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ScheduleNew scheduleNew = JP.from(html, ScheduleNew.class);
e.onNext(scheduleNew);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ScheduleOtherPage> getScheduleOtherPage(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleOtherPage>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleOtherPage> e) throws Exception {
Element html = Jsoup.connect(url).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
ScheduleOtherPage scheduleOtherPage = JP.from(html, ScheduleOtherPage.class);
e.onNext(scheduleOtherPage);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<ScheduleVideo> getScheduleVideo(final String url) {
return Flowable.create(new FlowableOnSubscribe<ScheduleVideo>() {
@Override
public void subscribe(@NonNull FlowableEmitter<ScheduleVideo> e) throws Exception {
Element html = Jsoup.connect(url).get();
if(html == null){
e.onError(new Throwable("element html is null"));
}else {
ScheduleVideo scheduleVideo = JP.from(html, ScheduleVideo.class);
if (!TextUtils.isEmpty(scheduleVideo.getVideoHtml())) {
scheduleVideo.setVideoUrl("http://tup.yhdm.tv/?m=1&vid=" + scheduleVideo.getVideoUrl());
}
/*StringBuilder scheduleVideoHtmlBuilder = new StringBuilder();
scheduleVideoHtmlBuilder.append(HtmlConstant.SCHEDULE_VIDEO_CSS);
scheduleVideoHtmlBuilder.append("<div class=\"player_main\" style=\"position: relative;\"> ");
scheduleVideoHtmlBuilder.append(scheduleVideo.getVideoHtml());
scheduleVideoHtmlBuilder.append("</div>");
scheduleVideo.setVideoHtml(scheduleVideoHtmlBuilder.toString());*/
e.onNext(scheduleVideo);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
private static TestSubscriber<InAppMessage> listenerToFlowable(FirebaseInAppMessaging instance) {
Flowable<InAppMessage> listenerFlowable =
Flowable.create(
new FlowableOnSubscribe<InAppMessage>() {
@Override
public void subscribe(FlowableEmitter<InAppMessage> emitter) throws Exception {
instance.setMessageDisplayComponent(
new FirebaseInAppMessagingDisplay() {
@Override
public void displayMessage(
InAppMessage inAppMessage,
FirebaseInAppMessagingDisplayCallbacks callbacks) {
emitter.onNext(inAppMessage);
Log.i("FIAM", "Putting callback for IAM " + inAppMessage.getCampaignName());
callbacksHashMap.put(inAppMessage.getCampaignId(), callbacks);
}
});
}
},
BUFFER);
return listenerFlowable.test();
}
/**
* use only request with an empty array to request all manifest permissions
*/
public Flowable<PermissionResult> requestAsFlowable(final List<String> permissions) {
return Flowable.create(new FlowableOnSubscribe<PermissionResult>() {
@Override
public void subscribe(final FlowableEmitter<PermissionResult> emitter) throws Exception {
runtimePermission
.request(permissions)
.onResponse(new ResponseCallback() {
@Override
public void onResponse(PermissionResult result) {
if (result.isAccepted()) {
emitter.onNext(result);
} else {
emitter.onError(new Error(result));
}
}
}).ask();
}
}, BackpressureStrategy.LATEST);
}
@Override
public Flowable<MediaEntity.MediaResponseData> getMediaList(int imageOffset, int videoOffset, String bucketId) {
if (TextUtils.isEmpty(bucketId)) {
return Flowable.empty();
}
if (TextUtils.equals(bucketId, String.valueOf(Integer.MAX_VALUE))) {
// 图片和视频
return Flowable.create((FlowableOnSubscribe<MediaEntity.MediaResponseData>) e -> {
e.onNext(handleImageAndVideoMediaList(imageOffset, videoOffset));
e.onComplete();
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
} else if (TextUtils.equals(bucketId, String.valueOf(Integer.MIN_VALUE))) {
// 所有视频
return Flowable.create((FlowableOnSubscribe<MediaEntity.MediaResponseData>) e -> {
e.onNext(handleVideoMediaList(imageOffset, videoOffset));
e.onComplete();
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
} else {
return Flowable.create((FlowableOnSubscribe<MediaEntity.MediaResponseData>) e -> {
e.onNext(handleImageMediaList(imageOffset, videoOffset, bucketId));
e.onComplete();
}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
}
}
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
Looper.prepare();
try {
joinPoint.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
Looper.loop();
}
}
, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
@Override
public Flowable<T> flowable(BackpressureStrategy mode) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
final CloseableIterator<T> iterator = mCallable.call();
try {
while (!emitter.isCancelled() && iterator.hasNext()) {
emitter.onNext(iterator.next());
}
emitter.onComplete();
} finally {
iterator.close();
}
}
}, mode);
}
/**
* 查询全国所有的省,从数据库查询
*/
private void queryProvinces() {
getToolbar().setTitle("选择省份");
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
if (provincesList.isEmpty()) {
provincesList.addAll(WeatherDB.loadProvinces(DBManager.getInstance().getDatabase()));
}
dataList.clear();
for (Province province : provincesList) {
emitter.onNext(province.mProName);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.compose(RxUtil.ioF())
.compose(RxUtil.activityLifecycleF(this))
.doOnNext(proName -> dataList.add(proName))
.doOnComplete(() -> {
mProgressBar.setVisibility(View.GONE);
currentLevel = LEVEL_PROVINCE;
mAdapter.notifyDataSetChanged();
})
.subscribe();
}
/**
* 查询选中省份的所有城市,从数据库查询
*/
private void queryCities() {
getToolbar().setTitle("选择城市");
dataList.clear();
mAdapter.notifyDataSetChanged();
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
cityList = WeatherDB.loadCities(DBManager.getInstance().getDatabase(), selectedProvince.mProSort);
for (City city : cityList) {
emitter.onNext(city.mCityName);
}
emitter.onComplete();
}, BackpressureStrategy.BUFFER)
.compose(RxUtil.ioF())
.compose(RxUtil.activityLifecycleF(this))
.doOnNext(proName -> dataList.add(proName))
.doOnComplete(() -> {
currentLevel = LEVEL_CITY;
mAdapter.notifyDataSetChanged();
mRecyclerView.smoothScrollToPosition(0);
})
.subscribe();
}
public <T> Flowable<CacheResult<T>> load2Flowable(final String key, final Type type, BackpressureStrategy backpressureStrategy) {
return Flowable.create(new FlowableOnSubscribe<CacheResult<T>>() {
@Override
public void subscribe(FlowableEmitter<CacheResult<T>> flowableEmitter) throws Exception {
CacheResult<T> load = cacheCore.load(getMD5MessageDigest(key), type);
if (!flowableEmitter.isCancelled()) {
if (load != null) {
flowableEmitter.onNext(load);
flowableEmitter.onComplete();
} else {
flowableEmitter.onError(new NullPointerException("Not find the key corresponding to the cache"));
}
}
}
}, backpressureStrategy);
}
public static Flowable<ActorMessage<?>> getMessages(final Queue<ActorMessage<?>> stash) {
return Flowable.create(new FlowableOnSubscribe<ActorMessage<?>>() {
@Override
public void subscribe(FlowableEmitter<ActorMessage<?>> emitter) throws Exception {
try {
ActorMessage<?> message;
for (int i=0; !emitter.isCancelled() && i<emitter.requested() && (message=stash.poll())!=null; i++)
emitter.onNext(message);
if (emitter.isCancelled())
return;
else
emitter.onComplete();;
}
catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
/**
* Send chat message to the server.
*
* @param chatMessage
* @return
*/
@Override
public Flowable<ChatMessage> sendMessage(@NonNull final ChatMessage chatMessage) {
return Flowable.create(new FlowableOnSubscribe<ChatMessage>() {
@Override
public void subscribe(FlowableEmitter<ChatMessage> emitter) throws Exception {
/*
* Socket.io supports acking messages.
* This feature can be used as
* mSocket.emit("EVENT_NEW_MESSAGE", chatMessage.getMessage(), new Ack() {
* @Override
* public void call(Object... args) {
* // Do something with args
*
* // On success
* emitter.onNext(chatMessage);
*
* // On error
* emitter.onError(new Exception("Sending message failed."));
* }
* });
*
* */
mSocket.emit(EVENT_NEW_MESSAGE, chatMessage.getMessage());
emitter.onNext(chatMessage);
}
}, BackpressureStrategy.BUFFER);
}
private <T> void subscribe(final Object subscriber,
final String tag,
final boolean isSticky,
final Scheduler scheduler,
final Callback<T> callback) {
Utils.requireNonNull(subscriber, tag, callback);
final Class<T> typeClass = Utils.getTypeClassFromParadigm(callback);
final Consumer<T> onNext = new Consumer<T>() {
@Override
public void accept(T t) {
callback.onEvent(t);
}
};
if (isSticky) {
final TagMessage stickyEvent = CacheUtils.getInstance().findStickyEvent(typeClass, tag);
if (stickyEvent != null) {
Flowable<T> stickyFlowable = Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) {
emitter.onNext(typeClass.cast(stickyEvent.mEvent));
}
}, BackpressureStrategy.LATEST);
if (scheduler != null) {
stickyFlowable = stickyFlowable.observeOn(scheduler);
}
Disposable stickyDisposable = FlowableUtils.subscribe(stickyFlowable, onNext, mOnError);
CacheUtils.getInstance().addDisposable(subscriber, stickyDisposable);
} else {
Utils.logW("sticky event is empty.");
}
}
Disposable disposable = FlowableUtils.subscribe(
toFlowable(typeClass, tag, scheduler), onNext, mOnError
);
CacheUtils.getInstance().addDisposable(subscriber, disposable);
}
@Override
public Flowable<DilidiliInfo> getDilidiliInfo() {
return Flowable.create(new FlowableOnSubscribe<DilidiliInfo>() {
@Override
public void subscribe(@NonNull FlowableEmitter<DilidiliInfo> e) throws Exception {
Element html = Jsoup.connect(HtmlConstant.YHDM_M_URL).get();
if (html == null) {
e.onError(new Throwable("element html is null"));
} else {
DilidiliInfo dilidiliInfo = JP.from(html, DilidiliInfo.class);
/*Iterator<ScheduleWeek> scheudleWeekIterator = dilidiliInfo.getScheduleWeek().iterator();
while (scheudleWeekIterator.hasNext()) {
ScheduleWeek scheduleWeek = scheudleWeekIterator.next();
Iterator<ScheduleWeek.ScheduleItem> scheduleItemIterator = scheduleWeek
.getScheduleItems().iterator();
while (scheduleItemIterator.hasNext()) {
ScheduleWeek.ScheduleItem scheduleItem = scheduleItemIterator.next();
if (scheduleItem.getAnimeLink().contains("www.005.tv")) {
scheduleItemIterator.remove();
}
}
}
Iterator<ScheduleBanner> scheudleBannerIterator = dilidiliInfo.getScheduleBanners()
.iterator();
while (scheudleBannerIterator.hasNext()) {
ScheduleBanner scheudleBanner = scheudleBannerIterator.next();
if (TextUtils.isEmpty(scheudleBanner.getImgUrl()) |
TextUtils.isEmpty(scheudleBanner.getAnimeLink()) |
!scheudleBanner.getAnimeLink().contains("anime")) {
scheudleBannerIterator.remove();
}
}*/
e.onNext(dilidiliInfo);
e.onComplete();
}
}
}, BackpressureStrategy.BUFFER);
}
public static <T> Flowable<T> flowableExec(final RealmConfiguration configuration,
final Consumer<Pair<FlowableEmitter, Realm>> emitter) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> e) throws Exception {
try (Realm realm = Realm.getInstance(configuration)) {
emitter.accept(new Pair<FlowableEmitter, Realm>(e, realm));
}
}
}, BackpressureStrategy.BUFFER);
}
/**
* 生成Flowable
*/
public static <T> Flowable<T> createData(final T t) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
emitter.onNext(t);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
@SuppressLint("CheckResult")
private void shareFunc(final int platform, final String title, final String targetUrl
, final String summary, final String miniId, final String miniPath
, final ShareImageObject shareImageObject
, final Activity activity, final ShareListener listener) {
mShareFunc = Flowable.create(new FlowableOnSubscribe<byte[]>() {
@Override
public void subscribe(@NonNull FlowableEmitter<byte[]> emitter) {
try {
String imagePath = ImageDecoder.decode(activity, shareImageObject);
emitter.onNext(ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE));
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<byte[]>() {
@Override
public void accept(@NonNull byte[] bytes) {
handleShareWx(platform, title, targetUrl, summary
, bytes, miniId, miniPath, listener);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
listener.shareFailure(new Exception(throwable));
recycle();
activity.finish();
}
});
}
@SuppressLint("CheckResult")
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
final Activity activity, final ShareListener listener) {
mShareImage = Flowable.create(new FlowableOnSubscribe<Pair<Bitmap, byte[]>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Pair<Bitmap, byte[]>> emitter) {
try {
String imagePath = ImageDecoder.decode(activity, shareImageObject);
emitter.onNext(Pair.create(BitmapFactory.decodeFile(imagePath),
ImageDecoder.compress2Byte(imagePath, TARGET_SIZE, THUMB_SIZE)));
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Pair<Bitmap, byte[]>>() {
@Override
public void accept(@NonNull Pair<Bitmap, byte[]> pair) {
WXImageObject imageObject = new WXImageObject(pair.first);
WXMediaMessage message = new WXMediaMessage();
message.mediaObject = imageObject;
message.thumbData = pair.second;
sendMessage(platform, message, buildTransaction("image"));
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
listener.shareFailure(new Exception(throwable));
recycle();
activity.finish();
}
});
}
@SuppressLint("CheckResult")
private void shareTextOrImage(final ShareImageObject shareImageObject, final String title, final String targetUrl, final String summary,
final Activity activity, final ShareListener listener) {
mShareImage = Flowable.create(new FlowableOnSubscribe<Pair<String, byte[]>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter) {
try {
String path = ImageDecoder.decode(activity, shareImageObject);
emitter.onNext(Pair.create(path, ImageDecoder.compress2Byte(path, TARGET_SIZE, TARGET_LENGTH)));
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Pair<String, byte[]>>() {
@Override
public void accept(@NonNull Pair<String, byte[]> stringPair) {
handleShare(stringPair, title, targetUrl, summary, listener);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
listener.shareFailure(new Exception(throwable.getMessage()));
recycle();
activity.finish();
}
});
}
@SuppressLint("CheckResult")
private void shareTextOrImage(final Pair<String, byte[]> shareImageObject, final String title, final String targetUrl, final String summary,
final Activity activity, final ShareListener listener) {
mShareImage = Flowable.create(new FlowableOnSubscribe<Pair<String, byte[]>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter) {
try {
emitter.onNext(shareImageObject);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Pair<String, byte[]>>() {
@Override
public void accept(@NonNull Pair<String, byte[]> stringPair) {
handleShare(stringPair, title, targetUrl, summary, listener);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
listener.shareFailure(new Exception(throwable));
recycle();
activity.finish();
}
});
}
@SuppressLint("CheckResult")
private void shareFunc(final int platform, final String title, final String targetUrl, final String summary,
final ShareImageObject shareImageObject, final boolean immediate, final Activity activity, final ShareListener listener) {
mShareFunc = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> emitter) {
try {
if (immediate) {
emitter.onNext(shareImageObject.getPathOrUrl());
} else {
emitter.onNext(ImageDecoder.decode(activity, shareImageObject));
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) {
if (platform == SharePlatform.QZONE) {
shareToQZoneForMedia(title, targetUrl, summary, s, activity, listener);
} else {
shareToQQForMedia(title, summary, targetUrl, s, activity, listener);
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
listener.shareFailure(new Exception(throwable));
recycle();
activity.finish();
}
});
}
@SuppressLint("CheckResult")
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
final Activity activity, final ShareListener listener) {
mShareImage = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
try {
emitter.onNext(ImageDecoder.decode(activity, shareImageObject));
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String localPath) throws Exception {
if (platform == SharePlatform.QZONE) {
shareToQzoneForImage(localPath, activity, listener);
} else {
shareToQQForImage(localPath, activity, listener);
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
listener.shareFailure(new Exception(throwable));
recycle();
activity.finish();
}
});
}
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
mSubscribe = Flowable.create(new FlowableOnSubscribe<QQUser>() {
@Override
public void subscribe(@NonNull FlowableEmitter<QQUser> qqUserEmitter) {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(buildUserInfoUrl(token, URL)).build();
try {
Response response = client.newCall(request).execute();
JSONObject jsonObject = new JSONObject(response.body().string());
QQUser user = QQUser.parse(token.getOpenid(), jsonObject);
qqUserEmitter.onNext(user);
qqUserEmitter.onComplete();
} catch (IOException | JSONException e) {
ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
qqUserEmitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<QQUser>() {
@Override
public void accept(@NonNull QQUser qqUser) {
mLoginListener.loginSuccess(
new LoginResultData(LoginPlatform.QQ, token, qqUser));
LoginUtil.recycle();
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
mLoginListener.loginFailure(new Exception(throwable), ShareLogger.INFO.ERR_FETCH_CODE);
LoginUtil.recycle();
}
});
}
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
mSubscribe = Flowable.create(new FlowableOnSubscribe<WeiboUser>() {
@Override
public void subscribe(@NonNull FlowableEmitter<WeiboUser> weiboUserEmitter) {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(buildUserInfoUrl(token, USER_INFO)).build();
try {
Response response = client.newCall(request).execute();
JSONObject jsonObject = new JSONObject(response.body().string());
WeiboUser user = WeiboUser.parse(jsonObject);
weiboUserEmitter.onNext(user);
weiboUserEmitter.onComplete();
} catch (IOException | JSONException e) {
ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
weiboUserEmitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<WeiboUser>() {
@Override
public void accept(@NonNull WeiboUser weiboUser) throws Exception {
mLoginListener.loginSuccess(
new LoginResultData(LoginPlatform.WEIBO, token, weiboUser));
LoginUtil.recycle();
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
mLoginListener.loginFailure(new Exception(throwable), ShareLogger.INFO.ERR_FETCH_CODE);
LoginUtil.recycle();
}
});
}
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
mSubscribe = Flowable.create(new FlowableOnSubscribe<TwitterUser>() {
@Override
public void subscribe(@NonNull FlowableEmitter<TwitterUser> userEmitter) {
TwitterApiClient apiClient = TwitterCore.getInstance().getApiClient();
Call<User> userCall = apiClient.getAccountService().verifyCredentials(true, false, false);
try {
Response<User> execute = userCall.execute();
userEmitter.onNext(new TwitterUser(execute.body()));
userEmitter.onComplete();
} catch (Exception e) {
ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
userEmitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<TwitterUser>() {
@Override
public void accept(@NonNull TwitterUser user) {
mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.TWITTER, token, user));
LoginUtil.recycle();
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
mLoginListener.loginFailure(new Exception(throwable), ShareLogger.INFO.ERR_FETCH_CODE);
LoginUtil.recycle();
}
});
}
@SuppressLint("CheckResult")
private void getToken(final String code) {
mTokenSubscribe = Flowable.create(new FlowableOnSubscribe<WxToken>() {
@Override
public void subscribe(@NonNull FlowableEmitter<WxToken> wxTokenEmitter) {
Request request = new Request.Builder().url(buildTokenUrl(code)).build();
try {
Response response = mClient.newCall(request).execute();
JSONObject jsonObject = new JSONObject(response.body().string());
WxToken token = new WxToken(jsonObject);
wxTokenEmitter.onNext(token);
wxTokenEmitter.onComplete();
} catch (IOException | JSONException e) {
wxTokenEmitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<WxToken>() {
@Override
public void accept(@NonNull WxToken wxToken) {
if (mFetchUserInfo) {
mLoginListener.beforeFetchUserInfo(wxToken);
fetchUserInfo(wxToken);
} else {
mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.WX, wxToken));
LoginUtil.recycle();
}
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
mLoginListener.loginFailure(new Exception(throwable.getMessage()), ShareLogger.INFO.ERR_GET_TOKEN_CODE);
LoginUtil.recycle();
}
});
}