下面列出了 io.reactivex.Scheduler.Worker 类 API 的使用示例代码及写法,您也可以点击链接到 GitHub 查看源代码。
/**
 * Schedules {@code numThreads} no-op tasks on fresh workers of {@code scheduler}
 * and blocks the calling thread until every one of them has run.
 *
 * <p>Each task disposes its own worker and then counts down a shared latch.
 *
 * @param scheduler  scheduler whose workers receive the marker tasks
 * @param numThreads number of workers (and marker tasks) to create
 * @param timeout    maximum time to wait for all marker tasks to run
 * @param unit       unit of {@code timeout}
 * @throws RuntimeException if the timeout elapses first, or if the calling
 *         thread is interrupted while waiting (the interrupt flag is restored
 *         before throwing and the {@link InterruptedException} is the cause)
 */
public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) {
    final CountDownLatch latch = new CountDownLatch(numThreads);
    for (int i = 1; i <= numThreads; i++) {
        final Worker worker = scheduler.createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                worker.dispose();
                latch.countDown();
            }
        });
    }
    final boolean finished;
    try {
        finished = latch.await(timeout, unit);
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can still
        // observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    if (!finished) {
        throw new RuntimeException("timeout occured waiting for work to finish");
    }
}
/**
 * Drains {@code signals.signalsToOther} from the front and dispatches each
 * signal in turn.
 *
 * <ul>
 * <li>immediate signals are pushed straight to {@code subject};</li>
 * <li>signals whose event is a {@code CancelTimedSignal} are handed to
 * {@code cancel};</li>
 * <li>all other signals are timed: if their time is not after the worker's
 * current time they are pushed to {@code subject}, otherwise they are passed
 * to {@code scheduleSignal} with the remaining delay.</li>
 * </ul>
 *
 * @param cid     class/id pair forwarded to {@code scheduleSignal} as the origin
 * @param worker  worker supplying the current time and used for scheduling
 * @param signals holder of the queue being drained
 */
private void applySignalsToOthers(ClassId<?, Id> cid, Worker worker, Signals<Id> signals) {
    for (Signal<?, Id> next = signals.signalsToOther.pollFirst(); next != null;
            next = signals.signalsToOther.pollFirst()) {
        if (next.isImmediate()) {
            subject.onNext(next);
            continue;
        }
        if (next.event() instanceof CancelTimedSignal) {
            cancel(next);
            continue;
        }
        long remainingMs = next.time().get() - worker.now(TimeUnit.MILLISECONDS);
        if (remainingMs > 0) {
            scheduleSignal(cid, worker, next, next, remainingMs);
        } else {
            // already due
            subject.onNext(next);
        }
    }
}
/**
 * Schedules a delayed emission of {@code s.now()} on {@code subject} and
 * records the resulting {@code Disposable} in {@code subscriptions} under the
 * (from, target) pair, disposing any entry previously stored under that pair.
 *
 * @param from    class/id pair used as the first half of the map key
 * @param worker  worker on which both the emission and the map clean-up are scheduled
 * @param signal  signal whose {@code cls()} and {@code id()} form the second half of the map key
 * @param s       signal whose {@code now()} result is emitted when the delay elapses
 * @param delayMs delay before emission, in milliseconds
 */
private void scheduleSignal(ClassId<?, Id> from, Worker worker, Signal<?, Id> signal,
Signal<?, Id> s, long delayMs) {
// Record the pairwise signal so we can cancel it if desired.
@SuppressWarnings({ "unchecked", "rawtypes" })
ClassIdPair<Id> idPair = new ClassIdPair<Id>(from,
new ClassId(signal.cls(), signal.id()));
// Clock reading taken on signalScheduler just before the emission is scheduled.
long t1 = signalScheduler.now(TimeUnit.MILLISECONDS);
Disposable subscription = worker.schedule(() -> {
subject.onNext(s.now());
} , delayMs, TimeUnit.MILLISECONDS);
// Second clock reading; (t2 - t1) is the time spent scheduling the emission.
long t2 = signalScheduler.now(TimeUnit.MILLISECONDS);
// Schedule removal of the map entry with the delay shortened by the elapsed
// time measured above.
// NOTE(review): the removal is by key only, so it removes whatever is mapped
// to idPair when it runs, which may be a newer subscription for the same
// pair. Confirm that this is intended.
worker.schedule(() -> {
subscriptions.remove(idPair);
} , delayMs - (t2 - t1), TimeUnit.MILLISECONDS);
// Store the new subscription; a displaced earlier one for the same pair is disposed.
Disposable previous = subscriptions.put(idPair, subscription);
if (previous != null) {
previous.dispose();
}
}
/**
 * Attempts to hand member {@code m} to one of the registered observers.
 *
 * <p>Starting from the slot after {@code obs.index}, the next active slot is
 * selected (wrapping around), marked inactive, and installed as the new index
 * via compare-and-set on {@code observers}. On success an {@code Emitter} for
 * the selected observer is scheduled on a freshly created worker. A fresh
 * worker is used each time so the emission jumps threads and breaks the call
 * stack (a long enough chain of checkout-checkins could otherwise provoke a
 * stack overflow).
 *
 * <p>If the current state no longer has the same index, or no longer holds
 * the same observer at that index, the member is checked back in instead.
 *
 * @param obs snapshot of the observers state the caller is working from
 * @param m   member to emit
 * @return {@code true} if an emission was scheduled, {@code false} if the
 *         member was checked in
 */
private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
    final int current = obs.index;
    final MemberSingleObserver<T> expected = obs.observers[current];
    for (;;) {
        final Observers<T> snapshot = observers.get();
        if (snapshot.index != current || snapshot.observers[current] != expected) {
            // state moved on underneath us: give the member back
            m.checkin();
            return false;
        }
        final boolean[] flags = snapshot.active.clone();
        final int size = flags.length;
        // round robin: walk forward to the next active slot, stopping if we
        // come full circle
        int candidate = (current + 1) % size;
        while (candidate != current && !flags[candidate]) {
            candidate = (candidate + 1) % size;
        }
        flags[candidate] = false;
        final Observers<T> updated = new Observers<T>(snapshot.observers, flags,
                snapshot.activeCount - 1, candidate, snapshot.requested - 1);
        if (observers.compareAndSet(snapshot, updated)) {
            final MemberSingleObserver<T> target = snapshot.observers[candidate];
            final Worker worker = scheduler.createWorker();
            worker.schedule(new Emitter<T>(worker, target, m));
            return true;
        }
        // lost the race on the atomic update, retry against the latest state
    }
}
/**
 * Subscribes a file-buffering subscriber to whichever upstream is configured.
 *
 * <p>A new {@code PagedQueue} and a new worker are created per subscription.
 * When {@code source} is non-null it is subscribed with a
 * {@code BufferToFileSubscriberFlowable}; otherwise {@code source2} is
 * subscribed with a {@code BufferToFileSubscriberObservable}.
 *
 * @param child downstream subscriber
 */
@Override
protected void subscribeActual(Subscriber<? super T> child) {
    final PagedQueue pages = new PagedQueue(options.fileFactory(), options.pageSizeBytes());
    final Worker w = options.scheduler().createWorker();
    if (source == null) {
        source2.subscribe(new BufferToFileSubscriberObservable<T>(child, pages, serializer, w));
        return;
    }
    source.subscribe(new BufferToFileSubscriberFlowable<T>(child, pages, serializer, w));
}
/**
 * Stores the collaborators this subscriber works with.
 *
 * @param child      downstream subscriber
 * @param queue      paged queue instance
 * @param serializer serializer for items of type {@code T}
 * @param worker     scheduler worker
 */
BufferToFileSubscriber(Subscriber<? super T> child, PagedQueue queue,
        Serializer<T> serializer, Worker worker) {
    this.worker = worker;
    this.serializer = serializer;
    this.queue = queue;
    this.child = child;
}
/**
 * A worker obtained from a scheduler wrapped by
 * {@code SchedulerHelper.withThreadId} reports disposed only after
 * {@code dispose()} has been called.
 */
@Test
public void testDispose() {
    Worker worker = SchedulerHelper.withThreadId(Schedulers.trampoline(), "boo").createWorker();
    Assert.assertFalse(worker.isDisposed());
    worker.dispose();
    Assert.assertTrue(worker.isDisposed());
}
/**
 * When {@code PagedQueue.poll()} throws, the subscriber polls at least once
 * and the downstream test subscriber receives that same exception as an error.
 */
@Test
public void testPollQueueThrowsExceptionEmitsError() {
    final RuntimeException failure = new RuntimeException();
    final PagedQueue failingQueue = Mockito.mock(PagedQueue.class);
    Mockito.doThrow(failure).when(failingQueue).poll();

    final TestSubscriber<String> downstream = TestSubscriber.create(1);
    final Worker trampolineWorker = Schedulers.trampoline().createWorker();
    final BufferToFileSubscriberFlowable<String> subscriber =
            new BufferToFileSubscriberFlowable<String>(downstream, failingQueue,
                    Serializers.utf8(), trampolineWorker);

    subscriber.onSubscribe(IGNORE);
    subscriber.request(1);
    subscriber.run();

    Mockito.verify(failingQueue, Mockito.atLeastOnce()).poll();
    downstream.assertError(failure);
}
/**
 * Builds the deferred processing pipeline.
 *
 * <p>Per subscription: a worker is created from {@code signalScheduler}; the
 * serialized {@code subject} (buffering backpressure) is merged with
 * {@code signals}; the worker is disposed on cancel; {@code preGroupBy} is
 * applied; signals are grouped by {@code ClassId} (using {@code mapFactory}
 * when one is set); and every group is processed on
 * {@code processingScheduler}, recording each emitted machine in
 * {@code stateMachines} before being passed through {@code entityTransform}.
 *
 * @return a flowable of state machines emitted by the pipeline
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
    return Flowable.defer(() -> {
        Worker worker = signalScheduler.createWorker();
        Flowable<Signal<?, Id>> merged = subject //
                .toSerialized() //
                .toFlowable(BackpressureStrategy.BUFFER) //
                .mergeWith(signals) //
                .doOnCancel(worker::dispose) //
                .compose(preGroupBy);
        Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> groups;
        if (mapFactory == null) {
            groups = merged.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
                    Functions.identity());
        } else {
            groups = merged.groupBy(signal -> new ClassId(signal.cls(),
                    signal.id()), x -> x, true, 16, mapFactory);
        }
        return groups.flatMap(group -> {
            Flowable<EntityStateMachine<?, Id>> machines = group //
                    .flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, group.getKey())) //
                    .doOnNext(machine -> stateMachines.put(group.getKey(), machine)) //
                    .subscribeOn(processingScheduler);
            Flowable<EntityStateMachine<?, Id>> transformed = entityTransform
                    .apply(grouped(group.getKey(), machines));
            return transformed;
        });
    });
}
/**
 * Looks up the state machine for {@code classId} and returns a generated
 * flowable driven by a {@code TransitionHandler}, which is passed as both
 * arguments of {@code Flowable.generate}.
 *
 * @param classId identifies the entity whose state machine is processed
 * @param event   event handed to the transition handler
 * @param worker  worker handed to the transition handler
 * @return flowable produced by {@code Flowable.generate}
 */
private Flowable<EntityStateMachine<?, Id>> process(ClassId<?, Id> classId, Event<?> event,
        Worker worker) {
    final EntityStateMachine<?, Id> current = getStateMachine(classId.cls(), classId.id());
    final TransitionHandler transitions = new TransitionHandler(classId, event, worker, current);
    return Flowable.generate(transitions, transitions);
}
/**
 * Captures the inputs of a single transition run.
 *
 * @param classId identifies the entity
 * @param event   event to handle
 * @param worker  scheduler worker
 * @param machine state machine to start from
 */
TransitionHandler(ClassId<?, Id> classId, Event<?> event, Worker worker,
        EntityStateMachine<?, Id> machine) {
    this.machine = machine;
    this.worker = worker;
    this.event = event;
    this.classId = classId;
}
/**
 * Stores the constructor arguments and initialises the internal state: an
 * empty batch list, a zeroed work-in-progress counter, an empty concurrent
 * queue, and a {@code SerialDisposable} initially holding {@code NO_TIMER}.
 *
 * @param actual  downstream subscriber
 * @param timeout timeout amount
 * @param unit    unit of {@code timeout}
 * @param worker  scheduler worker
 */
ConditionalCompactorSubscriber(Subscriber<? super String> actual, long timeout, TimeUnit unit, Worker worker) {
    // constructor arguments
    this.actual = actual;
    this.timeout = timeout;
    this.unit = unit;
    this.worker = worker;

    // internal state
    this.queue = new ConcurrentLinkedQueue<>();
    this.batch = new ArrayList<>();
    this.wip = new AtomicInteger();
    this.mas = new SerialDisposable();
    this.mas.set(NO_TIMER);
}
/**
 * Stores the constructor arguments and creates an empty concurrent queue.
 *
 * @param actual downstream observer
 * @param time   time amount
 * @param unit   unit of {@code time}
 * @param worker scheduler worker
 */
ThrottleFirstSampleObserver(Observer<? super T> actual, long time, TimeUnit unit, Worker worker) {
    this.queue = new ConcurrentLinkedQueue<>();
    this.worker = worker;
    this.unit = unit;
    this.time = time;
    this.actual = actual;
}
/**
 * Stores the downstream subscriber and worker and creates the atomic holders
 * for the pending item, the requested amount and the work-in-progress counter.
 *
 * @param actual downstream subscriber
 * @param worker scheduler worker
 */
LockstepObserveOnSubscriber(Subscriber<? super T> actual, Worker worker) {
    this.worker = worker;
    this.actual = actual;
    this.wip = new AtomicInteger();
    this.requested = new AtomicLong();
    this.item = new AtomicReference<>();
}
/**
 * Stores the worker, the target observer and the member.
 *
 * @param worker   scheduler worker
 * @param observer observer to emit to
 * @param m        member to emit
 */
Emitter(Worker worker, MemberSingleObserver<T> observer, Member<T> m) {
    this.m = m;
    this.observer = observer;
    this.worker = worker;
}
/**
 * Creates the subscriber by passing all arguments unchanged to the superclass
 * constructor. Public and annotated {@code @VisibleForTesting}.
 *
 * @param child      downstream subscriber
 * @param queue      paged queue instance
 * @param serializer serializer for items of type {@code T}
 * @param worker     scheduler worker
 */
@VisibleForTesting
public BufferToFileSubscriberFlowable(Subscriber<? super T> child, PagedQueue queue,
Serializer<T> serializer, Worker worker) {
super(child, queue, serializer, worker);
}
/**
 * Creates the subscriber by passing all arguments unchanged to the superclass
 * constructor.
 *
 * @param child      downstream subscriber
 * @param queue      paged queue instance
 * @param serializer serializer for items of type {@code T}
 * @param worker     scheduler worker
 */
BufferToFileSubscriberObservable(Subscriber<? super T> child, PagedQueue queue,
Serializer<T> serializer, Worker worker) {
super(child, queue, serializer, worker);
}
/**
 * Returns a function that processes a signal's event for {@code classId} and
 * re-emits the resulting state machines only after the whole result has been
 * collected into a list.
 *
 * @param worker  worker forwarded to {@code process}
 * @param classId entity the signals are addressed to
 * @return mapping function from a signal to the flowable of resulting machines
 */
private Function<? super Signal<?, Id>, Flowable<EntityStateMachine<?, Id>>> processSignalsToSelfAndSendSignalsToOthers(
        Worker worker, ClassId<?, Id> classId) {
    return signal -> {
        Event<?> event = signal.event();
        return process(classId, event, worker) //
                .toList() //
                .toFlowable() //
                .flatMapIterable(Functions.identity());
    };
}
/**
 * Stores the downstream subscriber and worker, and prepares a reusable task
 * that requests one item from {@code upstream}.
 *
 * @param actual downstream subscriber
 * @param worker scheduler worker
 */
RequestObserveOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker) {
    this.requestOne = () -> upstream.request(1L);
    this.worker = worker;
    this.actual = actual;
}
/**
 * Races {@code onSuccess} against {@code cancel} 1000 times on a
 * {@code SingleSubject} observed on the single scheduler with
 * {@code onTerminateDetach()} applied, asserting after every round that
 * {@code value[1]} is non-null.
 *
 * NOTE(review): presumably this checks that a success signal is never
 * delivered to the observer after the cancelling task has nulled
 * {@code value[0]}. Confirm against the operator under test.
 */
@Test
public void race() throws Exception {
// Dedicated worker for the producer side; disposed in the finally block.
Worker w = Schedulers.newThread().createWorker();
try {
for (int i = 0; i < 1000; i++) {
// value[0] is nulled by the cancelling task; value[1] receives a copy of
// value[0] at the moment onSuccess reaches the observer.
Integer[] value = { 0, 0 };
TestObserver<Integer> to = new TestObserver<Integer>() {
@Override
public void onSuccess(Integer v) {
value[1] = value[0];
super.onSuccess(v);
}
};
SingleSubject<Integer> subj = SingleSubject.create();
subj.observeOn(Schedulers.single())
.onTerminateDetach()
.subscribe(to);
// Both tasks decrement wip and spin until it reaches zero, so that they
// proceed at approximately the same time.
AtomicInteger wip = new AtomicInteger(2);
// Counted down once by each task when it has finished.
CountDownLatch cdl = new CountDownLatch(2);
// Producer task: signals success on the subject.
w.schedule(() -> {
if (wip.decrementAndGet() != 0) {
while (wip.get() != 0);
}
subj.onSuccess(1);
cdl.countDown();
});
// Canceller task: runs on the single scheduler, cancels the observer and
// then clears value[0].
Schedulers.single().scheduleDirect(() -> {
if (wip.decrementAndGet() != 0) {
while (wip.get() != 0);
}
to.cancel();
value[0] = null;
cdl.countDown();
});
// Waits without a timeout until both tasks have finished.
cdl.await();
Assert.assertNotNull(value[1]);
}
} finally {
w.dispose();
}
}