类io.reactivex.functions.Action源码实例Demo

下面列出了怎么用io.reactivex.functions.Action的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: lifecycle-component   文件: RxUtil.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            view.showLoading();//显示进度条
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doFinally(new Action() {
                        @Override
                        public void run() {
                            view.hideLoading();//隐藏进度条
                        }
                    }).compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
 
源代码2 项目: RxAndroidBle   文件: ServiceDiscoveryManager.java
void reset() {
    hasCachedResults = false;
    this.deviceServicesObservable = getListOfServicesFromGatt()
            .map(wrapIntoRxBleDeviceServices())
            .switchIfEmpty(getTimeoutConfiguration().flatMap(scheduleActualDiscoveryWithTimeout()))
            .doOnSuccess(Functions.actionConsumer(new Action() {
                @Override
                public void run() {
                    hasCachedResults = true;
                }
            }))
            .doOnError(Functions.actionConsumer(new Action() {
                @Override
                public void run() {
                    reset();
                }
            }))
            .cache();
}
 
源代码3 项目: RxCupboard   文件: RxDatabase.java
private <T> FlowableTransformer<T, T> autoClose(final QueryResultIterable<T> iterable) {
	return new FlowableTransformer<T, T>() {
		@Override
		public Publisher<T> apply(Flowable<T> upstream) {
			return upstream.doOnTerminate(new Action() {
				@Override
				public void run() throws Exception {
					// Stream terminates (completed or on error): close the cursor
					iterable.close();
				}
			}).doOnCancel(new Action() {
				@Override
				public void run() throws Exception {
					// Cancelled subscription (manual unsubscribe or via some operator such as take()): close the cursor
					iterable.close();
				}
			});
		}
	};
}
 
源代码4 项目: News   文件: StoriesDataRepository.java
private Observable<List<Story>> getAndSaveStoryListFromRemote(String date) {
    return this.mStoriesRemoteDataSource.getStories(date)
            .doOnNext(new Consumer<List<Story>>() {
                @Override
                public void accept(@io.reactivex.annotations.NonNull List<Story> storyList) throws Exception {
                    mStoriesLocalDataSource.saveStories(storyList);
                    for (Story story : storyList) {
                        mCachedStories.put(story.getId(), story);
                    }
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    mCacheIsDirty = true;
                }
            });
}
 
源代码5 项目: RxAndroidBle   文件: ConnectionSharingAdapter.java
@Override
public ObservableSource<RxBleConnection> apply(Observable<RxBleConnection> upstream) {
    synchronized (connectionObservable) {
        final Observable<RxBleConnection> rxBleConnectionObservable = connectionObservable.get();

        if (rxBleConnectionObservable != null) {
            return rxBleConnectionObservable;
        }

        final Observable<RxBleConnection> newConnectionObservable = upstream
                .doFinally(new Action() {
                    @Override
                    public void run() {
                        connectionObservable.set(null);
                    }
                })
                .replay(1)
                .refCount();
        connectionObservable.set(newConnectionObservable);
        return newConnectionObservable;
    }
}
 
public static ObservableTransformer<TaskDetailEffect, TaskDetailEvent> createEffectHandlers(
    TaskDetailViewActions view, Context context, Action dismiss, Consumer<Task> launchEditor) {

  TasksRemoteDataSource remoteSource = TasksRemoteDataSource.getInstance();
  TasksLocalDataSource localSource =
      TasksLocalDataSource.getInstance(context, SchedulerProvider.getInstance());
  return RxMobius.<TaskDetailEffect, TaskDetailEvent>subtypeEffectHandler()
      .addFunction(DeleteTask.class, deleteTaskHandler(remoteSource, localSource))
      .addFunction(SaveTask.class, saveTaskHandler(remoteSource, localSource))
      .addAction(NotifyTaskMarkedComplete.class, view::showTaskMarkedComplete, mainThread())
      .addAction(NotifyTaskMarkedActive.class, view::showTaskMarkedActive, mainThread())
      .addAction(NotifyTaskDeletionFailed.class, view::showTaskDeletionFailed, mainThread())
      .addAction(NotifyTaskSaveFailed.class, view::showTaskSavingFailed, mainThread())
      .addConsumer(OpenTaskEditor.class, openTaskEditorHandler(launchEditor), mainThread())
      .addAction(Exit.class, dismiss, mainThread())
      .build();
}
 
源代码7 项目: AcgClub   文件: RxRealmUtils.java
public static Completable completableExec(final RealmConfiguration configuration,
    final Consumer<Realm> transaction) {
  return Completable.fromAction(new Action() {
    @Override
    public void run() throws Exception {
      try (Realm realm = Realm.getInstance(configuration)) {
        realm.executeTransaction(new Transaction() {
          @Override
          public void execute(Realm r) {
            try {
              transaction.accept(r);
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        });
      }
    }
  });
}
 
源代码8 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
 * stream as a {@link Completable} every time it receives an effect from the upstream effects
 * observable. This Completable will be subscribed on the specified {@link Scheduler}. This will
 * result in calling the provided Action on the specified scheduler every time an effect is
 * dispatched to the created effect transformer.
 *
 * @param doEffect the {@link Action} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} that the action should be run on
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream
          .flatMapCompletable(
              new Function<F, CompletableSource>() {
                @Override
                public CompletableSource apply(F f) throws Exception {
                  return scheduler == null
                      ? Completable.fromAction(doEffect)
                      : Completable.fromAction(doEffect).subscribeOn(scheduler);
                }
              })
          .toObservable();
    }
  };
}
 
源代码9 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Consumer} into
 * the stream as a {@link Completable} every time it receives an effect from the upstream effects
 * observable. This will result in calling the consumer on the specified scheduler, and passing it
 * the requested effect object.
 *
 * @param doEffect the {@link Consumer} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the consumer
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream
          .flatMapCompletable(
              new Function<F, CompletableSource>() {
                @Override
                public CompletableSource apply(final F effect) throws Exception {
                  Completable completable =
                      Completable.fromAction(
                          new Action() {
                            @Override
                            public void run() throws Exception {
                              doEffect.accept(effect);
                            }
                          });
                  return scheduler == null ? completable : completable.subscribeOn(scheduler);
                }
              })
          .toObservable();
    }
  };
}
 
源代码10 项目: SimpleCropView   文件: CropImageView.java
/**
 * Save image with RxJava2
 *
 * @param bitmap Bitmap for saving
 * @param saveUri Uri for saving the cropped image
 *
 * @return Single of saving image
 */
public Single<Uri> saveAsSingle(final Bitmap bitmap, final Uri saveUri) {
  return Single.fromCallable(new Callable<Uri>() {

    @Override public Uri call() throws Exception {
      return saveImage(bitmap, saveUri);
    }
  }).doOnSubscribe(new Consumer<Disposable>() {
    @Override public void accept(@NonNull Disposable disposable) throws Exception {
      mIsSaving.set(true);
    }
  }).doFinally(new Action() {
    @Override public void run() throws Exception {
      mIsSaving.set(false);
    }
  });
}
 
源代码11 项目: MvpRoute   文件: RxScheduler.java
/**
 * 倒计时
 *
 * @param view     倒计时所用到的view
 * @param second   倒计时时长  单位 秒
 * @param listener 倒计时回调
 * @param <T>
 * @return Disposable  返回 Disposable  在Activity的onDestroy方法中
 * disposable.dispose() 取消掉  防止内存泄漏
 * @see CountDownListener  回调接口
 */
public static <T extends View> Disposable countDown(final T view, @IntRange(from = 1) final int second, final CountDownListener<T> listener) {
	if (listener == null || second <= 0) return null;
	return Flowable.intervalRange(0, second + 1, 0, 1, TimeUnit.SECONDS).observeOn(AndroidSchedulers.mainThread())
			.doOnNext(new Consumer<Long>() {
				@Override
				public void accept(Long aLong) throws Exception {
					listener.onCountDownProgress(view, (int) (second - aLong));
				}
			}).doOnComplete(new Action() {
				@Override
				public void run() throws Exception {
					listener.onCountDownComplete(view);
				}
			}).doOnSubscribe(new Consumer<Subscription>() {
				@Override
				public void accept(Subscription subscription) throws Exception {
					listener.onBindCountDown(view);
				}
			}).subscribe();

}
 
源代码12 项目: TikTok   文件: AppManager.java
/**
 * 让在前台的 {@link Activity}, 使用 {@link Snackbar} 显示文本内容
 *
 * @param message
 * @param isLong
 */
public void showSnackbar(String message, boolean isLong) {
    if (getCurrentActivity() == null && getTopActivity() == null) {
        Log.d(TAG, "mCurrentActivity == null when showSnackbar(String,boolean)");
        return;
    }
    Completable.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            if (DEPENDENCY_SUPPORT_DESIGN) {
                Activity activity = getCurrentActivity() == null ? getTopActivity() : getCurrentActivity();
                View view = activity.getWindow().getDecorView().findViewById(android.R.id.content);
                Snackbar.make(view, message, isLong ? Snackbar.LENGTH_LONG : Snackbar.LENGTH_SHORT).show();
            } else {
                Toast.makeText(mApplication, message, Toast.LENGTH_SHORT).show();
            }
        }
    }).subscribeOn(AndroidSchedulers.mainThread()).subscribe();

}
 
源代码13 项目: adamant-android   文件: RxTaskManager.java
public void doIt(
        Completable flowable,
        Action complete,
        Consumer<? super Throwable> error
) {
    long current = disposableAddress.size();

    Disposable subscription = flowable.
            timeout(DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .doOnError(throwable -> {
                remove(current);
            })
            .doOnComplete(() -> {
                remove(current);
            })
            .subscribe(complete, error);

    disposableAddress.append(current, subscription);
}
 
源代码14 项目: Hands-Chopping   文件: RxUtil.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            view.showLoading();//显示进度条
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doFinally(new Action() {
                        @Override
                        public void run() {
                            view.hideLoading();//隐藏进度条
                        }
                    }).compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
 
源代码15 项目: SimpleCropView   文件: CropImageView.java
/**
 * Crop image with RxJava2
 *
 * @param sourceUri Uri for cropping(If null, the Uri set in loadAsSingle() is used)
 *
 * @return Single of cropping image
 */
public Single<Bitmap> cropAsSingle(final Uri sourceUri) {
  return Single.fromCallable(new Callable<Bitmap>() {

    @Override public Bitmap call() throws Exception {
      if (sourceUri != null) mSourceUri = sourceUri;
      return cropImage();
    }
  }).doOnSubscribe(new Consumer<Disposable>() {
    @Override public void accept(@NonNull Disposable disposable) throws Exception {
      mIsCropping.set(true);
    }
  }).doFinally(new Action() {
    @Override public void run() throws Exception {
      mIsCropping.set(false);
    }
  });
}
 
源代码16 项目: whorlwind   文件: RealWhorlwind.java
@Override public Completable write(@NonNull final String name, @Nullable final ByteString value) {
  return Completable.fromAction(new Action() {
    @Override public void run() throws Exception {
      checkCanStoreSecurely();

      synchronized (dataLock) {
        if (value == null) {
          storage.remove(name);
          return;
        }

        prepareKeyStore();

        Cipher cipher = createCipher();
        cipher.init(Cipher.ENCRYPT_MODE, getPublicKey());

        storage.put(name, ByteString.of(cipher.doFinal(value.toByteArray())));
      }
    }
  });
}
 
源代码17 项目: rxfirebase   文件: AuthStateChangesOnSubscribe.java
/**
 * @param emitter
 */
@Override
public void subscribe(final ObservableEmitter<FirebaseAuth> emitter) {
    final FirebaseAuth.AuthStateListener listener = new FirebaseAuth.AuthStateListener() {
        @Override
        public void onAuthStateChanged(@NonNull FirebaseAuth firebaseAuth) {
            if (!emitter.isDisposed()) {
                emitter.onNext(firebaseAuth);
            }
        }
    };

    instance.addAuthStateListener(listener);

    emitter.setDisposable(Disposables.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            instance.removeAuthStateListener(listener);
        }
    }));
}
 
源代码18 项目: Aurora   文件: RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulersWithLifeCycle(IView view) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doAfterTerminate(new Action() {
                        @Override
                        public void run() {
                            view.hideLoading();//隐藏进度条
                        }
                    })
                    .compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
 
源代码19 项目: AssistantBySDK   文件: NaviProcessor.java
/**
 * 语音合成后伴随后续动作
 **/
private void executeAfterSpeak(String preText, final String afterText, final int type, SpeechMsgBuilder msgBuilder) {
    msgBuilder.setText(preText).setForceLocalEngine(true);
    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    if (EventBus.getDefault().hasSubscriberForEvent(NavigateEvent.class)) {
                        EventBus.getDefault().post(new NavigateEvent(type));
                    } else {
                        SynthesizerBase.get().startSpeakAbsolute(SpeechMsgBuilder.create(afterText).build())
                                .subscribeOn(Schedulers.io())
                                .observeOn(Schedulers.computation())
                                .subscribe();
                        EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(afterText), null, null, null));
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(preText), null, null, null));
}
 
源代码20 项目: AssistantBySDK   文件: NaviProcessor.java
/**
 * 合成文本并提前返回
 **/

private void speakAndAheadReturn(String text, SpeechMsgBuilder msgBuilder) {
/* 将回复文本发送到聊天列表 */
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(text), null, null, null));
    /* 合成回复文本 */
    msgBuilder.setText(text).setForceLocalEngine(true);
    SynthesizerBase.get().startSpeakAbsolute(msgBuilder.build())
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new NavigateEvent(NavigateEvent.START_NAVI));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
 
源代码21 项目: RxAndroidBle   文件: ConnectOperation.java
@Override
protected void protectedRun(final ObservableEmitter<BluetoothGatt> emitter, final QueueReleaseInterface queueReleaseInterface) {
    final Action queueReleaseAction = new Action() {
        @Override
        public void run() {
            queueReleaseInterface.release();
        }
    };
    final DisposableSingleObserver<BluetoothGatt> disposableGattObserver = getConnectedBluetoothGatt()
            .compose(wrapWithTimeoutWhenNotAutoconnecting())
            // when there are no subscribers there is no point of continuing work -> next will be disconnect operation
            .doFinally(queueReleaseAction)
            .subscribeWith(disposableSingleObserverFromEmitter(emitter));
    emitter.setDisposable(disposableGattObserver);

    if (autoConnect) {
        // with autoConnect the connection may be established after a really long time
        queueReleaseInterface.release();
    }
}
 
源代码22 项目: AssistantBySDK   文件: AssistPresenter.java
/**
 * 发送回复文本文本视图并合成声音
 **/
private void showAndSpeak(SpeechMsgBuilder builder) {
    EventBus.getDefault().post(new ChatMsgEvent(new ResponseMsg(builder.getText()), null, null, null));
    IflySynthesizer.getInstance().startSpeakAbsolute(builder.build())
            .doOnNext(new Consumer<SpeechMsg>() {
                @Override
                public void accept(SpeechMsg speechMsg) throws Exception {
                    if (speechMsg.state() == SpeechMsg.State.OnBegin)
                        EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_START));
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    EventBus.getDefault().post(new SynthesizeEvent(SynthesizeEvent.SYNTH_END));
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe();
}
 
源代码23 项目: AssistantBySDK   文件: ExampleUnitTest.java
public void test3(){
    start3(new Vo(10)).doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            throwable.printStackTrace();
        }
    }).doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("doOnCompleted......."+Thread.currentThread());
        }
    }).doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("doOnTerminate......."+Thread.currentThread());
        }
    }).subscribe();
    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
源代码24 项目: RxAndroidBle   文件: RxBleDeviceImpl.java
public Observable<RxBleConnection> establishConnection(final ConnectionSetup options) {
    return Observable.defer(new Callable<ObservableSource<RxBleConnection>>() {
        @Override
        public ObservableSource<RxBleConnection> call() {
            if (isConnected.compareAndSet(false, true)) {
                return connector.prepareConnection(options)
                        .doFinally(new Action() {
                            @Override
                            public void run() {
                                isConnected.set(false);
                            }
                        });
            } else {
                return Observable.error(new BleAlreadyConnectedException(bluetoothDevice.getAddress()));
            }
        }
    });
}
 
源代码25 项目: redisson   文件: ElementsStream.java
public static <V> Flowable<V> takeElements(Supplier<RFuture<V>> callable) {
    ReplayProcessor<V> p = ReplayProcessor.create();
    return p.doOnRequest(new LongConsumer() {
        @Override
        public void accept(long n) throws Exception {
            AtomicLong counter = new AtomicLong(n);
            AtomicReference<RFuture<V>> futureRef = new AtomicReference<RFuture<V>>();
            
            take(callable, p, counter, futureRef);

            p.doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    futureRef.get().cancel(true);
                }
            });
        }
    });
}
 
源代码26 项目: RxEasyHttp   文件: RxUtil.java
public static <T> ObservableTransformer<T, T> io_main() {
    return new ObservableTransformer<T, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
                        }
                    })
                    .doFinally(new Action() {
                        @Override
                        public void run() throws Exception {
                            HttpLog.i("+++doFinally+++");
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码27 项目: RxEasyHttp   文件: RxUtil.java
public static <T> ObservableTransformer<ApiResult<T>, T> _io_main() {
    return new ObservableTransformer<ApiResult<T>, T>() {
        @Override
        public ObservableSource<T> apply(@NonNull Observable<ApiResult<T>> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(new HandleFuc<T>())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            HttpLog.i("+++doOnSubscribe+++" + disposable.isDisposed());
                        }
                    })
                    .doFinally(new Action() {
                        @Override
                        public void run() throws Exception {
                            HttpLog.i("+++doFinally+++");
                        }
                    })
                    .onErrorResumeNext(new HttpResponseFunc<T>());
        }
    };
}
 
源代码28 项目: rxjava2-jdbc   文件: NonBlockingPool.java
NonBlockingPool(Callable<? extends T> factory, Predicate<? super T> healthCheck, Consumer<? super T> disposer,
        int maxSize, long idleTimeBeforeHealthCheckMs, long maxIdleTimeMs, long createRetryIntervalMs,
        BiFunction<? super T, ? super Checkin, ? extends T> checkinDecorator, Scheduler scheduler,
        Action closeAction) {
    Preconditions.checkNotNull(factory);
    Preconditions.checkNotNull(healthCheck);
    Preconditions.checkNotNull(disposer);
    Preconditions.checkArgument(maxSize > 0);
    Preconditions.checkNotNull(checkinDecorator);
    Preconditions.checkNotNull(scheduler);
    Preconditions.checkArgument(createRetryIntervalMs >= 0, "createRetryIntervalMs must be >=0");
    Preconditions.checkNotNull(closeAction);
    Preconditions.checkArgument(maxIdleTimeMs >= 0, "maxIdleTime must be >=0");
    this.factory = factory;
    this.healthCheck = healthCheck;
    this.disposer = disposer;
    this.maxSize = maxSize;
    this.idleTimeBeforeHealthCheckMs = idleTimeBeforeHealthCheckMs;
    this.maxIdleTimeMs = maxIdleTimeMs;
    this.createRetryIntervalMs = createRetryIntervalMs;
    this.checkinDecorator = checkinDecorator;
    this.scheduler = scheduler;// schedules retries
    this.closeAction = closeAction;
}
 
源代码29 项目: Aurora   文件: RxUtils.java
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view,final boolean isLoadMore) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            if (!isLoadMore){
                                view.showLoading();
                            }
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doAfterTerminate(new Action() {
                        @Override
                        public void run() {
                            view.hideLoading();//隐藏进度条
                        }
                    }).compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
 
源代码30 项目: rxjava-RxLife   文件: ObservableLife.java
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                                  Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    subscribe(ls);

    return ls;
}
 
 类所在包
 类方法
 同包方法