类io.reactivex.Scheduler.Worker源码实例Demo

下面列出了怎么用io.reactivex.Scheduler.Worker的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: rxjava2-extras   文件: SchedulerHelper.java
public static void blockUntilWorkFinished(Scheduler scheduler, int numThreads, long timeout, TimeUnit unit) {
    final CountDownLatch latch = new CountDownLatch(numThreads);
    for (int i = 1; i <= numThreads; i++) {
        final Worker worker = scheduler.createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                worker.dispose();
                latch.countDown();
            }
        });
    }
    try {
        boolean finished = latch.await(timeout, unit);
        if (!finished) {
            throw new RuntimeException("timeout occured waiting for work to finish");
        }
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
 
源代码2 项目: state-machine   文件: Processor.java
private void applySignalsToOthers(ClassId<?, Id> cid, Worker worker, Signals<Id> signals) {
    Signal<?, Id> signal;
    while ((signal = signals.signalsToOther.pollFirst()) != null) {
        Signal<?, Id> s = signal;
        if (signal.isImmediate()) {
            subject.onNext(signal);
        } else if (signal.event() instanceof CancelTimedSignal) {
            cancel(signal);
        } else {
            long delayMs = signal.time().get() - worker.now(TimeUnit.MILLISECONDS);
            if (delayMs <= 0) {
                subject.onNext(signal);
            } else {
                scheduleSignal(cid, worker, signal, s, delayMs);
            }
        }
    }
}
 
源代码3 项目: state-machine   文件: Processor.java
private void scheduleSignal(ClassId<?, Id> from, Worker worker, Signal<?, Id> signal,
        Signal<?, Id> s, long delayMs) {
    // record pairwise signal so we can cancel it if
    // desired
    @SuppressWarnings({ "unchecked", "rawtypes" })
    ClassIdPair<Id> idPair = new ClassIdPair<Id>(from,
            new ClassId(signal.cls(), signal.id()));
    long t1 = signalScheduler.now(TimeUnit.MILLISECONDS);
    Disposable subscription = worker.schedule(() -> {
        subject.onNext(s.now());
    } , delayMs, TimeUnit.MILLISECONDS);
    long t2 = signalScheduler.now(TimeUnit.MILLISECONDS);
    worker.schedule(() -> {
        subscriptions.remove(idPair);
    } , delayMs - (t2 - t1), TimeUnit.MILLISECONDS);
    Disposable previous = subscriptions.put(idPair, subscription);
    if (previous != null) {
        previous.dispose();
    }
}
 
源代码4 项目: rxjava2-jdbc   文件: MemberSingle.java
private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
    // get a fresh worker each time so we jump threads to
    // break the stack-trace (a long-enough chain of
    // checkout-checkins could otherwise provoke stack
    // overflow)

    // advance counter to the next and choose an Observer to emit to (round robin)

    int index = obs.index;
    MemberSingleObserver<T> o = obs.observers[index];
    MemberSingleObserver<T> oNext = o;
    // atomically bump up the index (if that entry has not been deleted in
    // the meantime by disposal)
    while (true) {
        Observers<T> x = observers.get();
        if (x.index == index && x.observers[index] == o) {
            boolean[] active = new boolean[x.active.length];
            System.arraycopy(x.active, 0, active, 0, active.length);
            int nextIndex = (index + 1) % active.length;
            while (nextIndex != index && !active[nextIndex]) {
                nextIndex = (nextIndex + 1) % active.length;
            }
            active[nextIndex] = false;
            if (observers.compareAndSet(x,
                    new Observers<T>(x.observers, active, x.activeCount - 1, nextIndex, x.requested - 1))) {
                oNext = x.observers[nextIndex];
                break;
            }
        } else {
            // checkin because no active observers
            m.checkin();
            return false;
        }
    }
    Worker worker = scheduler.createWorker();
    worker.schedule(new Emitter<T>(worker, oNext, m));
    return true;
}
 
@Override
protected void subscribeActual(Subscriber<? super T> child) {
    PagedQueue queue = new PagedQueue(options.fileFactory(), options.pageSizeBytes());
    Worker worker = options.scheduler().createWorker();
    if (source != null) {
        source.subscribe(
                new BufferToFileSubscriberFlowable<T>(child, queue, serializer, worker));
    } else {
        source2.subscribe(
                new BufferToFileSubscriberObservable<T>(child, queue, serializer, worker));
    }
}
 
BufferToFileSubscriber(Subscriber<? super T> child, PagedQueue queue,
        Serializer<T> serializer, Worker worker) {
    this.child = child;
    this.queue = queue;
    this.serializer = serializer;
    this.worker = worker;
}
 
源代码7 项目: rxjava2-extras   文件: SchedulerHelperTest.java
@Test
public void testDispose() {
    Scheduler s = SchedulerHelper.withThreadId(Schedulers.trampoline(), "boo");
    Worker w = s.createWorker();
    Assert.assertFalse(w.isDisposed());
    w.dispose();
    Assert.assertTrue(w.isDisposed());
}
 
@Test
public void testPollQueueThrowsExceptionEmitsError() {
    PagedQueue queue = Mockito.mock(PagedQueue.class);
    RuntimeException err = new RuntimeException();
    Mockito.doThrow(err).when(queue).poll();
    Worker worker = Schedulers.trampoline().createWorker();
    TestSubscriber<String> ts = TestSubscriber.create(1);
    BufferToFileSubscriberFlowable<String> b = new BufferToFileSubscriberFlowable<String>(ts, queue,
            Serializers.utf8(), worker);
    b.onSubscribe(IGNORE);
    b.request(1);
    b.run();
    Mockito.verify(queue, Mockito.atLeastOnce()).poll();
    ts.assertError(err);
}
 
源代码9 项目: state-machine   文件: Processor.java
@SuppressWarnings({ "rawtypes", "unchecked" })
public Flowable<EntityStateMachine<?, Id>> flowable() {
    return Flowable.defer(() -> {
        Worker worker = signalScheduler.createWorker();
        Flowable<Signal<?, Id>> o0 = subject //
                .toSerialized() //
                .toFlowable(BackpressureStrategy.BUFFER) //
                .mergeWith(signals) //
                .doOnCancel(() -> worker.dispose()) //
                .compose(preGroupBy);
        Flowable<GroupedFlowable<ClassId<?, Id>, Signal<?, Id>>> o;
        if (mapFactory != null) {
            o = o0.groupBy(signal -> new ClassId(signal.cls(),
             signal.id()), x -> x, true, 16, mapFactory);
        } else {
            o = o0.groupBy(signal -> new ClassId(signal.cls(), signal.id()),
                    Functions.identity());
        }
        return o.flatMap(g -> {
            Flowable<EntityStateMachine<?, Id>> obs = g //
                    .flatMap(processSignalsToSelfAndSendSignalsToOthers(worker, g.getKey())) //
                    .doOnNext(m -> stateMachines.put(g.getKey(), m)) //
                    .subscribeOn(processingScheduler); //

            Flowable<EntityStateMachine<?, Id>> res = entityTransform
                    .apply(grouped(g.getKey(), obs));
            return res;
        });
    });
}
 
源代码10 项目: state-machine   文件: Processor.java
private Flowable<EntityStateMachine<?, Id>> process(ClassId<?, Id> classId, Event<?> event,
        Worker worker) {

    EntityStateMachine<?, Id> machine = getStateMachine(classId.cls(), classId.id());
    TransitionHandler handler = new TransitionHandler(classId, event, worker, machine);
    return Flowable.generate(handler, handler);
}
 
源代码11 项目: state-machine   文件: Processor.java
TransitionHandler(ClassId<?, Id> classId, Event<?> event, Worker worker,
        EntityStateMachine<?, Id> machine) {
    this.classId = classId;
    this.event = event;
    this.worker = worker;
    this.machine = machine;
}
 
源代码12 项目: akarnokd-misc   文件: ConditionalCompactedStream.java
ConditionalCompactorSubscriber(Subscriber<? super String> actual, long timeout, TimeUnit unit, Worker worker) {
    this.actual = actual;
    this.worker = worker;
    this.timeout = timeout;
    this.unit = unit;
    this.batch = new ArrayList<>();
    this.wip = new AtomicInteger();
    this.mas = new SerialDisposable();
    this.mas.set(NO_TIMER);
    this.queue = new ConcurrentLinkedQueue<>();
}
 
源代码13 项目: akarnokd-misc   文件: ThrottleSampleTest.java
ThrottleFirstSampleObserver(Observer<? super T> actual, long time, TimeUnit unit, Worker worker) {
    this.actual = actual;
    this.time = time;
    this.unit = unit;
    this.worker = worker;
    this.queue = new ConcurrentLinkedQueue<>();
}
 
源代码14 项目: akarnokd-misc   文件: LockstepObserveOnTest.java
LockstepObserveOnSubscriber(Subscriber<? super T> actual, Worker worker) {
    this.actual = actual;
    this.worker = worker;
    this.item = new AtomicReference<>();
    this.requested = new AtomicLong();
    this.wip = new AtomicInteger();
}
 
源代码15 项目: rxjava2-jdbc   文件: MemberSingle.java
Emitter(Worker worker, MemberSingleObserver<T> observer, Member<T> m) {
    this.worker = worker;
    this.observer = observer;
    this.m = m;
}
 
@VisibleForTesting
public BufferToFileSubscriberFlowable(Subscriber<? super T> child, PagedQueue queue,
        Serializer<T> serializer, Worker worker) {
    super(child, queue, serializer, worker);
}
 
BufferToFileSubscriberObservable(Subscriber<? super T> child, PagedQueue queue,
        Serializer<T> serializer, Worker worker) {
    super(child, queue, serializer, worker);
}
 
源代码18 项目: state-machine   文件: Processor.java
private Function<? super Signal<?, Id>, Flowable<EntityStateMachine<?, Id>>> processSignalsToSelfAndSendSignalsToOthers(
        Worker worker, ClassId<?, Id> classId) {
    return signal -> process(classId, signal.event(), worker) //
            .toList() //
            .toFlowable().flatMapIterable(Functions.identity());
}
 
源代码19 项目: akarnokd-misc   文件: ThrottleLastTest.java
RequestObserveOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker) {
    this.actual = actual;
    this.worker = worker;
    this.requestOne = () -> upstream.request(1L);
}
 
源代码20 项目: akarnokd-misc   文件: SingleObserveOnRaceTest.java
@Test
public void race() throws Exception {
    Worker w = Schedulers.newThread().createWorker();
    try {
        for (int i = 0; i < 1000; i++) {
            Integer[] value = { 0, 0 };
            
            TestObserver<Integer> to = new TestObserver<Integer>() {
                @Override
                public void onSuccess(Integer v) {
                    value[1] = value[0];
                    super.onSuccess(v);
                }
            };
            
            SingleSubject<Integer> subj = SingleSubject.create();
            
            subj.observeOn(Schedulers.single())
            .onTerminateDetach()
            .subscribe(to);
            
            AtomicInteger wip = new AtomicInteger(2);
            CountDownLatch cdl = new CountDownLatch(2);
            
            w.schedule(() -> {
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0);
                }
                subj.onSuccess(1);
                cdl.countDown();
            });
            
            Schedulers.single().scheduleDirect(() -> {
                if (wip.decrementAndGet() != 0) {
                    while (wip.get() != 0);
                }
                to.cancel();
                value[0] = null;
                cdl.countDown();
            });

            cdl.await();
            
            Assert.assertNotNull(value[1]);
        }
    } finally {
        w.dispose();
    }
}
 
 类所在包
 类方法
 同包方法