io.reactivex.Flowable#subscribe() 源码实例 Demo

下面列出了 io.reactivex.Flowable#subscribe() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

源代码1 项目: festival   文件: AbstractResponseHandler.java
/**
 * Renders a handler result onto the HTTP response of the given routing context.
 *
 * <ul>
 *   <li>{@code null} result: the response is ended and nothing else happens.</li>
 *   <li>{@code Flowable} result: every non-null emitted item is passed to
 *       {@code doHandle}; an error from the stream (or thrown by {@code doHandle}
 *       inside the subscriber) fails the routing context.</li>
 *   <li>any other result: passed directly to {@code doHandle}.</li>
 * </ul>
 *
 * @param routingContext the routing context whose response is written
 * @param result the value produced by the route handler, may be {@code null}
 * @throws Exception if {@code doHandle} fails for a non-reactive result
 */
@Override
public void handle(RoutingContext routingContext, Object result) throws Exception {
    HttpServerResponse httpServerResponse = routingContext.response();
    if (result == null) {
        // Stop here: without this return the code below fell through to
        // doHandle(routingContext, null) on a response that was already ended.
        httpServerResponse.end();
        return;
    }
    if (result instanceof Flowable<?>) {
        Flowable<?> flowable = (Flowable<?>) result;
        flowable.subscribe(res -> {
            if (res != null) {
                doHandle(routingContext, res);
            }
        // Without an error consumer a failing stream is only reported to the
        // global RxJava error hook and the request is never answered.
        }, routingContext::fail);
    } else {
        doHandle(routingContext, result);
    }
}
 
源代码2 项目: vertx-rx   文件: RxMongoClientExamples.java
/**
 * Example: runs a batched find on the "book" collection and prints every
 * document delivered through a {@code Flowable}.
 *
 * @param mongoClient the client used to issue the batched query
 */
public void findBatch(MongoClient mongoClient) {
  // Matches every book whose author is Tolkien
  JsonObject tolkienQuery = new JsonObject().put("author", "J. R. R. Tolkien");

  // Open the batched read stream and adapt it to a Flowable in one expression
  Flowable<JsonObject> documents = mongoClient.findBatch("book", tolkienQuery).toFlowable();

  documents.subscribe(
    doc -> System.out.println("Found doc: " + doc.encodePrettily()),
    throwable -> throwable.printStackTrace(),
    () -> System.out.println("End of research"));
}
 
源代码3 项目: objectbox-java   文件: QueryObserverTest.java
/**
 * Subscribes this test to the one-by-one Flowable of the mock query and
 * checks that two single-element results arrive without an error, and that
 * a later publish produces no further results.
 */
@Test
public void testFlowableOneByOne() {
    publisher.setQueryResult(listResult);
    latch = new CountDownLatch(2);

    // The variable stays a raw type, exactly as before, so that the
    // subscribe(this) overload chosen by the compiler does not change.
    Flowable oneByOne = RxQuery.flowableOneByOne(mockQuery.getQuery());
    oneByOne.subscribe(this);

    // Two emissions are expected, each carrying exactly one element.
    assertLatchCountedDown(latch, 2);
    assertEquals(2, receivedChanges.size());
    assertEquals(1, receivedChanges.get(0).size());
    assertEquals(1, receivedChanges.get(1).size());
    assertNull(error);

    // Publishing again must not deliver anything new.
    receivedChanges.clear();
    publisher.publish();
    assertNoMoreResults();
}
 
源代码4 项目: retrocache   文件: FlowableTest.java
/**
 * Checks that a {@code Flowable<Result<String>>} holds back its single value
 * until the subscriber requests it, and that the HTTP call is made only once.
 */
@Test
public void resultRespectsBackpressure() {
    server.enqueue(new MockResponse().setBody("Hi"));

    // Start with zero demand so nothing may be delivered on subscription.
    RecordingSubscriber<Result<String>> recorder = subscriberRule.createWithInitialRequest(0);
    Flowable<Result<String>> results = service.result();
    results.subscribe(recorder);

    assertThat(server.getRequestCount()).isEqualTo(1);
    recorder.assertNoEvents();

    // The first request releases the value and the completion.
    recorder.request(1);
    recorder.assertAnyValue().assertComplete();

    // Subsequent requests do not trigger HTTP or notifications.
    recorder.request(Long.MAX_VALUE);
    assertThat(server.getRequestCount()).isEqualTo(1);
}
 
/**
 * Drops the "news" collection and refills it with generated {@code News}
 * documents. Does nothing when {@code elements} is not positive.
 *
 * <p>For {@code Long.MAX_VALUE} or {@code Integer.MAX_VALUE} the pipeline is
 * subscribed without blocking; for any other count the call blocks until the
 * pipeline terminates.
 *
 * @param elements number of documents to generate
 */
private void prepareItemsInDatabase(long elements) {
    if (elements <= 0) {
        return;
    }

    MongoCollection<News> newsCollection =
            mongoClient().getDatabase("news").getCollection("news", News.class);

    // Drop first, then insert generated items in batches collected over 500 ms windows.
    Flowable<Success> dropThenInsert = Flowable.fromPublisher(newsCollection.drop())
            .ignoreElements()
            .andThen(Flowable.rangeLong(0L, elements)
                    .map(l -> NewsHarness.generate())
                    .buffer(500, TimeUnit.MILLISECONDS)
                    .flatMap(newsCollection::insertMany));

    boolean treatedAsUnbounded = elements == Long.MAX_VALUE || elements == Integer.MAX_VALUE;
    if (!treatedAsUnbounded) {
        dropThenInsert.blockingSubscribe();
        return;
    }
    dropThenInsert.subscribe();
}
 
源代码6 项目: storio   文件: RxQueryTest.java
/**
 * Queries for a user by an email that was never stored and expects the
 * Flowable to emit a single empty {@code Optional} without errors.
 */
@Test
public void queryOneNonExistedObjectFlowable() {
    putUsersBlocking(3);

    // Look up a user by an email argument that matches none of the stored rows.
    final Query missingUserQuery = Query.builder()
            .table(UserTableMeta.TABLE)
            .where(UserTableMeta.COLUMN_EMAIL + "=?")
            .whereArgs("some arg")
            .build();

    final Flowable<Optional<User>> firstResult = storIOSQLite
            .get()
            .object(User.class)
            .withQuery(missingUserQuery)
            .prepare()
            .asRxFlowable(LATEST)
            .take(1);

    TestSubscriber<Optional<User>> observer = new TestSubscriber<Optional<User>>();
    firstResult.subscribe(observer);

    observer.awaitTerminalEvent(5, SECONDS);
    observer.assertNoErrors();
    observer.assertValue(Optional.<User>empty());
}
 
源代码7 项目: retrocache   文件: FlowableTest.java
/**
 * Checks that a {@code Flowable<Response<String>>} holds back its single value
 * until the subscriber requests it, and that the HTTP call is made only once.
 */
@Test
public void responseRespectsBackpressure() {
    server.enqueue(new MockResponse().setBody("Hi"));

    // Start with zero demand so nothing may be delivered on subscription.
    RecordingSubscriber<Response<String>> recorder = subscriberRule.createWithInitialRequest(0);
    Flowable<Response<String>> responses = service.response();
    responses.subscribe(recorder);

    assertThat(server.getRequestCount()).isEqualTo(1);
    recorder.assertNoEvents();

    // The first request releases the value and the completion.
    recorder.request(1);
    recorder.assertAnyValue().assertComplete();

    // Subsequent requests do not trigger HTTP or notifications.
    recorder.request(Long.MAX_VALUE);
    assertThat(server.getRequestCount()).isEqualTo(1);
}
 
源代码8 项目: storio   文件: RxQueryTest.java
/**
 * Stores three users, queries for the first one by email and expects the
 * Flowable to emit exactly that user wrapped in an {@code Optional}.
 */
@Test
public void queryOneExistedObjectFlowable() {
    final List<User> storedUsers = putUsersBlocking(3);
    final User expectedUser = storedUsers.get(0);

    // Look up the first stored user by its email.
    final Query byEmail = Query.builder()
            .table(UserTableMeta.TABLE)
            .where(UserTableMeta.COLUMN_EMAIL + "=?")
            .whereArgs(expectedUser.email())
            .build();

    final Flowable<Optional<User>> firstResult = storIOSQLite
            .get()
            .object(User.class)
            .withQuery(byEmail)
            .prepare()
            .asRxFlowable(LATEST)
            .take(1);

    TestSubscriber<Optional<User>> observer = new TestSubscriber<Optional<User>>();
    firstResult.subscribe(observer);

    observer.awaitTerminalEvent(5, SECONDS);
    observer.assertNoErrors();
    observer.assertValue(Optional.of(expectedUser));
}
 
/**
 * Writes every data point emitted by {@code stream} to the given bucket and
 * organization by delegating to the precision-aware {@code write} overload,
 * one point at a time. A stream error is published as a {@code WriteErrorEvent}.
 *
 * @param bucket       destination bucket
 * @param organization destination organization
 * @param stream       source of data points to write
 * @throws InfluxException if the processor has already completed
 */
public void write(@Nonnull final String bucket,
                  @Nonnull final String organization,
                  @Nonnull final Flowable<BatchWriteDataPoint> stream) {

    // Reject writes once the processor has completed.
    if (processor.hasComplete()) {
        throw new InfluxException(CLOSED_EXCEPTION);
    }

    stream.subscribe(
            dataPoint -> {
                // Forward each point individually, using the point's own precision.
                write(bucket, organization, dataPoint.point.getPrecision(), Flowable.just(dataPoint));
            },
            throwable -> {
                publish(new WriteErrorEvent(throwable));
            });
}
 
源代码10 项目: adamant-android   文件: ChatStorageTest.java
/**
 * Runs one producer and one consumer against the same {@code ChatsStorage}
 * and checks that the first value seen by the consumer equals {@code CHAT_COUNT}.
 */
@Test
public void concurrentOneProducerOneConsumer() {
    ChatsStorage storage = new ChatsStorage();

    Flowable<AdamantBasicMessage> producer = provideProducer(storage);
    Flowable<Integer> consumer = provideConsumer(storage, CHAT_COUNT);

    // Start the producer and keep its Disposable in the shared subscriptions container.
    subscriptions.add(producer.subscribe());

    // Block until the consumer reports its first size.
    Integer observedSize = consumer.blockingFirst();

    Assert.assertEquals(CHAT_COUNT, (int) observedSize);
}
 
源代码11 项目: state-machine   文件: Processor.java
/**
 * Wraps a {@code Flowable} as a {@code GroupedFlowable} carrying the given key.
 * Subscribing to the returned object subscribes directly to {@code o}.
 *
 * @param key the group key reported by the returned flowable
 * @param o   the source that receives every subscriber
 * @return a keyed view over {@code o}
 */
private static <K, T> GroupedFlowable<K, T> grouped(K key, final Flowable<T> o) {
    final GroupedFlowable<K, T> keyedView = new GroupedFlowable<K, T>(key) {
        @Override
        protected void subscribeActual(Subscriber<? super T> downstream) {
            // Pure delegation: the wrapper adds only the key.
            o.subscribe(downstream);
        }
    };
    return keyedView;
}
 
源代码12 项目: cyclops   文件: FlowablesTest.java
/**
 * Sequences a present and an absent value and expects the resulting
 * Flowable to signal an error to its subscriber.
 */
@Test
public void testSequenceError() throws InterruptedException {
    Flowable<Single<Integer>> sequenced = Flowables.sequence(Flux.just(just,none));
    AtomicBoolean sawError = new AtomicBoolean(false);

    sequenced.subscribe(
        m -> System.out.println(m),
        t -> sawError.set(true),
        () -> System.out.println("Done"));

    // The flag is read immediately after subscribe returns.
    assertThat(sawError.get(),equalTo(true));
}
 
源代码13 项目: cxf   文件: ReactiveIOInvoker.java
/**
 * Creates an {@code AsyncResponseImpl} for the message and, unless a streaming
 * subscriber has already taken over, subscribes to the flowable so that items
 * resume the response and errors go to {@code handleThrowable}.
 *
 * @param inMessage the inbound message the async response is bound to
 * @param f         the flowable returned by the resource method
 * @return the async response created for {@code inMessage}
 * @throws IllegalStateException if subscribing yields no {@code Disposable}
 */
protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
    final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);

    // A streaming subscriber, when used, handles the flowable itself.
    if (isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
        return asyncResponse;
    }

    Disposable subscription = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return asyncResponse;
}
 
源代码14 项目: storio   文件: GetOperationTest.java
/**
 * Subscribes to a query that initially matches nothing, then inserts a
 * matching row and expects two emissions: an empty {@code Optional} first,
 * followed by the inserted item.
 */
@Test
public void getOneExistedObjectTableUpdate() {
    TestItem expectedItem = TestItem.create(null, "value");

    // Seed three rows whose values do not match the query below.
    for (String seedValue : new String[] {"value1", "value2", "value3"}) {
        contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, seedValue).toContentValues());
    }

    Query matchingValue = Query.builder()
            .uri(TestItem.CONTENT_URI)
            .where(TestItem.COLUMN_VALUE + "=?")
            .whereArgs("value")
            .build();

    Flowable<Optional<TestItem>> firstTwoResults = storIOContentResolver
            .get()
            .object(TestItem.class)
            .withQuery(matchingValue)
            .prepare()
            .asRxFlowable(BackpressureStrategy.MISSING)
            .take(2);

    TestSubscriber<Optional<TestItem>> observer = new TestSubscriber<Optional<TestItem>>();
    firstTwoResults.subscribe(observer);

    // First phase: only the empty result is present.
    observer.awaitTerminalEvent(5, SECONDS);
    observer.assertValue(Optional.<TestItem>empty());
    observer.assertNoErrors();

    // Insert the matching row, then wait again for the second emission.
    contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());

    observer.awaitTerminalEvent(5, SECONDS);
    observer.assertNoErrors();

    List<Optional<TestItem>> emissions = observer.values();
    assertThat(emissions.size()).isEqualTo(2);
    assertThat(emissions.get(0).isPresent()).isFalse();
    assertThat(expectedItem.equalsWithoutId(emissions.get(1).get())).isTrue();
}
 
源代码15 项目: akarnokd-misc   文件: NoWebMain.java
/**
 * Demo: combines two interval sources (100 ms and 200 ms) with
 * {@code combineLatest}, keeps only the latest values under backpressure and
 * prints each pair from a consumer that sleeps one second per item.
 */
public static void main(String[] args) throws IOException, InterruptedException {
    Flowable<Long> everyHundredMillis = Flowable.interval(100, TimeUnit.MILLISECONDS);
    Flowable<Long> everyTwoHundredMillis = Flowable.interval(200, TimeUnit.MILLISECONDS);

    Flowable<LongCouple> pairs = Flowable.combineLatest(
            Arrays.asList(everyHundredMillis.onBackpressureLatest(), everyTwoHundredMillis.onBackpressureLatest()),
            latest -> new LongCouple((Long) latest[0], (Long) latest[1]),
            1)
            .onBackpressureLatest()
            .observeOn(Schedulers.newThread(), false, 1);

    pairs.subscribe(pair -> {
        System.out.println(pair.aLong + ":" + pair.bLong);
        // Deliberately slow consumer.
        Thread.sleep(1000);
    });

    // Keep the calling thread parked while the demo runs.
    Thread.sleep(10000000);
}
 
源代码16 项目: My-MVP   文件: RxJavaOperatorActivity.java
/**
 * Demo: creates a buffered Flowable that emits the integers 0..999,
 * subscribes on the io scheduler, observes on the Android main thread and
 * appends every received value to the on-screen log.
 */
private void flowableOperatorPro() {
    Flowable<Integer> numbers = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) {
            // NOTE(review): the emitter is never completed here, so onComplete
            // below is presumably never reached - confirm this is intended.
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
                Log.e(TAG, "subscribe: " + i);
                sb.append("e----> " + i + "\n");
            }
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    Subscriber<Integer> sink = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription subscription) {
            // Keep the subscription in the field and request a large batch up front.
            mSubscription = subscription;
            Log.e(TAG, "onSubscribe: s=" + subscription);
            subscription.request(Integer.MAX_VALUE);
        }

        @Override
        public void onNext(Integer value) {
            Log.e(TAG, "onNext: integer=" + value);

            // NOTE(review): sb is also appended to by the emitter above, which
            // presumably runs on a different thread - confirm sb tolerates that.
            sb.append(value + "\n");
            logContent.setText(sb.toString());
        }

        @Override
        public void onError(Throwable error) {
            Log.e(TAG, "onError: t=" + error);
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete: ");
        }
    };

    numbers.subscribe(sink);
}
 
/**
 * Orchestrates three chained remote calls with {@code Flowable}: fetch the
 * user ids, fetch the name for each id, then fetch the hash of name + id.
 * The test waits up to 10 seconds for all hash responses and then checks
 * that every expected hash value was received.
 *
 * @throws InterruptedException declared by the test signature; an interrupt
 *         during the wait is logged and the thread's interrupt flag restored
 */
@Test
public void flowableJavaOrchestrate() throws InterruptedException {
    // Filled from asynchronous callbacks and read by the test thread, so a
    // thread-safe list is required; a plain ArrayList can lose or corrupt entries.
    final List<String> receivedHashValues = new java.util.concurrent.CopyOnWriteArrayList<>();

    final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls

    Flowable<List<Long>> userIdFlowable = getUserIdService().register(RxFlowableInvokerProvider.class).request().rx(RxFlowableInvoker.class).get(new GenericType<List<Long>>() {
    });

    userIdFlowable.subscribe((List<Long> employeeIds) -> {
        logger.info("[FlowableExample] id-service result: {}", employeeIds);
        Flowable.fromIterable(employeeIds).subscribe(id -> {
            getNameService().register(RxFlowableInvokerProvider.class).resolveTemplate("userId", id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the name for the given userId
                    .doOnError((throwable) -> {
                        logger.warn("[FlowableExample] An error has occurred in the username request step {}", throwable.getMessage());
                    }).subscribe(userName -> {
                logger.info("[FlowableExample] name-service result: {}", userName);
                getHashService().register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username
                        .doOnError((throwable) -> {
                            logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable);
                        }).subscribe(hashValue -> {
                    logger.info("[FlowableExample] hash-service result: {}", hashValue);
                    receivedHashValues.add(hashValue);
                    completionTracker.countDown();
                });
            });
        });
    });

    // wait for async calls to complete
    try {
        // wait for inner requests to complete in 10 seconds
        if (!completionTracker.await(10, TimeUnit.SECONDS)) {
            logger.warn("[CallbackExample] Some requests didn't complete within the timeout");
        }
    } catch (InterruptedException e) {
        logger.error("Interrupted!", e);
        // Restore the interrupt flag so the test runner can still observe it.
        Thread.currentThread().interrupt();
    }

    assertThat(receivedHashValues).containsAll(expectedHashValues);
}
 
源代码18 项目: vertx-rx   文件: RxPgClientExamples.java
/**
 * Example: streams the rows of a prepared query inside a transaction as a
 * {@code Flowable} and consumes them one row at a time with an explicit
 * {@code Subscriber}.
 *
 * @param pool the connection pool the connection is taken from
 */
public void streamingQuery02Example(PgPool pool) {

    // Build the Flowable: connection -> transaction -> prepared query -> row stream
    Flowable<Row> rows = pool.rxGetConnection().flatMapPublisher(conn -> conn
      .rxBegin()
      .flatMapPublisher(tx ->
        conn
          .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
          .flatMapPublisher(preparedQuery -> {
            // Fetch 50 rows at a time
            RowStream<Row> rowStream = preparedQuery.createStream(50, Tuple.of("julien"));
            return rowStream.toFlowable();
          })
          .doAfterTerminate(tx::commit)));

    // Then subscribe, pulling a single row per request
    rows.subscribe(new Subscriber<Row>() {

      private Subscription upstream;

      @Override
      public void onSubscribe(Subscription subscription) {
        upstream = subscription;
        subscription.request(1);
      }

      @Override
      public void onNext(Row row) {
        // Ask for the next row before printing the current one
        upstream.request(1);
        System.out.println("User: " + row.getString("last_name"));
      }

      @Override
      public void onError(Throwable err) {
        System.out.println("Error: " + err.getMessage());
      }

      @Override
      public void onComplete() {
        System.out.println("End of stream");
      }
    });
}
 
源代码19 项目: rxjava2-extras   文件: FlowableMatch.java
/**
 * Creates one {@code MySubscriber} per source, stores them in the
 * {@code aSub} and {@code bSub} fields, and subscribes them to {@code a}
 * and {@code b} respectively.
 *
 * @param a the source tagged {@code Source.A}
 * @param b the source tagged {@code Source.B}
 */
public void subscribe(Flowable<A> a, Flowable<B> b) {
    // Both subscribers are created before either source is subscribed.
    aSub = new MySubscriber<A, K>(Source.A, this, requestSize);
    bSub = new MySubscriber<B, K>(Source.B, this, requestSize);
    // NOTE(review): presumably the order matters if a source emits
    // synchronously on subscribe - confirm before reordering.
    a.subscribe(aSub);
    b.subscribe(bSub);
}
 
源代码20 项目: storio   文件: RxChangesObserverTest.java
/**
 * For every SDK version from 16 up to (but excluding) {@code MAX_SDK_VERSION},
 * checks that subscribing to the changes Flowable registers one and the same
 * {@code ContentObserver} for all observed uris, and that nothing is
 * registered before subscription.
 */
@Test
public void shouldRegisterOnlyOneContentObserverAfterSubscribingToFlowableOnSdkVersionGreaterThan15() {
    for (int sdkVersion = 16; sdkVersion < MAX_SDK_VERSION; sdkVersion++) {
        ContentResolver contentResolver = mock(ContentResolver.class);

        final AtomicReference<ContentObserver> registeredObserver = new AtomicReference<ContentObserver>();

        doAnswer(new Answer() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // Remember the first observer; any different instance later is a failure.
                Object candidate = invocation.getArguments()[2];
                if (registeredObserver.get() == null) {
                    registeredObserver.set((ContentObserver) candidate);
                } else if (registeredObserver.get() != candidate) {
                    throw new AssertionError("More than one ContentObserver was created");
                }
                return null;
            }
        }).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));

        // Three distinct mocked uris to observe.
        Set<Uri> uris = new HashSet<Uri>(3);
        for (int i = 0; i < 3; i++) {
            uris.add(mock(Uri.class));
        }

        Flowable<Changes> changes = RxChangesObserver.observeChanges(
                contentResolver,
                uris,
                mock(Handler.class),
                sdkVersion,
                BackpressureStrategy.MISSING
        );

        // Should not register ContentObserver before subscribing to Flowable
        verify(contentResolver, times(0))
                .registerContentObserver(any(Uri.class), anyBoolean(), any(ContentObserver.class));

        Disposable subscription = changes.subscribe();

        // Assert that same ContentObserver was registered for all uris
        for (Uri uri : uris) {
            verify(contentResolver).registerContentObserver(same(uri), eq(true), same(registeredObserver.get()));
        }

        subscription.dispose();
    }
}