下面列出了io.reactivex.Single#test ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public static void main(String[] args) throws Throwable {
TestScheduler scheduler = new TestScheduler();
Single<Long> s1 = Single.timer(1, TimeUnit.SECONDS, scheduler);
Single<String> s2 = Single.just("Hello");
Single<String> r = Single.zip(s1, s2, (t, s) -> t + " -> " + s);
TestObserver<String> testObserver = r.test();
testObserver.assertNoValues();
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
testObserver.assertNoValues();
scheduler.advanceTimeBy(600, TimeUnit.MILLISECONDS);
testObserver
.assertNoErrors()
.assertValue("0 -> Hello");
}
@Test public void When_Call_OnLegalMock_If_Method_Return_Type_Is_Single_Then_Get_Object()
throws NoSuchMethodException {
Method method = Providers.class.getDeclaredMethod("single");
Rx2Retrofit annotation = PlaceholderRetrofitAnnotation.class.getAnnotation(Rx2Retrofit.class);
Metadata<Rx2Retrofit> metadata = new Metadata(Providers.class,
method, null, annotation, method.getGenericReturnType());
Single single = (Single) rx2RetrofitInterceptor.onLegalMock(new Mock(), metadata);
TestObserver<Mock> subscriber = single.test();
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
Mock mock = subscriber.values().get(0);
assertNotNull(mock);
}
@Test
public void When_Call_OnLegalMock_If_Method_Return_Type_Is_Single_Response_Then_Get_Response()
throws NoSuchMethodException {
Method method = Providers.class.getDeclaredMethod("singleResponseMock");
Rx2Retrofit annotation = PlaceholderRetrofitAnnotation.class.getAnnotation(Rx2Retrofit.class);
Metadata<Rx2Retrofit> metadata = new Metadata(Providers.class,
method, null, annotation, method.getGenericReturnType());
Single single = (Single) rx2RetrofitInterceptor.onLegalMock(new Mock(), metadata);
TestObserver<Response<Mock>> subscriber = single.test();
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
Response<Mock> response = subscriber.values().get(0);
assertTrue(response.isSuccessful());
assertNotNull(response.body());
}
@Test public void When_Call_OnIllegalMock_If_Method_Return_Type_Is_Single_Then_Get_Error_Single()
throws NoSuchMethodException, IOException {
Method method = Providers.class.getDeclaredMethod("single");
Rx2Retrofit annotation = PlaceholderRetrofitAnnotation.class.getAnnotation(Rx2Retrofit.class);
Metadata<Rx2Retrofit> metadata = new Metadata(Providers.class,
method, null, annotation, method.getGenericReturnType());
Single single = (Single) rx2RetrofitInterceptor.onIllegalMock(new AssertionError(), metadata);
TestObserver<List<Mock>> subscriber = single.test();
subscriber.awaitTerminalEvent();
subscriber.assertNoValues();
HttpException httpException = (HttpException) subscriber.errors().get(0);
assertThat(httpException.getMessage(), is("HTTP 404 null"));
}
@Test
public void When_Call_OnIllegalMock_If_Method_Return_Type_Is_Single_Response_Then_Get_Response_Body_Null()
throws NoSuchMethodException, IOException {
Method method = Providers.class.getDeclaredMethod("singleResponseMock");
Rx2Retrofit annotation = PlaceholderRetrofitAnnotation.class.getAnnotation(Rx2Retrofit.class);
Metadata<Rx2Retrofit> metadata = new Metadata(Providers.class,
method, null, annotation, method.getGenericReturnType());
Single single = (Single) rx2RetrofitInterceptor
.onIllegalMock(new AssertionError("BOOM!"), metadata);
TestObserver<Response<Mock>> subscriber = single.test();
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
Response<Mock> response = subscriber.values().get(0);
assertNull(response.body());
assertFalse(response.isSuccessful());
assertThat(response.errorBody().string(), is("BOOM!"));
}
@Test public void When_Call_OnIllegalMock_Response_With_Custom_Response_Adapter_Adapt_It()
throws NoSuchMethodException, IOException {
Method method = Providers.class.getDeclaredMethod("singleResponseMock");
Rx2Retrofit annotation =
PlaceholderRetrofitErrorResponseAdapterAnnotation.class.getAnnotation(Rx2Retrofit.class);
Metadata<Rx2Retrofit> metadata = new Metadata(Providers.class,
method, null, annotation, method.getGenericReturnType());
Single single =
(Single) rx2RetrofitInterceptor.onIllegalMock(new AssertionError("BOOM!"), metadata);
TestObserver<Response<Mock>> subscriber = single.test();
subscriber.awaitTerminalEvent();
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
Response<Mock> response = subscriber.values().get(0);
assertNull(response.body());
assertThat(response.errorBody().string(), is("{'message':'BOOM!'}"));
}
@Test
public void test() {
TestScheduler testScheduler = new TestScheduler();
final Single<List<Integer>> first = Single.timer(2, TimeUnit.SECONDS, testScheduler)
.map(u -> Arrays.asList(1, 2, 3));
final Single<List<Integer>> second = Single.just(Collections.emptyList());
final Single<List<Integer>> third = Single.just(Collections.singletonList(4));
final Single<List<Integer>> fourth = Single.just(Collections.singletonList(5));
Single<List<Integer>> subject = Observable
.fromIterable(Arrays.asList(first, second, third, fourth))
.concatMapSingle(single -> single)
.reduce(new ArrayList<>(), (seed, items) -> {
seed.addAll(items);
return seed;
});
TestObserver<List<Integer>> testObserver = subject.test();
testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);
System.out.println(testObserver.values());
testObserver.assertValue(list -> list.equals(Arrays.asList(1, 2, 3, 4, 5)));
// 5 is currently missing ; fourth was never subscribed in the first place
}
public static <T> T testSingle(Single<T> single) {
assertNotNull("Single itself was null", single);
// If you don't do this it blocks trying to do the operation and therefore can't observe the results
single = single.subscribeOn(Schedulers.io());
TestObserver<T> observer = single.test();
assertTrue("Observer failed to reach terminal event", observer.awaitTerminalEvent());
observer.assertNoErrors();
T object = observer.values().get(0);
assertNotNull("Returned object was null - RxJava should not allow this", object);
return object;
}
@Test
public void queueOperations() {
when(peripheralOperation1.result()).thenReturn(operationResultSubject1.hide());
when(peripheralOperation2.result()).thenReturn(operationResultSubject2.hide());
when(peripheralOperation3.result()).thenReturn(operationResultSubject3.hide());
when(peripheralOperation4.result()).thenReturn(operationResultSubject4.hide());
Single<Irrelevant> op1 = corePeripheralManager.queueOperation(peripheralOperation1);
verify(peripheralOperation1, times(0)).execute(any());
Single<Irrelevant> op2 = corePeripheralManager.queueOperation(peripheralOperation2);
verify(peripheralOperation2, times(0)).execute(any());
Single<Irrelevant> op3 = corePeripheralManager.queueOperation(peripheralOperation3);
verify(peripheralOperation3, times(0)).execute(any());
op1.test();
op2.test();
TestObserver op3Observer = op3.test();
verify(peripheralOperation1).execute(any());
verify(peripheralOperation2, times(0)).execute(any());
// Subscriber for Operation 3 decides they want to cancel the operation.
op3Observer.dispose();
operationResultSubject1.onSuccess(Irrelevant.INSTANCE);
verify(peripheralOperation2).execute(any());
Single<Irrelevant> op4 = corePeripheralManager.queueOperation(peripheralOperation4);
op4.test();
verify(peripheralOperation4, times(0)).execute(any());
operationResultSubject2.onError(new Exception());
verify(peripheralOperation4).execute(any());
// Operation 3 should never have executed.
verify(peripheralOperation3, times(0)).execute(any());
}
@Test
public void oneToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<HelloResponse> resp = Single.just(HelloRequest.getDefaultInstance()).compose(stub::sayHello);
TestObserver<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
test.assertError(t -> ((StatusRuntimeException)t).getStatus() == Status.INTERNAL);
}
@Test
public void manyToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<HelloResponse> resp = Flowable.just(HelloRequest.getDefaultInstance()).as(stub::sayHelloReqStream);
TestObserver<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
test.assertError(t -> ((StatusRuntimeException)t).getStatus() == Status.INTERNAL);
}
@Test
public void servicesCanCallOtherServices() throws InterruptedException {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<String> chain = Single.just(request("X"))
// one -> one
.compose(stub::sayHello)
.map(ChainedCallIntegrationTest::bridge)
.doOnSuccess(System.out::println)
// one -> many
.as(stub::sayHelloRespStream)
.map(ChainedCallIntegrationTest::bridge)
.doOnNext(System.out::println)
// many -> many
.compose(stub::sayHelloBothStream)
.map(ChainedCallIntegrationTest::bridge)
.doOnNext(System.out::println)
// many -> one
.as(stub::sayHelloReqStream)
.map(ChainedCallIntegrationTest::bridge)
.doOnSuccess(System.out::println)
// one -> one
.compose(stub::sayHello)
.map(HelloResponse::getMessage)
.doOnSuccess(System.out::println);
TestObserver<String> test = chain.test();
test.awaitTerminalEvent(2, TimeUnit.SECONDS);
test.assertComplete();
test.assertValue("[<{[X]}> :: </[X]/> :: <\\[X]\\> :: <([X])>]");
}
@Test
public void oneToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<HelloResponse> resp = Single.just(HelloRequest.getDefaultInstance()).compose(stub::sayHello);
TestObserver<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
@Test
public void manyToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<HelloRequest> req = Flowable.just(HelloRequest.getDefaultInstance());
Single<HelloResponse> resp = req.as(stub::sayHelloReqStream);
TestObserver<HelloResponse> test = resp.test();
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
// Flowable requests get canceled when unexpected errors happen
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.INTERNAL);
}
@Test
public void oneToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<String> rxRequest = Single.just("World");
Single<String> rxResponse = rxRequest
.map(this::toRequest)
.compose(stub::sayHello)
.map(this::fromResponse);
TestObserver<String> test = rxResponse.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertNoErrors();
test.assertValue("Hello World");
}
@Test
public void manyToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Flowable<String> rxRequest = Flowable.just("A", "B", "C");
Single<String> rxResponse = rxRequest
.map(this::toRequest)
.as(stub::sayHelloReqStream)
.map(this::fromResponse);
TestObserver<String> test = rxResponse.test();
test.awaitTerminalEvent(1, TimeUnit.SECONDS);
test.assertNoErrors();
test.assertValue("Hello A and B and C");
}