下面列出了怎么用io.reactivex.Scheduler的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* 用于处理订阅事件在那个线程中执行
*
* @param observable d
* @param subscriberMethod d
* @return Observable
*/
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
Scheduler scheduler;
switch (subscriberMethod.threadMode) {
case MAIN:
scheduler = AndroidSchedulers.mainThread();
break;
case NEW_THREAD:
scheduler = Schedulers.newThread();
break;
case CURRENT_THREAD:
scheduler = Schedulers.trampoline();
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable.observeOn(scheduler);
}
@Inject
DisconnectOperation(
RxBleGattCallback rxBleGattCallback,
BluetoothGattProvider bluetoothGattProvider,
@Named(DeviceModule.MAC_ADDRESS) String macAddress,
BluetoothManager bluetoothManager,
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
@Named(DeviceModule.DISCONNECT_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
ConnectionStateChangeListener connectionStateChangeListener) {
this.rxBleGattCallback = rxBleGattCallback;
this.bluetoothGattProvider = bluetoothGattProvider;
this.macAddress = macAddress;
this.bluetoothManager = bluetoothManager;
this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
this.timeoutConfiguration = timeoutConfiguration;
this.connectionStateChangeListener = connectionStateChangeListener;
}
private static void demo2() throws Exception {
final ExecutorService executor = Executors.newFixedThreadPool(1000);
final Scheduler pooledScheduler = Schedulers.from(executor);
Observable.range(1, 10000)
.flatMap(i -> Observable.just(i)
.subscribeOn(pooledScheduler)
.map(Sandbox::importantLongTask)
)
.doOnTerminate(WAIT_LATCH::countDown)
.map(Object::toString)
.subscribe(e -> log("subscribe", e));
WAIT_LATCH.await();
executor.shutdown();
}
CharacteristicLongWriteOperation(
BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
@Named(ClientComponent.NamedSchedulers.BLUETOOTH_INTERACTION) Scheduler bluetoothInteractionScheduler,
@Named(ConnectionModule.OPERATION_TIMEOUT) TimeoutConfiguration timeoutConfiguration,
BluetoothGattCharacteristic bluetoothGattCharacteristic,
PayloadSizeLimitProvider batchSizeProvider,
WriteOperationAckStrategy writeOperationAckStrategy,
WriteOperationRetryStrategy writeOperationRetryStrategy,
byte[] bytesToWrite) {
this.bluetoothGatt = bluetoothGatt;
this.rxBleGattCallback = rxBleGattCallback;
this.bluetoothInteractionScheduler = bluetoothInteractionScheduler;
this.timeoutConfiguration = timeoutConfiguration;
this.bluetoothGattCharacteristic = bluetoothGattCharacteristic;
this.batchSizeProvider = batchSizeProvider;
this.writeOperationAckStrategy = writeOperationAckStrategy;
this.writeOperationRetryStrategy = writeOperationRetryStrategy;
this.bytesToWrite = bytesToWrite;
}
private void whenError(Scheduler scheduler) {
if (!isDownloading) {
return;
}
if (isFinishing()) {
stopDownload();
if (downloadBook.getSuccessCount() == 0) {
onDownloadError(downloadBook);
} else {
onDownloadComplete(downloadBook);
}
} else {
toDownload(scheduler);
}
}
RxJava2CachedCallAdapter(Cache<String, byte[]> cachingSystem, Type responseType, Scheduler scheduler, Retrofit retrofit, Annotation[] annotations,
boolean mAsync, boolean mResult, boolean mBody, boolean mFlowable, boolean mSingle, boolean mMaybe,
boolean mCompletable) {
this.mCachingSystem = cachingSystem;
this.mResponseType = responseType;
this.mScheduler = scheduler;
this.mRetrofit = retrofit;
this.mAnnotations = annotations;
this.mAsync = mAsync;
this.mResult = mResult;
this.mBody = mBody;
this.mFlowable = mFlowable;
this.mSingle = mSingle;
this.mMaybe = mMaybe;
this.mCompletable = mCompletable;
}
@Override
protected void starting(Description description) {
if (restoreHandlers) {
// https://github.com/ReactiveX/RxAndroid/pull/358
// originalInitMainThreadInitHandler =
// RxAndroidPlugins.getInitMainThreadScheduler();
// originalMainThreadHandler = RxAndroidPlugins.getMainThreadScheduler();
}
RxAndroidPlugins.reset();
RxAndroidPlugins.setInitMainThreadSchedulerHandler(
new Function<Callable<Scheduler>, Scheduler>() {
@Override
public Scheduler apply(Callable<Scheduler> schedulerCallable) throws Exception {
return delegatingMainThreadScheduler;
}
});
RxAndroidPlugins.setMainThreadSchedulerHandler(
new Function<Scheduler, Scheduler>() {
@Override
public Scheduler apply(Scheduler scheduler) throws Exception {
return delegatingMainThreadScheduler;
}
});
}
/**
* 用于处理订阅事件在那个线程中执行
*
* @param observable d
* @param subscriberMethod d
* @return Observable
*/
private Flowable postToObservable(Flowable observable, SubscriberMethod subscriberMethod) {
Scheduler scheduler;
switch (subscriberMethod.threadMode) {
case MAIN:
scheduler = AndroidSchedulers.mainThread();
break;
case NEW_THREAD:
scheduler = Schedulers.newThread();
break;
case CURRENT_THREAD:
scheduler = Schedulers.trampoline();
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscriberMethod.threadMode);
}
return observable.observeOn(scheduler);
}
@CheckResult
@NonNull
public static <T> Flowable<T> subscribeOn(
@NonNull StorIOContentResolver storIOContentResolver,
@NonNull Flowable<T> flowable
) {
final Scheduler scheduler = storIOContentResolver.defaultRxScheduler();
return scheduler != null ? flowable.subscribeOn(scheduler) : flowable;
}
public static void setupRxjava() {
TestScheduler testScheduler = new TestScheduler();
RxJavaPlugins.setComputationSchedulerHandler(new Function<Scheduler, Scheduler>() {
@Override
public Scheduler apply(Scheduler scheduler) throws Exception {
return testScheduler;
}
});
}
@Test
void custom_Scheduler_test() {
Scheduler customScheduler = custom_Scheduler();
Observable.just("Apple", "Orange", "Appla")
.subscribeOn(customScheduler)
.map(ConcurrencyTest::delayCalculation)
.observeOn(customScheduler)
.map(String::length)
.subscribe(ConcurrencyTest::log);
sleep(10000);
}
@Inject
SendAssetInteractor(@Named(ApplicationModule.JOB) Scheduler jobScheduler,
@Named(ApplicationModule.UI) Scheduler uiScheduler,
ManagedChannel managedChannel, PreferencesUtil preferencesUtil) {
super(jobScheduler, uiScheduler);
this.channel = managedChannel;
this.preferenceUtils = preferencesUtil;
}
public static Database create(int maxSize, boolean big, Scheduler scheduler) {
NonBlockingConnectionPool pool = Pools.nonBlocking() //
.connectionProvider(connectionProvider(nextUrl(), big)) //
.maxPoolSize(maxSize) //
.scheduler(scheduler) //
.build();
return Database.from(pool, () -> {
pool.close();
scheduler.shutdown();
});
}
@Inject
GetAccountTransactionsInteractor(@Named(ApplicationModule.JOB) Scheduler jobScheduler,
@Named(ApplicationModule.UI) Scheduler uiScheduler,
PreferencesUtil preferenceUtils, ManagedChannel channel) {
super(jobScheduler, uiScheduler);
this.preferenceUtils = preferenceUtils;
this.channel = channel;
}
public AVObjectAsyncTest(String testName) {
super(testName);
AppConfiguration.config(true, new AppConfiguration.SchedulerCreator() {
public Scheduler create() {
return Schedulers.newThread();
}
});
Configure.initializeRuntime();
}
BriteContentResolver(ContentResolver contentResolver, Logger logger, Scheduler scheduler,
ObservableTransformer<Query, Query> queryTransformer) {
this.contentResolver = contentResolver;
this.logger = logger;
this.scheduler = scheduler;
this.queryTransformer = queryTransformer;
}
@NonNull
@Override
public Observable<Void> asObservable(BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler scheduler) throws Throwable {
return Observable.fromCallable(() -> refreshDeviceCache(bluetoothGatt))
.delay(delay_ms, TimeUnit.MILLISECONDS, Schedulers.computation())
.subscribeOn(scheduler);
}
public AdamantApiWrapper(AdamantApiBuilder apiBuilder, AdamantKeyGenerator keyGenerator, Scheduler scheduler) {
this.apiBuilder = apiBuilder;
this.keyGenerator = keyGenerator;
this.scheduler = scheduler;
buildApi();
}
public SetOperationTests(String testName) {
super(testName);
AppConfiguration.config(true, new AppConfiguration.SchedulerCreator() {
public Scheduler create() {
return Schedulers.newThread();
}
});
Configure.initializeRuntime();
}
@Incoming("hello")
@Outgoing("out")
public Flowable<String> consume(Flowable<String> values) {
Scheduler scheduler = Schedulers.from(executor);
return values
.observeOn(scheduler)
.delay(1, TimeUnit.MILLISECONDS, scheduler)
.doOnError(err -> {
downstreamFailure = err;
});
}
BriteDatabase(SupportSQLiteOpenHelper helper, Logger logger, Scheduler scheduler,
ObservableTransformer<Query, Query> queryTransformer) {
this.helper = helper;
this.logger = logger;
this.scheduler = scheduler;
this.queryTransformer = queryTransformer;
}
@CheckResult
@NonNull
public static Completable subscribeOn(
@NonNull StorIOSQLite storIOSQLite,
@NonNull Completable completable
) {
final Scheduler scheduler = storIOSQLite.defaultRxScheduler();
return scheduler != null ? completable.subscribeOn(scheduler) : completable;
}
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
Scheduler scheduler = apply(f,s);
if (scheduler == null) {
throw new NullPointerException("Scheduler Callable returned null");
}
return scheduler;
}
/**
* Buffers the source {@link Flowable} into {@link List}s, emitting Lists when
* the size of a list reaches {@code maxSize} or if the elapsed time since last
* emission from the source reaches the given duration.
*
* @param maxSize
* max size of emitted lists
* @param duration
* function that based on the last emission calculates the elapsed
* time to be used before emitting a buffered list
* @param unit
* unit of {@code duration}
* @param scheduler
* scheduler to use to schedule emission of a buffer (as a list) if
* the time since last emission from the source reaches duration
* @param <T>
* type of the source stream items
* @return source with operator applied
*/
public static <T> FlowableTransformer<T, List<T>> buffer(final int maxSize,
final Function<? super T, ? extends Long> duration, final TimeUnit unit, final Scheduler scheduler) {
final BiPredicate<List<T>, MyOptional<T>> condition = new BiPredicate<List<T>, MyOptional<T>>() {
@Override
public boolean test(List<T> list, MyOptional<T> x) throws Exception {
return list.size() < maxSize && x.isPresent();
}
};
Function<MyOptional<T>, Long> timeout = new Function<MyOptional<T>, Long>() {
@Override
public Long apply(MyOptional<T> t) throws Exception {
return duration.apply(t.get());
}
};
final FlowableTransformer<MyOptional<T>, MyOptional<T>> insert = insert(timeout, unit,
Functions.constant(MyOptional.<T>empty()), scheduler);
final FlowableTransformer<MyOptional<T>, List<T>> collectWhile = collectWhile( //
// factory
ListFactoryHolder.<T>factory(), //
// add function
MyOptional.<T>addIfPresent(), //
// condition
condition);
return new FlowableTransformer<T, List<T>>() {
@Override
public Publisher<List<T>> apply(Flowable<T> source) {
return source //
.map(MyOptional.<T>of()) //
.compose(insert) //
.compose(collectWhile)
// need this filter because sometimes nothing gets added to list
.filter(MyOptional.<T>listHasElements()); //
}
};
}
private Observable wrapObservableInBackground(Observable observable) {
if (null == observable) {
return null;
}
Scheduler scheduler = Schedulers.io();
if (asynchronized) {
observable = observable.subscribeOn(scheduler);
}
if (null != defaultCreator) {
observable = observable.observeOn(scheduler);
}
return observable;
}
@Test public void runningWorkReportsBusy() {
Scheduler.Worker worker = scheduler.createWorker();
worker.schedule(new Runnable() {
@Override public void run() {
assertBusy();
}
});
delegate.triggerActions();
}
private MessageGroupSubscriber(long messageTimeoutNanos,
int bufferedMaxMessages,
Sender<M, R> sender,
Scheduler scheduler) {
this.messageTimeoutNanos = messageTimeoutNanos == 0 ? Long.MAX_VALUE : messageTimeoutNanos;
this.bufferedMaxMessages = bufferedMaxMessages;
this.sender = sender;
this.scheduler = scheduler;
}
@NonNull
@Override
public Observable<Void> asObservable(BluetoothGatt bluetoothGatt,
RxBleGattCallback rxBleGattCallback,
Scheduler scheduler) throws Throwable {
return Observable.fromCallable(() -> refreshDeviceCache(bluetoothGatt))
.delay(delay_ms, TimeUnit.MILLISECONDS, Schedulers.computation())
.subscribeOn(scheduler);
}
public static void main(String[] args) throws Exception {
Scheduler scheduler = Schedulers.from(newFixedThreadPool(10, threadFactory));
CountDownLatch latch = new CountDownLatch(1);
// Synchronous emission
Observable<Object> observable = Observable.create(emitter -> {
for (String superHero : SUPER_HEROES) {
log("Emitting: " + superHero);
emitter.onNext(superHero);
}
log("Completing");
emitter.onComplete();
});
log("---------------- Subscribing");
observable
.subscribeOn(scheduler)
.subscribe(
item -> log("Received " + item),
error -> log("Error"),
() -> {
log("Complete");
latch.countDown();
});
log("---------------- Subscribed");
latch.await();
}
@NonNull
Scheduler io();