io.reactivex.Observable#defer() 源码实例 Demo

下面列出了 io.reactivex.Observable#defer() 的实例代码，您可以点击链接到 GitHub 查看源代码，也可以在右侧发表评论。

/**
 * Builds the download pipeline lazily and hands it to
 * {@code rxCallExecutor.wrapObservableTransactionally(..., true)}.
 *
 * <p>Nothing is evaluated until subscription. At that point, if
 * {@code userOrganisationUnitLinkStore} reports zero rows, a single progress item for
 * {@link TrackedEntityInstance} is emitted; otherwise four stages are concatenated in order:
 * system info, TEIs, relationship TEIs and the resource update.
 *
 * @param params download parameters forwarded to the TEI download and resource update stages
 * @return the deferred observable, wrapped by {@code rxCallExecutor}
 */
Observable<D2Progress> download(final ProgramDataDownloadParams params) {
    Observable<D2Progress> deferredDownload = Observable.defer(() -> {
        D2ProgressManager progress = new D2ProgressManager(null);

        // Guard: no user/organisation-unit links, so only a single progress item is reported.
        if (userOrganisationUnitLinkStore.count() == 0) {
            return Observable.just(progress.increaseProgress(TrackedEntityInstance.class, true));
        }

        // State shared between the TEI download stage and the final resource update stage.
        Set<ProgramOrganisationUnitLastUpdated> lastUpdatedSet = new HashSet<>();
        BooleanWrapper allOkay = new BooleanWrapper(true);

        // The stages are created here in the same order in which concat subscribes to them.
        Observable<D2Progress> systemInfoStage = downloadSystemInfo(progress);
        Observable<D2Progress> teiStage = downloadTeis(progress, params, allOkay, lastUpdatedSet);
        Observable<D2Progress> relationshipStage = downloadRelationshipTeis(progress);
        Observable<D2Progress> resourceStage = updateResource(progress, params, allOkay, lastUpdatedSet);

        return Observable.concat(systemInfoStage, teiStage, relationshipStage, resourceStage);
    });

    return rxCallExecutor.wrapObservableTransactionally(deferredDownload, true);
}
 
源代码2 项目: RxAndroidBle   文件: RxBleDeviceImpl.java
/**
 * Returns an observable that tries to claim the single connection slot on each subscription.
 *
 * <p>The {@code isConnected} flag is flipped from {@code false} to {@code true} atomically.
 * If that succeeds, the connection is prepared through {@code connector} and the flag is
 * reset when the resulting observable finishes for any reason. If the flag was already set,
 * the subscriber receives a {@link BleAlreadyConnectedException}.
 *
 * @param options setup options forwarded to {@code connector.prepareConnection}
 * @return the deferred connection observable
 */
public Observable<RxBleConnection> establishConnection(final ConnectionSetup options) {
    return Observable.defer(new Callable<ObservableSource<RxBleConnection>>() {
        @Override
        public ObservableSource<RxBleConnection> call() {
            // Guard: the flag is already set, so a second connection is refused.
            if (!isConnected.compareAndSet(false, true)) {
                return Observable.error(new BleAlreadyConnectedException(bluetoothDevice.getAddress()));
            }

            // Clears the flag once the connection observable finishes for any reason.
            final Action releaseConnectionFlag = new Action() {
                @Override
                public void run() {
                    isConnected.set(false);
                }
            };
            return connector.prepareConnection(options).doFinally(releaseConnectionFlag);
        }
    });
}
 
源代码3 项目: incubator-taverna-mobile   文件: DBHelper.java
/**
 * Looks up the workflow with the given id and emits whether it is marked as a favourite.
 *
 * <p>The query runs lazily, once per subscription. If no workflow with that id exists,
 * {@code false} is emitted: a workflow that is not stored cannot be a favourite.
 *
 * @param id identifier matched against {@code Workflow_Table.id}
 * @return an observable emitting exactly one {@code Boolean}
 */
public Observable<Boolean> getFavouriteWorkflow(final String id) {
    return Observable.defer(new Callable<ObservableSource<? extends Boolean>>() {
        @Override
        public ObservableSource<? extends Boolean> call() throws Exception {
            Workflow workflow = SQLite.select()
                    .from(Workflow.class)
                    .where(Workflow_Table.id.eq(id))
                    .querySingle();
            if (workflow != null) {
                return Observable.just(workflow.isFavourite());
            } else {
                // RxJava 2 forbids null items: Observable.just(null) throws a
                // NullPointerException, which defer() would forward as onError.
                // Emit an explicit "not a favourite" value instead.
                return Observable.just(false);
            }
        }
    });
}
 
源代码4 项目: MVPArms   文件: RetrofitServiceProxyHandler.java
/**
 * Dispatches a proxied call to the real Retrofit service, deferring the reflective call
 * when the method returns an {@code Observable} or a {@code Single}.
 *
 * <p>Optimisation of Retrofit usage based on https://zhuanlan.zhihu.com/p/40097338
 *
 * @param proxy  the proxy instance the method was invoked on (not used here)
 * @param method the interface method being invoked
 * @param args   the call arguments, possibly {@code null}
 * @return a deferred {@code Observable}/{@code Single}, or the direct result for other return types
 * @throws Throwable whatever the direct reflective invocation throws
 */
@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
    final Class<?> returnType = method.getReturnType();

    if (returnType == Observable.class) {
        // Wrap in a single defer so that the caller decides which thread runs the
        // expensive proxy creation and the network request. This has no impact on
        // existing code and gives the caller more control.
        // The lambda body performs the real Retrofit dynamic-proxy call.
        return Observable.defer(() -> (Observable) method.invoke(getRetrofitService(), args));
    }

    if (returnType == Single.class) {
        // Same treatment for Single: wrap once and invoke the real method lazily.
        return Single.defer(() -> (Single) method.invoke(getRetrofitService(), args));
    }

    // Any other return type is invoked directly, without wrapping.
    return method.invoke(getRetrofitService(), args);
}
 
源代码5 项目: akarnokd-misc   文件: PublishFuncExample.java
/**
 * Demonstrates a fork-and-join over a published source: every subscription to the deferred
 * observable publishes {@code source} anew, merges its first and last items, and connects.
 * The test subscribes three times, printing each result, then sleeps to let the delayed
 * emissions arrive.
 */
@Test
public void test() throws Exception {
    // Items 1..5, with the subscription itself delayed by one second.
    Observable<Integer> source = Observable.range(1, 5)
            .delaySubscription(1, TimeUnit.SECONDS);

    // Merges the first item and the last item of the observable it is given.
    Function<Observable<Integer>, Observable<Integer>> firstAndLast = shared ->
            Observable.merge(shared.take(1), shared.takeLast(1));

    Observable<Integer> forkAndJoin = Observable.defer(() -> {
        ConnectableObservable<Integer> published = source
                .doOnSubscribe(d -> System.out.println("Subscribed!"))
                .publish();
        Observable<Integer> merged = firstAndLast.apply(published);
        // connect() is called after the selector has been applied to the published source.
        published.connect();
        return merged;
    });

    for (int run = 0; run < 3; run++) {
        forkAndJoin.subscribe(System.out::println);
    }

    Thread.sleep(10000);
}
 
源代码6 项目: incubator-taverna-mobile   文件: DBHelper.java
/**
 * Persists every workflow of the given container lazily, on subscription, and then emits
 * the same container.
 *
 * <p>A workflow that already exists is passed through {@code updateWorkflow} and saved;
 * a new one is stored with its favourite flag set to {@code false}.
 *
 * @param workflows container whose {@code getWorkflowList()} is iterated
 * @return an observable emitting {@code workflows} once the loop has finished
 */
@Nullable
public Observable<Workflows> syncWorkflows(final Workflows workflows) {
    final Callable<ObservableSource<? extends Workflows>> syncTask =
            new Callable<ObservableSource<? extends Workflows>>() {
                @Override
                public ObservableSource<? extends Workflows> call() throws Exception {
                    for (Workflow item : workflows.getWorkflowList()) {
                        if (item.exists()) {
                            // Already stored: save the result of updateWorkflow.
                            updateWorkflow(item).save();
                        } else {
                            // First time seen: stored as a non-favourite.
                            item.setFavourite(false);
                            item.save();
                        }
                    }
                    return Observable.just(workflows);
                }
            };
    return Observable.defer(syncTask);
}
 
源代码7 项目: JD-Test   文件: ImageCompressor.java
/**
 * Uses the {@code defer} operator so that the compression task only starts once the
 * returned observable is subscribed to.
 *
 * @param file the file handed to {@code compressToFile}
 * @return an observable emitting the result of {@code compressToFile(file)}
 */
public Observable<File> compressToFileAsObservable(final File file) {
    final Callable<ObservableSource<? extends File>> deferredCompression =
            new Callable<ObservableSource<? extends File>>() {
                @Override
                public Observable<File> call() {
                    final File compressed = compressToFile(file);
                    return Observable.just(compressed);
                }
            };
    return Observable.defer(deferredCompression);
}
 
源代码8 项目: incubator-taverna-mobile   文件: DBHelper.java
/**
 * Emits the list of workflows whose {@code favourite} column is {@code true}.
 * The query is executed lazily, once per subscription.
 *
 * @return an observable emitting the query result as a single list
 */
public Observable<List<Workflow>> getFavouriteWorkflow() {
    final Callable<ObservableSource<? extends List<Workflow>>> favouritesQuery =
            new Callable<ObservableSource<? extends List<Workflow>>>() {
                @Override
                public ObservableSource<? extends List<Workflow>> call() throws Exception {
                    final List<Workflow> favourites = SQLite.select()
                            .from(Workflow.class)
                            .where(Workflow_Table.favourite.eq(true))
                            .queryList();
                    return Observable.just(favourites);
                }
            };
    return Observable.defer(favouritesQuery);
}
 
源代码9 项目: ocraft-s2client   文件: S2ClientVerticle.java
/**
 * Opens the websocket stream lazily, on subscription.
 *
 * <p>Port and host are read from the verticle configuration ({@code CFG_PORT},
 * {@code CFG_IP}). If {@code isSet(httpClient)} is false, the observable completes
 * without emitting anything.
 *
 * @return the deferred websocket observable
 */
private Observable<WebSocket> connection() {
    return Observable.defer(() -> {
        if (isSet(httpClient)) {
            return httpClient
                    .websocketStream(config().getInteger(CFG_PORT), config().getString(CFG_IP), SC2API_URI)
                    .toObservable();
        }
        // No client available: complete immediately with no items.
        return Observable.empty();
    });
}
 
源代码10 项目: incubator-taverna-mobile   文件: DBHelper.java
/**
 * Calls {@code updateFavouriteWorkflow(id)} lazily, on subscription, and emits its result.
 *
 * @param id identifier forwarded to {@code updateFavouriteWorkflow}
 * @return an observable emitting the value returned by {@code updateFavouriteWorkflow}
 */
public Observable<Boolean> setFavouriteWorkflow(final String id) {
    final Callable<ObservableSource<? extends Boolean>> updateTask =
            new Callable<ObservableSource<? extends Boolean>>() {
                @Override
                public ObservableSource<? extends Boolean> call() throws Exception {
                    return Observable.just(updateFavouriteWorkflow(id));
                }
            };
    return Observable.defer(updateTask);
}
 
源代码11 项目: RxAndroidBle   文件: RxBleClientImpl.java
/**
 * Starts a BLE scan lazily: on each subscription, {@code scanPreconditionVerifier.verify(true)}
 * is called first and the scan is then initialised with the given service UUID filters.
 *
 * @param filterServiceUUIDs service UUIDs forwarded to {@code initializeScan}; may be {@code null}
 * @return the deferred scan-result observable
 */
@Override
public Observable<RxBleScanResult> scanBleDevices(@Nullable final UUID... filterServiceUUIDs) {
    final Callable<ObservableSource<? extends RxBleScanResult>> scanStarter =
            new Callable<ObservableSource<? extends RxBleScanResult>>() {
                @Override
                public ObservableSource<? extends RxBleScanResult> call() {
                    // The precondition check runs on every subscription, before the scan is built.
                    scanPreconditionVerifier.verify(true);
                    return initializeScan(filterServiceUUIDs);
                }
            };
    return Observable.defer(scanStarter);
}
 
源代码12 项目: dhis2-android-sdk   文件: FileResourcePostCall.java
/**
 * Uploads the given file resources one by one, reporting progress along the way.
 *
 * <p>On subscription: an empty list completes immediately. Otherwise the system info
 * metadata is downloaded first, one progress item is emitted for {@link SystemInfo}, and
 * then each resource's related file is uploaded, its response handled, and one
 * {@link FileResource} progress item emitted per resource.
 *
 * @param filteredFileResources resources to upload
 * @return the deferred progress observable
 */
public Observable<D2Progress> uploadFileResources(List<FileResource> filteredFileResources) {
    return Observable.defer(() -> {

        // Nothing to send: complete without emitting.
        if (filteredFileResources.isEmpty()) {
            return Observable.empty();
        }

        D2ProgressManager progress = new D2ProgressManager(filteredFileResources.size() + 1);

        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progress.increaseProgress(SystemInfo.class, false));

            for (FileResource resource : filteredFileResources) {
                File relatedFile = getRelatedFile(resource);

                ResponseBody uploadResponse = apiCallExecutor.executeObjectCall(
                        fileResourceService.uploadFile(getFilePart(relatedFile)));

                handleResponse(uploadResponse.string(), resource, relatedFile);

                emitter.onNext(progress.increaseProgress(FileResource.class, true));
            }

            emitter.onComplete();
        }));
    });
}
 
源代码13 项目: dhis2-android-sdk   文件: EventPostCall.java
/**
 * Posts the events selected by {@code queryDataToSync} in a single request and reports progress.
 *
 * <p>On subscription: if there is nothing to post, the observable completes without
 * emitting. Otherwise the system info metadata is downloaded, one progress item is emitted
 * for {@link SystemInfo}, the events are posted (HTTP 409 is an accepted error code), the
 * web response is handled, and a final {@link Event} progress item is emitted.
 *
 * @param filteredEvents events handed to {@code queryDataToSync}
 * @return the deferred progress observable
 */
public Observable<D2Progress> uploadEvents(List<Event> filteredEvents) {
    return Observable.defer(() -> {
        List<Event> pendingEvents = queryDataToSync(filteredEvents);

        // Nothing to send: complete with an empty observable.
        if (pendingEvents.isEmpty()) {
            return Observable.empty();
        }

        D2ProgressManager progress = new D2ProgressManager(2);
        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {

            emitter.onNext(progress.increaseProgress(SystemInfo.class, false));

            // The import strategy name depends on the result of versionManager.is2_29().
            String strategy;
            if (versionManager.is2_29()) {
                strategy = "CREATE_AND_UPDATE";
            } else {
                strategy = "SYNC";
            }

            EventPayload payload = new EventPayload();
            payload.events = pendingEvents;

            EventWebResponse response = apiCallExecutor.executeObjectCallWithAcceptedErrorCodes(
                    eventService.postEvents(payload, strategy), Collections.singletonList(409),
                    EventWebResponse.class);

            handleWebResponse(response);
            emitter.onNext(progress.increaseProgress(Event.class, true));
            emitter.onComplete();
        }));
    });
}
 
源代码14 项目: dhis2-android-sdk   文件: MockSmsRepository.java
/**
 * Mock implementation: ignores all arguments and, on each subscription, emits two fixed
 * states, {@code SmsSendingState(0, 1)} followed by {@code SmsSendingState(1, 1)}.
 *
 * @param number                not used by this mock
 * @param smsParts              not used by this mock
 * @param sendingTimeoutSeconds not used by this mock
 * @return the deferred two-item observable
 */
@Override
public Observable<SmsSendingState> sendSms(String number, List<String> smsParts, int sendingTimeoutSeconds) {
    return Observable.defer(() -> {
        SmsSendingState first = new SmsSendingState(0, 1);
        SmsSendingState second = new SmsSendingState(1, 1);
        return Observable.just(first, second);
    });
}
 
/**
 * Splits the given registrations into "to delete" and "to post" groups and uploads them.
 *
 * <p>On subscription: an empty list completes immediately. Otherwise registrations whose
 * {@code deleted()} is true go to the delete group and the rest to the post group. The
 * system info metadata is downloaded, one progress item is emitted for {@link SystemInfo},
 * and the remaining work, including any further emissions, is delegated to
 * {@code uploadInternal}.
 *
 * @param dataSetCompleteRegistrations registrations to synchronise
 * @return the deferred progress observable
 */
public Observable<D2Progress> uploadDataSetCompleteRegistrations(
        List<DataSetCompleteRegistration> dataSetCompleteRegistrations) {
    return Observable.defer(() -> {
        if (dataSetCompleteRegistrations.isEmpty()) {
            return Observable.empty();
        }

        List<DataSetCompleteRegistration> toPost = new ArrayList<>();
        List<DataSetCompleteRegistration> toDelete = new ArrayList<>();

        for (DataSetCompleteRegistration registration : dataSetCompleteRegistrations) {
            (registration.deleted() ? toDelete : toPost).add(registration);
        }

        D2ProgressManager progress = new D2ProgressManager(2);

        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progress.increaseProgress(SystemInfo.class, false));

            uploadInternal(progress, emitter, toPost, toDelete);
        }));
    });
}
 
/**
 * Defers the user-module login call until subscription and exposes its result as an observable.
 *
 * @param username  forwarded to {@code d2.userModule().logIn}
 * @param password  forwarded to {@code d2.userModule().logIn}
 * @param serverUrl forwarded to {@code d2.userModule().logIn}
 * @return the deferred login observable
 */
@NonNull
@Override
public Observable<User> logIn(@NonNull String username, @NonNull String password, @NonNull String serverUrl) {
    return Observable.defer(() -> {
        // The login call is created anew for every subscriber.
        return d2.userModule()
                .logIn(username, password, serverUrl)
                .toObservable();
    });
}
 
/**
 * Defers the user-module {@code isLogged()} call until subscription and exposes its result
 * as an observable.
 *
 * @return the deferred observable of the {@code isLogged()} result
 */
@NonNull
@Override
public Observable<Boolean> isUserLoggedIn() {
    return Observable.defer(() -> {
        // The check is made anew for every subscriber.
        return d2.userModule()
                .isLogged()
                .toObservable();
    });
}
 
/**
 * Sets up (or reuses) a notification/indication stream for the given characteristic.
 *
 * <p>All work happens lazily, on subscription, while holding the lock on
 * {@code activeNotificationObservableMap}:
 * <ul>
 *   <li>If an entry already exists for the characteristic's (UUID, instance id) and its
 *       {@code isIndication} flag matches, the stored observable is returned.</li>
 *   <li>If an entry exists with the other flag value, the subscriber receives a
 *       {@code BleConflictingNotificationAlreadySetException}.</li>
 *   <li>Otherwise a new observable is built, stored in the map, and returned.</li>
 * </ul>
 *
 * @param characteristic the characteristic to observe
 * @param setupMode      mode passed to the setup and teardown transformers
 * @param isIndication   selects {@code configEnableIndication} over {@code configEnableNotification}
 * @return a deferred observable emitting the inner observable of characteristic values
 */
Observable<Observable<byte[]>> setupServerInitiatedCharacteristicRead(
        @NonNull final BluetoothGattCharacteristic characteristic, final NotificationSetupMode setupMode, final boolean isIndication
) {
    return Observable.defer(new Callable<ObservableSource<Observable<byte[]>>>() {
        @Override
        public ObservableSource<Observable<byte[]>> call() {
            // The lookup and the later put() happen under one lock on the map.
            synchronized (activeNotificationObservableMap) {
                final CharacteristicNotificationId id
                        = new CharacteristicNotificationId(characteristic.getUuid(), characteristic.getInstanceId());

                final ActiveCharacteristicNotification activeCharacteristicNotification = activeNotificationObservableMap.get(id);

                if (activeCharacteristicNotification != null) {
                    if (activeCharacteristicNotification.isIndication == isIndication) {
                        // Same kind already active: reuse the stored observable.
                        return activeCharacteristicNotification.notificationObservable;
                    } else {
                        // The other kind is already active for this characteristic: report a conflict.
                        return Observable.error(
                                new BleConflictingNotificationAlreadySetException(characteristic.getUuid(), !isIndication)
                        );
                    }
                }

                final byte[] enableNotificationTypeValue = isIndication ? configEnableIndication : configEnableNotification;
                // Completed in doFinally below; used to terminate the inner value observables.
                final PublishSubject<?> notificationCompletedSubject = PublishSubject.create();

                final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true)
                        .andThen(ObservableUtil.justOnNext(observeOnCharacteristicChangeCallbacks(gattCallback, id)))
                        .compose(setupModeTransformer(descriptorWriter, characteristic, enableNotificationTypeValue, setupMode))
                        .map(new Function<Observable<byte[]>, Observable<byte[]>>() {
                            @Override
                            public Observable<byte[]> apply(Observable<byte[]> observable) {
                                // Race the completion subject against the value stream, which is
                                // itself cut off when the completion subject signals.
                                return Observable.amb(Arrays.asList(
                                        notificationCompletedSubject.cast(byte[].class),
                                        observable.takeUntil(notificationCompletedSubject)
                                ));
                            }
                        })
                        .doFinally(new Action() {
                            @SuppressLint("CheckResult")
                            @Override
                            public void run() {
                                // Signal completion to inner observables, then drop the map entry.
                                notificationCompletedSubject.onComplete();
                                synchronized (activeNotificationObservableMap) {
                                    activeNotificationObservableMap.remove(id);
                                }
                                // teardown the notification — subscription and result are ignored
                                setCharacteristicNotification(bluetoothGatt, characteristic, false)
                                        .compose(teardownModeTransformer(descriptorWriter, characteristic, configDisable, setupMode))
                                        .subscribe(
                                                Functions.EMPTY_ACTION,
                                                Functions.emptyConsumer()
                                        );
                            }
                        })
                        // Merge in the disconnect observable from gattCallback.
                        .mergeWith(gattCallback.<Observable<byte[]>>observeDisconnect())
                        // Share one upstream subscription among subscribers and replay the
                        // latest item to late subscribers.
                        .replay(1)
                        .refCount();
                // Register the new observable so later calls for the same id can reuse it.
                activeNotificationObservableMap.put(id, new ActiveCharacteristicNotification(newObservable, isIndication));
                return newObservable;
            }
        }
    });
}
 
/**
 * Posts the given tracked entity instances partition by partition and reports progress.
 *
 * <p>On subscription: if {@code getPartitionsToSync} yields no partitions, the observable
 * completes without emitting. Otherwise the system info metadata is downloaded, one
 * progress item is emitted for {@link SystemInfo}, and each partition is first passed
 * through {@code relationshipDeleteCall.postDeletedRelationships} and then posted (HTTP 409
 * is an accepted error code). A {@code D2Error} while posting a partition does not abort
 * the upload: that partition is marked {@code State.TO_UPDATE} and the loop continues.
 * A final {@link TrackedEntityInstance} progress item is emitted at the end.
 *
 * @param filteredTrackedEntityInstances instances handed to {@code getPartitionsToSync}
 * @return the deferred progress observable
 */
public Observable<D2Progress> uploadTrackedEntityInstances(
        List<TrackedEntityInstance> filteredTrackedEntityInstances) {
    return Observable.defer(() -> {
        List<List<TrackedEntityInstance>> partitions = getPartitionsToSync(filteredTrackedEntityInstances);

        // No partitions means no network request is needed.
        if (partitions.isEmpty()) {
            return Observable.empty();
        }

        D2ProgressManager progress = new D2ProgressManager(2);

        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progress.increaseProgress(SystemInfo.class, false));

            String strategy = versionManager.is2_29() ? "CREATE_AND_UPDATE" : "SYNC";

            for (List<TrackedEntityInstance> rawPartition : partitions) {
                // From here on, only the list returned by the relationship delete call is used.
                List<TrackedEntityInstance> partition =
                        relationshipDeleteCall.postDeletedRelationships(rawPartition);

                TrackedEntityInstancePayload payload = TrackedEntityInstancePayload.create(partition);

                try {
                    TEIWebResponse response = apiCallExecutor.executeObjectCallWithAcceptedErrorCodes(
                            trackedEntityInstanceService.postTrackedEntityInstances(payload, strategy),
                            Collections.singletonList(409), TEIWebResponse.class);
                    teiWebResponseHandler.handleWebResponse(response);
                } catch (D2Error d2Error) {
                    // Deliberate best effort: mark the partition TO_UPDATE and move on.
                    markPartitionAs(partition, State.TO_UPDATE);
                }
            }

            emitter.onNext(progress.increaseProgress(TrackedEntityInstance.class, true));
            emitter.onComplete();
        }));
    });
}
 
源代码20 项目: akarnokd-misc   文件: ListGrouping.java
/**
 * Creates a transformer that collapses runs of equal strings into lists.
 *
 * <p>A run starts at an item accepted by {@code groupCheck}; subsequent items equal to that
 * item are appended to the run. When a different item arrives, the run is emitted: as the
 * single string if it holds exactly one element, otherwise as the {@code List<String>}.
 * Items that neither start nor continue a run are emitted unchanged. A run still open when
 * the upstream completes is emitted at the end.
 *
 * <p>The mutable {@code Group} state is created inside {@code defer}, so each subscriber
 * gets its own instance.
 *
 * @param groupCheck predicate deciding whether an item may start a run
 * @return a transformer emitting either {@code String} or {@code List<String>} items
 */
static ObservableTransformer<String, Object> group(
        Predicate<? super String> groupCheck) {
    return strings -> 
    Observable.defer(() -> {
        // Per-subscription state: the current run (grouped) and the item that started it (groupKey).
        Group gr = new Group();
        return strings
                .flatMap(t -> {
                    if (gr.grouped != null) {
                        // A run is in progress.
                        if (!t.equals(gr.groupKey)) {
                            // The item differs from the run's key: the run ends here.
                            List<String> g = gr.grouped;
                            if (groupCheck.test(t)) {
                                // The new item starts a run of its own; emit the finished one.
                                gr.groupKey = t;
                                gr.grouped = new ArrayList<>();
                                gr.grouped.add(t);
                                if (g.size() == 1) {
                                    // A run of one element is emitted as the bare string.
                                    return Observable.just(g.get(0));
                                }
                                return Observable.just(g);
                            }
                            // The new item does not start a run: clear the state and emit the
                            // finished run followed by the item itself.
                            gr.groupKey = null;
                            gr.grouped = null;
                            if (g.size() == 1) {
                                return Observable.just(g.get(0), t);
                            }
                            return Observable.just(g, t);
                        }
                        // Same as the key: extend the run and emit nothing yet.
                        gr.grouped.add(t);
                        return Observable.empty();
                    }
                    // No run in progress.
                    if (groupCheck.test(t)) {
                        // Start a new run and hold the item back.
                        gr.grouped = new ArrayList<>();
                        gr.groupKey = t;
                        gr.grouped.add(t);
                        return Observable.empty();
                    }
                    // Ordinary item: pass it through unchanged.
                    return Observable.just(t);
                })
                // After the upstream completes, flush a run that is still open. The inner defer
                // makes sure the state is read at that point and not at assembly time.
                .concatWith(Observable.defer(() -> {
                    if (gr.grouped != null) {
                        if (gr.grouped.size() == 1) {
                            return Observable.just(gr.grouped.get(0));
                        }
                        return Observable.just(gr.grouped);
                    }
                    return Observable.empty();
                }));
    });
}