io.reactivex.Flowable#fromPublisher() 源码实例 Demo

下面列出了 io.reactivex.Flowable#fromPublisher() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: smallrye-mutiny   文件: UniToPublisherTest.java
/**
 * Verifies that a deferred {@code Uni} converted to a {@code Publisher} is evaluated
 * once per subscription: the first {@code blockingFirst()} observes 1 and the second
 * observes 2, because the shared counter is incremented on every evaluation.
 */
@Test
public void testThatTwoSubscribersHaveTwoSubscriptions() {
    AtomicInteger counter = new AtomicInteger(1);
    Publisher<Integer> source = Uni.createFrom()
            .deferred(() -> Uni.createFrom().item(counter.getAndIncrement()))
            .convert()
            .toPublisher();
    assertThat(source).isNotNull();

    Flowable<Integer> flowable = Flowable.fromPublisher(source);

    // First subscription.
    int firstValue = flowable.blockingFirst();
    assertThat(firstValue).isEqualTo(1);

    // Second subscription on the same Flowable.
    int secondValue = flowable.blockingFirst();
    assertThat(secondValue).isEqualTo(2);
}
 
源代码2 项目: smallrye-mutiny   文件: UniToPublisherTest.java
/**
 * Verifies that adding {@code cache()} to the deferred {@code Uni} makes both
 * subscriptions observe the same item: each {@code blockingFirst()} returns 1.
 */
@Test
public void testThatTwoSubscribersWithCache() {
    AtomicInteger counter = new AtomicInteger(1);
    Publisher<Integer> source = Uni.createFrom()
            .deferred(() -> Uni.createFrom().item(counter.getAndIncrement()))
            .cache()
            .convert()
            .toPublisher();
    assertThat(source).isNotNull();

    Flowable<Integer> flowable = Flowable.fromPublisher(source);

    // First subscription.
    int firstValue = flowable.blockingFirst();
    assertThat(firstValue).isEqualTo(1);

    // Second subscription is expected to see the same value as the first.
    int secondValue = flowable.blockingFirst();
    assertThat(secondValue).isEqualTo(1);
}
 
源代码3 项目: reactive-streams-in-java   文件: RxJavaDemo.java
/**
 * Prints to standard output every item emitted by a {@code FilePublisher} built over
 * the given file.
 *
 * <p>The reader is opened in a try-with-resources block and the subscription is
 * blocking, so the reader is only closed after {@code blockingSubscribe} returns.
 * An {@link IOException} is reported by printing its stack trace.
 *
 * @param file the file to read
 */
public static void readFile(File file) {
    try (final BufferedReader reader = new BufferedReader(new FileReader(file))) {
        Flowable<String> lines = Flowable.fromPublisher(new FilePublisher(reader));

        // Items are observed on the IO scheduler; the calling thread blocks until done.
        Flowable<String> onIoScheduler = lines.observeOn(Schedulers.io());
        onIoScheduler.blockingSubscribe(System.out::println);

    } catch (IOException e) {
        e.printStackTrace();
    }
}
 
源代码4 项目: cyclops   文件: FlowablesTest.java
/**
 * Initializes the fixture Flowables before each test: two single-item sources
 * (10 and 20), one that fails with {@code Exception("boo")}, and one backed by
 * the publisher returned from {@code Future.future()}.
 */
@Before
public void setup(){
    this.just = Flowable.just(10);
    this.none = Flowable.error(new Exception("boo"));
    this.active = Flowable.fromPublisher(Future.future());
    this.just2 = Flowable.just(20);
}
 
/**
 * Verifies that converting an already-completed {@code Completable} to a publisher
 * yields no item: {@code blockingFirst("DEFAULT")} falls back to the default value.
 */
@Test
public void testToPublisherWithImmediateCompletion() {
    Flowable<String> converted =
            Flowable.fromPublisher(converter.toRSPublisher(Completable.complete()));

    String firstOrDefault = converted.blockingFirst("DEFAULT");

    assertThat(firstOrDefault).isEqualTo("DEFAULT");
}
 
/**
 * Creates a publisher stage backed by the completion stage carried by {@code stage}.
 *
 * <p>The completion stage is looked up (and null-checked) lazily, each time the
 * returned stage is invoked, and is passed to {@code fromCompletionStage} with the
 * flag set to {@code false}.
 *
 * @param engine the engine (not used by this factory)
 * @param stage  the stage description; must not be {@code null}
 * @return a stage producing a {@code Flowable} over the completion stage's publisher
 * @throws NullPointerException if {@code stage} is {@code null}
 */
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.FromCompletionStage stage) {
    Objects.requireNonNull(stage);
    return () -> {
        CompletionStage<O> completion =
                Casts.cast(Objects.requireNonNull(stage.getCompletionStage()));
        return Flowable.fromPublisher(fromCompletionStage(completion, false));
    };
}
 
/**
 * Creates a publisher stage backed by the completion stage carried by {@code stage}.
 *
 * <p>The completion stage is looked up (and null-checked) lazily, each time the
 * returned stage is invoked, and is passed to {@code fromCompletionStage} with the
 * flag set to {@code true}.
 *
 * @param engine the engine (not used by this factory)
 * @param stage  the stage description; must not be {@code null}
 * @return a stage producing a {@code Flowable} over the completion stage's publisher
 * @throws NullPointerException if {@code stage} is {@code null}
 */
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.FromCompletionStageNullable stage) {
    Objects.requireNonNull(stage);
    return () -> {
        CompletionStage<O> completion =
                Casts.cast(Objects.requireNonNull(stage.getCompletionStage()));
        return Flowable.fromPublisher(fromCompletionStage(completion, true));
    };
}
 
/**
 * Creates a processing stage that couples a subscriber graph and a publisher graph.
 *
 * <p>Both graphs are built eagerly through the engine. The returned stage wraps the
 * upstream in a {@code CouplingProcessor} together with the built subscriber and
 * publisher, and exposes the result as a {@code Flowable}.
 *
 * @param engine the engine used to build the publisher and the subscriber
 * @param stage  the coupled stage; its publisher and subscriber graphs must be non-null
 * @return the processing stage applying the coupling to an upstream
 * @throws NullPointerException if either graph of {@code stage} is {@code null}
 */
@Override
public <I, O> ProcessingStage<I, O> create(Engine engine, Stage.Coupled stage) {
    Graph publisherGraph = Objects.requireNonNull(stage.getPublisher());
    Graph subscriberGraph = Objects.requireNonNull(stage.getSubscriber());

    Publisher<O> coupledPublisher = engine.buildPublisher(publisherGraph);
    SubscriberWithCompletionStage<I, ?> coupledSubscriber = engine.buildSubscriber(subscriberGraph);

    return upstream -> {
        // getSubscriber() is resolved when the stage is applied, not when it is created.
        return Flowable.fromPublisher(
                new CouplingProcessor<>(upstream, coupledSubscriber.getSubscriber(), coupledPublisher));
    };
}
 
/**
 * Builds a {@code Processor} for the given graph.
 *
 * <p>A {@code ConnectableProcessor} serves as the entry point. A {@code Flowable}
 * sourced from it is transformed by the operator looked up for each stage of the
 * graph, in iteration order, and the entry point and resulting pipeline are bundled
 * into a {@code WrappedProcessor}.
 *
 * @param graph the graph whose stages are applied
 * @return the processor wrapping the entry point and the assembled pipeline
 */
@Override
public <T, R> Processor<T, R> buildProcessor(Graph graph) {
    Processor<T, T> entryPoint = new ConnectableProcessor<>();

    Flowable<T> pipeline = Flowable.fromPublisher(entryPoint);
    for (Stage current : graph.getStages()) {
        Operator lookedUp = Stages.lookup(current);
        pipeline = applyProcessors(pipeline, current, (ProcessorOperator) lookedUp);
    }

    //noinspection unchecked
    return (Processor<T, R>) new WrappedProcessor<>(entryPoint, pipeline);
}
 
源代码10 项目: reactive-grpc   文件: ClientCalls.java
/**
 * Implements a bidirectional stream → stream call as {@link Flowable} → {@link Flowable}, where both the client
 * and the server independently stream to each other.
 *
 * <p>Any {@link Throwable} raised while wiring the call is not propagated to the caller; it is
 * returned as a failed {@link Flowable} instead.
 *
 * @param flowableSource the request stream; it is subscribed to during this call
 * @param delegate       function invoked with the response observer; its return value is ignored here
 * @param options        call options from which the prefetch and low-tide values are read
 * @param <TRequest>     request message type
 * @param <TResponse>    response message type
 * @return a {@link Flowable} over the response observer/publisher, or {@code Flowable.error(...)} on failure
 */
@SuppressWarnings("unchecked")
public static <TRequest, TResponse> Flowable<TResponse> manyToMany(
        final Flowable<TRequest> flowableSource,
        final Function<StreamObserver<TResponse>, StreamObserver<TRequest>> delegate,
        final CallOptions options) {

    // Both values are taken from the call options and handed to the response-side publisher below.
    final int prefetch = RxCallOptions.getPrefetch(options);
    final int lowTide = RxCallOptions.getLowTide(options);

    try {
        // Subscribe the request-side bridge to the source before the response side is created.
        final RxSubscriberAndClientProducer<TRequest> subscriberAndGRPCProducer =
                flowableSource.subscribeWith(new RxSubscriberAndClientProducer<TRequest>());
        final RxClientStreamObserverAndPublisher<TResponse> observerAndPublisher =
            new RxClientStreamObserverAndPublisher<TResponse>(
                // First callback: hands the received call observer to the request-side bridge.
                new com.salesforce.reactivegrpc.common.Consumer<CallStreamObserver<?>>() {
                    @Override
                    public void accept(CallStreamObserver<?> observer) {
                        subscriberAndGRPCProducer.subscribe((CallStreamObserver<TRequest>) observer);
                    }
                },
                // Second callback: cancels the request-side bridge.
                // NOTE(review): presumably run when the response stream is cancelled — confirm in
                // RxClientStreamObserverAndPublisher.
                new Runnable() {
                    @Override
                    public void run() {
                        subscriberAndGRPCProducer.cancel();
                    }
                },
                prefetch, lowTide);
        // The StreamObserver returned by the delegate is deliberately not used here.
        delegate.apply(observerAndPublisher);

        return Flowable.fromPublisher(observerAndPublisher);
    } catch (Throwable throwable) {
        // Surface wiring failures through the reactive stream rather than throwing.
        return Flowable.error(throwable);
    }
}
 
源代码11 项目: realm-monarchy   文件: MappedRxFragment.java
/**
 * Sets up the {@code dogs} stream when the fragment is created.
 *
 * <p>Queries all {@code RealmDog} objects through {@code monarchy}, maps each one to a
 * {@code Dog} by name, and exposes the resulting {@code LiveData} as a {@code Flowable}
 * using the hosting activity as the owner passed to {@code LiveDataReactiveStreams}.
 *
 * @param savedInstanceState the saved state forwarded to the superclass
 */
@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    LiveData<List<Dog>> mappedDogs = monarchy.findAllMappedWithChanges(
            realm -> realm.where(RealmDog.class),
            from -> Dog.create(from.getName()));

    this.dogs = Flowable.fromPublisher(
            LiveDataReactiveStreams.toPublisher(getActivity(), mappedDogs));
}
 
/**
 * Writes the given audits in bulk to {@code reportableCollection}.
 *
 * @param audits the audits to write; may be {@code null} or empty
 * @return a {@code Flowable} over the bulk-write publisher, or an empty {@code Flowable}
 *         when there is nothing to write
 */
private Flowable bulk(List<Audit> audits) {
    if (audits != null && !audits.isEmpty()) {
        return Flowable.fromPublisher(reportableCollection.bulkWrite(this.convert(audits)));
    }

    // Nothing to write: no call is made to the collection.
    return Flowable.empty();
}
 
源代码13 项目: smallrye-mutiny   文件: ToFlowable.java
/**
 * Converts a {@code Uni} into a {@code Flowable} by wrapping the publisher obtained
 * from {@code uni.convert().toPublisher()}.
 *
 * @param uni the source {@code Uni}
 * @return a {@code Flowable} backed by the {@code Uni}'s publisher
 */
@Override
public Flowable<T> apply(Uni<T> uni) {
    Flowable<T> converted = Flowable.fromPublisher(uni.convert().toPublisher());
    return converted;
}
 
源代码14 项目: smallrye-mutiny   文件: ToFlowable.java
/**
 * Converts a {@code Multi} into a {@code Flowable} by passing it directly to
 * {@code Flowable.fromPublisher}.
 *
 * @param multi the source {@code Multi}
 * @return a {@code Flowable} backed by the given {@code Multi}
 */
@Override
public Flowable<T> apply(Multi<T> multi) {
    Flowable<T> converted = Flowable.fromPublisher(multi);
    return converted;
}
 
源代码15 项目: immutables   文件: RxJavaFetcherDelegate.java
/**
 * Fetches through the wrapped delegate and exposes its publisher as a {@code Flowable}.
 *
 * @return a {@code Flowable} backed by {@code delegate.fetch()}
 */
@Override
public Flowable<T> fetch() {
  Flowable<T> fetched = Flowable.fromPublisher(delegate.fetch());
  return fetched;
}
 
源代码16 项目: cyclops   文件: FlowableIO.java
/**
 * Zips two publishers pairwise and wraps the combined stream in an {@code IO}.
 *
 * @param p1  the first publisher
 * @param p2  the second publisher
 * @param fn2 the function combining one item from each publisher into a result
 * @return an {@code IO} over the zipped results
 */
public static <T1,T2,R> IO<R> merge(Publisher<T1> p1, Publisher<T2> p2, BiFunction<? super T1, ? super T2, ? extends R> fn2){
    Flowable<T1> left = Flowable.fromPublisher(p1);
    Flowable<T2> right = Flowable.fromPublisher(p2);

    Flowable<R> zipped = left.zipWith(right, (a, b) -> fn2.apply(a, b));
    return fromPublisher(zipped);
}
 
/**
 * Creates a publisher stage that exposes the reactive-streams publisher carried by
 * {@code stage} as a {@code Flowable}.
 *
 * <p>The publisher is resolved and null-checked eagerly, when this factory method is
 * called; only the {@code Flowable} wrapping happens when the returned stage is invoked.
 *
 * @param engine the engine (not used by this factory)
 * @param stage  the stage description; must not be {@code null}
 * @return a stage producing a {@code Flowable} over the stage's publisher
 * @throws NullPointerException if {@code stage} or its publisher is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public <O> PublisherStage<O> create(Engine engine, Stage.PublisherStage stage) {
    // Explicit check, matching the sibling create(...) factories.
    Objects.requireNonNull(stage);
    // A single null check is sufficient; the unchecked cast adapts the stage's publisher type to O.
    Publisher<O> publisher = (Publisher<O>) Objects.requireNonNull(stage.getRsPublisher());
    return () -> Flowable.fromPublisher(publisher);
}
 
源代码18 项目: cyclops   文件: FlowableIO.java
/**
 * Lifts a publisher into a {@code FlowableIO}.
 *
 * @param pub the publisher to wrap
 * @return a new {@code FlowableIO} over a {@code Flowable} view of {@code pub}
 */
@Override
public <R> IO<R> unit(Publisher<R> pub) {
    Flowable<R> wrapped = Flowable.fromPublisher(pub);
    return new FlowableIO<>(wrapped);
}
 
源代码19 项目: pentaho-kettle   文件: BlockingQueueStreamSource.java
/**
 * Exposes {@code publishProcessor} as a {@code Flowable}.
 *
 * @return a {@code Flowable} view of {@code publishProcessor}
 */
@Override public Flowable<T> flowable() {
  Flowable<T> view = Flowable.fromPublisher( publishProcessor );
  return view;
}
 
源代码20 项目: cyclops   文件: FlowableIO.java
/**
 * Creates a {@code FlowableIO} from an arbitrary publisher.
 *
 * @param p the publisher to wrap
 * @return a new {@code FlowableIO} over a {@code Flowable} view of {@code p}
 */
public static <T> IO<T> fromPublisher(Publisher<T> p){
    Flowable<T> wrapped = Flowable.fromPublisher(p);
    return new FlowableIO<T>(wrapped);
}