类io.reactivex.Scheduler源码实例Demo

下面列出了怎么用io.reactivex.Scheduler的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: YiZhi   文件: RxBus.java
/**
 * Chooses the scheduler on which a subscriber's events are observed.
 *
 * @param observable       the stream whose emissions are to be delivered
 * @param subscriberMethod descriptor whose {@code threadMode} selects the scheduler
 * @return {@code observable} with {@code observeOn} applied for the selected scheduler
 * @throws IllegalStateException if {@code threadMode} is none of the handled values
 */
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
    // Each handled mode maps directly onto one scheduler; anything else is rejected.
    switch (subscriberMethod.threadMode) {
        case MAIN:
            return observable.observeOn(AndroidSchedulers.mainThread());

        case NEW_THREAD:
            return observable.observeOn(Schedulers.newThread());

        case CURRENT_THREAD:
            return observable.observeOn(Schedulers.trampoline());

        default:
            throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
    }
}
 
源代码2 项目: RxAndroidBle   文件: DisconnectOperation.java
/**
 * Injection constructor: stores every supplied collaborator in the field of the same name.
 *
 * @param rxBleGattCallback             GATT callback collaborator
 * @param bluetoothGattProvider         provider of the GATT instance
 * @param macAddress                    value bound to {@code DeviceModule.MAC_ADDRESS}
 * @param bluetoothManager              Bluetooth manager collaborator
 * @param bluetoothInteractionScheduler scheduler bound to {@code NamedSchedulers.BLUETOOTH_INTERACTION}
 * @param timeoutConfiguration          configuration bound to {@code DeviceModule.DISCONNECT_TIMEOUT}
 * @param connectionStateChangeListener listener collaborator for connection state changes
 */
@Inject
DisconnectOperation(
        RxBleGattCallback rxBleGattCallback,
        BluetoothGattProvider bluetoothGattProvider,
        @Named(DeviceModule.MAC_ADDRESS) String macAddress,
        BluetoothManager bluetoothManager,
        @Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
        @Named(DeviceModule.DISCONNECT_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
        ConnectionStateChangeListener connectionStateChangeListener) {
    // Named (qualified) bindings.
    this.macAddress = macAddress;
    this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
    this.timeoutConfiguration = timeoutConfiguration;

    // Unqualified collaborators.
    this.bluetoothManager = bluetoothManager;
    this.bluetoothGattProvider = bluetoothGattProvider;
    this.rxBleGattCallback = rxBleGattCallback;
    this.connectionStateChangeListener = connectionStateChangeListener;
}
 
源代码3 项目: Reactive-Android-Programming   文件: Sandbox.java
/**
 * Runs {@code importantLongTask} for the integers 1..10000, each on a scheduler
 * backed by a fixed pool of 1000 threads, logs every result and blocks until the
 * stream terminates.
 *
 * @throws Exception if waiting on the latch is interrupted or the pipeline setup fails
 */
private static void demo2() throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(1000);
    try {
        final Scheduler pooledScheduler = Schedulers.from(executor);

        Observable.range(1, 10000)
                .flatMap(i -> Observable.just(i)
                        .subscribeOn(pooledScheduler)
                        .map(Sandbox::importantLongTask)
                )
                // Fires on completion and on error, so the latch below is always released.
                .doOnTerminate(WAIT_LATCH::countDown)
                .map(Object::toString)
                .subscribe(e -> log("subscribe", e));

        WAIT_LATCH.await();
    } finally {
        // Release the pool's threads even when the pipeline throws or await() is
        // interrupted; previously that path skipped shutdown() and leaked the pool.
        executor.shutdown();
    }
}
 
/**
 * Stores every supplied collaborator and the payload in the field of the same name.
 * The {@code bytesToWrite} array is kept by reference; no copy is made here.
 */
CharacteristicLongWriteOperation(
        BluetoothGatt bluetoothGatt,
        RxBleGattCallback rxBleGattCallback,
        @Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
        @Named(ConnectionModule.OPERATION_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
        BluetoothGattCharacteristic bluetoothGattCharacteristic,
        PayloadSizeLimitProvider batchSizeProvider,
        WriteOperationAckStrategy writeOperationAckStrategy,
        WriteOperationRetryStrategy writeOperationRetryStrategy,
        byte[] bytesToWrite) {
    // Target of the write and the data to send.
    this.bluetoothGatt = bluetoothGatt;
    this.bluetoothGattCharacteristic = bluetoothGattCharacteristic;
    this.bytesToWrite = bytesToWrite;

    // Strategies and limits supplied by the caller.
    this.batchSizeProvider = batchSizeProvider;
    this.writeOperationAckStrategy = writeOperationAckStrategy;
    this.writeOperationRetryStrategy = writeOperationRetryStrategy;

    // Callback, scheduler and timeout bindings.
    this.rxBleGattCallback = rxBleGattCallback;
    this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
    this.timeoutConfiguration = timeoutConfiguration;
}
 
源代码5 项目: MyBookshelf   文件: DownloadTaskImpl.java
/**
 * Reacts to a failed download step: does nothing when no download is running,
 * continues with {@code toDownload} while the task is not finishing, and otherwise
 * stops the download and reports error or completion depending on the success count.
 *
 * @param scheduler scheduler forwarded to {@code toDownload}
 */
private void whenError(Scheduler scheduler) {
    if (!isDownloading) {
        return;
    }

    if (!isFinishing()) {
        // Not finishing yet: carry on downloading.
        toDownload(scheduler);
        return;
    }

    stopDownload();
    // No success at all counts as an error; any success counts as completion.
    final boolean nothingSucceeded = downloadBook.getSuccessCount() == 0;
    if (nothingSucceeded) {
        onDownloadError(downloadBook);
    } else {
        onDownloadComplete(downloadBook);
    }
}
 
源代码6 项目: retrocache   文件: RxJava2CachedCallAdapter.java
/**
 * Stores the cache, response type, scheduler, Retrofit instance, annotations and
 * the seven boolean flags in the fields of the same name.
 */
RxJava2CachedCallAdapter(Cache<String, byte[]> cachingSystem, Type responseType, Scheduler scheduler, Retrofit retrofit, Annotation[] annotations,
                         boolean mAsync, boolean mResult, boolean mBody, boolean mFlowable, boolean mSingle, boolean mMaybe,
                         boolean mCompletable) {

    // Boolean flags.
    this.mAsync = mAsync;
    this.mResult = mResult;
    this.mBody = mBody;
    this.mFlowable = mFlowable;
    this.mSingle = mSingle;
    this.mMaybe = mMaybe;
    this.mCompletable = mCompletable;

    // Collaborators and call metadata.
    this.mRetrofit = retrofit;
    this.mCachingSystem = cachingSystem;
    this.mScheduler = scheduler;
    this.mResponseType = responseType;
    this.mAnnotations = annotations;
}
 
源代码7 项目: RIBs   文件: AndroidSchedulersRule.java
/**
 * Resets {@code RxAndroidPlugins} and installs handlers that make both the initial
 * and the regular main-thread scheduler resolve to {@code delegatingMainThreadScheduler}.
 *
 * @param description description passed in by the rule machinery; not used here
 */
@Override
protected void starting(Description description) {
  if (restoreHandlers) {
    // Capturing the original handlers is currently commented out; see
    // https://github.com/ReactiveX/RxAndroid/pull/358
    //            originalInitMainThreadInitHandler =
    // RxAndroidPlugins.getInitMainThreadScheduler();
    //            originalMainThreadHandler = RxAndroidPlugins.getMainThreadScheduler();
  }
  RxAndroidPlugins.reset();

  // Handler for the init main-thread scheduler: ignores the supplied callable.
  final Function<Callable<Scheduler>, Scheduler> initHandler =
      new Function<Callable<Scheduler>, Scheduler>() {
        @Override
        public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {
          return delegatingMainThreadScheduler;
        }
      };
  // Handler for the main-thread scheduler: ignores the supplied scheduler.
  final Function<Scheduler, Scheduler> mainHandler =
      new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler scheduler) throws Exception {
          return delegatingMainThreadScheduler;
        }
      };

  RxAndroidPlugins.setInitMainThreadSchedulerHandler(initHandler);
  RxAndroidPlugins.setMainThreadSchedulerHandler(mainHandler);
}
 
源代码8 项目: RxBus2   文件: RxBus.java
/**
 * Decides on which thread a subscription's events are handled.
 *
 * @param observable       the stream to deliver
 * @param subscriberMethod descriptor carrying the {@code threadMode} to honour
 * @return {@code observable} after {@code observeOn} with the matching scheduler
 * @throws IllegalStateException if {@code threadMode} is none of the handled values
 */
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
    switch (subscriberMethod.threadMode) {
        case MAIN:
            return observable.observeOn(AndroidSchedulers.mainThread());

        case NEW_THREAD:
            return observable.observeOn(Schedulers.newThread());

        case CURRENT_THREAD:
            return observable.observeOn(Schedulers.trampoline());

        default:
            // No scheduler is associated with any other mode.
            throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
    }
}
 
源代码9 项目: storio   文件: RxJavaUtils.java
/**
 * Applies the content resolver's default scheduler to the given flowable, if one is set.
 *
 * @param storIOContentResolver source of the default scheduler
 * @param flowable              stream to subscribe on that scheduler
 * @return {@code flowable} subscribed on the default scheduler, or {@code flowable}
 *         unchanged when {@code defaultRxScheduler()} returns {@code null}
 */
@CheckResult
@NonNull
public static <T> Flowable<T> subscribeOn(
        @NonNull StorIOContentResolver storIOContentResolver,
        @NonNull Flowable<T> flowable
) {
    final Scheduler defaultScheduler = storIOContentResolver.defaultRxScheduler();
    if (defaultScheduler == null) {
        return flowable;
    }
    return flowable.subscribeOn(defaultScheduler);
}
 
源代码10 项目: AndroidGodEye   文件: ThreadHelper.java
/**
 * Installs a computation-scheduler handler in {@code RxJavaPlugins} that always
 * answers with one freshly created {@code TestScheduler}, whatever scheduler it is given.
 */
public static void setupRxjava() {
    final TestScheduler replacement = new TestScheduler();
    // The incoming scheduler is deliberately ignored.
    RxJavaPlugins.setComputationSchedulerHandler(scheduler -> replacement);
}
 
/**
 * Subscribes three strings on the custom scheduler, maps them through
 * {@code delayCalculation}, observes the results on the same scheduler, logs each
 * string length, and then calls {@code sleep(10000)}.
 */
@Test
void custom_Scheduler_test() {
    final Scheduler customScheduler = custom_Scheduler();

    final Observable<String> fruits = Observable.just("Apple", "Orange", "Appla")
            .subscribeOn(customScheduler);

    fruits.map(ConcurrencyTest::delayCalculation)
            .observeOn(customScheduler)
            .map(String::length)
            .subscribe(ConcurrencyTest::log);

    // NOTE(review): presumably keeps the test alive while the scheduler's threads work — confirm.
    sleep(10000);
}
 
源代码12 项目: iroha-android   文件: SendAssetInteractor.java
/**
 * Injection constructor: forwards both schedulers to the superclass and keeps the
 * channel and preferences helper.
 *
 * @param jobScheduler    scheduler bound to {@code ApplicationModule.JOB}
 * @param uiScheduler     scheduler bound to {@code ApplicationModule.UI}
 * @param managedChannel  channel stored in {@code channel}
 * @param preferencesUtil helper stored in {@code preferenceUtils}
 */
@Inject
SendAssetInteractor(@Named(ApplicationModule.JOB) Scheduler jobScheduler,
                    @Named(ApplicationModule.UI) Scheduler uiScheduler,
                    ManagedChannel managedChannel, PreferencesUtil preferencesUtil) {
    super(jobScheduler, uiScheduler);
    this.preferenceUtils = preferencesUtil;
    this.channel = managedChannel;
}
 
源代码13 项目: rxjava2-jdbc   文件: DatabaseCreator.java
/**
 * Builds a {@code Database} on top of a non-blocking connection pool.
 *
 * @param maxSize   maximum pool size passed to the pool builder
 * @param big       flag forwarded to {@code connectionProvider}
 * @param scheduler scheduler handed to the pool; it is also shut down by the close action
 * @return a database created from the pool and a close action
 */
public static Database create(int maxSize, boolean big, Scheduler scheduler) {
    final NonBlockingConnectionPool connectionPool = Pools.nonBlocking()
            .connectionProvider(connectionProvider(nextUrl(), big))
            .maxPoolSize(maxSize)
            .scheduler(scheduler)
            .build();

    // Close action: closes the pool first, then shuts the scheduler down.
    return Database.from(connectionPool, () -> {
        connectionPool.close();
        scheduler.shutdown();
    });
}
 
/**
 * Injection constructor: forwards both schedulers to the superclass and keeps the
 * preferences helper and the channel.
 *
 * @param jobScheduler    scheduler bound to {@code ApplicationModule.JOB}
 * @param uiScheduler     scheduler bound to {@code ApplicationModule.UI}
 * @param preferenceUtils helper stored in the field of the same name
 * @param channel         channel stored in the field of the same name
 */
@Inject
GetAccountTransactionsInteractor(@Named(ApplicationModule.JOB) Scheduler jobScheduler,
                                 @Named(ApplicationModule.UI) Scheduler uiScheduler,
                                 PreferencesUtil preferenceUtils, ManagedChannel channel) {
    super(jobScheduler, uiScheduler);
    this.channel = channel;
    this.preferenceUtils = preferenceUtils;
}
 
源代码15 项目: java-unified-sdk   文件: AVObjectAsyncTest.java
/**
 * Configures the application with a scheduler creator that returns
 * {@code Schedulers.newThread()}, then initializes the runtime.
 *
 * @param testName name forwarded to the superclass constructor
 */
public AVObjectAsyncTest(String testName) {
  super(testName);

  final AppConfiguration.SchedulerCreator newThreadCreator = new AppConfiguration.SchedulerCreator() {
    @Override
    public Scheduler create() {
      return Schedulers.newThread();
    }
  };
  AppConfiguration.config(true, newThreadCreator);

  Configure.initializeRuntime();
}
 
源代码16 项目: sqlbrite   文件: BriteContentResolver.java
/**
 * Stores the content resolver, logger, scheduler and query transformer in the
 * fields of the same name.
 */
BriteContentResolver(ContentResolver contentResolver, Logger logger, Scheduler scheduler,
    ObservableTransformer<Query, Query> queryTransformer) {
  this.queryTransformer = queryTransformer;
  this.scheduler = scheduler;
  this.logger = logger;
  this.contentResolver = contentResolver;
}
 
源代码17 项目: xDrip   文件: JamBaseBluetoothService.java
/**
 * Builds an observable that calls {@code refreshDeviceCache} for the given GATT,
 * delays the emission by {@code delay_ms} milliseconds on the computation scheduler,
 * and is subscribed on the supplied scheduler.
 *
 * @param bluetoothGatt     GATT instance passed to {@code refreshDeviceCache}
 * @param rxBleGattCallback not used by this implementation
 * @param scheduler         scheduler the observable is subscribed on
 * @return the delayed observable described above
 */
@NonNull
@Override
public Observable<Void> asObservable(BluetoothGatt bluetoothGatt,
                                     RxBleGattCallback rxBleGattCallback,
                                     Scheduler scheduler) throws Throwable {

    final Observable<Void> refresh = Observable.fromCallable(() -> refreshDeviceCache(bluetoothGatt));
    final Observable<Void> delayed = refresh.delay(delay_ms, TimeUnit.MILLISECONDS, Schedulers.computation());
    return delayed.subscribeOn(scheduler);
}
 
源代码18 项目: adamant-android   文件: AdamantApiWrapper.java
/**
 * Stores the builder, key generator and scheduler, then calls {@code buildApi()}.
 *
 * @param apiBuilder   builder stored in the field of the same name
 * @param keyGenerator key generator stored in the field of the same name
 * @param scheduler    scheduler stored in the field of the same name
 */
public AdamantApiWrapper(AdamantApiBuilder apiBuilder, AdamantKeyGenerator keyGenerator, Scheduler scheduler) {
    this.scheduler = scheduler;
    this.keyGenerator = keyGenerator;
    this.apiBuilder = apiBuilder;

    // Runs only after every field above has been assigned.
    buildApi();
}
 
源代码19 项目: java-unified-sdk   文件: SetOperationTests.java
/**
 * Configures the application with a scheduler creator that returns
 * {@code Schedulers.newThread()}, then initializes the runtime.
 *
 * @param testName name forwarded to the superclass constructor
 */
public SetOperationTests(String testName) {
  super(testName);

  final AppConfiguration.SchedulerCreator newThreadCreator = new AppConfiguration.SchedulerCreator() {
    @Override
    public Scheduler create() {
      return Schedulers.newThread();
    }
  };
  AppConfiguration.config(true, newThreadCreator);

  Configure.initializeRuntime();
}
 
/**
 * Re-emits the incoming values on a scheduler built from {@code executor}, delayed
 * by one millisecond on that same scheduler, and records any stream error in
 * {@code downstreamFailure}.
 *
 * @param values incoming stream
 * @return the delayed stream with the error-recording hook attached
 */
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
    final Scheduler executorScheduler = Schedulers.from(executor);
    final Flowable<String> delayed = values
            .observeOn(executorScheduler)
            .delay(1, TimeUnit.MILLISECONDS, executorScheduler);
    // Only records the error; the error signal itself is not altered here.
    return delayed.doOnError(err -> downstreamFailure = err);
}
 
源代码21 项目: sqlbrite   文件: BriteDatabase.java
/**
 * Stores the open helper, logger, scheduler and query transformer in the fields of
 * the same name.
 */
BriteDatabase(SupportSQLiteOpenHelper helper, Logger logger, Scheduler scheduler,
    ObservableTransformer<Query, Query> queryTransformer) {
  this.queryTransformer = queryTransformer;
  this.scheduler = scheduler;
  this.logger = logger;
  this.helper = helper;
}
 
源代码22 项目: storio   文件: RxJavaUtils.java
/**
 * Applies the StorIOSQLite default scheduler to the given completable, if one is set.
 *
 * @param storIOSQLite source of the default scheduler
 * @param completable  completable to subscribe on that scheduler
 * @return {@code completable} subscribed on the default scheduler, or {@code completable}
 *         unchanged when {@code defaultRxScheduler()} returns {@code null}
 */
@CheckResult
@NonNull
public static Completable subscribeOn(
        @NonNull StorIOSQLite storIOSQLite,
        @NonNull Completable completable
) {
    final Scheduler defaultScheduler = storIOSQLite.defaultRxScheduler();
    if (defaultScheduler == null) {
        return completable;
    }
    return completable.subscribeOn(defaultScheduler);
}
 
源代码23 项目: atlas   文件: RxAndroidPlugins.java
/**
 * Invokes {@code apply(f, s)} and insists on a non-null result.
 *
 * @param f function handed to {@code apply}
 * @param s callable handed to {@code apply}
 * @return the scheduler produced by {@code apply(f, s)}
 * @throws NullPointerException if that result is {@code null}
 */
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
    final Scheduler result = apply(f, s);
    if (result != null) {
        return result;
    }
    throw new NullPointerException("Scheduler Callable returned null");
}
 
源代码24 项目: rxjava2-extras   文件: Transformers.java
/**
 * Groups the items of the source {@link Flowable} into {@link List}s. A list is
 * emitted once it holds {@code maxSize} items, or once the time elapsed since the
 * last source emission reaches the duration computed for that emission.
 *
 * @param maxSize
 *            maximum number of items in an emitted list
 * @param duration
 *            function that, given the last emitted item, returns how long to
 *            wait before the buffered list is emitted
 * @param unit
 *            time unit of the value returned by {@code duration}
 * @param scheduler
 *            scheduler used to schedule the time-based emission of a buffer
 * @param <T>
 *            type of the source stream items
 * @return a transformer applying the buffering described above
 */
public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
        final Function<? super T, ? extends Long> duration, final TimeUnit unit, final Scheduler scheduler) {

    // Collecting into the current list continues only while the list is below
    // maxSize and the incoming element carries a value.
    final BiPredicate<List<T>, MyOptional<T>> keepCollecting = new BiPredicate<List<T>, MyOptional<T>>() {
        @Override
        public boolean test(List<T> buffered, MyOptional<T> next) throws Exception {
            return buffered.size() < maxSize && next.isPresent();
        }
    };

    // Adapts the caller's duration function to the MyOptional-wrapped element type.
    final Function<MyOptional<T>, Long> wrappedDuration = new Function<MyOptional<T>, Long>() {
        @Override
        public Long apply(MyOptional<T> wrapped) throws Exception {
            return duration.apply(wrapped.get());
        }
    };

    // NOTE(review): insert(...) presumably injects the empty MyOptional when the
    // timeout elapses, which then fails the predicate above — confirm against insert().
    final FlowableTransformer<MyOptional<T>, MyOptional<T>> timeoutMarker = insert(wrappedDuration, unit,
            Functions.constant(MyOptional.<T>empty()), scheduler);

    final FlowableTransformer<MyOptional<T>, List<T>> toLists = collectWhile(
            ListFactoryHolder.<T>factory(), // list factory
            MyOptional.<T>addIfPresent(), // add function
            keepCollecting); // continuation condition

    return new FlowableTransformer<T, List<T>>() {
        @Override
        public Publisher<List<T>> apply(Flowable<T> source) {
            return source
                    .map(MyOptional.<T>of())
                    .compose(timeoutMarker)
                    .compose(toLists)
                    // sometimes nothing gets added to a list, so such lists are filtered out
                    .filter(MyOptional.<T>listHasElements());
        }
    };
}
 
源代码25 项目: java-unified-sdk   文件: StorageClient.java
/**
 * Optionally moves the given observable onto the IO scheduler.
 *
 * @param observable observable to wrap; may be {@code null}
 * @return {@code null} for a {@code null} input; otherwise the observable, subscribed
 *         on {@code Schedulers.io()} when {@code asynchronized} is set and observed
 *         on the same scheduler when {@code defaultCreator} is non-null
 */
private Observable wrapObservableInBackground(Observable observable) {
  if (null == observable) {
    return null;
  }
  final Scheduler ioScheduler = Schedulers.io();
  Observable wrapped = observable;
  if (asynchronized) {
    wrapped = wrapped.subscribeOn(ioScheduler);
  }
  // Only the presence of defaultCreator matters here; it is not invoked.
  if (null != defaultCreator) {
    wrapped = wrapped.observeOn(ioScheduler);
  }
  return wrapped;
}
 
/**
 * Schedules a task that calls {@code assertBusy()} on a worker of the scheduler
 * under test, then triggers the delegate's pending actions.
 */
@Test public void runningWorkReportsBusy() {
  final Runnable busyCheck = new Runnable() {
    @Override public void run() {
      assertBusy();
    }
  };

  final Scheduler.Worker worker = scheduler.createWorker();
  worker.schedule(busyCheck);

  delegate.triggerActions();
}
 
源代码27 项目: buffer-slayer   文件: RxReporter.java
/**
 * Stores the subscriber's settings and collaborators.
 *
 * @param messageTimeoutNanos timeout in nanoseconds; a value of {@code 0} is stored
 *                            as {@code Long.MAX_VALUE}
 * @param bufferedMaxMessages stored in the field of the same name
 * @param sender              stored in the field of the same name
 * @param scheduler           stored in the field of the same name
 */
private MessageGroupSubscriber(long messageTimeoutNanos,
                               int bufferedMaxMessages,
                               Sender<M, R> sender,
                               Scheduler scheduler) {
  // NOTE(review): 0 presumably means "no timeout", hence the largest possible value — confirm.
  if (messageTimeoutNanos == 0) {
    this.messageTimeoutNanos = Long.MAX_VALUE;
  } else {
    this.messageTimeoutNanos = messageTimeoutNanos;
  }
  this.scheduler = scheduler;
  this.sender = sender;
  this.bufferedMaxMessages = bufferedMaxMessages;
}
 
源代码28 项目: xDrip   文件: Ob1G5CollectionService.java
/**
 * Builds an observable that calls {@code refreshDeviceCache} for the given GATT,
 * delays the emission by {@code delay_ms} milliseconds on the computation scheduler,
 * and is subscribed on the supplied scheduler.
 *
 * @param bluetoothGatt     GATT instance passed to {@code refreshDeviceCache}
 * @param rxBleGattCallback not used by this implementation
 * @param scheduler         scheduler the observable is subscribed on
 * @return the delayed observable described above
 */
@NonNull
@Override
public Observable<Void> asObservable(BluetoothGatt bluetoothGatt,
                                     RxBleGattCallback rxBleGattCallback,
                                     Scheduler scheduler) throws Throwable {

    final Observable<Void> cacheRefresh = Observable.fromCallable(() -> refreshDeviceCache(bluetoothGatt));
    return cacheRefresh
            .delay(delay_ms, TimeUnit.MILLISECONDS, Schedulers.computation())
            .subscribeOn(scheduler);
}
 
源代码29 项目: rxjava2-lab   文件: Code5.java
/**
 * Emits every entry of {@code SUPER_HEROES} from an observable subscribed on a
 * scheduler backed by a fixed pool of 10 threads, logs each signal, and blocks
 * until the stream has terminated.
 *
 * @param args unused
 * @throws Exception if waiting on the latch is interrupted
 */
public static void main(String[] args) throws Exception {

        // Keep a handle on the pool so that it can be shut down once the demo is over;
        // otherwise its threads are never released.
        java.util.concurrent.ExecutorService executor = newFixedThreadPool(10, threadFactory);
        try {
            Scheduler scheduler = Schedulers.from(executor);

            CountDownLatch latch = new CountDownLatch(1);

            // Synchronous emission
            Observable<Object> observable = Observable.create(emitter -> {
                for (String superHero : SUPER_HEROES) {
                    log("Emitting: " + superHero);
                    emitter.onNext(superHero);
                }
                log("Completing");
                emitter.onComplete();
            });

            log("---------------- Subscribing");
            observable
                .subscribeOn(scheduler)
                .subscribe(
                    item -> log("Received " + item),
                    error -> {
                        log("Error");
                        // Release the latch on failure too, so main() cannot block forever.
                        latch.countDown();
                    },
                    () -> {
                        log("Complete");
                        latch.countDown();
                    });
            log("---------------- Subscribed");

            latch.await();
        } finally {
            executor.shutdown();
        }
    }
 
源代码30 项目: mockstar   文件: BaseSchedulerProvider.java
/**
 * Returns the scheduler this provider exposes under the name {@code io}.
 * NOTE(review): presumably meant for I/O-bound work, mirroring {@code Schedulers.io()} —
 * confirm against the implementations.
 *
 * @return a scheduler; declared {@code @NonNull}
 */
@NonNull
Scheduler io();
 
 类所在包
 同包方法