io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.core.Flowable源码实例Demo

下面列出了 io.reactivex.rxjava3.functions.Function#io.reactivex.rxjava3.core.Flowable 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: java-11-examples   文件: DataServiceTest.java
/**
 * Subscribes to a back-pressured data flow, requests 10 items and expects exactly 10 results,
 * no errors, a completion signal and a non-null subscription.
 *
 * @throws InterruptedException if the thread is interrupted while awaiting the subscriber
 */
@Test
public void testDataServiceWithBackPressureComplete() throws InterruptedException {
    DataService dataService = new DataServiceImpl(executor);
    Flowable<DataItem> dataFlow = dataService.getDataFlowWithBackPressure(new DataQuery("query1-back-pressure-complete", 10));
    LOG.info("query submitted");
    SynchronousDataSubscriber dataSubscriber = new SynchronousDataSubscriber();
    dataFlow.subscribe(dataSubscriber);

    // Request everything up front, then wait (bounded) before inspecting the subscriber.
    dataSubscriber.request(10);
    dataSubscriber.await(10, TimeUnit.SECONDS);
    LOG.info("evaluating test results");

    // assertEquals reports the actual size on failure, which assertTrue(size == n) cannot do.
    Assert.assertEquals(0, dataSubscriber.getErrors().size());
    Assert.assertEquals(10, dataSubscriber.getResults().size());
    Assert.assertTrue(dataSubscriber.isCompleted());
    Assert.assertNotNull(dataSubscriber.getSubscription());
}
 
源代码2 项目: java-11-examples   文件: DataServiceTest.java
/**
 * Subscribes to a back-pressured data flow, requests only 5 items and expects exactly 5
 * results, no errors, no completion signal and a non-null subscription.
 *
 * @throws InterruptedException if the thread is interrupted while awaiting the subscriber
 */
@Test
public void testDataServiceWithBackPressureIncomplete() throws InterruptedException {
    DataService dataService = new DataServiceImpl(executor);
    Flowable<DataItem> dataFlow = dataService.getDataFlowWithBackPressure(new DataQuery("query2-back-pressure-incomplete", 10));
    LOG.info("query submitted");
    SynchronousDataSubscriber dataSubscriber = new SynchronousDataSubscriber();
    dataFlow.subscribe(dataSubscriber);

    // Request fewer items than the query was created with, then wait a bounded time.
    dataSubscriber.request(5);
    dataSubscriber.await(2, TimeUnit.SECONDS);
    LOG.info("evaluating test results");

    // assertEquals reports the actual size on failure, which assertTrue(size == n) cannot do.
    Assert.assertEquals(0, dataSubscriber.getErrors().size());
    Assert.assertEquals(5, dataSubscriber.getResults().size());
    Assert.assertFalse(dataSubscriber.isCompleted());
    Assert.assertNotNull(dataSubscriber.getSubscription());
}
 
源代码3 项目: GetApk   文件: LocalRepository.java
/**
 * Builds a publisher that, when subscribed, lists the installed packages and converts each
 * one into an {@link App} whose {@code time} is the formatted last-update timestamp.
 *
 * <p>The resulting list is also passed to {@code setApps} before it is emitted.
 *
 * @param context    context used to obtain the {@code PackageManager}; held only weakly
 * @param dateFormat formatter applied to each package's {@code lastUpdateTime}
 * @return a publisher emitting a single list of installed apps; it signals a
 *         {@link NullPointerException} if the context is no longer reachable
 */
private Publisher<List<App>> getApps(Context context, final DateFormat dateFormat) {
    return Flowable.just(new WeakReference<>(context))
            .map(weakContext -> {
                // Dereference once and fail with an explicit message instead of a bare NPE.
                final Context ctx = weakContext.get();
                if (ctx == null) {
                    throw new NullPointerException("context is no longer available");
                }
                final PackageManager pm = ctx.getPackageManager();
                List<PackageInfo> infos = pm.getInstalledPackages(0);
                // One App per package, so the final size is known up front.
                List<App> apps = new ArrayList<>(infos.size());
                for (PackageInfo info : infos) {
                    App app = new App(info, pm);
                    app.isFormFile = false;
                    app.time = dateFormat.format(new Date(info.lastUpdateTime));
                    apps.add(app);
                }
                setApps(apps);
                return apps;
            });
}
 
源代码4 项目: RxCache   文件: CacheAndRemoteStrategy.java
/**
 * Concatenates the cached record stream with the remote stream (errors delayed until both
 * have been consumed) and drops every record whose data is {@code null}.
 *
 * <p>Each remote value is saved through {@code rxCache.save} and wrapped in a record tagged
 * {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> cached = rxCache.<T>load2Flowable(key, type);

    final Flowable<Record<T>> fetched = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });

    return Flowable.concatDelayError(Arrays.asList(cached, fetched))
            .filter(entry -> entry.getData() != null);
}
 
源代码5 项目: RxCache   文件: RemoteFirstStrategy.java
/**
 * Emits the remote records and switches to the cached stream only if the remote stream
 * completes without emitting anything.
 *
 * <p>Each remote value is saved through {@code rxCache.save} and wrapped in a record tagged
 * {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> fallback = rxCache.<T>load2Flowable(key, type);

    final Flowable<Record<T>> primary = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });

    return primary.switchIfEmpty(fallback);
}
 
源代码6 项目: RxCache   文件: CacheFirstStrategy.java
/**
 * Emits the cached records and switches to the remote stream only if the cached stream
 * completes without emitting anything.
 *
 * <p>Each remote value is saved through {@code rxCache.save} and wrapped in a record tagged
 * {@code Source.CLOUD}.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    final Flowable<Record<T>> primary = rxCache.<T>load2Flowable(key, type);

    final Flowable<Record<T>> fallback = source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });

    return primary.switchIfEmpty(fallback);
}
 
源代码7 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Maps every value of {@code source} to a record tagged {@code Source.CLOUD}, saving the
 * value through {@code rxCache.save} on the way. The {@code type} argument is not used.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type) {
    return source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });
}
 
源代码8 项目: RxCache   文件: RemoteOnlyStrategy.java
/**
 * Maps every value of {@code source} to a record tagged {@code Source.CLOUD}, saving the
 * value through {@code rxCache.save} on the way. Neither {@code type} nor
 * {@code backpressureStrategy} is used by this implementation.
 */
@Override
public <T> Publisher<Record<T>> execute(RxCache rxCache, String key, Flowable<T> source, Type type, BackpressureStrategy backpressureStrategy) {
    return source.map(value -> {
        rxCache.save(key, value);
        return new Record<>(Source.CLOUD, key, value);
    });
}
 
源代码9 项目: objectbox-java   文件: QueryObserverTest.java
/**
 * Expects {@code RxQuery.flowableOneByOne} to deliver the query result as two separate
 * single-element changes, and a later publish to produce no further results.
 */
@Test
public void flowableOneByOne() {
    publisher.setQueryResult(listResult);

    Flowable<String> oneByOne = RxQuery.flowableOneByOne(mockQuery.getQuery());

    // Latch of 2: the observer is expected to be notified twice.
    TestObserver observer = new TestObserver();
    observer.resetLatch(2);
    //noinspection ResultOfMethodCallIgnored
    oneByOne.subscribe(observer);

    observer.assertLatchCountedDown(2);
    assertEquals(2, observer.receivedChanges.size());
    assertEquals(1, observer.receivedChanges.get(0).size());
    assertEquals(1, observer.receivedChanges.get(1).size());
    assertNull(observer.error);

    // Start from a clean slate before publishing again.
    observer.receivedChanges.clear();

    publisher.publish();
    observer.assertNoMoreResults();
}
 
/**
 * With a one-minute limit and a source emitting two values one second apart, expects both
 * values, completion, and three {@code onSuccess} calls on the time limiter.
 */
@Test
public void doNotTimeout() {
    final Duration limit = Duration.ofMinutes(1);
    given(timeLimiter.getTimeLimiterConfig()).willReturn(toConfig(limit));

    TestSubscriber<Long> probe = Flowable.interval(1, TimeUnit.SECONDS)
        .take(2)
        .compose(TimeLimiterTransformer.of(timeLimiter))
        .test();

    // Advance virtual time by a minute on the test scheduler.
    testScheduler.advanceTimeBy(1, TimeUnit.MINUTES);

    probe.assertValueCount(2).assertComplete();
    then(timeLimiter).should(times(3)).onSuccess();
}
 
/** Expects a subscriber that arrives after the first one cancelled to still receive "Foo". */
@Test public void initialValueToNewSubscriberAfterUnsubscribe() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Nothing has been emitted yet, so the first subscriber starts empty.
  TestSubscriber<String> first = new TestSubscriber<>();
  shared.subscribe(first);
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");
  first.cancel();

  // The second subscriber joins only after the first has cancelled.
  TestSubscriber<String> second = new TestSubscriber<>();
  shared.subscribe(second);
  second.assertValues("Foo");
}
 
/** Expects a value emitted while nobody is subscribed to be seen by no subscriber at all. */
@Test public void valueMissedWhenNoSubscribers() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Subscribe and cancel again before anything is emitted.
  TestSubscriber<String> first = new TestSubscriber<>();
  shared.subscribe(first);
  first.assertNoValues();
  first.cancel();

  // Emitted while there are no subscribers.
  source.onNext("Foo");
  first.assertNoValues();

  TestSubscriber<String> second = new TestSubscriber<>();
  shared.subscribe(second);
  second.assertNoValues();
}
 
/**
 * Expects a subscriber created with zero initial demand to receive nothing until it requests,
 * and then to receive the most recent value ("Bar") rather than the older one ("Foo").
 */
@Test public void backpressureHonoredWhenCached() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  TestSubscriber<String> eager = new TestSubscriber<>();
  shared.subscribe(eager);
  eager.assertNoValues();

  source.onNext("Foo");
  eager.assertValues("Foo");

  // Initial request of 0: nothing may be delivered yet.
  TestSubscriber<String> lazy = new TestSubscriber<>(0);
  shared.subscribe(lazy);
  lazy.assertNoValues();

  source.onNext("Bar"); // Replace the cached value...
  lazy.request(1); // ...and ensure new requests see it.
  lazy.assertValues("Bar");
}
 
/**
 * Expects two independently composed streams to keep separate values: late subscribers to
 * stream A see "Foo" and late subscribers to stream B see "Bar".
 */
@Test public void streamsDoNotShareInstances() {
  // Stream A with its first subscriber.
  PublishProcessor<String> sourceA = PublishProcessor.create();
  Flowable<String> sharedA = sourceA.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> firstA = new TestSubscriber<>();
  sharedA.subscribe(firstA);

  // Stream B with its first subscriber.
  PublishProcessor<String> sourceB = PublishProcessor.create();
  Flowable<String> sharedB = sourceB.compose(ReplayingShare.<String>instance());
  TestSubscriber<String> firstB = new TestSubscriber<>();
  sharedB.subscribe(firstB);

  sourceA.onNext("Foo");
  firstA.assertValues("Foo");
  sourceB.onNext("Bar");
  firstB.assertValues("Bar");

  // Late subscribers must see the value of their own stream only.
  TestSubscriber<String> secondA = new TestSubscriber<>();
  sharedA.subscribe(secondA);
  secondA.assertValues("Foo");

  TestSubscriber<String> secondB = new TestSubscriber<>();
  sharedB.subscribe(secondB);
  secondB.assertValues("Bar");
}
 
/**
 * Expects that after upstream completion a new subscriber sees the updated start value
 * ("initB") instead of the earlier "initA".
 */
@Test public void completeClearsCacheAndResubscribes() {
  List<String> initial = new ArrayList<>();
  initial.add("initA");

  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.startWithIterable(initial).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = new TestSubscriber<>();
  shared.subscribe(first);
  first.assertValues("initA");

  // After the second subscription the first subscriber must still hold exactly one "initA".
  TestSubscriber<String> second = new TestSubscriber<>();
  shared.subscribe(second);
  first.assertValues("initA");

  source.onComplete();
  first.assertComplete();
  second.assertComplete();

  // Change the start value; only a fresh upstream subscription can observe it.
  initial.set(0, "initB");

  TestSubscriber<String> third = new TestSubscriber<>();
  shared.subscribe(third);
  third.assertValues("initB");
}
 
/**
 * Expects that after an upstream error a new subscriber sees the updated start value
 * ("initB") instead of the earlier "initA".
 */
@Test public void errorClearsCacheAndResubscribes() {
  List<String> initial = new ArrayList<>();
  initial.add("initA");

  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.startWithIterable(initial).compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = new TestSubscriber<>();
  shared.subscribe(first);
  first.assertValues("initA");

  // After the second subscription the first subscriber must still hold exactly one "initA".
  TestSubscriber<String> second = new TestSubscriber<>();
  shared.subscribe(second);
  first.assertValues("initA");

  // Both current subscribers must receive this very exception instance.
  RuntimeException failure = new RuntimeException();
  source.onError(failure);
  first.assertError(failure);
  second.assertError(failure);

  // Change the start value; only a fresh upstream subscription can observe it.
  initial.set(0, "initB");

  TestSubscriber<String> third = new TestSubscriber<>();
  shared.subscribe(third);
  third.assertValues("initB");
}
 
/**
 * Same scenario as the completion test, but composed with
 * {@code ReplayingShare.createWithDefault("default")}: a subscriber arriving after completion
 * is expected to see "default" followed by the updated start value "initB".
 */
@Test public void completeClearsCacheAndResubscribesStartingWithDefault() {
  List<String> initial = new ArrayList<>();
  initial.add("initA");

  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared =
      source.startWithIterable(initial).compose(ReplayingShare.createWithDefault("default"));

  TestSubscriber<String> first = new TestSubscriber<>();
  shared.subscribe(first);
  first.assertValues("default", "initA");

  // After the second subscription the first subscriber's values must be unchanged.
  TestSubscriber<String> second = new TestSubscriber<>();
  shared.subscribe(second);
  first.assertValues("default", "initA");

  source.onComplete();
  first.assertComplete();
  second.assertComplete();

  // Change the start value; only a fresh upstream subscription can observe it.
  initial.set(0, "initB");

  TestSubscriber<String> third = new TestSubscriber<>();
  shared.subscribe(third);
  third.assertValues("default", "initB");
}
 
源代码18 项目: resilience4j   文件: FlowableRateLimiterTest.java
/**
 * With a permission reservation that returns a wait of zero nanoseconds, expects an upstream
 * {@link IOException} to reach the subscriber without a completion signal.
 */
@Test
public void shouldPropagateError() {
    final long noWaitNanos = Duration.ZERO.toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable.error(new IOException("BAM!"))
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertError(IOException.class)
        .assertNotComplete();
}
 
源代码19 项目: catnip   文件: DefaultDispatchManager.java
/**
 * Exposes this object as a {@link Flowable}: on subscription the internal handler is pointed
 * at the emitter's {@code onNext}, and cancelling the subscription calls {@code close()}.
 *
 * @param backpressureStrategy strategy passed straight to {@code Flowable.create}
 * @return a flowable fed through {@code internalHandler}
 */
@Override
public Flowable<T> asFlowable(final BackpressureStrategy backpressureStrategy) {
    return Flowable.create(sink -> {
        // Route handled items into the stream.
        internalHandler = item -> sink.onNext(item);
        // Cancellation of the stream closes this object.
        sink.setCancellable(() -> close());
    }, backpressureStrategy);
}
 
源代码20 项目: cxf   文件: RxJava3FlowableService.java
/**
 * Returns a flowable that, on subscription, starts a new thread which emits "Hello", then
 * "Ciao", then completes, calling {@code sleep()} after each emission.
 */
@GET
@Produces("application/json")
@Path("textJsonImplicitList")
public Flowable<HelloWorldBean> getJsonImplicitList() {
    return Flowable.create(emitter -> {
        final Runnable producer = () -> {
            emitter.onNext(new HelloWorldBean("Hello"));
            sleep();
            emitter.onNext(new HelloWorldBean("Ciao"));
            sleep();
            emitter.onComplete();
        };
        // Emit from a separate thread.
        new Thread(producer).start();
    }, BackpressureStrategy.MISSING);
}
 
源代码21 项目: GetApk   文件: AppDelegate.java
/**
 * Loads the icon of {@code mApp} on the IO scheduler and sets it on {@code mIconImg} on the
 * Android main thread. The subscription is stored in {@code mDisposable}.
 *
 * <p>The app is held through a {@link WeakReference}; if it has been cleared by the time the
 * icon is loaded, the stream fails and no icon is set.
 */
public void getIcon() {
    // NOTE(review): presumably releases the previous icon subscription — confirm in recycler().
    recycler();
    final WeakReference<App> appWeak = new WeakReference<>(mApp);
    mDisposable = Flowable.just(itemView.getContext().getApplicationContext().getPackageManager())
            .map(pm -> {
                App app = appWeak.get();
                if (app == null) {
                    // A specific unchecked type instead of a raw checked Exception.
                    throw new IllegalStateException("app is null");
                }
                return app.applicationInfo.loadIcon(pm);
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableSubscriber<Drawable>() {
                @Override
                public void onNext(Drawable drawable) {
                    mIconImg.setImageDrawable(drawable);
                }

                @Override
                public void onError(Throwable t) {
                    // Intentionally ignored: on failure the image view is simply left untouched.
                }

                @Override
                public void onComplete() {
                    // Nothing to do once the single icon has been delivered.
                }
            });
}
 
源代码22 项目: GetApk   文件: LocalRepository.java
/**
 * Reads the package archive referenced by {@code path} and delivers it as an {@link App} to
 * {@code subscriber}; the work runs on the IO scheduler and results arrive on the Android
 * main thread.
 *
 * <p>On Android Q and later the content is first copied into {@code temp.apk} in the external
 * cache directory and parsed from there; on older versions the path is resolved through
 * {@code FileUtil.getPath}.
 *
 * @param context    context used for the package manager, cache directory and content resolver;
 *                   held only weakly
 * @param path       URI of the APK to inspect
 * @param subscriber receiver of the resulting app or of the error
 * @return the subscriber, usable for disposing the work
 */
public Disposable getApp(Context context, final Uri path, KWSubscriber<App> subscriber) {
    return Flowable.just(new WeakReference<>(context))
            .map(weakContext -> {
                // Dereference once: repeated get() calls could see the referent cleared midway.
                final Context ctx = weakContext.get();
                if (ctx == null) {
                    throw new NullPointerException("context is no longer available");
                }
                PackageManager pm = ctx.getPackageManager();
                String realPath;
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.Q) {
                    File file = new File(ctx.getExternalCacheDir(), "temp.apk");
                    file.deleteOnExit();
                    realPath = file.getAbsolutePath();
                    FileUtil.copy(ctx.getContentResolver(), path, Uri.fromFile(new File(realPath)), null);
                } else {
                    realPath = FileUtil.getPath(ctx, path);
                }
                PackageInfo info = pm.getPackageArchiveInfo(realPath, 0);
                if (info == null) {
                    throw new NullPointerException("no package info could be read from " + realPath);
                }
                // Point the application info at the archive file that was just parsed.
                ApplicationInfo applicationInfo = info.applicationInfo;
                applicationInfo.sourceDir = realPath;
                applicationInfo.publicSourceDir = realPath;
                App app = new App(info, pm);
                app.isFormFile = true;
                return app;
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);
}
 
源代码23 项目: resilience4j   文件: FlowableRateLimiterTest.java
/**
 * With a permission reservation that returns a wait of zero nanoseconds, expects a
 * single-element source to pass through unchanged and complete.
 */
@Test
public void shouldEmitSingleEventWithSinglePermit() {
    final long noWaitNanos = Duration.ZERO.toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable.just(1)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertResult(1);
}
 
源代码24 项目: cxf   文件: JAXRSRxJava3FlowableTest.java
/**
 * Calls the {@code textAsync} endpoint through the flowable Rx invoker, doubles the received
 * string and expects "Hello, world!Hello, world!" within a bounded wait.
 *
 * @throws Exception if awaiting the subscriber fails
 */
@Test
public void testGetHelloWorldAsyncObservable() throws Exception {
    final String endpoint = "http://localhost:" + PORT + "/rx3/flowable/textAsync";
    final WebClient client = WebClient.create(endpoint,
                                              Collections.singletonList(new FlowableRxInvokerProvider()));
    final Flowable<String> response = client.accept("text/plain")
        .rx(FlowableRxInvoker.class)
        .get(String.class);

    final TestSubscriber<String> probe = new TestSubscriber<>();
    response.map(text -> text + text).subscribe(probe);

    // Bounded wait for the asynchronous response before asserting.
    probe.await(2, TimeUnit.SECONDS);
    probe.assertResult("Hello, world!Hello, world!");
}
 
源代码25 项目: resilience4j   文件: FlowableRateLimiterTest.java
/**
 * With a permission reservation that returns a wait of zero nanoseconds, expects every
 * element of a two-element source to pass through in order and complete.
 */
@Test
public void shouldEmitAllEvents() {
    final long noWaitNanos = Duration.ZERO.toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable.fromArray(1, 2)
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertResult(1, 2);
}
 
源代码26 项目: code-examples   文件: ReactiveBatchProcessorV3.java
/**
 * Starts consuming message batches: each batch is logged, flattened into its messages, and
 * every message is passed to {@code messageHandler.handleMessage} on a shared scheduler
 * created once via {@code threadPoolScheduler(threads, threadPoolQueueSize)}.
 *
 * <p>The source itself flags this variant as not working as expected; it is kept unchanged.
 */
public void start() {
  // WARNING: this code doesn't work as expected
  Scheduler scheduler = threadPoolScheduler(threads, threadPoolQueueSize);

  messageSource.getMessageBatches()
      // Subscribe to the batch source on a dedicated single-thread executor.
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      // Flatten each batch into its individual messages.
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      // Single.defer postpones handleMessage until the inner Single is subscribed,
      // which subscribeOn moves onto the shared scheduler.
      .flatMapSingle(m -> Single.defer(() -> Single.just(messageHandler.handleMessage(m)))
          .subscribeOn(scheduler))
      // NOTE(review): meaning of SimpleSubscriber's (threads, 1) arguments is not visible here — confirm.
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码27 项目: code-examples   文件: ReactiveBatchProcessorV1.java
/**
 * Starts consuming message batches: each batch is logged, flattened into its messages, and
 * every message is passed to {@code messageHandler.handleMessage}.
 *
 * <p>The source itself flags this variant as not working as expected; it is kept unchanged.
 */
public void start() {
  // WARNING: this code doesn't work as expected
  messageSource.getMessageBatches()
      // Subscribe to the batch source on a dedicated single-thread executor.
      .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .doOnNext(batch -> logger.log(batch.toString()))
      // Flatten each batch into its individual messages.
      .flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
      // handleMessage(m) is evaluated eagerly here, as the argument of Single.just, i.e. before
      // the inner subscribeOn can move any work; threadPoolScheduler(...) is also called once
      // per message.
      .flatMapSingle(m -> Single.just(messageHandler.handleMessage(m))
          .subscribeOn(threadPoolScheduler(threads, threadPoolQueueSize)))
      // NOTE(review): meaning of SimpleSubscriber's (threads, 1) arguments is not visible here — confirm.
      .subscribeWith(new SimpleSubscriber<>(threads, 1));
}
 
源代码28 项目: tutorials   文件: RxJavaLiveVideo.java
/**
 * Streams an endless sequence of numbered video frames from a producer executor to a consumer
 * executor through a bounded buffer that signals an error on overflow.
 *
 * @param produceDelay value passed to {@code sleep} before each new frame is produced
 * @param consumeDelay value passed to {@code sleep} for each consumed frame
 * @param bufferSize   capacity of the back-pressure buffer
 * @param onError      callback run when the stream terminates with an error
 * @return the disposable of the running subscription
 */
public static Disposable streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError) {
    // Infinite frame source: every frame follows the previous one after the produce delay.
    final Stream<VideoFrame> frames = Stream.iterate(new VideoFrame(0), previous -> {
        sleep(produceDelay);
        return new VideoFrame(previous.getNumber() + 1);
    });

    return Flowable
      .fromStream(frames)
      .subscribeOn(Schedulers.from(Executors.newSingleThreadScheduledExecutor()), true)
      // No overflow action (null); the ERROR strategy fails the stream when the buffer is full.
      .onBackpressureBuffer(bufferSize, null, BackpressureOverflowStrategy.ERROR)
      .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
      .subscribe(frame -> sleep(consumeDelay), failure -> onError.run());
}
 
/** Expects a second subscriber to receive "Foo" although it was emitted before it subscribed. */
@Test public void initialValueToNewSubscriber() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Nothing has been emitted yet, so the first subscriber starts empty.
  TestSubscriber<String> first = new TestSubscriber<>();
  shared.subscribe(first);
  first.assertNoValues();

  source.onNext("Foo");
  first.assertValues("Foo");

  // The first subscriber is still active when the second one joins.
  TestSubscriber<String> second = new TestSubscriber<>();
  shared.subscribe(second);
  second.assertValues("Foo");
}
 
/** Expects a subscriber that was cancelled before subscribing to receive no values at all. */
@Test public void unsubscribeBeforeSubscribePreventsCacheEmission() {
  PublishProcessor<String> source = PublishProcessor.create();
  Flowable<String> shared = source.compose(ReplayingShare.<String>instance());

  // Keep one subscription open while a value is emitted.
  shared.subscribe();
  source.onNext("something to cache");

  // Cancel first, subscribe afterwards.
  TestSubscriber<String> cancelled = new TestSubscriber<>();
  cancelled.cancel();
  shared.subscribe(cancelled);
  cancelled.assertNoValues();
}