下面列出了 io.reactivex.FlowableEmitter 类的 API 用法示例代码，也可以点击链接到 GitHub 查看源代码。
/**
 * Fetches the page at {@code url} with Jsoup and emits it parsed as a {@link ZeroFiveNewsDetail}.
 *
 * @param url address of the news detail page to download
 * @return a buffered Flowable that emits the parsed detail once and then completes,
 *         or signals an error when the fetched element is null
 */
@Override
public Flowable<ZeroFiveNewsDetail> getAcgNewsDetail(final String url) {
    return Flowable.create(new FlowableOnSubscribe<ZeroFiveNewsDetail>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsDetail> emitter) throws Exception {
            final Element page = Jsoup.connect(url).get();
            if (page == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            final ZeroFiveNewsDetail detail = JP.from(page, ZeroFiveNewsDetail.class);
            emitter.onNext(detail);
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);
}
/** Checks that the OnError callback exposed by the listener forwards the failure to the wrapped emitter. */
@Test
public void whenTheConstructorIsCalledWithAValidEmitterThenGetOnErrorReturnsTheEmitter() {
    // Given: a mocked emitter wrapped by a listener whose success path is not exercised
    @SuppressWarnings("unchecked")
    final FlowableEmitter<Object> mockEmitter = Mockito.mock(FlowableEmitter.class);
    final Throwable failure = Mockito.mock(Throwable.class);
    final FlowableEmitterMqttActionListener<Object> actionListener =
            new FlowableEmitterMqttActionListener<Object>(mockEmitter) {
                @Override
                public void onSuccess(final IMqttToken asyncActionToken) {
                    // Not invoked
                }
            };
    // When: the error callback obtained from the listener is triggered
    actionListener.getOnError().onError(failure);
    // Then: the same throwable reaches the emitter
    Mockito.verify(mockEmitter).onError(failure);
}
/**
 * Fetches a schedule page with Jsoup and emits it parsed as a {@link ScheduleDetail}.
 * A {@code url} that does not contain "http" is prefixed with {@code HtmlConstant.YHDM_M_URL}.
 *
 * @param url absolute or site-relative address of the schedule page
 * @return a buffered Flowable that emits the parsed detail once and then completes,
 *         or signals an error when the fetched element is null
 */
@Override
public Flowable<ScheduleDetail> getScheduleDetail(final String url) {
    return Flowable.create(new FlowableOnSubscribe<ScheduleDetail>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleDetail> emitter) throws Exception {
            final String pageUrl = url.contains("http") ? url : HtmlConstant.YHDM_M_URL + url;
            final Element page = Jsoup.connect(pageUrl).get();
            if (page == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            final ScheduleDetail detail = JP.from(page, ScheduleDetail.class);
            emitter.onNext(detail);
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);
}
/**
 * Deletes every {@link ScheduleCache} row whose "scheduleUrl" equals {@code url},
 * inside an asynchronous Realm transaction.
 *
 * @param url schedule URL identifying the cached entries to remove
 * @return a Flowable that emits the result of {@code deleteAllFromRealm()} and then completes
 */
public Flowable<Boolean> deleteScheduleCacheByUrl(final String url) {
    final Consumer<Pair<FlowableEmitter, Realm>> deleteTask = new Consumer<Pair<FlowableEmitter, Realm>>() {
        @Override
        public void accept(final Pair<FlowableEmitter, Realm> emitterAndRealm) throws Exception {
            emitterAndRealm.second.executeTransactionAsync(new Transaction() {
                @Override
                public void execute(Realm realm) {
                    final boolean deleted = realm.where(ScheduleCache.class)
                            .equalTo("scheduleUrl", url)
                            .findAll()
                            .deleteAllFromRealm();
                    // Report the outcome from inside the transaction, then finish the stream.
                    emitterAndRealm.first.onNext(deleted);
                    emitterAndRealm.first.onComplete();
                }
            });
        }
    };
    return RxRealmUtils.flowableExec(realmConfiguration, deleteTask);
}
/**
 * Fetches the page at {@code url} with Jsoup and emits it parsed as a {@link ScheduleOtherPage}.
 *
 * @param url address of the page to download
 * @return a buffered Flowable that emits the parsed page once and then completes,
 *         or signals an error when the fetched element is null
 */
@Override
public Flowable<ScheduleOtherPage> getScheduleOtherPage(final String url) {
    return Flowable.create(new FlowableOnSubscribe<ScheduleOtherPage>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleOtherPage> emitter) throws Exception {
            final Element page = Jsoup.connect(url).get();
            if (page == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            final ScheduleOtherPage otherPage = JP.from(page, ScheduleOtherPage.class);
            emitter.onNext(otherPage);
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);
}
/**
 * Fetches the page at {@code url} with Jsoup and emits it parsed as a {@link ScheduleVideo}.
 * When the parsed video HTML is non-empty, the video URL is rewritten to point at the
 * tup.yhdm.tv player with the original value passed as {@code vid}.
 *
 * @param url address of the video page to download
 * @return a buffered Flowable that emits the parsed video once and then completes,
 *         or signals an error when the fetched element is null
 */
@Override
public Flowable<ScheduleVideo> getScheduleVideo(final String url) {
    return Flowable.create(new FlowableOnSubscribe<ScheduleVideo>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleVideo> emitter) throws Exception {
            final Element page = Jsoup.connect(url).get();
            if (page == null) {
                emitter.onError(new Throwable("element html is null"));
                return;
            }
            final ScheduleVideo video = JP.from(page, ScheduleVideo.class);
            final boolean hasVideoHtml = !TextUtils.isEmpty(video.getVideoHtml());
            if (hasVideoHtml) {
                video.setVideoUrl("http://tup.yhdm.tv/?m=1&vid=" + video.getVideoUrl());
            }
            // Note: an earlier step that wrapped the video HTML in a styled "player_main" div
            // (using HtmlConstant.SCHEDULE_VIDEO_CSS) is disabled and intentionally not performed here.
            emitter.onNext(video);
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER);
}
/**
 * Installs a display component on {@code instance} that forwards every displayed message
 * into a Flowable, and returns a TestSubscriber already subscribed to that Flowable.
 * The callbacks for each message are stored in {@code callbacksHashMap} under the campaign id.
 *
 * @param instance the in-app messaging instance whose display component is replaced
 * @return a TestSubscriber receiving each message handed to the display component
 */
private static TestSubscriber<InAppMessage> listenerToFlowable(FirebaseInAppMessaging instance) {
    FlowableOnSubscribe<InAppMessage> displayBridge =
        new FlowableOnSubscribe<InAppMessage>() {
            @Override
            public void subscribe(FlowableEmitter<InAppMessage> emitter) throws Exception {
                FirebaseInAppMessagingDisplay display =
                    new FirebaseInAppMessagingDisplay() {
                        @Override
                        public void displayMessage(
                            InAppMessage message,
                            FirebaseInAppMessagingDisplayCallbacks displayCallbacks) {
                            emitter.onNext(message);
                            Log.i("FIAM", "Putting callback for IAM " + message.getCampaignName());
                            callbacksHashMap.put(message.getCampaignId(), displayCallbacks);
                        }
                    };
                instance.setMessageDisplayComponent(display);
            }
        };
    return Flowable.create(displayBridge, BUFFER).test();
}
/**
 * Subscribes to {@code query} and forwards each element of a delivered result list to
 * {@code emitter}, completing the emitter after the list unless it was cancelled.
 * Cancelling the emitter cancels the underlying data subscription.
 *
 * @param query   the query to observe
 * @param emitter the emitter receiving the individual list items
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataObserver<List<T>> forwarder = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> items) {
            for (T item : items) {
                // Stop immediately (without completing) once downstream has cancelled.
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(item);
            }
            if (!emitter.isCancelled()) {
                emitter.onComplete();
            }
        }
    };
    final DataSubscription subscription = query.subscribe().observer(forwarder);
    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            subscription.cancel();
        }
    });
}
/**
 * Checks that a Paho message delivered to the listener is passed to the emitter as a
 * SubscribeMessage carrying the same payload, QoS, id, retained flag and topic.
 */
@Test
public void whenAMessageArrivesThenTheObserverIsNotified()
        throws Exception {
    @SuppressWarnings("unchecked")
    final FlowableEmitter<MqttMessage> emitter = Mockito.mock(FlowableEmitter.class);
    final IMqttMessageListener messageListener = new SubscriberMqttMessageListener(emitter);

    // Build the incoming Paho message.
    final String topic = "expected";
    final byte[] payload = new byte[] { 'a', 'b', 'c' };
    final org.eclipse.paho.client.mqttv3.MqttMessage pahoMessage =
            new org.eclipse.paho.client.mqttv3.MqttMessage(payload);
    pahoMessage.setQos(2);
    pahoMessage.setId(1);
    pahoMessage.setRetained(true);

    // Deliver it and capture what the emitter received.
    final ArgumentCaptor<SubscribeMessage> captor = ArgumentCaptor.forClass(SubscribeMessage.class);
    messageListener.messageArrived(topic, pahoMessage);
    Mockito.verify(emitter).onNext(captor.capture());

    final SubscribeMessage received = captor.getValue();
    Assert.assertArrayEquals(payload, received.getPayload());
    Assert.assertEquals(2, received.getQos());
    Assert.assertEquals(1, received.getId());
    Assert.assertTrue(received.isRetained());
    Assert.assertEquals(topic, received.getTopic());
}
public SimpleRemoteSourceMapper(FlowableEmitter<Resource<T>> emitter) {
emitter.onNext(Resource.loading(null));
// since realm instance was created on Main Thread, so if we need to touch on realm database after calling
// api (such as save response data to local database, we must make request on main thread
// by setting shouldUpdateUi params = true
Disposable disposable = RestHelper.makeRequest(getRemote(), true, response -> {
Log.d(TAG, "SimpleRemoteSourceMapper: call API success!");
saveCallResult(response);
emitter.onNext(Resource.success(response));
}, errorEntity -> {
Log.d(TAG, "SimpleRemoteSourceMapper: call API error: " + errorEntity.getMessage());
emitter.onNext(Resource.error(errorEntity.getMessage(), null));
});
// set emitter disposable to ensure that when it is going to be disposed, our api request should be disposed as well
emitter.setDisposable(disposable);
}
/**
 * Executes the call synchronously and forwards its outcome to the emitter.
 * <p>
 * A successful response is converted with {@code rxEasyConverter}, emitted, and the stream is
 * completed. An unsuccessful response or any thrown error is reported through {@code onError}
 * unless the emitter has already been cancelled.
 * <p>
 * The response body is always closed in {@code finally}, so the unsuccessful and cancelled
 * paths no longer leak the underlying connection. {@code onComplete()} is only signalled on
 * the success path instead of unconditionally after an error.
 *
 * @param e the emitter receiving the converted response
 * @throws Exception declared by the interface; failures are routed to the emitter, except that
 *                   closing the body in {@code finally} may throw on client versions where
 *                   {@code close()} is a checked operation
 */
@Override
public void subscribe(FlowableEmitter<T> e) throws Exception {
    Response response = null;
    try {
        response = call.execute();
        if (e.isCancelled()) {
            return;
        }
        if (response.isSuccessful()) {
            e.onNext(rxEasyConverter.convert(response.body().string()));
            e.onComplete();
        } else {
            e.onError(new Throwable("response is unsuccessful"));
        }
    } catch (Throwable t) {
        if (!e.isCancelled()) {
            e.onError(t);
        }
    } finally {
        // Release the connection even when the body was never read (error / cancelled paths).
        if (response != null && response.body() != null) {
            response.body().close();
        }
    }
}
/**
 * Runs the intercepted join point on the RxJava IO scheduler inside a prepared Looper.
 * Any throwable raised by the join point is printed and otherwise ignored.
 *
 * @param joinPoint the intercepted invocation to proceed with
 * @throws Throwable declared for the caller; errors from the join point itself are caught inside
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    final FlowableOnSubscribe<Object> loopedInvocation = new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            Looper.prepare();
            try {
                joinPoint.proceed();
            } catch (Throwable error) {
                error.printStackTrace();
            }
            // NOTE(review): Looper.loop() blocks until the looper quits, and nothing here quits it,
            // so the emitter never receives a terminal signal — confirm this is intended.
            Looper.loop();
        }
    };
    Flowable.create(loopedInvocation, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
/**
 * Emits {@code 1} for every click on {@code view} until the emitter is cancelled.
 * Disposing the emitter removes the click listener from the view.
 *
 * @param emitter the emitter receiving one item per click
 */
@Override
public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
    checkUiThread();
    view.setOnClickListener(clicked -> {
        if (emitter.isCancelled()) {
            return;
        }
        emitter.onNext(1);
    });
    emitter.setDisposable(new MainThreadDisposable() {
        @Override
        protected void onDispose() {
            // Detach from the view so no further clicks are observed.
            view.setOnClickListener(null);
        }
    });
}
/**
 * Creates a GoogleApiClient bound to the emitter, connects it, and arranges for it to be
 * disconnected when the emitter is cancelled. A failure thrown by {@code connect()} is
 * forwarded to the emitter as an error.
 *
 * @param emitter the emitter driven by the API client connection callbacks
 */
@Override
public final void subscribe(FlowableEmitter<T> emitter) throws Exception {
    final GoogleApiClient client = createApiClient(new ApiClientConnectionCallbacks(emitter));

    try {
        client.connect();
    } catch (Throwable connectError) {
        emitter.onError(connectError);
    }

    emitter.setCancellable(() -> {
        // Only run the unsubscribe hook while the client is still connected.
        if (client.isConnected()) {
            onUnsubscribed(client);
        }
        client.disconnect();
    });
}
/**
 * Demo: a source using the ERROR backpressure strategy that only logs
 * {@code emitter.requested()}, observed by a subscriber that requests 1000 items up front.
 */
public static void demo3() {
    final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            Log.d(TAG, "onSubscribe");
            mSubscription = subscription;
            subscription.request(1000);
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable error) {
            Log.w(TAG, "onError: ", error);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Flowable
            .create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> source) throws Exception {
                    Log.d(TAG, "current requested: " + source.requested());
                }
            }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);
}
/**
 * Demo: emits the integers 0..999 with the BUFFER backpressure strategy. The subscriber only
 * stores its Subscription in {@code mSubscription} and does not request anything itself.
 */
public static void demo1() {
    final FlowableOnSubscribe<Integer> producer = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> source) throws Exception {
            for (int value = 0; value < 1000; value++) {
                Log.d(TAG, "emit " + value);
                source.onNext(value);
            }
        }
    };

    Flowable.create(producer, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    Log.d(TAG, "onSubscribe");
                    mSubscription = subscription;
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: " + value);
                }

                @Override
                public void onError(Throwable error) {
                    Log.w(TAG, "onError: ", error);
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
}
/**
 * Generates the RX support members of the data source class: the {@code globalSubscribeOn}
 * and {@code globalObserveOn} scheduler fields with their fluent setters, the batch and
 * transaction interfaces for each emitter type, and the execute-transaction / execute-batch
 * methods for each RX type.
 *
 * @param dataSourceName
 *            the data source name
 * @param daoFactory
 *            the dao factory
 */
public void generateRx(ClassName dataSourceName, String daoFactory) {
    // globalSubscribeOn: protected field plus a public setter returning the data source.
    final FieldSpec subscribeOnField = FieldSpec.builder(Scheduler.class, "globalSubscribeOn", Modifier.PROTECTED)
            .build();
    classBuilder.addField(subscribeOnField);
    final MethodSpec subscribeOnSetter = MethodSpec.methodBuilder("globalSubscribeOn")
            .returns(dataSourceName)
            .addParameter(Scheduler.class, "scheduler")
            .addModifiers(Modifier.PUBLIC)
            .addStatement("this.globalSubscribeOn=scheduler")
            .addStatement("return this")
            .build();
    classBuilder.addMethod(subscribeOnSetter);

    // globalObserveOn: same shape as above.
    final FieldSpec observeOnField = FieldSpec.builder(Scheduler.class, "globalObserveOn", Modifier.PROTECTED)
            .build();
    classBuilder.addField(observeOnField);
    final MethodSpec observeOnSetter = MethodSpec.methodBuilder("globalObserveOn")
            .addParameter(Scheduler.class, "scheduler")
            .returns(dataSourceName)
            .addModifiers(Modifier.PUBLIC)
            .addStatement("this.globalObserveOn=scheduler")
            .addStatement("return this")
            .build();
    classBuilder.addMethod(observeOnSetter);

    // Batch then transaction interface for each emitter type, in a fixed order.
    generateRxInterface(daoFactory, RxInterfaceType.BATCH, ObservableEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, ObservableEmitter.class);

    generateRxInterface(daoFactory, RxInterfaceType.BATCH, SingleEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, SingleEmitter.class);

    generateRxInterface(daoFactory, RxInterfaceType.BATCH, FlowableEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, FlowableEmitter.class);

    generateRxInterface(daoFactory, RxInterfaceType.BATCH, MaybeEmitter.class);
    generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, MaybeEmitter.class);

    // All transaction executors first, then all batch executors, each in the same RX type order.
    final RxType[] rxTypes = { RxType.OBSERVABLE, RxType.SINGLE, RxType.FLOWABLE, RxType.MAYBE };
    for (RxType rxType : rxTypes) {
        generatExecuteTransactionRx(dataSourceName, daoFactory, rxType);
    }
    for (RxType rxType : rxTypes) {
        generatExecuteBatchRx(dataSourceName, daoFactory, rxType);
    }
}
/**
 * Demo: emits the integers 0..127 with the ERROR backpressure strategy. The subscriber only
 * stores its Subscription in {@code mSubscription} and does not request anything itself.
 */
public static void demo4() {
    final FlowableOnSubscribe<Integer> producer = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> source) throws Exception {
            for (int value = 0; value < 128; value++) {
                Log.d(TAG, "emit " + value);
                source.onNext(value);
            }
        }
    };
    final Subscriber<Integer> subscriber = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            Log.d(TAG, "onSubscribe");
            mSubscription = subscription;
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable error) {
            Log.w(TAG, "onError: ", error);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Flowable.create(producer, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(subscriber);
}
/**
 * Executes the prepared operation in blocking mode, emits its result and completes.
 * Any exception is forwarded to the emitter as an error.
 *
 * @param emitter the emitter receiving the operation result
 */
@Override
public void subscribe(@NonNull FlowableEmitter<Result> emitter) throws Exception {
    try {
        final Result result = preparedOperation.executeAsBlocking();
        emitter.onNext(result);
        emitter.onComplete();
    } catch (Exception error) {
        emitter.onError(error);
    }
}
/**
 * Executes the prepared operation in blocking mode, emits its result wrapped in an
 * {@code Optional} and completes. Any exception is forwarded to the emitter as an error.
 *
 * @param emitter the emitter receiving the wrapped operation result
 */
@Override
public void subscribe(@NonNull FlowableEmitter<Optional<Result>> emitter) throws Exception {
    try {
        emitter.onNext(Optional.of(preparedOperation.executeAsBlocking()));
        emitter.onComplete();
    } catch (Exception error) {
        emitter.onError(error);
    }
}
/**
 * Emits {@code shareImageObject} on the IO scheduler and hands it to {@code handleShare}
 * on the main thread. On failure the listener is notified, resources are recycled and the
 * activity is finished.
 */
@SuppressLint("CheckResult")
private void shareTextOrImage(final Pair<String, byte[]> shareImageObject, final String title, final String targetUrl, final String summary,
        final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<Pair<String, byte[]>> source = new FlowableOnSubscribe<Pair<String, byte[]>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter) {
            try {
                emitter.onNext(shareImageObject);
                emitter.onComplete();
            } catch (Exception error) {
                emitter.onError(error);
            }
        }
    };
    final Consumer<Pair<String, byte[]>> onReady = new Consumer<Pair<String, byte[]>>() {
        @Override
        public void accept(@NonNull Pair<String, byte[]> image) {
            handleShare(image, title, targetUrl, summary, listener);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable error) {
            listener.shareFailure(new Exception(error));
            recycle();
            activity.finish();
        }
    };

    mShareImage = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onReady, onFailure);
}
/**
 * Resolves the image location on the IO scheduler — the object's path/URL when
 * {@code immediate} is true, otherwise the result of {@code ImageDecoder.decode} — and then
 * shares it as media to QZone or QQ on the main thread depending on {@code platform}.
 * On failure the listener is notified, resources are recycled and the activity is finished.
 */
@SuppressLint("CheckResult")
private void shareFunc(final int platform, final String title, final String targetUrl, final String summary,
        final ShareImageObject shareImageObject, final boolean immediate, final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<String> imageSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) {
            try {
                final String image = immediate
                        ? shareImageObject.getPathOrUrl()
                        : ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(image);
                emitter.onComplete();
            } catch (Exception error) {
                emitter.onError(error);
            }
        }
    };
    final Consumer<String> onImageReady = new Consumer<String>() {
        @Override
        public void accept(@NonNull String image) {
            if (platform == SharePlatform.QZONE) {
                shareToQZoneForMedia(title, targetUrl, summary, image, activity, listener);
                return;
            }
            shareToQQForMedia(title, summary, targetUrl, image, activity, listener);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable error) throws Exception {
            listener.shareFailure(new Exception(error));
            recycle();
            activity.finish();
        }
    };

    mShareFunc = Flowable.create(imageSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onImageReady, onFailure);
}
/**
 * Decodes the share image on the IO scheduler and shares the resulting local path as an
 * image to QZone or QQ on the main thread depending on {@code platform}.
 * On failure the listener is notified, resources are recycled and the activity is finished.
 */
@SuppressLint("CheckResult")
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
        final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<String> decodeSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                final String decodedPath = ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(decodedPath);
                emitter.onComplete();
            } catch (Exception error) {
                emitter.onError(error);
            }
        }
    };
    final Consumer<String> onDecoded = new Consumer<String>() {
        @Override
        public void accept(@NonNull String path) throws Exception {
            if (platform == SharePlatform.QZONE) {
                shareToQzoneForImage(path, activity, listener);
                return;
            }
            shareToQQForImage(path, activity, listener);
        }
    };
    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable error) throws Exception {
            listener.shareFailure(new Exception(error));
            recycle();
            activity.finish();
        }
    };

    mShareImage = Flowable.create(decodeSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onDecoded, onFailure);
}
/**
 * Requests the QQ user info for {@code token} on the IO scheduler, parses the JSON response
 * into a {@link QQUser}, and reports login success or failure to {@code mLoginListener} on the
 * main thread. {@code LoginUtil.recycle()} is called after either outcome.
 *
 * @param token the token used to build the user-info URL and to identify the user
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<QQUser> userSource = new FlowableOnSubscribe<QQUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<QQUser> emitter) {
            final OkHttpClient httpClient = new OkHttpClient();
            final Request userInfoRequest = new Request.Builder().url(buildUserInfoUrl(token, URL)).build();
            try {
                final Response httpResponse = httpClient.newCall(userInfoRequest).execute();
                final JSONObject json = new JSONObject(httpResponse.body().string());
                emitter.onNext(QQUser.parse(token.getOpenid(), json));
                emitter.onComplete();
            } catch (IOException | JSONException error) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(error);
            }
        }
    };

    mSubscribe = Flowable.create(userSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<QQUser>() {
                @Override
                public void accept(@NonNull QQUser user) {
                    mLoginListener.loginSuccess(
                            new LoginResultData(LoginPlatform.QQ, token, user));
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable error) {
                    mLoginListener.loginFailure(new Exception(error), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
/**
 * Requests the Weibo user info for {@code token} on the IO scheduler, parses the JSON response
 * into a {@link WeiboUser}, and reports login success or failure to {@code mLoginListener} on
 * the main thread. {@code LoginUtil.recycle()} is called after either outcome.
 *
 * @param token the token used to build the user-info URL
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<WeiboUser> userSource = new FlowableOnSubscribe<WeiboUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WeiboUser> emitter) {
            final OkHttpClient httpClient = new OkHttpClient();
            final Request userInfoRequest = new Request.Builder().url(buildUserInfoUrl(token, USER_INFO)).build();
            try {
                final Response httpResponse = httpClient.newCall(userInfoRequest).execute();
                final JSONObject json = new JSONObject(httpResponse.body().string());
                emitter.onNext(WeiboUser.parse(json));
                emitter.onComplete();
            } catch (IOException | JSONException error) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(error);
            }
        }
    };

    mSubscribe = Flowable.create(userSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<WeiboUser>() {
                @Override
                public void accept(@NonNull WeiboUser user) throws Exception {
                    mLoginListener.loginSuccess(
                            new LoginResultData(LoginPlatform.WEIBO, token, user));
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable error) throws Exception {
                    mLoginListener.loginFailure(new Exception(error), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
/**
 * Verifies the Twitter credentials on the IO scheduler, wraps the returned user in a
 * {@link TwitterUser}, and reports login success or failure to {@code mLoginListener} on the
 * main thread. {@code LoginUtil.recycle()} is called after either outcome.
 *
 * @param token the token included in the login result
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<TwitterUser> userSource = new FlowableOnSubscribe<TwitterUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<TwitterUser> emitter) {
            final TwitterApiClient twitterClient = TwitterCore.getInstance().getApiClient();
            final Call<User> verifyCall = twitterClient.getAccountService().verifyCredentials(true, false, false);
            try {
                final Response<User> verified = verifyCall.execute();
                emitter.onNext(new TwitterUser(verified.body()));
                emitter.onComplete();
            } catch (Exception error) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(error);
            }
        }
    };

    mSubscribe = Flowable.create(userSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<TwitterUser>() {
                @Override
                public void accept(@NonNull TwitterUser twitterUser) {
                    mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.TWITTER, token, twitterUser));
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable error) {
                    mLoginListener.loginFailure(new Exception(error), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
/**
 * Exchanges the authorization {@code code} for a {@link WxToken} on the IO scheduler and
 * continues the login flow on the main thread: when {@code mFetchUserInfo} is set the user info
 * is fetched next, otherwise login success is reported immediately and resources are recycled.
 * <p>
 * On failure the listener receives an exception that carries the original message and now also
 * keeps the original throwable as its cause, so the stack trace of the underlying I/O or JSON
 * error is no longer lost.
 *
 * @param code the authorization code used to build the token URL
 */
@SuppressLint("CheckResult")
private void getToken(final String code) {
    mTokenSubscribe = Flowable.create(new FlowableOnSubscribe<WxToken>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WxToken> wxTokenEmitter) {
            Request request = new Request.Builder().url(buildTokenUrl(code)).build();
            try {
                Response response = mClient.newCall(request).execute();
                JSONObject jsonObject = new JSONObject(response.body().string());
                WxToken token = new WxToken(jsonObject);
                wxTokenEmitter.onNext(token);
                wxTokenEmitter.onComplete();
            } catch (IOException | JSONException e) {
                wxTokenEmitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<WxToken>() {
                @Override
                public void accept(@NonNull WxToken wxToken) {
                    if (mFetchUserInfo) {
                        mLoginListener.beforeFetchUserInfo(wxToken);
                        fetchUserInfo(wxToken);
                    } else {
                        mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.WX, wxToken));
                        LoginUtil.recycle();
                    }
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    // Same message as before, but keep the cause so the root failure stays diagnosable.
                    mLoginListener.loginFailure(new Exception(throwable.getMessage(), throwable),
                            ShareLogger.INFO.ERR_GET_TOKEN_CODE);
                    LoginUtil.recycle();
                }
            });
}
/**
 * Requests the WeChat user info for {@code token} on the IO scheduler, parses the JSON response
 * into a {@link WxUser}, and reports login success or failure to {@code mLoginListener} on the
 * main thread. {@code LoginUtil.recycle()} is called after either outcome.
 * <p>
 * The source now signals {@code onComplete()} after emitting the user, so the stream
 * terminates once the single result is delivered instead of staying open indefinitely.
 *
 * @param token the token used to build the user-info URL
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    mSubscribe = Flowable.create(new FlowableOnSubscribe<WxUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WxUser> wxUserEmitter) {
            Request request = new Request.Builder().url(buildUserInfoUrl(token)).build();
            try {
                Response response = mClient.newCall(request).execute();
                JSONObject jsonObject = new JSONObject(response.body().string());
                WxUser user = WxUser.parse(jsonObject);
                wxUserEmitter.onNext(user);
                // Terminate the single-item stream so the subscription is released.
                wxUserEmitter.onComplete();
            } catch (IOException | JSONException e) {
                wxUserEmitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<WxUser>() {
                @Override
                public void accept(@NonNull WxUser wxUser) {
                    mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.WX, token, wxUser));
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    mLoginListener.loginFailure(new Exception(throwable), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
/**
 * Posts the user-info request for {@code token} to {@code sTokenUrl} on the IO scheduler,
 * builds an {@link InsUser} from the JSON response, and reports login success or failure to
 * {@code mLoginListener} on the main thread. {@code LoginUtil.recycle()} is called after
 * either outcome.
 *
 * @param token the token used to build the request body
 */
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<InsUser> userSource = new FlowableOnSubscribe<InsUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<InsUser> emitter) {
            final Request userInfoPost = new Request.Builder().url(sTokenUrl).post(buildUserInfoUrl(token)).build();
            try {
                final Response httpResponse = mClient.newCall(userInfoPost).execute();
                final JSONObject json = new JSONObject(httpResponse.body().string());
                ShareLogger.i("auth:" + json.toString());
                emitter.onNext(new InsUser(json));
                emitter.onComplete();
            } catch (IOException | JSONException error) {
                emitter.onError(error);
            }
        }
    };

    mSubscribe = Flowable.create(userSource, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<InsUser>() {
                @Override
                public void accept(@NonNull InsUser user) {
                    mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.INS, token, user));
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable error) {
                    mLoginListener.loginFailure(new Exception(error), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
/**
 * Wraps {@code rxTask} in an on-subscribe action that runs the task's {@code doInIOThread}
 * step on its input data, stores the result as the task's output data, then emits the task
 * and completes.
 *
 * @param rxTask the task to execute when the Flowable is subscribed
 * @return the on-subscribe action bound to {@code rxTask}
 */
@NonNull
private static <T, R> RxTaskOnSubscribe<RxAsyncTask<T, R>> getRxAsyncTaskOnSubscribe(@NonNull final RxAsyncTask<T, R> rxTask) {
    return new RxTaskOnSubscribe<RxAsyncTask<T, R>>(rxTask) {
        @Override
        public void subscribe(FlowableEmitter<RxAsyncTask<T, R>> emitter) throws Exception {
            final RxAsyncTask<T, R> asyncTask = getTask();
            // Per the original note, this step is meant to do its work on the IO thread.
            asyncTask.setOutData(
                    asyncTask.doInIOThread(asyncTask.getInData()));
            emitter.onNext(asyncTask);
            emitter.onComplete();
        }
    };
}