下面列出了 io.reactivex.Flowable#fromPublisher() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Checks that a publisher built from a deferred Uni yields a new item for
 * every subscription: the first blocking read is expected to see 1 and the
 * second one 2.
 */
@Test
public void testThatTwoSubscribersHaveTwoSubscriptions() {
    final AtomicInteger counter = new AtomicInteger(1);
    final Publisher<Integer> source = Uni.createFrom()
            .deferred(() -> Uni.createFrom().item(counter.getAndIncrement()))
            .convert()
            .toPublisher();
    assertThat(source).isNotNull();

    final Flowable<Integer> flowable = Flowable.fromPublisher(source);

    // Each blockingFirst() call subscribes to the flowable again.
    final int firstValue = flowable.blockingFirst();
    assertThat(firstValue).isEqualTo(1);

    final int secondValue = flowable.blockingFirst();
    assertThat(secondValue).isEqualTo(2);
}
/**
 * Checks that adding {@code cache()} to the deferred Uni makes both
 * subscriptions observe the same item: two blocking reads are both expected
 * to see 1.
 */
@Test
public void testThatTwoSubscribersWithCache() {
    final AtomicInteger counter = new AtomicInteger(1);
    final Publisher<Integer> source = Uni.createFrom()
            .deferred(() -> Uni.createFrom().item(counter.getAndIncrement()))
            .cache()
            .convert()
            .toPublisher();
    assertThat(source).isNotNull();

    final Flowable<Integer> flowable = Flowable.fromPublisher(source);

    // Each blockingFirst() call subscribes to the flowable again.
    final int firstValue = flowable.blockingFirst();
    assertThat(firstValue).isEqualTo(1);

    final int secondValue = flowable.blockingFirst();
    assertThat(secondValue).isEqualTo(1);
}
/**
 * Opens the given file, wraps its reader in a {@code FilePublisher} and
 * prints every string the resulting flowable emits to standard output.
 * The call blocks until the flowable terminates; the reader is closed
 * afterwards by the try-with-resources statement.
 *
 * @param file the file to read
 */
public static void readFile(File file) {
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
        final FilePublisher source = new FilePublisher(reader);
        Flowable<String> lines = Flowable.fromPublisher(source);
        // Items are handed over on the io scheduler, but consumed on the calling thread.
        lines
                .observeOn(Schedulers.io())
                .blockingSubscribe(System.out::println);
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
}
/**
 * Initialises the flowable fixtures used by the tests: two single-value
 * sources, one failing source and one source backed by a new future.
 */
@Before
public void setup() {
    // Single-value sources.
    just = Flowable.just(10);
    just2 = Flowable.just(20);

    // Source that terminates with an error.
    final Exception failure = new Exception("boo");
    none = Flowable.error(failure);

    // Source backed by a freshly created future; nothing in this method completes it.
    active = Flowable.fromPublisher(Future.future());
}
/**
 * Converts an already-completed {@code Completable} into a publisher and
 * expects that reading its first item falls back to the supplied default.
 */
@Test
public void testToPublisherWithImmediateCompletion() {
    final Completable completed = Completable.complete();
    Flowable<String> stream = Flowable.fromPublisher(converter.toRSPublisher(completed));

    // No item is expected, so blockingFirst should hand back the default.
    final String firstOrDefault = stream.blockingFirst("DEFAULT");

    assertThat(firstOrDefault).isEqualTo("DEFAULT");
}
/**
 * Builds a publisher stage from a {@code Stage.FromCompletionStage}.
 * The completion stage is only read from {@code stage} when the returned
 * publisher stage is invoked; it is then passed to
 * {@code fromCompletionStage} with the flag set to {@code false}.
 *
 * @param engine the engine (not used by this method)
 * @param stage  the stage providing the completion stage, must not be {@code null}
 * @return a publisher stage producing a {@code Flowable}
 */
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.FromCompletionStage stage) {
    Objects.requireNonNull(stage);
    return () -> {
        final CompletionStage<O> completion = Casts.cast(
                Objects.requireNonNull(stage.getCompletionStage()));
        return Flowable.fromPublisher(fromCompletionStage(completion, false));
    };
}
/**
 * Builds a publisher stage from a {@code Stage.FromCompletionStageNullable}.
 * The completion stage is only read from {@code stage} when the returned
 * publisher stage is invoked; it is then passed to
 * {@code fromCompletionStage} with the flag set to {@code true}.
 *
 * @param engine the engine (not used by this method)
 * @param stage  the stage providing the completion stage, must not be {@code null}
 * @return a publisher stage producing a {@code Flowable}
 */
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.FromCompletionStageNullable stage) {
    Objects.requireNonNull(stage);
    return () -> {
        final CompletionStage<O> completion = Casts.cast(
                Objects.requireNonNull(stage.getCompletionStage()));
        return Flowable.fromPublisher(fromCompletionStage(completion, true));
    };
}
/**
 * Builds a processing stage for a {@code Stage.Coupled}: the stage's
 * subscriber and publisher graphs are built through the engine, and the
 * returned stage wraps the upstream, the built subscriber and the built
 * publisher in a {@code CouplingProcessor}.
 *
 * @param engine the engine used to build the publisher and the subscriber
 * @param stage  the coupled stage; its publisher and subscriber graphs must not be {@code null}
 * @return a processing stage producing a {@code Flowable} over the coupling processor
 */
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.Coupled stage) {
    // Validate both graphs before building anything.
    final Graph publisherGraph = Objects.requireNonNull(stage.getPublisher());
    final Graph subscriberGraph = Objects.requireNonNull(stage.getSubscriber());

    final Publisher<O> coupledPublisher = engine.buildPublisher(publisherGraph);
    final SubscriberWithCompletionStage<I, ?> coupledSubscriber = engine.buildSubscriber(subscriberGraph);

    return upstream -> {
        return Flowable.fromPublisher(
                new CouplingProcessor<>(upstream, coupledSubscriber.getSubscriber(), coupledPublisher));
    };
}
/**
 * Builds a processor for the given graph. A {@code ConnectableProcessor}
 * serves as the entry point; every stage of the graph is looked up and
 * applied in iteration order to a flowable reading from that entry point,
 * and both ends are finally bundled in a {@code WrappedProcessor}.
 *
 * @param graph the graph whose stages are applied
 * @return the wrapped processor
 */
@Override
public <T, R> Processor<T, R> buildProcessor(Graph graph) {
    final Processor<T, T> entryPoint = new ConnectableProcessor<>();
    Flowable<T> pipeline = Flowable.fromPublisher(entryPoint);

    for (Stage current : graph.getStages()) {
        // Every looked-up operator is treated as a ProcessorOperator.
        pipeline = applyProcessors(pipeline, current, (ProcessorOperator) Stages.lookup(current));
    }

    //noinspection unchecked
    return (Processor<T, R>) new WrappedProcessor<>(entryPoint, pipeline);
}
/**
* Implements a bidirectional stream → stream call as {@link Flowable} → {@link Flowable}, where both the client
* and the server independently stream to each other.
*
* <p>The request flowable is subscribed to right away. Any {@link Throwable} raised while wiring the call
* (subscribing, creating the observer, invoking the delegate) is not rethrown; it is returned as a failed
* {@link Flowable} instead. Reading the prefetch and low tide values happens before that guarded section.
*
* @param flowableSource the stream of requests
* @param delegate function that receives the response observer; the request observer it returns is not used here
* @param options call options from which the prefetch and low tide values are read
* @return a {@link Flowable} backed by the response observer, or a failed {@link Flowable} if the wiring threw
*/
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flowable<TResponse> manyToMany(
final Flowable<TRequest> flowableSource,
final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
final CallOptions options) {
final int prefetch = RxCallOptions.getPrefetch(options);
final int lowTide = RxCallOptions.getLowTide(options);
try {
// Subscribe to the request stream; the same object is later attached to the call's stream observer.
final RxSubscriberAndClientProducer<TRequest> subscriberAndGRPCProducer =
flowableSource.subscribeWith(new RxSubscriberAndClientProducer<TRequest>());
// Response side: acts both as the observer handed to the delegate and as the publisher returned to the caller.
// NOTE(review): the two callbacks below are presumably invoked on call start and on cancellation/termination
// respectively - confirm against RxClientStreamObserverAndPublisher.
final RxClientStreamObserverAndPublisher<TResponse> observerAndPublisher =
new RxClientStreamObserverAndPublisher<TResponse>(
new com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>>() {
@Override
public void accept(CallStreamObserver<?> observer) {
// Unchecked cast of the wildcard observer; this is what the method-level @SuppressWarnings covers.
subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) observer);
}
},
new Runnable() {
@Override
public void run() {
// Cancels the request-side subscriber.
subscriberAndGRPCProducer.cancel();
}
},
prefetch, lowTide);
// Hand the response observer to the delegate; its return value is intentionally discarded.
delegate.apply(observerAndPublisher);
return Flowable.fromPublisher(observerAndPublisher);
} catch (Throwable throwable) {
// Surface wiring failures through the returned stream rather than throwing.
return Flowable.error(throwable);
}
}
/**
 * Queries all {@code RealmDog} entries through monarchy, mapped to
 * {@code Dog} objects by name, and stores the resulting live data as a
 * {@code Flowable} in the {@code dogs} field.
 *
 * @param savedInstanceState forwarded unchanged to the superclass
 */
@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    final LiveData<List<Dog>> liveDogs = monarchy.findAllMappedWithChanges(
            realm -> realm.where(RealmDog.class),
            realmDog -> Dog.create(realmDog.getName()));

    // The activity is passed as the first argument of the LiveData-to-publisher conversion.
    this.dogs = Flowable.fromPublisher(
            LiveDataReactiveStreams.toPublisher(getActivity(), liveDogs));
}
/**
 * Converts the given audits and submits them as one bulk write to the
 * reportable collection.
 *
 * @param audits the audits to write; may be {@code null} or empty
 * @return a flowable over the bulk write, or an empty flowable when there is nothing to write
 */
private Flowable bulk(List<Audit> audits) {
    if (audits != null && !audits.isEmpty()) {
        return Flowable.fromPublisher(reportableCollection.bulkWrite(this.convert(audits)));
    }
    // Nothing to write: skip the collection call entirely.
    return Flowable.empty();
}
/**
 * Converts a {@code Uni} into a {@code Flowable} by way of its publisher
 * representation.
 *
 * @param uni the uni to convert
 * @return a flowable reading from the uni's publisher
 */
@Override
public Flowable<T> apply(Uni<T> uni) {
    final Flowable<T> converted = Flowable.fromPublisher(uni.convert().toPublisher());
    return converted;
}
/**
 * Converts a {@code Multi} into a {@code Flowable}; the multi is handed
 * directly to {@code Flowable.fromPublisher}.
 *
 * @param multi the multi to convert
 * @return a flowable reading from the multi
 */
@Override
public Flowable<T> apply(Multi<T> multi) {
    final Flowable<T> converted = Flowable.fromPublisher(multi);
    return converted;
}
/**
 * Fetches from the delegate and exposes the result as a {@code Flowable}.
 *
 * @return a flowable wrapping whatever {@code delegate.fetch()} returns
 */
@Override
public Flowable<T> fetch() {
    final Flowable<T> fetched = Flowable.fromPublisher(delegate.fetch());
    return fetched;
}
/**
 * Zips the two publishers together, combining each pair of items with
 * {@code fn2}, and wraps the zipped stream in an {@code IO}.
 *
 * @param p1  the first publisher
 * @param p2  the second publisher
 * @param fn2 the function combining one item of each publisher
 * @return an {@code IO} over the combined items
 */
public static <T1,T2,R> IO<R> merge(Publisher<T1> p1, Publisher<T2> p2, BiFunction<? super T1, ? super T2, ? extends R> fn2){
    final Flowable<T1> left = Flowable.fromPublisher(p1);
    final Flowable<T2> right = Flowable.fromPublisher(p2);
    final Flowable<R> zipped = left.zipWith(right, (first, second) -> fn2.apply(first, second));
    return fromPublisher(zipped);
}
/**
 * Builds a publisher stage that exposes the reactive-streams publisher
 * carried by the given stage as a {@code Flowable}.
 *
 * @param engine the engine (not used by this method)
 * @param stage  the stage providing the publisher, must not be {@code null}
 * @return a publisher stage wrapping the stage's publisher
 * @throws NullPointerException if {@code stage} or its publisher is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.PublisherStage stage) {
    // Check the stage explicitly, like the other create methods, instead of null-checking the publisher twice.
    Objects.requireNonNull(stage);
    // Unchecked cast to the requested element type; covered by the method-level @SuppressWarnings.
    Publisher<O> publisher = (Publisher<O>) Objects.requireNonNull(stage.getRsPublisher());
    return () -> Flowable.fromPublisher(publisher);
}
/**
 * Lifts a publisher into an {@code IO} backed by a {@code Flowable}.
 *
 * @param pub the publisher to wrap
 * @return a {@code FlowableIO} over the publisher
 */
@Override
public <R> IO<R> unit(Publisher<R> pub) {
    final Flowable<R> flowable = Flowable.fromPublisher(pub);
    return new FlowableIO<>(flowable);
}
/**
 * Exposes {@code publishProcessor} as a {@code Flowable}.
 *
 * @return a flowable reading from the publish processor
 */
@Override
public Flowable<T> flowable() {
    final Flowable<T> view = Flowable.fromPublisher(publishProcessor);
    return view;
}
/**
 * Wraps the given publisher in an {@code IO} backed by a {@code Flowable}.
 *
 * @param p the publisher to wrap
 * @return a {@code FlowableIO} over the publisher
 */
public static <T> IO<T> fromPublisher(Publisher<T> p){
    final Flowable<T> flowable = Flowable.fromPublisher(p);
    return new FlowableIO<>(flowable);
}