下面列出了 io.reactivex.Single#subscribe() 的实例代码。您可以点击链接到 GitHub 查看源代码,也可以在右侧发表评论。
/**
 * Disposes the subscription while the source future is still running, then
 * lets the future finish and checks that neither callback was ever invoked.
 */
@Test
public void testUnsubscribe() throws ExecutionException, InterruptedException {
    T runningFuture = originalFutureTestHelper.createRunningFuture();
    Single<String> source = toSingle(runningFuture);
    Consumer<String> successCallback = mockAction();
    Consumer<Throwable> errorCallback = mockAction();

    Disposable subscription = source.subscribe(successCallback, errorCallback);
    subscription.dispose();
    assertTrue(subscription.isDisposed());

    // Complete the future only after the subscription has been disposed.
    originalFutureTestHelper.finishRunningFuture();
    // There is no event to wait for when asserting that nothing happens,
    // so give any stray callback a short grace period instead.
    Thread.sleep(10);

    verifyZeroInteractions(successCallback);
    verifyZeroInteractions(errorCallback);
}
/**
 * Wraps an already finished future in a Single, checks that the success
 * callback receives {@code VALUE} and the error callback stays untouched,
 * and that converting the Single back yields the very same future instance.
 */
@Test
public void testConvertToSingleFinished() throws Exception {
    T finishedFuture = originalFutureTestHelper.createFinishedFuture();
    Single<String> source = toSingle(finishedFuture);
    Consumer<String> successCallback = mockAction();
    Consumer<Throwable> errorCallback = mockAction();

    source.subscribe(
        value -> {
            successCallback.accept(value);
            // Signal the test thread that the value has been delivered.
            latch.countDown();
        },
        errorCallback);

    latch.await();
    verify(successCallback).accept(VALUE);
    verifyZeroInteractions(errorCallback);
    assertSame(finishedFuture, toFuture(source));
}
/**
 * Shares one running future between two subscribers through
 * {@code publish().refCount()}, disposes the second subscriber, and checks
 * that the first one still receives {@code VALUE} once the future finishes.
 */
@Test
public void oneSubscriptionShouldNotCancelFuture() throws Exception {
    T runningFuture = originalFutureTestHelper.createRunningFuture();
    Single<String> shared = toSingle(runningFuture)
        .toObservable()
        .publish()
        .refCount()
        .singleOrError();
    Consumer<String> successCallback = mockAction();
    Consumer<Throwable> errorCallback = mockAction();

    // First subscriber: stays subscribed for the whole test.
    shared.subscribe(
        value -> {
            successCallback.accept(value);
            latch.countDown();
        },
        errorCallback);
    verifyZeroInteractions(successCallback);
    verifyZeroInteractions(errorCallback);

    // Second subscriber: subscribes and is disposed right away.
    shared.subscribe(ignored -> {}).dispose();

    originalFutureTestHelper.finishRunningFuture();
    // Wait for the result to reach the first subscriber.
    latch.await();

    verify(successCallback).accept(VALUE);
    verifyZeroInteractions(errorCallback);
}
/**
 * Deploys the {@code AuditReporterVerticle} bean and, once deployment
 * succeeds, records the deployment id and starts every given reporter.
 * A reporter that fails to start is logged and does not stop the others.
 *
 * @param reporters reporters to start after the verticle has been deployed
 */
private void deployReporterVerticle(Collection<Reporter> reporters) {
    RxHelper.deployVerticle(vertx, applicationContext.getBean(AuditReporterVerticle.class))
        .subscribe(
            id -> {
                // Verticle deployed: remember its id.
                deploymentId = id;

                if (reporters.isEmpty()) {
                    logger.info("\tThere is no reporter to start");
                    return;
                }

                for (io.gravitee.reporter.api.Reporter reporter : reporters) {
                    try {
                        logger.info("Starting reporter: {}", reporter);
                        reporter.start();
                    } catch (Exception ex) {
                        logger.error("Unexpected error while starting reporter", ex);
                    }
                }
            },
            // Verticle could not be deployed.
            err -> logger.error("Reporter service can not be started", err));
}
/**
 * Fetches filters (all of them when {@code boardName} is empty, otherwise
 * only those for the given board) and shows them in the filter list.
 * The adapter is created on first use and updated in place afterwards.
 * The resulting subscription is stored in {@code filtersSubscription}.
 *
 * @param boardName board to fetch filters for; empty or null selects all filters
 */
protected void loadFilters(String boardName) {
    final Single<List<Filter>> filterSource = TextUtils.isEmpty(boardName)
        ? FilterTableConnection.fetchFilters()
        : FilterTableConnection.fetchFiltersByBoard(boardName);

    filtersSubscription = filterSource.subscribe(
        loaded -> {
            if (filterListAdapter != null) {
                // Adapter already attached: just swap its contents.
                filterListAdapter.setFilters(loaded);
            } else {
                filterListAdapter = new FilterListAdapter(loaded);
                filterList.setAdapter(filterListAdapter);
            }
        },
        error -> Log.e(LOG_TAG, "Error fetching filters", error));
}
/**
 * Deploys the {@code AuditReporterVerticle} bean for the current domain and,
 * on success, stores the deployment id, loads the domain's reporters from
 * the repository and starts a provider for each of them.
 */
@Override
public void afterPropertiesSet() throws Exception {
    logger.info("Initializing reporters for domain {}", domain.getName());
    logger.info("\t Starting reporter verticle for domain {}", domain.getName());

    RxHelper.deployVerticle(vertx, applicationContext.getBean(AuditReporterVerticle.class))
        .subscribe(
            id -> {
                // Verticle deployed: remember its id.
                deploymentId = id;

                // Load the reporters (blocks until the repository answers).
                List<Reporter> domainReporters = reporterRepository.findByDomain(domain.getId()).blockingGet();
                if (domainReporters.isEmpty()) {
                    logger.info("\tThere is no reporter to start");
                    return;
                }

                for (Reporter reporter : domainReporters) {
                    startReporterProvider(reporter);
                }
                logger.info("Reporters loaded for domain {}", domain.getName());
            },
            // Verticle could not be deployed.
            err -> logger.error("Reporter service can not be started", err));
}
/**
 * Bridges a {@code Single} to an asynchronous response: the emitted value
 * resumes the response, an error is passed to {@code handleThrowable}.
 *
 * @param inMessage message the asynchronous response is created for
 * @param single    source whose outcome completes the response
 * @return the asynchronous response bound to {@code single}
 * @throws IllegalStateException if subscribing returns no Disposable
 */
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
    final AsyncResponseImpl response = new AsyncResponseImpl(inMessage);

    final Disposable subscription = single.subscribe(
        response::resume,
        failure -> handleThrowable(response, failure));

    if (subscription == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return response;
}
/**
 * Posts the payload returned by {@code getPayload()} as a streamed request
 * body and prints the status code and body of the response.
 *
 * <p>The request is sent when the {@code Single} is subscribed. A failure
 * is reported on standard output; without an error consumer RxJava would
 * route it to its global error handler as an unhandled error.
 *
 * @param client web client used to send the request
 */
public void sendFlowable(WebClient client) {
    Flowable<Buffer> body = getPayload();
    Single<HttpResponse<Buffer>> single = client
        .post(8080, "myserver.mycompany.com", "/some-uri")
        .rxSendStream(body);
    single.subscribe(
        resp -> {
            System.out.println(resp.statusCode());
            System.out.println(resp.body());
        },
        error -> System.out.println("Something went wrong " + error.getMessage()));
}
/**
 * Loads the current image asynchronously and displays it.
 *
 * <p>With a {@code null} region the whole image is loaded: a loading
 * indicator is shown first and the result replaces the main image.
 * With a non-null region the result is applied as an "optim" image instead.
 * Does nothing when no image path is set.
 *
 * @param regionRect region to load, or {@code null} to load the full image
 */
private void loadImage(Rect regionRect) {
    if (_currentImagePath == null) {
        return;
    }
    Logger.debug(TAG + ": loading image");

    final boolean fullImage = regionRect == null;
    if (fullImage) {
        showLoading();
    }

    // Decode on the IO scheduler, deliver on the main thread, bound to the lifecycle.
    Single<LoadedImage> loadTask = LoadedImage
        .createObservable(getActivity().getApplicationContext(), _currentImagePath, _viewRect, regionRect)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .compose(bindToLifecycle());

    if (GlobalConfig.isTest()) {
        // In test mode, publish true on subscribe and false when the task finishes.
        loadTask = loadTask
            .doOnSubscribe(sub -> TEST_LOAD_IMAGE_TASK_OBSERVABLE.onNext(true))
            .doFinally(() -> TEST_LOAD_IMAGE_TASK_OBSERVABLE.onNext(false));
    }

    loadTask.subscribe(
        image -> {
            if (!fullImage) {
                _mainImageView.setOptimImage(image.getImageData(), image.getSampleSize());
                return;
            }
            _mainImageView.setImage(
                image.getImageData(),
                image.getSampleSize(),
                image.getRotation(),
                image.getFlipX(),
                image.getFlipY());
            _isOptimSupported = image.isOptimSupported();
            showImage();
        },
        err -> {
            // Cancellation is not reported to the user.
            if (err instanceof CancellationException) {
                return;
            }
            Logger.showAndLog(getActivity(), err);
        });
}
/**
 * An observer created from a promise must fail that promise with the exact
 * exception emitted by the Single it is subscribed to.
 */
@Test
public void testToSingleObserverFailure() {
    RuntimeException cause = new RuntimeException();
    Promise<String> promise = Promise.promise();
    SingleObserver<String> promiseObserver = SingleHelper.toObserver(promise);

    Single<String> failing = Single.error(cause);
    failing.subscribe(promiseObserver);

    assertTrue(promise.future().failed());
    assertSame(cause, promise.future().cause());
}
/**
 * Subscribes a {@code TestObserver} to {@code Single.just("FRUITS")} and
 * checks that it completes with exactly that one value and no error.
 */
@Test
public void test_Single() {
    TestObserver<String> observer = new TestObserver<>();
    Single<String> fruits = Single.just("FRUITS");

    fruits.subscribe(observer);

    observer.assertComplete();
    observer.assertValueCount(1);
    observer.assertNoErrors();
    observer.assertValues("FRUITS");
}
/**
 * Takes the first city emitted by {@code dmiCities} (if any), requests its
 * weather image for the given mode and sends the image bytes to the chat
 * as a photo.
 *
 * @param chatId    chat that receives the photo
 * @param dmiCities source of candidate cities; only the first one is used
 * @param mode      mode passed to the weather image request
 */
private void sendDmiPhoto(Long chatId, Single<List<DmiCity>> dmiCities, String mode) {
    dmiCities.subscribe(cities ->
        cities.stream()
            .findFirst()
            .ifPresent(city -> {
                String cityId = String.valueOf(city.getId());
                dmiApi.getWeatherImage(cityId, mode).subscribe(body -> {
                    SendPhoto photoRequest = new SendPhoto(chatId, body.bytes());
                    telegramBot.execute(photoRequest);
                });
            }));
}
/**
 * Subscribes this test as observer of a query Single and checks that one
 * result with two elements is received, and that a later publish produces
 * no further results.
 */
@Test
public void testSingle() {
    publisher.setQueryResult(listResult);

    Single querySingle = RxQuery.single(mockQuery.getQuery());
    querySingle.subscribe((SingleObserver) this);
    assertLatchCountedDown(latch, 2);

    assertEquals(1, receivedChanges.size());
    assertEquals(2, receivedChanges.get(0).size());

    // Publishing again must not produce further results.
    receivedChanges.clear();
    publisher.publish();
    assertNoMoreResults();
}
/**
 * Builds a GET request as a {@code Single} and subscribes to it twice,
 * printing the status code of each response or the error message.
 *
 * @param client web client used to build the request
 */
public void simpleGet(WebClient client) {
    // Building the Single does not send anything yet;
    // the HTTP request is only sent on subscription.
    Single<HttpResponse<Buffer>> request = client
        .get(8080, "myserver.mycompany.com", "/some-uri")
        .rxSend();

    // First subscription: sends a request.
    request.subscribe(
        first -> System.out.println("Received 1st response with status code" + first.statusCode()),
        failure -> System.out.println("Something went wrong " + failure.getMessage()));

    // Second subscription: sends another request.
    request.subscribe(
        second -> System.out.println("Received 2nd response with status code" + second.statusCode()),
        failure -> System.out.println("Something went wrong " + failure.getMessage()));
}
/**
 * Subscribes to {@code single}, blocks until it terminates and asserts that
 * at least {@code millisDelayed} milliseconds have elapsed since just before
 * the subscription.
 *
 * @param single        source expected to terminate only after a delay
 * @param millisDelayed minimum elapsed time, in milliseconds
 */
private void checkDelaySingle(Single single, long millisDelayed) {
    final long startedAt = System.nanoTime();

    TestObserver observer = new TestObserver();
    single.subscribe(observer);
    observer.awaitTerminalEvent();

    final long elapsedNanos = System.nanoTime() - startedAt;
    final long tookMs = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    final String failureMessage =
        "Mismatch delayed. TookMs: " + tookMs + " MillisDelayed: " + millisDelayed;
    assertTrue(failureMessage, tookMs >= millisDelayed);
}
/**
 * Builds a single-member pool with a two minute max idle time on a
 * {@code TestScheduler}, checks the member out and back in at t=0 and again
 * at t=1min, and asserts that nothing is disposed at t=2min while exactly
 * one disposal has happened at t=3min.
 */
@Test
public void testMaxIdleTimeResetIfUsed() throws InterruptedException {
    TestScheduler scheduler = new TestScheduler();
    AtomicInteger created = new AtomicInteger();
    AtomicInteger disposals = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool
        .factory(() -> created.incrementAndGet())
        .healthCheck(n -> true)
        .maxSize(1)
        .maxIdleTime(2, TimeUnit.MINUTES)
        .disposer(n -> disposals.incrementAndGet())
        .scheduler(scheduler)
        .build();

    // Each subscription checks a member out, prints it and checks it straight back in.
    Single<Member<Integer>> checkoutAndReturn = pool.member()
        .doOnSuccess(System.out::println)
        .doOnSuccess(m -> m.checkin());

    // t = 0: first use.
    checkoutAndReturn.subscribe();
    scheduler.triggerActions();
    assertEquals(0, disposals.get());

    // t = 1 min: second use.
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    checkoutAndReturn.subscribe();

    // t = 2 min: nothing disposed yet.
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    assertEquals(0, disposals.get());

    // t = 3 min: exactly one disposal.
    scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
    scheduler.triggerActions();
    assertEquals(1, disposals.get());
}
/**
 * Publishes a message through the observable client and checks that the
 * emitted {@code PublishToken} matches the delivery token reported to the
 * underlying async client's callback (client id, message id and topics).
 *
 * <p>A publish failure is captured and rethrown from the test thread, so a
 * failed publish fails the test rather than leaving it blocked on the latch.
 */
@Test
public void itCanPublish() throws Throwable {
    AsyncPahoUtils.connect(this.asyncClient);
    // One count for the delivery callback, one for the publish result.
    final CountDownLatch latch = new CountDownLatch(2);
    final AtomicReference<IMqttDeliveryToken> token = new AtomicReference<>();
    final AtomicReference<PublishToken> pubToken = new AtomicReference<>();
    final AtomicReference<Throwable> publishError = new AtomicReference<>();
    // Callback to monitor delivery completion
    this.asyncClient.setCallback(new MqttCallback() {
        @Override
        public void messageArrived(final String topic,
                final MqttMessage message)
                throws Exception {
        }
        @Override
        public void deliveryComplete(final IMqttDeliveryToken t) {
            token.set(t);
            latch.countDown();
        }
        @Override
        public void connectionLost(final Throwable cause) {
        }
    });
    // Publish the message
    final PublishMessage msg = PublishMessage
            .create(new byte[] { 'a', 'b', 'c' }, 1, false);
    final Single<PublishToken> obs = this.observableClient.publish(TOPIC,
            msg);
    // Subscribe for result
    obs.subscribe(r -> {
        pubToken.set(r);
        latch.countDown();
    }, e -> {
        publishError.set(e);
        // Release every outstanding count so the await below returns
        // even if the delivery callback never fires.
        while (latch.getCount() > 0) {
            latch.countDown();
        }
    });
    // Await for async completion
    latch.await();
    final Throwable failure = publishError.get();
    if (failure != null) {
        throw failure;
    }
    final IMqttDeliveryToken iMqttDeliveryToken = token.get();
    final PublishToken publishToken = pubToken.get();
    Assert.assertNotNull(iMqttDeliveryToken);
    Assert.assertNotNull(publishToken);
    Assert.assertNotNull(publishToken.getClientId());
    Assert.assertEquals(iMqttDeliveryToken.getClient().getClientId(),
            publishToken.getClientId());
    Assert.assertNotNull(publishToken.getMessageId());
    Assert.assertEquals(iMqttDeliveryToken.getMessageId(),
            publishToken.getMessageId());
    Assert.assertNotNull(publishToken.getTopics());
    Assert.assertArrayEquals(iMqttDeliveryToken.getTopics(),
            publishToken.getTopics());
}
/**
 * When the prepared operation throws from {@code executeAsBlocking()}, the
 * Single built on it must deliver that exception to the observer without
 * completing, and must not touch the operation before subscription.
 */
@SuppressWarnings("CheckResult")
@Test
public void shouldCallOnErrorIfExceptionOccurred() {
    StorIOException thrown = new StorIOException("test exception");

    //noinspection unchecked
    final PreparedOperation<Object, Optional<Object>, Object> operation = mock(PreparedOperation.class);
    when(operation.executeAsBlocking()).thenThrow(thrown);

    Single<Optional<Object>> source =
        Single.create(new SingleOnSubscribeExecuteAsBlockingOptional<Object, Object>(operation));
    TestObserver<Optional<Object>> observer = new TestObserver<Optional<Object>>();

    // Creating the Single must not execute the operation.
    verifyZeroInteractions(operation);

    source.subscribe(observer);

    observer.assertError(thrown);
    observer.assertNotComplete();
    verify(operation).executeAsBlocking();
}
/**
 * When the prepared operation throws from {@code executeAsBlocking()}, the
 * Single built on it must deliver that exception to the observer without
 * completing, and must not touch the operation before subscription.
 */
@SuppressWarnings("CheckResult")
@Test
public void shouldCallOnErrorIfExceptionOccurred() {
    StorIOException thrown = new StorIOException("test exception");

    //noinspection unchecked
    final PreparedOperation<Object, Object, Object> operation = mock(PreparedOperation.class);
    when(operation.executeAsBlocking()).thenThrow(thrown);

    Single<Object> source =
        Single.create(new SingleOnSubscribeExecuteAsBlocking<Object, Object, Object>(operation));
    TestObserver<Object> observer = new TestObserver<Object>();

    // Creating the Single must not execute the operation.
    verifyZeroInteractions(operation);

    source.subscribe(observer);

    observer.assertError(thrown);
    observer.assertNotComplete();
    verify(operation).executeAsBlocking();
}
/**
 * Creates a recorder that drives a BLE flow for this sensor.
 *
 * <p>The flow is obtained from the environment's connected BLE client for
 * {@code address} and cached, so starting and stopping share one lookup.
 *
 * @param c           consumer handed to the BLE flow listener
 * @param environment supplies the BLE client and the default clock
 * @param context     not used by this implementation
 * @param listener    notified of the connecting status and handed to the flow listener
 * @return a recorder whose start/stop methods queue and run BLE flow steps
 */
@Override
protected SensorRecorder makeScalarControl(
        final StreamConsumer c,
        final SensorEnvironment environment,
        Context context,
        final SensorStatusListener listener) {
    Single<BleFlow> whenFlow =
        environment.getConnectedBleClient()
            .map(client -> client.getFlowFor(address))
            .cache();

    return new AbstractSensorRecorder() {
        @Override
        public void startObserving() {
            // Report that the BLE connection is being made.
            listener.onSourceStatus(getId(), SensorStatusListener.STATUS_CONNECTING);
            whenFlow.subscribe(flow -> {
                bleFlowListener =
                    createBleFlowListener(c, environment.getDefaultClock(), listener, flow);
                flow.resetAndAddListener(bleFlowListener, true)
                    .connect()
                    .lookupService(serviceSpec.getServiceId());
                BleFlow.run(flow);
            });
        }

        @Override
        public void stopObserving() {
            whenFlow.subscribe(flow -> {
                // Don't reset service map: should still be valid from above, and it doesn't work
                // on ChromeBooks
                flow.resetAndAddListener(bleFlowListener, false);
                if (notificationSubscribed) {
                    flow.lookupService(serviceSpec.getServiceId())
                        .lookupCharacteristic(serviceSpec.getServiceId(), serviceSpec.getValueId())
                        .disableNotification();
                } else {
                    flow.disconnect();
                }
                // Both paths finish by running the queued flow steps.
                BleFlow.run(flow);
            });
        }
    };
}