下面列出了 io.reactivex.Single#fromPublisher() 的实例代码,您也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Initialises the fixture Singles before each test: two that hold a value,
 * one that holds an error, and one built from {@code Future.future()}.
 */
@Before
public void setup() {
    // Value-holding fixtures.
    just = Single.just(10);
    just2 = Single.just(20);

    // Failing fixture.
    none = Single.error(new Exception("boo"));

    // Fixture backed by a freshly created Future (presumably not yet completed -- confirm).
    active = Single.fromPublisher(Future.future());
}
/**
 * Builds a reactive request with a plain-text body, records the types of the request and
 * response events it publishes, and checks the event counts and the response status.
 *
 * @throws Exception declared by the test signature; propagates any failure to the runner
 */
@Test
public void givenReactiveClient_whenRequested_ShouldPrintEvents() throws Exception {
    ReactiveRequest request = ReactiveRequest.newBuilder(httpClient, uri())
        .content(ReactiveRequest.Content.fromString(CONTENT, MediaType.TEXT_PLAIN_VALUE, UTF_8))
        .build();

    Publisher<ReactiveRequest.Event> requestEvents = request.requestEvents();
    Publisher<ReactiveResponse.Event> responseEvents = request.responseEvents();

    List<Type> requestEventTypes = new ArrayList<>();
    List<ReactiveResponse.Event.Type> responseEventTypes = new ArrayList<>();

    // Subscribe to both event streams before the response is requested so the
    // event types are collected as they are published.
    Flowable.fromPublisher(requestEvents)
        .map(ReactiveRequest.Event::getType)
        .subscribe(requestEventTypes::add);
    Flowable.fromPublisher(responseEvents)
        .map(ReactiveResponse.Event::getType)
        .subscribe(responseEventTypes::add);

    Single<ReactiveResponse> response = Single.fromPublisher(request.response());
    int actualStatus = response.blockingGet()
        .getStatus();

    Assert.assertEquals(6, requestEventTypes.size());
    Assert.assertEquals(5, responseEventTypes.size());
    // Expected value goes first so a failure message labels the two values correctly.
    Assert.assertEquals(HttpStatus.OK_200, actualStatus);
}
/**
 * Converts the given {@code Uni} into a {@code Single}, continuing with
 * {@code defaultValue} when the item is {@code null}.
 *
 * @param uni the source Uni
 * @return a Single built from the Uni's publisher
 */
@Override
public Single<T> apply(Uni<T> uni) {
    return Single.fromPublisher(
        uni.onItem().ifNull().continueWith(defaultValue)
            .convert()
            .toPublisher());
}
/**
 * Converts the given {@code Uni} into a {@code Single} of {@code Optional}, mapping the
 * item through {@link Optional#ofNullable(Object)} first.
 *
 * @param uni the source Uni
 * @return a Single built from the mapped Uni's publisher
 */
@Override
public Single<Optional<T>> apply(Uni<T> uni) {
    return Single.fromPublisher(
        uni.map(Optional::ofNullable)
            .convert()
            .toPublisher());
}
/**
 * Converts the given {@code Uni} into a {@code Single}, failing with a
 * {@link NoSuchElementException} when the item is {@code null}.
 *
 * @param uni the source Uni
 * @return a Single built from the Uni's publisher
 */
@Override
public Single<T> apply(Uni<T> uni) {
    return Single.fromPublisher(
        uni.onItem().ifNull().failWith(NoSuchElementException::new)
            .convert()
            .toPublisher());
}
/**
 * Exposes the result of {@code collection.countDocuments()} as a {@code Single}.
 *
 * @return a Single carrying the document count
 */
@Override
public Single<Long> count() {
    return Single.fromPublisher(
        collection.countDocuments());
}
/**
 * Delegates to {@code writable.delete(criteria)} and exposes its result as a {@code Single}.
 *
 * @param criteria the criterion forwarded to the underlying delete call
 * @return a Single carrying the write result
 */
@Override
public Single<WriteResult> delete(Criterion<T> criteria) {
    return Single.fromPublisher(
        writable.delete(criteria));
}
/**
 * Counts the documents in {@code applicationsCollection} that match the
 * {@code eq(FIELD_DOMAIN, domain)} filter.
 *
 * @param domain the value compared against {@code FIELD_DOMAIN}
 * @return a Single carrying the document count
 */
@Override
public Single<Long> countByDomain(String domain) {
    return Single.fromPublisher(
        applicationsCollection.countDocuments(eq(FIELD_DOMAIN, domain)));
}
/**
 * Adapts a {@code MonadicValue} to a {@code Single} by handing it to
 * {@code Single.fromPublisher}.
 *
 * @param future the value to adapt
 * @return a Single built from the supplied value
 */
public static <T> Single<T> fromValue(MonadicValue<T> future) {
    return Single.fromPublisher(
        future);
}
/**
 * Delegates to {@code writable.upsertAll(docs)} and exposes its result as a {@code Single}.
 *
 * @param docs the documents forwarded to the underlying upsert call
 * @return a Single carrying the write result
 */
@Override
public Single<WriteResult> upsertAll(Iterable<? extends T> docs) {
    return Single.fromPublisher(
        writable.upsertAll(docs));
}
/**
 * Exposes the result of {@code delegate.one()} as a {@code Single}.
 *
 * @return a Single built from the delegate's publisher
 */
@Override
public Single<T> one() {
    return Single.fromPublisher(
        delegate.one());
}
/**
 * Applies the function carried by {@code fn} to the value carried by {@code apply}.
 * Both inputs are unwrapped to Singles, bridged to Futures, zipped together, and the
 * zipped result is wrapped back into an {@code AnyM}.
 *
 * @param fn    wrapper holding the function to apply
 * @param apply wrapper holding the value the function is applied to
 * @return an AnyM wrapping the Single that carries the function's result
 */
@Override
public <T, R> AnyM<single, R> ap(AnyM<single, ? extends Function<? super T, ? extends R>> fn,
                                 AnyM<single, T> apply) {
    // Unwrap both arguments to Singles.
    Single<T> valueSingle = future(apply);
    Single<? extends Function<? super T, ? extends R>> functionSingle = future(fn);

    // Bridge each Single to a Future via its Flowable view.
    Future<T> valueFuture = Future.fromPublisher(valueSingle.toFlowable());
    Future<? extends Function<? super T, ? extends R>> functionFuture =
        Future.fromPublisher(functionSingle.toFlowable());

    // Zip the value with the function and apply the function to the value.
    Single<R> applied = Single.fromPublisher(
        valueFuture.zip(functionFuture, (value, function) -> function.apply(value)));
    return SingleAnyM.anyM(applied);
}
/**
 * Wait until all the provided Singles complete.
 *
 * @see CompletableFuture#allOf(CompletableFuture...)
 *
 * @param fts Singles to wait on
 * @return Single that completes when all the provided Singles complete. Empty result, or holds an Exception
 *         from a provided Single that failed.
 */
// The array is only forwarded to futures(fts), as in the sibling varargs methods that
// already carry this annotation; it suppresses unchecked generic-array warnings for callers.
@SafeVarargs
public static <T> Single<T> allOf(Single<T>... fts) {
    return Single.fromPublisher(Future.allOf(futures(fts)));
}
/**
 * Combines the supplied Single with the supplied Value using the supplied BiFunction.
 * The Single is bridged to a Future through its Flowable view, zipped with the Value,
 * and the zipped result is exposed as a Single again.
 *
 * @param single Single to combine with another value
 * @param app    Value to combine with the supplied Single
 * @param fn     Combiner function
 * @return Combined Single
 */
public static <T1, T2, R> Single<R> combine(
        Single<? extends T1> single,
        Value<? extends T2> app,
        BiFunction<? super T1, ? super T2, ? extends R> fn) {
    return Single.fromPublisher(
        Future.fromPublisher(single.toFlowable()).zip(app, fn));
}
/**
 * Combines the supplied Single with the first element (if present) of the supplied
 * Iterable using the supplied BiFunction.
 *
 * @param single Single to combine with an Iterable
 * @param app    Iterable to combine with the Single
 * @param fn     Combining function
 * @return Combined Single
 */
public static <T1, T2, R> Single<R> zip(
        Single<? extends T1> single,
        Iterable<? extends T2> app,
        BiFunction<? super T1, ? super T2, ? extends R> fn) {
    return Single.fromPublisher(
        Future.fromPublisher(single.toFlowable()).zip(app, fn));
}
/**
 * Combine the provided Single with the first element (if present) in the provided Publisher
 * using the provided BiFunction.
 *
 * @param single Single to combine with a Publisher
 * @param fn     Combining function
 * @param app    Publisher to combine with the Single
 * @return Combined Single
 */
public static <T1, T2, R> Single<R> zip(Single<? extends T1> single, BiFunction<? super T1, ? super T2, ? extends R> fn,
        Publisher<? extends T2> app) {
    // Bridge the Single to a Future, zip it with the Publisher, and expose the result as a Single.
    return Single.fromPublisher(Future.fromPublisher(single.toFlowable()).zip(fn, app));
}
/**
 * Builds a Single from an Iterable by taking the Iterable's first value.
 *
 * @param t Iterable to populate the Single from
 * @return Single containing the first element from the Iterable (or empty Single)
 */
public static <T> Single<T> fromIterable(Iterable<T> t) {
    return Single.fromPublisher(
        Future.fromIterable(t));
}
/**
 * Wraps {@code Futures.quorum(breakout, errorHandler, ...)} in a Single. Per the original
 * documentation, the underlying call blocks until a quorum of results has returned, as
 * determined by the supplied Predicate.
 *
 * <pre>
 * {@code
 *
 * Single<ListX<Integer>> strings = Singles.quorum(status -> status.getCompleted() >0, Single.deferred(()->1),Single.empty(),Single.empty());
 * strings.get().size()
 * //1
 *
 * }
 * </pre>
 *
 * NOTE(review): the example above looks adapted from the Future-based API -- confirm that
 * it matches the Single API before relying on it.
 *
 * @param breakout     Predicate that determines whether the block should be
 *                     continued or removed
 * @param errorHandler Consumer to handle any exceptions thrown
 * @param fts          Singles to wait on results from
 * @return Single which will be populated with a Quorum of results
 */
@SafeVarargs
public static <T> Single<Seq<T>> quorum(
        Predicate<Status<T>> breakout,
        Consumer<Throwable> errorHandler,
        Single<T>... fts) {
    return Single.fromPublisher(
        Futures.quorum(breakout, errorHandler, futures(fts)));
}
/**
 * Wraps {@code Futures.quorum(breakout, ...)} in a Single. Per the original documentation,
 * the underlying call blocks until a quorum of results has returned, as determined by the
 * supplied Predicate.
 *
 * <pre>
 * {@code
 *
 * Single<ListX<Integer>> strings = Singles.quorum(status -> status.getCompleted() >0, Single.deferred(()->1),Single.empty(),Single.empty());
 * strings.get().size()
 * //1
 *
 * }
 * </pre>
 *
 * NOTE(review): the example above looks adapted from the Future-based API -- confirm that
 * it matches the Single API before relying on it.
 *
 * @param breakout Predicate that determines whether the block should be
 *                 continued or removed
 * @param fts      Singles to wait on results from
 * @return Single which will be populated with a Quorum of results
 */
@SafeVarargs
public static <T> Single<Seq<T>> quorum(
        Predicate<Status<T>> breakout,
        Single<T>... fts) {
    return Single.fromPublisher(
        Futures.quorum(breakout, futures(fts)));
}
/**
 * Selects the first of the supplied Singles to return with a successful result, by
 * delegating to {@code Future.firstSuccess}.
 *
 * <pre>
 * {@code
 * Single<Integer> ft = Single.empty();
 * Single<Integer> result = Singles.firstSuccess(Single.deferred(()->1),ft);
 * ft.complete(10);
 * result.get() //1
 * }
 * </pre>
 *
 * NOTE(review): the example above looks adapted from the Future-based API -- confirm that
 * it matches the Single API before relying on it.
 *
 * @param fts Singles to race
 * @return First Single to return with a result
 */
@SafeVarargs
public static <T> Single<T> firstSuccess(Single<T>... fts) {
    return Single.fromPublisher(
        Future.firstSuccess(futures(fts)));
}