下面列出了 io.reactivex.Observable#defer() 的实例代码,您也可以点击链接到 GitHub 查看源代码,或在右侧发表评论。
/**
 * Builds a lazily evaluated download pipeline that reports progress items.
 *
 * <p>On each subscription a fresh {@code D2ProgressManager} is created. When
 * {@code userOrganisationUnitLinkStore} is empty, only one progress item (flagged as
 * completed for {@code TrackedEntityInstance}) is emitted. Otherwise the system info,
 * TEI, relationship TEI and resource-update steps are concatenated in that order.
 * The resulting observable is handed to
 * {@code rxCallExecutor.wrapObservableTransactionally(..., true)}.
 *
 * @param params download parameters forwarded to the TEI download and resource update steps
 * @return the wrapped progress observable
 */
Observable<D2Progress> download(final ProgramDataDownloadParams params) {
    Observable<D2Progress> deferred = Observable.defer(() -> {
        D2ProgressManager progressManager = new D2ProgressManager(null);

        // Nothing linked to the user: report a single, final progress item and stop.
        if (userOrganisationUnitLinkStore.count() == 0) {
            return Observable.just(
                    progressManager.increaseProgress(TrackedEntityInstance.class, true));
        }

        // State shared between the TEI download step and the resource update step.
        Set<ProgramOrganisationUnitLastUpdated> programOrganisationUnitSet = new HashSet<>();
        BooleanWrapper allOkay = new BooleanWrapper(true);

        return Observable.concat(
                downloadSystemInfo(progressManager),
                downloadTeis(progressManager, params, allOkay, programOrganisationUnitSet),
                downloadRelationshipTeis(progressManager),
                updateResource(progressManager, params, allOkay, programOrganisationUnitSet)
        );
    });
    return rxCallExecutor.wrapObservableTransactionally(deferred, true);
}
/**
 * Returns an observable that prepares a connection when subscribed.
 *
 * <p>The {@code isConnected} flag is flipped from {@code false} to {@code true} atomically on
 * subscription. If the flag was already {@code true}, the observable errors with
 * {@code BleAlreadyConnectedException} carrying the device address. The flag is reset to
 * {@code false} in {@code doFinally} of the prepared connection.
 *
 * @param options connection setup passed to {@code connector.prepareConnection}
 * @return the deferred connection observable
 */
public Observable<RxBleConnection> establishConnection(final ConnectionSetup options) {
    return Observable.defer(new Callable<ObservableSource<RxBleConnection>>() {
        @Override
        public ObservableSource<RxBleConnection> call() {
            final boolean acquired = isConnected.compareAndSet(false, true);
            if (!acquired) {
                // Another subscription currently owns the connection flag.
                return Observable.error(new BleAlreadyConnectedException(bluetoothDevice.getAddress()));
            }
            return connector.prepareConnection(options)
                    .doFinally(new Action() {
                        @Override
                        public void run() {
                            // Release the flag so a later subscription may connect again.
                            isConnected.set(false);
                        }
                    });
        }
    });
}
/**
 * Looks up the favourite flag of the workflow with the given id.
 *
 * <p>The database query is executed lazily, once per subscription.
 *
 * @param id identifier matched against {@code Workflow_Table.id}
 * @return an observable emitting the workflow's favourite flag, or completing without
 *         emitting anything when no workflow with that id exists
 */
public Observable<Boolean> getFavouriteWorkflow(final String id) {
    return Observable.defer(new Callable<ObservableSource<? extends Boolean>>() {
        @Override
        public ObservableSource<? extends Boolean> call() throws Exception {
            Workflow workflow = SQLite.select()
                    .from(Workflow.class)
                    .where(Workflow_Table.id.eq(id))
                    .querySingle();
            if (workflow == null) {
                // RxJava 2 forbids null items: Observable.just(null) throws a
                // NullPointerException, which defer() would surface as onError.
                // "No value" is expressed as an empty stream instead.
                return Observable.<Boolean>empty();
            }
            return Observable.just(workflow.isFavourite());
        }
    });
}
/**
 * Proxy handler that postpones the real Retrofit call until subscription time.
 *
 * <p>Optimisation of Retrofit based on https://zhuanlan.zhihu.com/p/40097338.
 * Methods returning {@code Observable} or {@code Single} are wrapped in a single
 * {@code defer}, leaving the caller in control of which thread the expensive work and
 * the network request run on. Any other return type is invoked directly.
 */
@Override
public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
    final Class<?> returnType = method.getReturnType();

    if (returnType == Observable.class) {
        // Run the real Retrofit dynamic-proxy method only once someone subscribes.
        return Observable.defer(() -> (Observable) method.invoke(getRetrofitService(), args));
    }
    if (returnType == Single.class) {
        // Same treatment for Single-returning methods.
        return Single.defer(() -> (Single) method.invoke(getRetrofitService(), args));
    }
    // Return types other than Observable or Single are passed through untouched.
    return method.invoke(getRetrofitService(), args);
}
/**
 * Demonstrates a fork-and-join of a published source inside {@code defer}.
 *
 * <p>Each subscription to {@code forkAndJoin} publishes the delayed range, merges its
 * first and last element, and then connects the published source. The observable is
 * subscribed three times and the thread then sleeps for ten seconds so the delayed
 * emissions can be printed.
 */
@Test
public void test() throws Exception {
    Observable<Integer> delayedRange = Observable.range(1, 5)
            .delaySubscription(1, TimeUnit.SECONDS);

    // Fork: take the first and the last element of the same upstream, then join them.
    Function<Observable<Integer>, Observable<Integer>> firstAndLast = shared ->
            Observable.merge(shared.take(1), shared.takeLast(1));

    Observable<Integer> forkAndJoin = Observable.defer(() -> {
        ConnectableObservable<Integer> published = delayedRange
                .doOnSubscribe(d -> System.out.println("Subscribed!"))
                .publish();
        Observable<Integer> joined = firstAndLast.apply(published);
        // Connect only after both branches have been wired to the published source.
        published.connect();
        return joined;
    });

    for (int run = 0; run < 3; run++) {
        forkAndJoin.subscribe(System.out::println);
    }

    Thread.sleep(10000);
}
/**
 * Persists the given workflows when subscribed and then emits them.
 *
 * <p>A workflow that does not exist yet is saved with its favourite flag set to
 * {@code false}; an existing one is saved via the result of {@code updateWorkflow}.
 *
 * @param workflows container whose {@code getWorkflowList()} entries are synchronised
 * @return an observable emitting the same {@code workflows} instance after saving
 */
@Nullable
public Observable<Workflows> syncWorkflows(final Workflows workflows) {
    return Observable.defer(new Callable<ObservableSource<? extends Workflows>>() {
        @Override
        public ObservableSource<? extends Workflows> call() throws Exception {
            for (Workflow item : workflows.getWorkflowList()) {
                if (item.exists()) {
                    updateWorkflow(item).save();
                } else {
                    // New entries start out as non-favourites.
                    item.setFavourite(false);
                    item.save();
                }
            }
            return Observable.just(workflows);
        }
    });
}
/**
 * Wraps {@code compressToFile} in {@code defer} so that the compression task only
 * starts once the returned observable is subscribed to.
 *
 * @param file the file handed to {@code compressToFile}
 * @return an observable emitting the result of {@code compressToFile(file)}
 */
public Observable<File> compressToFileAsObservable(final File file) {
    return Observable.defer(new Callable<ObservableSource<? extends File>>() {
        @Override
        public Observable<File> call() {
            final File compressed = compressToFile(file);
            return Observable.just(compressed);
        }
    });
}
/**
 * Queries all workflows whose {@code favourite} column equals {@code true}.
 *
 * <p>The query is executed lazily, once per subscription.
 *
 * @return an observable emitting the query result as a single list
 */
public Observable<List<Workflow>> getFavouriteWorkflow() {
    return Observable.defer(new Callable<ObservableSource<? extends List<Workflow>>>() {
        @Override
        public ObservableSource<? extends List<Workflow>> call() throws Exception {
            final List<Workflow> favourites = SQLite.select()
                    .from(Workflow.class)
                    .where(Workflow_Table.favourite.eq(true))
                    .queryList();
            return Observable.just(favourites);
        }
    });
}
/**
 * Opens a websocket stream lazily, on subscription.
 *
 * <p>When {@code isSet(httpClient)} is false the observable completes without emitting.
 * Otherwise the websocket stream for the configured port, IP and {@code SC2API_URI} is
 * converted to an observable.
 *
 * @return the deferred websocket observable
 */
private Observable<WebSocket> connection() {
    return Observable.defer(() -> {
        if (isSet(httpClient)) {
            return httpClient
                    .websocketStream(config().getInteger(CFG_PORT), config().getString(CFG_IP), SC2API_URI)
                    .toObservable();
        }
        // No client available: nothing to connect to.
        return Observable.empty();
    });
}
/**
 * Calls {@code updateFavouriteWorkflow(id)} on subscription and emits its result.
 *
 * @param id identifier forwarded to {@code updateFavouriteWorkflow}
 * @return an observable emitting the value returned by {@code updateFavouriteWorkflow}
 */
public Observable<Boolean> setFavouriteWorkflow(final String id) {
    return Observable.defer(new Callable<ObservableSource<? extends Boolean>>() {
        @Override
        public ObservableSource<? extends Boolean> call() throws Exception {
            final Boolean updated = updateFavouriteWorkflow(id);
            return Observable.just(updated);
        }
    });
}
/**
 * Starts a BLE scan lazily, on subscription.
 *
 * <p>{@code scanPreconditionVerifier.verify(true)} runs first; the scan is then created
 * by {@code initializeScan} with the given service UUID filters.
 *
 * @param filterServiceUUIDs optional service UUIDs forwarded to {@code initializeScan}
 * @return the deferred scan-result observable
 */
@Override
public Observable<RxBleScanResult> scanBleDevices(@Nullable final UUID... filterServiceUUIDs) {
    final Callable<ObservableSource<? extends RxBleScanResult>> verifiedScan =
            new Callable<ObservableSource<? extends RxBleScanResult>>() {
                @Override
                public ObservableSource<? extends RxBleScanResult> call() {
                    // Preconditions are checked per subscription, before the scan is set up.
                    scanPreconditionVerifier.verify(true);
                    return initializeScan(filterServiceUUIDs);
                }
            };
    return Observable.defer(verifiedScan);
}
/**
 * Uploads the given file resources one by one and reports progress.
 *
 * <p>For an empty list the observable completes immediately. Otherwise system metadata
 * is downloaded first, a progress item for {@code SystemInfo} is emitted, and each file
 * resource is uploaded in turn with a progress item emitted after every upload.
 *
 * @param filteredFileResources the file resources to upload
 * @return an observable of progress items
 */
public Observable<D2Progress> uploadFileResources(List<FileResource> filteredFileResources) {
    return Observable.defer(() -> {
        // Nothing to send: complete right away.
        if (filteredFileResources.isEmpty()) {
            return Observable.empty();
        }

        // One step per file plus one for the system info download.
        D2ProgressManager progressManager = new D2ProgressManager(filteredFileResources.size() + 1);

        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progressManager.increaseProgress(SystemInfo.class, false));

            for (FileResource resource : filteredFileResources) {
                File localFile = getRelatedFile(resource);
                ResponseBody body = apiCallExecutor.executeObjectCall(
                        fileResourceService.uploadFile(getFilePart(localFile)));
                handleResponse(body.string(), resource, localFile);
                emitter.onNext(progressManager.increaseProgress(FileResource.class, true));
            }

            emitter.onComplete();
        }));
    });
}
/**
 * Posts the events that need syncing and reports progress.
 *
 * <p>The events to post are determined by {@code queryDataToSync}. When there are none,
 * the observable completes without emitting. Otherwise system metadata is downloaded,
 * the events are posted in a single payload (HTTP 409 is an accepted error code), the
 * web response is handled, and a final progress item is emitted.
 *
 * @param filteredEvents candidate events passed to {@code queryDataToSync}
 * @return an observable of progress items
 */
public Observable<D2Progress> uploadEvents(List<Event> filteredEvents) {
    return Observable.defer(() -> {
        List<Event> pendingEvents = queryDataToSync(filteredEvents);

        // Nothing to send: complete without emitting.
        if (pendingEvents.isEmpty()) {
            return Observable.empty();
        }

        D2ProgressManager progressManager = new D2ProgressManager(2);
        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progressManager.increaseProgress(SystemInfo.class, false));

            EventPayload payload = new EventPayload();
            payload.events = pendingEvents;

            // The import strategy name depends on the server version.
            String strategy = versionManager.is2_29() ? "CREATE_AND_UPDATE" : "SYNC";

            EventWebResponse response = apiCallExecutor.executeObjectCallWithAcceptedErrorCodes(
                    eventService.postEvents(payload, strategy),
                    Collections.singletonList(409),
                    EventWebResponse.class);
            handleWebResponse(response);

            emitter.onNext(progressManager.increaseProgress(Event.class, true));
            emitter.onComplete();
        }));
    });
}
/**
 * Emits two fixed {@code SmsSendingState} values, (0, 1) followed by (1, 1), on every
 * subscription. None of the arguments are used.
 * NOTE(review): the pair presumably means (sent, total) — confirm against SmsSendingState.
 */
@Override
public Observable<SmsSendingState> sendSms(String number, List<String> smsParts, int sendingTimeoutSeconds) {
    return Observable.defer(() -> {
        SmsSendingState first = new SmsSendingState(0, 1);
        SmsSendingState second = new SmsSendingState(1, 1);
        return Observable.just(first, second);
    });
}
/**
 * Uploads data set complete registrations and reports progress.
 *
 * <p>For an empty list the observable completes immediately. Otherwise the registrations
 * are split into those flagged as deleted and the rest, system metadata is downloaded, a
 * {@code SystemInfo} progress item is emitted, and both lists are handed to
 * {@code uploadInternal} together with the emitter.
 * NOTE(review): completion of the emitter is presumably done by uploadInternal — confirm.
 *
 * @param dataSetCompleteRegistrations the registrations to upload or delete
 * @return an observable of progress items
 */
public Observable<D2Progress> uploadDataSetCompleteRegistrations(
        List<DataSetCompleteRegistration> dataSetCompleteRegistrations) {
    return Observable.defer(() -> {
        if (dataSetCompleteRegistrations.isEmpty()) {
            return Observable.empty();
        }

        List<DataSetCompleteRegistration> toPost = new ArrayList<>();
        List<DataSetCompleteRegistration> toDelete = new ArrayList<>();
        for (DataSetCompleteRegistration registration : dataSetCompleteRegistrations) {
            // Route each registration to the list matching its deleted flag.
            (registration.deleted() ? toDelete : toPost).add(registration);
        }

        D2ProgressManager progressManager = new D2ProgressManager(2);
        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progressManager.increaseProgress(SystemInfo.class, false));
            uploadInternal(progressManager, emitter, toPost, toDelete);
        }));
    });
}
/**
 * Defers {@code d2.userModule().logIn(...)} until subscription and exposes it as an
 * observable.
 *
 * @param username  user name forwarded to the user module
 * @param password  password forwarded to the user module
 * @param serverUrl server URL forwarded to the user module
 * @return the deferred login observable
 */
@NonNull
@Override
public Observable<User> logIn(@NonNull String username, @NonNull String password, @NonNull String serverUrl) {
    return Observable.defer(() -> {
        // The login call itself is only created when someone subscribes.
        return d2.userModule()
                .logIn(username, password, serverUrl)
                .toObservable();
    });
}
/**
 * Defers {@code d2.userModule().isLogged()} until subscription and exposes it as an
 * observable.
 *
 * @return the deferred logged-in state observable
 */
@NonNull
@Override
public Observable<Boolean> isUserLoggedIn() {
    return Observable.defer(() -> {
        // The check is performed anew for every subscription.
        return d2.userModule()
                .isLogged()
                .toObservable();
    });
}
/**
 * Sets up (or reuses) a notification/indication stream for the given characteristic.
 *
 * <p>Evaluated per subscription. An already registered stream for the same characteristic
 * is reused when its mode matches {@code isIndication}; a mode mismatch yields an error.
 * Otherwise a new shared stream is built, registered in
 * {@code activeNotificationObservableMap}, and returned.
 *
 * @param characteristic the characteristic whose UUID and instance id identify the stream
 * @param setupMode      mode passed to the setup and teardown transformers
 * @param isIndication   {@code true} to use the indication enable value, {@code false} for
 *                       the notification enable value
 * @return an observable emitting the observable of characteristic change values
 */
Observable<Observable<byte[]>> setupServerInitiatedCharacteristicRead(
@NonNull final BluetoothGattCharacteristic characteristic, final NotificationSetupMode setupMode, final boolean isIndication
) {
return Observable.defer(new Callable<ObservableSource<Observable<byte[]>>>() {
@Override
public ObservableSource<Observable<byte[]>> call() {
// Lookup and registration happen under the map's lock.
synchronized (activeNotificationObservableMap) {
final CharacteristicNotificationId id
= new CharacteristicNotificationId(characteristic.getUuid(), characteristic.getInstanceId());
final ActiveCharacteristicNotification activeCharacteristicNotification = activeNotificationObservableMap.get(id);
if (activeCharacteristicNotification != null) {
if (activeCharacteristicNotification.isIndication == isIndication) {
// Same mode already active: share the existing stream.
return activeCharacteristicNotification.notificationObservable;
} else {
// The characteristic is already set up in the other mode.
return Observable.error(
new BleConflictingNotificationAlreadySetException(characteristic.getUuid(), !isIndication)
);
}
}
// Descriptor value that enables either indications or notifications.
final byte[] enableNotificationTypeValue = isIndication ? configEnableIndication : configEnableNotification;
// Completed in doFinally below; used to terminate the emitted inner observable.
final PublishSubject<?> notificationCompletedSubject = PublishSubject.create();
final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true)
.andThen(ObservableUtil.justOnNext(observeOnCharacteristicChangeCallbacks(gattCallback, id)))
.compose(setupModeTransformer(descriptorWriter, characteristic, enableNotificationTypeValue, setupMode))
.map(new Function<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> apply(Observable<byte[]> observable) {
// Race the completion subject against the value stream, which itself
// stops once the completion subject signals.
return Observable.amb(Arrays.asList(
notificationCompletedSubject.cast(byte[].class),
observable.takeUntil(notificationCompletedSubject)
));
}
})
.doFinally(new Action() {
@SuppressLint("CheckResult")
@Override
public void run() {
// Signal completion to inner observables, then unregister this stream.
notificationCompletedSubject.onComplete();
synchronized (activeNotificationObservableMap) {
activeNotificationObservableMap.remove(id);
}
// teardown the notification — subscription and result are ignored
setCharacteristicNotification(bluetoothGatt, characteristic, false)
.compose(teardownModeTransformer(descriptorWriter, characteristic, configDisable, setupMode))
.subscribe(
Functions.EMPTY_ACTION,
Functions.emptyConsumer()
);
}
})
// Merge in the disconnect signal from the GATT callback.
.mergeWith(gattCallback.<Observable<byte[]>>observeDisconnect())
// Share one upstream among subscribers and replay the latest item to newcomers.
.replay(1)
.refCount();
// Register the new stream so later calls for this characteristic find it.
activeNotificationObservableMap.put(id, new ActiveCharacteristicNotification(newObservable, isIndication));
return newObservable;
}
}
});
}
/**
 * Posts tracked entity instances partition by partition and reports progress.
 *
 * <p>Partitions come from {@code getPartitionsToSync}. When there are none, the
 * observable completes without emitting and no network request is made. Otherwise system
 * metadata is downloaded, each partition is posted (HTTP 409 is an accepted error code),
 * and one final progress item is emitted. A {@code D2Error} while posting a partition
 * marks that partition as {@code State.TO_UPDATE} and processing continues.
 *
 * @param filteredTrackedEntityInstances candidate instances passed to {@code getPartitionsToSync}
 * @return an observable of progress items
 */
public Observable<D2Progress> uploadTrackedEntityInstances(
        List<TrackedEntityInstance> filteredTrackedEntityInstances) {
    return Observable.defer(() -> {
        List<List<TrackedEntityInstance>> partitions =
                getPartitionsToSync(filteredTrackedEntityInstances);

        // No partitions means there is no need for a network request.
        if (partitions.isEmpty()) {
            return Observable.empty();
        }

        D2ProgressManager progressManager = new D2ProgressManager(2);
        return systemInfoDownloader.downloadMetadata().andThen(Observable.create(emitter -> {
            emitter.onNext(progressManager.increaseProgress(SystemInfo.class, false));

            // The import strategy name depends on the server version.
            String strategy = versionManager.is2_29() ? "CREATE_AND_UPDATE" : "SYNC";

            for (List<TrackedEntityInstance> rawPartition : partitions) {
                // Deleted relationships are posted first; the returned list is what gets uploaded.
                List<TrackedEntityInstance> partition =
                        relationshipDeleteCall.postDeletedRelationships(rawPartition);
                TrackedEntityInstancePayload payload = TrackedEntityInstancePayload.create(partition);
                try {
                    TEIWebResponse response = apiCallExecutor.executeObjectCallWithAcceptedErrorCodes(
                            trackedEntityInstanceService.postTrackedEntityInstances(payload, strategy),
                            Collections.singletonList(409),
                            TEIWebResponse.class);
                    teiWebResponseHandler.handleWebResponse(response);
                } catch (D2Error d2Error) {
                    // A failed partition is flagged for a later retry instead of aborting the upload.
                    markPartitionAs(partition, State.TO_UPDATE);
                }
            }

            emitter.onNext(progressManager.increaseProgress(TrackedEntityInstance.class, true));
            emitter.onComplete();
        }));
    });
}
/**
 * Creates a transformer that collapses runs of equal strings into lists.
 *
 * <p>A run is opened by an item for which {@code groupCheck} returns true and is extended
 * by every following item equal to the run's key. When the run ends it is emitted as a
 * {@code List<String>}, or as the plain string when it holds a single element. Items that
 * neither open nor extend a run are emitted unchanged. State is kept per subscription
 * because it is created inside {@code defer}.
 *
 * @param groupCheck decides whether an item may open a run
 * @return a transformer emitting either {@code String} or {@code List<String>} values
 */
static ObservableTransformer<String, Object> group(
Predicate<? super String> groupCheck) {
return strings ->
Observable.defer(() -> {
// Fresh mutable state (current run and its key) for each subscriber.
Group gr = new Group();
return strings
.flatMap(t -> {
if (gr.grouped != null) {
// A run is currently open.
if (!t.equals(gr.groupKey)) {
// The item differs from the run's key, so the open run ends here.
List<String> g = gr.grouped;
if (groupCheck.test(t)) {
// The item opens a new run; emit only the finished one.
gr.groupKey = t;
gr.grouped = new ArrayList<>();
gr.grouped.add(t);
if (g.size() == 1) {
return Observable.just(g.get(0));
}
return Observable.just(g);
}
// The item does not open a run: clear the state and emit the
// finished run followed by the item itself.
gr.groupKey = null;
gr.grouped = null;
if (g.size() == 1) {
return Observable.just(g.get(0), t);
}
return Observable.just(g, t);
}
// Same key as the open run: extend it and emit nothing yet.
gr.grouped.add(t);
return Observable.empty();
}
// No run is open.
if (groupCheck.test(t)) {
// Open a new run with this item; emission is postponed.
gr.grouped = new ArrayList<>();
gr.groupKey = t;
gr.grouped.add(t);
return Observable.empty();
}
// Ordinary item: pass it through unchanged.
return Observable.just(t);
})
// After the source completes, flush a run that is still open.
.concatWith(Observable.defer(() -> {
if (gr.grouped != null) {
if (gr.grouped.size() == 1) {
return Observable.just(gr.grouped.get(0));
}
return Observable.just(gr.grouped);
}
return Observable.empty();
}));
});
}