下面列出了io.reactivex.subscribers.TestSubscriber#assertComplete ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void testFindAll() {
Entrypoint entrypoint = new Entrypoint();
entrypoint.setName("testName");
entrypoint.setDescription("Description");
entrypoint.setOrganizationId(ORGANIZATION_ID);
entrypointRepository.create(entrypoint).blockingGet();
TestSubscriber<Entrypoint> testObserver1 = entrypointRepository.findAll(ORGANIZATION_ID).test();
testObserver1.awaitTerminalEvent();
testObserver1.assertComplete();
testObserver1.assertNoErrors();
testObserver1.assertValueCount(1);
}
@Test
public void testFindAllByEnvironment() throws TechnicalException {
// create domain
Domain domain = new Domain();
domain.setName("testName");
domain.setReferenceType(ReferenceType.ENVIRONMENT);
domain.setReferenceId("environment#1");
domainRepository.create(domain).blockingGet();
// create domain on different environment.
Domain otherDomain = new Domain();
otherDomain.setName("testName");
otherDomain.setReferenceType(ReferenceType.ENVIRONMENT);
otherDomain.setReferenceId("environment#2");
domainRepository.create(otherDomain).blockingGet();
// fetch domains
TestSubscriber<Domain> testObserver1 = domainRepository.findAllByEnvironment("environment#1").test();
testObserver1.awaitTerminalEvent();
testObserver1.assertComplete();
testObserver1.assertNoErrors();
testObserver1.assertValueCount(1);
}
@Test public void observeQuery() throws Exception {
enqueue("demo/locales.json");
enqueue("demo/types.json");
enqueue("demo/initial.json");
sync();
TestSubscriber<Cat> subscriber = new TestSubscriber<>();
vault.observe(Cat.class)
.where(BaseFields.REMOTE_ID + " IN(?, ?, ?)", "happycat", "nyancat", "garfield")
.limit(2)
.order(Cat$Fields.UPDATED_AT + " DESC")
.all()
.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertComplete();
List<Cat> cats = subscriber.values();
assertThat(cats).hasSize(2);
assertThat(cats.get(0).updatedAt()).isEqualTo("2013-11-18T15:58:02.018Z");
assertThat(cats.get(1).updatedAt()).isEqualTo("2013-09-04T09:19:39.027Z");
}
@Test
public void manyToMany() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloRequest> req = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build(),
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());
Flowable<HelloResponse> resp = req.compose(stub::sayHelloBothStream);
TestSubscriber<String> testSubscriber = resp.map(HelloResponse::getMessage).test();
testSubscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
testSubscriber.assertValues("Hello a and b", "Hello c and d", "Hello e");
testSubscriber.assertComplete();
}
@Test
public void oneToManyRetryWhen() {
TestSubscriber<Integer> test = newThreeErrorSingle()
.as(GrpcRetry.OneToMany.retryWhen(new Function<Single<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Single<Integer> single) {
return single.toFlowable();
}
}, RetryWhen.maxRetries(3).build()))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void oneToManyRetryImmediately() {
TestSubscriber<Integer> test = newThreeErrorSingle()
.as(GrpcRetry.OneToMany.retryImmediately(new Function<Single<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Single<Integer> single) {
return single.toFlowable();
}
}))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void oneToManyRetryAfter() {
TestSubscriber<Integer> test = newThreeErrorSingle()
.as(GrpcRetry.OneToMany.retryAfter(new Function<Single<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Single<Integer> single) {
return single.toFlowable();
}
}, 10, TimeUnit.MILLISECONDS))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void manyToManyRetryImmediately() {
TestSubscriber<Integer> test = newThreeErrorFlowable()
.compose(GrpcRetry.ManyToMany.retryImmediately(new Function<Flowable<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Integer> flowable) {
return flowable;
}
}))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void manyToManyRetryAfter() {
TestSubscriber<Integer> test = newThreeErrorFlowable()
.compose(GrpcRetry.ManyToMany.retryAfter(new Function<Flowable<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Integer> flowable) {
return flowable;
}
}, 10, TimeUnit.MILLISECONDS))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void chunkOperatorCorrectlyChunksFiniteRequest() {
int chunkSize = DEFAULT_CHUNK_SIZE;
int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
int num = chunkSize * 2;
AbstractStreamObserverAndPublisher<Long> source =
new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, num);
source.onSubscribe(observer);
TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
.test(num);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertComplete();
assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
assertThat(source.outputFused).isFalse();
}
@Test
public void chunkOperatorCorrectlyChunksInfiniteRequestFusion() {
int chunkSize = DEFAULT_CHUNK_SIZE;
int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
int num = chunkSize * 2;
AbstractStreamObserverAndPublisher<Long> source =
new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, num);
source.onSubscribe(observer);
TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
.observeOn(Schedulers.trampoline())
.test();
testSubscriber.awaitTerminalEvent();
testSubscriber.assertComplete();
assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
assertThat(source.outputFused).isTrue();
}
@Test
public void chunkOperatorCorrectlyChunksFiniteRequestFusion() {
int chunkSize = DEFAULT_CHUNK_SIZE;
int partOfChunk = TWO_THIRDS_OF_DEFAULT_CHUNK_SIZE;
int num = chunkSize * 2;
AbstractStreamObserverAndPublisher<Long> source =
new TestStreamObserverAndPublisherWithFusion<Long>(new ConcurrentLinkedQueue<Long>(), null);
AsyncRangeCallStreamObserver observer = new AsyncRangeCallStreamObserver(Executors.newSingleThreadExecutor(), source, num);
source.onSubscribe(observer);
TestSubscriber<Long> testSubscriber = Flowable.fromPublisher(source)
.observeOn(Schedulers.trampoline())
.test(num);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertComplete();
assertThat(observer.requestsQueue).containsExactly(chunkSize, partOfChunk, partOfChunk, partOfChunk);
assertThat(source.outputFused).isTrue();
}
@Test
public void testExecute_oneshot_exception_no_buffers() throws IOException {
RxCmdShell shell = mock(RxCmdShell.class);
Exception ex = new IOException("Error message");
when(shell.open()).thenReturn(Single.error(ex));
when(shell.isAlive()).thenReturn(Single.just(false));
final PublishProcessor<String> errorPub = PublishProcessor.create();
final TestSubscriber<String> errorSub = errorPub.test();
final PublishProcessor<String> outputPub = PublishProcessor.create();
final TestSubscriber<String> outputSub = outputPub.test();
final Cmd.Result result = Cmd.builder("")
.outputBuffer(false)
.errorBuffer(false)
.outputProcessor(outputPub)
.errorProcessor(errorPub)
.execute(shell);
assertThat(result.getExitCode(), is(Cmd.ExitCode.EXCEPTION));
assertThat(result.getErrors(), is(nullValue()));
assertThat(result.getOutput(), is(nullValue()));
assertThat(errorSub.valueCount(), is(1));
assertThat(outputSub.valueCount(), is(0));
errorSub.assertComplete();
outputSub.assertComplete();
}
@SuppressWarnings("CheckResult")
@Test
public void shouldExecuteAsBlockingAfterSubscription() {
//noinspection unchecked
final PreparedOperation<String, String, String> preparedOperation = mock(PreparedOperation.class);
final String expectedResult = "expected_string";
when(preparedOperation.executeAsBlocking()).thenReturn(expectedResult);
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
Flowable
.create(new FlowableOnSubscribeExecuteAsBlocking<String, String, String>(preparedOperation), BackpressureStrategy.MISSING)
.subscribe(testSubscriber);
verify(preparedOperation).executeAsBlocking();
testSubscriber.assertValue(expectedResult);
testSubscriber.assertNoErrors();
testSubscriber.assertComplete();
}
@Test
public void testWithImmediateValueWithRequests() {
Publisher<Integer> publisher = Uni.createFrom().item(1).convert().toPublisher();
assertThat(publisher).isNotNull();
TestSubscriber<Integer> test = Flowable.fromPublisher(publisher).test(0);
test.assertSubscribed();
test.request(20);
test.assertResult(1);
test.assertComplete();
}
@Test
public void testSwitchNodeIfErrorOnAuthorize() throws Exception {
TestSubscriber<Authorization> subscriber = new TestSubscriber<>();
when(mockedApi.authorize(any(String.class)))
.thenReturn(Flowable.just(authorization));
wrapper
.authorize(TestConstants.TEST_PASS_PHRASE)
.retryWhen(flowable -> flowable.delay(AdamantApi.SYNCHRONIZE_DELAY_SECONDS,TimeUnit.SECONDS, scheduler))
.subscribe(subscriber);
scheduler.advanceTimeBy(AdamantApi.SYNCHRONIZE_DELAY_SECONDS * (2 + 1), TimeUnit.SECONDS);
subscriber.assertComplete();
}
@Test
public void testUnsubscribeAfterActionButBeforeCompletionDoesNotAffectCompletion() {
final TestSubscriber<Object> ts = TestSubscriber.create();
Flowable.empty() //
.compose(Transformers.doOnEmpty(new Action() {
@Override
public void run() {
ts.cancel();
}
})).subscribe(ts);
ts.assertNoValues();
ts.assertComplete();
}
@Test
public void testFindAll() throws TechnicalException {
// create role
Role role1 = new Role();
role1.setName("testName1");
role1.setReferenceType(ReferenceType.DOMAIN);
role1.setReferenceId(DOMAIN_ID);
Role roleCreated1 = roleRepository.create(role1).blockingGet();
Role role2 = new Role();
role2.setName("testName2");
role2.setReferenceType(ReferenceType.DOMAIN);
role2.setReferenceId(DOMAIN_ID);
Role roleCreated2 = roleRepository.create(role2).blockingGet();
// Role 3 is on domain#2.
Role role3 = new Role();
role3.setName("testName3");
role3.setReferenceType(ReferenceType.DOMAIN);
role3.setReferenceId("domain#2");
roleRepository.create(role3).blockingGet();
// fetch role
TestSubscriber<Role> testObserver = roleRepository.findAll(ReferenceType.DOMAIN, DOMAIN_ID).test();
testObserver.awaitTerminalEvent();
testObserver.assertComplete();
testObserver.assertNoErrors();
testObserver.assertValueCount(2);
List<Role> roles = testObserver.values();
assertTrue(roles.stream().anyMatch(role -> role.getId().equals(roleCreated1.getId())));
assertTrue(roles.stream().anyMatch(role -> role.getId().equals(roleCreated2.getId())));
}
private void rx_createAsync_success(BiConsumer<Service, TestSubscriber<Object>> subscriber) {
TestSubscriber<Object> observer = new TestSubscriber<>(currentTraceContextObserver);
Service service = service(RxJava2CallAdapterFactory.createAsync());
subscriber.accept(service, observer);
// enqueue later
server.enqueue(new MockResponse());
observer.awaitTerminalEvent(1, SECONDS);
observer.assertComplete();
assertThat(currentTraceContextObserver.onComplete).isEqualTo(context1);
}
@Test
public void testIsDeferred() {
AtomicBoolean a = new AtomicBoolean();
Single<Integer> s = Single.fromCallable(() -> {
a.set(true);
return 1;
});
TestSubscriber<Integer> ts = new FlowableSingleDeferUntilRequest<Integer>(s) //
.test(0);
assertFalse(a.get());
ts.requestMore(1);
assertTrue(a.get());
ts.assertValue(1);
ts.assertComplete();
}