下面列出了io.reactivex.Flowable#subscribe ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Override
public void handle(RoutingContext routingContext, Object result) throws Exception {
HttpServerResponse httpServerResponse = routingContext.response();
if (result == null) {
httpServerResponse.end();
}
if (result instanceof Flowable<?>) {
Flowable<?> flowable = (Flowable<?>) result;
flowable.subscribe(res -> {
if (res != null) {
doHandle(routingContext, res);
}
});
} else {
doHandle(routingContext, result);
}
}
public void findBatch(MongoClient mongoClient) {
// Will match all Tolkien books
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
ReadStream<JsonObject> books = mongoClient.findBatch("book", query);
// Convert the stream to a Flowable
Flowable<JsonObject> flowable = books.toFlowable();
flowable.subscribe(doc -> {
System.out.println("Found doc: " + doc.encodePrettily());
}, throwable -> {
throwable.printStackTrace();
}, () -> {
System.out.println("End of research");
});
}
@Test
public void testFlowableOneByOne() {
publisher.setQueryResult(listResult);
latch = new CountDownLatch(2);
Flowable flowable = RxQuery.flowableOneByOne(mockQuery.getQuery());
flowable.subscribe(this);
assertLatchCountedDown(latch, 2);
assertEquals(2, receivedChanges.size());
assertEquals(1, receivedChanges.get(0).size());
assertEquals(1, receivedChanges.get(1).size());
assertNull(error);
receivedChanges.clear();
publisher.publish();
assertNoMoreResults();
}
@Test
public void resultRespectsBackpressure() {
server.enqueue(new MockResponse().setBody("Hi"));
RecordingSubscriber<Result<String>> subscriber = subscriberRule.createWithInitialRequest(0);
Flowable<Result<String>> o = service.result();
o.subscribe(subscriber);
assertThat(server.getRequestCount()).isEqualTo(1);
subscriber.assertNoEvents();
subscriber.request(1);
subscriber.assertAnyValue().assertComplete();
subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications.
assertThat(server.getRequestCount()).isEqualTo(1);
}
private void prepareItemsInDatabase(long elements) {
if (elements <= 0) {
return;
}
MongoCollection<News> collection = mongoClient().getDatabase("news")
.getCollection("news", News.class);
Flowable<Success> successFlowable = Flowable.fromPublisher(collection.drop())
.ignoreElements()
.andThen(Flowable.rangeLong(0L,
elements)
.map(l -> NewsHarness.generate())
.buffer(500,
TimeUnit.MILLISECONDS)
.flatMap(collection::insertMany));
if (elements == Long.MAX_VALUE || elements == Integer.MAX_VALUE) {
successFlowable.subscribe();
}
else {
successFlowable.blockingSubscribe();
}
}
@Test
public void queryOneNonExistedObjectFlowable() {
putUsersBlocking(3);
final Flowable<Optional<User>> userFlowable = storIOSQLite
.get()
.object(User.class)
.withQuery(Query.builder()
.table(UserTableMeta.TABLE)
.where(UserTableMeta.COLUMN_EMAIL + "=?")
.whereArgs("some arg")
.build())
.prepare()
.asRxFlowable(LATEST)
.take(1);
TestSubscriber<Optional<User>> testSubscriber = new TestSubscriber<Optional<User>>();
userFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertValue(Optional.<User>empty());
}
@Test
public void responseRespectsBackpressure() {
server.enqueue(new MockResponse().setBody("Hi"));
RecordingSubscriber<Response<String>> subscriber = subscriberRule.createWithInitialRequest(0);
Flowable<Response<String>> o = service.response();
o.subscribe(subscriber);
assertThat(server.getRequestCount()).isEqualTo(1);
subscriber.assertNoEvents();
subscriber.request(1);
subscriber.assertAnyValue().assertComplete();
subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications.
assertThat(server.getRequestCount()).isEqualTo(1);
}
@Test
public void queryOneExistedObjectFlowable() {
final List<User> users = putUsersBlocking(3);
final User expectedUser = users.get(0);
final Flowable<Optional<User>> userFlowable = storIOSQLite
.get()
.object(User.class)
.withQuery(Query.builder()
.table(UserTableMeta.TABLE)
.where(UserTableMeta.COLUMN_EMAIL + "=?")
.whereArgs(expectedUser.email())
.build())
.prepare()
.asRxFlowable(LATEST)
.take(1);
TestSubscriber<Optional<User>> testSubscriber = new TestSubscriber<Optional<User>>();
userFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
testSubscriber.assertValue(Optional.of(expectedUser));
}
public void write(@Nonnull final String bucket,
@Nonnull final String organization,
@Nonnull final Flowable<BatchWriteDataPoint> stream) {
if (processor.hasComplete()) {
throw new InfluxException(CLOSED_EXCEPTION);
}
stream.subscribe(
dataPoint -> write(bucket, organization, dataPoint.point.getPrecision(), Flowable.just(dataPoint)),
throwable -> publish(new WriteErrorEvent(throwable)));
}
@Test
public void concurrentOneProducerOneConsumer() {
ChatsStorage chatsStorage = new ChatsStorage();
Flowable<AdamantBasicMessage> producer = provideProducer(chatsStorage);
Flowable<Integer> consumer = provideConsumer(chatsStorage, CHAT_COUNT);
Disposable producerSubscription = producer.subscribe();
subscriptions.add(producerSubscription);
Integer size = consumer.blockingFirst();
Assert.assertEquals(CHAT_COUNT, (int) size);
}
private static <K, T> GroupedFlowable<K, T> grouped(K key, final Flowable<T> o) {
return new GroupedFlowable<K, T>(key) {
@Override
protected void subscribeActual(Subscriber<? super T> s) {
o.subscribe(s);
}
};
}
@Test
public void testSequenceError() throws InterruptedException {
Flowable<Single<Integer>> maybes = Flowables.sequence(Flux.just(just,none));
AtomicBoolean error = new AtomicBoolean(false);
maybes.subscribe(m->{
System.out.println(m);
},t->{
error.set(true);
},()->{
System.out.println("Done");
});
assertThat(error.get(),equalTo(true));
}
protected AsyncResponseImpl handleFlowable(Message inMessage, Flowable<?> f) {
final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
if (!isStreamingSubscriberUsed(f, asyncResponse, inMessage)) {
Disposable d = f.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
if (d == null) {
throw new IllegalStateException("Subscribe did not return a Disposable");
}
}
return asyncResponse;
}
@Test
public void getOneExistedObjectTableUpdate() {
TestItem expectedItem = TestItem.create(null, "value");
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value1").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value2").toContentValues());
contentResolver.insert(TestItem.CONTENT_URI, TestItem.create(null, "value3").toContentValues());
Flowable<Optional<TestItem>> testItemFlowable = storIOContentResolver
.get()
.object(TestItem.class)
.withQuery(Query.builder()
.uri(TestItem.CONTENT_URI)
.where(TestItem.COLUMN_VALUE + "=?")
.whereArgs("value")
.build())
.prepare()
.asRxFlowable(BackpressureStrategy.MISSING)
.take(2);
TestSubscriber<Optional<TestItem>> testSubscriber = new TestSubscriber<Optional<TestItem>>();
testItemFlowable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertValue(Optional.<TestItem>empty());
testSubscriber.assertNoErrors();
contentResolver.insert(TestItem.CONTENT_URI, expectedItem.toContentValues());
testSubscriber.awaitTerminalEvent(5, SECONDS);
testSubscriber.assertNoErrors();
List<Optional<TestItem>> emittedItems = testSubscriber.values();
assertThat(emittedItems.size()).isEqualTo(2);
assertThat(emittedItems.get(0).isPresent()).isFalse();
assertThat(expectedItem.equalsWithoutId(emittedItems.get(1).get())).isTrue();
}
public static void main(String[] args) throws IOException, InterruptedException {
Flowable<Long> flowA = Flowable.interval(100, TimeUnit.MILLISECONDS);
Flowable<Long> flowB = Flowable.interval(200, TimeUnit.MILLISECONDS);
Flowable<LongCouple> combined = Flowable.combineLatest(Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
a -> new LongCouple((Long) a[0], (Long) a[1]),
1)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1);
combined.subscribe((longCouple -> {
System.out.println(longCouple.aLong + ":" + longCouple.bLong);
Thread.sleep(1000);
}));
Thread.sleep(10000000);
}
private void flowableOperatorPro() {
Flowable<Integer> mIntegerFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) {
for (int i = 0; i < 1000; i++) {
e.onNext(i);
Log.e(TAG, "subscribe: " + i);
sb.append("e----> " + i + "\n");
}
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Subscriber<Integer> mIntegerSubscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
Log.e(TAG, "onSubscribe: s=" + s);
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: integer=" + integer);
sb.append(integer + "\n");
logContent.setText(sb.toString());
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: t=" + t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
mIntegerFlowable.subscribe(mIntegerSubscriber);
}
@Test
public void flowableJavaOrchestrate() throws InterruptedException {
List<String> receivedHashValues = new ArrayList<>();
final CountDownLatch completionTracker = new CountDownLatch(expectedHashValues.size()); // used to keep track of the progress of the subsequent calls
Flowable<List<Long>> userIdFlowable = getUserIdService().register(RxFlowableInvokerProvider.class).request().rx(RxFlowableInvoker.class).get(new GenericType<List<Long>>() {
});
userIdFlowable.subscribe((List<Long> employeeIds) -> {
logger.info("[FlowableExample] id-service result: {}", employeeIds);
Flowable.fromIterable(employeeIds).subscribe(id -> {
getNameService().register(RxFlowableInvokerProvider.class).resolveTemplate("userId", id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the name for the given userId
.doOnError((throwable) -> {
logger.warn("[FlowableExample] An error has occurred in the username request step {}", throwable.getMessage());
}).subscribe(userName -> {
logger.info("[FlowableExample] name-service result: {}", userName);
getHashService().register(RxFlowableInvokerProvider.class).resolveTemplate("rawValue", userName + id).request().rx(RxFlowableInvoker.class).get(String.class) // gotten the hash value for userId+username
.doOnError((throwable) -> {
logger.warn(" [FlowableExample] An error has occurred in the hashing request step!", throwable);
}).subscribe(hashValue -> {
logger.info("[FlowableExample] hash-service result: {}", hashValue);
receivedHashValues.add(hashValue);
completionTracker.countDown();
});
});
});
});
// wait for async calls to complete
try {
// wait for inner requests to complete in 10 seconds
if (!completionTracker.await(10, TimeUnit.SECONDS)) {
logger.warn("[CallbackExample] Some requests didn't complete within the timeout");
}
} catch (InterruptedException e) {
logger.error("Interrupted!", e);
}
assertThat(receivedHashValues).containsAll(expectedHashValues);
}
public void streamingQuery02Example(PgPool pool) {
// Create an Observable
Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
.rxBegin()
.flatMapPublisher(tx ->
conn
.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
.flatMapPublisher(preparedQuery -> {
// Fetch 50 rows at a time
RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
return stream.toFlowable();
})
.doAfterTerminate(tx::commit)));
// Then subscribe
flowable.subscribe(new Subscriber<Row>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription subscription) {
sub = subscription;
subscription.request(1);
}
@Override
public void onNext(Row row) {
sub.request(1);
System.out.println("User: " + row.getString("last_name"));
}
@Override
public void onError(Throwable err) {
System.out.println("Error: " + err.getMessage());
}
@Override
public void onComplete() {
System.out.println("End of stream");
}
});
}
public void subscribe(Flowable<A> a, Flowable<B> b) {
aSub = new MySubscriber<A, K>(Source.A, this, requestSize);
bSub = new MySubscriber<B, K>(Source.B, this, requestSize);
a.subscribe(aSub);
b.subscribe(bSub);
}
@Test
public void shouldRegisterOnlyOneContentObserverAfterSubscribingToFlowableOnSdkVersionGreaterThan15() {
for (int sdkVersion = 16; sdkVersion < MAX_SDK_VERSION; sdkVersion++) {
ContentResolver contentResolver = mock(ContentResolver.class);
final AtomicReference<ContentObserver> contentObserver = new AtomicReference<ContentObserver>();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
// Save reference to ContentObserver only once to assert that it was created once
if (contentObserver.get() == null) {
contentObserver.set((ContentObserver) invocation.getArguments()[2]);
} else if (contentObserver.get() != invocation.getArguments()[2]) {
throw new AssertionError("More than one ContentObserver was created");
}
return null;
}
}).when(contentResolver).registerContentObserver(any(Uri.class), eq(true), any(ContentObserver.class));
Set<Uri> uris = new HashSet<Uri>(3);
uris.add(mock(Uri.class));
uris.add(mock(Uri.class));
uris.add(mock(Uri.class));
Flowable<Changes> flowable = RxChangesObserver
.observeChanges(
contentResolver,
uris,
mock(Handler.class),
sdkVersion,
BackpressureStrategy.MISSING
);
// Should not register ContentObserver before subscribing to Flowable
verify(contentResolver, times(0))
.registerContentObserver(any(Uri.class), anyBoolean(), any(ContentObserver.class));
Disposable disposable = flowable.subscribe();
for (Uri uri : uris) {
// Assert that same ContentObserver was registered for all uris
verify(contentResolver).registerContentObserver(same(uri), eq(true), same(contentObserver.get()));
}
disposable.dispose();
}
}