类io.reactivex.Flowable源码实例Demo

下面列出了 io.reactivex.Flowable 的 API 类实例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: rxjava2-extras   文件: RetryWhenTest.java
/**
 * A source that emits two values and then fails with an {@link IllegalArgumentException}
 * is subscribed through a retry policy built with
 * {@code failWhenInstanceOf(IllegalArgumentException.class)}; the subscriber is expected
 * to see both values followed by that very exception.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFails() {
    TestScheduler scheduler = new TestScheduler();
    TestSubscriber<Integer> ts = TestSubscriber.create();
    Exception ex = new IllegalArgumentException("boo");

    // two emissions, then the error under test
    Flowable<Integer> failing = Flowable.just(1, 2).concatWith(Flowable.<Integer>error(ex));

    failing
            // retry policy: at most 2 retries, exponential backoff, fail on IllegalArgumentException
            .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
                    .scheduler(scheduler).failWhenInstanceOf(IllegalArgumentException.class).build())
            .subscribe(ts);

    ts.assertValues(1, 2);
    ts.assertError(ex);
}
 
/**
 * Runs {@code SELECT * FROM fortune} and emits one {@link Fortune} per returned row,
 * built from column 0 (integer) and column 1 (string). A failed query is forwarded
 * through {@code onError}; otherwise the stream completes after the last row.
 */
@Override
public Flowable<Fortune> fortunes() {
    return Flowable.create(sink ->
            pgClients.getOne().preparedQuery("SELECT * FROM fortune", ar -> {
                if (ar.failed()) {
                    sink.onError(ar.cause());
                } else {
                    for (PgIterator rows = ar.result().iterator(); rows.hasNext(); ) {
                        Tuple row = rows.next();
                        sink.onNext(new Fortune(row.getInteger(0), row.getString(1)));
                    }
                    sink.onComplete();
                }
            }), BackpressureStrategy.BUFFER);
}
 
源代码3 项目: mimi-reader   文件: PostOptionTableConnection.java
/**
 * Stores {@code option} as a {@link PostOption} stamped with the current time, replacing
 * any conflicting row, inside a database transaction.
 *
 * @param option the option text to persist
 * @return a {@link Flowable} emitting {@code true} when the insert returned a positive
 * value, {@code false} otherwise (including when the insert threw and was logged)
 */
private static Flowable<Boolean> insertPostOption(String option) {
    final PostOption entry = new PostOption();
    entry.option = option;
    entry.lastUsed = System.currentTimeMillis();

    final BriteDatabase database = MimiApplication.getInstance().getBriteDatabase();
    final BriteDatabase.Transaction tx = database.newTransaction();
    long insertResult = 0;
    try {
        insertResult = database.insert(PostOption.TABLE_NAME, SQLiteDatabase.CONFLICT_REPLACE, entry.toContentValues());
        tx.markSuccessful();
    } catch (Exception e) {
        // failures are logged only; the caller simply observes `false`
        Log.e(LOG_TAG, "Error putting post options into the database", e);
    } finally {
        tx.end();
    }

    final boolean inserted = insertResult > 0;
    return Flowable.just(inserted);
}
 
源代码4 项目: smallrye-mutiny   文件: MultiFlattenTest.java
/**
 * Flattens a {@code Multi} of four publishers with {@code disjoint()} and checks that,
 * with an initial request of 4, only "a".."d" arrive and the last publisher has not been
 * subscribed; after requesting 3 more the stream completes and "e", "f", "g" are present.
 */
@Test
public void testWithPublishers() {
    AtomicBoolean subscribed = new AtomicBoolean();
    // the flag flips once this last inner publisher gets subscribed
    Flowable<String> lastSource = Flowable.just("f", "g")
            .doOnSubscribe(s -> subscribed.set(true));

    MultiAssertSubscriber<String> subscriber = Multi.createFrom().items(
            Flowable.just("a", "b", "c"),
            Flowable.just("d", "e"),
            Flowable.empty(),
            lastSource)
            .onItem().<String> disjoint()
            .subscribe().withSubscriber(MultiAssertSubscriber.create(4));

    // only four items were requested so far
    assertThat(subscribed).isFalse();
    subscriber.assertReceived("a", "b", "c", "d");

    subscriber.request(3);

    subscriber.assertCompletedSuccessfully();
    assertThat(subscriber.items()).contains("e", "f", "g");
    assertThat(subscribed).isTrue();
}
 
/**
 * Sums the values obtained from {@code getValueForCompany} for every entry of the
 * portfolio's shares and hands the total, or the failure, to {@code resultHandler}.
 *
 * @param webClient     client passed through to {@code getValueForCompany}
 * @param resultHandler receives a succeeded future with the sum or a failed future
 */
private void computeEvaluation(WebClient webClient, Handler<AsyncResult<Double>> resultHandler) {
    Flowable.fromIterable(portfolio.getShares().entrySet())
        // one lookup per share entry
        .flatMapSingle(share -> getValueForCompany(webClient, share.getKey(), share.getValue()))
        // gather every value, then add them up
        .toList()
        .map(values -> values.stream().mapToDouble(v -> v).sum())
        .subscribe((total, failure) -> {
            if (failure == null) {
                resultHandler.handle(Future.succeededFuture(total));
                return;
            }
            resultHandler.handle(Future.failedFuture(failure));
        });
}
 
源代码6 项目: web3sdk   文件: NewSolTest.java
/**
 * Maps every log delivered for {@code filter} to a {@link TransferEventResponse}.
 * The first two indexed values become {@code from} and {@code to}; the first
 * non-indexed value becomes {@code tokens}.
 *
 * @param filter filter handed to {@code web3j.logFlowable}
 * @return a stream of decoded transfer events
 */
public Flowable<TransferEventResponse> transferEventFlowable(BcosFilter filter) {
    return web3j.logFlowable(filter)
            .map(eventLog -> {
                EventValuesWithLog decoded =
                        extractEventParametersWithLog(TRANSFER_EVENT, eventLog);
                TransferEventResponse response = new TransferEventResponse();
                response.log = eventLog;
                response.from = (String) decoded.getIndexedValues().get(0).getValue();
                response.to = (String) decoded.getIndexedValues().get(1).getValue();
                response.tokens = (BigInteger) decoded.getNonIndexedValues().get(0).getValue();
                return response;
            });
}
 
/**
 * Configures an {@link MqttSink} through the {@code channel-name} attribute, pushes the
 * integers 0..9 into it and expects the consumer registered on the same topic to count
 * ten messages.
 */
@SuppressWarnings("unchecked")
@Test
public void testSinkUsingChannelName() throws InterruptedException {
    String channel = UUID.randomUUID().toString();
    AtomicInteger received = new AtomicInteger(0);
    CountDownLatch done = new CountDownLatch(1);
    usage.consumeIntegers(channel, 10, 10, TimeUnit.SECONDS,
            done::countDown,
            v -> received.getAndIncrement());

    Map<String, Object> settings = new HashMap<>();
    settings.put("channel-name", channel);
    settings.put("host", address);
    settings.put("port", port);
    MqttConnectorOutgoingConfiguration outgoing =
            new MqttConnectorOutgoingConfiguration(new MapBasedConfig(settings));
    MqttSink sink = new MqttSink(vertx, outgoing);

    Subscriber<? extends Message<?>> subscriber = sink.getSink().build();
    Flowable.range(0, 10)
            .map(Message::of)
            .subscribe((Subscriber<? super Message<Integer>>) subscriber);

    assertThat(done.await(1, TimeUnit.MINUTES)).isTrue();
    await().untilAtomic(received, is(10));
    assertThat(received).hasValue(10);
}
 
源代码8 项目: NewFastFrame   文件: SplashActivity.java
/**
 * One second after the activity resumes, either refreshes the user info (when the
 * stored login flag is set) or opens the login screen and finishes this activity.
 */
@Override
protected void onResume() {
    super.onResume();
    final boolean loggedIn = BaseApplication.getAppComponent().getSharedPreferences().getBoolean(ConstantUtil.LOGIN_STATUS, false);
    Flowable.timer(1, TimeUnit.SECONDS)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(tick -> {
                if (loggedIn) {
                    // login flag is set: continue via updateUserInfo()
                    LogUtil.e("该用户有缓存数据,直接跳转到主界面");
                    updateUserInfo();
                    return;
                }
                // login flag is not set: go to the login screen
                LogUtil.e("该用户无缓存数据,直接跳转到登录界面");
                LoginActivity.start(SplashActivity.this, null);
                finish();
            });
}
 
源代码9 项目: mvvm-template   文件: RestHelper.java
/**
 * Creates a {@link Flowable} of {@link Resource} backed by a {@link SimpleRemoteSourceMapper}
 * that reads from the given Retrofit {@link Single}.
 * The original documentation describes the emitted resources as indicating the current
 * state of the REST call (loading, error, success with status and message on error);
 * that logic lives in {@code SimpleRemoteSourceMapper} and is not visible in this method.
 *
 * @param remote the call produced by the Retrofit service; returned as-is from
 *               {@code getRemote()}, including when it is {@code null}
 * @param onSave invoked with the response data from {@code saveCallResult}, e.g. to save it
 *               into the local database; skipped when {@code null}
 * @param <T> type of response
 * @return a {@link Flowable} created with {@link BackpressureStrategy#BUFFER}
 */
public static <T> Flowable<Resource<T>> createRemoteSourceMapper(@Nullable Single<T> remote,
                                                                 @Nullable PlainConsumer<T> onSave) {
    return Flowable.create(emitter -> {
        // The instance is intentionally not stored.
        // NOTE(review): presumably the SimpleRemoteSourceMapper constructor starts the call
        // and drives the emitter — confirm in that class.
        new SimpleRemoteSourceMapper<T>(emitter) {

            @Override
            public Single<T> getRemote() {
                return remote;
            }

            @Override
            public void saveCallResult(T data) {
                // saving is optional: no callback means nothing to persist
                if (onSave != null) {
                    onSave.accept(data);
                }
            }
        };
    }, BackpressureStrategy.BUFFER);
}
 
源代码10 项目: rxjava2-jdbc   文件: DatabaseTest.java
/**
 * Runs a parameterized update with a single {@code Parameter.NULL} against the test
 * database and expects exactly one emitted count of 3 followed by completion.
 */
@Test
public void testUpdateWithTestDatabaseForReadme() {
    try (Database db = db()) {
        Flowable<Integer> counts = db //
                .update("update person set date_of_birth = ?") //
                .parameterStream(Flowable.just(Parameter.NULL)) //
                .counts();

        counts.test() //
                .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS) //
                .assertValue(3) //
                .assertComplete();
    }
}
 
/**
 * Consumes the {@code count} channel and feeds the {@code sink} channel: each incoming
 * integer is incremented by one, emitted twice, and rendered as a decimal string.
 *
 * @param source the incoming integers
 * @return the transformed stream of strings
 */
@Incoming("count")
@Outgoing("sink")
public Flowable<String> process(Flowable<Integer> source) {
    Flowable<Integer> incremented = source.map(i -> i + 1);
    Flowable<Integer> duplicated = incremented.flatMap(i -> Flowable.just(i, i));
    return duplicated.map(i -> Integer.toString(i));
}
 
/**
 * Starts the streamer flow: repeatedly polls the Ignite source task and hands every
 * polled batch to the Ignite sink task.
 * <p>
 * The pipeline re-polls after a delay of {@code periodicInterval} milliseconds, retries
 * according to {@code retryTimes}/{@code retryPredicate}, records error messages in
 * {@code streamReport}, and stops both tasks when the stream terminates or is cancelled.
 *
 * @throws IllegalStateException if the source or the sink task reports {@code isStopped()}
 */
@SuppressWarnings("unchecked")
public final void execute() {

	log.info("starting the stream for the ignite source/sink flow");

	// both tasks must be running before the stream is assembled
	if (igniteCacheSourceTask.isStopped()) throw new IllegalStateException("Ignite source task is not yet started");
	if (igniteCacheSinkTask.isStopped()) throw new IllegalStateException("Ignite sink task is not yet started");

	//noinspection unchecked
	Flowable.fromCallable(igniteCacheSourceTask::poll)
			// poll again after each completed poll, delayed by periodicInterval ms
			.repeatWhen(flowable -> flowable.delay(periodicInterval, TimeUnit.MILLISECONDS))
			// first retry stage: covers failures of the polling part above
			.retry(retryTimes, retryPredicate)
			.doOnError(throwable -> streamReport.addErrorMsg(throwable.getMessage()))
			// write each polled batch to the sink
			.doOnNext(igniteCacheSinkTask::put)
			// NOTE(review): this doOnError sees the same upstream errors as the one two lines up,
			// so a polling error that survives the first retry is recorded by both — confirm intended
			.doOnError(throwable -> streamReport.addErrorMsg(throwable.getMessage()))
			.doOnNext(data -> {
				// flush only when the running record total lands exactly on a multiple of flushBufferSize
				if (null != data && !data.isEmpty() && atomicInteger.addAndGet(data.size()) % flushBufferSize == 0) {
					igniteCacheSinkTask.flush();
				}
			})
			// second retry stage: resubscribes the whole chain above, including the sink writes
			.retry(retryTimes, retryPredicate)
			.doOnError(throwable -> streamReport.addErrorMsg(throwable.getMessage()))
			.doFinally(() -> {
				log.info("cleaning and stopping ignite tasks from the stream");
				if (log.isDebugEnabled()) {
					log.debug("final execution report: error messages : {}, Total streamed record number : {}", streamReport.getErrorMsgs().toArray(), streamReport.getAddedRecords());
				}
				// sink is stopped before the source
				igniteCacheSinkTask.stop();
				igniteCacheSourceTask.stop();
			})
			.subscribe(onNextAction, onErrorAction);
}
 
/**
 * Builds a publisher of at most {@code elements} increasing {@code long} values, backed
 * by a {@code ScheduledPublisher} configured with a 100 ms period on a dedicated
 * single-thread scheduled executor.
 * <p>
 * NOTE(review): the executor created here is never shut down within this method.
 *
 * @param elements maximum number of items the returned publisher emits
 * @return the bounded publisher
 */
@Override
public Publisher<Long> createPublisher(long elements) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    executor.setRemoveOnCancelPolicy(true);

    AtomicLong nextValue = new AtomicLong();

    return Flowable.fromPublisher(new ScheduledPublisher<>(
        // each scheduled run yields the next counter value after a 100 ms subscription delay
        () -> Flowable.just(nextValue.getAndIncrement()).delaySubscription(100, TimeUnit.MILLISECONDS),
        100,
        TimeUnit.MILLISECONDS,
        executor
    )).take(elements);
}
 
源代码14 项目: storio   文件: ChangesFilterTest.java
/**
 * {@code ChangesFilter.applyForTags} must reject a {@code null} tag set with a
 * cause-less {@link NullPointerException} carrying the message
 * "Set of tags can not be null".
 */
@Test
public void applyForTables_throwsIfTagsNull() {
    expectedException.expectCause(nullValue(Throwable.class));
    expectedException.expectMessage(equalTo("Set of tags can not be null"));
    expectedException.expect(NullPointerException.class);

    Flowable<Changes> noChanges = Flowable.<Changes>empty();

    //noinspection ConstantConditions
    ChangesFilter.applyForTags(noChanges, null);
}
 
源代码15 项目: storio   文件: GetOperationTest.java
/**
 * Inserts three items, queries the one whose value column equals "value" through
 * {@code asRxFlowable}, and checks that exactly one matching item is emitted. It also
 * checks that the change observer registered beforehand receives one change
 * notification for the content URI.
 */
@Test
public void getExistedObjectExecuteAsFlowable() {
    final TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();

    // start observing before any insert so the first change is captured
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(changesSubscriber);

    TestItem expected = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, expected.toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());

    Query byValue = Query.builder()
            .uri(TestItem.CONTENT_URI)
            .where(TestItem.COLUMN_VALUE + "=?")
            .whereArgs("value")
            .build();

    TestSubscriber<Optional<TestItem>> itemSubscriber = new TestSubscriber<Optional<TestItem>>();
    storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(byValue)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(itemSubscriber);

    itemSubscriber.awaitTerminalEvent(5, SECONDS);
    itemSubscriber.assertNoErrors();

    List<Optional<TestItem>> emitted = itemSubscriber.values();
    assertThat(emitted.size()).isEqualTo(1);
    assertThat(expected.equalsWithoutId(emitted.get(0).get())).isTrue();

    changesSubscriber.awaitTerminalEvent(60, SECONDS);
    changesSubscriber.assertNoErrors();
    changesSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
 
源代码16 项目: ground-android   文件: MapContainerViewModel.java
/**
 * Produces camera updates for the given location-lock state. When the lock is on, the
 * first location pans and zooms the camera and every later location only pans it; when
 * the lock is not on, the returned stream is empty.
 */
private Flowable<CameraUpdate> createLocationLockCameraUpdateFlowable(BooleanOrError lockState) {
  if (lockState.isTrue()) {
    Flowable<Point> locations = locationManager.getLocationUpdates();
    // First fix: move and zoom to the appropriate level.
    Flowable<CameraUpdate> initialPanAndZoom = locations.take(1).map(CameraUpdate::panAndZoom);
    // Remaining fixes: keep the zoom, just follow the position.
    Flowable<CameraUpdate> followUpPans = locations.map(CameraUpdate::pan).skip(1);
    return initialPanAndZoom.concatWith(followUpPans);
  }
  return Flowable.empty();
}
 
源代码17 项目: camelinaction2   文件: FirstTest.java
/**
 * Streams five fixed words through RxJava 2, upper-cases each one and logs it.
 */
@Test
public void testFirst() throws Exception {
    LOG.info("Starting RX-Java2 Flowable first");

    // a finite stream of five words
    Flowable<String> words = Flowable.just("Camel", "rocks", "streams", "as", "well");

    words
        // upper case each word
        .map(String::toUpperCase)
        // log the upper-cased word
        .doOnNext(LOG::info)
        // subscribing is what starts the flow
        .subscribe();
}
 
/**
 * Processor between the {@code count} and {@code sink} channels: unwraps each message
 * payload, adds one, emits the result twice, converts it to a string and wraps it in a
 * new message.
 *
 * @return the processor builder describing that transformation
 */
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Message<Integer>, Message<String>> process() {
    ProcessorBuilder<Message<Integer>, Integer> incremented = ReactiveStreams.<Message<Integer>> builder()
            .map(Message::getPayload)
            .map(i -> i + 1);

    return incremented
            .flatMapRsPublisher(i -> Flowable.just(i, i))
            .map(i -> Integer.toString(i))
            .map(Message::of);
}
 
源代码19 项目: HighLite   文件: DeleteOperation.java
/**
 * Wraps {@code executeBlocking()} in a {@link Flowable} so that the delete is performed
 * when the returned stream is subscribed to rather than when this method is called.
 *
 * @param strategy the backpressure strategy requested for the {@link Flowable}
 *                 (see {@link BackpressureStrategy}); NOTE(review): this argument is not
 *                 used by the body below
 * @return a {@code Flowable<Integer>} emitting the value returned by
 * {@code executeBlocking()} — per the original documentation, the number of records deleted
 */
@Override
public Flowable<Integer> asFlowable(BackpressureStrategy strategy) {
    return Flowable.fromCallable(new Callable<Integer>() {
        @Override
        public Integer call() {
            return executeBlocking();
        }
    });
}
 
/**
 * When an asynchronously completing instance is available, converts it to a Reactive
 * Streams publisher and expects that publisher to emit no items. The test is a no-op
 * when no such instance is provided.
 */
@Test
public void testStreamCompletingAsynchronously() {
    Optional<T> candidate = createInstanceCompletingAsynchronously();
    if (candidate.isPresent()) {
        Publisher<String> stream = converter().toRSPublisher(candidate.get());
        Boolean empty = Flowable.fromPublisher(stream).isEmpty().blockingGet();
        assertThat(empty).isTrue();
    }
    // no instance: test ignored
}
 
/**
 * Builds the processing stage for an {@code OnErrorResume} stage: the stage's function
 * is handed, together with the upstream, to an {@code OnErrorReturn} operator.
 *
 * @throws NullPointerException if {@code stage} or its function is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.OnErrorResume stage) {
    Objects.requireNonNull(stage);
    Function<Throwable, I> fallback = (Function<Throwable, I>) stage.getFunction();
    Objects.requireNonNull(fallback);
    return upstream -> (Flowable<O>) RxJavaPlugins.onAssembly(new OnErrorReturn<>(upstream, fallback));
}
 
源代码22 项目: Java-9-Spring-Webflux   文件: GitHubServiceImpl.java
/**
 * Loads the single commit found at {@code url} and emits each of its files, logging the
 * commit and every file along the way; completes after the last file.
 *
 * @param url location of the commit
 * @return the commit's files as a buffered {@link Flowable}
 */
@Override
public Flowable<CommittedFile> getCommittedFilesByUrl(String url) {
    return Flowable.create(sink -> {
        final SingleCommit singleCommit = gitHbubRepos.getSingleCommitByUrl(url);
        log.info(singleCommit.toString());
        singleCommit.getFiles().forEach(file -> {
            log.info(file.toString());
            sink.onNext(file);
        });
        sink.onComplete();
    }, BackpressureStrategy.BUFFER);
}
 
/**
 * Resolves, for the given user, the permissions attached to each of the user's
 * memberships (direct or through a group) on the supplied references.
 * <p>
 * NOTE(review): {@code referenceStream} is consumed inside the {@code flatMap} lambda,
 * i.e. at subscription time; since a {@link Stream} can be traversed only once, the
 * returned {@link Single} looks safe to subscribe to only once — confirm with callers.
 *
 * @param user            the user whose memberships are looked up
 * @param referenceStream (reference type, reference id) pairs to search memberships on
 * @return the result of {@code permissionsPerMembership}, or an empty map when no
 * membership was found
 */
private Single<Map<Membership, Map<Permission, Set<Acl>>>> findMembershipPermissions(User user, Stream<Map.Entry<ReferenceType, String>> referenceStream) {

        // Collect the ids of all groups the user is a member of.
        return groupService.findByMember(user.getId())
                .flattenAsFlowable(groups -> groups)
                .map(Group::getId)
                .toList()
                .flatMap(userGroupIds -> {
                    // Match memberships held by the user OR by any of the user's groups.
                    MembershipCriteria criteria = new MembershipCriteria();
                    criteria.setUserId(user.getId());
                    // an empty group list is passed as null rather than as an empty list
                    criteria.setGroupIds(userGroupIds.isEmpty() ? null : userGroupIds);
                    criteria.setLogicalOR(true);

                    // Get all user and group memberships.
                    return Flowable.merge(referenceStream.map(p -> membershipService.findByCriteria(p.getKey(), p.getValue(), criteria)).collect(Collectors.toList()))
                            .toList()
                            .flatMap(allMemberships -> {

                                // No membership at all: nothing to resolve, skip the role lookup.
                                if (allMemberships.isEmpty()) {
                                    return Single.just(Collections.emptyMap());
                                }

                                // Get all roles.
                                return roleService.findByIdIn(allMemberships.stream().map(Membership::getRoleId).collect(Collectors.toList()))
                                        .map(allRoles -> permissionsPerMembership(allMemberships, allRoles));
                            });
                });
    }
 
源代码24 项目: storio   文件: PreparedGetCursorTest.java
/**
 * When the {@code GetResolver} throws an {@link IllegalStateException}, the Flowable
 * returned by the prepared operation must signal a {@code StorIOException} whose cause
 * is that original exception.
 */
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
    final StorIOSQLite storIOSQLite = mock(StorIOSQLite.class);
    when(storIOSQLite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());

    //noinspection unchecked
    final GetResolver<Cursor> failingResolver = mock(GetResolver.class);
    when(failingResolver.performGet(eq(storIOSQLite), any(Query.class)))
            .thenThrow(new IllegalStateException("test exception"));

    final Query query = Query.builder().table("test_table").observesTags("test_tag").build();
    final TestSubscriber<Cursor> subscriber = new TestSubscriber<Cursor>();

    new PreparedGetCursor.Builder(storIOSQLite)
            .withQuery(query)
            .withGetResolver(failingResolver)
            .prepare()
            .asRxFlowable(LATEST)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent(60, SECONDS);
    subscriber.assertError(StorIOException.class);

    // unwrap and verify the original failure is preserved as the cause
    StorIOException wrapped = (StorIOException) subscriber.errors().get(0);
    IllegalStateException original = (IllegalStateException) wrapped.getCause();
    assertThat(original).hasMessage("test exception");

    subscriber.dispose();
}
 
源代码25 项目: rxjava2-extras   文件: TransformersTest.java
/**
 * Statistics collected over an empty stream must report a count and sum of zero and
 * {@code NaN} for both mean and standard deviation.
 */
@Test
public void testStatisticsOnEmptyStream() {
    Statistics stats = Flowable.<Integer>empty()
            .compose(Transformers.<Integer>collectStats())
            .blockingLast();

    assertEquals(0, stats.count());
    assertEquals(0, stats.sum(), 0.0001);
    assertTrue(Double.isNaN(stats.mean()));
    assertTrue(Double.isNaN(stats.sd()));
}
 
/**
 * Passes a series of method parameters to {@code testSupports}: a plain
 * {@code httpEntityType(String.class)}, the same entity type wrapping each of several
 * reactive/async containers of {@code String} (Reactor, both RxJava variants visible
 * through the imports, and {@link CompletableFuture}), and finally a
 * {@code RequestEntity<String>}.
 * NOTE(review): {@code testSupports} is not visible here; presumably it asserts that the
 * resolver under test supports the parameter — confirm.
 */
@Test
public void supports() throws Exception {
	// non-reactive body
	testSupports(this.testMethod.arg(httpEntityType(String.class)));
	// single-value containers
	testSupports(this.testMethod.arg(httpEntityType(Mono.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(Single.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(io.reactivex.Single.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(Maybe.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(CompletableFuture.class, String.class)));
	// multi-value containers
	testSupports(this.testMethod.arg(httpEntityType(Flux.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(Observable.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(io.reactivex.Observable.class, String.class)));
	testSupports(this.testMethod.arg(httpEntityType(Flowable.class, String.class)));
	// RequestEntity instead of HttpEntity
	testSupports(this.testMethod.arg(forClassWithGenerics(RequestEntity.class, String.class)));
}
 
/**
 * Fetches the single commit addressed by {@code url}, logs it, and pushes every file of
 * that commit (logging each one) into the returned stream before completing it.
 *
 * @param url location of the commit
 * @return a buffered {@link Flowable} of the commit's files
 */
@Override
public Flowable<CommittedFile> getCommittedFilesByUrl(String url) {
    return Flowable.create(downstream -> {
        final SingleCommit fetched = gitHbubRepos.getSingleCommitByUrl(url);
        log.info(fetched.toString());
        fetched.getFiles().forEach(committedFile -> {
            log.info(committedFile.toString());
            downstream.onNext(committedFile);
        });
        downstream.onComplete();
    }, BackpressureStrategy.BUFFER);
}
 
/**
 * Computes the refresh result for {@code refreshData} on the computation scheduler and
 * delivers it to {@code handleResult} on the Android main thread.
 *
 * @param refreshData new data, header, footer, level and refresh type to process
 */
@SuppressLint("CheckResult")
@Override
protected void startRefresh(HandleBase<StickyItem> refreshData) {
    Flowable.just(refreshData)
            .onBackpressureDrop()
            // heavy lifting off the main thread
            .observeOn(Schedulers.computation())
            .map(request -> handleRefresh(
                    request.getNewData(),
                    request.getNewHeader(),
                    request.getNewFooter(),
                    request.getLevel(),
                    request.getRefreshType()))
            // back to the main thread for the result
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::handleResult);
}
 
源代码29 项目: resilience4j   文件: FlowableRateLimiterTest.java
/**
 * With the rate limiter stubbed to return a reservation of zero nanoseconds, a
 * single-item stream composed with the rate limiter operator must emit that item and
 * complete.
 */
@Test
public void shouldEmitSingleEventWithSinglePermit() {
    long noWaitNanos = Duration.ofSeconds(0).toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable<Integer> single = Flowable.just(1);

    single
        .compose(RateLimiterOperator.of(rateLimiter))
        .test()
        .assertResult(1);
}
 
源代码30 项目: tutorials   文件: MaybeUnitTest.java
/**
 * Skipping all five items of a five-item stream leaves nothing for
 * {@code firstElement()}, so the resulting {@link Maybe} must complete without a value.
 */
@Test
public void whenEmitsNoValue_thenSignalsCompletionAndNoValueObserved() {
    // every item is skipped, so the stream is effectively empty
    Flowable<Integer> exhausted = Flowable.just(1, 2, 3, 4, 5).skip(5);

    exhausted.firstElement()
        .test()
        .assertComplete()
        .assertNoValues();
}
 
 类所在包
 同包方法