类io.reactivex.FlowableEmitter源码实例Demo

下面列出了 io.reactivex.FlowableEmitter 类的 API 使用示例代码及写法，也可以点击链接到 GitHub 查看源代码。

源代码1 项目: AcgClub   文件: ZeroFiveNewsDetailModel.java
/**
 * Builds a {@link Flowable} that, on subscription, downloads the page at {@code url} with
 * Jsoup, converts it to a {@link ZeroFiveNewsDetail} via {@code JP.from} and emits it.
 *
 * <p>Exactly one item followed by completion is emitted; if the fetched document is
 * {@code null} an error is signalled instead.
 *
 * @param url address of the news detail page
 * @return a {@link Flowable} created with {@link BackpressureStrategy#BUFFER}
 */
@Override
public Flowable<ZeroFiveNewsDetail> getAcgNewsDetail(final String url) {
  final FlowableOnSubscribe<ZeroFiveNewsDetail> source =
      new FlowableOnSubscribe<ZeroFiveNewsDetail>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ZeroFiveNewsDetail> emitter)
            throws Exception {
          final Element document = Jsoup.connect(url).get();
          if (document == null) {
            emitter.onError(new Throwable("element html is null"));
            return;
          }
          final ZeroFiveNewsDetail detail = JP.from(document, ZeroFiveNewsDetail.class);
          emitter.onNext(detail);
          emitter.onComplete();
        }
      };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
/**
 * Checks that the {@code OnError} handed out by the listener forwards a failure to the
 * emitter that was passed to the listener's constructor.
 */
@Test
public void whenTheConstructorIsCalledWithAValidEmitterThenGetOnErrorReturnsTheEmitter() {

    // Given: a mocked emitter wrapped by a listener whose success path is irrelevant here
    final Throwable failure = Mockito.mock(Throwable.class);
    @SuppressWarnings("unchecked")
    final FlowableEmitter<Object> mockEmitter = Mockito.mock(FlowableEmitter.class);
    final FlowableEmitterMqttActionListener<Object> actionListener =
            new FlowableEmitterMqttActionListener<Object>(mockEmitter) {
                @Override
                public void onSuccess(final IMqttToken asyncActionToken) {
                    // Not invoked
                }
            };

    // When: the error callback is obtained and triggered
    actionListener.getOnError().onError(failure);

    // Then: the emitter was told about that same failure
    Mockito.verify(mockEmitter).onError(failure);
}
 
源代码3 项目: AcgClub   文件: ScheduleDetailModel.java
/**
 * Builds a {@link Flowable} that, on subscription, downloads a schedule page with Jsoup,
 * converts it to a {@link ScheduleDetail} via {@code JP.from} and emits it.
 *
 * <p>Exactly one item followed by completion is emitted; if the fetched document is
 * {@code null} an error is signalled instead.
 *
 * @param url page address; when it does not contain {@code "http"} it is appended to
 *     {@code HtmlConstant.YHDM_M_URL}
 * @return a {@link Flowable} created with {@link BackpressureStrategy#BUFFER}
 */
@Override
public Flowable<ScheduleDetail> getScheduleDetail(final String url) {
  final FlowableOnSubscribe<ScheduleDetail> source = new FlowableOnSubscribe<ScheduleDetail>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<ScheduleDetail> emitter) throws Exception {
      // Prefix the base URL only when the given link carries no "http" of its own.
      final String scheduleLink = url.contains("http") ? url : HtmlConstant.YHDM_M_URL + url;
      final Element document = Jsoup.connect(scheduleLink).get();
      if (document == null) {
        emitter.onError(new Throwable("element html is null"));
        return;
      }
      final ScheduleDetail detail = JP.from(document, ScheduleDetail.class);
      emitter.onNext(detail);
      emitter.onComplete();
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码4 项目: AcgClub   文件: ScheduleDAO.java
/**
 * Deletes all {@link ScheduleCache} entries whose {@code scheduleUrl} field equals
 * {@code url}, using an asynchronous Realm transaction.
 *
 * <p>The boolean returned by {@code deleteAllFromRealm()} is emitted once, followed by
 * completion.
 *
 * @param url value matched against the {@code scheduleUrl} field
 * @return the {@link Flowable} produced by {@code RxRealmUtils.flowableExec}
 */
public Flowable<Boolean> deleteScheduleCacheByUrl(final String url) {
  final Consumer<Pair<FlowableEmitter, Realm>> deleteAction =
      new Consumer<Pair<FlowableEmitter, Realm>>() {
        @Override
        public void accept(final Pair<FlowableEmitter, Realm> emitterAndRealm)
            throws Exception {
          final Transaction deleteMatching = new Transaction() {
            @Override
            public void execute(Realm realm) {
              final boolean deleted = realm.where(ScheduleCache.class)
                  .equalTo("scheduleUrl", url)
                  .findAll()
                  .deleteAllFromRealm();
              // Report the outcome from inside the transaction, then finish the stream.
              emitterAndRealm.first.onNext(deleted);
              emitterAndRealm.first.onComplete();
            }
          };
          emitterAndRealm.second.executeTransactionAsync(deleteMatching);
        }
      };
  return RxRealmUtils.flowableExec(realmConfiguration, deleteAction);
}
 
源代码5 项目: AcgClub   文件: ScheduleOtherModel.java
/**
 * Builds a {@link Flowable} that, on subscription, downloads the page at {@code url} with
 * Jsoup, converts it to a {@link ScheduleOtherPage} via {@code JP.from} and emits it.
 *
 * <p>Exactly one item followed by completion is emitted; if the fetched document is
 * {@code null} an error is signalled instead.
 *
 * @param url address of the page to load
 * @return a {@link Flowable} created with {@link BackpressureStrategy#BUFFER}
 */
@Override
public Flowable<ScheduleOtherPage> getScheduleOtherPage(final String url) {
  final FlowableOnSubscribe<ScheduleOtherPage> source =
      new FlowableOnSubscribe<ScheduleOtherPage>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<ScheduleOtherPage> emitter)
            throws Exception {
          final Element document = Jsoup.connect(url).get();
          if (document == null) {
            emitter.onError(new Throwable("element html is null"));
            return;
          }
          final ScheduleOtherPage page = JP.from(document, ScheduleOtherPage.class);
          emitter.onNext(page);
          emitter.onComplete();
        }
      };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
源代码6 项目: AcgClub   文件: ScheduleVideoModel.java
/**
 * Builds a {@link Flowable} that, on subscription, downloads the page at {@code url} with
 * Jsoup, converts it to a {@link ScheduleVideo} via {@code JP.from} and emits it.
 *
 * <p>When the parsed video HTML is non-empty, the video URL is rewritten to the player
 * address with the original value as the {@code vid} query parameter. Exactly one item
 * followed by completion is emitted; a {@code null} document produces an error instead.
 *
 * @param url address of the video page
 * @return a {@link Flowable} created with {@link BackpressureStrategy#BUFFER}
 */
@Override
public Flowable<ScheduleVideo> getScheduleVideo(final String url) {
  final FlowableOnSubscribe<ScheduleVideo> source = new FlowableOnSubscribe<ScheduleVideo>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<ScheduleVideo> emitter) throws Exception {
      final Element document = Jsoup.connect(url).get();
      if (document == null) {
        emitter.onError(new Throwable("element html is null"));
        return;
      }
      final ScheduleVideo video = JP.from(document, ScheduleVideo.class);
      final boolean hasVideoHtml = !TextUtils.isEmpty(video.getVideoHtml());
      if (hasVideoHtml) {
        video.setVideoUrl("http://tup.yhdm.tv/?m=1&vid=" + video.getVideoUrl());
      }
      /*StringBuilder scheduleVideoHtmlBuilder = new StringBuilder();
      scheduleVideoHtmlBuilder.append(HtmlConstant.SCHEDULE_VIDEO_CSS);
      scheduleVideoHtmlBuilder.append("<div class=\"player_main\" style=\"position: relative;\"> ");
      scheduleVideoHtmlBuilder.append(scheduleVideo.getVideoHtml());
      scheduleVideoHtmlBuilder.append("</div>");
      scheduleVideo.setVideoHtml(scheduleVideoHtmlBuilder.toString());*/
      emitter.onNext(video);
      emitter.onComplete();
    }
  };
  return Flowable.create(source, BackpressureStrategy.BUFFER);
}
 
/**
 * Installs a display component on {@code instance} that republishes every in-app message it
 * is asked to display, and returns a {@link TestSubscriber} subscribed to that stream.
 *
 * <p>For each message the display callbacks are also stored in {@code callbacksHashMap} under
 * the message's campaign id. The stream itself is never completed by this method.
 *
 * @param instance the in-app messaging instance to attach the display component to
 * @return a test subscriber receiving the displayed messages
 */
private static TestSubscriber<InAppMessage> listenerToFlowable(FirebaseInAppMessaging instance) {
  FlowableOnSubscribe<InAppMessage> source =
      emitter ->
          instance.setMessageDisplayComponent(
              new FirebaseInAppMessagingDisplay() {
                @Override
                public void displayMessage(
                    InAppMessage message, FirebaseInAppMessagingDisplayCallbacks displayCallbacks) {
                  emitter.onNext(message);
                  Log.i("FIAM", "Putting callback for IAM " + message.getCampaignName());
                  callbacksHashMap.put(message.getCampaignId(), displayCallbacks);
                }
              });
  return Flowable.create(source, BUFFER).test();
}
 
源代码8 项目: ObjectBoxRxJava   文件: RxQuery.java
/**
 * Subscribes to {@code query} and forwards each element of a delivered result list to
 * {@code emitter}, completing the emitter after the last element.
 *
 * <p>Emission stops as soon as the emitter reports cancellation, in which case no completion
 * is signalled. Cancelling the emitter cancels the underlying data subscription.
 *
 * @param query   the query whose results are observed
 * @param emitter the sink receiving the individual items
 * @param <T>     the item type
 */
static <T> void createListItemEmitter(final Query<T> query, final FlowableEmitter<T> emitter) {
    final DataObserver<List<T>> itemForwarder = new DataObserver<List<T>>() {
        @Override
        public void onData(List<T> data) {
            for (T item : data) {
                if (emitter.isCancelled()) {
                    return;
                }
                emitter.onNext(item);
            }
            if (emitter.isCancelled()) {
                return;
            }
            emitter.onComplete();
        }
    };
    final DataSubscription dataSubscription = query.subscribe().observer(itemForwarder);

    // Tie the lifetime of the data subscription to the emitter.
    emitter.setCancellable(new Cancellable() {
        @Override
        public void cancel() throws Exception {
            dataSubscription.cancel();
        }
    });
}
 
/**
 * Checks that a Paho message handed to the listener reaches the emitter's {@code onNext}
 * with its payload, QoS, id, retained flag and topic intact.
 */
@Test
public void whenAMessageArrivesThenTheObserverIsNotified()
        throws Exception {
    // Given: a Paho message with known contents and a listener wrapping a mocked emitter
    final String topic = "expected";
    final byte[] payload = new byte[] { 'a', 'b', 'c' };
    final org.eclipse.paho.client.mqttv3.MqttMessage pahoMessage =
            new org.eclipse.paho.client.mqttv3.MqttMessage(payload);
    pahoMessage.setQos(2);
    pahoMessage.setId(1);
    pahoMessage.setRetained(true);
    @SuppressWarnings("unchecked")
    final FlowableEmitter<MqttMessage> emitter = Mockito.mock(FlowableEmitter.class);
    final IMqttMessageListener listener = new SubscriberMqttMessageListener(emitter);
    final ArgumentCaptor<SubscribeMessage> captor =
            ArgumentCaptor.forClass(SubscribeMessage.class);

    // When: the message arrives
    listener.messageArrived(topic, pahoMessage);

    // Then: the emitter received an equivalent message
    Mockito.verify(emitter).onNext(captor.capture());
    final SubscribeMessage delivered = captor.getValue();
    Assert.assertArrayEquals(payload, delivered.getPayload());
    Assert.assertEquals(2, delivered.getQos());
    Assert.assertEquals(1, delivered.getId());
    Assert.assertTrue(delivered.isRetained());
    Assert.assertEquals(topic, delivered.getTopic());
}
 
源代码10 项目: mvvm-template   文件: SimpleRemoteSourceMapper.java
/**
 * Starts the remote request immediately and reports its progress through {@code emitter}:
 * first a loading resource, then either a success resource carrying the response or an
 * error resource carrying the error message.
 *
 * @param emitter sink for the {@link Resource} states; disposing it disposes the request
 */
public SimpleRemoteSourceMapper(FlowableEmitter<Resource<T>> emitter) {
    // Announce the loading state before the request is issued.
    emitter.onNext(Resource.loading(null));

    // The second argument (true) is the "should update UI" flag. Per the original author's
    // note, the Realm instance was created on the main thread, so anything touching the
    // Realm database after the API call (e.g. saving the response) has to run there as well.
    final Disposable request = RestHelper.makeRequest(
            getRemote(),
            true,
            response -> {
                Log.d(TAG, "SimpleRemoteSourceMapper: call API success!");
                saveCallResult(response);
                emitter.onNext(Resource.success(response));
            },
            errorEntity -> {
                Log.d(TAG, "SimpleRemoteSourceMapper: call API error: " + errorEntity.getMessage());
                emitter.onNext(Resource.error(errorEntity.getMessage(), null));
            });

    // Link the request to the emitter so that disposing the emitter disposes the request too.
    emitter.setDisposable(request);
}
 
源代码11 项目: EasyHttp   文件: RxEasyHttpManager.java
/**
 * Executes the HTTP call synchronously and publishes its outcome.
 *
 * <p>A successful response is converted with {@code rxEasyConverter} and emitted, followed by
 * completion. An unsuccessful response or any thrown failure is reported through
 * {@code onError}. Nothing is signalled once the emitter has been cancelled.
 *
 * <p>Completion is signalled only on the success path (the original signalled it from a
 * {@code finally} block, i.e. also after an error), and the response body is closed on the
 * paths that do not consume it.
 *
 * @param e the emitter receiving the converted response or the failure
 * @throws Exception declared by the interface; failures are routed to the emitter instead
 */
@Override
public void subscribe(FlowableEmitter<T> e) throws Exception {
	try {
		Response response = call.execute();

		if (e.isCancelled()) {
			// Nobody is listening any more; just release the connection.
			if (response.body() != null) {
				response.body().close();
			}
			return;
		}

		if (!response.isSuccessful()) {
			e.onError(new Throwable("response is unsuccessful"));
			// The body is not read on this path, so it has to be closed explicitly.
			if (response.body() != null) {
				response.body().close();
			}
			return;
		}

		// string() reads the body to the end and closes it.
		e.onNext(rxEasyConverter.convert(response.body().string()));
		e.onComplete();
	} catch (Throwable t) {
		if (!e.isCancelled()) {
			e.onError(t);
		}
	}
}
 
源代码12 项目: SAF-AOP   文件: AsyncAspect.java
/**
 * Runs the intercepted join point inside a {@link Flowable} subscribed on
 * {@code Schedulers.io()}.
 *
 * <p>The subscribing thread is given a {@link Looper} before the join point proceeds and
 * enters the loop afterwards. Any {@link Throwable} raised by the join point is printed and
 * not propagated. The emitter is never signalled.
 *
 * <p>NOTE(review): {@code Looper.loop()} presumably blocks the worker thread until the looper
 * is quit, and nothing visible here quits it — confirm this is intended.
 *
 * @param joinPoint the intercepted invocation to execute
 * @throws Throwable declared for the aspect; not thrown by the visible code
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    final FlowableOnSubscribe<Object> task = new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            Looper.prepare();
            try {
                joinPoint.proceed();
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
            Looper.loop();
        }
    };

    Flowable.create(task, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
 
源代码13 项目: RxComboDetector   文件: ViewClickOnSubscribe.java
/**
 * Registers a click listener on {@code view} that emits {@code 1} for every click while the
 * emitter is not cancelled, and removes the listener again when the emitter is disposed.
 *
 * @param emitter sink receiving one item per click
 * @throws Exception declared by the interface
 */
@Override
public void subscribe(final FlowableEmitter<Integer> emitter) throws Exception {
    checkUiThread();

    view.setOnClickListener(clicked -> {
        if (emitter.isCancelled()) {
            return;
        }
        emitter.onNext(1);
    });

    // Detach the listener once the downstream goes away.
    emitter.setDisposable(new MainThreadDisposable() {
        @Override
        protected void onDispose() {
            view.setOnClickListener(null);
        }
    });
}
 
源代码14 项目: RxGps   文件: RxLocationFlowableOnSubscribe.java
/**
 * Creates a {@link GoogleApiClient} whose connection callbacks are bound to {@code emitter},
 * connects it, and arranges for it to be disconnected when the emitter is cancelled.
 *
 * <p>A failure thrown by {@code connect()} is forwarded to {@code onError}. On cancellation
 * {@code onUnsubscribed} is invoked first if the client is still connected.
 *
 * @param emitter sink bound to the client's connection callbacks
 * @throws Exception declared by the interface
 */
@Override
public final void subscribe(FlowableEmitter<T> emitter) throws Exception {
    final ApiClientConnectionCallbacks connectionCallbacks =
            new ApiClientConnectionCallbacks(emitter);
    final GoogleApiClient apiClient = createApiClient(connectionCallbacks);

    try {
        apiClient.connect();
    } catch (Throwable connectFailure) {
        emitter.onError(connectFailure);
    }

    emitter.setCancellable(() -> {
        final boolean stillConnected = apiClient.isConnected();
        if (stillConnected) {
            onUnsubscribed(apiClient);
        }

        apiClient.disconnect();
    });
}
 
源代码15 项目: RxJava2Demo   文件: ChapterNine.java
/**
 * Demo: the source only logs {@code emitter.requested()} and emits nothing, while the
 * subscriber stores its subscription in {@code mSubscription} and requests 1000 items.
 *
 * <p>The source is subscribed on {@code Schedulers.io()} and observed on the Android main
 * thread, using {@link BackpressureStrategy#ERROR}.
 */
public static void demo3() {
    final FlowableOnSubscribe<Integer> source = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "current requested: " + emitter.requested());
        }
    };

    final Subscriber<Integer> loggingSubscriber = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            Log.d(TAG, "onSubscribe");
            mSubscription = subscription;
            subscription.request(1000);
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable error) {
            Log.w(TAG, "onError: ", error);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Flowable.create(source, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingSubscriber);
}
 
源代码16 项目: RxJava2Demo   文件: ChapterEight.java
/**
 * Demo: the source emits the integers 0..999, logging each one, while the subscriber only
 * stores its subscription in {@code mSubscription} and requests nothing itself.
 *
 * <p>The source is subscribed on {@code Schedulers.io()} and observed on the Android main
 * thread, using {@link BackpressureStrategy#BUFFER}.
 */
public static void demo1() {
    final FlowableOnSubscribe<Integer> source = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int value = 0; value < 1000; value++) {
                Log.d(TAG, "emit " + value);
                emitter.onNext(value);
            }
        }
    };

    final Subscriber<Integer> loggingSubscriber = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            Log.d(TAG, "onSubscribe");
            mSubscription = subscription;
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable error) {
            Log.w(TAG, "onError: ", error);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Flowable.create(source, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingSubscriber);
}
 
源代码17 项目: kripton   文件: BindDataSourceBuilder.java
/**
 * Generates the RX support members of the data source class being built.
 *
 * <p>Adds the {@code globalSubscribeOn} and {@code globalObserveOn} scheduler fields together
 * with setters that return the data source itself, then the RX interfaces for every emitter
 * type, and finally the transaction and batch execution methods for every RX type.
 *
 * @param dataSourceName
 *            the data source name
 * @param daoFactory
 *            the dao factory
 */
public void generateRx(ClassName dataSourceName, String daoFactory) {
	// Scheduler used for subscribeOn, with its fluent setter.
	classBuilder.addField(FieldSpec.builder(Scheduler.class, "globalSubscribeOn", Modifier.PROTECTED).build());
	classBuilder.addMethod(MethodSpec.methodBuilder("globalSubscribeOn")
			.returns(dataSourceName)
			.addParameter(Scheduler.class, "scheduler")
			.addModifiers(Modifier.PUBLIC)
			.addStatement("this.globalSubscribeOn=scheduler")
			.addStatement("return this")
			.build());

	// Scheduler used for observeOn, with its fluent setter.
	classBuilder.addField(FieldSpec.builder(Scheduler.class, "globalObserveOn", Modifier.PROTECTED).build());
	classBuilder.addMethod(MethodSpec.methodBuilder("globalObserveOn")
			.addParameter(Scheduler.class, "scheduler")
			.returns(dataSourceName)
			.addModifiers(Modifier.PUBLIC)
			.addStatement("this.globalObserveOn=scheduler")
			.addStatement("return this")
			.build());

	// One batch and one transaction interface per emitter type.
	generateRxInterface(daoFactory, RxInterfaceType.BATCH, ObservableEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, ObservableEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.BATCH, SingleEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, SingleEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.BATCH, FlowableEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, FlowableEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.BATCH, MaybeEmitter.class);
	generateRxInterface(daoFactory, RxInterfaceType.TRANSACTION, MaybeEmitter.class);

	// Execution methods: all transaction variants first, then all batch variants.
	final RxType[] rxTypes = { RxType.OBSERVABLE, RxType.SINGLE, RxType.FLOWABLE, RxType.MAYBE };
	for (RxType rxType : rxTypes) {
		generatExecuteTransactionRx(dataSourceName, daoFactory, rxType);
	}
	for (RxType rxType : rxTypes) {
		generatExecuteBatchRx(dataSourceName, daoFactory, rxType);
	}
}
 
源代码18 项目: RxJava2Demo   文件: ChapterSeven.java
/**
 * Demo: the source emits the integers 0..127, logging each one, while the subscriber only
 * stores its subscription in {@code mSubscription} and requests nothing itself.
 *
 * <p>The source is subscribed on {@code Schedulers.io()} and observed on the Android main
 * thread, using {@link BackpressureStrategy#ERROR}.
 */
public static void demo4() {
    final FlowableOnSubscribe<Integer> source = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int value = 0; value < 128; value++) {
                Log.d(TAG, "emit " + value);
                emitter.onNext(value);
            }
        }
    };

    final Subscriber<Integer> loggingSubscriber = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            Log.d(TAG, "onSubscribe");
            mSubscription = subscription;
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable error) {
            Log.w(TAG, "onError: ", error);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    Flowable.create(source, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingSubscriber);
}
 
/**
 * Executes the prepared operation synchronously and emits its result followed by completion;
 * any {@link Exception} raised along the way is delivered through {@code onError}.
 *
 * @param emitter sink receiving the operation result
 * @throws Exception declared by the interface
 */
@Override
public void subscribe(@NonNull FlowableEmitter<Result> emitter) throws Exception {
    try {
        final Result result = preparedOperation.executeAsBlocking();
        emitter.onNext(result);
        emitter.onComplete();
    } catch (Exception failure) {
        emitter.onError(failure);
    }
}
 
/**
 * Executes the prepared operation synchronously and emits its result wrapped with
 * {@code Optional.of}, followed by completion; any {@link Exception} raised along the way is
 * delivered through {@code onError}.
 *
 * @param emitter sink receiving the wrapped operation result
 * @throws Exception declared by the interface
 */
@Override
public void subscribe(@NonNull FlowableEmitter<Optional<Result>> emitter) throws Exception {
    try {
        emitter.onNext(Optional.of(preparedOperation.executeAsBlocking()));
        emitter.onComplete();
    } catch (Exception failure) {
        emitter.onError(failure);
    }
}
 
源代码21 项目: ShareLoginPayUtil   文件: WeiboShareInstance.java
/**
 * Passes {@code shareImageObject} through a {@link Flowable} (subscribed on
 * {@code Schedulers.io()}, observed on the Android main thread) and hands it to
 * {@code handleShare}. The resulting subscription is stored in {@code mShareImage}.
 *
 * <p>On failure the listener is notified, {@code recycle()} is called and the activity is
 * finished.
 *
 * @param shareImageObject the pair forwarded to {@code handleShare}
 * @param title            share title
 * @param targetUrl        share target URL
 * @param summary          share summary
 * @param activity         activity finished on failure
 * @param listener         receiver of the failure notification
 */
@SuppressLint("CheckResult")
private void shareTextOrImage(final Pair<String, byte[]> shareImageObject, final String title, final String targetUrl, final String summary,
                              final Activity activity, final ShareListener listener) {

    final FlowableOnSubscribe<Pair<String, byte[]>> source =
            new FlowableOnSubscribe<Pair<String, byte[]>>() {
                @Override
                public void subscribe(@NonNull FlowableEmitter<Pair<String, byte[]>> emitter) {
                    try {
                        emitter.onNext(shareImageObject);
                        emitter.onComplete();
                    } catch (Exception failure) {
                        emitter.onError(failure);
                    }
                }
            };

    final Consumer<Pair<String, byte[]>> onImageReady = new Consumer<Pair<String, byte[]>>() {
        @Override
        public void accept(@NonNull Pair<String, byte[]> image) {
            handleShare(image, title, targetUrl, summary, listener);
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) {
            listener.shareFailure(new Exception(cause));
            recycle();
            activity.finish();
        }
    };

    mShareImage = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onImageReady, onFailure);
}
 
源代码22 项目: ShareLoginPayUtil   文件: QQShareInstance.java
/**
 * Resolves the image to share on {@code Schedulers.io()} and then, on the Android main
 * thread, shares it to QZone or QQ depending on {@code platform}. The resulting subscription
 * is stored in {@code mShareFunc}.
 *
 * <p>When {@code immediate} is set the image's path or URL is used directly; otherwise the
 * image is decoded through {@code ImageDecoder.decode}. On failure the listener is notified,
 * {@code recycle()} is called and the activity is finished.
 *
 * @param platform         target platform; {@code SharePlatform.QZONE} selects QZone
 * @param title            share title
 * @param targetUrl        share target URL
 * @param summary          share summary
 * @param shareImageObject image to share
 * @param immediate        whether to skip decoding and use the path or URL as is
 * @param activity         activity used for decoding and sharing, finished on failure
 * @param listener         share result listener
 */
@SuppressLint("CheckResult")
private void shareFunc(final int platform, final String title, final String targetUrl, final String summary,
                       final ShareImageObject shareImageObject, final boolean immediate, final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<String> source = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) {
            try {
                final String image = immediate
                        ? shareImageObject.getPathOrUrl()
                        : ImageDecoder.decode(activity, shareImageObject);
                emitter.onNext(image);
                emitter.onComplete();
            } catch (Exception failure) {
                emitter.onError(failure);
            }
        }
    };

    final Consumer<String> onImageReady = new Consumer<String>() {
        @Override
        public void accept(@NonNull String image) {
            if (platform == SharePlatform.QZONE) {
                shareToQZoneForMedia(title, targetUrl, summary, image, activity, listener);
                return;
            }
            shareToQQForMedia(title, summary, targetUrl, image, activity, listener);
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) throws Exception {
            listener.shareFailure(new Exception(cause));
            recycle();
            activity.finish();
        }
    };

    mShareFunc = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onImageReady, onFailure);
}
 
源代码23 项目: ShareLoginPayUtil   文件: QQShareInstance.java
/**
 * Decodes the image through {@code ImageDecoder.decode} on {@code Schedulers.io()} and then,
 * on the Android main thread, shares the resulting local path to QZone or QQ depending on
 * {@code platform}. The resulting subscription is stored in {@code mShareImage}.
 *
 * <p>On failure the listener is notified, {@code recycle()} is called and the activity is
 * finished.
 *
 * @param platform         target platform; {@code SharePlatform.QZONE} selects QZone
 * @param shareImageObject image to share
 * @param activity         activity used for decoding and sharing, finished on failure
 * @param listener         share result listener
 */
@SuppressLint("CheckResult")
@Override
public void shareImage(final int platform, final ShareImageObject shareImageObject,
                       final Activity activity, final ShareListener listener) {
    final FlowableOnSubscribe<String> source = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<String> emitter) throws Exception {
            try {
                emitter.onNext(ImageDecoder.decode(activity, shareImageObject));
                emitter.onComplete();
            } catch (Exception failure) {
                emitter.onError(failure);
            }
        }
    };

    final Consumer<String> onDecoded = new Consumer<String>() {
        @Override
        public void accept(@NonNull String localPath) throws Exception {
            if (platform == SharePlatform.QZONE) {
                shareToQzoneForImage(localPath, activity, listener);
                return;
            }
            shareToQQForImage(localPath, activity, listener);
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) throws Exception {
            listener.shareFailure(new Exception(cause));
            recycle();
            activity.finish();
        }
    };

    mShareImage = Flowable.create(source, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onDecoded, onFailure);
}
 
源代码24 项目: ShareLoginPayUtil   文件: QQLoginInstance.java
/**
 * Requests the QQ user info for {@code token} on {@code Schedulers.io()} and reports the
 * outcome to {@code mLoginListener} on the Android main thread. The resulting subscription is
 * stored in {@code mSubscribe}.
 *
 * <p>{@code LoginUtil.recycle()} is called after either outcome has been reported.
 *
 * @param token the token whose user info is fetched
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<QQUser> fetchUser = new FlowableOnSubscribe<QQUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<QQUser> emitter) {
            final OkHttpClient httpClient = new OkHttpClient();
            final Request userInfoRequest =
                    new Request.Builder().url(buildUserInfoUrl(token, URL)).build();

            try {
                final Response response = httpClient.newCall(userInfoRequest).execute();
                final JSONObject json = new JSONObject(response.body().string());
                final QQUser parsedUser = QQUser.parse(token.getOpenid(), json);
                emitter.onNext(parsedUser);
                emitter.onComplete();
            } catch (IOException | JSONException failure) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(failure);
            }
        }
    };

    final Consumer<QQUser> onUser = new Consumer<QQUser>() {
        @Override
        public void accept(@NonNull QQUser qqUser) {
            final LoginResultData result = new LoginResultData(LoginPlatform.QQ, token, qqUser);
            mLoginListener.loginSuccess(result);
            LoginUtil.recycle();
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) {
            mLoginListener.loginFailure(new Exception(cause), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(fetchUser, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码25 项目: ShareLoginPayUtil   文件: WeiboLoginInstance.java
/**
 * Requests the Weibo user info for {@code token} on {@code Schedulers.io()} and reports the
 * outcome to {@code mLoginListener} on the Android main thread. The resulting subscription is
 * stored in {@code mSubscribe}.
 *
 * <p>{@code LoginUtil.recycle()} is called after either outcome has been reported.
 *
 * @param token the token whose user info is fetched
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<WeiboUser> fetchUser = new FlowableOnSubscribe<WeiboUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WeiboUser> emitter) {
            final OkHttpClient httpClient = new OkHttpClient();
            final Request userInfoRequest =
                    new Request.Builder().url(buildUserInfoUrl(token, USER_INFO)).build();
            try {
                final Response response = httpClient.newCall(userInfoRequest).execute();
                final JSONObject json = new JSONObject(response.body().string());
                final WeiboUser parsedUser = WeiboUser.parse(json);
                emitter.onNext(parsedUser);
                emitter.onComplete();
            } catch (IOException | JSONException failure) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(failure);
            }
        }
    };

    final Consumer<WeiboUser> onUser = new Consumer<WeiboUser>() {
        @Override
        public void accept(@NonNull WeiboUser weiboUser) throws Exception {
            final LoginResultData result =
                    new LoginResultData(LoginPlatform.WEIBO, token, weiboUser);
            mLoginListener.loginSuccess(result);
            LoginUtil.recycle();
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) throws Exception {
            mLoginListener.loginFailure(new Exception(cause), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(fetchUser, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码26 项目: ShareLoginPayUtil   文件: TwitterLoginInstance.java
/**
 * Verifies the Twitter credentials on {@code Schedulers.io()} and reports the resulting user
 * to {@code mLoginListener} on the Android main thread. The resulting subscription is stored
 * in {@code mSubscribe}.
 *
 * <p>{@code LoginUtil.recycle()} is called after either outcome has been reported.
 *
 * @param token the token included in the login result
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<TwitterUser> fetchUser = new FlowableOnSubscribe<TwitterUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<TwitterUser> emitter) {

            final TwitterApiClient apiClient = TwitterCore.getInstance().getApiClient();
            final Call<User> verifyCall =
                    apiClient.getAccountService().verifyCredentials(true, false, false);

            try {
                final Response<User> response = verifyCall.execute();
                emitter.onNext(new TwitterUser(response.body()));
                emitter.onComplete();
            } catch (Exception failure) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(failure);
            }
        }
    };

    final Consumer<TwitterUser> onUser = new Consumer<TwitterUser>() {
        @Override
        public void accept(@NonNull TwitterUser user) {
            mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.TWITTER, token, user));
            LoginUtil.recycle();
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) {
            mLoginListener.loginFailure(new Exception(cause), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(fetchUser, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码27 项目: ShareLoginPayUtil   文件: WxLoginInstance.java
/**
 * Exchanges the authorization {@code code} for a {@link WxToken} on {@code Schedulers.io()}
 * and handles the result on the Android main thread. The resulting subscription is stored in
 * {@code mTokenSubscribe}.
 *
 * <p>On success, when {@code mFetchUserInfo} is set the listener is told that user info is
 * about to be fetched and {@code fetchUserInfo} is started; otherwise login success is
 * reported right away and {@code LoginUtil.recycle()} is called. On failure the listener
 * receives an exception that keeps the original message and carries the original throwable
 * as its cause (previously the cause was dropped).
 *
 * @param code the authorization code used to build the token URL
 */
@SuppressLint("CheckResult")
private void getToken(final String code) {
    mTokenSubscribe = Flowable.create(new FlowableOnSubscribe<WxToken>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WxToken> wxTokenEmitter) {
            Request request = new Request.Builder().url(buildTokenUrl(code)).build();
            try {
                Response response = mClient.newCall(request).execute();
                JSONObject jsonObject = new JSONObject(response.body().string());
                WxToken token = new WxToken(jsonObject);
                wxTokenEmitter.onNext(token);
                wxTokenEmitter.onComplete();
            } catch (IOException | JSONException e) {
                wxTokenEmitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<WxToken>() {
                @Override
                public void accept(@NonNull WxToken wxToken) {
                    if (mFetchUserInfo) {
                        mLoginListener.beforeFetchUserInfo(wxToken);
                        fetchUserInfo(wxToken);
                    } else {
                        mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.WX, wxToken));
                        LoginUtil.recycle();
                    }

                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    // Same message as before, but chain the cause so the stack trace survives.
                    mLoginListener.loginFailure(
                            new Exception(throwable.getMessage(), throwable),
                            ShareLogger.INFO.ERR_GET_TOKEN_CODE);
                    LoginUtil.recycle();
                }
            });
}
 
源代码28 项目: ShareLoginPayUtil   文件: WxLoginInstance.java
/**
 * Requests the WeChat user info for {@code token} on {@code Schedulers.io()} and reports the
 * outcome to {@code mLoginListener} on the Android main thread. The resulting subscription is
 * stored in {@code mSubscribe}.
 *
 * <p>The source now signals completion after emitting the user, matching the other login
 * implementations; previously the stream was left unterminated after a successful fetch.
 * {@code LoginUtil.recycle()} is called after either outcome has been reported.
 *
 * @param token the token whose user info is fetched
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    mSubscribe = Flowable.create(new FlowableOnSubscribe<WxUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<WxUser> wxUserEmitter) {
            Request request = new Request.Builder().url(buildUserInfoUrl(token)).build();
            try {
                Response response = mClient.newCall(request).execute();
                JSONObject jsonObject = new JSONObject(response.body().string());
                WxUser user = WxUser.parse(jsonObject);
                wxUserEmitter.onNext(user);
                // Terminate the single-item stream so the subscription is released.
                wxUserEmitter.onComplete();
            } catch (IOException | JSONException e) {
                wxUserEmitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<WxUser>() {
                @Override
                public void accept(@NonNull WxUser wxUser) {
                    mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.WX, token, wxUser));
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable throwable) {
                    mLoginListener.loginFailure(new Exception(throwable), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
 
源代码29 项目: ShareLoginPayUtil   文件: InsLoginInstance.java
/**
 * Posts the user-info request for {@code token} to {@code sTokenUrl} on
 * {@code Schedulers.io()} and reports the outcome to {@code mLoginListener} on the Android
 * main thread. The resulting subscription is stored in {@code mSubscribe}.
 *
 * <p>{@code LoginUtil.recycle()} is called after either outcome has been reported.
 *
 * @param token the token whose user info is fetched
 */
@Override
public void fetchUserInfo(final BaseToken token) {
    final FlowableOnSubscribe<InsUser> fetchUser = new FlowableOnSubscribe<InsUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<InsUser> emitter) {

            final Request userInfoPost =
                    new Request.Builder().url(sTokenUrl).post(buildUserInfoUrl(token)).build();
            try {
                final Response response = mClient.newCall(userInfoPost).execute();
                final JSONObject json = new JSONObject(response.body().string());
                ShareLogger.i("auth:" + json.toString());
                final InsUser parsedUser = new InsUser(json);
                emitter.onNext(parsedUser);
                emitter.onComplete();
            } catch (IOException | JSONException failure) {
                emitter.onError(failure);
            }
        }
    };

    final Consumer<InsUser> onUser = new Consumer<InsUser>() {
        @Override
        public void accept(@NonNull InsUser insUser) {
            mLoginListener.loginSuccess(new LoginResultData(LoginPlatform.INS, token, insUser));
            LoginUtil.recycle();
        }
    };

    final Consumer<Throwable> onFailure = new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable cause) {
            mLoginListener.loginFailure(new Exception(cause), ShareLogger.INFO.ERR_FETCH_CODE);
            LoginUtil.recycle();
        }
    };

    mSubscribe = Flowable.create(fetchUser, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onUser, onFailure);
}
 
源代码30 项目: Collection-Android   文件: RxJavaUtils.java
/**
 * Wraps {@code rxTask} in an {@code RxTaskOnSubscribe} that, when subscribed, feeds the
 * task's input data to {@code doInIOThread}, stores the result as the task's output data and
 * then emits the task itself followed by completion.
 *
 * @param rxTask the task to execute on subscription
 * @param <T>    first type argument of the task
 * @param <R>    second type argument of the task
 * @return the on-subscribe callback wrapping {@code rxTask}
 */
@NonNull
private static <T, R> RxTaskOnSubscribe<RxAsyncTask<T, R>> getRxAsyncTaskOnSubscribe(@NonNull final RxAsyncTask<T, R> rxTask) {
    return new RxTaskOnSubscribe<RxAsyncTask<T, R>>(rxTask) {
        @Override
        public void subscribe(FlowableEmitter<RxAsyncTask<T, R>> emitter) throws Exception {
            final RxAsyncTask<T, R> asyncTask = getTask();
            // Do the work on the IO thread (per the original author's note) and keep its result.
            asyncTask.setOutData(asyncTask.doInIOThread(asyncTask.getInData()));
            emitter.onNext(asyncTask);
            emitter.onComplete();
        }
    };
}
 
 类所在包
 同包方法