io.reactivex.Single#subscribe ( )源码实例Demo

下面列出了io.reactivex.Single#subscribe ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

@Test
public void testUnsubscribe() throws ExecutionException, InterruptedException {
    T future = originalFutureTestHelper.createRunningFuture();

    Single<String> single = toSingle(future);
    Consumer<String> onSuccess = mockAction();
    Consumer<Throwable> onError = mockAction();


    Disposable disposable = single.subscribe(
        onSuccess,
        onError
    );

    disposable.dispose();
    assertTrue(disposable.isDisposed());

    originalFutureTestHelper.finishRunningFuture();
    Thread.sleep(10); //do not know how to wait for something to not happen

    verifyZeroInteractions(onSuccess);
    verifyZeroInteractions(onError);
}
 
@Test
public void testConvertToSingleFinished() throws Exception {
    T completable = originalFutureTestHelper.createFinishedFuture();

    Single<String> single = toSingle(completable);
    Consumer<String> onSuccess = mockAction();
    Consumer<Throwable> onError = mockAction();

    single.subscribe(v -> {
            onSuccess.accept(v);
            latch.countDown();
        },
        onError);

    latch.await();

    verify(onSuccess).accept(VALUE);
    verifyZeroInteractions(onError);

    assertSame(completable, toFuture(single));
}
 
@Test
public void oneSubscriptionShouldNotCancelFuture() throws Exception {
    T future = originalFutureTestHelper.createRunningFuture();

    Single<String> single = toSingle(future).toObservable().publish().refCount().singleOrError();
    Consumer<String> onSuccess = mockAction();
    Consumer<Throwable> onError = mockAction();

    single.subscribe(v -> {
        onSuccess.accept(v);
        latch.countDown();
    }, onError);
    verifyZeroInteractions(onSuccess);
    verifyZeroInteractions(onError);

    single.subscribe(v -> {}).dispose();

    originalFutureTestHelper.finishRunningFuture();
    latch.await();

    //wait for the result
    verify(onSuccess).accept(VALUE);
    verifyZeroInteractions(onError);
}
 
private void deployReporterVerticle(Collection<Reporter> reporters) {
    Single<String> deployment = RxHelper.deployVerticle(vertx, applicationContext.getBean(AuditReporterVerticle.class));

    deployment.subscribe(id -> {
        // Deployed
        deploymentId = id;
        if (!reporters.isEmpty()) {
            for (io.gravitee.reporter.api.Reporter reporter : reporters) {
                try {
                    logger.info("Starting reporter: {}", reporter);
                    reporter.start();
                } catch (Exception ex) {
                    logger.error("Unexpected error while starting reporter", ex);
                }
            }
        } else {
            logger.info("\tThere is no reporter to start");
        }
    }, err -> {
        // Could not deploy
        logger.error("Reporter service can not be started", err);
    });
}
 
源代码5 项目: mimi-reader   文件: EditFiltersActivity.java
protected void loadFilters(String boardName) {
    final Single<List<Filter>> loadFiltersObservable;
    if (TextUtils.isEmpty(boardName)) {
        loadFiltersObservable = FilterTableConnection.fetchFilters();
    } else {
        loadFiltersObservable = FilterTableConnection.fetchFiltersByBoard(boardName);
    }

    filtersSubscription = loadFiltersObservable.subscribe(
            filters -> {
                if (filterListAdapter == null) {
                    filterListAdapter = new FilterListAdapter(filters);
                    filterList.setAdapter(filterListAdapter);
                } else {
                    filterListAdapter.setFilters(filters);
                }
            },
            throwable -> Log.e(LOG_TAG, "Error fetching filters", throwable));

}
 
@Override
public void afterPropertiesSet() throws Exception {
    logger.info("Initializing reporters for domain {}", domain.getName());
    logger.info("\t Starting reporter verticle for domain {}", domain.getName());

    Single<String> deployment = RxHelper.deployVerticle(vertx, applicationContext.getBean(AuditReporterVerticle.class));
    deployment.subscribe(id -> {
        // Deployed
        deploymentId = id;
        // Start reporters
        List<Reporter> reporters = reporterRepository.findByDomain(domain.getId()).blockingGet();
        if (!reporters.isEmpty()) {
            reporters.forEach(reporter -> startReporterProvider(reporter));
            logger.info("Reporters loaded for domain {}", domain.getName());
        } else {
            logger.info("\tThere is no reporter to start");
        }
    }, err -> {
        // Could not deploy
        logger.error("Reporter service can not be started", err);
    });
}
 
源代码7 项目: cxf   文件: ReactiveIOInvoker.java
protected AsyncResponseImpl handleSingle(Message inMessage, Single<?> single) {
    final AsyncResponseImpl asyncResponse = new AsyncResponseImpl(inMessage);
    Disposable d = single.subscribe(asyncResponse::resume, t -> handleThrowable(asyncResponse, t));
    if (d == null) {
        throw new IllegalStateException("Subscribe did not return a Disposable");
    }
    return asyncResponse;
}
 
源代码8 项目: vertx-rx   文件: RxWebClientExamples.java
public void sendFlowable(WebClient client) {

    Flowable<Buffer> body = getPayload();

    Single<HttpResponse<Buffer>> single = client
      .post(8080, "myserver.mycompany.com", "/some-uri")
      .rxSendStream(body);
    single.subscribe(resp -> {
      System.out.println(resp.statusCode());
      System.out.println(resp.body());
    });
  }
 
源代码9 项目: edslite   文件: PreviewFragment.java
private void loadImage(Rect regionRect)
{		
	if(_currentImagePath == null)
		return;
	Logger.debug(TAG + ": loading image");
	if(regionRect == null)
		showLoading();
	Single<LoadedImage> loadImageTaskObservable = LoadedImage.createObservable(getActivity().getApplicationContext(), _currentImagePath, _viewRect, regionRect).
			subscribeOn(Schedulers.io()).
			observeOn(AndroidSchedulers.mainThread()).
			compose(bindToLifecycle());
	if(GlobalConfig.isTest())
	{
		loadImageTaskObservable = loadImageTaskObservable.
				doOnSubscribe(sub -> TEST_LOAD_IMAGE_TASK_OBSERVABLE.onNext(true)).
				doFinally(() -> TEST_LOAD_IMAGE_TASK_OBSERVABLE.onNext(false));
	}

	loadImageTaskObservable.subscribe(res -> {
				if(regionRect == null)
				{
					_mainImageView.setImage(
							res.getImageData(),
							res.getSampleSize(),
							res.getRotation(),
							res.getFlipX(),
							res.getFlipY());
					_isOptimSupported = res.isOptimSupported();
					showImage();
				}
				else
					_mainImageView.setOptimImage(res.getImageData(), res.getSampleSize());

			}, err -> {
				if(!(err instanceof CancellationException))
					Logger.showAndLog(getActivity(), err);
			});
}
 
源代码10 项目: vertx-rx   文件: HelperTest.java
@Test
public void testToSingleObserverFailure() {
  Promise<String> promise = Promise.promise();
  SingleObserver<String> observer = SingleHelper.toObserver(promise);
  RuntimeException cause = new RuntimeException();
  Single<String> s = Single.error(cause);
  s.subscribe(observer);
  assertTrue(promise.future().failed());
  assertSame(cause, promise.future().cause());
}
 
@Test
public void test_Single() {
	Single<String> single = Single.just("FRUITS");
	TestObserver<String> testObserver = new TestObserver<>();

	single.subscribe(testObserver);

	testObserver.assertComplete();
	testObserver.assertValueCount(1);
	testObserver.assertNoErrors();
	testObserver.assertValues("FRUITS");
}
 
源代码12 项目: telegram-bot   文件: UpdateHandlerImpl.java
private void sendDmiPhoto(Long chatId, Single<List<DmiCity>> dmiCities, String mode) {
	dmiCities.subscribe(cities -> cities.stream().findFirst().ifPresent(dmiCity -> {
		Single<ResponseBody> weatherImage = dmiApi.getWeatherImage(String.valueOf(dmiCity.getId()), mode);
		weatherImage.subscribe(rb -> {
			SendPhoto sendPhoto = new SendPhoto(chatId, rb.bytes());
			telegramBot.execute(sendPhoto);
		});
	}));
}
 
源代码13 项目: objectbox-java   文件: QueryObserverTest.java
@Test
public void testSingle() {
    publisher.setQueryResult(listResult);
    Single single = RxQuery.single(mockQuery.getQuery());
    single.subscribe((SingleObserver) this);
    assertLatchCountedDown(latch, 2);
    assertEquals(1, receivedChanges.size());
    assertEquals(2, receivedChanges.get(0).size());

    receivedChanges.clear();
    publisher.publish();
    assertNoMoreResults();
}
 
源代码14 项目: vertx-rx   文件: RxWebClientExamples.java
public void simpleGet(WebClient client) {

    // Create the RxJava single for an HttpRequest
    // at this point no HTTP request has been sent to the server
    Single<HttpResponse<Buffer>> single = client
      .get(8080, "myserver.mycompany.com", "/some-uri")
      .rxSend();

    // Send a request upon subscription of the Single
    single.subscribe(response -> System.out.println("Received 1st response with status code" + response.statusCode()), error -> System.out.println("Something went wrong " + error.getMessage()));

    // Send another request
    single.subscribe(response -> System.out.println("Received 2nd response with status code" + response.statusCode()), error -> System.out.println("Something went wrong " + error.getMessage()));
  }
 
源代码15 项目: Mockery   文件: Rx2RetrofitInterceptorTest.java
private void checkDelaySingle(Single single, long millisDelayed) {
  long startNanos = System.nanoTime();

  TestObserver subscriber = new TestObserver();
  single.subscribe(subscriber);
  subscriber.awaitTerminalEvent();

  long tookMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

  assertTrue("Mismatch delayed. TookMs: " + tookMs
      + " MillisDelayed: " + millisDelayed, tookMs >= millisDelayed);
}
 
源代码16 项目: rxjava2-jdbc   文件: NonBlockingPoolTest.java
@Test
public void testMaxIdleTimeResetIfUsed() throws InterruptedException {
    TestScheduler s = new TestScheduler();
    AtomicInteger count = new AtomicInteger();
    AtomicInteger disposed = new AtomicInteger();
    Pool<Integer> pool = NonBlockingPool //
            .factory(() -> count.incrementAndGet()) //
            .healthCheck(n -> true) //
            .maxSize(1) //
            .maxIdleTime(2, TimeUnit.MINUTES) //
            .disposer(n -> disposed.incrementAndGet()) //
            .scheduler(s) //
            .build();
    Single<Member<Integer>> member = pool.member() //
            .doOnSuccess(System.out::println) //
            .doOnSuccess(m -> m.checkin());
    member.subscribe();
    s.triggerActions();
    assertEquals(0, disposed.get());
    s.advanceTimeBy(1, TimeUnit.MINUTES);
    s.triggerActions();
    member.subscribe();
    s.advanceTimeBy(1, TimeUnit.MINUTES);
    s.triggerActions();
    assertEquals(0, disposed.get());
    s.advanceTimeBy(1, TimeUnit.MINUTES);
    s.triggerActions();
    assertEquals(1, disposed.get());
}
 
源代码17 项目: rxmqtt   文件: PahoObservableMqttClientITCase.java
@Test
public void itCanPublish() throws Throwable {

    AsyncPahoUtils.connect(this.asyncClient);

    final CountDownLatch latch = new CountDownLatch(2);
    final AtomicReference<IMqttDeliveryToken> token = new AtomicReference<>();
    final AtomicReference<PublishToken> pubToken = new AtomicReference<>();

    // Callback to monitor delivery completion
    this.asyncClient.setCallback(new MqttCallback() {

        @Override
        public void messageArrived(final String topic,
                final MqttMessage message)
                throws Exception {
        }

        @Override
        public void deliveryComplete(final IMqttDeliveryToken t) {
            token.set(t);
            latch.countDown();
        }

        @Override
        public void connectionLost(final Throwable cause) {
        }
    });

    // Publish the message
    final PublishMessage msg = PublishMessage
            .create(new byte[] { 'a', 'b', 'c' }, 1, false);
    final Single<PublishToken> obs = this.observableClient.publish(TOPIC,
            msg);

    // Subscribe for result
    obs.subscribe(r -> {
        pubToken.set(r);
        latch.countDown();
    });

    // Await for async completion
    latch.await();
    final IMqttDeliveryToken iMqttDeliveryToken = token.get();
    final PublishToken publishToken = pubToken.get();
    Assert.assertNotNull(iMqttDeliveryToken);
    Assert.assertNotNull(publishToken);
    Assert.assertNotNull(publishToken.getClientId());
    Assert.assertEquals(iMqttDeliveryToken.getClient().getClientId(),
            publishToken.getClientId());
    Assert.assertNotNull(publishToken.getMessageId());
    Assert.assertEquals(iMqttDeliveryToken.getMessageId(),
            publishToken.getMessageId());
    Assert.assertNotNull(publishToken.getTopics());
    Assert.assertArrayEquals(iMqttDeliveryToken.getTopics(),
            publishToken.getTopics());
    Assert.assertNotNull(publishToken.getMessageId());
    Assert.assertEquals(iMqttDeliveryToken.getMessageId(),
            publishToken.getMessageId());

}
 
@SuppressWarnings("CheckResult")
@Test
public void shouldCallOnErrorIfExceptionOccurred() {
    //noinspection unchecked
    final PreparedOperation<Object, Optional<Object>, Object> preparedOperation = mock(PreparedOperation.class);

    StorIOException expectedException = new StorIOException("test exception");

    when(preparedOperation.executeAsBlocking()).thenThrow(expectedException);

    TestObserver<Optional<Object>> testObserver = new TestObserver<Optional<Object>>();

    Single<Optional<Object>> single =
            Single.create(new SingleOnSubscribeExecuteAsBlockingOptional<Object, Object>(preparedOperation));

    verifyZeroInteractions(preparedOperation);

    single.subscribe(testObserver);

    testObserver.assertError(expectedException);
    testObserver.assertNotComplete();

    verify(preparedOperation).executeAsBlocking();
}
 
@SuppressWarnings("CheckResult")
@Test
public void shouldCallOnErrorIfExceptionOccurred() {
    //noinspection unchecked
    final PreparedOperation<Object, Object, Object> preparedOperation = mock(PreparedOperation.class);

    StorIOException expectedException = new StorIOException("test exception");

    when(preparedOperation.executeAsBlocking()).thenThrow(expectedException);

    TestObserver<Object> testObserver = new TestObserver<Object>();

    Single<Object> single = Single.create(new SingleOnSubscribeExecuteAsBlocking<Object, Object, Object>(preparedOperation));

    verifyZeroInteractions(preparedOperation);

    single.subscribe(testObserver);

    testObserver.assertError(expectedException);
    testObserver.assertNotComplete();

    verify(preparedOperation).executeAsBlocking();
}
 
源代码20 项目: science-journal   文件: BluetoothSensor.java
@Override
protected SensorRecorder makeScalarControl(
    final StreamConsumer c,
    final SensorEnvironment environment,
    Context context,
    final SensorStatusListener listener) {
  Single<BleFlow> whenFlow =
      environment.getConnectedBleClient().map(client -> client.getFlowFor(address)).cache();

  return new AbstractSensorRecorder() {
    @Override
    public void startObserving() {
      // make BLE connection
      listener.onSourceStatus(getId(), SensorStatusListener.STATUS_CONNECTING);
      whenFlow.subscribe(
          flow -> {
            bleFlowListener =
                createBleFlowListener(c, environment.getDefaultClock(), listener, flow);
            flow.resetAndAddListener(bleFlowListener, true)
                .connect()
                .lookupService(serviceSpec.getServiceId());
            BleFlow.run(flow);
          });
    }

    @Override
    public void stopObserving() {
      whenFlow.subscribe(
          flow -> {
            // Don't reset service map: should still be valid from above, and it doesn't work
            // on ChromeBooks
            flow.resetAndAddListener(bleFlowListener, false);
            if (notificationSubscribed) {
              flow.lookupService(serviceSpec.getServiceId())
                  .lookupCharacteristic(serviceSpec.getServiceId(), serviceSpec.getValueId())
                  .disableNotification();
              BleFlow.run(flow);
            } else {
              flow.disconnect();
              BleFlow.run(flow);
            }
          });
    }
  };
}