下面列出了io.reactivex.schedulers.Schedulers#from ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void onCreate() {
super.onCreate();
isRunning = true;
//创建 Notification.Builder 对象
NotificationCompat.Builder builder = new NotificationCompat.Builder(this, MApplication.channelIdDownload)
.setSmallIcon(R.drawable.ic_download)
.setLargeIcon(BitmapFactory.decodeResource(getResources(), R.mipmap.ic_launcher))
.setOngoing(false)
.setContentTitle(getString(R.string.download_offline_t))
.setContentText(getString(R.string.download_offline_s));
//发送通知
Notification notification = builder.build();
startForeground(notificationId, notification);
SharedPreferences preferences = getSharedPreferences("CONFIG", 0);
threadsNum = preferences.getInt(this.getString(R.string.pk_threads_num), 4);
executor = Executors.newFixedThreadPool(threadsNum);
scheduler = Schedulers.from(executor);
}
@Override
public void onCreate() {
super.onCreate();
SharedPreferences preference = MApplication.getConfigPreferences();
checkSourceListener = new CheckSourceListener() {
@Override
public void nextCheck() {
CheckSourceService.this.nextCheck();
}
@Override
public void compositeDisposableAdd(Disposable disposable) {
compositeDisposable.add(disposable);
}
@Override
public int getCheckIndex() {
return checkIndex;
}
};
threadsNum = preference.getInt(this.getString(R.string.pk_threads_num), 6);
executorService = Executors.newFixedThreadPool(threadsNum);
scheduler = Schedulers.from(executorService);
compositeDisposable = new CompositeDisposable();
updateNotification(0, "正在加载");
}
private static void demo2() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(1000);
final Scheduler pooledScheduler = Schedulers.from(executor);
Observable.range(1, 10000)
.flatMap(i -> Observable.just(i)
.subscribeOn(pooledScheduler)
.map(Sandbox::importantLongTask)
)
.doOnTerminate(WAIT_LATCH::countDown)
.map(Objects::toString)
.subscribe(e -> log("subscribe", e));
WAIT_LATCH.await();
executor.shutdown();
}
@Override
public void onCreate() {
super.onCreate();
SharedPreferences preference = MApplication.getConfigPreferences();
checkSourceListener = new CheckSourceListener() {
@Override
public void nextCheck() {
CheckSourceService.this.nextCheck();
}
@Override
public void compositeDisposableAdd(Disposable disposable) {
compositeDisposable.add(disposable);
}
@Override
public int getCheckIndex() {
return checkIndex;
}
};
threadsNum = preference.getInt(this.getString(R.string.pk_threads_num), 6);
executorService = Executors.newFixedThreadPool(threadsNum);
scheduler = Schedulers.from(executorService);
compositeDisposable = new CompositeDisposable();
updateNotification(0, "正在加载");
}
public SearchBookModel(OnSearchListener searchListener, List<BookSourceBean> sourceBeanList) {
this.searchListener = searchListener;
threadsNum = MApplication.getConfigPreferences().getInt(MApplication.getInstance().getString(R.string.pk_threads_num), 6);
executorService = Executors.newFixedThreadPool(threadsNum);
scheduler = Schedulers.from(executorService);
compositeDisposable = new CompositeDisposable();
if (sourceBeanList == null) {
initSearchEngineS(BookSourceManager.getSelectedBookSource());
} else {
initSearchEngineS(sourceBeanList);
}
}
@Test
public void test() throws Exception {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
Scheduler schedulerFromExecutor = Schedulers.from(scheduledExecutorService);
Flowable<Integer> results = Flowable.fromIterable(()-> {
return new Iterator<Integer>() {
@Override
public boolean hasNext()
{
try
{
Thread.sleep(30000);
return false;
}
catch (InterruptedException e)
{
System.out.println("Interrupted! " + e);
return true;
}
}
@Override
public Integer next()
{
return 2;
}
};
}).subscribeOn(schedulerFromExecutor);//change to Schedulers.io() to make it work.
results.timeout(1000, TimeUnit.MILLISECONDS, Schedulers.single(), Flowable.error(new TimeoutException("Timed out")))
.doOnTerminate(()-> System.out.println("Finished"))
.subscribe(r-> System.out.println("Got " + r), e-> System.out.println("Error " + e));
Thread.sleep(200000);
}
private static void demo3() {
final ExecutorService executor = Executors.newFixedThreadPool(10);
final Scheduler pooledScheduler = Schedulers.from(executor);
Observable.range(1, 100)
.subscribeOn(pooledScheduler)
.map(Objects::toString)
.subscribe(e -> log("subscribe", e));
}
ShardConsumerSubscriber(RecordsPublisher recordsPublisher, ExecutorService executorService, int bufferSize,
ShardConsumer shardConsumer, int readTimeoutsToIgnoreBeforeWarning) {
this.recordsPublisher = recordsPublisher;
this.scheduler = Schedulers.from(executorService);
this.bufferSize = bufferSize;
this.shardConsumer = shardConsumer;
this.readTimeoutsToIgnoreBeforeWarning = readTimeoutsToIgnoreBeforeWarning;
}
@Test
void custom_Scheduler_test2() {
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
Observable.just("Apple", "Orange", "Appla")
.subscribeOn(scheduler)
.map(ConcurrencyTest::delayCalculation)
.observeOn(scheduler)
.map(String::length)
.subscribe(ConcurrencyTest::log);
sleep(10000);
}
@Override
public void onCreate() {
super.onCreate();
running = true;
threadsNum = AppConfigHelper.get().getInt(this.getString(R.string.pk_threads_num), 4);
executor = Executors.newFixedThreadPool(threadsNum);
scheduler = Schedulers.from(executor);
managerCompat = NotificationManagerCompat.from(this);
}
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
Scheduler scheduler = Schedulers.from(executor);
return values
.observeOn(scheduler)
.delay(1, TimeUnit.MILLISECONDS, scheduler)
.doOnError(err -> {
downstreamFailure = err;
});
}
@Test(timeout = 5000)
public void testSelectConcurrencyTest() throws InterruptedException, TimeoutException {
debug();
try {
try (Database db = db(1)) {
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
int n = 2;
CountDownLatch latch = new CountDownLatch(n);
AtomicInteger count = new AtomicInteger();
for (int i = 0; i < n; i++) {
db.select("select score from person where name=?") //
.parameters("FRED", "JOSEPH") //
.getAs(Integer.class) //
.subscribeOn(scheduler) //
.toList() //
.doOnSuccess(x -> {
if (!x.equals(Lists.newArrayList(21, 34))) {
throw new RuntimeException("run broken");
}
}) //
.doOnSuccess(x -> {
count.incrementAndGet();
latch.countDown();
}) //
.doOnError(x -> latch.countDown()) //
.subscribe();
log.info("submitted " + i);
}
if (!latch.await(5000, TimeUnit.SECONDS)) {
throw new TimeoutException("timeout");
}
assertEquals(n, count.get());
}
} finally {
debug();
}
}
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
Scheduler scheduler = Schedulers.from(executor);
return values
.observeOn(scheduler)
.delay(1, TimeUnit.MILLISECONDS, scheduler)
.doOnError(err -> downstreamFailure = err);
}
public static Scheduler io(Executor executor) {
return executor != null ? Schedulers.from(executor) : Schedulers.io();
}
public static Scheduler newScheduler(int nThreads) {
ExecutorService service = Executors.newFixedThreadPool(nThreads, THREAD_FACTORY);
return Schedulers.from(service);
}
@Test
public void testJsonInputToStateMachineIssue1() throws InterruptedException {
// JSON source stream (could contain other messages about other
// Microwaves with different ids which will be processed concurrently by
// the Processor)
Flowable<String> messages = Flowable.just(
"{\"cls\": \"Microwave\", \"id\": \"1\",\"event\": \"ButtonPressed\"}",
"{\"cls\": \"Microwave\", \"id\": \"1\",\"event\": \"DoorOpened\"}",
"{\"cls\": \"Microwave\", \"id\": \"1\",\"event\": \"ButtonPressed\"}");
Flowable<Signal<?, String>> signals = messages //
.map(msg -> toSignal(msg));
// special scheduler that we will use to schedule signals and to process
// events
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(3));
// create the signal processor
Processor<String> processor = createProcessor(scheduler, signals);
// using a test subscriber because has easy assert methods on it
TestSubscriber<Object> ts = TestSubscriber.create();
// subscribe to the stream of entity states that is produced from the
// signals stream
processor //
.flowable() //
// just output the states
.map(esm -> esm.state()) //
.subscribe(ts);
// wait for processing to finish (is running asynchronously using the
// scheduler)
Thread.sleep(1000);
// assert that things happened as we expected
ts.assertValues( //
MicrowaveStateMachine.State.COOKING,
MicrowaveStateMachine.State.COOKING_INTERRUPTED,
MicrowaveStateMachine.State.COOKING_INTERRUPTED);
}
PageLoaderNet(PageView pageView, BookShelfBean bookShelfBean, Callback callback) {
super(pageView, bookShelfBean, callback);
executorService = Executors.newFixedThreadPool(20);
scheduler = Schedulers.from(executorService);
}
private Scheduler getScheduler(ExecutorService executorService) {
return Schedulers.from(executorService);
}
private UpLastChapterModel() {
executorService = Executors.newFixedThreadPool(5);
scheduler = Schedulers.from(executorService);
compositeDisposable = new CompositeDisposable();
searchBookBeanList = new ArrayList<>();
}
/**
* Provides the executor thread Scheduler.
*
* @return provides the executor thread Scheduler.
*/
@Override
public Scheduler provideExecutorScheduler() {
return Schedulers.from(Executors.newCachedThreadPool());
}