下面列出了 io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.core.Flowable 的实例代码,或者点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Submits a query built with the value 10, requests 10 items from the
 * back-pressured flow and expects 10 results, no errors and a completed subscriber.
 */
@Test
public void testDataServiceWithBackPressureComplete() throws InterruptedException {
    final DataService service = new DataServiceImpl(executor);
    final DataQuery query = new DataQuery("query1-back-pressure-complete", 10);
    final Flowable<DataItem> items = service.getDataFlowWithBackPressure(query);
    LOG.info("query submitted");

    // Subscribe first, then signal demand explicitly and wait for the outcome.
    final SynchronousDataSubscriber subscriber = new SynchronousDataSubscriber();
    items.subscribe(subscriber);
    subscriber.request(10);
    subscriber.await(10, TimeUnit.SECONDS);

    LOG.info("evaluating test results");
    Assert.assertTrue(subscriber.getErrors().size() == 0);
    Assert.assertTrue(subscriber.getResults().size() == 10);
    Assert.assertTrue(subscriber.isCompleted());
    Assert.assertNotNull(subscriber.getSubscription());
}
/**
 * Submits a query built with the value 10 but requests only 5 items; expects
 * exactly 5 results, no errors and a subscriber that has not completed.
 */
@Test
public void testDataServiceWithBackPressureIncomplete() throws InterruptedException {
    final DataService service = new DataServiceImpl(executor);
    final DataQuery query = new DataQuery("query2-back-pressure-incomplete", 10);
    final Flowable<DataItem> items = service.getDataFlowWithBackPressure(query);
    LOG.info("query submitted");

    // Demand is deliberately smaller than the value passed to the query.
    final SynchronousDataSubscriber subscriber = new SynchronousDataSubscriber();
    items.subscribe(subscriber);
    subscriber.request(5);
    subscriber.await(2, TimeUnit.SECONDS);

    LOG.info("evaluating test results");
    Assert.assertTrue(subscriber.getErrors().size() == 0);
    Assert.assertTrue(subscriber.getResults().size() == 5);
    Assert.assertFalse(subscriber.isCompleted());
    Assert.assertNotNull(subscriber.getSubscription());
}
/**
 * Builds a publisher that, when subscribed, reads the installed packages and
 * maps each one to an {@code App}.
 *
 * <p>Each {@code App} is flagged as not coming from a file and receives its
 * {@code lastUpdateTime} formatted with {@code dateFormat}. The finished list is
 * also handed to {@code setApps} before it is emitted.
 *
 * @param context    context used to obtain the package manager; held through a weak reference
 * @param dateFormat formatter applied to every package's last update time
 * @return a publisher emitting the list of apps
 */
private Publisher<List<App>> getApps(Context context, final DateFormat dateFormat) {
    return Flowable.just(new WeakReference<>(context))
            .map(contextRef -> {
                final PackageManager packageManager = contextRef.get().getPackageManager();
                final List<App> result = new ArrayList<>();
                for (PackageInfo packageInfo : packageManager.getInstalledPackages(0)) {
                    final App entry = new App(packageInfo, packageManager);
                    entry.isFormFile = false;
                    entry.time = dateFormat.format(new Date(packageInfo.lastUpdateTime));
                    result.add(entry);
                }
                setApps(result);
                return result;
            });
}
/**
 * Emits the cached record followed by the remote record, dropping records
 * whose data is {@code null}.
 *
 * <p>Every remote value is saved to {@code rxCache} under {@code key} before it
 * is wrapped in a {@code Record} tagged {@code Source.CLOUD}. The two flows are
 * joined with {@code concatDelayError}.
 *
 * @param rxCache cache to load from and save to
 * @param key     cache key
 * @param source  remote data flow
 * @param type    type passed to the cache when loading
 * @return the concatenated, filtered flow of records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<T>(Source.CLOUD, key, value);
    });
    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(record -> record.getData() != null);
}
/**
 * Emits remote records and switches to the cached flow only if the remote flow is empty.
 *
 * <p>Every remote value is saved to {@code rxCache} under {@code key} before it
 * is wrapped in a {@code Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache cache to load from and save to
 * @param key     cache key
 * @param source  remote data flow
 * @param type    type passed to the cache when loading
 * @return the remote flow with the cache as its empty-fallback
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<T>(Source.CLOUD, key, value);
    });
    return fetched.switchIfEmpty(cached);
}
/**
 * Emits cached records and switches to the remote flow only if the cached flow is empty.
 *
 * <p>Every remote value is saved to {@code rxCache} under {@code key} before it
 * is wrapped in a {@code Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache cache to load from and save to
 * @param key     cache key
 * @param source  remote data flow
 * @param type    type passed to the cache when loading
 * @return the cached flow with the remote flow as its empty-fallback
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);
    final Flowable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<T>(Source.CLOUD, key, value);
    });
    return cached.switchIfEmpty(fetched);
}
/**
 * Emits remote records only; the cache is written to but never read.
 *
 * <p>Every remote value is saved to {@code rxCache} under {@code key} before it
 * is wrapped in a {@code Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache cache to save to
 * @param key     cache key
 * @param source  remote data flow
 * @param type    not used by this implementation
 * @return the remote flow mapped to records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<T>(Source.CLOUD, key, value);
    });
    return fetched;
}
/**
 * Emits remote records only; the cache is written to but never read.
 *
 * <p>Every remote value is saved to {@code rxCache} under {@code key} before it
 * is wrapped in a {@code Record} tagged {@code Source.CLOUD}.
 *
 * @param rxCache              cache to save to
 * @param key                  cache key
 * @param source               remote data flow
 * @param type                 not used by this implementation
 * @param backpressureStrategy not used by this implementation
 * @return the remote flow mapped to records
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    final Flowable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<T>(Source.CLOUD, key, value);
    });
    return fetched;
}
/**
 * Expects {@code RxQuery.flowableOneByOne} to deliver two changes of one element
 * each with no error, and to deliver nothing further after a later publish.
 */
@Test
public void flowableOneByOne() {
    publisher.setQueryResult(listResult);

    Flowable<String> oneByOne = RxQuery.flowableOneByOne(mockQuery.getQuery());
    TestObserver observer = new TestObserver();
    observer.resetLatch(2);
    //noinspection ResultOfMethodCallIgnored
    oneByOne.subscribe(observer);

    // Two changes are expected, each holding a single element.
    observer.assertLatchCountedDown(2);
    assertEquals(2, observer.receivedChanges.size());
    assertEquals(1, observer.receivedChanges.get(0).size());
    assertEquals(1, observer.receivedChanges.get(1).size());
    assertNull(observer.error);

    // Publishing again must not produce additional results.
    observer.receivedChanges.clear();
    publisher.publish();
    observer.assertNoMoreResults();
}
/**
 * With a one-minute limit configured, a two-item interval flow must deliver both
 * values and complete, and the time limiter must see {@code onSuccess} three times.
 */
@Test
public void doNotTimeout() {
    final Duration oneMinute = Duration.ofMinutes(1);
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(oneMinute));

    TestSubscriber<Long> testSubscriber = Flowable.interval(1, TimeUnit.SECONDS)
            .take(2)
            .compose(TimeLimiterTransformer.of(timeLimiter))
            .test();

    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    testSubscriber.assertValueCount(2).assertComplete();
    then(timeLimiter).should(times(3)).onSuccess();
}
/**
 * A value emitted while a subscriber was attached must still be delivered to a
 * new subscriber that arrives after the first one cancelled.
 */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = new TestSubscriber<>();
    shared.subscribe(first);
    first.assertNoValues();

    processor.onNext("Foo");
    first.assertValues("Foo");
    first.cancel();

    TestSubscriber<String> second = new TestSubscriber<>();
    shared.subscribe(second);
    second.assertValues("Foo");
}
/**
 * A value emitted upstream while nobody is subscribed must reach neither the
 * cancelled subscriber nor a subscriber that attaches afterwards.
 */
@Test public void valueMissedWhenNoSubscribers() {
    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = new TestSubscriber<>();
    shared.subscribe(first);
    first.assertNoValues();
    first.cancel();

    // Emitted while there are no subscribers.
    processor.onNext("Foo");
    first.assertNoValues();

    TestSubscriber<String> second = new TestSubscriber<>();
    shared.subscribe(second);
    second.assertNoValues();
}
/**
 * A subscriber created with zero initial demand must not receive the cached
 * value until it requests, and must then see the most recent value.
 */
@Test public void backpressureHonoredWhenCached() {
    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> eager = new TestSubscriber<>();
    shared.subscribe(eager);
    eager.assertNoValues();

    processor.onNext("Foo");
    eager.assertValues("Foo");

    // Zero initial request: nothing may be delivered yet.
    TestSubscriber<String> lazy = new TestSubscriber<>(0);
    shared.subscribe(lazy);
    lazy.assertNoValues();

    processor.onNext("Bar"); // Overwrite the cached value...
    lazy.request(1); // ...and verify that fresh demand observes it.
    lazy.assertValues("Bar");
}
/**
 * Two flows composed with {@code ReplayingShare.instance()} must each replay
 * their own last value to late subscribers, never the other flow's value.
 */
@Test public void streamsDoNotShareInstances() {
    PublishProcessor<String> processorA = PublishProcessor.create();
    Flowable<String> sharedA = processorA.compose(ReplayingShare.<String>instance());
    TestSubscriber<String> firstA = new TestSubscriber<>();
    sharedA.subscribe(firstA);

    PublishProcessor<String> processorB = PublishProcessor.create();
    Flowable<String> sharedB = processorB.compose(ReplayingShare.<String>instance());
    TestSubscriber<String> firstB = new TestSubscriber<>();
    sharedB.subscribe(firstB);

    processorA.onNext("Foo");
    firstA.assertValues("Foo");
    processorB.onNext("Bar");
    firstB.assertValues("Bar");

    // Late subscribers see the value cached by their own flow.
    TestSubscriber<String> secondA = new TestSubscriber<>();
    sharedA.subscribe(secondA);
    secondA.assertValues("Foo");

    TestSubscriber<String> secondB = new TestSubscriber<>();
    sharedB.subscribe(secondB);
    secondB.assertValues("Bar");
}
/**
 * After upstream completes, a new subscriber must receive the value from a
 * fresh upstream subscription ("initB") rather than the earlier one ("initA").
 */
@Test public void completeClearsCacheAndResubscribes() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.startWithIterable(initial).compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = new TestSubscriber<>();
    shared.subscribe(first);
    first.assertValues("initA");

    TestSubscriber<String> second = new TestSubscriber<>();
    shared.subscribe(second);
    first.assertValues("initA");

    processor.onComplete();
    first.assertComplete();
    second.assertComplete();

    // Change what a new upstream subscription would start with.
    initial.set(0, "initB");

    TestSubscriber<String> third = new TestSubscriber<>();
    shared.subscribe(third);
    third.assertValues("initB");
}
/**
 * After upstream fails, a new subscriber must receive the value from a fresh
 * upstream subscription ("initB") rather than the earlier one ("initA").
 */
@Test public void errorClearsCacheAndResubscribes() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.startWithIterable(initial).compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = new TestSubscriber<>();
    shared.subscribe(first);
    first.assertValues("initA");

    TestSubscriber<String> second = new TestSubscriber<>();
    shared.subscribe(second);
    first.assertValues("initA");

    RuntimeException failure = new RuntimeException();
    processor.onError(failure);
    first.assertError(failure);
    second.assertError(failure);

    // Change what a new upstream subscription would start with.
    initial.set(0, "initB");

    TestSubscriber<String> third = new TestSubscriber<>();
    shared.subscribe(third);
    third.assertValues("initB");
}
/**
 * Same scenario as the completion test, but using
 * {@code ReplayingShare.createWithDefault("default")}: after upstream completes,
 * a new subscriber must receive "default" followed by "initB".
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
    List<String> initial = new ArrayList<>();
    initial.add("initA");

    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared =
            processor.startWithIterable(initial).compose(ReplayingShare.createWithDefault("default"));

    TestSubscriber<String> first = new TestSubscriber<>();
    shared.subscribe(first);
    first.assertValues("default", "initA");

    TestSubscriber<String> second = new TestSubscriber<>();
    shared.subscribe(second);
    first.assertValues("default", "initA");

    processor.onComplete();
    first.assertComplete();
    second.assertComplete();

    // Change what a new upstream subscription would start with.
    initial.set(0, "initB");

    TestSubscriber<String> third = new TestSubscriber<>();
    shared.subscribe(third);
    third.assertValues("default", "initB");
}
/**
 * With the rate limiter stubbed to return a reservation of zero nanoseconds, an
 * upstream {@code IOException} must reach the subscriber and the flow must not complete.
 */
@Test
public void shouldPropagateError() {
    final long noWaitNanos = Duration.ofSeconds(0).toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    final IOException failure = new IOException("BAM!");
    Flowable.error(failure)
            .compose(RateLimiterOperator.of(rateLimiter))
            .test()
            .assertError(IOException.class)
            .assertNotComplete();
}
/**
 * Exposes this object as a {@code Flowable} created with the given strategy.
 *
 * <p>On subscription, {@code internalHandler} is replaced with a handler that
 * forwards to the emitter's {@code onNext}, and {@code close()} is registered as
 * the emitter's cancellable.
 *
 * @param backpressureStrategy strategy handed to {@code Flowable.create}
 * @return the created flowable
 */
@Override
public Flowable<T> asFlowable(final BackpressureStrategy backpressureStrategy) {
    return Flowable.create(emitter -> {
        internalHandler = item -> emitter.onNext(item);
        emitter.setCancellable(() -> close());
    }, backpressureStrategy);
}
/**
 * Returns a flowable that emits the beans "Hello" and "Ciao" from a newly
 * started thread, calling {@code sleep()} after each one, and then completes.
 *
 * @return the flowable, created with {@code BackpressureStrategy.MISSING}
 */
@GET
@Produces("application/json")
@Path("textJsonImplicitList")
public Flowable<HelloWorldBean> getJsonImplicitList() {
    return Flowable.create(emitter -> {
        final Runnable producer = () -> {
            emitter.onNext(new HelloWorldBean("Hello"));
            sleep();
            emitter.onNext(new HelloWorldBean("Ciao"));
            sleep();
            emitter.onComplete();
        };
        new Thread(producer).start();
    }, BackpressureStrategy.MISSING);
}
/**
 * Loads the icon of {@code mApp} on the IO scheduler and shows it in
 * {@code mIconImg} on the Android main thread.
 *
 * <p>{@code recycler()} is called first. The app is held through a weak
 * reference; if it is gone when the icon is loaded, the mapper throws and the
 * error is ignored by the subscriber. The subscription is stored in {@code mDisposable}.
 */
public void getIcon() {
    recycler();
    final WeakReference<App> appRef = new WeakReference<>(mApp);
    mDisposable = Flowable.just(itemView.getContext().getApplicationContext().getPackageManager())
            .map(packageManager -> {
                final App target = appRef.get();
                if (target == null) {
                    throw new Exception("app is null");
                }
                return target.applicationInfo.loadIcon(packageManager);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<Drawable>() {
                @Override
                public void onNext(Drawable icon) {
                    mIconImg.setImageDrawable(icon);
                }

                @Override
                public void onError(Throwable error) {
                    // Intentionally empty in the original: a failed icon load is ignored.
                }

                @Override
                public void onComplete() {
                    // Nothing to do on completion.
                }
            });
}
/**
 * Parses the package archive at {@code path} on the IO scheduler and delivers
 * the resulting {@code App} to {@code subscriber} on the Android main thread.
 *
 * <p>On API level Q and above the content behind {@code path} is first copied to
 * {@code temp.apk} in the external cache directory; below that the path is
 * resolved through {@code FileUtil.getPath}. The resulting {@code App} is flagged
 * as coming from a file.
 *
 * <p>Failures surface through the subscriber's error callback. A
 * {@link NullPointerException} is raised when the context is no longer reachable
 * or when the archive cannot be parsed.
 *
 * @param context    context used for the package manager, cache dir and content resolver;
 *                   held through a weak reference
 * @param path       location of the package archive
 * @param subscriber receiver of the parsed app
 * @return the subscriber, usable as the disposable of this subscription
 */
public Disposable getApp(Context context, final Uri path, KWSubscriber<App> subscriber) {
    return Flowable.just(new WeakReference<>(context))
            .map(weakContext -> {
                // Take ONE strong reference up front: repeated weakContext.get() calls
                // could return null halfway through if the context is collected.
                final Context ctx = weakContext.get();
                if (ctx == null) {
                    throw new NullPointerException("context is no longer available");
                }
                final PackageManager pm = ctx.getPackageManager();
                final String realPath;
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
                    final File file = new File(ctx.getExternalCacheDir(), "temp.apk");
                    file.deleteOnExit();
                    realPath = file.getAbsolutePath();
                    FileUtil.copy(ctx.getContentResolver(), path, Uri.fromFile(file), null);
                } else {
                    realPath = FileUtil.getPath(ctx, path);
                }
                final PackageInfo info = pm.getPackageArchiveInfo(realPath, 0);
                if (info == null) {
                    throw new NullPointerException("unable to parse package archive: " + realPath);
                }
                // Point the application info at the archive on disk.
                final ApplicationInfo applicationInfo = info.applicationInfo;
                applicationInfo.sourceDir = realPath;
                applicationInfo.publicSourceDir = realPath;
                final App app = new App(info, pm);
                app.isFormFile = true;
                return app;
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
/**
 * With the rate limiter stubbed to return a reservation of zero nanoseconds, a
 * single-item flow must emit that item and complete.
 */
@Test
public void shouldEmitSingleEventWithSinglePermit() {
    final long noWaitNanos = Duration.ofSeconds(0).toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable.just(1)
            .compose(RateLimiterOperator.of(rateLimiter))
            .test()
            .assertResult(1);
}
/**
 * Fetches {@code /rx3/flowable/textAsync} as a {@code Flowable<String>}, doubles
 * the text and expects "Hello, world!Hello, world!" as the only result.
 */
@Test
public void testGetHelloWorldAsyncObservable() throws Exception {
    final String endpoint = "http://localhost:" + PORT + "/rx3/flowable/textAsync";
    final WebClient client = WebClient.create(endpoint,
            Collections.singletonList(new FlowableRxInvokerProvider()));

    final Flowable<String> greeting = client.accept("text/plain")
            .rx(FlowableRxInvoker.class)
            .get(String.class);

    final TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    greeting.map(text -> text + text).subscribe(testSubscriber);

    testSubscriber.await(2, TimeUnit.SECONDS);
    testSubscriber.assertResult("Hello, world!Hello, world!");
}
/**
 * With the rate limiter stubbed to return a reservation of zero nanoseconds, a
 * two-item flow must emit both items in order and complete.
 */
@Test
public void shouldEmitAllEvents() {
    final long noWaitNanos = Duration.ofSeconds(0).toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable.fromArray(1, 2)
            .compose(RateLimiterOperator.of(rateLimiter))
            .test()
            .assertResult(1, 2);
}
/**
 * Starts consuming message batches from {@code messageSource}: each batch is
 * logged, flattened into its messages, and every message is passed to
 * {@code messageHandler.handleMessage} inside a deferred {@code Single} that is
 * subscribed on a scheduler obtained once from {@code threadPoolScheduler}.
 *
 * <p>The code is kept as written because it is flagged as an intentionally
 * misbehaving sample (see the warning inside the body).
 */
public void start() {
// WARNING: this code doesn't work as expected
// NOTE(review): the warning does not say what goes wrong. Presumably the issue is that
// flatMapSingle is called without a concurrency limit, so the number of in-flight
// singles is not tied to the thread pool / queue size — confirm against the RxJava docs.
Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);
messageSource.getMessageBatches()
// The batch source is subscribed on its own single-thread executor.
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
// Flatten each batch into its individual messages.
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
// Single.defer wraps the handleMessage call in a supplier, so it is not invoked
// while this mapper lambda runs; the deferred Single is subscribed on `scheduler`.
.flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
.subscribeOn(scheduler))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
/**
 * Starts consuming message batches from {@code messageSource}: each batch is
 * logged, flattened into its messages, and every message is passed to
 * {@code messageHandler.handleMessage}, with the result wrapped in a
 * {@code Single} that is subscribed on a scheduler from {@code threadPoolScheduler}.
 *
 * <p>The code is kept as written because it is flagged as an intentionally
 * misbehaving sample (see the warning inside the body).
 */
public void start() {
// WARNING: this code doesn't work as expected
messageSource.getMessageBatches()
// The batch source is subscribed on its own single-thread executor.
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnNext(batch -> logger.log(batch.toString()))
// Flatten each batch into its individual messages.
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
// handleMessage(m) is an argument of Single.just, so it is evaluated while this
// mapper lambda runs, i.e. before subscribeOn can move any work to another thread.
// threadPoolScheduler(...) is also invoked once per message here.
.flatMapSingle(m -> Single.just(messageHandler.handleMessage(m))
.subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
.subscribeWith(new SimpleSubscriber<>(threads, 1));
}
/**
 * Streams an endless sequence of {@code VideoFrame}s from a producer thread to
 * a consumer thread through a bounded buffer.
 *
 * <p>The producer calls {@code sleep(produceDelay)} before creating each next
 * frame; the consumer calls {@code sleep(consumeDelay)} for each received frame.
 * The buffer uses {@code BackpressureOverflowStrategy.ERROR}; any error reaching
 * the subscriber triggers {@code onError.run()}.
 *
 * <p>Both single-thread executors are shut down once the stream terminates or
 * the returned disposable is disposed, so their threads do not outlive the
 * stream. {@code shutdown()} is used rather than {@code shutdownNow()}, so
 * running tasks are not interrupted.
 *
 * @param produceDelay delay passed to {@code sleep} before each produced frame
 * @param consumeDelay delay passed to {@code sleep} for each consumed frame
 * @param bufferSize   capacity of the back-pressure buffer
 * @param onError      callback run when the stream signals an error
 * @return the disposable of the subscription
 */
public static Disposable streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError) {
    // Keep handles on the executors so they can be released when the stream ends.
    final java.util.concurrent.ExecutorService producerExecutor = Executors.newSingleThreadScheduledExecutor();
    final java.util.concurrent.ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
    return Flowable
            .fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> {
                sleep(produceDelay);
                return new VideoFrame(videoFrame.getNumber() + 1);
            }))
            .subscribeOn(Schedulers.from(producerExecutor), true)
            .onBackpressureBuffer(bufferSize, null, BackpressureOverflowStrategy.ERROR)
            .observeOn(Schedulers.from(consumerExecutor))
            // Runs after a terminal signal or on cancellation; upstream is already
            // cancelled at that point, so no further work is submitted.
            .doFinally(() -> {
                producerExecutor.shutdown();
                consumerExecutor.shutdown();
            })
            .subscribe(item -> {
                sleep(consumeDelay);
            }, throwable -> {
                onError.run();
            });
}
/**
 * A value emitted while one subscriber is attached must be replayed to a
 * second subscriber that joins afterwards.
 */
@Test public void initialValueToNewSubscriber() {
    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.compose(ReplayingShare.<String>instance());

    TestSubscriber<String> first = new TestSubscriber<>();
    shared.subscribe(first);
    first.assertNoValues();

    processor.onNext("Foo");
    first.assertValues("Foo");

    TestSubscriber<String> second = new TestSubscriber<>();
    shared.subscribe(second);
    second.assertValues("Foo");
}
/**
 * A subscriber that was cancelled before subscribing must not receive the
 * cached value.
 */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
    PublishProcessor<String> processor = PublishProcessor.create();
    Flowable<String> shared = processor.compose(ReplayingShare.<String>instance());

    // Keep one subscription open so the emitted value is cached.
    shared.subscribe();
    processor.onNext("something to cache");

    TestSubscriber<String> cancelled = new TestSubscriber<>();
    cancelled.cancel();
    shared.subscribe(cancelled);
    cancelled.assertNoValues();
}