// Example usages of the io.reactivex.CompletableOnSubscribe API class are listed below; you can also follow the links to GitHub to view the full source code.
/**
 * Wraps a callback-style action in a {@link Completable}.
 *
 * <p>On every subscription an {@code RxActionDelegate} is built around an
 * {@code ActionDelegate} that forwards {@code onSuccess} to
 * {@code emitter.onComplete()} (the {@code Void} result is ignored) and
 * {@code onError} to {@code emitter.onError(e)}. The delegate is set as the
 * emitter's disposable before it is handed to {@code subscribe}.
 *
 * @param subscribe the action invoked with the delegate on each subscription
 * @return a Completable that terminates according to the callbacks the action makes
 */
public static Completable completable(final OnSubscribeAction<Void> subscribe) {
    return Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter emitter) throws Exception {
            // Translates the action's callbacks into terminal events on the emitter.
            final ActionDelegate<Void> forwarder = new ActionDelegate<Void>() {
                @Override
                public void onSuccess(Void result) {
                    emitter.onComplete();
                }

                @Override
                public void onError(Exception e) {
                    emitter.onError(e);
                }
            };
            final RxActionDelegate<Void> bridge = new RxActionDelegate<>(forwarder);

            // Register the disposable first, then start the action.
            emitter.setDisposable(bridge);
            subscribe.subscribe(bridge);
        }
    });
}
/**
 * Given an operation that takes a {@code MaybeConsumer<Success>}, creates an RxJava
 * {@link Completable} that succeeds iff the operation does.
 *
 * <p>The consumer handed to the operation maps {@code success(...)} to
 * {@code emitter.onComplete()} (the {@code Success} value itself is ignored) and
 * {@code fail(e)} to {@code emitter.onError(e)}.
 *
 * <p>Example:
 *
 * <pre>
 * // update the experiment, and then log that it was successful
 * DataController dc = getDataController();
 * MaybeConsumers.buildCompleteable(mc -> dc.updateExperiment(e.getExperimentId(), mc))
 * .subscribe(() -> log("successfully updated!"));
 * </pre>
 *
 * @param c the operation to run on each subscription; it receives the bridging consumer
 * @return a Completable that mirrors the outcome reported to the consumer
 */
public static Completable buildCompleteable(
    io.reactivex.functions.Consumer<MaybeConsumer<Success>> c) {
  CompletableOnSubscribe onSubscribe =
      emitter -> {
        // Adapter from the MaybeConsumer callbacks to the Completable's terminal events.
        MaybeConsumer<Success> bridge =
            new MaybeConsumer<Success>() {
              @Override
              public void success(Success value) {
                emitter.onComplete();
              }

              @Override
              public void fail(Exception e) {
                emitter.onError(e);
              }
            };
        c.accept(bridge);
      };
  return Completable.create(onSubscribe);
}
/**
 * Converts an {@link ApolloPrefetch} to a Completable.
 *
 * <p>On subscription, {@code cancelOnCompletableDisposed(emitter, prefetch)} is invoked and
 * the prefetch is then enqueued with a callback. A successful callback completes the
 * Completable; a failure is passed to {@code Exceptions.throwIfFatal} and then emitted as
 * an error. Neither signal is delivered once the emitter has been disposed.
 *
 * <p>NOTE(review): this was previously documented as "synchronous", but the body relies on
 * {@code enqueue} with a callback - confirm the intended wording.
 *
 * @param prefetch the ApolloPrefetch to convert
 * @return the converted Completable
 * @throws NullPointerException if prefetch == null
 */
@NotNull
@CheckReturnValue
public static Completable from(@NotNull final ApolloPrefetch prefetch) {
    checkNotNull(prefetch, "prefetch == null");
    return Completable.create(new CompletableOnSubscribe() {
        @Override public void subscribe(final CompletableEmitter emitter) {
            cancelOnCompletableDisposed(emitter, prefetch);

            final ApolloPrefetch.Callback relay = new ApolloPrefetch.Callback() {
                @Override public void onSuccess() {
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onComplete();
                }

                @Override public void onFailure(@NotNull ApolloException e) {
                    Exceptions.throwIfFatal(e);
                    if (emitter.isDisposed()) {
                        return;
                    }
                    emitter.onError(e);
                }
            };
            prefetch.enqueue(relay);
        }
    });
}
/**
 * Returns a Completable that, when subscribed, clears the store.
 *
 * <p>The work is passed to {@code runInWriteLock} together with {@code readWriteLock}.
 * If the backing file exists and cannot be deleted, an {@link IOException} is thrown from
 * the task. Otherwise the emitter is completed and an empty {@code ValueUpdate} is then
 * published on {@code updateSubject}.
 *
 * @return a Completable representing the clear operation
 */
@Override @NonNull public Completable observeClear() {
    return Completable.create(new CompletableOnSubscribe() {
        @Override public void subscribe(final CompletableEmitter emitter) throws Exception {
            final ThrowingRunnable clearTask = new ThrowingRunnable() {
                @Override public void run() throws Exception {
                    // A missing file counts as already cleared; only a failed delete is an error.
                    final boolean deleteFailed = file.exists() && !file.delete();
                    if (deleteFailed) {
                        throw new IOException("Clear operation on store failed.");
                    }
                    emitter.onComplete();
                    updateSubject.onNext(ValueUpdate.<T>empty());
                }
            };
            runInWriteLock(readWriteLock, clearTask);
        }
    });
}
/**
 * Subscribes, via {@code Schedulers.io()}, to a Completable that completes immediately and
 * verifies that it terminates with {@code onComplete}.
 *
 * <p>Because the subscription is moved to another scheduler, the test blocks on a latch
 * until a terminal callback has run; otherwise the method could return before either
 * callback is invoked. The test fails if no terminal event arrives within five seconds or
 * if {@code onError} is signalled.
 */
@Test
public void testCompletableObservable() {
    // Fully-qualified names are used so that no new import is required.
    final java.util.concurrent.CountDownLatch terminated =
            new java.util.concurrent.CountDownLatch(1);
    final java.util.concurrent.atomic.AtomicReference<Throwable> failure =
            new java.util.concurrent.atomic.AtomicReference<Throwable>();

    Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter emitter) {
            emitter.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete is called");
                    terminated.countDown();
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("onError is called" + e.getMessage());
                    // Remember the error so the test thread can report it.
                    failure.set(e);
                    terminated.countDown();
                }
            });

    try {
        if (!terminated.await(5, java.util.concurrent.TimeUnit.SECONDS)) {
            throw new AssertionError("Completable did not terminate within 5 seconds");
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag before failing the test.
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting for the Completable to terminate", e);
    }

    final Throwable error = failure.get();
    if (error != null) {
        throw new AssertionError("Completable terminated with an error", error);
    }
}
/**
 * Load image from Uri with RxJava2
 *
 * <p>On subscription the given frame rect and Uri are stored in {@code mInitialFrameRect}
 * and {@code mSourceUri}, the thumbnail is optionally applied, and the image is read with
 * {@code getImage}. A Runnable is then posted to {@code mHandler} that sets {@code mAngle}
 * from {@code mExifRotation}, installs the bitmap as the image drawable and completes the
 * Completable. {@code mIsLoading} is set to {@code true} on subscribe and reset to
 * {@code false} in {@code doFinally}.
 *
 * @param sourceUri Image Uri
 * @param useThumbnail when {@code true}, {@code applyThumbnail(sourceUri)} is called before
 *     the image is read
 * @param initialFrameRect value assigned to {@code mInitialFrameRect} on subscription
 *
 * @see #load(Uri)
 *
 * @return Completable of loading image
 */
public Completable loadAsCompletable(final Uri sourceUri, final boolean useThumbnail,
        final RectF initialFrameRect) {
    final Completable loadTask = Completable.create(new CompletableOnSubscribe() {
        @Override public void subscribe(@NonNull final CompletableEmitter emitter) throws Exception {
            mInitialFrameRect = initialFrameRect;
            mSourceUri = sourceUri;
            if (useThumbnail) {
                applyThumbnail(sourceUri);
            }
            final Bitmap sampled = getImage(sourceUri);

            // The view update and the completion signal are both performed by the handler.
            final Runnable showImage = new Runnable() {
                @Override public void run() {
                    mAngle = mExifRotation;
                    setImageDrawableInternal(new BitmapDrawable(getResources(), sampled));
                    emitter.onComplete();
                }
            };
            mHandler.post(showImage);
        }
    });

    final Consumer<Disposable> markLoading = new Consumer<Disposable>() {
        @Override public void accept(@NonNull Disposable disposable) throws Exception {
            mIsLoading.set(true);
        }
    };
    final Action clearLoading = new Action() {
        @Override public void run() throws Exception {
            mIsLoading.set(false);
        }
    };

    return loadTask.doOnSubscribe(markLoading).doFinally(clearLoading);
}