下面列出了io.reactivex.observers.TestObserver#assertValues ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void shouldSupportStartingALoopWithAnInit() throws Exception {
MobiusLoop.Builder<String, Integer, Boolean> withInit =
builder.init(
new Init<String, Boolean>() {
@Nonnull
@Override
public First<String, Boolean> init(String model) {
return First.first(model + "-init");
}
});
ObservableTransformer<Integer, String> transformer = RxMobius.loopFrom(withInit, "hi");
final TestObserver<String> observer = Observable.just(10).compose(transformer).test();
observer.awaitCount(2);
observer.assertValues("hi-init", "hi-init10");
}
@Test
public void testScheduler() {
TestScheduler scheduler = new TestScheduler(); //1
Observable<Long> tick = Observable
.interval(1, TimeUnit.SECONDS, scheduler); //2
Observable<String> observable =
Observable.just("foo", "bar", "biz", "baz") //3
.zipWith(tick, (string, index) -> index + "-" + string);//4
TestObserver<String> testObserver = observable
.subscribeOn(scheduler).test();//5
scheduler.advanceTimeBy(2300, TimeUnit.MILLISECONDS);//6
testObserver.assertNoErrors(); //7
testObserver.assertValues("0-foo", "1-bar");
testObserver.assertNotComplete();
}
@Test
public void manyToOneRetryWhen() {
TestObserver<Integer> test = newThreeErrorFlowable()
.as(GrpcRetry.ManyToOne.retryWhen(new Function<Flowable<Integer>, Single<Integer>>() {
@Override
public Single<Integer> apply(Flowable<Integer> flowable) {
return flowable.singleOrError();
}
}, RetryWhen.maxRetries(3).build()))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void manyToOneRetryAfter() {
TestObserver<Integer> test = newThreeErrorFlowable()
.as(GrpcRetry.ManyToOne.retryAfter(new Function<Flowable<Integer>, Single<Integer>>() {
@Override
public Single<Integer> apply(Flowable<Integer> flowable) {
return flowable.singleOrError();
}
}, 10, TimeUnit.MILLISECONDS))
.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertValues(0);
test.assertNoErrors();
test.assertComplete();
}
@Test
public void runQueryOnceOrDefaultEmitsDefault() {
final Author defaultVal = Author.newRandom();
final TestObserver<Author> ts = selectAuthors
.takeFirst()
.observe()
.runQueryOnceOrDefault(defaultVal)
.test();
final Author insertedVal = Author.newRandom();
insertedVal.insert().execute();
awaitTerminalEvent(ts);
ts.assertNoErrors();
ts.assertComplete();
ts.assertValues(defaultVal);
}
@Test
public void testExists() throws Exception {
RxPaperBook book = RxPaperBook.with("EXISTS", Schedulers.trampoline());
final String key = "hello";
book.write(key, ComplexObject.random()).subscribe();
final TestObserver<Boolean> foundSubscriber = book.exists(key).test();
foundSubscriber.awaitTerminalEvent();
foundSubscriber.assertNoErrors();
foundSubscriber.assertValueCount(1);
foundSubscriber.assertValues(true);
// notFoundSubscriber
String noKey = ":(";
final TestObserver<Boolean> notFoundSubscriber = book.exists(noKey).test();
notFoundSubscriber.awaitTerminalEvent();
notFoundSubscriber.assertComplete();
notFoundSubscriber.assertValueCount(1);
notFoundSubscriber.assertValues(false);
}
@Test
public void testContains() throws Exception {
RxPaperBook book = RxPaperBook.with("CONTAINS", Schedulers.trampoline());
final String key = "hello";
book.write(key, ComplexObject.random()).subscribe();
final TestObserver<Boolean> foundSubscriber = book.contains(key).test();
foundSubscriber.awaitTerminalEvent();
foundSubscriber.assertNoErrors();
foundSubscriber.assertValueCount(1);
foundSubscriber.assertValues(true);
// notFoundSubscriber
String noKey = ":(";
final TestObserver<Boolean> notFoundSubscriber = book.contains(noKey).test();
notFoundSubscriber.awaitTerminalEvent();
notFoundSubscriber.assertComplete();
notFoundSubscriber.assertValueCount(1);
notFoundSubscriber.assertValues(false);
}
@Test
public void switchToLatest_noErrors() {
RxCommand<String> command = RxCommand.create(o -> {
if (o == null) {
return Observable.error(new Exception("something wrong"));
} else {
return Observable.just((String) o);
}
});
TestObserver<String> testObserver = new TestObserver<>();
command.switchToLatest().subscribe(testObserver);
command.execute(null);
command.execute("1");
command.execute("2");
testObserver.assertValueCount(2);
testObserver.assertValues("1", "2");
}
@Test
public void testObservable_oneSensor() {
SensorDatabaseImpl db =
new SensorDatabaseImpl(getContext(), getAppAccount(), TEST_DATABASE_NAME);
db.addScalarReading("id", "tag", 0, 0, 0.0);
db.addScalarReading("id", "tag", 0, 1, 1.5);
db.addScalarReading("id", "tag", 0, 101, 2.0);
db.addScalarReading("id", "tag", 0, 102, 2.0);
db.addScalarReading("id", "tag", 0, 103, 2.0);
db.addScalarReading("id", "tag2", 0, 0, 1.0);
TestObserver<ScalarReading> testObserver = new TestObserver<>();
Observable<ScalarReading> obs =
db.createScalarObservable(
"id", new String[] {"tag"}, TimeRange.oldest(Range.closed(0L, 1L)), 0);
obs.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertValues(new ScalarReading(0, 0.0, "tag"), new ScalarReading(1, 1.5, "tag"));
}
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscription_severalPermissions_oneRevoked() {
TestObserver<Permission> sub = new TestObserver<>();
String[] permissions = new String[]{Manifest.permission.READ_PHONE_STATE, Manifest.permission.CAMERA};
when(mRxPermissions.isGranted(Matchers.<String>anyVararg())).thenReturn(false);
when(mRxPermissions.isRevoked(Manifest.permission.CAMERA)).thenReturn(true);
trigger().compose(mRxPermissions.ensureEach(permissions)).subscribe(sub);
mRxPermissions.onRequestPermissionsResult(
new String[]{Manifest.permission.READ_PHONE_STATE},
new int[]{PackageManager.PERMISSION_GRANTED});
sub.assertNoErrors();
sub.assertTerminated();
sub.assertValues(new Permission(permissions[0], true), new Permission(permissions[1], false));
}
@Test
public void givenSubscriberAndError_whenHandleOnErrorResumeFunc_thenResumed() {
TestObserver<String> testObserver = new TestObserver<>();
Observable
.<String>error(UNKNOWN_ERROR)
.onErrorResumeNext(throwable -> {
return Observable.just(throwable.getMessage(), "nextValue");
})
.subscribe(testObserver);
testObserver.assertNoErrors();
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertValues("unknown error", "nextValue");
}
@Test
public void whenObserverSubscribedToReplayRelayWithLimitedSize_thenItReceivesEmittedEvents () {
ReplayRelay<Integer> replayRelay = ReplayRelay.createWithSize(2);
TestObserver<Integer> firstObserver = TestObserver.create();
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.accept(20);
replayRelay.subscribe(firstObserver);
firstObserver.assertValues(15, 20);
}
@Test
public void testIsViewReady_AttachView_ShouldCallValueFalseTrue() throws Exception {
mPresenter.create();
final TestObserver<Boolean> test = RxTiPresenterUtils.isViewReady(mPresenter).test();
mPresenter.attachView(mView);
test.assertValues(false, true);
}
@Test
@TargetApi(Build.VERSION_CODES.M)
public void eachSubscriptionCombined_severalPermissions_granted() {
TestObserver<Permission> sub = new TestObserver<>();
String[] permissions = new String[]{Manifest.permission.READ_PHONE_STATE, Manifest.permission.CAMERA};
when(mRxPermissions.isGranted(Matchers.<String>anyVararg())).thenReturn(false);
int[] result = new int[]{PackageManager.PERMISSION_GRANTED, PackageManager.PERMISSION_GRANTED};
trigger().compose(mRxPermissions.ensureEachCombined(permissions)).subscribe(sub);
mRxPermissions.onRequestPermissionsResult(permissions, result);
sub.assertNoErrors();
sub.assertTerminated();
sub.assertValues(new Permission(permissions[0] + ", " + permissions[1], true));
}
@Test
public void test_skip() {
String[] fruits = { "mango", "pineapple", "apple", "mango", "papaya" };
Observable<String> observable = Observable.fromArray(fruits).skip(3);
TestObserver<String> testObserver = new TestObserver<>();
observable.subscribe(testObserver);
testObserver.assertComplete();
testObserver.assertValueCount(2);
testObserver.assertNoErrors();
testObserver.assertValues("mango", "papaya");
}
@Test
public void emitsAppStates() {
FakeApplication fakeApplication = new FakeApplication();
TestObserver<AppState> subscriber = TestObserver.create();
RxAppStateMonitor.monitor(fakeApplication).subscribe(subscriber);
fakeApplication.goForeground();
fakeApplication.goBackground();
subscriber.assertValues(FOREGROUND, BACKGROUND);
subscriber.assertNotTerminated();
}
@Test
public void whenObserverSubscribedToReplayRelay_thenItReceivesEmittedEvents () {
ReplayRelay<Integer> replayRelay = ReplayRelay.create();
TestObserver<Integer> firstObserver = TestObserver.create();
TestObserver<Integer> secondObserver = TestObserver.create();
replayRelay.subscribe(firstObserver);
replayRelay.accept(5);
replayRelay.accept(10);
replayRelay.accept(15);
replayRelay.subscribe(secondObserver);
firstObserver.assertValues(5, 10, 15);
secondObserver.assertValues(5, 10, 15);
}
@Test
public void deleteBuilderObserve() {
Author.newRandom()
.insert()
.execute();
Author.newRandom()
.insert()
.usingConnection(newConnection)
.execute();
final TestObserver<Long> ts1 = SELECT_AUTHORS
.count()
.observe()
.runQuery()
.test();
final TestObserver<Long> ts2 = Select
.from(AUTHOR)
.usingConnection(newConnection)
.count()
.observe()
.runQuery()
.test();
ts1.assertValue(1L);
ts2.assertValue(1L);
Delete.from(AUTHOR)
.usingConnection(newConnection)
.execute();
ts1.assertValue(1L);
ts2.assertValues(1L, 0L);
ts1.dispose();
ts2.dispose();
}
@Test
public void testRead() throws Exception {
RxPaperBook book = RxPaperBook.with("READ", Schedulers.trampoline());
final String key = "hello";
final ComplexObject value = ComplexObject.random();
book.write(key, value).subscribe();
final TestObserver<ComplexObject> testSubscriber = book.<ComplexObject>read(key).test();
testSubscriber.awaitTerminalEvent();
testSubscriber.assertComplete();
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(1);
testSubscriber.assertValues(value);
// notFoundSubscriber
String noKey = ":(";
final TestObserver<ComplexObject> notFoundSubscriber = book.<ComplexObject>read(noKey).test();
notFoundSubscriber.awaitTerminalEvent();
notFoundSubscriber.assertError(IllegalArgumentException.class);
// incorrectTypeSubscriber
book.<Integer>read(key).subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
Assert.fail();
}
@Override
public void onError(Throwable e) {
if (!(e instanceof ClassCastException)) {
Assert.fail(e.getMessage());
}
}
});
// immutable objects
book.write(key, new ImmutableObject(key)).subscribe();
final TestObserver<ImmutableObject> immutableReadSubscriber = book.<ImmutableObject>read(key).test();
immutableReadSubscriber.awaitTerminalEvent();
immutableReadSubscriber.assertNoErrors();
immutableReadSubscriber.assertComplete();
immutableReadSubscriber.assertValueCount(1);
}
@Test
public void connected() {
when(peripheral.connected()).thenReturn(connectedRelay.hide());
TestObserver<Boolean> connectedTestObserver = corePeripheralManager.connected().test();
connectedRelay.accept(true);
connectedTestObserver.assertValues(false, true);
}