下面列出了retrofit2.http.GET#io.reactivex.Flowable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
/**
 * Expects that a retry policy built with {@code failWhenInstanceOf(IllegalArgumentException.class)}
 * lets an {@link IllegalArgumentException} through to the subscriber: the two upstream values are
 * received and the original exception instance is signalled.
 */
@SuppressWarnings("unchecked")
@Test
public void testRetryWhenSpecificExceptionFails() {
    Exception failure = new IllegalArgumentException("boo");
    TestSubscriber<Integer> subscriber = TestSubscriber.create();
    TestScheduler testScheduler = new TestScheduler();

    // Upstream: two values, then the failure.
    Flowable<Integer> failingSource = Flowable.just(1, 2).concatWith(Flowable.<Integer>error(failure));

    failingSource
        // Retry policy with exponential backoff driven by the test scheduler.
        .retryWhen(RetryWhen.maxRetries(2).action(log).exponentialBackoff(1, TimeUnit.MINUTES)
            .scheduler(testScheduler).failWhenInstanceOf(IllegalArgumentException.class).build())
        .subscribe(subscriber);

    subscriber.assertValues(1, 2);
    subscriber.assertError(failure);
}
/**
 * Runs {@code SELECT * FROM fortune} on a client obtained from {@code pgClients} and emits one
 * {@link Fortune} per returned row (column 0 as the integer argument, column 1 as the string
 * argument), then completes. A failed query is forwarded as {@code onError}.
 */
@Override
public Flowable<Fortune> fortunes() {
    return Flowable.create(emitter ->
        pgClients.getOne().preparedQuery("SELECT * FROM fortune", outcome -> {
            if (outcome.failed()) {
                emitter.onError(outcome.cause());
            } else {
                // Push every row downstream before signalling completion.
                for (PgIterator rows = outcome.result().iterator(); rows.hasNext(); ) {
                    Tuple current = rows.next();
                    emitter.onNext(new Fortune(current.getInteger(0), current.getString(1)));
                }
                emitter.onComplete();
            }
        }), BackpressureStrategy.BUFFER);
}
/**
 * Inserts (or replaces) a {@link PostOption} row for the given option text, stamped with the
 * current time, inside a database transaction.
 *
 * @param option the option text to store
 * @return a {@link Flowable} emitting {@code true} when the insert returned a value greater
 *         than zero, {@code false} otherwise (including when the insert threw and was logged)
 */
private static Flowable<Boolean> insertPostOption(String option) {
    PostOption entry = new PostOption();
    entry.option = option;
    entry.lastUsed = System.currentTimeMillis();

    BriteDatabase database = MimiApplication.getInstance().getBriteDatabase();
    BriteDatabase.Transaction tx = database.newTransaction();
    long insertResult = 0;
    try {
        insertResult = database.insert(
            PostOption.TABLE_NAME, SQLiteDatabase.CONFLICT_REPLACE, entry.toContentValues());
        tx.markSuccessful();
    } catch (Exception e) {
        // Best effort: the failure is logged and reported to the caller as "false".
        Log.e(LOG_TAG, "Error putting post options into the database", e);
    } finally {
        tx.end();
    }

    boolean inserted = insertResult > 0;
    return Flowable.just(inserted);
}
/**
 * Flattens a {@code Multi} of four publishers with {@code disjoint()} and checks that items are
 * delivered according to demand: with an initial request of 4 the last publisher has not been
 * subscribed yet; after requesting 3 more, the remaining items arrive and the stream completes.
 */
@Test
public void testWithPublishers() {
    AtomicBoolean lastSubscribed = new AtomicBoolean();

    Flowable<String> first = Flowable.just("a", "b", "c");
    Flowable<String> second = Flowable.just("d", "e");
    Flowable<String> none = Flowable.empty();
    Flowable<String> last = Flowable.just("f", "g")
        .doOnSubscribe(s -> lastSubscribed.set(true));

    MultiAssertSubscriber<String> downstream = Multi.createFrom().items(first, second, none, last)
        .onItem().<String> disjoint()
        .subscribe().withSubscriber(MultiAssertSubscriber.create(4));

    // Only the initial demand of four items has been served so far.
    assertThat(lastSubscribed).isFalse();
    downstream.assertReceived("a", "b", "c", "d");

    downstream.request(3);
    downstream.assertCompletedSuccessfully();
    assertThat(downstream.items()).contains("e", "f", "g");
    assertThat(lastSubscribed).isTrue();
}
/**
 * Computes the total value of the portfolio and reports it to {@code resultHandler}.
 *
 * @param webClient     client passed through to {@code getValueForCompany}
 * @param resultHandler receives a succeeded future holding the sum, or a failed future holding
 *                      the error raised while computing it
 */
private void computeEvaluation(WebClient webClient, Handler<AsyncResult<Double>> resultHandler) {
    // One lookup per entry of the shares map.
    Flowable.fromIterable(portfolio.getShares().entrySet())
        .flatMapSingle(share -> getValueForCompany(webClient, share.getKey(), share.getValue()))
        // Gather every individual value, then add them up.
        .toList()
        .map(values -> values.stream().mapToDouble(value -> value).sum())
        // Hand the outcome (sum or failure) to the caller.
        .subscribe((total, failure) -> {
            if (failure == null) {
                resultHandler.handle(Future.succeededFuture(total));
            } else {
                resultHandler.handle(Future.failedFuture(failure));
            }
        });
}
/**
 * Maps every log delivered by {@code web3j.logFlowable(filter)} to a
 * {@link TransferEventResponse}.
 *
 * @param filter the filter handed to {@code web3j.logFlowable}
 * @return a {@link Flowable} of decoded transfer events; {@code from} and {@code to} are taken
 *         from indexed values 0 and 1, {@code tokens} from non-indexed value 0
 */
public Flowable<TransferEventResponse> transferEventFlowable(BcosFilter filter) {
    return web3j.logFlowable(filter).map(log -> {
        EventValuesWithLog decoded = extractEventParametersWithLog(TRANSFER_EVENT, log);

        TransferEventResponse response = new TransferEventResponse();
        response.log = log;
        response.from = (String) decoded.getIndexedValues().get(0).getValue();
        response.to = (String) decoded.getIndexedValues().get(1).getValue();
        response.tokens = (BigInteger) decoded.getNonIndexedValues().get(0).getValue();
        return response;
    });
}
/**
 * Publishes ten integer messages through an {@link MqttSink} configured via the
 * {@code channel-name} attribute and expects a consumer on that topic to receive all ten.
 */
@SuppressWarnings("unchecked")
@Test
public void testSinkUsingChannelName() throws InterruptedException {
    String channel = UUID.randomUUID().toString();
    CountDownLatch done = new CountDownLatch(1);
    AtomicInteger received = new AtomicInteger(0);

    // Consumer side: counts each received value and releases the latch when finished.
    usage.consumeIntegers(channel, 10, 10, TimeUnit.SECONDS,
        done::countDown,
        value -> received.getAndIncrement());

    Map<String, Object> settings = new HashMap<>();
    settings.put("channel-name", channel);
    settings.put("host", address);
    settings.put("port", port);
    MqttSink mqttSink = new MqttSink(vertx, new MqttConnectorOutgoingConfiguration(new MapBasedConfig(settings)));

    // Producer side: feed 0..9 wrapped as messages into the sink's subscriber.
    Subscriber<? extends Message<?>> sinkSubscriber = mqttSink.getSink().build();
    Flowable.range(0, 10)
        .map(Message::of)
        .subscribe((Subscriber<? super Message<Integer>>) sinkSubscriber);

    assertThat(done.await(1, TimeUnit.MINUTES)).isTrue();
    await().untilAtomic(received, is(10));
    assertThat(received).hasValue(10);
}
/**
 * After a one-second delay, continues on the main thread: when the stored login flag is set the
 * user info is refreshed; otherwise the login screen is started and this activity finishes.
 */
@Override
protected void onResume() {
    super.onResume();
    final boolean loggedIn = BaseApplication.getAppComponent().getSharedPreferences()
        .getBoolean(ConstantUtil.LOGIN_STATUS, false);

    Flowable.timer(1, TimeUnit.SECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(tick -> {
            if (loggedIn) {
                // Log text: "user has cached data, jumping straight to the main screen".
                LogUtil.e("该用户有缓存数据,直接跳转到主界面");
                updateUserInfo();
            } else {
                // Log text: "user has no cached data, jumping straight to the login screen".
                LogUtil.e("该用户无缓存数据,直接跳转到登录界面");
                LoginActivity.start(SplashActivity.this, null);
                finish();
            }
        });
}
/**
 * Wraps a Retrofit {@link Single} into a {@link Flowable} of {@link Resource} values by creating
 * a {@link SimpleRemoteSourceMapper} bound to the flowable's emitter.
 * The resource is meant to describe the state of the REST call (loading, error, or success with
 * status and message).
 * NOTE(review): presumably the mapper's constructor starts the request and drives the emitter;
 * that code is not visible here — confirm.
 *
 * @param remote the call obtained from the Retrofit service
 * @param onSave optional callback that receives the response data so it can be stored locally;
 *               skipped when {@code null}
 * @param <T> type of the response
 * @return a {@link Flowable} (buffering backpressure strategy) reporting progress and errors
 */
public static <T> Flowable<Resource<T>> createRemoteSourceMapper(@Nullable Single<T> remote,
                                                                 @Nullable PlainConsumer<T> onSave) {
    return Flowable.create(sink -> {
        new SimpleRemoteSourceMapper<T>(sink) {
            @Override
            public Single<T> getRemote() {
                return remote;
            }

            @Override
            public void saveCallResult(T data) {
                if (onSave == null) {
                    return;
                }
                onSave.accept(data);
            }
        };
    }, BackpressureStrategy.BUFFER);
}
/**
 * Runs an update that sets {@code date_of_birth} to a null parameter on the test database and
 * expects a single update count of 3 followed by completion.
 */
@Test
public void testUpdateWithTestDatabaseForReadme() {
    try (Database database = db()) {
        Flowable<Integer> updateCounts = database
            .update("update person set date_of_birth = ?")
            .parameterStream(Flowable.just(Parameter.NULL))
            .counts();

        updateCounts
            .test()
            .awaitDone(TIMEOUT_SECONDS, TimeUnit.SECONDS)
            .assertValue(3)
            .assertComplete();
    }
}
/**
 * Consumes the {@code count} channel and feeds the {@code sink} channel: each incoming integer
 * is incremented by one, emitted twice, and converted to its decimal string form.
 */
@Incoming("count")
@Outgoing("sink")
public Flowable<String> process(Flowable<Integer> source) {
    Flowable<Integer> incremented = source.map(n -> n + 1);
    Flowable<Integer> doubledUp = incremented.flatMap(n -> Flowable.just(n, n));
    return doubledUp.map(n -> Integer.toString(n));
}
/**
 * Starts and runs the streamer flow: polls {@code igniteCacheSourceTask}, passes every polled
 * batch to {@code igniteCacheSinkTask.put}, flushes the sink whenever the running record count
 * reaches a multiple of {@code flushBufferSize}, and stops both tasks when the pipeline ends.
 * Error messages seen along the way are collected in {@code streamReport}.
 *
 * @throws IllegalStateException if either the source task or the sink task reports
 *                               {@code isStopped()} when this method is called
 */
@SuppressWarnings("unchecked")
public final void execute() {
log.info("starting the stream for the ignite source/sink flow");
// Both tasks must be running before the pipeline is assembled.
if (igniteCacheSourceTask.isStopped()) throw new IllegalStateException("Ignite source task is not yet started");
if (igniteCacheSinkTask.isStopped()) throw new IllegalStateException("Ignite sink task is not yet started");
//noinspection unchecked
Flowable.fromCallable(igniteCacheSourceTask::poll)
// re-run the poll after each completion, delayed by periodicInterval milliseconds
.repeatWhen(flowable -> flowable.delay(periodicInterval, TimeUnit.MILLISECONDS))
// first retry stage: covers failures of the polling part above
.retry(retryTimes, retryPredicate)
.doOnError(throwable -> streamReport.addErrorMsg(throwable.getMessage()))
// hand the polled data to the sink task
.doOnNext(igniteCacheSinkTask::put)
.doOnError(throwable -> streamReport.addErrorMsg(throwable.getMessage()))
// flush only for non-empty data, and only when the accumulated size is an exact multiple of flushBufferSize
.doOnNext(data -> {
if (null != data && !data.isEmpty() && atomicInteger.addAndGet(data.size()) % flushBufferSize == 0) {
igniteCacheSinkTask.flush();
}
})
// second retry stage: re-subscribes everything above, including the put/flush steps
.retry(retryTimes, retryPredicate)
.doOnError(throwable -> streamReport.addErrorMsg(throwable.getMessage()))
// cleanup registered via doFinally: log the report (debug only) and stop both tasks
.doFinally(() -> {
log.info("cleaning and stopping ignite tasks from the stream");
if (log.isDebugEnabled()) {
log.debug("final execution report: error messages : {}, Total streamed record number : {}", streamReport.getErrorMsgs().toArray(), streamReport.getAddedRecords());
}
igniteCacheSinkTask.stop();
igniteCacheSourceTask.stop();
})
.subscribe(onNextAction, onErrorAction);
}
/**
 * Builds the publisher under test: a {@link ScheduledPublisher} running on a dedicated
 * single-thread scheduled executor, limited to {@code elements} items via {@code take}.
 * Each value comes from a counter that starts at zero and is incremented on every read.
 * NOTE(review): the executor created here is never shut down in this method — confirm whether
 * that is handled elsewhere.
 *
 * @param elements the number of items the returned publisher may emit
 * @return the publisher limited to {@code elements} items
 */
@Override
public Publisher<Long> createPublisher(long elements) {
    AtomicLong sequence = new AtomicLong();
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    // Cancelled tasks are removed from the executor's queue right away.
    executor.setRemoveOnCancelPolicy(true);
    return Flowable.fromPublisher(new ScheduledPublisher<>(
        () -> Flowable.just(sequence.getAndIncrement()).delaySubscription(100, TimeUnit.MILLISECONDS),
        100,
        TimeUnit.MILLISECONDS,
        executor
    )).take(elements);
}
/**
 * Expects {@code ChangesFilter.applyForTags} to reject a {@code null} tag set with a
 * {@link NullPointerException} carrying the message "Set of tags can not be null" and no cause.
 * Note: despite the "applyForTables" prefix in the test name, the call exercised here is
 * {@code applyForTags}.
 */
@Test
public void applyForTables_throwsIfTagsNull() {
    expectedException.expect(NullPointerException.class);
    expectedException.expectMessage(equalTo("Set of tags can not be null"));
    expectedException.expectCause(nullValue(Throwable.class));

    final Flowable<Changes> noChanges = Flowable.<Changes>empty();
    //noinspection ConstantConditions
    ChangesFilter.applyForTags(noChanges, null);
}
/**
 * Inserts three items through the content resolver, queries for the one whose value column is
 * "value" as a {@link Flowable}, and checks that exactly that item is emitted. Also expects one
 * change notification for the content URI.
 */
@Test
public void getExistedObjectExecuteAsFlowable() {
    // Observe the first change notification for the URI.
    final TestSubscriber<Changes> changesSubscriber = new TestSubscriber<Changes>();
    storIOContentResolver
            .observeChangesOfUri(TestItem.CONTENT_URI, BackpressureStrategy.MISSING)
            .take(1)
            .subscribe(changesSubscriber);

    // Seed the provider: the wanted item plus two others.
    TestItem wanted = TestItem.create(null, "value");
    contentResolver.insert(TestItem.CONTENT_URI, wanted.toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
    contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());

    Flowable<Optional<TestItem>> matching = storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(Query.builder()
                    .uri(TestItem.CONTENT_URI)
                    .where(TestItem.COLUMN_VALUE + "=?")
                    .whereArgs("value")
                    .build())
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .take(1);

    TestSubscriber<Optional<TestItem>> itemSubscriber = new TestSubscriber<Optional<TestItem>>();
    matching.subscribe(itemSubscriber);
    itemSubscriber.awaitTerminalEvent(5, SECONDS);
    itemSubscriber.assertNoErrors();

    List<Optional<TestItem>> emittedItems = itemSubscriber.values();
    assertThat(emittedItems.size()).isEqualTo(1);
    assertThat(wanted.equalsWithoutId(emittedItems.get(0).get())).isTrue();

    changesSubscriber.awaitTerminalEvent(60, SECONDS);
    changesSubscriber.assertNoErrors();
    changesSubscriber.assertValue(Changes.newInstance(TestItem.CONTENT_URI));
}
/**
 * Produces camera updates that follow the device location while the location lock is on.
 *
 * @param lockState the current lock state
 * @return an empty {@link Flowable} unless {@code lockState.isTrue()}; otherwise a flowable whose
 *         first element pans and zooms to the first location update, followed by pan-only
 *         updates taken from a second subscription to the location updates with its first
 *         element skipped
 */
private Flowable<CameraUpdate> createLocationLockCameraUpdateFlowable(BooleanOrError lockState) {
    if (lockState.isTrue()) {
        Flowable<Point> locations = locationManager.getLocationUpdates();
        // The first update pans and zooms the camera; every later one only pans the map.
        Flowable<CameraUpdate> initialPanAndZoom = locations.take(1).map(CameraUpdate::panAndZoom);
        return initialPanAndZoom.concatWith(locations.map(CameraUpdate::pan).skip(1));
    }
    return Flowable.empty();
}
/**
 * Pushes five fixed words through a {@link Flowable}, upper-casing and logging each one.
 * The test makes no assertions; it only runs the pipeline.
 */
@Test
public void testFirst() throws Exception {
    LOG.info("Starting RX-Java2 Flowable first");

    // Fixed set of words used as the source of the stream.
    Flowable<String> words = Flowable.just("Camel", "rocks", "streams", "as", "well");

    words
        // upper case each word
        .map(String::toUpperCase)
        // log the upper-cased word
        .doOnNext(LOG::info)
        // subscribe so the pipeline actually runs
        .subscribe();
}
/**
 * Builds the processor connecting the {@code count} channel to the {@code sink} channel: the
 * payload of every incoming message is incremented by one, emitted twice, rendered as a decimal
 * string and wrapped in a new {@link Message}.
 */
@Incoming("count")
@Outgoing("sink")
public ProcessorBuilder<Message<Integer>, Message<String>> process() {
    ProcessorBuilder<Message<Integer>, Integer> incremented = ReactiveStreams.<Message<Integer>> builder()
        .map(Message::getPayload)
        .map(n -> n + 1);
    ProcessorBuilder<Message<Integer>, String> rendered = incremented
        .flatMapRsPublisher(n -> Flowable.just(n, n))
        .map(n -> Integer.toString(n));
    return rendered.map(Message::of);
}
/**
 * Deletes one or more records from a table as a deferred operation: the work is performed by
 * {@code executeBlocking()} when the returned {@link Flowable} is subscribed.
 *
 * @param strategy the requested {@link BackpressureStrategy}; note that this implementation
 *                 does not consult it, because the flowable is built with
 *                 {@code Flowable.fromCallable}
 * @return a {@link Flowable} emitting the value returned by {@code executeBlocking()}
 *         (the number of records deleted) to the subscriber's {@code onNext}
 */
@Override
public Flowable<Integer> asFlowable(BackpressureStrategy strategy) {
    final Callable<Integer> deleteTask = new Callable<Integer>() {
        @Override
        public Integer call() {
            return executeBlocking();
        }
    };
    return Flowable.fromCallable(deleteTask);
}
/**
 * Converts an asynchronously completing instance to a publisher and expects that publisher to
 * be empty. When no such instance is available the test does nothing.
 */
@Test
public void testStreamCompletingAsynchronously() {
    Optional<T> candidate = createInstanceCompletingAsynchronously();
    if (candidate.isPresent()) {
        Publisher<String> stream = converter().toRSPublisher(candidate.get());
        assertThat(Flowable.fromPublisher(stream).isEmpty().blockingGet()).isTrue();
    }
    // No instance available: test ignored.
}
/**
 * Creates the processing stage for an {@code onErrorResume} stage by wrapping the upstream in an
 * {@link OnErrorReturn} built with the stage's function.
 *
 * @param engine the engine (not used by this factory)
 * @param stage  the stage description; must not be {@code null} and must carry a non-null function
 * @return a stage that applies {@link OnErrorReturn} to its upstream
 * @throws NullPointerException if {@code stage} or its function is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.OnErrorResume stage) {
    Objects.requireNonNull(stage);
    Function<Throwable, I> recovery = (Function<Throwable, I>) stage.getFunction();
    Objects.requireNonNull(recovery);
    return upstream -> (Flowable<O>) RxJavaPlugins.onAssembly(new OnErrorReturn<>(upstream, recovery));
}
/**
 * Fetches the single commit behind {@code url} and emits each of its files, logging the commit
 * and every file along the way.
 *
 * @param url the commit URL passed to {@code getSingleCommitByUrl}
 * @return a {@link Flowable} (buffering backpressure strategy) of the commit's files that
 *         completes after the last file
 */
@Override
public Flowable<CommittedFile> getCommittedFilesByUrl(String url) {
    return Flowable.create(sink -> {
        SingleCommit singleCommit = gitHbubRepos.getSingleCommitByUrl(url);
        log.info(singleCommit.toString());
        singleCommit.getFiles().forEach(file -> {
            log.info(file.toString());
            sink.onNext(file);
        });
        sink.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Resolves, for the given user, the permissions attached to each membership found on the
 * supplied references. Memberships are looked up with a criteria matching the user id OR any of
 * the user's group ids; roles are then loaded for all memberships found and combined through
 * {@code permissionsPerMembership}.
 *
 * <p>The reference stream is copied into a list as soon as this method is called. A
 * {@link Stream} can be traversed only once, and the lookup runs at subscription time, so
 * consuming the stream inside the reactive chain would make any second subscription (for
 * example a retry) fail with {@link IllegalStateException}.
 *
 * @param user            the user whose memberships are resolved
 * @param referenceStream (reference type, reference id) pairs to look memberships up on; it is
 *                        consumed eagerly by this call
 * @return a {@link Single} emitting the permissions per membership, or an empty map when no
 *         membership was found
 */
private Single<Map<Membership, Map<Permission, Set<Acl>>>> findMembershipPermissions(User user, Stream<Map.Entry<ReferenceType, String>> referenceStream) {
    // Snapshot the one-shot stream so the returned Single can be subscribed more than once.
    final java.util.List<Map.Entry<ReferenceType, String>> references =
            referenceStream.collect(Collectors.toList());

    return groupService.findByMember(user.getId())
            .flattenAsFlowable(groups -> groups)
            .map(Group::getId)
            .toList()
            .flatMap(userGroupIds -> {
                MembershipCriteria criteria = new MembershipCriteria();
                criteria.setUserId(user.getId());
                // An empty group list is passed as null.
                criteria.setGroupIds(userGroupIds.isEmpty() ? null : userGroupIds);
                criteria.setLogicalOR(true);

                // Get all user and group memberships.
                return Flowable.merge(references.stream()
                                .map(reference -> membershipService.findByCriteria(reference.getKey(), reference.getValue(), criteria))
                                .collect(Collectors.toList()))
                        .toList()
                        .flatMap(allMemberships -> {
                            if (allMemberships.isEmpty()) {
                                return Single.just(Collections.emptyMap());
                            }

                            // Get all roles.
                            return roleService.findByIdIn(allMemberships.stream().map(Membership::getRoleId).collect(Collectors.toList()))
                                    .map(allRoles -> permissionsPerMembership(allMemberships, allRoles));
                        });
            });
}
/**
 * Expects that an exception thrown by the {@code GetResolver} reaches the subscriber of the
 * flowable wrapped in a {@link StorIOException} whose cause is the original exception.
 */
@Test
public void shouldWrapExceptionIntoStorIOExceptionForFlowable() {
    final StorIOSQLite sqlite = mock(StorIOSQLite.class);
    when(sqlite.observeChanges(any(BackpressureStrategy.class))).thenReturn(Flowable.<Changes>empty());

    //noinspection unchecked
    final GetResolver<Cursor> failingResolver = mock(GetResolver.class);
    when(failingResolver.performGet(eq(sqlite), any(Query.class)))
            .thenThrow(new IllegalStateException("test exception"));

    final TestSubscriber<Cursor> subscriber = new TestSubscriber<Cursor>();
    new PreparedGetCursor.Builder(sqlite)
            .withQuery(Query.builder().table("test_table").observesTags("test_tag").build())
            .withGetResolver(failingResolver)
            .prepare()
            .asRxFlowable(LATEST)
            .subscribe(subscriber);

    subscriber.awaitTerminalEvent(60, SECONDS);
    subscriber.assertError(StorIOException.class);

    // Unwrap and verify the original failure.
    StorIOException wrapper = (StorIOException) subscriber.errors().get(0);
    IllegalStateException original = (IllegalStateException) wrapper.getCause();
    assertThat(original).hasMessage("test exception");

    subscriber.dispose();
}
/**
 * Collecting statistics over an empty stream is expected to give a count and sum of zero and
 * NaN for both mean and standard deviation.
 */
@Test
public void testStatisticsOnEmptyStream() {
    Statistics stats = Flowable.<Integer>empty()
        .compose(Transformers.<Integer>collectStats())
        .blockingLast();

    assertEquals(0, stats.count());
    assertEquals(0, stats.sum(), 0.0001);
    assertTrue(Double.isNaN(stats.mean()));
    assertTrue(Double.isNaN(stats.sd()));
}
/**
 * Runs {@code testSupports} for an HTTP entity with a plain {@code String} body, for HTTP
 * entities whose body is one of several wrapper types parameterized with {@code String}, and
 * for a {@code RequestEntity<String>}.
 */
@Test
public void supports() throws Exception {
    testSupports(this.testMethod.arg(httpEntityType(String.class)));

    // Wrapper types, checked in this exact order.
    Class<?>[] wrapperTypes = {
        Mono.class,
        Single.class,
        io.reactivex.Single.class,
        Maybe.class,
        CompletableFuture.class,
        Flux.class,
        Observable.class,
        io.reactivex.Observable.class,
        Flowable.class
    };
    for (Class<?> wrapperType : wrapperTypes) {
        testSupports(this.testMethod.arg(httpEntityType(wrapperType, String.class)));
    }

    testSupports(this.testMethod.arg(forClassWithGenerics(RequestEntity.class, String.class)));
}
/**
 * Loads the commit identified by {@code url}, logs it, and pushes each file of that commit to
 * the subscriber (logging every file), then signals completion.
 *
 * @param url the commit URL passed to {@code getSingleCommitByUrl}
 * @return a {@link Flowable} of the commit's files, created with the buffering backpressure
 *         strategy
 */
@Override
public Flowable<CommittedFile> getCommittedFilesByUrl(String url) {
    return Flowable.create(downstream -> {
        SingleCommit fetched = gitHbubRepos.getSingleCommitByUrl(url);
        log.info(fetched.toString());
        fetched.getFiles().forEach(committedFile -> {
            log.info(committedFile.toString());
            downstream.onNext(committedFile);
        });
        downstream.onComplete();
    }, BackpressureStrategy.BUFFER);
}
/**
 * Processes a refresh request off the main thread: {@code handleRefresh} runs on the
 * computation scheduler and its result is passed to {@code handleResult} on the Android main
 * thread.
 *
 * @param refreshData the request carrying the new data, header, footer, level and refresh type
 */
@SuppressLint("CheckResult")
@Override
protected void startRefresh(HandleBase<StickyItem> refreshData) {
    Flowable.just(refreshData)
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .map(request -> handleRefresh(
            request.getNewData(),
            request.getNewHeader(),
            request.getNewFooter(),
            request.getLevel(),
            request.getRefreshType()))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this::handleResult);
}
/**
 * With the rate limiter stubbed to report a zero-nanosecond wait for a permission, a single
 * element is expected to pass through the operator and the stream to complete.
 */
@Test
public void shouldEmitSingleEventWithSinglePermit() {
    long noWaitNanos = Duration.ofSeconds(0).toNanos();
    given(rateLimiter.reservePermission()).willReturn(noWaitNanos);

    Flowable<Integer> limited = Flowable.just(1).compose(RateLimiterOperator.of(rateLimiter));

    limited.test().assertResult(1);
}
/**
 * Skipping all five source values leaves nothing for {@code firstElement()}, so the resulting
 * Maybe is expected to complete without a value.
 */
@Test
public void whenEmitsNoValue_thenSignalsCompletionAndNoValueObserved() {
    Flowable<Integer> nothingLeft = Flowable.just(1, 2, 3, 4, 5).skip(5);

    nothingLeft.firstElement()
        .test()
        .assertComplete()
        .assertNoValues();
}