io.reactivex.schedulers.Schedulers#from() 源码实例 Demo

下面列出了 io.reactivex.schedulers.Schedulers#from() 的实例代码，也可以点击链接到 GitHub 查看源代码，或在右侧发表评论。

源代码1 项目: a   文件: DownloadService.java
/**
 * Flags the service as running, enters the foreground with a download
 * notification, and then builds the fixed thread pool and the scheduler
 * that wraps it.
 */
@Override
public void onCreate() {
    super.onCreate();
    isRunning = true;

    // Assemble the notification step by step on a single builder instance.
    NotificationCompat.Builder notificationBuilder =
            new NotificationCompat.Builder(this, MApplication.channelIdDownload);
    notificationBuilder.setSmallIcon(R.drawable.ic_download);
    notificationBuilder.setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher));
    notificationBuilder.setOngoing(false);
    notificationBuilder.setContentTitle(getString(R.string.download_offline_t));
    notificationBuilder.setContentText(getString(R.string.download_offline_s));

    // Publish the notification by moving the service to the foreground.
    startForeground(notificationId, notificationBuilder.build());

    // The pool size is read from the "CONFIG" preferences and falls back to 4.
    SharedPreferences config = getSharedPreferences("CONFIG", 0);
    String threadsKey = getString(R.string.pk_threads_num);
    threadsNum = config.getInt(threadsKey, 4);

    executor = Executors.newFixedThreadPool(threadsNum);
    scheduler = Schedulers.from(executor);
}
 
源代码2 项目: a   文件: CheckSourceService.java
/**
 * Installs the check-source listener, creates the worker pool together with
 * its scheduler and the disposable container, then posts the initial
 * notification.
 */
@Override
public void onCreate() {
    super.onCreate();
    SharedPreferences config = MApplication.getConfigPreferences();

    // Listener whose callbacks delegate to this service's own members.
    checkSourceListener = new CheckSourceListener() {
        @Override
        public int getCheckIndex() {
            return checkIndex;
        }

        @Override
        public void compositeDisposableAdd(Disposable disposable) {
            compositeDisposable.add(disposable);
        }

        @Override
        public void nextCheck() {
            CheckSourceService.this.nextCheck();
        }
    };

    // The pool size is read from the preferences and falls back to 6.
    String threadsKey = getString(R.string.pk_threads_num);
    threadsNum = config.getInt(threadsKey, 6);

    executorService = Executors.newFixedThreadPool(threadsNum);
    scheduler = Schedulers.from(executorService);
    compositeDisposable = new CompositeDisposable();
    updateNotification(0, "正在加载");
}
 
源代码3 项目: Reactive-Android-Programming   文件: Sandbox.java
/**
 * Runs 10000 items through {@code Sandbox::importantLongTask}, subscribing each
 * inner Observable on a scheduler backed by a 1000-thread fixed pool, and blocks
 * until {@code WAIT_LATCH} is released by the stream's termination.
 *
 * <p>The executor is shut down in a {@code finally} block so that its threads
 * are released even when the wait is interrupted or the pipeline setup throws.
 *
 * @throws Exception if waiting on {@code WAIT_LATCH} is interrupted
 */
private static void demo2() throws Exception {
    final ExecutorService executor = Executors.newFixedThreadPool(1000);
    try {
        final Scheduler pooledScheduler = Schedulers.from(executor);

        Observable.range(1, 10000)
                // Each value gets its own inner Observable so the work can spread across the pool.
                .flatMap(i -> Observable.just(i)
                        .subscribeOn(pooledScheduler)
                        .map(Sandbox::importantLongTask)
                )
                .doOnTerminate(WAIT_LATCH::countDown)
                .map(Objects::toString)
                .subscribe(e -> log("subscribe", e));

        WAIT_LATCH.await();
    } finally {
        // Previously skipped when await() threw, which left the pool threads alive.
        executor.shutdown();
    }
}
 
源代码4 项目: MyBookshelf   文件: CheckSourceService.java
@Override
public void onCreate() {
    super.onCreate();
    final SharedPreferences prefs = MApplication.getConfigPreferences();

    // Anonymous listener: each callback simply forwards to this service's state.
    checkSourceListener = new CheckSourceListener() {
        @Override
        public void compositeDisposableAdd(Disposable disposable) {
            compositeDisposable.add(disposable);
        }

        @Override
        public int getCheckIndex() {
            return checkIndex;
        }

        @Override
        public void nextCheck() {
            CheckSourceService.this.nextCheck();
        }
    };

    // Thread count comes from the preferences (default 6) and sizes the fixed pool.
    threadsNum = prefs.getInt(getString(R.string.pk_threads_num), 6);
    executorService = Executors.newFixedThreadPool(threadsNum);

    // Scheduler view over that pool.
    scheduler = Schedulers.from(executorService);

    compositeDisposable = new CompositeDisposable();

    // Show the initial notification last, once everything above is in place.
    updateNotification(0, "正在加载");
}
 
源代码5 项目: a   文件: SearchBookModel.java
/**
 * Creates the model, its worker pool and scheduler, and initialises the search
 * engines.
 *
 * @param searchListener listener stored on this model
 * @param sourceBeanList sources to initialise the search engines with; when
 *                       {@code null}, {@code BookSourceManager.getSelectedBookSource()}
 *                       is used instead
 */
public SearchBookModel(OnSearchListener searchListener, List<BookSourceBean> sourceBeanList) {
    this.searchListener = searchListener;

    // The pool size is read from the config preferences and falls back to 6.
    threadsNum = MApplication.getConfigPreferences()
            .getInt(MApplication.getInstance().getString(R.string.pk_threads_num), 6);
    executorService = Executors.newFixedThreadPool(threadsNum);
    scheduler = Schedulers.from(executorService);
    compositeDisposable = new CompositeDisposable();

    if (sourceBeanList != null) {
        initSearchEngineS(sourceBeanList);
    } else {
        initSearchEngineS(BookSourceManager.getSelectedBookSource());
    }
}
 
源代码6 项目: akarnokd-misc   文件: CustomSchedulerTimeout.java
/**
 * Subscribes a Flowable whose iterator blocks in {@code hasNext()} on an
 * executor-backed scheduler and applies a one-second timeout with an error
 * fallback, printing whatever the subscriber receives.
 */
@Test
public void test() throws Exception {
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
    Scheduler poolScheduler = Schedulers.from(pool);

    // Every iterator produced here sleeps 30 s in hasNext() and then reports "no more items";
    // if the sleep is interrupted it prints the exception and reports "has next" instead.
    Iterable<Integer> blockingIterable = () -> new Iterator<Integer>() {
        @Override
        public boolean hasNext() {
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                System.out.println("Interrupted! " + e);
                return true;
            }
            return false;
        }

        @Override
        public Integer next() {
            return 2;
        }
    };

    // change to Schedulers.io() to make it work.
    // NOTE(review): presumably the executor-backed scheduler does not interrupt the sleeping
    // thread when the timeout fires - confirm against the RxJava version in use.
    Flowable<Integer> slowSource = Flowable.fromIterable(blockingIterable)
            .subscribeOn(poolScheduler);

    slowSource
            .timeout(1000, TimeUnit.MILLISECONDS, Schedulers.single(), Flowable.error(new TimeoutException("Timed out")))
            .doOnTerminate(() -> System.out.println("Finished"))
            .subscribe(
                    value -> System.out.println("Got " + value),
                    error -> System.out.println("Error " + error));

    // Keep the test thread alive long enough to observe the asynchronous output.
    Thread.sleep(200000);
}
 
源代码7 项目: Reactive-Android-Programming   文件: Sandbox.java
/**
 * Emits the range 1..100 on a scheduler backed by a 10-thread fixed pool,
 * converts every value to a String and logs it.
 *
 * <p>The pool is local to this method, so it is shut down through
 * {@code doFinally} once the stream terminates or is disposed; otherwise its
 * threads would never be released.
 */
private static void demo3() {
    final ExecutorService executor = Executors.newFixedThreadPool(10);
    final Scheduler pooledScheduler = Schedulers.from(executor);

    Observable.range(1, 100)
            .subscribeOn(pooledScheduler)
            .map(Objects::toString)
            // shutdown() does not wait, so calling it from a pool thread is safe.
            .doFinally(executor::shutdown)
            .subscribe(e -> log("subscribe", e));
}
 
/**
 * Stores the collaborators and tuning values on this subscriber and wraps the
 * supplied executor service in a Scheduler.
 *
 * @param recordsPublisher                  publisher kept on this instance
 * @param executorService                   executor that backs the scheduler
 * @param bufferSize                        buffer size kept on this instance
 * @param shardConsumer                     shard consumer kept on this instance
 * @param readTimeoutsToIgnoreBeforeWarning value kept on this instance
 */
ShardConsumerSubscriber(
        RecordsPublisher recordsPublisher,
        ExecutorService executorService,
        int bufferSize,
        ShardConsumer shardConsumer,
        int readTimeoutsToIgnoreBeforeWarning) {
    // Collaborators.
    this.recordsPublisher = recordsPublisher;
    this.shardConsumer = shardConsumer;

    // Tuning values.
    this.bufferSize = bufferSize;
    this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;

    // Scheduler view over the caller-supplied executor.
    this.scheduler = Schedulers.from(executorService);
}
 
/**
 * Runs three strings through {@code ConcurrencyTest::delayCalculation} and
 * {@code String::length} on a scheduler backed by a 10-thread fixed pool,
 * logging each result, then sleeps so the asynchronous work can finish.
 */
@Test
void custom_Scheduler_test2() {
    // One executor-backed scheduler serves both subscribeOn and observeOn.
    Scheduler pool = Schedulers.from(Executors.newFixedThreadPool(10));

    Observable.just("Apple", "Orange", "Appla")
            .subscribeOn(pool)
            .map(ConcurrencyTest::delayCalculation)
            .observeOn(pool)
            .map(String::length)
            .subscribe(ConcurrencyTest::log);

    // Keep the test alive while the pipeline runs on the pool.
    sleep(10000);
}
 
源代码10 项目: HaoReader   文件: DownloadService.java
/**
 * Marks the service as running and creates the fixed thread pool, the
 * scheduler wrapping it, and the notification manager.
 */
@Override
public void onCreate() {
    super.onCreate();
    running = true;

    // The pool size is read from the app config and falls back to 4.
    threadsNum = AppConfigHelper.get()
            .getInt(getString(R.string.pk_threads_num), 4);
    executor = Executors.newFixedThreadPool(threadsNum);
    scheduler = Schedulers.from(executor);

    managerCompat = NotificationManagerCompat.from(this);
}
 
/**
 * Moves the incoming stream onto a scheduler backed by {@code executor},
 * delays each item by one millisecond on that same scheduler, and records any
 * error in {@code downstreamFailure}.
 *
 * @param values the incoming stream
 * @return the delayed stream
 */
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
    final Scheduler executorScheduler = Schedulers.from(executor);

    Flowable<String> delayed = values
            .observeOn(executorScheduler)
            .delay(1, TimeUnit.MILLISECONDS, executorScheduler);

    // Remember the failure so it can be inspected later.
    return delayed.doOnError(err -> downstreamFailure = err);
}
 
源代码12 项目: rxjava2-jdbc   文件: DatabaseTest.java
/**
 * Runs the same two-parameter select query twice on a two-thread scheduler
 * and checks that both runs produce the expected score list.
 *
 * <p>The latch wait uses milliseconds so that it matches the 5000 ms JUnit
 * timeout on this test. With the previous unit (seconds) the explicit
 * {@link TimeoutException} branch could never fire before JUnit aborted the
 * test.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 * @throws TimeoutException     if the queries do not finish within the wait
 */
@Test(timeout = 5000)
public void testSelectConcurrencyTest() throws InterruptedException, TimeoutException {
    debug();
    // try-with-resources closes the database before the finally block runs,
    // which is the same order the original nested try blocks produced.
    try (Database db = db(1)) {
        Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
        int n = 2;
        CountDownLatch latch = new CountDownLatch(n);
        AtomicInteger count = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            db.select("select score from person where name=?") //
                    .parameters("FRED", "JOSEPH") //
                    .getAs(Integer.class) //
                    .subscribeOn(scheduler) //
                    .toList() //
                    .doOnSuccess(x -> {
                        if (!x.equals(Lists.newArrayList(21, 34))) {
                            throw new RuntimeException("run broken");
                        }
                    }) //
                    .doOnSuccess(x -> {
                        count.incrementAndGet();
                        latch.countDown();
                    }) //
                    // Release the latch on failure too, so the test thread is never left waiting.
                    .doOnError(x -> latch.countDown()) //
                    .subscribe();
            log.info("submitted " + i);
        }
        if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("timeout");
        }
        assertEquals(n, count.get());
    } finally {
        debug();
    }
}
 
/**
 * Observes the incoming values on a scheduler created from {@code executor},
 * delays them by one millisecond on that scheduler, and stores any error in
 * {@code downstreamFailure}.
 *
 * @param values the incoming stream
 * @return the delayed stream
 */
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
    final Scheduler worker = Schedulers.from(executor);
    return values
            .observeOn(worker)
            .delay(1, TimeUnit.MILLISECONDS, worker)
            .doOnError(err -> {
                // Capture the failure for later inspection.
                downstreamFailure = err;
            });
}
 
源代码14 项目: Collection-Android   文件: RxSchedulerUtils.java
/**
 * Picks the scheduler to use for I/O work.
 *
 * @param executor executor to wrap, or {@code null}
 * @return a Scheduler built from {@code executor}, or {@code Schedulers.io()}
 *         when {@code executor} is {@code null}
 */
public static Scheduler io(Executor executor) {
    if (executor == null) {
        return Schedulers.io();
    }
    return Schedulers.from(executor);
}
 
源代码15 项目: HaoReader   文件: RxExecutors.java
/**
 * Creates a Scheduler on top of a new fixed-size thread pool whose threads
 * come from {@code THREAD_FACTORY}.
 *
 * @param nThreads number of threads in the pool
 * @return a Scheduler wrapping the new pool
 */
public static Scheduler newScheduler(int nThreads) {
    return Schedulers.from(Executors.newFixedThreadPool(nThreads, THREAD_FACTORY));
}
 
源代码16 项目: state-machine   文件: StreamingTest.java
/**
 * Feeds three JSON messages for one Microwave through the processor on a
 * three-thread scheduler and asserts the sequence of states produced.
 */
@Test
public void testJsonInputToStateMachineIssue1() throws InterruptedException {

    // Source stream of JSON messages; all three target the Microwave with id 1.
    Flowable<String> jsonEvents = Flowable.just(
            "{\"cls\": \"Microwave\", \"id\": \"1\",\"event\": \"ButtonPressed\"}",
            "{\"cls\": \"Microwave\", \"id\": \"1\",\"event\": \"DoorOpened\"}",
            "{\"cls\": \"Microwave\", \"id\": \"1\",\"event\": \"ButtonPressed\"}");

    // Convert every JSON message into a signal.
    Flowable<Signal<?, String>> parsedSignals = jsonEvents.map(msg -> toSignal(msg));

    // Dedicated scheduler handed to the processor, backed by three threads.
    Scheduler pool = Schedulers.from(Executors.newFixedThreadPool(3));

    // Build the processor from the scheduler and the signal stream.
    Processor<String> signalProcessor = createProcessor(pool, parsedSignals);

    // A TestSubscriber collects the emitted states for the assertion below.
    TestSubscriber<Object> subscriber = TestSubscriber.create();

    // Subscribe to the processor's output, keeping only the state of each emission.
    signalProcessor
            .flowable()
            .map(esm -> esm.state())
            .subscribe(subscriber);

    // Processing is asynchronous on the scheduler, so give it time to finish.
    Thread.sleep(1000);

    // The three messages are expected to yield exactly these states, in order.
    subscriber.assertValues(
            MicrowaveStateMachine.State.COOKING,
            MicrowaveStateMachine.State.COOKING_INTERRUPTED,
            MicrowaveStateMachine.State.COOKING_INTERRUPTED);

}
 
源代码17 项目: MyBookshelf   文件: PageLoaderNet.java
PageLoaderNet(PageView pageView, BookShelfBean bookShelfBean, Callback callback) {
    super(pageView, bookShelfBean, callback);
    executorService = Executors.newFixedThreadPool(20);
    scheduler = Schedulers.from(executorService);
}
 
/**
 * Wraps the given executor service in a Scheduler.
 *
 * @param executorService executor that backs the scheduler
 * @return the Scheduler created by {@code Schedulers.from}
 */
private Scheduler getScheduler(ExecutorService executorService) {
    final Scheduler wrapped = Schedulers.from(executorService);
    return wrapped;
}
 
源代码19 项目: MyBookshelf   文件: UpLastChapterModel.java
private UpLastChapterModel() {
    executorService = Executors.newFixedThreadPool(5);
    scheduler = Schedulers.from(executorService);
    compositeDisposable = new CompositeDisposable();
    searchBookBeanList = new ArrayList<>();
}
 
源代码20 项目: NYBus   文件: SchedulerProviderImpl.java
/**
 * Provides a Scheduler backed by a cached thread pool.
 *
 * <p>Each call creates a new pool via {@code Executors.newCachedThreadPool()}
 * and wraps it with {@code Schedulers.from}.
 * NOTE(review): nothing here retains or shuts down the pool - confirm that
 * callers invoke this once and keep the result.
 *
 * @return a Scheduler wrapping a newly created cached thread pool.
 */
@Override
public Scheduler provideExecutorScheduler() {
    return Schedulers.from(Executors.newCachedThreadPool());
}