下面列出了io.reactivex.Observable#create ( ) 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
public Observable<PermissionResult> request(final List<String> permissions) {
return Observable.create(new ObservableOnSubscribe<PermissionResult>() {
@Override
public void subscribe(final ObservableEmitter<PermissionResult> emitter) throws Exception {
runtimePermission
.request(permissions)
.onResponse(new ResponseCallback() {
@Override
public void onResponse(PermissionResult result) {
if (result.isAccepted()) {
emitter.onNext(result);
emitter.onComplete();
} else {
emitter.onError(new Error(result));
}
}
}).ask();
}
});
}
public static void main(String[] args) {
String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May"};
List<String> months = Arrays.asList(monthArray);
Observable<Object> observable = Observable.create(subscriber -> {
try {
System.out.println("creating ");
for (String data : months) {
subscriber.onNext(data);
}
subscriber.onComplete();
} catch (Exception e) {
// TODO: handle exception
subscriber.onError(e);
}
});
observable.subscribe(item -> System.out.println("month:-" + item));
observable.subscribe(item -> System.out.println("month:-" + item));
}
private Observable<Integer> getOriginalObservable() {
final List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5, 6);
return Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
for (Integer integer : integers) {
if (!emitter.isDisposed()) {
emitter.onNext(integer);
}
}
if (!emitter.isDisposed()) {
emitter.onComplete();
}
}
});
}
public Publisher<Score> talkScores(final String title) {
Observable<Score> observable = Observable.create(e -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
Score s = Score.builder()
.title(title)
.score(Integer.valueOf((int) Math.floor(Math.random()*10)))
.build();
e.onNext(s);
}, 0, 2, TimeUnit.SECONDS);
});
ConnectableObservable connectableObservable = observable.share().publish();
connectableObservable.connect();
return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
return Observable.create(new ObservableOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final ObservableEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
if (!emit.isDisposed()) {
emit.onNext(dataSnapshot);
}
}
@Override
public void onCancelled(DatabaseError e) {
if (!emit.isDisposed()) {
emit.onError(e.toException());
}
}
};
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addValueEventListener(listener);
}
});
}
public Observable<Data> getData() {
return Observable.create(emitter -> {
if (data != null) {
emitter.onNext(data);
}
emitter.onComplete();
});
}
public Observable<Boolean> getBoolean(final String key, final boolean defaultValue) {
return Observable.create(new ObservableOnSubscribe<Boolean>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Boolean> e) throws Exception {
e.onNext(sharedPreferences.getBoolean(key, defaultValue));
e.onComplete();
}
});
}
@Override
public void getWebsite(final int userId) {
Observable<String> observableWebsite = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String website = PredatorDatabase.getInstance()
.getWebsiteForUser(userId);
if (!TextUtils.isEmpty(website)) {
emitter.onNext(website);
} else {
emitter.onError(new NullPointerException("No website available for this user."));
}
emitter.onComplete();
}
});
observableWebsite.subscribeOn(Schedulers.io());
observableWebsite.observeOn(AndroidSchedulers.mainThread());
mCompositeDisposable.add(observableWebsite.subscribeWith(new DisposableObserver<String>(){
@Override
public void onNext(String value) {
Logger.d("onNext: getWebsite: " + value);
mView.websiteAvailable(value);
}
@Override
public void onError(Throwable e) {
Logger.e(TAG, "onError: " + e.getMessage(), e);
mView.websiteUnavailable();
}
@Override
public void onComplete() {
Logger.d(TAG, "onComplete: fetched user website");
}
}));
}
private Observable<String> getObservable(final String value) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
// simulate long task
Thread.sleep(3000);
emitter.onNext(value);
emitter.onComplete();
}
});
}
/**
* 根据时间读取缓存
*
* @param type 保存的类型
* @param key 缓存key
* @param time 保存时间
*/
public <T> Observable<T> load(final Type type, final String key, final long time) {
return Observable.create(new SimpleSubscribe<T>() {
@Override
T execute() {
return cacheCore.load(type, key, time);
}
});
}
/**
* 保存
*
* @param key 缓存key
* @param value 缓存Value
*/
public <T> Observable<Boolean> save(final String key, final T value) {
return Observable.create(new SimpleSubscribe<Boolean>() {
@Override
Boolean execute() throws Throwable {
cacheCore.save(key, value);
return true;
}
});
}
public RxProcess(ProcessFactory processFactory, ProcessKiller processKiller, String... commands) {
this.processCreator = Observable.create(e -> {
final Process process = processFactory.start(commands);
e.setCancellable(() -> {
if (RXSDebug.isDebug()) Timber.tag(TAG).v("cancel()");
processKiller.kill(process);
});
e.onNext(process);
process.waitFor();
e.onComplete();
});
}
@Override
public Observable<String> uploadAvatar(String imgPath) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("http://p2.wmpic.me/article/2015/04/15/1429062874_HiUlpSXT.jpeg");
e.onComplete();
}
});
}
private static Observable<String> getFileNames() {
return Observable.create(emitter -> {
Files.walkFileTree(DIRECTORY,
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path path, BasicFileAttributes attr) {
// ...
return FileVisitResult.CONTINUE;
}
});
// ...
});
}
public static void main(String[] args) {
Observable observable = Observable.create(observer -> {
observer.onNext("I am Hot Observable " + Math.random()*100);
observer.onComplete();
});
observable.subscribe(consumer -> System.out.println("message:-" + consumer));
observable.subscribe(consumer -> System.out.println("message:-" + consumer));
}
public static Observable<Contact> fetch (@NonNull final LimitColumn columnLimitChoice, @NonNull final Context context) {
return Observable.create(new ObservableOnSubscribe<Contact>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Contact> e) throws Exception {
new RxContacts(context).fetch(columnLimitChoice, e);
}
});
}
Observable<BookContentBean> analyzeBookContent(final String s, final BaseChapterBean chapterBean, final BaseChapterBean nextChapterBean, BookShelfBean bookShelfBean, Map<String, String> headerMap) {
return Observable.create(e -> {
if (TextUtils.isEmpty(s)) {
e.onError(new Throwable(MApplication.getInstance().getString(R.string.get_content_error) + chapterBean.getDurChapterUrl()));
return;
}
if (TextUtils.isEmpty(baseUrl)) {
baseUrl = NetworkUtils.getAbsoluteURL(bookShelfBean.getBookInfoBean().getChapterUrl(), chapterBean.getDurChapterUrl());
}
//if (StringUtils.isJsonType(s) && !MApplication.getInstance().getDonateHb()) {
// e.onError(new Throwable(MApplication.getInstance().getString(R.string.donate_s)));
// e.onComplete();
// return;
//}
Debug.printLog(tag, "┌成功获取正文页");
Debug.printLog(tag, "└" + baseUrl);
BookContentBean bookContentBean = new BookContentBean();
bookContentBean.setDurChapterIndex(chapterBean.getDurChapterIndex());
bookContentBean.setDurChapterUrl(chapterBean.getDurChapterUrl());
bookContentBean.setTag(tag);
AnalyzeRule analyzer = new AnalyzeRule(bookShelfBean);
WebContentBean webContentBean = analyzeBookContent(analyzer, s, chapterBean.getDurChapterUrl(), baseUrl);
bookContentBean.setDurChapterContent(webContentBean.content);
/*
* 处理分页
*/
if (!TextUtils.isEmpty(webContentBean.nextUrl)) {
List<String> usedUrlList = new ArrayList<>();
usedUrlList.add(chapterBean.getDurChapterUrl());
BaseChapterBean nextChapter;
if (nextChapterBean != null) {
nextChapter = nextChapterBean;
} else {
nextChapter = DbHelper.getDaoSession().getBookChapterBeanDao().queryBuilder()
.where(BookChapterBeanDao.Properties.NoteUrl.eq(chapterBean.getNoteUrl()),
BookChapterBeanDao.Properties.DurChapterIndex.eq(chapterBean.getDurChapterIndex() + 1))
.build().unique();
}
while (!TextUtils.isEmpty(webContentBean.nextUrl) && !usedUrlList.contains(webContentBean.nextUrl)) {
usedUrlList.add(webContentBean.nextUrl);
if (nextChapter != null && NetworkUtils.getAbsoluteURL(baseUrl, webContentBean.nextUrl).equals(NetworkUtils.getAbsoluteURL(baseUrl, nextChapter.getDurChapterUrl()))) {
break;
}
AnalyzeUrl analyzeUrl = new AnalyzeUrl(webContentBean.nextUrl, headerMap, tag);
try {
String body;
Response<String> response = BaseModelImpl.getInstance().getResponseO(analyzeUrl).blockingFirst();
body = response.body();
webContentBean = analyzeBookContent(analyzer, body, webContentBean.nextUrl, baseUrl);
if (!TextUtils.isEmpty(webContentBean.content)) {
bookContentBean.setDurChapterContent(bookContentBean.getDurChapterContent() + "\n" + webContentBean.content);
}
} catch (Exception exception) {
if (!e.isDisposed()) {
e.onError(exception);
}
}
}
}
e.onNext(bookContentBean);
e.onComplete();
});
}
Observable<BookContentBean> analyzeBookContent(final String s, final BaseChapterBean chapterBean, final BaseChapterBean nextChapterBean, BookShelfBean bookShelfBean, Map<String, String> headerMap) {
return Observable.create(e -> {
if (TextUtils.isEmpty(s)) {
e.onError(new Throwable(MApplication.getInstance().getString(R.string.get_content_error) + chapterBean.getDurChapterUrl()));
return;
}
if (TextUtils.isEmpty(baseUrl)) {
baseUrl = NetworkUtils.getAbsoluteURL(bookShelfBean.getBookInfoBean().getChapterUrl(), chapterBean.getDurChapterUrl());
}
if (StringUtils.isJsonType(s) && !MApplication.getInstance().getDonateHb()) {
e.onError(new VipThrowable());
e.onComplete();
return;
}
Debug.printLog(tag, "┌成功获取正文页");
Debug.printLog(tag, "└" + baseUrl);
BookContentBean bookContentBean = new BookContentBean();
bookContentBean.setDurChapterIndex(chapterBean.getDurChapterIndex());
bookContentBean.setDurChapterUrl(chapterBean.getDurChapterUrl());
bookContentBean.setTag(tag);
AnalyzeRule analyzer = new AnalyzeRule(bookShelfBean);
WebContentBean webContentBean = analyzeBookContent(analyzer, s, chapterBean.getDurChapterUrl(), baseUrl);
bookContentBean.setDurChapterContent(webContentBean.content);
/*
* 处理分页
*/
if (!TextUtils.isEmpty(webContentBean.nextUrl)) {
List<String> usedUrlList = new ArrayList<>();
usedUrlList.add(chapterBean.getDurChapterUrl());
BaseChapterBean nextChapter;
if (nextChapterBean != null) {
nextChapter = nextChapterBean;
} else {
nextChapter = DbHelper.getDaoSession().getBookChapterBeanDao().queryBuilder()
.where(BookChapterBeanDao.Properties.NoteUrl.eq(chapterBean.getNoteUrl()),
BookChapterBeanDao.Properties.DurChapterIndex.eq(chapterBean.getDurChapterIndex() + 1))
.build().unique();
}
while (!TextUtils.isEmpty(webContentBean.nextUrl) && !usedUrlList.contains(webContentBean.nextUrl)) {
usedUrlList.add(webContentBean.nextUrl);
if (nextChapter != null && NetworkUtils.getAbsoluteURL(baseUrl, webContentBean.nextUrl).equals(NetworkUtils.getAbsoluteURL(baseUrl, nextChapter.getDurChapterUrl()))) {
break;
}
AnalyzeUrl analyzeUrl = new AnalyzeUrl(webContentBean.nextUrl, headerMap, tag);
try {
String body;
Response<String> response = BaseModelImpl.getInstance().getResponseO(analyzeUrl).blockingFirst();
body = response.body();
webContentBean = analyzeBookContent(analyzer, body, webContentBean.nextUrl, baseUrl);
if (!TextUtils.isEmpty(webContentBean.content)) {
bookContentBean.setDurChapterContent(bookContentBean.getDurChapterContent() + "\n" + webContentBean.content);
}
} catch (Exception exception) {
if (!e.isDisposed()) {
e.onError(exception);
}
}
}
}
e.onNext(bookContentBean);
e.onComplete();
});
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Observable<ChildEvent> childEvents(@NonNull final Query query) {
return Observable.create(new QueryChildEventsOnSubscribe(query));
}
public StockTickerPublisher() {
Observable<StockPriceUpdate> stockPriceUpdateObservable = Observable.create(emitter -> {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS);
});
ConnectableObservable<StockPriceUpdate> connectableObservable = stockPriceUpdateObservable.share().publish();
connectableObservable.connect();
publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
}