下面列出了 io.reactivex.subjects.CompletableSubject 类的 API 用法示例代码,也可以点击链接到 GitHub 查看源代码。
/**
 * Creates a throttled scanner and wires up its shared scan-data stream.
 *
 * <p>Every scan mode emitted by {@code scanModeRelay} is held back for the number of
 * milliseconds returned by {@code calculateDelay()}. Consecutive duplicate modes are then
 * dropped, and each remaining mode switches to a throttled scan followed by the interval
 * scan. {@code cleanup()} is registered through {@code doFinally}, and the resulting stream
 * is shared between subscribers.
 *
 * @param parsedAdDataFactory factory stored on this scanner
 * @param maxScanDurationMs   stored on this scanner; presumably the longest a single scan may
 *                            run, in milliseconds (confirm against {@code throttledScan})
 * @param pauseIntervalMs     stored on this scanner; presumably the pause between scans, in
 *                            milliseconds (confirm against {@code intervalScan})
 */
public ThrottledLollipopScanner(ParsedAdvertisement.Factory parsedAdDataFactory,
                                long maxScanDurationMs,
                                long pauseIntervalMs) {
    this.parsedAdDataFactory = parsedAdDataFactory;
    this.scanCallback = getScanCallback();
    this.pauseIntervalMs = pauseIntervalMs;
    this.maxScanDurationMs = maxScanDurationMs;
    this.errorSubject = CompletableSubject.create();

    this.sharedScanData = scanModeRelay
            // Hold each requested mode back until the computed delay has elapsed.
            .switchMap(requestedMode -> Observable.fromCallable(this::calculateDelay)
                    .switchMap(delayMs -> Observable.timer(delayMs, TimeUnit.MILLISECONDS))
                    .map(tick -> requestedMode))
            // Re-requesting the mode that is already active must not restart the scan.
            .distinctUntilChanged()
            .switchMap(activeMode -> Observable.concat(
                    // First a (potentially delayed) throttled scan...
                    throttledScan(scanDataRelay, activeMode),
                    // ...then the repeating pause-and-scan cycle.
                    intervalScan(scanDataRelay, activeMode)))
            .doFinally(() -> cleanup())
            .share();
}
/**
 * Subscribes to a {@code CompletableSubject} with side-effect callbacks that record into a
 * mocked list, completes the subject, and verifies the mock saw exactly one {@code add(1)}
 * and one {@code remove(1)} and nothing else.
 */
@Test
public void test() {
    @SuppressWarnings("unchecked")
    List<Integer> recorder = mock(List.class);
    CompletableSubject subject = CompletableSubject.create();

    // The error and completion callbacks record the same call, so a single verified
    // remove(1) shows that only one terminal callback ran.
    subject.doOnSubscribe(disposable -> recorder.add(1))
            .doOnError(error -> recorder.remove(1))
            .doOnComplete(() -> recorder.remove(1))
            .subscribe();

    subject.onComplete();

    verify(recorder).add(1);
    verify(recorder).remove(1);
    verifyNoMoreInteractions(recorder);
}
/**
 * Creates a scanner and wires up its shared scan-data stream.
 *
 * <p>The stream is built on {@code scanDataRelay}: {@code startScan()} is invoked from
 * {@code doOnSubscribe}, {@code stopScan()} is registered through {@code doFinally}, and the
 * result is shared between subscribers.
 *
 * @param parsedAdDataFactory factory stored on this scanner
 */
public JellyBeanScanner(ParsedAdvertisement.Factory parsedAdDataFactory) {
    this.parsedAdDataFactory = parsedAdDataFactory;
    this.leScanCallback = getScanCallback();
    this.errorSubject = CompletableSubject.create();

    this.sharedScanData = scanDataRelay
            .doOnSubscribe(subscription -> startScan())
            .doFinally(() -> stopScan())
            .share();
}
/**
 * Returns the error subject, first replacing it with a fresh instance if the current one
 * already holds a throwable. The check and the replacement both happen while holding
 * {@code syncRoot}.
 *
 * @return the error subject to use, never one for which {@code hasThrowable()} was true
 *         at the time of the check
 */
private CompletableSubject getErrorSubject() {
    synchronized (syncRoot) {
        CompletableSubject current = errorSubject;
        if (current.hasThrowable()) {
            // A subject that has already errored is swapped for a fresh one.
            current = CompletableSubject.create();
            errorSubject = current;
        }
        return current;
    }
}
/**
 * Provides the error subject, swapping in a new instance when the stored one already holds
 * a throwable. Runs entirely under the {@code syncRoot} lock.
 *
 * @return the stored subject if it holds no throwable, otherwise a newly created
 *         replacement that is also stored back into the field
 */
private CompletableSubject getErrorSubject() {
    synchronized (syncRoot) {
        if (!errorSubject.hasThrowable()) {
            return errorSubject;
        }
        // The stored subject has already errored; replace it with a fresh one.
        errorSubject = CompletableSubject.create();
        return errorSubject;
    }
}