org.junit.jupiter.api.Tag#io.reactivex.schedulers.Schedulers源码实例Demo

下面列出了org.junit.jupiter.api.Tag#io.reactivex.schedulers.Schedulers 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: bitshares_wallet   文件: MarketTickerRepository.java
/**
 * Publishes the cached tickers as "loading", runs {@code fetchMarketTicker()} on the
 * IO scheduler, and then swaps {@code result} over to a fresh database query on the
 * main thread. On failure the cached source is re-attached wrapped as an error.
 *
 * @param dbSource live data that backs {@code result} while the refresh is running
 */
private void fetchFromNetwork(final LiveData<List<BitsharesMarketTicker>> dbSource) {
    result.addSource(dbSource, cached -> result.setValue(Resource.loading(cached)));

    // Fetch the data from the remote side and store it.
    Flowable.just(0)
            .subscribeOn(Schedulers.io())
            .map(ignored -> {
                // Fetch the asset list.
                fetchMarketTicker();
                return 0;
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    status -> {
                        LiveData<List<BitsharesMarketTicker>> refreshed = bitsharesDao.queryMarketTicker();
                        result.removeSource(dbSource);
                        result.addSource(refreshed, fresh -> result.setValue(Resource.success(fresh)));
                    },
                    error -> {
                        result.removeSource(dbSource);
                        result.addSource(
                                dbSource,
                                stale -> result.setValue(Resource.error(error.getMessage(), stale)));
                    });
}
 
源代码2 项目: Readhub   文件: WaitPresenter.java
/**
 * Loads every stored {@code Wait} row on the IO scheduler and hands the list to the
 * view on the main thread, either as a refresh result or as the initial load.
 */
@Override
public void getData() {
    WaitDataBase.getDatabase().waitDao().findAll()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<Wait>>() {
                @Override
                public void accept(List<Wait> waits) throws Exception {
                    if (!isRefresh) {
                        // Initial load: dismiss the loading indicator and show the data.
                        iTopicView.hideLoading();
                        iTopicView.getTopicEntity(waits);
                        return;
                    }
                    // Pull-to-refresh: stop the spinner and clear the refresh flag first.
                    mSwipeRefreshLayout.setRefreshing(false);
                    isRefresh = false;
                    iTopicView.setRefresh(waits);
                }
            });
}
 
源代码3 项目: bitshares_wallet   文件: SendFragment.java
/**
 * Looks up the account object for {@code strAccount} on the IO scheduler and shows its
 * instance id in {@code textViewTo} on the main thread.
 *
 * <p>Network and error-code failures are displayed as {@code "#none"}; any other
 * throwable is propagated.
 *
 * @param strAccount account name to resolve
 * @param textViewTo view that receives the resolved id text
 */
private void processGetTransferToId(final String strAccount, final TextView textViewTo) {
    Flowable.just(strAccount)
            .subscribeOn(Schedulers.io())
            .map(name -> {
                account_object found = BitsharesWalletWraper.getInstance().get_account_object(name);
                if (found == null) {
                    throw new ErrorCodeException(ErrorCode.ERROR_NO_ACCOUNT_OBJECT, "it can't find the account");
                }
                return found;
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    found -> {
                        // Only touch the view while the hosting activity is still alive.
                        if (getActivity() != null && !getActivity().isFinishing()) {
                            textViewTo.setText("#" + found.id.get_instance());
                        }
                    },
                    error -> {
                        boolean expected = error instanceof NetworkStatusException
                                || error instanceof ErrorCodeException;
                        if (!expected) {
                            throw Exceptions.propagate(error);
                        }
                        if (getActivity() != null && !getActivity().isFinishing()) {
                            textViewTo.setText("#none");
                        }
                    });
}
 
源代码4 项目: RxCommand   文件: RxCommandTest.java
/**
 * Fires three executions back to back against a command whose work takes 10 ms on a
 * new thread, then asserts that exactly one execution observable was emitted and the
 * stream neither errored nor completed.
 */
@Test
public void executionObservables_notAllowingConcurrent_onlyExecutionOnce() {
    RxCommand<String> cmd = RxCommand.create(input ->
            Observable.just((String) input)
                    .subscribeOn(Schedulers.newThread())
                    .delay(10, TimeUnit.MILLISECONDS));

    TestObserver<Observable<String>> executions = new TestObserver<>();
    cmd.executionObservables().subscribe(executions);

    for (String input : new String[] {"1", "2", "3"}) {
        cmd.execute(input);
    }

    // Give the delayed execution time to finish.
    try {
        Thread.sleep(30);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    executions.assertValueCount(1);
    executions.assertNoErrors();
    executions.assertNotComplete();
}
 
/**
 * Posts a request built from the page and tid of {@code info}, and reports success or
 * failure to {@code callBack} depending on whether the response contains the success marker.
 *
 * @param info     source of the "page" and "tidarray" form fields
 * @param callBack receives the outcome on the main thread
 */
@Override
public void removeTopic(ThreadPageInfo info, final OnHttpCallBack<String> callBack) {
    initFieldMap();
    mFieldMap.put("page", String.valueOf(info.getPage()));
    mFieldMap.put("tidarray", String.valueOf(info.getTid()));

    mService.post(mFieldMap)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .compose(getLifecycleProvider().<String>bindUntilEvent(FragmentEvent.DETACH))
            .subscribe(new BaseSubscriber<String>() {
                @Override
                public void onNext(@NonNull String response) {
                    if (!response.contains("操作成功")) {
                        callBack.onError("操作失败!");
                        return;
                    }
                    callBack.onSuccess("操作成功!");
                }
            });
}
 
/**
 * Builds, signs and broadcasts a raw transaction from {@code from}, subscribing on the
 * IO scheduler.
 *
 * @param from     wallet whose address supplies the nonce and which signs the transaction
 * @param gasPrice gas price passed to the raw transaction
 * @param gasLimit gas limit passed to the raw transaction
 * @param data     payload passed to the raw transaction
 * @param password password used when signing
 * @return a {@code Single} emitting the transaction hash, or failing with the node's
 *         error message when the send response reports an error
 */
public Single<String> createTransaction(ETHWallet from, BigInteger gasPrice, BigInteger gasLimit, String data, String password) {
    final Web3j web3j = Web3j.build(new HttpService(networkRepository.getDefaultNetwork().rpcServerUrl));

    return networkRepository.getLastTransactionNonce(web3j, from.address)
            .flatMap(nonce -> getRawTransaction(nonce, gasPrice, gasLimit, BigInteger.ZERO, data))
            .flatMap(unsigned -> signEncodeRawTransaction(
                    unsigned, password, from, networkRepository.getDefaultNetwork().chainId))
            .flatMap(signed -> Single.fromCallable(() -> {
                EthSendTransaction sent = web3j
                        .ethSendRawTransaction(Numeric.toHexString(signed))
                        .send();
                if (sent.hasError()) {
                    throw new Exception(sent.getError().getMessage());
                }
                return sent.getTransactionHash();
            }))
            .subscribeOn(Schedulers.io());
}
 
源代码7 项目: xDrip-plus   文件: BlueJayService.java
/**
 * Writes {@code bytes} to the characteristic {@code uuid} on the current connection.
 *
 * @param uuid  characteristic to write to
 * @param bytes payload to write
 * @return {@code false} when there is no connection or it is not connected; {@code true}
 *         once the write has been subscribed (the write itself may still fail later)
 */
boolean sendOtaChunk(final UUID uuid, final byte[] bytes) {
    if (I.connection == null || !I.isConnected) {
        return false;
    }

    I.connection.writeCharacteristic(uuid, bytes)
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.io())
            .subscribe(
                    written -> {
                        if (D) {
                            UserError.Log.d(TAG, "Wrote record request request: " + bytesToHex(written));
                        }
                        busy = false;
                    },
                    error -> {
                        UserError.Log.e(TAG, "Failed to write record request: " + error);
                        if (error instanceof BleGattCharacteristicException) {
                            final int status = ((BleGattCharacteristicException) error).getStatus();
                            UserError.Log.e(TAG, "Got status message: " + Helper.getStatusName(status));
                            return;
                        }
                        // A dropped link moves the state machine to CLOSE before logging.
                        if (error instanceof BleDisconnectedException) {
                            changeState(CLOSE);
                        }
                        UserError.Log.d(TAG, "Throwable in Record End write: " + error);
                    });

    return true; // only that we didn't fail in setup
}
 
源代码8 项目: RxPaper2   文件: RxPaperBookTest.java
/**
 * Writes two keys into a trampoline-scheduled book, destroys the book, and checks that
 * the destroy completes without values or errors and that both keys are gone.
 */
@Test
public void testDestroy() throws Exception {
    final RxPaperBook book = RxPaperBook.with("DESTROY", Schedulers.trampoline());
    final String firstKey = "hello";
    final String secondKey = "you";
    final ComplexObject payload = ComplexObject.random();

    book.write(firstKey, payload).subscribe();
    book.write(secondKey, payload).subscribe();

    final TestObserver<Void> destroyObserver = book.destroy().test();
    destroyObserver.awaitTerminalEvent();
    destroyObserver.assertComplete();
    destroyObserver.assertNoErrors();
    destroyObserver.assertValueCount(0);

    Assert.assertFalse(book.book.contains(firstKey));
    Assert.assertFalse(book.book.contains(secondKey));
}
 
源代码9 项目: okuki   文件: DataManager.java
/**
 * Loads the next page of items unless a load is already in progress.
 *
 * <p>The loading flag is set before the request and cleared either after the page has
 * been appended or when the request fails.
 */
public void loadMore() {
    if (loading.get()) {
        return;
    }

    setIsLoading(true);
    doLoad(pageSize, items.size())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnError(failure -> setIsLoading(false))
            .subscribe(
                    page -> {
                        if (!page.isEmpty()) {
                            items.addAll(page);
                        }
                        setIsLoading(false);
                    },
                    Errors.log());
}
 
源代码10 项目: BetterWay   文件: MyService.java
/**
 * Initializes the dates: loads every stored schedule and appends each schedule's start
 * time to {@code dates}.
 *
 * <p>The query is subscribed on the IO scheduler and the results are consumed on the
 * main thread. The original chain had the two schedulers swapped, which ran the query
 * on the main thread and mutated {@code dates} from an IO thread.
 */
private void initDates() {
    io.reactivex.Observable.create(new ObservableOnSubscribe<List<Schedule>>() {
        @Override
        public void subscribe(ObservableEmitter<List<Schedule>> e) throws Exception {
            MainModel mainModel = MainModel.getInstance();
            List<Schedule> schedules = mainModel.inquiryAllSchedule(getApplication());
            if (!schedules.isEmpty()) {
                e.onNext(schedules);
            }
            // Signal termination so the subscription is released even when nothing was emitted.
            e.onComplete();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<Schedule>>() {
                @Override
                public void accept(List<Schedule> list) throws Exception {
                    for (Schedule schedule : list) {
                        dates.add(schedule.getStartTime());
                        LogUtil.i(TAG, dates.size() + "个");
                    }
                }
            });
}
 
源代码11 项目: Aurora   文件: RxUtils.java
/**
 * Builds a transformer that subscribes upstream on the IO scheduler, shows the view's
 * loading indicator on subscribe (skipped for load-more requests), observes on the main
 * thread, hides the indicator after termination, and binds the stream to the view's lifecycle.
 *
 * @param view       view whose loading indicator and lifecycle are used
 * @param isLoadMore when {@code true}, the loading indicator is not shown on subscribe
 * @param <T>        element type of the stream
 * @return the composed transformer
 */
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view,final boolean isLoadMore) {
    return new ObservableTransformer<T, T>() {
        @Override
        public Observable<T> apply(Observable<T> upstream) {
            return upstream
                    .subscribeOn(Schedulers.io())
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(@NonNull Disposable disposable) throws Exception {
                            if (isLoadMore) {
                                return;
                            }
                            view.showLoading();
                        }
                    })
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doAfterTerminate(new Action() {
                        @Override
                        public void run() {
                            // Hide the progress indicator.
                            view.hideLoading();
                        }
                    })
                    .compose(RxLifecycleUtils.bindToLifecycle(view));
        }
    };
}
 
源代码12 项目: RxShell   文件: RxProcess.java
/**
 * Reports whether the wrapped process is still running.
 *
 * <p>When {@code ApiWrap.hasOreo()} is true this uses {@code process.isAlive()};
 * otherwise it probes {@code exitValue()}, which throws
 * {@link IllegalThreadStateException} while the process has not exited.
 *
 * @return a {@code Single} subscribed on the IO scheduler that emits {@code true}
 *         while the process is alive
 */
@SuppressLint("NewApi")
public Single<Boolean> isAlive() {
    if (RXSDebug.isDebug()) {
        Timber.tag(TAG).v("isAlive()");
    }
    return Single
            .create((SingleEmitter<Boolean> emitter) -> {
                if (!ApiWrap.hasOreo()) {
                    try {
                        process.exitValue();
                        emitter.onSuccess(false);
                    } catch (IllegalThreadStateException e) {
                        emitter.onSuccess(true);
                    }
                } else {
                    emitter.onSuccess(process.isAlive());
                }
            })
            .subscribeOn(Schedulers.io());
}
 
源代码13 项目: Protein   文件: ShotLikeListPresenter.java
/**
 * Fetches the next page of shot likes, if a next-page URL is known, and appends it to
 * the view. The "loading more" indicator is shown for the duration of the request and
 * failures are surfaced in a snackbar.
 */
@Override
public void fetchMoreData() {
    if (TextUtils.isEmpty(getNextPageUrl())) {
        return;
    }

    view.showLoadingMore(true);
    repository.listShotLikesForUserOfNextPage(getNextPageUrl())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .compose(((LifecycleProvider<FragmentEvent>) view).bindUntilEvent(FragmentEvent.DESTROY_VIEW))
            .subscribe(
                    response -> {
                        view.showLoadingMore(false);
                        view.showMoreData(generateEpoxyModels(response.body()));
                        setNextPageUrl(new PageLinks(response).getNext());
                    },
                    error -> {
                        view.showLoadingMore(false);
                        view.showSnackbar(error.getMessage());
                        error.printStackTrace();
                    });
}
 
源代码14 项目: alpha-wallet-android   文件: FunctionActivity.java
/**
 * Loads the token's attributes into {@code attrs}, looks up the action for
 * {@code actionMethod}, and resolves the token attributes together with the action's
 * local attributes, displaying the function once resolution completes.
 */
private void getAttrs()
{
    try
    {
        attrs = viewModel.getAssetDefinitionService().getTokenAttrs(token, tokenId, 1);
        // Add extra tokenIds if required.
        addMultipleTokenIds(attrs);
    }
    catch (Exception e)
    {
        e.printStackTrace();
    }

    // Fetch attributes local to this action and add them to the injected token properties.
    Map<String, TSAction> functionMap = viewModel.getAssetDefinitionService()
            .getTokenFunctionMap(token.tokenInfo.chainId, token.getAddress());
    action = functionMap.get(actionMethod);

    List<Attribute> actionAttrs = null;
    if (action != null && action.attributes != null)
    {
        actionAttrs = new ArrayList<>(action.attributes.values());
    }

    viewModel.getAssetDefinitionService().resolveAttrs(token, tokenIds, actionAttrs)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::onAttr, this::onError, () -> displayFunction(attrs.toString()))
            .isDisposed();
}
 
源代码15 项目: Android-Allocine-Api   文件: MainActivity.java
/**
 * Requests the first page (20 entries) of now-showing movies ordered by top rank, then
 * shows the list's string form in {@code textView} and logs it. Failures are logged.
 */
public void execute() {
    final AllocineApi api = new AllocineApi(okHttpClient);

    api.movieList(
                    AllocineApi.MovieListFilter.NOW_SHOWING,
                    AllocineApi.Profile.SMALL,
                    AllocineApi.MovieListOrder.TOPRANK,
                    20,
                    1)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<Movie>>() {
                @Override
                public void accept(List<Movie> result) throws Exception {
                    final String rendered = result.toString();
                    textView.setText(rendered);
                    Log.d("MainActivity", rendered);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable error) throws Exception {
                    Log.e("tag", error.getLocalizedMessage(), error);
                }
            });
}
 
源代码16 项目: mimi-reader   文件: FourChanConnector.java
/**
 * Downloads the archive list and converts each {@code FourChanArchive} entry into a
 * {@code ChanArchive}, with the conversion observed on the IO scheduler.
 *
 * @return a {@code Single} emitting the converted archives in source order
 */
@Override
public Single<List<ChanArchive>> fetchArchives() {
    return api.fetchArchives("https://raw.githubusercontent.com/ccd0/4chan-x/master/src/Archive/archives.json")
            .observeOn(Schedulers.io())
            .toObservable()
            .flatMapIterable((Function<List<FourChanArchive>, Iterable<FourChanArchive>>) archives -> archives)
            // One output per input, so a plain map replaces flatMap over Observable.just.
            .map(source -> new ChanArchive.Builder()
                    .boards(source.getBoards())
                    .files(source.getFiles())
                    .domain(source.getDomain())
                    .http(source.getHttp())
                    .https(source.getHttps())
                    .software(source.getSoftware())
                    .uid(source.getUid())
                    .name(source.getName())
                    .reports(source.getReports())
                    .build())
            .toList();
}
 
/**
 * Creates a {@code Single} that, after {@code DELAY} milliseconds, fails with the given
 * exception and delivers that failure on the IO scheduler.
 *
 * @param e exception the returned instance fails with
 * @return the failing {@code Single}
 */
@Override
protected Single createInstanceFailingAsynchronously(RuntimeException e) {
    return Single.just("X")
            .delay(DELAY, TimeUnit.MILLISECONDS)
            .map(ignored -> {
                throw e;
            })
            .observeOn(Schedulers.io());
}
 
源代码18 项目: edslite   文件: ContainersDocumentProviderBase.java
/**
 * Copies the document identified by {@code sourceDocumentId} into the directory
 * identified by {@code targetParentDocumentId}, doing the work on the IO scheduler and
 * blocking until it finishes.
 *
 * <p>A source directory results in a new directory of the same name being created in
 * the target; a source file is copied. Content observers are notified when a context
 * is available.
 *
 * @param sourceDocumentId       id of the document to copy
 * @param targetParentDocumentId id of the destination directory
 * @return the document id of the copy
 * @throws IllegalArgumentException if anything fails during the copy (the cause is attached)
 */
@Override
public String copyDocument(String sourceDocumentId, String targetParentDocumentId) throws FileNotFoundException
{
    try
    {
        return Single.<String>create(emitter -> {
            Path source = getLocationsManager()
                    .getLocation(getLocationUriFromDocumentId(sourceDocumentId))
                    .getCurrentPath();
            Location targetLocation = getLocationsManager()
                    .getLocation(getLocationUriFromDocumentId(targetParentDocumentId));
            Directory targetDir = targetLocation
                    .getCurrentPath()
                    .getDirectory();
            Location copied = targetLocation.copy();

            if(source.isDirectory())
            {
                copied.setCurrentPath(targetDir.createDirectory(source.getDirectory().getName()).getPath());
            }
            else if(source.isFile())
            {
                copied.setCurrentPath(Util.copyFile(source.getFile(), targetDir).getPath());
            }

            Context ctx = getContext();
            if(ctx != null)
            {
                ctx.getContentResolver().notifyChange(getUriFromLocation(copied), null);
            }

            emitter.onSuccess(getDocumentIdFromLocation(copied));
        })
                .subscribeOn(Schedulers.io())
                .blockingGet();
    }
    catch (Exception e)
    {
        Logger.log(e);
        throw new IllegalArgumentException("Copy failed", e);
    }
}
 
源代码19 项目: Toutiao   文件: MediaHomeActivity.java
/**
 * Reads the media id from the launching intent and loads the media profile.
 *
 * <p>A missing media id, a failed request, or a profile with an empty tab list is
 * reported through {@code onError()}. On success the toolbar is titled with the media
 * name and the tab layout is initialized from the profile data.
 */
private void initData() {
    Intent intent = getIntent();
    this.mediaId = intent.getStringExtra(ARG_MEDIAID);
    if (TextUtils.isEmpty(mediaId)) {
        onError();
        return;
    }

    RetrofitFactory.getRetrofit().create(IMobileMediaApi.class)
            .getMediaProfile(mediaId)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .as(this.bindAutoDispose())
            .subscribe(bean -> {
                String name = bean.getData().getName();
                initToolBar(toolbar, true, name);
                List<MediaProfileBean.DataBean.TopTabBean> topTab = bean.getData().getTop_tab();
                // size() can never be negative, so the previous "< 0" check was dead code;
                // an empty list is the reachable "no tabs" case.
                if (null != topTab && topTab.isEmpty()) {
                    onError();
                    return;
                }
                initTabLayout(bean.getData());
            }, throwable -> {
                onError();
                ErrorAction.print(throwable);
            });
}
 
源代码20 项目: aptoide-client-v8   文件: RoomEventPersistence.java
/**
 * Reads all stored events on the IO scheduler, maps them to domain events, and exposes
 * the stream as an RxJava 1 observable with buffering backpressure. A mapping
 * {@link IOException} is emitted as a stream error.
 *
 * @return RxJava 1 observable of the mapped event lists
 */
@Override public rx.Observable<List<Event>> getAll() {
  return RxJavaInterop.toV1Observable(
      eventDAO.getAll()
          .subscribeOn(Schedulers.io())
          .flatMap(stored -> {
            try {
              return Observable.just(mapper.map(stored));
            } catch (IOException mappingFailure) {
              return Observable.error(mappingFailure);
            }
          }),
      BackpressureStrategy.BUFFER);
}
 
/**
 * Queries the contract at {@code importOrder.contractAddress} with {@code method} on
 * every network id, skipping ids once {@code foundNetwork} has been set. Responses go
 * to {@code testNetworkResult} and failures to {@code onTestError} on the main thread.
 *
 * @param method contract method to call on each network
 */
private void testNetworks(String method)
{
    foundNetwork = false;
    networkCount = ethereumNetworkRepository.getAvailableNetworkList().length;

    // Test all the networks.
    disposable = Observable.fromCallable(this::getNetworkIds)
            .flatMapIterable(ids -> ids)
            .filter(id -> !foundNetwork)
            .flatMap(id -> fetchTokensInteract.getContractResponse(importOrder.contractAddress, id, method))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::testNetworkResult, this::onTestError);
}
 
源代码22 项目: Yuan-SxMusic   文件: SearchContentPresenter.java
/**
 * Requests a further page of search results and forwards the outcome to the view.
 *
 * @param seek   search keyword
 * @param offset paging offset passed to the model
 */
@Override
public void searchMore(String seek, int offset) {
    addRxSubscribe(
            mModel.search(seek, offset)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeWith(new BaseObserver<SearchSong>(mView, false, true) {
                        @Override
                        public void onNext(SearchSong result) {
                            super.onNext(result);
                            // A non-zero code is treated as a failed search.
                            if (result.getCode() != 0) {
                                mView.searchMoreError();
                                return;
                            }
                            ArrayList<SearchSong.DataBean.SongBean.ListBean> songs =
                                    (ArrayList<SearchSong.DataBean.SongBean.ListBean>) result.getData().getSong().getList();
                            if (songs.isEmpty()) {
                                mView.searchMoreError();
                            } else {
                                mView.searchMoreSuccess(songs);
                            }
                        }

                        @Override
                        public void onError(Throwable e){
                            super.onError(e);
                            mView.showSearcherMoreNetworkError();
                        }
                    }));
}
 
源代码23 项目: xDrip-plus   文件: ScanMeister.java
/**
 * Starts a BLE scan for the configured device.
 *
 * <p>Any running scan is stopped first. A custom filter is used when set; otherwise a
 * filter is built from the device address (an invalid address is logged and leaves the
 * filter without an address). The scan times out after {@code scanSeconds} and a
 * separate stop task is scheduled for the same duration.
 */
public synchronized void scan() {
    extendWakeLock((scanSeconds + 1) * Constants.SECOND_IN_MS);
    stopScan("Scan start");
    UserError.Log.d(TAG, "startScan called: hunting: " + address + " " + name);

    ScanFilter filter = this.customFilter;
    if (filter != null) {
        UserError.Log.d(TAG,"Overriding with custom filter");
    } else {
        final ScanFilter.Builder filterBuilder = new ScanFilter.Builder();
        if (address != null) {
            try {
                filterBuilder.setDeviceAddress(address);
            } catch (IllegalArgumentException e) {
                UserError.Log.wtf(TAG, "Invalid bluetooth address: " + address);
            }
        }
        // TODO scanning by name doesn't build a filter
        filter = filterBuilder.build();
    }

    final ScanSettings settings = new ScanSettings.Builder()
            .setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
            .setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
            .build();

    scanSubscription = new Subscription(
            rxBleClient.scanBleDevices(settings, legacyNoFilterWorkaround ? ScanFilter.empty() : filter)
                    .timeout(scanSeconds, TimeUnit.SECONDS) // is unreliable
                    .subscribeOn(Schedulers.io())
                    .subscribe(this::onScanResult, this::onScanFailure));

    Inevitable.task(STOP_SCAN_TASK_ID, scanSeconds * Constants.SECOND_IN_MS, this::stopScanWithTimeoutCallback);
}
 
源代码24 项目: RxPaper2   文件: RxPaperBookTest.java
/**
 * Checks that creating a write {@code Completable} stores nothing until it is
 * subscribed, and that after subscription it completes without errors and the key exists.
 */
@Test
public void testWrite() throws Exception {
    final RxPaperBook book = RxPaperBook.with("WRITE", Schedulers.trampoline());
    final String key = "hello";

    final Completable pendingWrite = book.write(key, ComplexObject.random());
    // Nothing is written before subscription.
    Assert.assertFalse(book.book.contains(key));

    final TestObserver<Void> writeObserver = pendingWrite.test();
    writeObserver.awaitTerminalEvent();
    writeObserver.assertComplete();
    writeObserver.assertNoErrors();
    Assert.assertTrue(book.book.contains(key));
}
 
源代码25 项目: Reactive-Android-Programming   文件: Sandbox.java
/**
 * Demo that stacks three {@code subscribeOn} calls (single, new thread, io) around two
 * logging {@code doOnNext} stages, then blocks on {@code WAIT_LATCH}.
 *
 * @throws InterruptedException if the wait on the latch is interrupted
 */
private static void demo4() throws InterruptedException {
    final Observable<String> words = Observable.just("One", "Two", "Three");

    words.subscribeOn(Schedulers.single())
            .doOnNext(word -> log("doOnNext", word))
            .subscribeOn(Schedulers.newThread())
            .doOnNext(word -> log("doOnNext", word))
            .subscribeOn(Schedulers.io())
            .subscribe(word -> log("subscribe", word));

    WAIT_LATCH.await();
}
 
源代码26 项目: MBEStyle   文件: IconPresenter.java
/**
 * Counts the start tags whose name begins with {@code "item"} in the
 * {@code R.xml.drawable} resource on the IO scheduler and passes the total to the view
 * on the main thread.
 *
 * <p>The XML parser is closed once counting finishes (or fails), and the stream is
 * completed after the single value has been emitted.
 */
public void calcIconTotal() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            int total = 0;

            try {
                while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                    if (xml.getEventType() == XmlPullParser.START_TAG
                            && xml.getName().startsWith("item")) {
                        total++;
                    }
                    xml.next();
                }
            } finally {
                // Release the parser even when parsing throws.
                xml.close();
            }

            flowableEmitter.onNext(total);
            flowableEmitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mView.setIconTotal(integer);
                }
            });
}
 
源代码27 项目: ShareLoginPayUtil   文件: QQLoginInstance.java
/**
 * Fetches the QQ user profile for {@code token} over HTTP on the IO scheduler and
 * reports the result to {@code mLoginListener} on the main thread. {@code LoginUtil}
 * is recycled after either outcome.
 *
 * @param token token used to build the user-info URL and to supply the open id
 */
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
    mSubscribe = Flowable.create(new FlowableOnSubscribe<QQUser>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<QQUser> emitter) {
            OkHttpClient httpClient = new OkHttpClient();
            Request userInfoRequest = new Request.Builder().url(buildUserInfoUrl(token, URL)).build();

            try {
                Response httpResponse = httpClient.newCall(userInfoRequest).execute();
                JSONObject payload = new JSONObject(httpResponse.body().string());
                QQUser parsedUser = QQUser.parse(token.getOpenid(), payload);
                emitter.onNext(parsedUser);
                emitter.onComplete();
            } catch (IOException | JSONException e) {
                ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
                emitter.onError(e);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<QQUser>() {
                @Override
                public void accept(@NonNull QQUser user) {
                    LoginResultData loginResult = new LoginResultData(LoginPlatform.QQ, token, user);
                    mLoginListener.loginSuccess(loginResult);
                    LoginUtil.recycle();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(@NonNull Throwable error) {
                    mLoginListener.loginFailure(new Exception(error), ShareLogger.INFO.ERR_FETCH_CODE);
                    LoginUtil.recycle();
                }
            });
}
 
源代码28 项目: alpha-wallet-android   文件: AmountEntryItem.java
/**
 * Starts polling the ticker for {@code token}: immediately, and then every
 * {@code CHECK_ETHPRICE_INTERVAL} seconds.
 *
 * <p>Only the interval subscription is stored in {@code disposable}; each tick starts
 * its own ticker request whose subscription is not retained.
 *
 * @param token token whose ticker is requested on every tick
 */
public void startEthereumTicker(Token token)
{
    disposable = Observable.interval(0, CHECK_ETHPRICE_INTERVAL, TimeUnit.SECONDS)
            .doOnNext(tick ->
                    tokenRepository.getTokenTicker(token)
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(this::onTicker, this::onError))
            .subscribe();
}
 
源代码29 项目: Upchain-wallet   文件: ConfirmationViewModel.java
/**
 * Posts the progress flag and creates an ERC20 transfer from the default wallet on the
 * IO scheduler, routing the outcome to {@code onCreateTransaction} or {@code onError}.
 *
 * @param password        wallet password
 * @param to              recipient address
 * @param contractAddress address of the token contract
 * @param amount          token amount to transfer
 * @param gasPrice        gas price for the transaction
 * @param gasLimit        gas limit for the transaction
 */
public void createTokenTransfer(String password, String to, String contractAddress,
                                BigInteger amount, BigInteger gasPrice, BigInteger gasLimit) {
    progress.postValue(true);
    createTransactionInteract
            .createERC20Transfer(
                    defaultWallet.getValue(), to, contractAddress, amount, gasPrice, gasLimit, password)
            .subscribeOn(Schedulers.io())
            .subscribe(this::onCreateTransaction, this::onError);
}
 
源代码30 项目: NGA-CLIENT-VER-OPEN-SOURCE   文件: BaseRxTask.java
/**
 * Performs a GET on {@code url} on the IO scheduler and reports the outcome to
 * {@code callBack} on the main thread. When a lifecycle provider is set, the request
 * is bound until the fragment's DETACH event.
 *
 * <p>The active subscription is kept in {@code mSubscription} and cleared on every
 * next, error and complete signal.
 *
 * @param url      address to request
 * @param callBack receives the response body or the error message
 */
public void get(String url, OnHttpCallBack<String> callBack) {
    Observable<String> request = mService.get(url)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());

    if (mLifecycleProvider != null) {
        request = request.compose(mLifecycleProvider.bindUntilEvent(FragmentEvent.DETACH));
    }

    request.subscribe(new BaseSubscriber<String>() {

        @Override
        public void onSubscribe(Subscription subscription) {
            super.onSubscribe(subscription);
            mSubscription = subscription;
        }

        @Override
        public void onNext(@NonNull String body) {
            mSubscription = null;
            callBack.onSuccess(body);
        }

        @Override
        public void onError(@NonNull Throwable error) {
            mSubscription = null;
            callBack.onError(error.getMessage());
        }

        @Override
        public void onComplete() {
            mSubscription = null;
        }
    });
}