类rx.functions.Actions源码实例Demo

下面列出了 rx.functions.Actions 类 API 的实例代码及用法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: RXBus   文件: RXBusBuilder.java
/**
 * Builds this builder's observable, optionally composes it with {@code transformer},
 * and subscribes the supplied callbacks to it.
 * <p>
 * Callbacks passed as {@code null} are replaced: {@code onNext} and {@code onCompleted}
 * by {@code Actions.empty()}, {@code onError} by
 * {@code InternalObservableUtils.ERROR_NOT_IMPLEMENTED}. When a queuer is set and the
 * queue subscription safety check is enabled, {@code onNext} is wrapped through
 * {@code RXBusUtil.wrapQueueAction} before subscribing. If a bound object is set, the
 * new subscription is registered for it with {@code RXSubscriptionManager}.
 *
 * @param onNext      item callback, may be {@code null}
 * @param onError     error callback, may be {@code null}
 * @param onCompleted completion callback, may be {@code null}
 * @param transformer applied via {@code compose} when non-null
 * @return the subscription returned by the underlying {@code subscribe} call
 */
public <R> Subscription subscribe(Action1<R> onNext, Action1<Throwable> onError,  Action0 onCompleted, Observable.Transformer<T, R> transformer)
{
    Observable stream = build(false);
    if (transformer != null)
    {
        stream = stream.compose(transformer);
    }

    // Substitute defaults for every callback the caller left out.
    if (onNext == null)
    {
        onNext = Actions.empty();
    }
    if (onError == null)
    {
        onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
    }
    if (onCompleted == null)
    {
        onCompleted = Actions.empty();
    }

    // Only wrap the item callback when a queuer exists and the safety check is on.
    final Action1<R> nextHandler;
    if (mQueuer == null || !mQueueSubscriptionSafetyCheckEnabled)
    {
        nextHandler = onNext;
    }
    else
    {
        nextHandler = RXBusUtil.wrapQueueAction(onNext, mQueuer);
    }

    stream = applySchedular(stream);
    final Subscription result = stream.subscribe(nextHandler, onError, onCompleted);
    if (mBoundObject != null)
    {
        RXSubscriptionManager.addSubscription(mBoundObject, result);
    }
    return result;
}
 
源代码2 项目: goro   文件: TestActivity.java
/**
 * Sets up {@code GoroService}, then runs two observables on a scheduler obtained from
 * {@code RxGoro} for the queue {@code "test-queue"}: one that emits {@code "ok"} and one
 * that fails with a {@code RuntimeException}. Each callback stores its outcome in a field
 * and counts down the matching latch.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
  super.onCreate(savedInstanceState);
  GoroService.setup(this, Goro.create());

  final Scheduler queueScheduler = new RxGoro(goro).scheduler("test-queue");

  // Success path: record the result and release the result latch.
  final Action1<String> recordResult = new Action1<String>() {
    @Override
    public void call(String s) {
      result = "ok";
      resultSync.countDown();
    }
  };
  Observable.just("ok")
      .subscribeOn(queueScheduler)
      .subscribe(recordResult);

  // Failure path: items are ignored, the error is recorded and the error latch released.
  final Action1<Throwable> recordError = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
      error = throwable;
      errorSync.countDown();
    }
  };
  Observable.error(new RuntimeException("test error"))
      .subscribeOn(queueScheduler)
      .subscribe(Actions.empty(), recordError);
}
 
源代码3 项目: ocelli   文件: HostCollector.java
/**
 * Subscribes to the lifecycle of {@code i} so that {@code removeHost(hcp, o)} is invoked
 * through {@code finallyDo} on that lifecycle.
 *
 * @param i   instance whose lifecycle is observed
 * @param hcp host connection provider handed to {@code removeHost}
 * @param o   subscriber handed to {@code removeHost}
 */
protected void bindToInstanceLifecycle(Instance<ConnectionProvider<W, R>> i,
                                       final HostConnectionProvider<W, R> hcp,
                                       final Subscriber<? super List<HostConnectionProvider<W, R>>> o) {
    final Action0 dropHost = new Action0() {
        @Override
        public void call() {
            removeHost(hcp, o);
        }
    };
    final Action1<Throwable> ignoreError = new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            // Do nothing as finallyDo takes care of both complete and error.
        }
    };

    i.getLifecycle()
     .finallyDo(dropHost)
     .subscribe(Actions.empty(), ignoreError);
}
 
源代码4 项目: letv   文件: Observable.java
/**
 * Subscribes with an item callback and an error callback; completion is handled by
 * {@code Actions.empty()}.
 *
 * @param onNext  item callback, must not be {@code null}
 * @param onError error callback, must not be {@code null}
 * @return the subscription returned by the delegated {@code subscribe} call
 * @throws IllegalArgumentException if {@code onNext} or {@code onError} is {@code null}
 */
public final Subscription subscribe(Action1<? super T> onNext, Action1<Throwable> onError) {
    // onNext is validated first, matching the order callers may observe.
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    if (onError == null) {
        throw new IllegalArgumentException("onError can not be null");
    }
    return subscribe(new ActionSubscriber(onNext, onError, Actions.empty()));
}
 
/**
 * Subscribes to a {@code UnicastAutoReleaseSubject} created with a one-day timeout on a
 * test scheduler, advances the scheduler by one day, completes the subject, and checks
 * that the subscriber received no error.
 * <p>
 * The latch is released by either terminal callback. The result of the wait is asserted
 * so that the test fails if neither callback fires, instead of passing vacuously on the
 * final null check.
 *
 * @throws Exception if the wait on the latch is interrupted
 */
@Test
public void testNoTimeoutPostSubscription() throws Exception {
    TestScheduler testScheduler = Schedulers.test();
    UnicastAutoReleaseSubject<String> subject = UnicastAutoReleaseSubject.create(1, TimeUnit.DAYS, testScheduler);
    subject.onNext("Start the timeout now."); // Since the timeout is scheduled only after content arrival.
    final AtomicReference<Throwable> errorOnSubscribe = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);
    subject.subscribe(Actions.empty(), new Action1<Throwable>() {
        @Override
        public void call(Throwable throwable) {
            errorOnSubscribe.set(throwable);
            latch.countDown();
        }
    }, new Action0() {
        @Override
        public void call() {
            latch.countDown();
        }
    });

    testScheduler.advanceTimeBy(1, TimeUnit.DAYS);
    subject.onCompleted();

    // await() returns false on timeout; without this check a missing terminal event goes unnoticed.
    Assert.assertTrue("Subscriber received neither onError nor onCompleted in time.",
                      latch.await(1, TimeUnit.MINUTES));

    Assert.assertNull("Subscription got an error.", errorOnSubscribe.get());
}
 
/**
 * Creates a default core environment, schedules a no-op action on six freshly created
 * workers of its scheduler, then shuts the environment down and asserts that
 * {@code shutdown()} reported {@code true}.
 */
private static void createAndShutdownDefaultEnvironment() {
    final CoreEnvironment env = DefaultCoreEnvironment.create();

    // One new worker per iteration, each given a single empty action.
    for (int scheduled = 0; scheduled < 6; scheduled++) {
        env.scheduler().createWorker().schedule(Actions.empty());
    }

    final boolean cleanShutdown = env.shutdown();
    assertTrue(cleanShutdown);
}
 
源代码7 项目: letv   文件: Completable.java
/**
 * Delegates to {@code doOnLifecycle} with {@code onCompleted} in the completion slot and
 * {@code Actions.empty()} in every other slot.
 *
 * @param onCompleted action passed as the completion callback
 * @return the {@code Completable} returned by {@code doOnLifecycle}
 */
public final Completable doOnCompleted(Action0 onCompleted) {
    final Action1<Subscription> ignoreSubscribe = Actions.empty();
    final Action1<Throwable> ignoreError = Actions.empty();
    final Action0 ignoreAfterTerminate = Actions.empty();
    final Action0 ignoreUnsubscribe = Actions.empty();
    return doOnLifecycle(ignoreSubscribe, ignoreError, onCompleted, ignoreAfterTerminate, ignoreUnsubscribe);
}
 
源代码8 项目: letv   文件: Completable.java
/**
 * Delegates to {@code doOnLifecycle} with {@code onUnsubscribe} in the last slot and
 * {@code Actions.empty()} in every other slot.
 *
 * @param onUnsubscribe action passed as the unsubscribe callback
 * @return the {@code Completable} returned by {@code doOnLifecycle}
 */
public final Completable doOnUnsubscribe(Action0 onUnsubscribe) {
    final Action1<Subscription> ignoreSubscribe = Actions.empty();
    final Action1<Throwable> ignoreError = Actions.empty();
    final Action0 ignoreCompleted = Actions.empty();
    final Action0 ignoreAfterTerminate = Actions.empty();
    return doOnLifecycle(ignoreSubscribe, ignoreError, ignoreCompleted, ignoreAfterTerminate, onUnsubscribe);
}
 
源代码9 项目: letv   文件: Completable.java
/**
 * Delegates to {@code doOnLifecycle} with {@code onError} in the error slot and
 * {@code Actions.empty()} in every other slot.
 *
 * @param onError action passed as the error callback
 * @return the {@code Completable} returned by {@code doOnLifecycle}
 */
public final Completable doOnError(Action1<? super Throwable> onError) {
    final Action1<Subscription> ignoreSubscribe = Actions.empty();
    final Action0 ignoreCompleted = Actions.empty();
    final Action0 ignoreAfterTerminate = Actions.empty();
    final Action0 ignoreUnsubscribe = Actions.empty();
    return doOnLifecycle(ignoreSubscribe, onError, ignoreCompleted, ignoreAfterTerminate, ignoreUnsubscribe);
}
 
源代码10 项目: letv   文件: Completable.java
/**
 * Delegates to {@code doOnLifecycle} with {@code onSubscribe} in the first slot and
 * {@code Actions.empty()} in every other slot.
 *
 * @param onSubscribe action passed as the subscribe callback
 * @return the {@code Completable} returned by {@code doOnLifecycle}
 */
public final Completable doOnSubscribe(Action1<? super Subscription> onSubscribe) {
    final Action1<Throwable> ignoreError = Actions.empty();
    final Action0 ignoreCompleted = Actions.empty();
    final Action0 ignoreAfterTerminate = Actions.empty();
    final Action0 ignoreUnsubscribe = Actions.empty();
    return doOnLifecycle(onSubscribe, ignoreError, ignoreCompleted, ignoreAfterTerminate, ignoreUnsubscribe);
}
 
源代码11 项目: letv   文件: Completable.java
/**
 * Delegates to {@code doOnLifecycle} with {@code onAfterComplete} in the fourth slot and
 * {@code Actions.empty()} in every other slot.
 *
 * @param onAfterComplete action passed as the after-terminate callback
 * @return the {@code Completable} returned by {@code doOnLifecycle}
 */
public final Completable doAfterTerminate(Action0 onAfterComplete) {
    final Action1<Subscription> ignoreSubscribe = Actions.empty();
    final Action1<Throwable> ignoreError = Actions.empty();
    final Action0 ignoreCompleted = Actions.empty();
    final Action0 ignoreUnsubscribe = Actions.empty();
    return doOnLifecycle(ignoreSubscribe, ignoreError, ignoreCompleted, onAfterComplete, ignoreUnsubscribe);
}
 
源代码12 项目: letv   文件: Observable.java
/**
 * Lifts an {@code OperatorDoOnEach} around an {@code ActionSubscriber} whose item and
 * error callbacks are {@code Actions.empty()} and whose completion callback is
 * {@code onCompleted}.
 *
 * @param onCompleted action passed as the completion callback
 * @return the observable returned by {@code lift}
 */
public final Observable<T> doOnCompleted(Action0 onCompleted) {
    // Raw type kept as in the original (decompiled) source.
    final ActionSubscriber completionOnly = new ActionSubscriber(Actions.empty(), Actions.empty(), onCompleted);
    return lift(new OperatorDoOnEach(completionOnly));
}
 
源代码13 项目: letv   文件: Observable.java
/**
 * Lifts an {@code OperatorDoOnEach} around an {@code ActionSubscriber} whose error
 * callback is {@code onError} and whose item and completion callbacks are
 * {@code Actions.empty()}.
 *
 * @param onError action passed as the error callback
 * @return the observable returned by {@code lift}
 */
public final Observable<T> doOnError(Action1<Throwable> onError) {
    // Raw type kept as in the original (decompiled) source.
    final ActionSubscriber errorOnly = new ActionSubscriber(Actions.empty(), onError, Actions.empty());
    return lift(new OperatorDoOnEach(errorOnly));
}
 
源代码14 项目: letv   文件: Observable.java
/**
 * Lifts an {@code OperatorDoOnEach} around an {@code ActionSubscriber} whose item
 * callback is {@code onNext} and whose error and completion callbacks are
 * {@code Actions.empty()}.
 *
 * @param onNext action passed as the item callback
 * @return the observable returned by {@code lift}
 */
public final Observable<T> doOnNext(Action1<? super T> onNext) {
    // Raw type kept as in the original (decompiled) source.
    final ActionSubscriber itemOnly = new ActionSubscriber(onNext, Actions.empty(), Actions.empty());
    return lift(new OperatorDoOnEach(itemOnly));
}
 
源代码15 项目: letv   文件: Observable.java
/**
 * Lifts an {@code OperatorDoOnEach} around an {@code ActionSubscriber} that uses
 * {@code onTerminate} for both terminal callbacks: adapted with
 * {@code Actions.toAction1} as the error callback and passed directly as the completion
 * callback. The item callback is {@code Actions.empty()}.
 *
 * @param onTerminate action passed for both the error and the completion callback
 * @return the observable returned by {@code lift}
 */
public final Observable<T> doOnTerminate(Action0 onTerminate) {
    // Raw type kept as in the original (decompiled) source.
    final ActionSubscriber terminalOnly =
            new ActionSubscriber(Actions.empty(), Actions.toAction1(onTerminate), onTerminate);
    return lift(new OperatorDoOnEach(terminalOnly));
}
 
源代码16 项目: letv   文件: Observable.java
/**
 * Subscribes with {@code Actions.empty()} for the item and completion callbacks and
 * {@code InternalObservableUtils.ERROR_NOT_IMPLEMENTED} for the error callback.
 *
 * @return the subscription returned by the delegated {@code subscribe} call
 */
public final Subscription subscribe() {
    // Raw type kept as in the original (decompiled) source.
    final ActionSubscriber ignoreItems =
            new ActionSubscriber(Actions.empty(), InternalObservableUtils.ERROR_NOT_IMPLEMENTED, Actions.empty());
    return subscribe(ignoreItems);
}
 
源代码17 项目: letv   文件: Observable.java
/**
 * Subscribes with the given item callback, using
 * {@code InternalObservableUtils.ERROR_NOT_IMPLEMENTED} for errors and
 * {@code Actions.empty()} for completion.
 *
 * @param onNext item callback, must not be {@code null}
 * @return the subscription returned by the delegated {@code subscribe} call
 * @throws IllegalArgumentException if {@code onNext} is {@code null}
 */
public final Subscription subscribe(Action1<? super T> onNext) {
    if (onNext == null) {
        throw new IllegalArgumentException("onNext can not be null");
    }
    return subscribe(new ActionSubscriber(onNext, InternalObservableUtils.ERROR_NOT_IMPLEMENTED, Actions.empty()));
}