io.reactivex.Observable#create ( )源码实例Demo

下面列出了io.reactivex.Observable#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。

源代码1 项目: RuntimePermission   文件: RxPermissions.java
/**
 * Asks for the given permissions and exposes the outcome as an {@link Observable}.
 *
 * <p>On subscription the request is issued through {@code runtimePermission}. An accepted
 * result is emitted and the stream completes; any other result terminates the stream
 * with an {@code Error} wrapping that result.
 *
 * @param permissions the permissions to request
 * @return an observable that emits the accepted result or fails with an {@code Error}
 */
public Observable<PermissionResult> request(final List<String> permissions) {
    final ObservableOnSubscribe<PermissionResult> source = new ObservableOnSubscribe<PermissionResult>() {
        @Override
        public void subscribe(final ObservableEmitter<PermissionResult> downstream) throws Exception {
            final ResponseCallback callback = new ResponseCallback() {
                @Override
                public void onResponse(PermissionResult result) {
                    // Rejected: fail the stream and stop here.
                    if (!result.isAccepted()) {
                        downstream.onError(new Error(result));
                        return;
                    }
                    downstream.onNext(result);
                    downstream.onComplete();
                }
            };
            runtimePermission.request(permissions).onResponse(callback).ask();
        }
    };
    return Observable.create(source);
}
 
/**
 * Demo entry point: builds an {@code Observable} over a fixed list of month names
 * and subscribes to it twice, printing every received item.
 *
 * @param args unused
 */
public static void main(String[] args) {
	final List<String> months = Arrays.asList("Jan", "Feb", "Mar", "Apl", "May");

	Observable<Object> source = Observable.create(emitter -> {
		try {
			System.out.println("creating ");
			// Push every month downstream, then signal completion.
			months.forEach(emitter::onNext);
			emitter.onComplete();
		} catch (Exception failure) {
			// Route any failure to the subscriber instead of throwing.
			emitter.onError(failure);
		}
	});

	// Two independent subscriptions to the same source.
	source.subscribe(month -> System.out.println("month:-" + month));
	source.subscribe(month -> System.out.println("month:-" + month));
}
 
源代码3 项目: RxAndroid-Sample   文件: ExampleUnitTest.java
/**
 * Builds an observable that emits the integers 1 through 6 and then completes.
 *
 * <p>Each emission, and the completion signal, is skipped when the emitter
 * reports that it has been disposed.
 *
 * @return an observable over the fixed list {@code 1..6}
 */
private Observable<Integer> getOriginalObservable() {
    final List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);

    final ObservableOnSubscribe<Integer> source = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> downstream) {
            for (Integer value : values) {
                if (downstream.isDisposed()) {
                    continue;
                }
                downstream.onNext(value);
            }

            if (downstream.isDisposed()) {
                return;
            }
            downstream.onComplete();
        }
    };

    return Observable.create(source);
}
 
源代码4 项目: dive-into-graphql-in-java   文件: Subscription.java
/**
 * Publishes a new random {@code Score} (0-9) for the given talk every two seconds.
 *
 * <p>The source is connected immediately, so ticks start as soon as this method is
 * called rather than when the returned publisher is first subscribed.
 *
 * @param title the talk title stamped on every emitted score
 * @return a publisher of scores, buffering when the subscriber is slower than the ticks
 */
public Publisher<Score> talkScores(final String title) {
    Observable<Score> observable = Observable.create(e -> {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Stop the ticker thread once the emitter is disposed or terminated;
        // otherwise the executor (and its thread) outlives the stream.
        e.setCancellable(() -> executorService.shutdownNow());
        executorService.scheduleAtFixedRate(() -> {
            try {
                Score s = Score.builder()
                               .title(title)
                               .score(Integer.valueOf((int) Math.floor(Math.random()*10)))
                               .build();
                e.onNext(s);
            } catch (RuntimeException ex) {
                // An exception escaping a fixed-rate task silently suppresses all further
                // runs; surface it to subscribers instead of stalling the stream.
                e.onError(ex);
            }
        }, 0, 2, TimeUnit.SECONDS);
    });

    // publish() alone already multicasts one upstream subscription; the former
    // share().publish() chain added a redundant second multicast layer.
    ConnectableObservable<Score> connectableObservable = observable.publish();
    connectableObservable.connect();
    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
 
源代码5 项目: rxfirebase   文件: RxValue.java
/**
 * Exposes the value events of a query as an {@link Observable}.
 *
 * <p>On subscription a {@link ValueEventListener} is attached to {@code query}.
 * Data changes are forwarded as items and a cancellation is forwarded as an error
 * (via {@code DatabaseError#toException()}), in both cases only while the emitter
 * is not disposed. The listener is removed from the query when the emitter is
 * cancelled.
 *
 * @param query the query to attach the listener to
 * @return an observable of the snapshots delivered to the listener
 */
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
    final ObservableOnSubscribe<DataSnapshot> source = new ObservableOnSubscribe<DataSnapshot>() {
        @Override
        public void subscribe(
                @NonNull final ObservableEmitter<DataSnapshot> downstream) throws Exception {
            final ValueEventListener valueListener = new ValueEventListener() {
                @Override
                public void onDataChange(DataSnapshot snapshot) {
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onNext(snapshot);
                }

                @Override
                public void onCancelled(DatabaseError databaseError) {
                    if (downstream.isDisposed()) {
                        return;
                    }
                    downstream.onError(databaseError.toException());
                }
            };

            final Cancellable detachListener = new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    query.removeEventListener(valueListener);
                }
            };

            // The cleanup hook is registered before the listener is attached.
            downstream.setCancellable(detachListener);
            query.addValueEventListener(valueListener);
        }
    };
    return Observable.create(source);
}
 
源代码6 项目: RxJava2-Android-Samples   文件: DiskDataSource.java
/**
 * Returns an observable over the {@code data} field.
 *
 * <p>Emits {@code data} when it is non-null and then completes; completes without
 * emitting anything when it is null.
 *
 * @return an observable emitting at most one item
 */
public Observable<Data> getData() {
    return Observable.create(downstream -> {
        if (data == null) {
            downstream.onComplete();
            return;
        }
        downstream.onNext(data);
        downstream.onComplete();
    });
}
 
源代码7 项目: RxSharedPreferences   文件: RxSharedPreferences.java
/**
 * Reads a boolean preference as a single-item observable.
 *
 * <p>On subscription the value is read from {@code sharedPreferences}, emitted,
 * and the stream completes.
 *
 * @param key          the preference key to read
 * @param defaultValue the value passed to {@code sharedPreferences.getBoolean} as the default
 * @return an observable emitting the value that was read
 */
public Observable<Boolean> getBoolean(final String key, final boolean defaultValue) {
    final ObservableOnSubscribe<Boolean> reader = new ObservableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Boolean> downstream) throws Exception {
            final boolean stored = sharedPreferences.getBoolean(key, defaultValue);
            downstream.onNext(stored);
            downstream.onComplete();
        }
    };
    return Observable.create(reader);
}
 
源代码8 项目: Capstone-Project   文件: UserProfilePresenter.java
/**
 * Loads the website stored for a user and reports the outcome to the view.
 *
 * <p>The database lookup runs on the I/O scheduler and the view callbacks are
 * delivered on the Android main thread. The subscription is tracked in
 * {@code mCompositeDisposable}.
 *
 * @param userId id of the user whose website is looked up
 */
@Override
public void getWebsite(final int userId) {
    Observable<String> observableWebsite = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String website = PredatorDatabase.getInstance()
                    .getWebsiteForUser(userId);
            if (TextUtils.isEmpty(website)) {
                // onError is terminal, so no onComplete may follow it.
                emitter.onError(new NullPointerException("No website available for this user."));
                return;
            }
            emitter.onNext(website);
            emitter.onComplete();
        }
    });

    // subscribeOn/observeOn return NEW observables; they must be chained into the
    // subscription. Calling them as standalone statements discards the result and
    // leaves the source running on the caller's thread.
    mCompositeDisposable.add(observableWebsite
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableObserver<String>() {
                @Override
                public void onNext(String value) {
                    Logger.d("onNext: getWebsite: " + value);
                    mView.websiteAvailable(value);
                }

                @Override
                public void onError(Throwable e) {
                    Logger.e(TAG, "onError: " + e.getMessage(), e);
                    mView.websiteUnavailable();
                }

                @Override
                public void onComplete() {
                    Logger.d(TAG, "onComplete: fetched user website");
                }
            }));
}
 
源代码9 项目: RxJavaPriorityScheduler   文件: MainActivity.java
/**
 * Wraps a value in an observable that delivers it after a simulated long task.
 *
 * <p>On subscription the subscribing thread sleeps for 3000 ms, then the value is
 * emitted and the stream completes.
 *
 * @param value the value to emit
 * @return an observable emitting {@code value} once
 */
private Observable<String> getObservable(final String value) {
    final ObservableOnSubscribe<String> delayedSource = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> downstream) throws Exception {
            // Stand-in for slow work.
            Thread.sleep(3000);
            downstream.onNext(value);
            downstream.onComplete();
        }
    };
    return Observable.create(delayedSource);
}
 
源代码10 项目: RxEasyHttp   文件: RxCache.java
/**
 * Reads a cache entry, taking a time value into account.
 *
 * <p>The lookup is delegated to {@code cacheCore.load(type, key, time)} inside a
 * {@code SimpleSubscribe}.
 *
 * @param type the type the entry was saved as
 * @param key  the cache key
 * @param time the retention time of the entry (unit not visible here; confirm against cacheCore)
 * @return an observable created from the {@code SimpleSubscribe} that performs the load
 */
public <T> Observable<T> load(final Type type, final String key, final long time) {
    final SimpleSubscribe<T> loader = new SimpleSubscribe<T>() {
        @Override
        T execute() {
            final T cached = cacheCore.load(type, key, time);
            return cached;
        }
    };
    return Observable.create(loader);
}
 
源代码11 项目: RxEasyHttp   文件: RxCache.java
/**
 * Saves a value in the cache.
 *
 * <p>The write is delegated to {@code cacheCore.save(key, value)} inside a
 * {@code SimpleSubscribe} whose result is {@code true} once the call returns.
 *
 * @param key   the cache key
 * @param value the value to cache
 * @return an observable created from the {@code SimpleSubscribe} that performs the save
 */
public <T> Observable<Boolean> save(final String key, final T value) {
    final SimpleSubscribe<Boolean> writer = new SimpleSubscribe<Boolean>() {
        @Override
        Boolean execute() throws Throwable {
            cacheCore.save(key, value);
            return Boolean.TRUE;
        }
    };
    return Observable.create(writer);
}
 
源代码12 项目: RxShell   文件: RxProcess.java
/**
 * Creates an {@code RxProcess} whose {@code processCreator} starts a process on
 * subscription.
 *
 * <p>The started process is emitted, the subscribing thread then blocks in
 * {@code waitFor()}, and the stream completes afterwards. Cancelling the emitter
 * hands the process to {@code processKiller}.
 *
 * @param processFactory used to start the process
 * @param processKiller  used to kill the process on cancellation
 * @param commands       passed to {@code processFactory.start}
 */
public RxProcess(ProcessFactory processFactory, ProcessKiller processKiller, String... commands) {
    this.processCreator = Observable.create(emitter -> {
        final Process started = processFactory.start(commands);
        // Register the kill hook before emitting or blocking.
        emitter.setCancellable(() -> {
            if (RXSDebug.isDebug()) {
                Timber.tag(TAG).v("cancel()");
            }
            processKiller.kill(started);
        });
        emitter.onNext(started);
        started.waitFor();
        emitter.onComplete();
    });
}
 
源代码13 项目: smart-farmer-android   文件: RegisterModelImpl.java
/**
 * Returns an observable that emits a fixed avatar URL and completes.
 *
 * <p>{@code imgPath} is not read by this implementation.
 *
 * @param imgPath path of the image (unused here)
 * @return an observable emitting one hard-coded URL
 */
@Override
public Observable<String> uploadAvatar(String imgPath) {
    final ObservableOnSubscribe<String> fixedUrlSource = new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> downstream) throws Exception {
            final String avatarUrl = "http://p2.wmpic.me/article/2015/04/15/1429062874_HiUlpSXT.jpeg";
            downstream.onNext(avatarUrl);
            downstream.onComplete();
        }
    };
    return Observable.create(fixedUrlSource);
}
 
源代码14 项目: rxjava2-lab   文件: Code8.java
/**
 * Builds an observable whose subscription walks the file tree rooted at
 * {@code DIRECTORY}.
 *
 * <p>NOTE(review): the per-file handling and the code after the walk are elided
 * ("// ...") in this snippet, so what is emitted and when the stream terminates
 * cannot be determined from here.
 *
 * @return an observable of strings (presumably file names - confirm against the full source)
 */
private static Observable<String> getFileNames() {
    return Observable.create(emitter -> {
        // The walk runs synchronously inside the subscription callback.
        Files.walkFileTree(DIRECTORY,
            new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path path, BasicFileAttributes attr) {
                    // ...
                    // Keep walking after each visited file.
                    return FileVisitResult.CONTINUE;
                }
            });
        // ...
    });
}
 
/**
 * Demo entry point: creates an observable that emits one message containing a
 * random number, and subscribes to it twice.
 *
 * <p>The random number is generated inside the subscription callback, which
 * {@code Observable.create} runs for every subscriber, so the two printed messages
 * are computed independently.
 *
 * @param args unused
 */
public static void main(String[] args) {

	// Typed instead of the raw Observable so the emitted items are checked as Strings.
	Observable<String> observable = Observable.create(observer -> {
		observer.onNext("I am Hot Observable " + Math.random()*100);
		observer.onComplete();
	});

	observable.subscribe(consumer -> System.out.println("message:-" + consumer));
	observable.subscribe(consumer -> System.out.println("message:-" + consumer));
}
 
源代码16 项目: MultiContactPicker   文件: RxContacts.java
/**
 * Creates an observable of contacts.
 *
 * <p>On subscription a new {@code RxContacts} is built from {@code context} and its
 * instance-level {@code fetch} is handed the column limit and the emitter.
 *
 * @param columnLimitChoice column limit forwarded to the instance-level fetch
 * @param context           context used to construct the {@code RxContacts} instance
 * @return an observable fed by the instance-level fetch
 */
public static Observable<Contact> fetch(@NonNull final LimitColumn columnLimitChoice, @NonNull final Context context) {
    final ObservableOnSubscribe<Contact> contactSource = new ObservableOnSubscribe<Contact>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Contact> downstream) throws Exception {
            final RxContacts fetcher = new RxContacts(context);
            fetcher.fetch(columnLimitChoice, downstream);
        }
    };
    return Observable.create(contactSource);
}
 
源代码17 项目: a   文件: BookContent.java
/**
 * Parses the raw text of a chapter page into a {@code BookContentBean}, following
 * "next page" links and appending their content.
 *
 * <p>Fails with an error when {@code s} is empty. Paging stops when there is no next
 * URL, when a URL repeats, or when the next URL resolves to the next chapter's URL.
 * If fetching or parsing a follow-up page throws, the error is forwarded (unless the
 * emitter is already disposed) and processing stops without emitting a result.
 *
 * @param s               raw content of the chapter page
 * @param chapterBean     the chapter being parsed
 * @param nextChapterBean the following chapter, or {@code null} to look it up in the database
 * @param bookShelfBean   the book the chapter belongs to
 * @param headerMap       headers passed to {@code AnalyzeUrl} for follow-up page requests
 * @return an observable emitting the assembled chapter content once
 */
Observable<BookContentBean> analyzeBookContent(final String s, final BaseChapterBean chapterBean, final BaseChapterBean nextChapterBean, BookShelfBean bookShelfBean, Map<String, String> headerMap) {
    return Observable.create(e -> {
        if (TextUtils.isEmpty(s)) {
            e.onError(new Throwable(MApplication.getInstance().getString(R.string.get_content_error) + chapterBean.getDurChapterUrl()));
            return;
        }
        if (TextUtils.isEmpty(baseUrl)) {
            baseUrl = NetworkUtils.getAbsoluteURL(bookShelfBean.getBookInfoBean().getChapterUrl(), chapterBean.getDurChapterUrl());
        }
        //if (StringUtils.isJsonType(s) && !MApplication.getInstance().getDonateHb()) {
        //    e.onError(new Throwable(MApplication.getInstance().getString(R.string.donate_s)));
        //    e.onComplete();
        //    return;
        //}
        Debug.printLog(tag, "┌成功获取正文页");
        Debug.printLog(tag, "└" + baseUrl);
        BookContentBean bookContentBean = new BookContentBean();
        bookContentBean.setDurChapterIndex(chapterBean.getDurChapterIndex());
        bookContentBean.setDurChapterUrl(chapterBean.getDurChapterUrl());
        bookContentBean.setTag(tag);
        AnalyzeRule analyzer = new AnalyzeRule(bookShelfBean);
        WebContentBean webContentBean = analyzeBookContent(analyzer, s, chapterBean.getDurChapterUrl(), baseUrl);
        bookContentBean.setDurChapterContent(webContentBean.content);

        /*
         * Handle pagination: follow next-page links and append their content.
         */
        if (!TextUtils.isEmpty(webContentBean.nextUrl)) {
            // URLs already visited, to break out of link cycles.
            List<String> usedUrlList = new ArrayList<>();
            usedUrlList.add(chapterBean.getDurChapterUrl());
            BaseChapterBean nextChapter;
            if (nextChapterBean != null) {
                nextChapter = nextChapterBean;
            } else {
                nextChapter = DbHelper.getDaoSession().getBookChapterBeanDao().queryBuilder()
                        .where(BookChapterBeanDao.Properties.NoteUrl.eq(chapterBean.getNoteUrl()),
                                BookChapterBeanDao.Properties.DurChapterIndex.eq(chapterBean.getDurChapterIndex() + 1))
                        .build().unique();
            }

            while (!TextUtils.isEmpty(webContentBean.nextUrl) && !usedUrlList.contains(webContentBean.nextUrl)) {
                usedUrlList.add(webContentBean.nextUrl);
                // The "next page" is actually the next chapter: this chapter is complete.
                if (nextChapter != null && NetworkUtils.getAbsoluteURL(baseUrl, webContentBean.nextUrl).equals(NetworkUtils.getAbsoluteURL(baseUrl, nextChapter.getDurChapterUrl()))) {
                    break;
                }
                AnalyzeUrl analyzeUrl = new AnalyzeUrl(webContentBean.nextUrl, headerMap, tag);
                try {
                    String body;
                    Response<String> response = BaseModelImpl.getInstance().getResponseO(analyzeUrl).blockingFirst();
                    body = response.body();
                    webContentBean = analyzeBookContent(analyzer, body, webContentBean.nextUrl, baseUrl);
                    if (!TextUtils.isEmpty(webContentBean.content)) {
                        bookContentBean.setDurChapterContent(bookContentBean.getDurChapterContent() + "\n" + webContentBean.content);
                    }
                } catch (Exception exception) {
                    if (!e.isDisposed()) {
                        e.onError(exception);
                    }
                    // The stream is now terminated (errored or disposed): stop paging
                    // and do not fall through to onNext/onComplete.
                    return;
                }
            }
        }
        e.onNext(bookContentBean);
        e.onComplete();
    });
}
 
源代码18 项目: MyBookshelf   文件: BookContent.java
/**
 * Parses the raw text of a chapter page into a {@code BookContentBean}, following
 * "next page" links and appending their content.
 *
 * <p>Fails with an error when {@code s} is empty, and with a {@code VipThrowable} when
 * the content is JSON-typed and {@code getDonateHb()} is false. Paging stops when there
 * is no next URL, when a URL repeats, or when the next URL resolves to the next
 * chapter's URL. If fetching or parsing a follow-up page throws, the error is forwarded
 * (unless the emitter is already disposed) and processing stops without emitting a result.
 *
 * @param s               raw content of the chapter page
 * @param chapterBean     the chapter being parsed
 * @param nextChapterBean the following chapter, or {@code null} to look it up in the database
 * @param bookShelfBean   the book the chapter belongs to
 * @param headerMap       headers passed to {@code AnalyzeUrl} for follow-up page requests
 * @return an observable emitting the assembled chapter content once
 */
Observable<BookContentBean> analyzeBookContent(final String s, final BaseChapterBean chapterBean, final BaseChapterBean nextChapterBean, BookShelfBean bookShelfBean, Map<String, String> headerMap) {
    return Observable.create(e -> {
        if (TextUtils.isEmpty(s)) {
            e.onError(new Throwable(MApplication.getInstance().getString(R.string.get_content_error) + chapterBean.getDurChapterUrl()));
            return;
        }
        if (TextUtils.isEmpty(baseUrl)) {
            baseUrl = NetworkUtils.getAbsoluteURL(bookShelfBean.getBookInfoBean().getChapterUrl(), chapterBean.getDurChapterUrl());
        }
        if (StringUtils.isJsonType(s) && !MApplication.getInstance().getDonateHb()) {
            // onError is terminal; a following onComplete would be ignored, so none is sent.
            e.onError(new VipThrowable());
            return;
        }
        Debug.printLog(tag, "┌成功获取正文页");
        Debug.printLog(tag, "└" + baseUrl);
        BookContentBean bookContentBean = new BookContentBean();
        bookContentBean.setDurChapterIndex(chapterBean.getDurChapterIndex());
        bookContentBean.setDurChapterUrl(chapterBean.getDurChapterUrl());
        bookContentBean.setTag(tag);
        AnalyzeRule analyzer = new AnalyzeRule(bookShelfBean);
        WebContentBean webContentBean = analyzeBookContent(analyzer, s, chapterBean.getDurChapterUrl(), baseUrl);
        bookContentBean.setDurChapterContent(webContentBean.content);

        /*
         * Handle pagination: follow next-page links and append their content.
         */
        if (!TextUtils.isEmpty(webContentBean.nextUrl)) {
            // URLs already visited, to break out of link cycles.
            List<String> usedUrlList = new ArrayList<>();
            usedUrlList.add(chapterBean.getDurChapterUrl());
            BaseChapterBean nextChapter;
            if (nextChapterBean != null) {
                nextChapter = nextChapterBean;
            } else {
                nextChapter = DbHelper.getDaoSession().getBookChapterBeanDao().queryBuilder()
                        .where(BookChapterBeanDao.Properties.NoteUrl.eq(chapterBean.getNoteUrl()),
                                BookChapterBeanDao.Properties.DurChapterIndex.eq(chapterBean.getDurChapterIndex() + 1))
                        .build().unique();
            }

            while (!TextUtils.isEmpty(webContentBean.nextUrl) && !usedUrlList.contains(webContentBean.nextUrl)) {
                usedUrlList.add(webContentBean.nextUrl);
                // The "next page" is actually the next chapter: this chapter is complete.
                if (nextChapter != null && NetworkUtils.getAbsoluteURL(baseUrl, webContentBean.nextUrl).equals(NetworkUtils.getAbsoluteURL(baseUrl, nextChapter.getDurChapterUrl()))) {
                    break;
                }
                AnalyzeUrl analyzeUrl = new AnalyzeUrl(webContentBean.nextUrl, headerMap, tag);
                try {
                    String body;
                    Response<String> response = BaseModelImpl.getInstance().getResponseO(analyzeUrl).blockingFirst();
                    body = response.body();
                    webContentBean = analyzeBookContent(analyzer, body, webContentBean.nextUrl, baseUrl);
                    if (!TextUtils.isEmpty(webContentBean.content)) {
                        bookContentBean.setDurChapterContent(bookContentBean.getDurChapterContent() + "\n" + webContentBean.content);
                    }
                } catch (Exception exception) {
                    if (!e.isDisposed()) {
                        e.onError(exception);
                    }
                    // The stream is now terminated (errored or disposed): stop paging
                    // and do not fall through to onNext/onComplete.
                    return;
                }
            }
        }
        e.onNext(bookContentBean);
        e.onComplete();
    });
}
 
源代码19 项目: rxfirebase   文件: RxFirebaseDatabase.java
/**
 * Creates an observable of child events for a query.
 *
 * <p>The work is delegated to a {@code QueryChildEventsOnSubscribe} constructed
 * with {@code query}.
 *
 * @param query the query handed to {@code QueryChildEventsOnSubscribe}
 * @return an observable created from that on-subscribe handler
 */
@NonNull
@CheckReturnValue
public static Observable<ChildEvent> childEvents(@NonNull final Query query) {
    final QueryChildEventsOnSubscribe onSubscribe = new QueryChildEventsOnSubscribe(query);
    return Observable.create(onSubscribe);
}
 
/**
 * Sets up {@code publisher} as a stream of stock price updates produced every two
 * seconds by the task returned from {@code newStockTick}.
 *
 * <p>The source is connected immediately in the constructor, so ticks start before
 * any subscriber attaches to {@code publisher}. Updates are buffered when the
 * subscriber is slower than the ticks.
 */
public StockTickerPublisher() {
    Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        // Stop the ticker thread once the emitter is disposed or terminated;
        // otherwise the executor (and its thread) outlives the stream.
        emitter.setCancellable(() -> executorService.shutdownNow());
        executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);

    });

    // publish() alone already multicasts one upstream subscription; the former
    // share().publish() chain added a redundant second multicast layer.
    ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.publish();
    connectableObservable.connect();

    publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}