下面列出了怎么用rx.functions.Actions的API类实例代码及写法,或者点击链接到github查看源代码。
public <R> Subscription subscribe(Action1<R> onNext, Action1<Throwable> onError, Action0 onCompleted, Observable.Transformer<T, R> transformer)
{
Observable observable = build(false);
if (transformer != null)
observable = observable.compose(transformer);
if (onNext == null)
onNext = Actions.empty();
if (onError == null)
onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
if (onCompleted == null)
onCompleted = Actions.empty();
Action1<R> actualOnNext = onNext;
if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
actualOnNext = RXBusUtil.wrapQueueAction(onNext, mQueuer);
observable = applySchedular(observable);
Subscription subscription = observable.subscribe(actualOnNext, onError, onCompleted);
if (mBoundObject != null)
RXSubscriptionManager.addSubscription(mBoundObject, subscription);
return subscription;
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
GoroService.setup(this, Goro.create());
Scheduler scheduler = new RxGoro(goro).scheduler("test-queue");
Observable.just("ok")
.subscribeOn(scheduler)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
result = "ok";
resultSync.countDown();
}
});
Observable.error(new RuntimeException("test error"))
.subscribeOn(scheduler)
.subscribe(Actions.empty(), new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
error = throwable;
errorSync.countDown();
}
});
}
protected void bindToInstanceLifecycle(Instance<ConnectionProvider<W, R>> i,
final HostConnectionProvider<W, R> hcp,
final Subscriber<? super List<HostConnectionProvider<W, R>>> o) {
i.getLifecycle()
.finallyDo(new Action0() {
@Override
public void call() {
removeHost(hcp, o);
}
})
.subscribe(Actions.empty(), new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
// Do nothing as finallyDo takes care of both complete and error.
}
});
}
public final Subscription subscribe(Action1<? super T> onNext, Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
} else if (onError != null) {
return subscribe(new ActionSubscriber(onNext, onError, Actions.empty()));
} else {
throw new IllegalArgumentException("onError can not be null");
}
}
@Test
public void testNoTimeoutPostSubscription() throws Exception {
TestScheduler testScheduler = Schedulers.test();
UnicastAutoReleaseSubject<String> subject = UnicastAutoReleaseSubject.create(1, TimeUnit.DAYS, testScheduler);
subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival.
final AtomicReference<Throwable> errorOnSubscribe = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);
subject.subscribe(Actions.empty(), new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
errorOnSubscribe.set(throwable);
latch.countDown();
}
}, new Action0() {
@Override
public void call() {
latch.countDown();
}
});
testScheduler.advanceTimeBy(1, TimeUnit.DAYS);
subject.onCompleted();
latch.await(1, TimeUnit.MINUTES);
Assert.assertNull("Subscription got an error.", errorOnSubscribe.get());
}
private static void createAndShutdownDefaultEnvironment() {
CoreEnvironment env = DefaultCoreEnvironment.create();
env.scheduler().createWorker().schedule(Actions.empty());
env.scheduler().createWorker().schedule(Actions.empty());
env.scheduler().createWorker().schedule(Actions.empty());
env.scheduler().createWorker().schedule(Actions.empty());
env.scheduler().createWorker().schedule(Actions.empty());
env.scheduler().createWorker().schedule(Actions.empty());
boolean shutdownResult = env.shutdown();
assertTrue(shutdownResult);
}
public final Completable doOnCompleted(Action0 onCompleted) {
return doOnLifecycle(Actions.empty(), Actions.empty(), onCompleted, Actions.empty(), Actions.empty());
}
public final Completable doOnUnsubscribe(Action0 onUnsubscribe) {
return doOnLifecycle(Actions.empty(), Actions.empty(), Actions.empty(), Actions.empty(), onUnsubscribe);
}
public final Completable doOnError(Action1<? super Throwable> onError) {
return doOnLifecycle(Actions.empty(), onError, Actions.empty(), Actions.empty(), Actions.empty());
}
public final Completable doOnSubscribe(Action1<? super Subscription> onSubscribe) {
return doOnLifecycle(onSubscribe, Actions.empty(), Actions.empty(), Actions.empty(), Actions.empty());
}
public final Completable doAfterTerminate(Action0 onAfterComplete) {
return doOnLifecycle(Actions.empty(), Actions.empty(), Actions.empty(), onAfterComplete, Actions.empty());
}
public final Observable<T> doOnCompleted(Action0 onCompleted) {
return lift(new OperatorDoOnEach(new ActionSubscriber(Actions.empty(), Actions.empty(), onCompleted)));
}
public final Observable<T> doOnError(Action1<Throwable> onError) {
return lift(new OperatorDoOnEach(new ActionSubscriber(Actions.empty(), onError, Actions.empty())));
}
public final Observable<T> doOnNext(Action1<? super T> onNext) {
return lift(new OperatorDoOnEach(new ActionSubscriber(onNext, Actions.empty(), Actions.empty())));
}
public final Observable<T> doOnTerminate(Action0 onTerminate) {
return lift(new OperatorDoOnEach(new ActionSubscriber(Actions.empty(), Actions.toAction1(onTerminate), onTerminate)));
}
public final Subscription subscribe() {
return subscribe(new ActionSubscriber(Actions.empty(), InternalObservableUtils.ERROR_NOT_IMPLEMENTED, Actions.empty()));
}
public final Subscription subscribe(Action1<? super T> onNext) {
if (onNext != null) {
return subscribe(new ActionSubscriber(onNext, InternalObservableUtils.ERROR_NOT_IMPLEMENTED, Actions.empty()));
}
throw new IllegalArgumentException("onNext can not be null");
}