下面列出了org.junit.jupiter.api.Tag#io.reactivex.schedulers.Schedulers 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
private void fetchFromNetwork(final LiveData<List<BitsharesMarketTicker>> dbSource) {
result.addSource(dbSource, newData -> result.setValue(Resource.loading(newData)));
// 向远程获取数据,并进行存储
Flowable.just(0)
.subscribeOn(Schedulers.io())
.map(integer -> { // 获取asset list
fetchMarketTicker();
return 0;
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(retCode -> {
LiveData<List<BitsharesMarketTicker>> listLiveData = bitsharesDao.queryMarketTicker();
result.removeSource(dbSource);
result.addSource(listLiveData, newData -> result.setValue(Resource.success(newData)));
}, throwable -> {
result.removeSource(dbSource);
result.addSource(dbSource, newData -> result.setValue(Resource.error(throwable.getMessage(), newData)));
});
}
@Override
public void getData() {
WaitDataBase.getDatabase().waitDao().findAll()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Wait>>() {
@Override
public void accept(List<Wait> waitList) throws Exception {
if (isRefresh) {
mSwipeRefreshLayout.setRefreshing(false);
isRefresh = false;
iTopicView.setRefresh(waitList);
} else {
iTopicView.hideLoading();
iTopicView.getTopicEntity(waitList);
}
}
});
}
private void processGetTransferToId(final String strAccount, final TextView textViewTo) {
Flowable.just(strAccount)
.subscribeOn(Schedulers.io())
.map(accountName -> {
account_object accountObject = BitsharesWalletWraper.getInstance().get_account_object(accountName);
if (accountObject == null) {
throw new ErrorCodeException(ErrorCode.ERROR_NO_ACCOUNT_OBJECT, "it can't find the account");
}
return accountObject;
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(accountObject -> {
if (getActivity() != null && getActivity().isFinishing() == false) {
textViewTo.setText("#" + accountObject.id.get_instance());
}
}, throwable -> {
if (throwable instanceof NetworkStatusException || throwable instanceof ErrorCodeException) {
if (getActivity() != null && getActivity().isFinishing() == false) {
textViewTo.setText("#none");
}
} else {
throw Exceptions.propagate(throwable);
}
});
}
@Test
public void executionObservables_notAllowingConcurrent_onlyExecutionOnce() {
RxCommand<String> command = RxCommand.create(o -> Observable.just((String) o)
.subscribeOn(Schedulers.newThread())
.delay(10, TimeUnit.MILLISECONDS)
);
TestObserver<Observable<String>> testObserver = new TestObserver<>();
command.executionObservables().subscribe(testObserver);
command.execute("1");
command.execute("2");
command.execute("3");
// wait
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
testObserver.assertValueCount(1);
testObserver.assertNoErrors();
testObserver.assertNotComplete();
}
@Override
public void removeTopic(ThreadPageInfo info, final OnHttpCallBack<String> callBack) {
initFieldMap();
mFieldMap.put("page", String.valueOf(info.getPage()));
mFieldMap.put("tidarray", String.valueOf(info.getTid()));
mService.post(mFieldMap)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(getLifecycleProvider().<String>bindUntilEvent(FragmentEvent.DETACH))
.subscribe(new BaseSubscriber<String>() {
@Override
public void onNext(@NonNull String s) {
if (s.contains("操作成功")) {
callBack.onSuccess("操作成功!");
} else {
callBack.onError("操作失败!");
}
}
});
}
public Single<String> createTransaction(ETHWallet from, BigInteger gasPrice, BigInteger gasLimit, String data, String password) {
final Web3j web3j = Web3j.build(new HttpService(networkRepository.getDefaultNetwork().rpcServerUrl));
return networkRepository.getLastTransactionNonce(web3j, from.address)
.flatMap(nonce -> getRawTransaction(nonce, gasPrice, gasLimit, BigInteger.ZERO, data))
.flatMap(rawTx -> signEncodeRawTransaction(rawTx, password, from, networkRepository.getDefaultNetwork().chainId))
.flatMap(signedMessage -> Single.fromCallable( () -> {
EthSendTransaction raw = web3j
.ethSendRawTransaction(Numeric.toHexString(signedMessage))
.send();
if (raw.hasError()) {
throw new Exception(raw.getError().getMessage());
}
return raw.getTransactionHash();
})).subscribeOn(Schedulers.io());
}
boolean sendOtaChunk(final UUID uuid, final byte[] bytes) {
if (I.connection == null || !I.isConnected) return false;
I.connection.writeCharacteristic(uuid, bytes)
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.subscribe(
characteristicValue -> {
if (D)
UserError.Log.d(TAG, "Wrote record request request: " + bytesToHex(characteristicValue));
busy = false;
}, throwable -> {
UserError.Log.e(TAG, "Failed to write record request: " + throwable);
if (throwable instanceof BleGattCharacteristicException) {
final int status = ((BleGattCharacteristicException) throwable).getStatus();
UserError.Log.e(TAG, "Got status message: " + Helper.getStatusName(status));
} else {
if (throwable instanceof BleDisconnectedException) {
changeState(CLOSE);
}
UserError.Log.d(TAG, "Throwable in Record End write: " + throwable);
}
});
return true; // only that we didn't fail in setup
}
@Test
public void testDestroy() throws Exception {
RxPaperBook book = RxPaperBook.with("DESTROY", Schedulers.trampoline());
final String key = "hello";
final String key2 = "you";
final ComplexObject value = ComplexObject.random();
book.write(key, value).subscribe();
book.write(key2, value).subscribe();
final TestObserver<Void> destroySubscriber = book.destroy().test();
destroySubscriber.awaitTerminalEvent();
destroySubscriber.assertComplete();
destroySubscriber.assertNoErrors();
destroySubscriber.assertValueCount(0);
Assert.assertFalse(book.book.contains(key));
Assert.assertFalse(book.book.contains(key2));
}
public void loadMore() {
if (!loading.get()) {
setIsLoading(true);
doLoad(pageSize, items.size())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnError(error -> setIsLoading(false))
.subscribe(
list -> {
if (!list.isEmpty()) {
items.addAll(list);
}
setIsLoading(false);
},
Errors.log());
}
}
/**
* 初始化日期
*/
private void initDates() {
io.reactivex.Observable.create(new ObservableOnSubscribe<List<Schedule>>() {
@Override
public void subscribe(ObservableEmitter<List<Schedule>> e) throws Exception {
MainModel mainModel = MainModel.getInstance();
List<Schedule> schedules = mainModel.inquiryAllSchedule(getApplication());
if (schedules.size() != 0) {
e.onNext(schedules);
}
}
}).observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Schedule>>() {
@Override
public void accept(List<Schedule> list) throws Exception {
for (Schedule schedule : list) {
dates.add(schedule.getStartTime());
LogUtil.i(TAG, dates.size() + "个");
}
}
});
}
public static <T> ObservableTransformer<T, T> applySchedulers(final IView view,final boolean isLoadMore) {
return new ObservableTransformer<T, T>() {
@Override
public Observable<T> apply(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(@NonNull Disposable disposable) throws Exception {
if (!isLoadMore){
view.showLoading();
}
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.doAfterTerminate(new Action() {
@Override
public void run() {
view.hideLoading();//隐藏进度条
}
}).compose(RxLifecycleUtils.bindToLifecycle(view));
}
};
}
@SuppressLint("NewApi")
public Single<Boolean> isAlive() {
if (RXSDebug.isDebug()) Timber.tag(TAG).v("isAlive()");
return Single
.create((SingleEmitter<Boolean> emitter) -> {
if (ApiWrap.hasOreo()) {
emitter.onSuccess(process.isAlive());
} else {
try {
process.exitValue();
emitter.onSuccess(false);
} catch (IllegalThreadStateException e) {
emitter.onSuccess(true);
}
}
})
.subscribeOn(Schedulers.io());
}
@Override
public void fetchMoreData() {
if (TextUtils.isEmpty(getNextPageUrl()))
return;
view.showLoadingMore(true);
repository.listShotLikesForUserOfNextPage(getNextPageUrl())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(((LifecycleProvider<FragmentEvent>) view).bindUntilEvent(FragmentEvent.DESTROY_VIEW))
.subscribe(listResponse -> {
view.showLoadingMore(false);
view.showMoreData(generateEpoxyModels(listResponse.body()));
setNextPageUrl(new PageLinks(listResponse).getNext());
}, throwable -> {
view.showLoadingMore(false);
view.showSnackbar(throwable.getMessage());
throwable.printStackTrace();
});
}
private void getAttrs()
{
try
{
attrs = viewModel.getAssetDefinitionService().getTokenAttrs(token, tokenId, 1);
//add extra tokenIds if required
addMultipleTokenIds(attrs);
}
catch (Exception e)
{
e.printStackTrace();
}
// Fetch attributes local to this action and add them to the injected token properties
Map<String, TSAction> functions = viewModel.getAssetDefinitionService().getTokenFunctionMap(token.tokenInfo.chainId, token.getAddress());
action = functions.get(actionMethod);
List<Attribute> localAttrs = (action != null && action.attributes != null) ? new ArrayList<>(action.attributes.values()) : null;
viewModel.getAssetDefinitionService().resolveAttrs(token, tokenIds, localAttrs)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::onAttr, this::onError, () -> displayFunction(attrs.toString()))
.isDisposed();
}
public void execute() {
final AllocineApi allocineApi = new AllocineApi(okHttpClient);
allocineApi.movieList(AllocineApi.MovieListFilter.NOW_SHOWING, AllocineApi.Profile.SMALL, AllocineApi.MovieListOrder.TOPRANK, 20, 1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<List<Movie>>() {
@Override
public void accept(List<Movie> movies) throws Exception {
textView.setText(movies.toString());
Log.d("MainActivity", movies.toString());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("tag", throwable.getLocalizedMessage(), throwable);
}
});
}
@Override
public Single<List<ChanArchive>> fetchArchives() {
return api.fetchArchives("https://raw.githubusercontent.com/ccd0/4chan-x/master/src/Archive/archives.json")
.observeOn(Schedulers.io())
.toObservable()
.flatMapIterable((Function<List<FourChanArchive>, Iterable<FourChanArchive>>) fourChanArchives -> fourChanArchives)
.flatMap((Function<FourChanArchive, ObservableSource<ChanArchive>>) fourChanArchive -> {
final ChanArchive chanArchive = new ChanArchive.Builder()
.boards(fourChanArchive.getBoards())
.files(fourChanArchive.getFiles())
.domain(fourChanArchive.getDomain())
.http(fourChanArchive.getHttp())
.https(fourChanArchive.getHttps())
.software(fourChanArchive.getSoftware())
.uid(fourChanArchive.getUid())
.name(fourChanArchive.getName())
.reports(fourChanArchive.getReports())
.build();
return Observable.just(chanArchive);
})
.toList();
}
@Override
protected Single createInstanceFailingAsynchronously(RuntimeException e) {
return Single.just("X")
.delay(DELAY, TimeUnit.MILLISECONDS)
.map(x -> {
throw e;
})
.observeOn(Schedulers.io());
}
@Override
public String copyDocument(String sourceDocumentId, String targetParentDocumentId) throws FileNotFoundException
{
try
{
return Single.<String>create(em -> {
Path srcPath = getLocationsManager().
getLocation(getLocationUriFromDocumentId(sourceDocumentId)).
getCurrentPath();
Location dstLocation = getLocationsManager().
getLocation(getLocationUriFromDocumentId(targetParentDocumentId));
Directory dest = dstLocation.
getCurrentPath().
getDirectory();
Location res = dstLocation.copy();
if(srcPath.isDirectory())
res.setCurrentPath(dest.createDirectory(srcPath.getDirectory().getName()).getPath());
else if(srcPath.isFile())
res.setCurrentPath(Util.copyFile(srcPath.getFile(), dest).getPath());
Context context = getContext();
if(context!=null)
context.getContentResolver().notifyChange(getUriFromLocation(res), null);
em.onSuccess(getDocumentIdFromLocation(res));
}).
subscribeOn(Schedulers.io()).
blockingGet();
}
catch (Exception e)
{
Logger.log(e);
throw new IllegalArgumentException("Copy failed", e);
}
}
private void initData() {
Intent intent = getIntent();
this.mediaId = intent.getStringExtra(ARG_MEDIAID);
if (TextUtils.isEmpty(mediaId)) {
onError();
return;
}
RetrofitFactory.getRetrofit().create(IMobileMediaApi.class)
.getMediaProfile(mediaId)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.as(this.bindAutoDispose())
.subscribe(bean -> {
String name = bean.getData().getName();
initToolBar(toolbar, true, name);
List<MediaProfileBean.DataBean.TopTabBean> topTab = bean.getData().getTop_tab();
if (null != topTab && topTab.size() < 0) {
onError();
return;
}
initTabLayout(bean.getData());
}, throwable -> {
onError();
ErrorAction.print(throwable);
});
}
@Override public rx.Observable<List<Event>> getAll() {
return RxJavaInterop.toV1Observable(eventDAO.getAll()
.subscribeOn(Schedulers.io())
.flatMap(roomEvents -> {
try {
return Observable.just(mapper.map(roomEvents));
} catch (IOException e) {
return Observable.error(e);
}
}), BackpressureStrategy.BUFFER);
}
private void testNetworks(String method)
{
foundNetwork = false;
networkCount = ethereumNetworkRepository.getAvailableNetworkList().length;
//test all the networks
disposable = Observable.fromCallable(this::getNetworkIds)
.flatMapIterable(networkId -> networkId)
.filter(networkId -> !foundNetwork)
.flatMap(networkId -> fetchTokensInteract.getContractResponse(importOrder.contractAddress, networkId, method))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::testNetworkResult, this::onTestError);
}
@Override
public void searchMore(String seek, int offset) {
addRxSubscribe(
mModel.search(seek, offset)
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new BaseObserver<SearchSong>(mView, false, true) {
@Override
public void onNext(SearchSong searchSong) {
super.onNext(searchSong);
if (searchSong.getCode() == 0) {
ArrayList<SearchSong.DataBean.SongBean.ListBean> songListBeans =
(ArrayList<SearchSong.DataBean.SongBean.ListBean>) searchSong.getData().getSong().getList();
if (songListBeans.size() == 0) {
mView.searchMoreError();
} else {
mView.searchMoreSuccess(songListBeans);
}
} else {
mView.searchMoreError();
}
}
@Override
public void onError(Throwable e){
super.onError(e);
mView.showSearcherMoreNetworkError();
}
}));
}
public synchronized void scan() {
extendWakeLock((scanSeconds + 1) * Constants.SECOND_IN_MS);
stopScan("Scan start");
UserError.Log.d(TAG, "startScan called: hunting: " + address + " " + name);
ScanFilter filter = this.customFilter;
if (filter == null) {
final ScanFilter.Builder builder = new ScanFilter.Builder();
if (address != null) {
try {
builder.setDeviceAddress(address);
} catch (IllegalArgumentException e) {
UserError.Log.wtf(TAG, "Invalid bluetooth address: " + address);
}
}
// TODO scanning by name doesn't build a filter
filter = builder.build();
} else {
UserError.Log.d(TAG,"Overriding with custom filter");
}
scanSubscription = new Subscription(rxBleClient.scanBleDevices(
new ScanSettings.Builder()
.setCallbackType(ScanSettings.CALLBACK_TYPE_ALL_MATCHES)
.setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
.build(), legacyNoFilterWorkaround ? ScanFilter.empty() : filter)
.timeout(scanSeconds, TimeUnit.SECONDS) // is unreliable
.subscribeOn(Schedulers.io())
.subscribe(this::onScanResult, this::onScanFailure));
Inevitable.task(STOP_SCAN_TASK_ID, scanSeconds * Constants.SECOND_IN_MS, this::stopScanWithTimeoutCallback);
}
@Test
public void testWrite() throws Exception {
RxPaperBook book = RxPaperBook.with("WRITE", Schedulers.trampoline());
final String key = "hello";
final Completable write = book.write(key, ComplexObject.random());
Assert.assertFalse(book.book.contains(key));
final TestObserver<Void> testSubscriber = write.test();
testSubscriber.awaitTerminalEvent();
testSubscriber.assertComplete();
testSubscriber.assertNoErrors();
Assert.assertTrue(book.book.contains(key));
}
private static void demo4() throws InterruptedException {
Observable.just("One", "Two", "Three")
.subscribeOn(Schedulers.single())
.doOnNext(i -> log("doOnNext", i))
.subscribeOn(Schedulers.newThread())
.doOnNext(i -> log("doOnNext", i))
.subscribeOn(Schedulers.io())
.subscribe(i -> log("subscribe", i));
WAIT_LATCH.await();
}
public void calcIconTotal() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
int total = 0;
while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
if (xml.getEventType() == XmlPullParser.START_TAG) {
if (xml.getName().startsWith("item")) {
total++;
}
}
xml.next();
}
flowableEmitter.onNext(total);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mView.setIconTotal(integer);
}
});
}
@SuppressLint("CheckResult")
@Override
public void fetchUserInfo(final BaseToken token) {
mSubscribe = Flowable.create(new FlowableOnSubscribe<QQUser>() {
@Override
public void subscribe(@NonNull FlowableEmitter<QQUser> qqUserEmitter) {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(buildUserInfoUrl(token, URL)).build();
try {
Response response = client.newCall(request).execute();
JSONObject jsonObject = new JSONObject(response.body().string());
QQUser user = QQUser.parse(token.getOpenid(), jsonObject);
qqUserEmitter.onNext(user);
qqUserEmitter.onComplete();
} catch (IOException | JSONException e) {
ShareLogger.e(ShareLogger.INFO.FETCH_USER_INOF_ERROR);
qqUserEmitter.onError(e);
}
}
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<QQUser>() {
@Override
public void accept(@NonNull QQUser qqUser) {
mLoginListener.loginSuccess(
new LoginResultData(LoginPlatform.QQ, token, qqUser));
LoginUtil.recycle();
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) {
mLoginListener.loginFailure(new Exception(throwable), ShareLogger.INFO.ERR_FETCH_CODE);
LoginUtil.recycle();
}
});
}
public void startEthereumTicker(Token token)
{
disposable = Observable.interval(0, CHECK_ETHPRICE_INTERVAL, TimeUnit.SECONDS)
.doOnNext(l -> tokenRepository
.getTokenTicker(token)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::onTicker, this::onError)).subscribe();
}
public void createTokenTransfer(String password, String to, String contractAddress,
BigInteger amount, BigInteger gasPrice, BigInteger gasLimit) {
progress.postValue(true);
createTransactionInteract.createERC20Transfer(defaultWallet.getValue(), to, contractAddress, amount, gasPrice, gasLimit, password)
.subscribeOn(Schedulers.io())
.subscribe(this::onCreateTransaction, this::onError);
}
public void get(String url, OnHttpCallBack<String> callBack) {
Observable<String> observable = mService.get(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
if (mLifecycleProvider != null) {
observable = observable.compose(mLifecycleProvider.bindUntilEvent(FragmentEvent.DETACH));
}
observable.subscribe(new BaseSubscriber<String>() {
@Override
public void onError(@NonNull Throwable throwable) {
mSubscription = null;
callBack.onError(throwable.getMessage());
}
@Override
public void onComplete() {
mSubscription = null;
}
@Override
public void onNext(@NonNull String s) {
mSubscription = null;
callBack.onSuccess(s);
}
@Override
public void onSubscribe(Subscription subscription) {
super.onSubscribe(subscription);
mSubscription = subscription;
}
});
}