下面列出了怎么用io.reactivex.SingleEmitter的API类实例代码及写法,或者点击链接到github查看源代码。
private <T> Single<T> compositeSingle(Collection<Single<T>> singles) {
return Single.create((SingleEmitter<T> emitter) -> {
AtomicInteger errorCount = new AtomicInteger(0);
CompositeDisposable compositeDisposable = new CompositeDisposable();
emitter.setCancellable(compositeDisposable::dispose);
for (Single<T> single : singles) {
compositeDisposable.add(single.subscribe(emitter::onSuccess, error -> {
synchronized (errorCount) {
if (errorCount.incrementAndGet() == singles.size()) { // Every single has errored
emitter.tryOnError(error);
}
}
}));
}
})
.subscribeOn(BurstKitUtils.defaultBurstNodeServiceScheduler());
}
@Override @NonNull public Single<List<T>> observeAdd(@NonNull final T value) {
assertNotNull(value, "value");
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists() && !file.createNewFile()) {
throw new IOException("Could not create file for store.");
}
List<T> originalList = converter.read(file, type);
if (originalList == null) originalList = Collections.emptyList();
List<T> result = new ArrayList<T>(originalList.size() + 1);
result.addAll(originalList);
result.add(value);
converterWrite(result, converter, type, file);
emitter.onSuccess(result);
updateSubject.onNext(result);
}
});
}
});
}
/**
* The returned Single emits one Query result as a List.
*/
public static <T> Single<List<T>> single(final Query<T> query) {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override
public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
query.subscribe().single().observer(new DataObserver<List<T>>() {
@Override
public void onData(List<T> data) {
if (!emitter.isDisposed()) {
emitter.onSuccess(data);
}
}
});
// no need to cancel, single never subscribes
}
});
}
@SuppressLint("NewApi")
public Single<Boolean> isAlive() {
if (RXSDebug.isDebug()) Timber.tag(TAG).v("isAlive()");
return Single
.create((SingleEmitter<Boolean> emitter) -> {
if (ApiWrap.hasOreo()) {
emitter.onSuccess(process.isAlive());
} else {
try {
process.exitValue();
emitter.onSuccess(false);
} catch (IllegalThreadStateException e) {
emitter.onSuccess(true);
}
}
})
.subscribeOn(Schedulers.io());
}
/************************************************************/
public Single<Void> deleteCollBookInRx(CollBookBean bean) {
return Single.create(new SingleOnSubscribe<Void>() {
@Override
public void subscribe(SingleEmitter<Void> e) throws Exception {
//查看文本中是否存在删除的数据
deleteBook(bean.get_id());
//删除任务
deleteDownloadTask(bean.get_id());
//删除目录
deleteBookChapter(bean.get_id());
//删除CollBook
mCollBookDao.delete(bean);
e.onSuccess(new Void());
}
});
}
public Single<T> toSingle() {
return Single.create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(final @NonNull SingleEmitter<T> e) throws Exception {
enqueue(new Callback<T>() {
@Override
public void onResponse(@NonNull T data) {
e.onSuccess(data);
}
}
, new ErrorCallback() {
@Override
public void onError(@NonNull Throwable error) {
e.onError(error);
}
}
);
}
});
}
static void onResolutionResult(String observableId, int resultCode) {
if (observableMap.containsKey(observableId)) {
SettingsCheckHandleSingleOnSubscribe observable = observableMap.get(observableId).get();
if (observable != null && observable.emitterWeakRef != null) {
SingleEmitter<Boolean> observer = observable.emitterWeakRef.get();
if (observer != null) {
observer.onSuccess(resultCode == Activity.RESULT_OK);
}
}
observableMap.remove(observableId);
}
observableMapCleanup();
}
@Override @NonNull public Single<List<T>> observeRemove(final int position) {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
List<T> originalList = converter.read(file, type);
if (originalList == null) originalList = Collections.emptyList();
List<T> modifiedList = new ArrayList<T>(originalList);
modifiedList.remove(position);
converterWrite(modifiedList, converter, type, file);
emitter.onSuccess(modifiedList);
updateSubject.onNext(modifiedList);
}
});
}
});
}
@Override
public final void subscribe(SingleEmitter<T> emitter) throws Exception {
final GoogleApiClient apiClient = createApiClient(new ApiClientConnectionCallbacks(emitter));
try {
apiClient.connect();
} catch (Throwable ex) {
emitter.onError(ex);
}
emitter.setCancellable(() -> {
if (apiClient.isConnected()) {
onUnsubscribed(apiClient);
}
apiClient.disconnect();
});
}
@Override @NonNull public Single<List<T>> observeClear() {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInWriteLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (file.exists() && !file.delete()) {
throw new IOException("Clear operation on store failed.");
}
emitter.onSuccess(Collections.<T>emptyList());
updateSubject.onNext(Collections.<T>emptyList());
}
});
}
});
}
public static <T> Single<T> single(final OnSubscribeAction<T> subscribe) {
return Single.<T>create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(final SingleEmitter<T> emitter) throws Exception {
RxActionDelegate<T> delegate = new RxActionDelegate<>(new ActionDelegate<T>() {
@Override
public void onSuccess(T result) {
emitter.onSuccess(result);
}
@Override
public void onError(Exception e) {
emitter.onError(e);
}
});
emitter.setDisposable(delegate);
subscribe.subscribe(delegate);
}
});
}
public Single<Result<String>> loadData() {
return Single.create(new SingleOnSubscribe<Result<String>>() {
@Override
public void subscribe(SingleEmitter<Result<String>> e) throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
if (new Random().nextBoolean()) {
e.onSuccess(Result.success("Result from data service"));
} else {
e.onSuccess(Result.<String>error(new Throwable("Fake error")));
}
}
}).subscribeOn(Schedulers.computation()).observeOn(AndroidSchedulers.mainThread());
}
@Override @NonNull public Single<List<T>> get() {
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override public void subscribe(final SingleEmitter<List<T>> emitter) throws Exception {
runInReadLock(readWriteLock, new ThrowingRunnable() {
@Override public void run() throws Exception {
if (!file.exists()) {
emitter.onSuccess(Collections.<T>emptyList());
return;
}
List<T> list = converter.read(file, type);
if (list == null) list = Collections.emptyList();
emitter.onSuccess(list);
}
});
}
});
}
/**
* Converts an {@link ApolloStoreOperation} to a Single.
*
* @param operation the ApolloStoreOperation to convert
* @param <T> the value type
* @return the converted Single
*/
@NotNull
@CheckReturnValue
public static <T> Single<T> from(@NotNull final ApolloStoreOperation<T> operation) {
checkNotNull(operation, "operation == null");
return Single.create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(final SingleEmitter<T> emitter) {
operation.enqueue(new ApolloStoreOperation.Callback<T>() {
@Override
public void onSuccess(T result) {
emitter.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
emitter.onError(t);
}
});
}
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
return Single.create(new SingleOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = listener(emit);
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addListenerForSingleValueEvent(listener);
}
});
}
/**
* @param emit
* @return
*/
@NonNull
@CheckReturnValue
public static ValueEventListener listener(@NonNull final SingleEmitter<DataSnapshot> emit) {
return new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
if (!emit.isDisposed()) {
emit.onSuccess(dataSnapshot);
}
}
@Override
public void onCancelled(DatabaseError e) {
if (!emit.isDisposed()) {
emit.onError(e.toException());
}
}
};
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Single<DataSnapshot> single(@NonNull final Query query) {
return Single.create(new SingleOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final SingleEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = listener(emit);
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addListenerForSingleValueEvent(listener);
}
});
}
/**
* @param emit
* @return
*/
@NonNull
@CheckReturnValue
public static ValueEventListener listener(@NonNull final SingleEmitter<DataSnapshot> emit) {
return new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
if (!emit.isDisposed()) {
emit.onSuccess(dataSnapshot);
}
}
@Override
public void onCancelled(DatabaseError e) {
if (!emit.isDisposed()) {
emit.onError(e.toException());
}
}
};
}
public static <T> Single<T> singleExec(final RealmConfiguration configuration,
final Consumer<Pair<SingleEmitter, Realm>> emitter) {
return Single.create(new SingleOnSubscribe<T>() {
@Override
public void subscribe(SingleEmitter<T> e) throws Exception {
try (Realm realm = Realm.getInstance(configuration)) {
emitter.accept(new Pair<SingleEmitter, Realm>(e, realm));
}
}
});
}
private static <T> void onNullableSuccess(@Nullable T v, SingleEmitter<T> emitter) {
if (v == null) {
emitter.onError(new NullPointerException());
} else {
emitter.onSuccess(v);
}
}
@NonNull private String createDirectory(SingleEmitter<String> e) {
File appDir =
new File(Environment.getExternalStorageDirectory() + "/AppPermissionsExtractedApk");
if (!appDir.exists()) {
if (!appDir.mkdir()) {
e.onError(new Exception("Error creating directory"));
}
}
return appDir.getPath();
}
public static Single<List<File>> getSDTxtFile(){
//外部存储卡路径
String rootPath = Environment.getExternalStorageDirectory().getPath();
return Single.create(new SingleOnSubscribe<List<File>>() {
@Override
public void subscribe(SingleEmitter<List<File>> e) throws Exception {
List<File> files = getTxtFiles(rootPath,0);
e.onSuccess(files);
}
});
}
private <T> Single<List<T>> queryToRx(QueryBuilder<T> builder){
return Single.create(new SingleOnSubscribe<List<T>>() {
@Override
public void subscribe(SingleEmitter<List<T>> e) throws Exception {
List<T> data = builder.list();
if (data == null){
data = new ArrayList<T>(1);
}
e.onSuccess(data);
}
});
}
public Single<List<BookChapterBean>> getBookChaptersInRx(String bookId){
return Single.create(new SingleOnSubscribe<List<BookChapterBean>>() {
@Override
public void subscribe(SingleEmitter<List<BookChapterBean>> e) throws Exception {
List<BookChapterBean> beans = mSession
.getBookChapterBeanDao()
.queryBuilder()
.where(BookChapterBeanDao.Properties.BookId.eq(bookId))
.list();
e.onSuccess(beans);
}
});
}
@Override
public void subscribe(SingleEmitter<GoogleSignInAccount> emitter) throws Exception {
if (!result.isSuccess()) {
emitter.onError(new GoogleSignInException(result));
return;
}
emitter.onSuccess(result.getSignInAccount());
}
@Override
protected void onGoogleApiClientReady(GoogleApiClient apiClient, SingleEmitter<Status> emitter) {
//noinspection MissingPermission
setupLocationPendingResult(
LocationServices.FusedLocationApi.requestLocationUpdates(apiClient, locationRequest, pendingIntent),
SingleResultCallBack.get(emitter)
);
}
@Override
protected void onGoogleApiClientReady(GoogleApiClient apiClient, SingleEmitter<LocationSettingsResult> emitter) {
setupLocationPendingResult(
LocationServices.SettingsApi.checkLocationSettings(apiClient, locationSettingsRequest),
SingleResultCallBack.get(emitter)
);
}
@Override
protected void onGoogleApiClientReady(GoogleApiClient apiClient, SingleEmitter<Status> emitter) {
//noinspection MissingPermission
setupLocationPendingResult(
ActivityRecognition.ActivityRecognitionApi.requestActivityUpdates(apiClient, detectionIntervalMillis, pendingIntent),
SingleResultCallBack.get(emitter)
);
}
@Override
protected void onGoogleApiClientReady(GoogleApiClient apiClient, SingleEmitter<Status> emitter) {
//noinspection MissingPermission
setupLocationPendingResult(
ActivityRecognition.ActivityRecognitionApi.removeActivityUpdates(apiClient, pendingIntent),
SingleResultCallBack.get(emitter)
);
}
@Override
protected void onGoogleApiClientReady(GoogleApiClient apiClient, SingleEmitter<Status> emitter) {
setupLocationPendingResult(
LocationServices.FusedLocationApi.removeLocationUpdates(apiClient, pendingIntent),
SingleResultCallBack.get(emitter)
);
}