io.reactivex.rxjava3.schedulers.Schedulers源码实例Demo

类io.reactivex.rxjava3.schedulers.Schedulers源码实例Demo

下面列出了io.reactivex.rxjava3.schedulers.Schedulers 类实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: mobius   文件: TransformersTest.java
@Test
public void processingLongEffectsDoesNotBlockProcessingShorterEffects() {
  final List<String> effects = Arrays.asList("Hello", "Rx");

  PublishSubject<String> upstream = PublishSubject.create();
  Function<String, Integer> sleepyFunction =
      s -> {
        try {
          Thread.sleep(duration(s));
        } catch (InterruptedException ie) {
        }
        return s.length();
      };

  final List<Integer> results = new ArrayList<>();
  upstream
      .compose(Transformers.fromFunction(sleepyFunction, Schedulers.io()))
      .subscribe(results::add);

  Observable.fromIterable(effects).subscribe(upstream);

  await().atMost(durationForEffects(effects)).until(() -> results.equals(expected(effects)));
}
 
源代码2 项目: zap-android   文件: Wallet.java
/**
 * This will fetch the current balance from LND.
 * All Listeners registered to BalanceListener will be informed about any changes.
 */
public void fetchBalanceFromLND() {

    Single<WalletBalanceResponse> walletBalance = LndConnection.getInstance().getLightningService().walletBalance(WalletBalanceRequest.newBuilder().build());
    Single<ChannelBalanceResponse> channelBalance = LndConnection.getInstance().getLightningService().channelBalance(ChannelBalanceRequest.newBuilder().build());
    Single<PendingChannelsResponse> pendingChannels = LndConnection.getInstance().getLightningService().pendingChannels(PendingChannelsRequest.newBuilder().build());

    compositeDisposable.add(Single.zip(walletBalance, channelBalance, pendingChannels, (walletBalanceResponse, channelBalanceResponse, pendingChannelsResponse) -> {

        setOnChainBalance(walletBalanceResponse.getTotalBalance(), walletBalanceResponse.getConfirmedBalance(), walletBalanceResponse.getUnconfirmedBalance());
        setChannelBalance(channelBalanceResponse.getBalance(), channelBalanceResponse.getPendingOpenBalance());
        setChannelBalanceLimbo(pendingChannelsResponse.getTotalLimboBalance());

        return true;
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(aBoolean -> {
        // Zip executed without error
        broadcastBalanceUpdate();
    }, throwable -> ZapLog.debug(LOG_TAG, "Exception in fetch balance task: " + throwable.getMessage())));
}
 
源代码3 项目: GetApk   文件: LocalRepository.java
public Disposable getAndSort(Context context, final boolean sortByTime, final DateFormat dateFormat, KWSubscriber<ItemArray> subscriber) {
    return Flowable.just(new WeakReference<>(context))
            .flatMap(new Function<WeakReference<Context>, Publisher<List<App>>>() {
                @Override
                public Publisher<List<App>> apply(WeakReference<Context> weakContext) throws Exception {
                    List<App> apps = getApps();
                    if (apps == null) {
                        return getApps(weakContext.get(), dateFormat);
                    } else {
                        return Flowable.just(apps);
                    }
                }
            })
            .map(new SortFunction(dateFormat, sortByTime))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码4 项目: GetApk   文件: LocalRepository.java
@Deprecated
public Disposable saveApk(App app, final String dest, final KWSubscriber<String> subscriber) {
    return Flowable.just(app)
            .map(new Function<App, String>() {
                @Override
                public String apply(App source) throws Exception {
                    String fileName = source.name + "_" + source.versionName + ".apk";
                    return FileUtil.copy(source.apkPath, dest, fileName, new OnCopyListener() {
                        @Override
                        public void inProgress(final float progress) {
                            mHandler.post(new Runnable() {
                                @Override
                                public void run() {
                                    subscriber.inProgress(progress);
                                }
                            });
                        }
                    }).getAbsolutePath();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码5 项目: GetApk   文件: LocalRepository.java
public Disposable saveApk(ContentResolver resolver, App app, final Uri dest, final KWSubscriber<Uri> subscriber) {
    return Flowable.just(app)
            .map(new Function<App, Uri>() {
                @Override
                public Uri apply(App source) throws Exception {
                    FileUtil.copy(resolver, Uri.fromFile(new File(source.apkPath)), dest, new OnCopyListener() {
                        @Override
                        public void inProgress(final float progress) {
                            mHandler.post(new Runnable() {
                                @Override
                                public void run() {
                                    subscriber.inProgress(progress);
                                }
                            });
                        }
                    });
                    return dest;
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码6 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void subscribeCancelRace() {
    for (int i = 0; i < 500; i++) {
        final TestObserver<Integer> ts = new TestObserver<Integer>();

        final ReplayRelay<Integer> rp = ReplayRelay.create();

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                rp.subscribe(ts);
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());
    }
}
 
源代码7 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void subscribeRace() {
    for (int i = 0; i < 500; i++) {
        final ReplayRelay<Integer> rp = ReplayRelay.create();

        Runnable r1 = new Runnable() {
            @Override
            @SuppressWarnings("CheckReturnValue")
            public void run() {
                rp.test();
            }
        };

        TestHelper.race(r1, r1, Schedulers.single());
    }
}
 
源代码8 项目: RxRelay   文件: BehaviorRelayTest.java
@Test
public void addRemoveRace() {
    for (int i = 0; i < 500; i++) {
        final BehaviorRelay<Object> p = BehaviorRelay.create();

        final TestObserver<Object> ts = p.test();

        Runnable r1 = new Runnable() {
            @Override
            @SuppressWarnings("CheckReturnValue")
            public void run() {
                p.test();
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());
    }
}
 
源代码9 项目: RxRelay   文件: PublishRelayTest.java
@Test
public void addRemoveRance() throws Exception {

    for (int i = 0; i < 100; i++) {
        final PublishRelay<Integer> pp = PublishRelay.create();

        final TestObserver<Integer> ts = pp.test();

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                pp.subscribe();
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.io());
    }
}
 
源代码10 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void subscribeCancelRace() {
    for (int i = 0; i < 500; i++) {
        final TestObserver<Integer> ts = new TestObserver<Integer>();

        final ReplayRelay<Integer> rp = ReplayRelay.create();

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                rp.subscribe(ts);
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());
    }
}
 
源代码11 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void subscribeRace() {
    for (int i = 0; i < 500; i++) {
        final ReplayRelay<Integer> rp = ReplayRelay.create();

        Runnable r1 = new Runnable() {
            @Override
            @SuppressWarnings("CheckReturnValue")
            public void run() {
                rp.test();
            }
        };

        TestHelper.race(r1, r1, Schedulers.single());
    }
}
 
源代码12 项目: RxRelay   文件: BehaviorRelayTest.java
@Test
public void addRemoveRace() {
    for (int i = 0; i < 500; i++) {
        final BehaviorRelay<Object> p = BehaviorRelay.create();

        final TestObserver<Object> ts = p.test();

        Runnable r1 = new Runnable() {
            @Override
            @SuppressWarnings("CheckReturnValue")
            public void run() {
                p.test();
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());
    }
}
 
源代码13 项目: RxRelay   文件: PublishRelayTest.java
@Test
public void addRemoveRance() throws Exception {

    for (int i = 0; i < 100; i++) {
        final PublishRelay<Integer> pp = PublishRelay.create();

        final TestObserver<Integer> ts = pp.test();

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                pp.subscribe();
            }
        };
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.io());
    }
}
 
源代码14 项目: akarnokd-misc   文件: SomeBlocking.java
@Test
public void test() {
    Observable
    .range(1, 20)
    .flatMap(
            integer -> {
                if (integer % 5 != 0) {
                    return Observable
                            .just(integer);
                }

                return Observable
                        .just(-integer)
                        .observeOn(Schedulers.io());
            },
            false,
            1
    )
    .ignoreElements()
    .blockingAwait();
}
 
源代码15 项目: GetApk   文件: AppDelegate.java
public void getIcon() {
    recycler();
    final WeakReference<App> appWeak = new WeakReference<>(mApp);
    mDisposable = Flowable.just(itemView.getContext().getApplicationContext().getPackageManager())
            .map(new Function<PackageManager, Drawable>() {
                @Override
                public Drawable apply(PackageManager pm) throws Exception {
                    App app = appWeak.get();
                    if (app != null){
                        return app.applicationInfo.loadIcon(pm);
                    }else {
                        throw new Exception("app is null");
                    }
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<Drawable>() {
                @Override
                public void onNext(Drawable drawable) {
                    mIconImg.setImageDrawable(drawable);
                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {

                }
            });
}
 
源代码16 项目: GetApk   文件: LocalRepository.java
public Disposable getApp(Context context, final Uri path, KWSubscriber<App> subscriber) {
    return Flowable.just(new WeakReference<>(context))
            .map(weakContext -> {
                PackageManager pm = weakContext.get().getPackageManager();
                String realPath;
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q){
                    File file = new File(weakContext.get().getExternalCacheDir(), "temp.apk");
                    file.deleteOnExit();
                    realPath = file.getAbsolutePath();
                    FileUtil.copy(weakContext.get().getContentResolver(), path, Uri.fromFile(new File(realPath)), null);
                }else {
                    realPath = FileUtil.getPath(weakContext.get(), path);
                }
                PackageInfo info = pm.getPackageArchiveInfo(realPath, 0);
                if (info == null) {
                    throw new NullPointerException();
                }
                ApplicationInfo applicationInfo = info.applicationInfo;
                applicationInfo.sourceDir = realPath;
                applicationInfo.publicSourceDir = realPath;
                App app = new App(info, pm);
                app.isFormFile = true;
                return app;
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码17 项目: Image-Cipher   文件: PixelTraversalController.java
void setClickObservable(Observable<Point> observable) {
  logger.info("Setting observable");
  disposable = observable.subscribeOn(Schedulers.computation())
      .subscribe(point -> {
        startingPoint = point;
        runPixelTraversal();
      });
}
 
源代码18 项目: code-examples   文件: ReactiveBatchProcessorV2.java
public void start() {
  // WARNING: this code doesn't work as expected
  messageSource.getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
          .subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码19 项目: code-examples   文件: ReactiveBatchProcessorV2.java
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  return Schedulers.from(new ThreadPoolExecutor(
      poolSize,
      poolSize,
      0L,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<>(queueSize)
  ));
}
 
源代码20 项目: code-examples   文件: ReactiveBatchProcessorV3.java
public void start() {
  // WARNING: this code doesn't work as expected
  Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);

  messageSource.getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
          .subscribeOn(scheduler))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码21 项目: code-examples   文件: ReactiveBatchProcessorV3.java
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  return Schedulers.from(new ThreadPoolExecutor(
      poolSize,
      poolSize,
      0L,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<>(queueSize)
  ));
}
 
源代码22 项目: code-examples   文件: ReactiveBatchProcessor.java
void start() {

    Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);

    messageSource.getMessageBatches()
        .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
        .doOnNext(batch -> logger.log(batch.toString()))
        .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
        .flatMapSingle(m -> Single.defer(() -> Single.just(m)
            .map(messageHandler::handleMessage))
            .subscribeOn(scheduler))
        .subscribeWith(new SimpleSubscriber<>(threads, 1));
  }
 
源代码23 项目: code-examples   文件: ReactiveBatchProcessor.java
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  return Schedulers.from(new ThreadPoolExecutor(
      poolSize,
      poolSize,
      0L,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<>(queueSize),
      new WaitForCapacityPolicy()
  ));
}
 
源代码24 项目: code-examples   文件: ReactiveBatchProcessorV1.java
public void start() {
  // WARNING: this code doesn't work as expected
  messageSource.getMessageBatches()
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      .flatMapSingle(m -> Single.just(messageHandler.handleMessage(m))
          .subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码25 项目: code-examples   文件: ReactiveBatchProcessorV1.java
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
  return Schedulers.from(new ThreadPoolExecutor(
      poolSize,
      poolSize,
      0L,
      TimeUnit.SECONDS,
      new LinkedBlockingDeque<>(queueSize)
  ));
}
 
源代码26 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void createWithTimeAndSizeInvalidCapacity() {
    try {
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.DAYS, Schedulers.computation(), -99);
        fail("Didn't throw IllegalArgumentException");
    } catch (IllegalArgumentException ex) {
        assertEquals("maxSize > 0 required but it was -99", ex.getMessage());
    }
}
 
源代码27 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void cancelRace() {
    for (int i = 0; i < 500; i++) {

        final ReplayRelay<Integer> rp = ReplayRelay.create();
        final TestObserver<Integer> ts1 = rp.test();
        final TestObserver<Integer> ts2 = rp.test();

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                ts1.dispose();
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                ts2.dispose();
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());

        assertFalse(rp.hasObservers());
    }
}
 
源代码28 项目: RxRelay   文件: ReplayRelayTest.java
@Test
public void dispose() {
    TestHelper.checkDisposed(ReplayRelay.create());

    TestHelper.checkDisposed(ReplayRelay.createUnbounded());

    TestHelper.checkDisposed(ReplayRelay.createWithSize(10));

    TestHelper.checkDisposed(
        ReplayRelay.createWithTimeAndSize(1, TimeUnit.SECONDS, Schedulers.single(), 10));
}
 
源代码29 项目: RxRelay   文件: BehaviorRelayTest.java
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void subscribeOnNextRace() {
    for (int i = 0; i < 500; i++) {
        final BehaviorRelay<Object> p = BehaviorRelay.createDefault((Object)1);

        final TestObserver[] ts = { null };

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                ts[0] = p.test();
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                p.accept(2);
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());

        if (ts[0].values().size() == 1) {
            ts[0].assertValue(2).assertNoErrors().assertNotComplete();
        } else {
            ts[0].assertValues(1, 2).assertNoErrors().assertNotComplete();
        }
    }
}
 
源代码30 项目: RxRelay   文件: SerializedRelayTest.java
@Test
public void onNextOnNextRace() {
    for (int i = 0; i < 500; i++) {
        final Relay<Integer> s = PublishRelay.<Integer>create().toSerialized();

        TestObserver<Integer> ts = s.test();

        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                s.accept(1);
            }
        };

        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                s.accept(2);
            }
        };

        TestHelper.race(r1, r2, Schedulers.single());

        List<Integer> actual = ts.assertNoErrors().assertNotComplete().values();
        List<Integer> expected = Arrays.asList(1, 2);
        assertTrue("The collections are not the same", actual.size() == expected.size()
                && actual.containsAll(expected) && expected.containsAll(actual));
    }
}
 
源代码评论
动弹
沙发等你来抢
 类所在包
 类方法
 同包方法