下面列出了 io.reactivex.Flowable#empty() 的实例代码，或者点击链接到 GitHub 查看源代码，也可以在右侧发表评论。
/**
 * Builds a stream that loads one media response for the given bucket.
 *
 * <p>An empty {@code bucketId} yields an empty stream. Otherwise the bucket id selects which
 * handler produces the response:
 * <ul>
 *   <li>{@code String.valueOf(Integer.MAX_VALUE)}: {@code handleImageAndVideoMediaList}</li>
 *   <li>{@code String.valueOf(Integer.MIN_VALUE)}: {@code handleVideoMediaList}</li>
 *   <li>anything else: {@code handleImageMediaList} with the bucket id</li>
 * </ul>
 * The handler runs when the stream is subscribed, on the IO scheduler.
 *
 * @param imageOffset offset forwarded to the selected handler
 * @param videoOffset offset forwarded to the selected handler
 * @param bucketId    bucket identifier, or one of the two sentinel values above
 * @return a stream that emits a single response and completes, or an empty stream
 */
@Override
public Flowable<MediaEntity.MediaResponseData> getMediaList(int imageOffset, int videoOffset, String bucketId) {
    if (TextUtils.isEmpty(bucketId)) {
        return Flowable.empty();
    }

    final FlowableOnSubscribe<MediaEntity.MediaResponseData> loader;
    if (TextUtils.equals(bucketId, String.valueOf(Integer.MAX_VALUE))) {
        // Images and videos
        loader = emitter -> {
            emitter.onNext(handleImageAndVideoMediaList(imageOffset, videoOffset));
            emitter.onComplete();
        };
    } else if (TextUtils.equals(bucketId, String.valueOf(Integer.MIN_VALUE))) {
        // All videos
        loader = emitter -> {
            emitter.onNext(handleVideoMediaList(imageOffset, videoOffset));
            emitter.onComplete();
        };
    } else {
        loader = emitter -> {
            emitter.onNext(handleImageMediaList(imageOffset, videoOffset, bucketId));
            emitter.onComplete();
        };
    }

    // Every branch shares the same backpressure strategy and scheduler.
    return Flowable.create(loader, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
}
/**
 * Looks up a task by id in {@code TASKS_SERVICE_DATA}.
 *
 * @param taskId key of the task to look up
 * @return a stream emitting the task wrapped in an {@code Optional}, delayed by
 *         {@code SERVICE_LATENCY_IN_MILLIS} milliseconds; an empty stream when no task is
 *         stored under {@code taskId}
 */
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
    final Task found = TASKS_SERVICE_DATA.get(taskId);
    if (found == null) {
        return Flowable.empty();
    }
    final Flowable<Optional<Task>> hit = Flowable.just(Optional.of(found));
    return hit.delay(SERVICE_LATENCY_IN_MILLIS, TimeUnit.MILLISECONDS);
}
/**
 * Queries all {@code ScheduleCache} rows whose {@code isCollect} field is {@code true} and
 * emits them as a single list.
 *
 * <p>The query runs eagerly when this method is called, not on subscription. The Realm
 * instance is opened and closed inside this method.
 *
 * @return a stream emitting one list (possibly empty) of unmanaged copies of the matches
 */
public Flowable<List<ScheduleCache>> getScheduleCollectCaches() {
    try (final Realm realm = Realm.getInstance(realmConfiguration)) {
        final RealmResults<ScheduleCache> collected = realm.where(ScheduleCache.class)
                .equalTo("isCollect", true)
                .findAll();
        // findAll() returns an empty result set rather than null when nothing matches,
        // so the former null branch could never be taken and has been removed.
        // The copy must happen here, before try-with-resources closes the Realm.
        return Flowable.just(realm.copyFromRealm(collected));
    }
}
/**
 * Runs the same transformer through two prepare()/onStream() cycles and checks that
 * subscribing to the event stream publisher never throws.
 *
 * NOTE(review): judging by the test name, the second cycle is meant to show that
 * prepare() resets internal subscriber state - confirm against the transformer.
 */
@Test(timeout = 2000)
public void prepareResetsSubscriberRef() throws InterruptedException {
// Two count-downs are awaited below; presumably one per onStream() call - TODO confirm.
CountDownLatch latch = new CountDownLatch(2);
AtomicBoolean exceptionThrown = new AtomicBoolean(false);
AsyncResponseTransformer<SdkResponse, Void> transformer =
EventStreamAsyncResponseTransformer.builder()
.eventStreamResponseHandler(
onEventStream(p -> {
try {
p.subscribe(e -> {});
} catch (Throwable t) {
// Record the failure instead of rethrowing so it can be asserted on at the end.
exceptionThrown.set(true);
} finally {
// Count down on success and on failure alike so latch.await() cannot hang.
latch.countDown();
}
}))
.eventResponseHandler((r, e) -> null)
// NOTE(review): this pool is never shut down within the test.
.executor(Executors.newFixedThreadPool(2))
.future(new CompletableFuture<>())
.build();
Flowable<ByteBuffer> bytePublisher = Flowable.empty();
// First cycle.
CompletableFuture<Void> transformFuture = transformer.prepare();
transformer.onStream(SdkPublisher.adapt(bytePublisher));
transformFuture.join();
// Second cycle on the same transformer instance.
transformFuture = transformer.prepare();
transformer.onStream(SdkPublisher.adapt(bytePublisher));
transformFuture.join();
// Wait until the handler has finished both count-downs before asserting.
latch.await();
assertThat(exceptionThrown).isFalse();
}
/**
 * Streams the details of one transfer, mapped through {@code getUiTransferDetails}.
 *
 * <p>Side effects: stores {@code abbr} in the {@code abbr} field and the looked-up wallet
 * facade (possibly {@code null}) in the {@code walletFacade} field.
 *
 * @param id   transfer identifier passed to the wallet facade
 * @param abbr name passed to {@code SupportedWalletFacadeType.valueOf}
 * @return the mapped transfer details, or an empty stream when {@code wallets} has no facade
 *         for the resolved type
 */
public Flowable<UITransferDetails> getTransferDetails(String id, String abbr) {
    this.abbr = abbr;
    final SupportedWalletFacadeType facadeType = SupportedWalletFacadeType.valueOf(abbr);
    walletFacade = wallets.get(facadeType);
    if (walletFacade != null) {
        return walletFacade.getTransferDetails(id).map(this::getUiTransferDetails);
    }
    return Flowable.empty();
}
/**
 * Creates the search stream for {@code text} using the current {@code options}.
 *
 * @param text search query; may be {@code null}
 * @return the stream built by the cached text index, or an empty stream when the query is
 *         null or empty, {@code options} is empty, or the cache has no text index
 */
private Flowable<JNode> prepareSearch(String text) {
    final boolean hasQuery = text != null && !text.isEmpty();
    if (!hasQuery || options.isEmpty()) {
        return Flowable.empty();
    }
    final TextSearchIndex textIndex = cache.getTextIndex();
    if (textIndex != null) {
        return textIndex.buildSearch(text, options);
    }
    return Flowable.empty();
}
/**
 * Converts the audits with {@code convert} and submits them through
 * {@code reportableCollection.bulkWrite}.
 *
 * @param audits audits to write; may be {@code null}
 * @return the bulk-write publisher wrapped as a Flowable, or an empty Flowable when
 *         {@code audits} is {@code null} or empty (raw type kept from the original signature)
 */
private Flowable bulk(List<Audit> audits) {
    final boolean nothingToWrite = audits == null || audits.isEmpty();
    if (nothingToWrite) {
        return Flowable.empty();
    } else {
        return Flowable.fromPublisher(reportableCollection.bulkWrite(this.convert(audits)));
    }
}
/**
 * Turns location updates into camera updates while the location lock is on.
 *
 * <p>The first location is mapped with {@code CameraUpdate::panAndZoom}; the stream then
 * continues with the location updates mapped through {@code CameraUpdate::pan}, with the
 * first of those skipped.
 *
 * @param lockState current lock state
 * @return camera updates, or an empty stream when {@code lockState.isTrue()} is false
 */
private Flowable<CameraUpdate> createLocationLockCameraUpdateFlowable(BooleanOrError lockState) {
    if (lockState.isTrue()) {
        final Flowable<Point> locations = locationManager.getLocationUpdates();
        // The first update pans and zooms the camera to the appropriate zoom level.
        final Flowable<CameraUpdate> initialPanAndZoom =
                locations.take(1).map(CameraUpdate::panAndZoom);
        // Subsequent updates only pan the map.
        // NOTE(review): this is a second subscription to the same source - confirm that
        // skip(1) drops the intended element for the source returned by locationManager.
        final Flowable<CameraUpdate> subsequentPans =
                locations.map(CameraUpdate::pan).skip(1);
        return initialPanAndZoom.concatWith(subsequentPans);
    }
    return Flowable.empty();
}
/**
 * Converts an empty publisher and checks the result.
 *
 * <p>For converters that may emit several items, the converted instance must yield no items.
 * For converters that emit at most one item, {@code getOne} may either return normally or
 * throw; if it throws, the exception must be a {@code NoSuchElementException}.
 */
@Test
public void testWithEmpty() {
    final Publisher<String> source = Flowable.empty();
    final T converted = converter().fromPublisher(source);

    if (converter().emitAtMostOneItem()) {
        try {
            getOne(converted);
        } catch (Exception e) {
            assertThat(e).isInstanceOf(NoSuchElementException.class);
        }
        return;
    }

    final int itemCount = getAll(converted).size();
    assertThat(itemCount).isEqualTo(0);
}
/**
 * Stub implementation: both arguments are ignored.
 *
 * @param tables               unused
 * @param backpressureStrategy unused
 * @return an empty stream
 */
@Override
@NonNull
public Flowable<Changes> observeChangesInTables(@NonNull Set<String> tables,
                                                @NonNull BackpressureStrategy backpressureStrategy) {
    return Flowable.<Changes>empty();
}
/**
 * Stub implementation: the argument is ignored.
 *
 * @param backpressureStrategy unused
 * @return an empty stream
 */
@Override
@NonNull
public Flowable<Changes> observeChanges(
        @NonNull BackpressureStrategy backpressureStrategy
) {
    return Flowable.<Changes>empty();
}
/**
 * Checks that fetching from a repository backed by an empty Flowable produces no values
 * within one second.
 */
@Test
void empty() {
    final FakeBackend emptyBackend = new FakeBackend(Flowable.empty());
    final RxJavaModelRepository repo = new RxJavaModelRepository(emptyBackend);
    repo.findAll()
            .fetch()
            .test()
            .awaitDone(1, TimeUnit.SECONDS)
            .assertNoValues();
}
/** Runs {@code match} on two empty integer streams. */
@Test
public void testEmpties() {
    final Flowable<Integer> left = Flowable.empty();
    final Flowable<Integer> right = Flowable.empty();
    match(left, right);
}
/**
 * Stub implementation: both arguments are ignored.
 *
 * @param start   unused
 * @param request unused
 * @return an empty stream
 */
@Override
public Flowable<Long> apply(Long start, Long request) {
    return Flowable.<Long>empty();
}
/**
 * Stub implementation: both arguments are ignored.
 *
 * @param uris                 unused
 * @param backpressureStrategy unused
 * @return an empty stream
 */
@Override
@NonNull
public Flowable<Changes> observeChangesOfUris(
        @NonNull Set<Uri> uris,
        @NonNull BackpressureStrategy backpressureStrategy
) {
    return Flowable.<Changes>empty();
}
/**
 * Saves the operation in {@code state} under {@code operationId}, together with the session
 * and the value returned by {@code starter(publisher, session)}.
 *
 * @param operationId key the operation is saved under
 * @param publisher   execution results passed to {@code starter}
 * @param session     web socket session passed to {@code starter} and saved with the operation
 * @return an empty stream; this method itself emits no responses
 */
private Flowable<GraphQLWsResponse> startSubscription(
        String operationId,
        Publisher<ExecutionResult> publisher,
        WebSocketSession session
) {
    state.saveOperation(operationId, session, starter(publisher, session));
    return Flowable.<GraphQLWsResponse>empty();
}
/**
 * Stub implementation: the argument is ignored.
 *
 * @param offset unused
 * @return an empty stream
 */
@Override
public Flowable<CurrencyTransferEntity> getNextTransfers(int offset) {
    return Flowable.<CurrencyTransferEntity>empty();
}
/**
 * Stub implementation.
 *
 * @return an empty stream
 */
@Override
public Flowable<CurrencyTransferEntity> getNewTransfers() {
    return Flowable.<CurrencyTransferEntity>empty();
}
/**
 * Stub implementation: the argument is ignored.
 *
 * @param offset unused
 * @return an empty stream
 */
@Override
public Flowable<CurrencyTransferEntity> getNextTransfers(int offset) {
    return Flowable.<CurrencyTransferEntity>empty();
}
/**
 * Stub implementation: the argument is ignored.
 *
 * @param id unused
 * @return an empty stream
 */
@Override
public Flowable<TransferDetails> getTransferDetails(String id) {
    return Flowable.<TransferDetails>empty();
}