下面列出了io.reactivex.Flowable#blockingFirst ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void concurrentTwoProducerOneConsumer() throws InterruptedException {
ChatsStorage chatsStorage = new ChatsStorage();
Flowable<AdamantBasicMessage> producer1 = provideProducer(chatsStorage);
Flowable<AdamantBasicMessage> producer2 = provideProducer(chatsStorage);
Flowable<Integer> consumer = provideConsumer(chatsStorage, CHAT_COUNT * 2);
Disposable producerSubscription1 = producer1.subscribe();
subscriptions.add(producerSubscription1);
Disposable producerSubscription2 = producer2.subscribe();
subscriptions.add(producerSubscription2);
Integer size = consumer.blockingFirst();
Assert.assertEquals(CHAT_COUNT * 2, (int) size);
}
@Test
public void whenCreateIsCalledAndAnErrorOccursThenObserverOnErrorIsCalled()
throws Throwable {
this.expectedException.expectCause(isA(MqttException.class));
final ArgumentCaptor<IMqttActionListener> actionListener = ArgumentCaptor
.forClass(IMqttActionListener.class);
final ArgumentCaptor<IMqttMessageListener[]> messageListener = ArgumentCaptor
.forClass(IMqttMessageListener[].class);
final String[] topics = new String[] { "topic1", "topic2" };
final int[] qos = new int[] { 1, 2 };
final IMqttAsyncClient client = Mockito.mock(IMqttAsyncClient.class);
Mockito.when(client.subscribe(Matchers.same(topics), Matchers.same(qos),
Matchers.isNull(), actionListener.capture(),
messageListener.capture()))
.thenThrow(new MqttException(
MqttException.REASON_CODE_CLIENT_CONNECTED));
final SubscribeFactory factory = new SubscribeFactory(client);
final Flowable<SubscribeMessage> obs = factory.create(topics, qos,
BackpressureStrategy.ERROR);
obs.blockingFirst();
}
@Test
public void testThatTwoSubscribersHaveTwoSubscriptions() {
AtomicInteger count = new AtomicInteger(1);
Publisher<Integer> publisher = Uni.createFrom().deferred(() -> Uni.createFrom()
.item(count.getAndIncrement()))
.convert().toPublisher();
assertThat(publisher).isNotNull();
Flowable<Integer> flow = Flowable.fromPublisher(publisher);
int first = flow.blockingFirst();
assertThat(first).isEqualTo(1);
first = flow.blockingFirst();
assertThat(first).isEqualTo(2);
}
@Test
public void testThatTwoSubscribersWithCache() {
AtomicInteger count = new AtomicInteger(1);
Publisher<Integer> publisher = Uni.createFrom()
.deferred(() -> Uni.createFrom().item(count.getAndIncrement())).cache().convert().toPublisher();
assertThat(publisher).isNotNull();
Flowable<Integer> flow = Flowable.fromPublisher(publisher);
int first = flow.blockingFirst();
assertThat(first).isEqualTo(1);
first = flow.blockingFirst();
assertThat(first).isEqualTo(1);
}
@Test
public void concurrentOneProducerOneConsumer() {
ChatsStorage chatsStorage = new ChatsStorage();
Flowable<AdamantBasicMessage> producer = provideProducer(chatsStorage);
Flowable<Integer> consumer = provideConsumer(chatsStorage, CHAT_COUNT);
Disposable producerSubscription = producer.subscribe();
subscriptions.add(producerSubscription);
Integer size = consumer.blockingFirst();
Assert.assertEquals(CHAT_COUNT, (int) size);
}
@Test
public void testToPublisherWithImmediateCompletion() {
Completable completable = Completable.complete();
Flowable<String> flowable = Flowable.fromPublisher(converter.toRSPublisher(completable));
String res = flowable.blockingFirst("DEFAULT");
assertThat(res).isEqualTo("DEFAULT");
}
@Test
public void testToPublisherWithDelayedCompletion() {
Completable completable = Single.just("hello").delay(10, TimeUnit.MILLISECONDS).ignoreElement();
Flowable<String> flowable = Flowable.fromPublisher(converter.toRSPublisher(completable));
String res = flowable.blockingFirst("DEFAULT");
assertThat(res).isEqualTo("DEFAULT");
}