下面列出了 io.reactivex.Flowable#fromArray() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Runs a per-element task over a collection (elements are processed on the IO thread and the
 * results are displayed on the UI thread).
 *
 * <p>The elements come from {@code getArray()} when {@code isArray()} returns true, otherwise
 * from {@code getIterable()}. Each element is mapped through {@code doInIOThread}, the stream is
 * passed through {@code RxSchedulerUtils._io_main_f()}, and every mapped result is handed to
 * {@code doInUIThread}.
 *
 * @param rxIteratorTask the task supplying the elements and the two per-element callbacks
 * @param errorConsumer  handler invoked when the stream signals an error
 * @return the {@code Disposable} returned by {@code subscribe}
 */
public static <T, R> Disposable executeRxIteratorTask(final RxIteratorTask<T, R> rxIteratorTask, @NonNull Consumer<Throwable> errorConsumer) {
    // Pick the element source: array-backed or iterable-backed.
    final Flowable<T> elements;
    if (rxIteratorTask.isArray()) {
        elements = Flowable.fromArray(rxIteratorTask.getArray());
    } else {
        elements = Flowable.fromIterable(rxIteratorTask.getIterable());
    }

    // Per-element transformation delegated to the task.
    final Function<T, R> backgroundStep = new Function<T, R>() {
        @Override
        public R apply(T element) throws Exception {
            return rxIteratorTask.doInIOThread(element);
        }
    };

    // Per-result callback delegated to the task.
    final Consumer<R> foregroundStep = new Consumer<R>() {
        @Override
        public void accept(R result) throws Exception {
            rxIteratorTask.doInUIThread(result);
        }
    };

    return elements.map(backgroundStep)
            .compose(RxSchedulerUtils.<R>_io_main_f())
            .subscribe(foregroundStep, errorConsumer);
}
/**
 * Builds a {@code Uni} from each RxJava type (Completable, Single, Maybe, Observable, Flowable)
 * and from a plain publisher, then checks the value each one resolves to.
 *
 * <p>The statements between the {@code tag::uni-create[]} / {@code end::uni-create[]} markers
 * are kept exactly as they were; the markers look like documentation-snippet delimiters.
 */
@Test
public void createUniFromRx() {
    // tag::uni-create[]
    Completable completable = Completable.complete();
    Single<String> single = Single.just("hello");
    Maybe<String> maybe = Maybe.just("hello");
    Maybe<String> emptyMaybe = Maybe.empty();
    Observable<String> observable = Observable.fromArray("a", "b", "c");
    Flowable<String> flowable = Flowable.fromArray("a", "b", "c");
    Uni<Void> uniFromCompletable = Uni.createFrom().converter(UniRxConverters.fromCompletable(), completable);
    Uni<String> uniFromSingle = Uni.createFrom().converter(UniRxConverters.fromSingle(), single);
    Uni<String> uniFromMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), maybe);
    Uni<String> uniFromEmptyMaybe = Uni.createFrom().converter(UniRxConverters.fromMaybe(), emptyMaybe);
    Uni<String> uniFromObservable = Uni.createFrom().converter(UniRxConverters.fromObservable(), observable);
    Uni<String> uniFromFlowable = Uni.createFrom().converter(UniRxConverters.fromFlowable(), flowable);
    Uni<String> uniFromPublisher = Uni.createFrom().publisher(flowable);
    // end::uni-create[]

    // Empty sources are expected to resolve to null.
    Void completableResult = uniFromCompletable.await().indefinitely();
    assertThat(completableResult).isNull();

    // Single-valued sources are expected to resolve to their value.
    String singleResult = uniFromSingle.await().indefinitely();
    assertThat(singleResult).isEqualTo("hello");

    String maybeResult = uniFromMaybe.await().indefinitely();
    assertThat(maybeResult).isEqualTo("hello");

    String emptyMaybeResult = uniFromEmptyMaybe.await().indefinitely();
    assertThat(emptyMaybeResult).isNull();

    // Multi-valued sources are expected to resolve to their first item.
    String observableResult = uniFromObservable.await().indefinitely();
    assertThat(observableResult).isEqualTo("a");

    String flowableResult = uniFromFlowable.await().indefinitely();
    assertThat(flowableResult).isEqualTo("a");

    String publisherResult = uniFromPublisher.await().indefinitely();
    assertThat(publisherResult).isEqualTo("a");
}
/**
 * Builds a {@code Multi} from each RxJava type (Completable, Single, Maybe, Observable, Flowable)
 * and from a plain publisher, then checks the items each one emits.
 *
 * <p>The statements between the {@code tag::multi-create[]} / {@code end::multi-create[]}
 * markers are kept exactly as they were; the markers look like documentation-snippet delimiters.
 */
@Test
public void createMultiFromRx() {
    // tag::multi-create[]
    Completable completable = Completable.complete();
    Single<String> single = Single.just("hello");
    Maybe<String> maybe = Maybe.just("hello");
    Maybe<String> emptyMaybe = Maybe.empty();
    Observable<String> observable = Observable.fromArray("a", "b", "c");
    Flowable<String> flowable = Flowable.fromArray("a", "b", "c");
    Multi<Void> multiFromCompletable = Multi.createFrom()
            .converter(MultiRxConverters.fromCompletable(), completable);
    Multi<String> multiFromSingle = Multi.createFrom().converter(MultiRxConverters.fromSingle(), single);
    Multi<String> multiFromMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), maybe);
    Multi<String> multiFromEmptyMaybe = Multi.createFrom().converter(MultiRxConverters.fromMaybe(), emptyMaybe);
    Multi<String> multiFromObservable = Multi.createFrom()
            .converter(MultiRxConverters.fromObservable(), observable);
    Multi<String> multiFromFlowable = Multi.createFrom().converter(MultiRxConverters.fromFlowable(), flowable);
    Multi<String> multiFromPublisher = Multi.createFrom().publisher(flowable);
    // end::multi-create[]

    // Sources with at most one item: look only at the first collected item.
    Void firstFromCompletable = multiFromCompletable.collectItems().first().await().indefinitely();
    assertThat(firstFromCompletable).isNull();

    String firstFromSingle = multiFromSingle.collectItems().first().await().indefinitely();
    assertThat(firstFromSingle).isEqualTo("hello");

    String firstFromMaybe = multiFromMaybe.collectItems().first().await().indefinitely();
    assertThat(firstFromMaybe).isEqualTo("hello");

    String firstFromEmptyMaybe = multiFromEmptyMaybe.collectItems().first().await().indefinitely();
    assertThat(firstFromEmptyMaybe).isNull();

    // Multi-item sources: every item must arrive, in order.
    assertThat(multiFromObservable.collectItems().asList().await().indefinitely())
            .containsExactly("a", "b", "c");
    assertThat(multiFromFlowable.collectItems().asList().await().indefinitely())
            .containsExactly("a", "b", "c");
    assertThat(multiFromPublisher.collectItems().asList().await().indefinitely())
            .containsExactly("a", "b", "c");
}
/**
 * Adapts a three-element publisher through {@code getAdapter(Mono.class)} and checks that the
 * result is a {@code Mono} whose blocking read (bounded to 1000 ms) yields the first element.
 */
@Test
public void publisherToMono() {
    Publisher<Integer> numbers = Flowable.fromArray(1, 2, 3);
    Object adapted = getAdapter(Mono.class).fromPublisher(numbers);
    assertTrue(adapted instanceof Mono);

    // Cast only after the type assertion so a wrong type surfaces as an assertion failure.
    Mono<Integer> mono = (Mono<Integer>) adapted;
    Integer firstValue = mono.block(Duration.ofMillis(1000));
    assertEquals(Integer.valueOf(1), firstValue);
}
/**
 * Adapts a three-element publisher through {@code getAdapter(CompletableFuture.class)} and
 * checks that the result is a {@code CompletableFuture} completing with the first element.
 *
 * @throws Exception if waiting on the future fails
 */
@Test
public void publisherToCompletableFuture() throws Exception {
    Publisher<Integer> numbers = Flowable.fromArray(1, 2, 3);
    Object adapted = getAdapter(CompletableFuture.class).fromPublisher(numbers);
    assertTrue(adapted instanceof CompletableFuture);

    // Cast only after the type assertion so a wrong type surfaces as an assertion failure.
    CompletableFuture<Integer> future = (CompletableFuture<Integer>) adapted;
    Integer completedValue = future.get();
    assertEquals(Integer.valueOf(1), completedValue);
}
/**
 * Asserts that no {@code ContextEntity} is counted yet, persists one named "Stef", and returns
 * a {@code Flowable} emitting the single string "OK".
 *
 * <p>Declared with {@code TxType.REQUIRES_NEW}. The count check must stay ahead of the
 * {@code persist()} call.
 *
 * @return a Flowable built from the one-element array {@code "OK"}
 */
@Transactional(value = TxType.REQUIRES_NEW)
public Flowable<String> doInTxPublisher() {
    // Precondition: nothing is counted before this method persists its own entity.
    Assertions.assertEquals(0, ContextEntity.count());

    ContextEntity stef = new ContextEntity();
    stef.name = "Stef";
    stef.persist();

    Flowable<String> confirmation = Flowable.fromArray("OK");
    return confirmation;
}
public BenchmarkRxServerServiceImpl(int times) {
Messages.SimpleResponse[] array = new Messages.SimpleResponse[times];
Arrays.fill(array, Messages.SimpleResponse.getDefaultInstance());
this.responseFlux = Flowable.fromArray(array);
this.responseMono = Single.just(Messages.SimpleResponse.getDefaultInstance());
}
/**
 * Converts a publisher of 1, 2, 3 with the adapter registered for {@code Mono} and verifies
 * both the resulting type and the value obtained by blocking for at most 1000 ms.
 */
@Test
public void publisherToMono() {
    Publisher<Integer> publisher = Flowable.fromArray(1, 2, 3);
    Object converted = getAdapter(Mono.class).fromPublisher(publisher);
    assertTrue(converted instanceof Mono);

    // The type was asserted above, so the cast below is expected to succeed.
    Mono<Integer> asMono = (Mono<Integer>) converted;
    Integer blockedValue = asMono.block(Duration.ofMillis(1000));
    assertEquals(Integer.valueOf(1), blockedValue);
}
/**
 * Converts a publisher of 1, 2, 3 with the adapter registered for {@code CompletableFuture}
 * and verifies both the resulting type and the value the future completes with.
 *
 * @throws Exception if retrieving the future's value fails
 */
@Test
public void publisherToCompletableFuture() throws Exception {
    Publisher<Integer> publisher = Flowable.fromArray(1, 2, 3);
    Object converted = getAdapter(CompletableFuture.class).fromPublisher(publisher);
    assertTrue(converted instanceof CompletableFuture);

    // The type was asserted above, so the cast below is expected to succeed.
    CompletableFuture<Integer> asFuture = (CompletableFuture<Integer>) converted;
    Integer futureValue = asFuture.get();
    assertEquals(Integer.valueOf(1), futureValue);
}
/**
 * Converts a one-element publisher with the adapter registered for {@code rx.Single} and
 * verifies both the resulting type and the value read through its blocking view.
 */
@Test
public void publisherToRxSingle() {
    Publisher<Integer> publisher = Flowable.fromArray(1);
    Object converted = getAdapter(rx.Single.class).fromPublisher(publisher);
    assertTrue(converted instanceof rx.Single);

    // The type was asserted above, so the cast below is expected to succeed.
    rx.Single<Integer> asSingle = (rx.Single<Integer>) converted;
    Integer blockedValue = asSingle.toBlocking().value();
    assertEquals(Integer.valueOf(1), blockedValue);
}
/**
 * Converts a publisher of 1, 2, 3 with the adapter registered for {@code rx.Completable} and
 * verifies the resulting type and that {@code get()} on it returns null.
 */
@Test
public void publisherToRxCompletable() {
    Publisher<Integer> publisher = Flowable.fromArray(1, 2, 3);
    Object converted = getAdapter(rx.Completable.class).fromPublisher(publisher);
    assertTrue(converted instanceof rx.Completable);

    // The type was asserted above, so the cast below is expected to succeed.
    rx.Completable asCompletable = (rx.Completable) converted;
    assertNull(asCompletable.get());
}
/**
 * Converts a publisher of 1, 2, 3 with the adapter registered for
 * {@code io.reactivex.Completable} and verifies the resulting type and that
 * {@code blockingGet()} on it returns null.
 */
@Test
public void publisherToReactivexCompletable() {
    Publisher<Integer> publisher = Flowable.fromArray(1, 2, 3);
    Object converted = getAdapter(io.reactivex.Completable.class).fromPublisher(publisher);
    assertTrue(converted instanceof io.reactivex.Completable);

    // The type was asserted above, so the cast below is expected to succeed.
    io.reactivex.Completable asCompletable = (io.reactivex.Completable) converted;
    assertNull(asCompletable.blockingGet());
}
/**
 * Prepares the benchmark fixtures from {@code firstLen} and {@code secondLen}.
 *
 * <p>Two arrays of those lengths are filled with 777. {@code baseline} streams the longer one
 * (the first on a tie). The four zip fixtures add the two streams pairwise and differ only in
 * which side is subscribed on the computation scheduler. {@code small} is set when either
 * length is below 100.
 */
@Setup
public void setup() {
    Integer[] firstValues = new Integer[firstLen];
    Arrays.fill(firstValues, 777);
    Integer[] secondValues = new Integer[secondLen];
    Arrays.fill(secondValues, 777);

    // Baseline: plain emission of the longer array, with no zipping involved.
    Integer[] longerValues;
    if (firstLen < secondLen) {
        longerValues = secondValues;
    } else {
        longerValues = firstValues;
    }
    baseline = Flowable.fromArray(longerValues);

    Flowable<Integer> first = Flowable.fromArray(firstValues);
    Flowable<Integer> second = Flowable.fromArray(secondValues);
    BiFunction<Integer, Integer, Integer> adder = Integer::sum;

    // Neither side moved off the calling thread.
    bothSync = Flowable.zip(first, second, adder);
    // Only the second source is subscribed on the computation scheduler.
    firstSync = Flowable.zip(first, second.subscribeOn(Schedulers.computation()), adder);
    // Only the first source is subscribed on the computation scheduler.
    secondSync = Flowable.zip(first.subscribeOn(Schedulers.computation()), second, adder);
    // Both sources are subscribed on the computation scheduler.
    bothAsync = Flowable.zip(
            first.subscribeOn(Schedulers.computation()),
            second.subscribeOn(Schedulers.computation()),
            adder);

    // Equivalent to "the smaller of the two lengths is below 100".
    small = firstLen < 100 || secondLen < 100;
}
/**
 * Wraps {@code this.stream} in a {@code Flowable} and pairs it with global metadata whose
 * schema is the literal string {@code "schema"}.
 *
 * @param shutdownRequest not read by this implementation
 * @return the record stream together with its metadata
 * @throws IOException declared by the overridden signature; nothing in this body throws it
 */
@Override
public RecordStreamWithMetadata<String, String> recordStream(AtomicBoolean shutdownRequest) throws IOException {
    GlobalMetadata<String> metadata = GlobalMetadata.<String>builder()
            .schema("schema")
            .build();
    return new RecordStreamWithMetadata<>(Flowable.fromArray(this.stream), metadata);
}
/**
 * Builds a {@code ReactiveFetcher} over a {@code FakeBackend} that is seeded with the given
 * values.
 *
 * @param values the strings handed to the fake backend as a {@code Flowable}
 * @return a fetcher delegate for a {@code TypeHolder.StringHolder} query on that backend
 */
private static ReactiveFetcher<String> create(String ... values) {
    final Class<TypeHolder.StringHolder> holderType = TypeHolder.StringHolder.class;
    FakeBackend fakeBackend = new FakeBackend(Flowable.fromArray(values));
    return ReactiveFetcherDelegate.of(
            Query.of(holderType),
            fakeBackend.open(holderType));
}
/**
 * Source method annotated with {@code @Outgoing("Z1")}.
 *
 * @return a Flowable emitting "d", "e", "f" in that order
 */
@Outgoing("Z1")
public Flowable<String> z1() {
    Flowable<String> letters = Flowable.fromArray("d", "e", "f");
    return letters;
}
/**
 * Produces the items for the {@code @Outgoing("Z1")} declaration.
 *
 * @return a Flowable over the three strings "d", "e", "f"
 */
@Outgoing("Z1")
public Flowable<String> z1() {
    Flowable<String> items = Flowable.fromArray("d", "e", "f");
    return items;
}
/**
 * Supplies a fixed three-item stream for the {@code @Outgoing("Z1")} declaration.
 *
 * @return a Flowable emitting "d", then "e", then "f"
 */
@Outgoing("Z1")
public Flowable<String> z1() {
    Flowable<String> fixedItems = Flowable.fromArray("d", "e", "f");
    return fixedItems;
}
/**
 * Source method annotated with {@code @Outgoing("hello")}.
 *
 * @return a publisher emitting the letters "h", "e", "l", "l", "o" in that order
 */
@Outgoing("hello")
public Publisher<String> hello() {
    Flowable<String> letters = Flowable.fromArray("h", "e", "l", "l", "o");
    return letters;
}
/**
 * Produces the items for the {@code @Outgoing("hello")} declaration.
 *
 * @return a publisher over the five one-letter strings spelling "hello"
 */
@Outgoing("hello")
public Publisher<String> hello() {
    Flowable<String> spelled = Flowable.fromArray("h", "e", "l", "l", "o");
    return spelled;
}