下面列出了android.test.UiThreadTest#rx.observers.TestSubscriber 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
public void shouldForceTimeoutOfSocketConnectDoesNotReturn() {
BootstrapAdapter bootstrap = mock(BootstrapAdapter.class);
when(bootstrap.connect()).thenReturn(channel.newPromise()); // this promise never completes
Endpoint endpoint = new DummyEndpoint(bootstrap, ctx);
Observable<LifecycleState> observable = endpoint.connect();
TestSubscriber<LifecycleState> testSubscriber = new TestSubscriber<LifecycleState>();
observable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
List<Throwable> errors = testSubscriber.getOnErrorEvents();
assertEquals(1, errors.size());
assertEquals(ConnectTimeoutException.class, errors.get(0).getClass());
endpoint.disconnect().subscribe();
}
@Test
public void shouldPropagateErrorOnEncode() {
String id = "key";
ByteBuf content = Unpooled.buffer();
content.release(); // provoke a IllegalReferenceCountException
UpsertRequest request = new UpsertRequest(id, content, BUCKET);
request.partition((short) 1);
TestSubscriber<CouchbaseResponse> ts = TestSubscriber.create();
request.observable().subscribe(ts);
try {
channel.writeOutbound(request);
fail("Expected exception, none thrown.");
} catch (EncoderException ex) {
assertTrue(ex.getCause() instanceof IllegalReferenceCountException);
}
List<Throwable> onErrorEvents = ts.getOnErrorEvents();
assertTrue(onErrorEvents.get(0) instanceof RequestCancelledException);
assertTrue(onErrorEvents.get(0).getCause() instanceof IllegalReferenceCountException);
}
@Test
public void energyService_findByEnvironment() {
Integer ENVIRONMENT = 108;
List<EnergyModel> energies = new ArrayList<>();
when(energyRepository.findByEnvironment(ENVIRONMENT)).thenReturn(Observable.just(energies));
TestSubscriber<List<EnergyModel>> tester = new TestSubscriber<>();
energyService.findByEnvironment(ENVIRONMENT).subscribe(tester);
verify(energyRepository).findByEnvironment(ENVIRONMENT);
tester.assertValue(energies);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void environmentService_findById() {
Integer ENVIRONMENT_ID = 110;
EnvironmentModel environment = new EnvironmentModel();
environment.setId(ENVIRONMENT_ID);
when(environmentRepository.findById(ENVIRONMENT_ID)).thenReturn(Observable.just(environment));
TestSubscriber<EnvironmentModel> tester = new TestSubscriber<>();
environmentService.findById(ENVIRONMENT_ID).subscribe(tester);
verify(environmentRepository).findById(ENVIRONMENT_ID);
tester.assertValue(environment);
tester.assertCompleted();
tester.assertNoErrors();
}
@SuppressWarnings("unchecked")
@Test
public void bufferUntilSomeNull() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
Observable.just(1, 2, 3, null, 4, 5)
.compose(Transformers.bufferUntil(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer v) {
return v != null && v == 20;
}
}))
.subscribe(ts);
ts.assertValues(
Arrays.asList(1, 2, 3, null, 4, 5)
);
ts.assertNoErrors();
ts.assertCompleted();
}
@Test
public void rxObservableInt() {
server.enqueue(new MockResponse().setBody("1"));
final TestInterface api = target();
final Observable<Integer> observable = api.intObservable();
assertThat(observable).isNotNull();
assertThat(server.getRequestCount()).isEqualTo(0);
final TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();
observable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
Assertions.assertThat(testSubscriber.getOnNextEvents().get(0)).isEqualTo(new Integer(1));
}
@Test
public void testObserveAuthState() {
TestSubscriber<FirebaseUser> testSubscriber = new TestSubscriber<>();
RxFirebaseAuth.observeAuthState(mockAuth)
.subscribeOn(Schedulers.immediate())
.subscribe(testSubscriber);
ArgumentCaptor<FirebaseAuth.AuthStateListener> argument = ArgumentCaptor.forClass(FirebaseAuth.AuthStateListener.class);
verify(mockAuth).addAuthStateListener(argument.capture());
argument.getValue().onAuthStateChanged(mockAuth);
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(1);
testSubscriber.assertReceivedOnNext(Collections.singletonList(mockUser));
testSubscriber.assertNotCompleted();
testSubscriber.unsubscribe();
}
@Test
public void lightService_add() {
String LIGHT_UUID = "myUuid";
LightModel lightModel = new LightModel();
when(lightRepository.add(lightModel)).thenReturn(Observable.just(LIGHT_UUID));
TestSubscriber<String> tester = new TestSubscriber<>();
lightService.add(lightModel).subscribe(tester);
verify(lightRepository).add(lightModel);
tester.assertValue(LIGHT_UUID);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void performBeaconEncounter() {
String beaconId = MockModelFabric.randomString();
Encounter encounter = MockModelFabric.newEncounter();
doReturn(Observable.just(encounter))
.when(mMockRibotsService)
.performBeaconEncounter(anyString(), eq(beaconId));
TestSubscriber<Encounter> testSubscriber = new TestSubscriber<>();
mDataManager.performBeaconEncounter(beaconId).subscribe(testSubscriber);
testSubscriber.assertCompleted();
testSubscriber.assertValue(encounter);
verify(mMockPreferencesHelper).putLatestEncounter(encounter);
}
@Test
public void scenarioService_findByEnvironment() {
Integer ENVIRONMENT = 108;
List<ScenarioModel> scenarios = new ArrayList<>();
when(scenarioRepository.findByEnvironment(ENVIRONMENT)).thenReturn(Observable.just(scenarios));
TestSubscriber<List<ScenarioModel>> tester = new TestSubscriber<>();
scenarioService.findByEnvironment(ENVIRONMENT).subscribe(tester);
verify(scenarioRepository).findByEnvironment(ENVIRONMENT);
tester.assertValue(scenarios);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void testUploadFailedUnhandledException() throws Exception {
final File file = getFile(TEST_FILE);
final String jobId = "job-id";
final Job job = Job.builder()
.setId(jobId)
.setStatus(Status.createQueued(jobId))
.setMetadata(Collections.emptyMap())
.setFilepath(file.getPath())
.setMimeType("text/plain")
.build();
final UploadService service = mock(UploadService.class);
when(service.upload(anyMap(), any(MultipartBody.Part.class)))
.thenThrow(new RuntimeException(""));
final Uploader uploader = new Uploader(service, Schedulers.io());
final TestSubscriber<Status> ts = TestSubscriber.create();
uploader.upload(job, file).subscribe(ts);
ts.awaitTerminalEvent(1, TimeUnit.SECONDS);
ts.assertError(RuntimeException.class);
ts.assertNoValues();
}
@SuppressWarnings("unchecked")
@Test
public void bufferUntilWith5() {
TestSubscriber<List<Integer>> ts = new TestSubscriber<List<Integer>>();
Observable.range(1, 10)
.compose(Transformers.bufferUntil(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer v) {
return v == 5;
}
}))
.subscribe(ts);
ts.assertValues(
Arrays.asList(1, 2, 3, 4, 5),
Arrays.asList(6, 7, 8, 9, 10)
);
ts.assertNoErrors();
ts.assertCompleted();
}
@Test
public void givenStringObservable_whenEncodingString_ThenSuccessfullObtainingByteStream()
{
//given
Observable<String> sourceObservable = Observable.just("Lorem ipsum loream");
TestSubscriber<byte[]> subscriber = TestSubscriber.create();
// when
StringObservable.encode(sourceObservable, StandardCharsets.UTF_8)
.subscribe(subscriber);
// then
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.getOnNextEvents()
.stream()
.forEach(bytes -> Assert.assertTrue(Arrays.equals(bytes, "Lorem ipsum loream".getBytes(StandardCharsets.UTF_8))));
}
@Test
public void givenTimedObservable_whenThrottlingLast_thenThrottleLastItemsAreEmitted() {
TestScheduler testScheduler = new TestScheduler();
Observable<Integer> timedObservable =
Observable.just(1, 2, 3, 4, 5, 6)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler),
(item, time) -> item
);
TestSubscriber<Integer> subscriber = new TestSubscriber();
Observable<Integer> filteredObservable = timedObservable.throttleLast(3100L, TimeUnit.MILLISECONDS, testScheduler);
filteredObservable.subscribe(subscriber);
testScheduler.advanceTimeBy(7, TimeUnit.SECONDS);
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValues(4, 6);
}
@Test
public void rxObservableFallback() {
server.enqueue(new MockResponse().setResponseCode(500));
final TestInterface api = target();
final Observable<String> observable = api.observable();
assertThat(observable).isNotNull();
assertThat(server.getRequestCount()).isEqualTo(0);
final TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
observable.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
Assertions.assertThat(testSubscriber.getOnNextEvents().get(0)).isEqualTo("fallback");
}
@Test
public void ipcamService_findByEnvironment() {
Integer ENVIRONMENT = 108;
List<IpcamModel> ipcams = new ArrayList<>();
when(ipcamRepository.findByEnvironment(ENVIRONMENT)).thenReturn(Observable.just(ipcams));
TestSubscriber<List<IpcamModel>> tester = new TestSubscriber<>();
ipcamService.findByEnvironment(ENVIRONMENT).subscribe(tester);
verify(ipcamRepository).findByEnvironment(ENVIRONMENT);
tester.assertValue(ipcams);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void selectionStrategyShouldNotBeCalledWithEmptyEndpoints() {
EndpointFactoryMock ef = EndpointFactoryMock.simple(envCtx);
ef.onConnectTransition(new Func1<Endpoint, LifecycleState>() {
@Override
public LifecycleState call(Endpoint endpoint) {
return LifecycleState.DISCONNECTING;
}
});
SelectionStrategy ss = mock(SelectionStrategy.class);
MockedService ms = new MockedService(ServiceType.QUERY, ef, ssc(0, 2), ss);
ms.connect().toBlocking().single();
Tuple2<CouchbaseRequest, TestSubscriber<CouchbaseResponse>> mr1 = mockRequest();
ms.send(mr1.value1());
verify(ss, never()).select(any(CouchbaseRequest.class), any(List.class));
}
@Test
public void fetchProvidersForEmail() {
TestSubscriber<ProviderQueryResult> testSubscriber = new TestSubscriber<>();
RxFirebaseAuth.fetchProvidersForEmail(mockAuth, "email")
.subscribeOn(Schedulers.immediate())
.subscribe(testSubscriber);
testOnSuccessListener.getValue().onSuccess(mockProviderQueryResult);
testOnCompleteListener.getValue().onComplete(mockProviderQueryResultTask);
verify(mockAuth).fetchProvidersForEmail(eq("email"));
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(1);
testSubscriber.assertReceivedOnNext(Collections.singletonList(mockProviderQueryResult));
testSubscriber.assertCompleted();
testSubscriber.unsubscribe();
}
public static void main(String[] args) throws InterruptedException {
reset();
TestSubscriber<Object> ts = TestSubscriber.create();
IO.serverSocket(12345).readTimeoutMs(10000).bufferSize(8).create()
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> g) {
return g //
.compose(Bytes.collect()) //
.doOnNext(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
System.out.println(Thread.currentThread().getName() + ": "
+ new String(bytes).trim());
}
}) //
.onErrorResumeNext(Observable.<byte[]> empty()) //
.subscribeOn(scheduler);
}
}).subscribeOn(scheduler) //
.subscribe(ts);
Thread.sleep(10000000);
}
@Test
public void soundService_findById() {
String SOUND_UUID = "myUuid";
SoundModel soundModel = new SoundModel();
soundModel.setUuid(SOUND_UUID);
when(soundRepository.findById(SOUND_UUID)).thenReturn(Observable.just(soundModel));
TestSubscriber<SoundModel> tester = new TestSubscriber<>();
soundService.findById(SOUND_UUID).subscribe(tester);
verify(soundRepository).findById(SOUND_UUID);
tester.assertValue(soundModel);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void givenObservable_whenUsingToSortedListWithComparator_thenObtainedAnInverseSortedList() {
// given
Observable<Integer> sourceObservable = Observable.range(10, 5);
TestSubscriber<List> subscriber = TestSubscriber.create();
// when
Observable<List<Integer>> listObservable = sourceObservable.toSortedList((int1, int2) -> int2 - int1);
listObservable.subscribe(subscriber);
// then
subscriber.assertCompleted();
subscriber.assertNoErrors();
subscriber.assertValueCount(1);
subscriber.assertValue(Arrays.asList(14, 13, 12, 11, 10));
}
@Test
public void awaitEventWorks() throws Exception {
final TestSubscriber<String> testSubscriber = new TestSubscriber<>();
final AwaitableEventSubscriberDecorator<String> sub = new AwaitableEventSubscriberDecorator<>(testSubscriber);
final BehaviorSubject<String> subject = BehaviorSubject.create();
final Subscription subscription = subject.subscribe(sub);
async.run(() -> {
subject.onNext("hello");
subject.onNext("world");
subject.onNext("!");
subject.onCompleted();
});
sub.awaitEvent(Integer.MAX_VALUE);
testSubscriber.assertValues("hello", "world", "!");
testSubscriber.assertCompleted();
testSubscriber.assertNoErrors();
subscription.unsubscribe();
}
@Test
public void lightRepository_findFavourites() {
SampleModel model = new SampleModel();
model.setUuid("uuid1");
model.setEnvironmentId(108);
model.setFavourite(true);
List<SampleModel> lights = Arrays.asList(model);
when(databaseRealm.findCopyWhere(SampleModel.class, DomoticModel.FIELD_FAVOURITE,
true, DomoticModel.FIELD_NAME)).thenReturn(lights);
TestSubscriber<List<SampleModel>> tester = new TestSubscriber<>();
sampleRepository.findFavourites().subscribe(tester);
verify(databaseRealm).findCopyWhere(SampleModel.class, DomoticModel.FIELD_FAVOURITE,
true, DomoticModel.FIELD_NAME);
tester.assertValue(lights);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void environmentRepository_findById() {
Integer ENVIRONMENT_ID = 100;
EnvironmentModel environment = newEnvironmentModel(ENVIRONMENT_ID, "myName");
List<EnvironmentModel> environments = Arrays.asList(environment);
when(databaseRealm.findCopyWhere(EnvironmentModel.class, EnvironmentModel.FIELD_ID, ENVIRONMENT_ID, null))
.thenReturn(environments);
TestSubscriber<EnvironmentModel> tester = new TestSubscriber<>();
environmentRepository.findById(ENVIRONMENT_ID).subscribe(tester);
verify(databaseRealm).findCopyWhere(EnvironmentModel.class, EnvironmentModel.FIELD_ID, ENVIRONMENT_ID, null);
tester.assertValue(environment);
tester.assertCompleted();
tester.assertNoErrors();
}
@Test
public void validateExceptionsAreUnmodifiedWithMockClientTest() {
Class defaultExceptionClass = TargetGroupAssociationLimitException.class;
TestSubscriber testSubscriber = new TestSubscriber();
AmazonElasticLoadBalancingAsync albClient = mock(AmazonElasticLoadBalancingAsync.class);
when(albClient.describeTargetHealthAsync(any(), any())).thenThrow(defaultExceptionClass);
awsLoadBalancerConnector = getAwsLoadBalancerConnector(albClient);
awsLoadBalancerConnector.getLoadBalancer(targetGroupWithTargets).subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
List<Throwable> errors = testSubscriber.getOnErrorEvents();
assertEquals(1, errors.size());
Throwable throwable = errors.get(0);
assertFalse(throwable instanceof LoadBalancerException);
assertTrue(throwable instanceof TargetGroupAssociationLimitException);
}
@Test
public void getBurgerStream_doesNotEmit_whenNoMeat() {
new ArrangeBuilder()
.withBun(BUN)
.withTomatoSlice(TOMATOE_SLICE);
TestSubscriber<Burger> testSubscriber = new TestSubscriber<>();
mViewModel.getBurgerStream().subscribe(testSubscriber);
testSubscriber.assertNoValues();
}
@Test
public void testOnStop() throws Exception {
Subscription test = new TestSubscriber<>();
basePresenter.addSubscription(test);
basePresenter.onStop();
assertTrue(test.isUnsubscribed());
}
@Test
public void testAcceptSocketRejectsAlways()
throws UnknownHostException, IOException, InterruptedException {
reset();
TestSubscriber<Object> ts = TestSubscriber.create();
try {
int bufferSize = 4;
AtomicInteger port = new AtomicInteger();
IO.serverSocketAutoAllocatePort(Actions.setAtomic(port)) //
.readTimeoutMs(10000) //
.acceptTimeoutMs(200) //
.bufferSize(bufferSize) //
.acceptSocketIf(Functions.alwaysFalse()) //
.create() //
.subscribeOn(scheduler) //
.subscribe(ts);
Thread.sleep(300);
Socket socket = new Socket("localhost", port.get());
OutputStream out = socket.getOutputStream();
out.write("12345678901234567890".getBytes());
out.close();
socket.close();
Thread.sleep(1000);
ts.assertNoValues();
} finally {
// will close server socket
ts.unsubscribe();
}
}
@Test
public void shouldRemainConstantWhenValuesAreWithinRope() throws IOException {
// Arrange
final String cfg2 = "{\"setPoint\": 10,\n" +
" \"invert\": true,\n" +
" \"rope\": 3.0,\n" +
" \"kp\": 0.2,\n" +
" \"ki\": 0.0,\n" +
" \"kd\": 0.0,\n" +
" \"minScale\": 2,\n" +
" \"maxScale\": 5\n" +
" }";
Observable<Double> totalRPS = Observable.just(30.0, 32.0, 28.0, 31.0, 30.0, 29.0, 31.0);
AdaptiveAutoscalerConfig config = objectMapper.readValue(cfg2, new TypeReference<AdaptiveAutoscalerConfig>() {});
JobAutoScaler.StageScaler scaler = mock(JobAutoScaler.StageScaler.class);
AdaptiveAutoscaler autoScaler = new AdaptiveAutoscaler(config, scaler, 3);
TestSubscriber<Long> testSubscriber = new TestSubscriber<>();
// Act
AtomicLong numWorkers = new AtomicLong(3);
totalRPS.map(rps -> new JobAutoScaler.Event(StageScalingPolicy.ScalingReason.CPU,
1,
rps / (1.0 * numWorkers.get()),
((Long) numWorkers.get()).intValue(),
"message"))
.compose(autoScaler)
.map(x -> (Long) x)
.doOnNext(numWorkers::set)
.subscribe(testSubscriber);
// Assert
testSubscriber.assertCompleted();
testSubscriber.assertNoErrors();
testSubscriber.assertValues(3L, 3L, 3L, 3L, 3L, 3L, 3L);
}
@Test
public void shouldProvideBitmap() throws Exception {
TestSubscriber<Bitmap> testSubscriber = new TestSubscriber<>();
new SnapshotFunc().call(googleMap)
.subscribe(testSubscriber);
verify(googleMap).snapshot(argumentCaptor.capture());
argumentCaptor.getValue().onSnapshotReady(null);
testSubscriber.assertNoErrors();
testSubscriber.assertValueCount(1);
testSubscriber.assertValue(null);
}