下面列出了 io.reactivex.observers.DisposableCompletableObserver 类的 API 用法示例代码,也可以点击链接到 GitHub 查看完整源代码。
/**
 * Lifecycle callback: calls {@code configureRxJavaErrorHandler()} and then runs
 * {@code initLibraries} on the IO scheduler, observing the outcome on the Android main thread.
 * A failure is logged (debug builds only) and shown to the user as a short toast.
 */
@Override
public void onCreate() {
    super.onCreate();
    configureRxJavaErrorHandler();

    final DisposableCompletableObserver initObserver = new DisposableCompletableObserver() {
        @Override
        public void onError(Throwable e) {
            // The stack trace is only logged in debug builds; the toast is always shown.
            if (BuildConfig.DEBUG) {
                Log.e(TAG, "failed to initialize youtubedl-android", e);
            }
            final String message = "initialization failed: " + e.getLocalizedMessage();
            Toast.makeText(getApplicationContext(), message, Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onComplete() {
            // it worked
        }
    };

    Completable.fromAction(this::initLibraries)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(initObserver);
}
/**
 * Wraps the downstream observer so that every terminal event from upstream reaches it as an
 * error: completion is reported as the captured {@code throwable}, and an upstream error is
 * forwarded unchanged. Nothing is delivered once the wrapper has been disposed.
 *
 * @param observer downstream observer that receives the error signal
 * @return the wrapping observer to subscribe upstream
 */
@Override
public CompletableObserver apply(CompletableObserver observer) throws Exception {
    return new DisposableCompletableObserver() {
        @Override
        public void onError(Throwable ex) {
            if (isDisposed()) {
                return;
            }
            observer.onError(ex);
        }

        @Override
        public void onComplete() {
            if (isDisposed()) {
                return;
            }
            // Upstream completion is reported downstream as the captured throwable.
            observer.onError(throwable);
        }
    };
}
/**
 * Builds an observer that turns both upstream terminal events into a downstream error:
 * {@code onComplete} delivers the captured {@code throwable}, {@code onError} delivers the
 * upstream error itself. Events arriving after disposal are dropped.
 *
 * @param observer downstream observer that receives the error signal
 * @return the wrapping observer to subscribe upstream
 */
@Override
public CompletableObserver apply(CompletableObserver observer) throws Exception {
    return new DisposableCompletableObserver() {
        @Override
        public void onComplete() {
            forward(throwable);
        }

        @Override
        public void onError(Throwable ex) {
            forward(ex);
        }

        /** Passes {@code failure} downstream unless this observer is already disposed. */
        private void forward(Throwable failure) {
            if (!isDisposed()) {
                observer.onError(failure);
            }
        }
    };
}
/**
 * Stores the password for the wallet emitted by {@code upstream} and then re-emits that wallet.
 * <p>
 * If storing the password fails, the wallet is deleted through {@code walletRepository} and the
 * failure is propagated downstream: the original storage error when the deletion completes,
 * or the deletion's own error when the deletion fails.
 * <p>
 * The wallet is obtained with {@code flatMap} instead of {@code blockingGet()}. Blocking inside
 * a transformer subscribes to {@code upstream} while the chain is still being assembled, on the
 * assembling thread, and throws upstream errors out of this method instead of delivering them
 * to the subscriber. With {@code flatMap} the work starts only on subscription and honours the
 * schedulers applied to the chain.
 *
 * @param upstream source of the wallet whose password should be stored
 * @return a source emitting the same wallet once its password has been stored
 */
@Override
public SingleSource<Wallet> apply(Single<Wallet> upstream) {
    return upstream.flatMap(wallet -> passwordStore
            .setPassword(wallet, password)
            .onErrorResumeNext(err -> walletRepository.deleteWallet(wallet.address, password)
                    .lift(observer -> new DisposableCompletableObserver() {
                        @Override
                        public void onComplete() {
                            // Deletion succeeded: surface the original password-store failure.
                            observer.onError(err);
                        }

                        @Override
                        public void onError(Throwable e) {
                            // Deletion failed as well: surface the deletion failure.
                            observer.onError(e);
                        }
                    }))
            .toSingle(() -> wallet));
}
/**
 * Subscribes a {@code DisposableCompletableObserver} to an already-completed Completable,
 * then checks that chaining it with Completables derived from a Flowable and from a Single
 * still completes.
 */
@Test
public void whenCompletableConstructed_thenCompletedSuccessfully() {
    final Completable completed = Completable.complete();

    final DisposableCompletableObserver printingObserver = new DisposableCompletableObserver() {
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("Completed!");
        }
    };
    completed.subscribe(printingObserver);

    // Two further Completables, built from a Flowable and from a Single respectively.
    final Flowable<String> events = Flowable.just("request received", "user logged in");
    final Completable fromFlowable = Completable.fromPublisher(events);
    final Completable fromSingle = Single.just(1).ignoreElement();

    final Completable chained = completed
            .andThen(fromFlowable)
            .andThen(fromSingle);
    chained.test().assertComplete();
}
/**
 * Starts the SMS submission and tracks it in {@code disposables}.
 * <p>
 * Flow: send the messages (reporting {@code SENDING} whenever the sent/total pair changes),
 * report {@code SENT}, load the SMS module config and, if the config asks for it, wait for
 * the confirmation SMS (reporting {@code WAITING_RESULT}, then {@code RESULT_CONFIRMED} or,
 * on a timeout response, {@code WAITING_RESULT_TIMEOUT}). On either terminal event the
 * background notification is stopped and {@code stopEventually()} is called.
 */
private void executeSending() {
    startBackgroundSubmissionNotification();
    final Date startDate = new Date();

    disposables.add(
            smsSender.send()
                    .doOnNext(state -> {
                        // Skip progress reports that repeat the previous sent/total pair.
                        if (isLastSendingStateTheSame(state.getSent(), state.getTotal())) {
                            return;
                        }
                        reportState(State.SENDING, state.getSent(), state.getTotal());
                    })
                    .ignoreElements()
                    .doOnComplete(() -> reportState(State.SENT, 0, 0))
                    .andThen(d2.smsModule().configCase().getSmsModuleConfig())
                    .flatMapCompletable(config -> {
                        if (!config.isWaitingForResult()) {
                            return Completable.complete();
                        }
                        reportState(State.WAITING_RESULT, 0, 0);
                        return smsSender.checkConfirmationSms(startDate)
                                .doOnError(throwable -> {
                                    // Only a timeout response gets its own state; any other
                                    // error is handled by the final observer alone.
                                    final boolean timedOut =
                                            throwable instanceof SmsRepository.ResultResponseException
                                                    && ((SmsRepository.ResultResponseException) throwable).getReason()
                                                    == SmsRepository.ResultResponseIssue.TIMEOUT;
                                    if (timedOut) {
                                        reportState(State.WAITING_RESULT_TIMEOUT, 0, 0);
                                    }
                                })
                                .doOnComplete(() -> reportState(State.RESULT_CONFIRMED, 0, 0));
                    })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(Schedulers.newThread())
                    .subscribeWith(new DisposableCompletableObserver() {
                        @Override
                        public void onComplete() {
                            reportState(State.COMPLETED, 0, 0);
                            stopBackgroundSubmissionNotification();
                            stopEventually();
                        }

                        @Override
                        public void onError(Throwable e) {
                            reportError(e);
                            stopBackgroundSubmissionNotification();
                            stopEventually();
                        }
                    }));
}
/**
 * Updates the stored warning time for a wallet and, optionally, its key backup time.
 * <p>
 * The write happens in {@code onStart} of an observer subscribed to an already-completed
 * Completable; {@code onComplete} then commits the transaction and closes the Realm.
 *
 * @param walletAddr   address used to look up the {@code RealmWalletData} row
 * @param isBackupTime when true, {@code setKeyBackupTime(walletAddr)} is also called
 * @return the subscribed observer, as a {@code Disposable}
 */
private Disposable updateTimeInternal(String walletAddr, boolean isBackupTime) {
    final DisposableCompletableObserver writer = new DisposableCompletableObserver() {
        Realm realm;

        @Override
        public void onStart() {
            realm = realmManager.getWalletDataRealmInstance();
            realm.beginTransaction();

            // The backup time is only touched when a backup was actually made.
            if (isBackupTime) {
                setKeyBackupTime(walletAddr);
            }

            final RealmWalletData walletData = realm.where(RealmWalletData.class)
                    .equalTo("address", walletAddr)
                    .findFirst();
            if (walletData == null) {
                return;
            }
            // The warning time is always refreshed.
            walletData.setLastWarning(System.currentTimeMillis());
        }

        @Override
        public void onComplete() {
            if (realm.isInTransaction()) {
                realm.commitTransaction();
            }
            realm.close();
        }

        @Override
        public void onError(Throwable e) {
            if (realm == null || realm.isClosed()) {
                return;
            }
            realm.close();
        }
    };

    return Completable.complete().subscribeWith(writer);
}
/**
 * Persists the last-read block of {@code token} in the Realm belonging to {@code wallet}.
 * <p>
 * If a matching {@code RealmToken} exists, a transaction is opened and the token writes its
 * last block into it; otherwise the token is stored through {@code saveToken}. The
 * transaction, if one was opened, is committed in {@code onComplete}.
 * <p>
 * {@code TransactionsRealmCache.subRealm()} is only called when a transaction is open, i.e.
 * when {@code addRealm()} was called in {@code onStart}. Previously it ran unconditionally,
 * so the {@code saveToken} path called {@code subRealm()} without a matching
 * {@code addRealm()}.
 * NOTE(review): assumes addRealm/subRealm are meant to be paired — confirm against
 * TransactionsRealmCache.
 *
 * @param token  token whose last block is stored
 * @param wallet wallet that selects the Realm instance
 * @return the subscribed observer, as a {@code Disposable}
 */
@Override
public Disposable storeBlockRead(Token token, Wallet wallet)
{
    return Completable.complete()
            .subscribeWith(new DisposableCompletableObserver()
            {
                Realm realm;
                @Override
                public void onStart()
                {
                    realm = realmManager.getRealmInstance(wallet);
                    RealmToken realmToken = realm.where(RealmToken.class)
                            .equalTo("address", databaseKey(token))
                            .equalTo("chainId", token.tokenInfo.chainId)
                            .findFirst();
                    if (realmToken != null)
                    {
                        TransactionsRealmCache.addRealm();
                        realm.beginTransaction();
                        token.setRealmLastBlock(realmToken);
                    }
                    else
                    {
                        // No transaction is opened on this path, so no addRealm() either.
                        saveToken(wallet, token, new Date());
                    }
                }
                @Override
                public void onComplete()
                {
                    if (realm.isInTransaction())
                    {
                        realm.commitTransaction();
                        // Balances the addRealm() made when the transaction was opened.
                        TransactionsRealmCache.subRealm();
                    }
                    realm.close();
                }
                @Override
                public void onError(Throwable e)
                {
                    if (realm != null && !realm.isClosed())
                    {
                        // Keep the cache balanced if a transaction was left open.
                        if (realm.isInTransaction()) TransactionsRealmCache.subRealm();
                        realm.close();
                    }
                }
            });
}
/**
 * Stores {@code imageUrl} for a token in the auxiliary Realm selected by {@code IMAGES_DB}.
 * <p>
 * The row is keyed by {@code <lower-cased address>-<networkId>}; it is created when missing,
 * then its result and result time are overwritten. The transaction opened in
 * {@code onStart} is committed in {@code onComplete}.
 *
 * @param networkId network id used as the key suffix
 * @param address   token address used as the key prefix (lower-cased)
 * @param imageUrl  value written as the row's result
 * @return the subscribed observer, as a {@code Disposable}
 */
@Override
public Disposable storeTokenUrl(int networkId, String address, String imageUrl) {
    final DisposableCompletableObserver writer = new DisposableCompletableObserver() {
        Realm realm;

        @Override
        public void onStart() {
            final String key = address.toLowerCase() + "-" + networkId;
            realm = realmManager.getAuxRealmInstance(IMAGES_DB);

            // Look the row up before opening the write transaction.
            final RealmAuxData existing = realm.where(RealmAuxData.class)
                    .equalTo("instanceKey", key)
                    .findFirst();

            TransactionsRealmCache.addRealm();
            realm.beginTransaction();

            final RealmAuxData entry = existing != null
                    ? existing
                    : realm.createObject(RealmAuxData.class, key);
            entry.setResult(imageUrl);
            entry.setResultTime(System.currentTimeMillis());
        }

        @Override
        public void onComplete() {
            if (realm.isInTransaction()) {
                realm.commitTransaction();
            }
            TransactionsRealmCache.subRealm();
            realm.close();
        }

        @Override
        public void onError(Throwable e) {
            if (realm == null || realm.isClosed()) {
                return;
            }
            realm.close();
        }
    };

    return Completable.complete().subscribeWith(writer);
}
/**
 * Marks {@code token} as terminated in the Realm belonging to {@code wallet}.
 * <p>
 * Nothing is written when {@code wallet.address} is not a valid address. Otherwise a
 * transaction is opened; a blank token is created if no matching {@code RealmToken} exists,
 * or the existing one is updated through {@code token.setIsTerminated}. The transaction is
 * committed in {@code onComplete}.
 * <p>
 * {@code TransactionsRealmCache.subRealm()} is only called when a transaction is open, i.e.
 * when {@code addRealm()} was called in {@code onStart}. Previously the invalid-address
 * early return skipped {@code addRealm()} but {@code onComplete} still called
 * {@code subRealm()}.
 * NOTE(review): assumes addRealm/subRealm are meant to be paired — confirm against
 * TransactionsRealmCache.
 *
 * @param token   token to mark as terminated
 * @param network network whose {@code chainId} is used in the lookup
 * @param wallet  wallet that selects the Realm instance
 * @return the subscribed observer, as a {@code Disposable}
 */
@Override
public Disposable setTokenTerminated(Token token, NetworkInfo network, Wallet wallet)
{
    return Completable.complete()
            .subscribeWith(new DisposableCompletableObserver()
            {
                Realm realm;
                @Override
                public void onStart()
                {
                    realm = realmManager.getRealmInstance(wallet);
                    // Early exit: no transaction and no addRealm(); onComplete only closes.
                    if (!WalletUtils.isValidAddress(wallet.address)) return;
                    RealmToken realmToken = realm.where(RealmToken.class)
                            .equalTo("address", databaseKey(token))
                            .equalTo("chainId", network.chainId)
                            .findFirst();
                    TransactionsRealmCache.addRealm();
                    realm.beginTransaction();
                    if (realmToken == null)
                    {
                        createBlankToken(realm, token);
                    }
                    else
                    {
                        token.setIsTerminated(realmToken);
                    }
                }
                @Override
                public void onComplete()
                {
                    if (realm.isInTransaction())
                    {
                        realm.commitTransaction();
                        // Balances the addRealm() made when the transaction was opened.
                        TransactionsRealmCache.subRealm();
                    }
                    realm.close();
                }
                @Override
                public void onError(Throwable e)
                {
                    if (realm != null && !realm.isClosed())
                    {
                        // Keep the cache balanced if a transaction was left open.
                        if (realm.isInTransaction()) TransactionsRealmCache.subRealm();
                        realm.close();
                    }
                }
            });
}
/**
 * Writes {@code tResult} into the auxiliary Realm of the current address and returns it.
 * <p>
 * Nothing is written when the current address is null or invalid, when
 * {@code tResult.result} is null, or when {@code tResult.resultTime} is negative. Otherwise
 * the row is looked up by key and chain id, then created or updated inside a transaction
 * that is committed in {@code onComplete}.
 *
 * @param tResult result to persist
 * @return the same {@code tResult} instance that was passed in
 */
@Override
public TransactionResult storeAuxData(TransactionResult tResult) {
    final DisposableCompletableObserver writer = new DisposableCompletableObserver() {
        Realm realm = null;

        @Override
        public void onStart() {
            if (tokensService.getCurrentAddress() == null) {
                return;
            }
            if (!WalletUtils.isValidAddress(tokensService.getCurrentAddress())) {
                return;
            }
            if (tResult.result == null) {
                return;
            }
            if (tResult.resultTime < 0) {
                return;
            }

            realm = realmManager.getAuxRealmInstance(tokensService.getCurrentAddress());

            final ContractAddress contract =
                    new ContractAddress(tResult.contractChainId, tResult.contractAddress);
            String key = functionKey(contract, tResult.tokenId, tResult.attrId);
            // Results containing a comma are stored under the event key instead.
            if (tResult.result.contains(",")) {
                key = eventKey(tResult);
            }

            final RealmAuxData stored = realm.where(RealmAuxData.class)
                    .equalTo("instanceKey", key)
                    .equalTo("chainId", tResult.contractChainId)
                    .findFirst();

            // tResult.result is known to be non-null here, so a write always follows.
            TransactionsRealmCache.addRealm();
            realm.beginTransaction();
            if (stored == null) {
                createAuxData(realm, tResult, key);
            } else {
                stored.setResult(tResult.result);
                stored.setResultTime(tResult.resultTime);
            }
        }

        @Override
        public void onComplete() {
            if (realm == null) {
                return;
            }
            if (realm.isInTransaction()) {
                realm.commitTransaction();
                TransactionsRealmCache.subRealm();
            }
            if (!realm.isClosed()) {
                realm.close();
            }
        }

        @Override
        public void onError(Throwable e) {
            if (realm == null || realm.isClosed()) {
                return;
            }
            if (realm.isInTransaction()) {
                TransactionsRealmCache.subRealm();
            }
            realm.close();
        }
    };

    // The isDisposed() result is discarded.
    // NOTE(review): presumably only there to consume subscribeWith's return value — confirm.
    Completable.complete().subscribeWith(writer).isDisposed();
    return tResult;
}
/**
 * Stores the list view height for a token, together with the MD5 of its token script file,
 * in the auxiliary Realm of the current address.
 * <p>
 * Nothing is written when the token script file is missing or does not exist. The row is
 * keyed by {@code tokenSizeDBKey(chainId, address)} and created when absent. The height is
 * written through {@code setResultTime} and the hash through {@code setResult}.
 *
 * @param chainId        chain id of the token
 * @param address        address of the token
 * @param listViewHeight height value to store
 * @return the subscribed observer, as a {@code Disposable}
 */
public Disposable storeTokenViewHeight(int chainId, String address, int listViewHeight) {
    final DisposableCompletableObserver writer = new DisposableCompletableObserver() {
        Realm realm;

        @Override
        public void onStart() {
            TransactionsRealmCache.addRealm();
            realm = realmManager.getAuxRealmInstance(tokensService.getCurrentAddress());

            // The script file's hash is stored alongside the height.
            final TokenScriptFile scriptFile = getTokenScriptFile(chainId, address);
            if (scriptFile == null) {
                return;
            }
            if (!scriptFile.exists()) {
                return;
            }
            final String hash = scriptFile.calcMD5();
            final String key = tokenSizeDBKey(chainId, address);

            final RealmAuxData stored = realm.where(RealmAuxData.class)
                    .equalTo("instanceKey", key)
                    .equalTo("chainId", chainId)
                    .findFirst();

            realm.beginTransaction();
            final RealmAuxData target = stored != null
                    ? stored
                    : realm.createObject(RealmAuxData.class, key);
            target.setChainId(chainId);
            target.setResult(hash);
            target.setResultTime(listViewHeight);
        }

        @Override
        public void onComplete() {
            if (realm.isInTransaction()) {
                realm.commitTransaction();
            }
            TransactionsRealmCache.subRealm();
            realm.close();
        }

        @Override
        public void onError(Throwable e) {
            TransactionsRealmCache.subRealm();
            if (realm == null || realm.isClosed()) {
                return;
            }
            realm.close();
        }
    };

    return Completable.complete().subscribeWith(writer);
}