下面列出了android.os.DeadObjectException#io.reactivex.Observable 实例代码,或者点击链接到github查看源代码,也可以在右侧发表评论。
@Test
void distinct_test() {
Observable<LocalDate> dates = Observable.just(
LocalDate.of(2018, 1, 3),
LocalDate.of(2018, 3, 4),
LocalDate.of(2018, 1, 5),
LocalDate.of(2018, 11, 3)
);
// get distinct months
dates.map(LocalDate::getMonth)
.distinct()
.subscribe(System.out::println);
System.out.println("############################");
// get distinct days
dates.distinct(LocalDate::getMonth)
.subscribe(System.out::println);
}
private String combineName(List<Permission> permissions) {
return Observable.fromIterable(permissions)
.map(new Function<Permission, String>() {
@Override
public String apply(Permission permission) throws Exception {
return permission.name;
}
}).collectInto(new StringBuilder(), new BiConsumer<StringBuilder, String>() {
@Override
public void accept(StringBuilder s, String s2) throws Exception {
if (s.length() == 0) {
s.append(s2);
} else {
s.append(", ").append(s2);
}
}
}).blockingGet().toString();
}
@Test public void completeCompletesInner() {
Observable<Message> messages = Observable.just(new Message("Bob", "Hello"));
final AtomicInteger seen = new AtomicInteger();
WindowIfChanged.create(messages, userSelector)
.switchMap(
new Function<GroupedObservable<String, Message>, Observable<Notification<String>>>() {
@Override public Observable<Notification<String>> apply(
GroupedObservable<String, Message> group) {
final int count = seen.incrementAndGet();
return group.map(new Function<Message, String>() {
@Override public String apply(Message message) throws Exception {
return count + " " + message;
}
}).materialize();
}
})
.test()
.assertValues( //
Notification.createOnNext("1 Bob Hello"), //
Notification.<String>createOnComplete()) //
.assertComplete();
}
@Test
public void test() {
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
});
source.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("onNext: " + item),
error -> System.out.println("onError: " + error),
() -> System.out.println("onComplete will not be printed!"));
}
@Test
public void givenSubscriberAndError_whenRetryConditionallyOnError_thenRetryConfirmed() {
TestObserver<String> testObserver = new TestObserver<>();
AtomicInteger atomicCounter = new AtomicInteger(0);
Observable
.<String>error(() -> {
atomicCounter.incrementAndGet();
return UNKNOWN_ERROR;
})
.retry((integer, throwable) -> integer < 4)
.subscribe(testObserver);
testObserver.assertError(UNKNOWN_ERROR);
testObserver.assertNotComplete();
testObserver.assertNoValues();
assertTrue("should call 4 times", atomicCounter.get() == 4);
}
@Override
protected void observeValidCreate(@NonNull EditText inputView, @NonNull Observable<String> createStream) {
final long listId = getArguments().getLong(EXTRA_LIST_ID);
final String currentListName = Select.column(ITEM_LIST.NAME)
.from(ITEM_LIST)
.where(ITEM_LIST.ID.is(listId))
.takeFirst()
.execute();
inputView.setText(currentListName);
inputView.setSelection(currentListName.length());
createStream.observeOn(Schedulers.io())
.flatMap(name -> Update
.table(ITEM_LIST)
.set(ITEM_LIST.NAME, name)
.where(ITEM_LIST.ID.is(listId))
.observe()
.toObservable())
.firstOrError()
.subscribe();
}
/**
* 获取目录
*/
public Observable<List<BookChapterBean>> getChapterList(final BookShelfBean bookShelfBean) {
if (bookSourceBean == null) {
return Observable.error(new NoSourceThrowable(bookShelfBean.getBookInfoBean().getName()));
}
BookChapterList bookChapterList = new BookChapterList(tag, bookSourceBean, true);
if (!TextUtils.isEmpty(bookShelfBean.getBookInfoBean().getChapterListHtml())) {
return bookChapterList.analyzeChapterList(bookShelfBean.getBookInfoBean().getChapterListHtml(), bookShelfBean, headerMap);
}
try {
AnalyzeUrl analyzeUrl = new AnalyzeUrl(bookShelfBean.getBookInfoBean().getChapterUrl(), headerMap, bookShelfBean.getNoteUrl());
return getResponseO(analyzeUrl)
.flatMap(response -> setCookie(response, tag))
.flatMap(response -> bookChapterList.analyzeChapterList(response.body(), bookShelfBean, headerMap));
} catch (Exception e) {
return Observable.error(new Throwable(String.format("url错误:%s", bookShelfBean.getBookInfoBean().getChapterUrl())));
}
}
void loginWithMnemonic(final String mnemonic, final String password) {
startLoading();
Observable.just(getSession())
.observeOn(Schedulers.computation())
.map((session) -> {
session.disconnect();
connect();
session.loginWithMnemonic(mnemonic, password);
return session;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe((session) -> {
onPostLogin();
stopLoading();
onLoginSuccess();
}, (final Throwable e) -> {
stopLoading();
GDKSession.get().disconnect();
final Integer code = getErrorCode(e.getMessage());
if (code == GDK.GA_ERROR) {
UI.toast(this, R.string.id_login_failed, Toast.LENGTH_LONG);
} else {
UI.toast(this, R.string.id_connection_failed, Toast.LENGTH_LONG);
}
});
}
@Test
public void aliasWithSpaces() {
final int count = 8;
final List<String> expected = Observable.fromIterable(insertAuthors(count))
.map(new Function<Author, String>() {
@Override
public String apply(Author v) {
return v.name;
}
})
.toList()
.blockingGet();
assertThat(Select
.column(AUTHOR.NAME.as("author name"))
.from(AUTHOR)
.execute())
.isEqualTo(expected);
}
public Observable<ActivityResultInfo> startForResult(final Intent intent) {
final PublishSubject<ActivityResultInfo> subject = PublishSubject.create();
return subject.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
int requestCode = generateRequestCode();
mSubjects.put(requestCode, subject);
startActivityForResult(intent, requestCode);
}
});
}
private Observable<BookShelfBean> toBookshelf(SearchBookBean searchBookBean) {
return Observable.create(e -> {
BookShelfBean bookShelfBean = BookshelfHelp.getBookFromSearchBook(searchBookBean);
e.onNext(bookShelfBean);
e.onComplete();
});
}
public void rx2oSerial() {
Observable.fromIterable(rows)
.flatMap(r -> {
String[] ws = r.split("\\s");
return Observable.fromArray(ws);
})
.filter(w -> w.length() > 4)
.map(w -> w.length())
.reduce(0, (a, b) -> a + b)
.subscribe()
;
}
@Test
void just_test() {
log("Just_test Before");
Observable
.just("Jan", "Feb", "Mar", "Apl", "May", "Jun")
.subscribe(ObservableTest::log);
log("Just_test After");
}
/**
* Adds an {@link Observable} and {@link Observer} to this group and subscribes to it. If an
* {@link Observable} with the same tag is already added, the previous one will be canceled and
* removed before adding and subscribing to the new one.
*/
<T> ManagedObservable<T> add(final String observerTag, final String observableTag,
Observable<T> observable, ObservableEmitter<? super T> observer) {
checkNotDestroyed();
final Map<String, ManagedObservable<?>> existingObservables =
getObservablesForObserver(observerTag);
ManagedObservable<?> previousObservable = existingObservables.get(observableTag);
if (previousObservable != null) {
cancelAndRemove(observerTag, observableTag);
}
ManagedObservable<T> managedObservable =
new ManagedObservable<>(observerTag, observableTag, observable, observer, new
Action() {
@Override
public void run() {
existingObservables.remove(observableTag);
}
});
existingObservables.put(observableTag, managedObservable);
if (!locked) {
managedObservable.unlock();
}
return managedObservable;
}
/**
* 检查昵称是否有效
* @param nickname 昵称
*/
public static Observable<Boolean> checkNickname(String nickname) {
JsonObject json = new JsonObject();
json.addProperty("nickname", nickname);
return userApi.checkNickname(json)
.map(it -> true)
.compose(process());
}
@Test
public void minFunction() {
final List<SimpleAllValuesMutable> vals = insertSimpleAllValues(9);
final Integer min = MathObservable.min(
Observable.fromIterable(vals)
.map(new Function<SimpleAllValuesMutable, Integer>() {
@Override
public Integer apply(SimpleAllValuesMutable v) {
return v.primitiveInt;
}
}))
.blockingFirst();
assertThat(Select
.column(Select.min(SIMPLE_ALL_VALUES_MUTABLE.PRIMITIVE_INT))
.from(SIMPLE_ALL_VALUES_MUTABLE)
.takeFirst()
.execute())
.isEqualTo(min);
Collections.sort(vals, new Comparator<SimpleAllValuesMutable>() {
@Override
public int compare(SimpleAllValuesMutable lhs, SimpleAllValuesMutable rhs) {
return lhs.string.compareTo(rhs.string);
}
});
assertThat(Select
.column(Select.min(SIMPLE_ALL_VALUES_MUTABLE.STRING))
.from(SIMPLE_ALL_VALUES_MUTABLE)
.takeFirst()
.execute())
.isEqualTo(vals.get(0).string);
}
@Override
public Observable<Boolean> isOptionEmpty() {
return Observable.fromCallable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return !(mDaoSession.getOptionDao().count() > 0);
}
});
}
@Override
public Observable<AggregateTransaction> announceAggregateBonded(
Listener listener, SignedTransaction signedAggregateTransaction) {
Validate.notNull(signedAggregateTransaction, "signedAggregateTransaction is required");
Validate.isTrue(signedAggregateTransaction.getType() == TransactionType.AGGREGATE_BONDED,
"signedAggregateTransaction type must be AGGREGATE_BONDED");
Observable<TransactionAnnounceResponse> announce = transactionRepository
.announceAggregateBonded(signedAggregateTransaction);
return announce.flatMap(
r -> listener.aggregateBondedAddedOrError(signedAggregateTransaction.getSigner().getAddress(),
signedAggregateTransaction.getHash()));
}
private <T> List<Observable<Page<T>>> toPages(List<T> infos, Integer pageSize) {
List<List<T>> partitions = new ArrayList<>();
for (int i = 0; i < infos.size(); i += pageSize) {
partitions.add(infos.subList(i, Math.min(i + pageSize, infos.size())));
}
AtomicInteger pageNumber = new AtomicInteger();
return partitions.stream().map(
pageData -> Observable
.just(new Page<T>(pageData, pageNumber.incrementAndGet(), pageSize, infos.size(), partitions.size())))
.collect(Collectors.toList());
}
/**
* @param query
* @return
*/
@NonNull
@CheckReturnValue
public static Observable<DataSnapshot> changes(@NonNull final Query query) {
return Observable.create(new ObservableOnSubscribe<DataSnapshot>() {
@Override
public void subscribe(
@NonNull final ObservableEmitter<DataSnapshot> emit) throws Exception {
final ValueEventListener listener = new ValueEventListener() {
@Override
public void onDataChange(DataSnapshot dataSnapshot) {
if (!emit.isDisposed()) {
emit.onNext(dataSnapshot);
}
}
@Override
public void onCancelled(DatabaseError e) {
if (!emit.isDisposed()) {
emit.onError(e.toException());
}
}
};
emit.setCancellable(new Cancellable() {
@Override
public void cancel() throws Exception {
query.removeEventListener(listener);
}
});
query.addValueEventListener(listener);
}
});
}
/**
* 获取测试数据
* */
public Observable<String> getTestData() {
return clientApi.getWeather("101010100")
.flatMap(new Function<ResponseBody, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull ResponseBody responseBody) throws Exception {
return Observable.just(responseBody.string());
}
});
}
@Override
public boolean upgrade() {
logger.info("Applying OIDC scope upgrade");
domainService.findAll()
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(this::createOrUpdateSystemScopes)
.subscribe();
return true;
}
@GET("/v3/projects/{id}/merge_requests/{merge_request_id}/comments")
Observable<MRNote> getV3ProjectsMerge_requestsCommentsMRNote(
@Path(value="id", encoded=true) String id,
@Query(value="page", encoded=true) Integer page,
@Query(value="per_page", encoded=true) Integer perPage,
@Path(value="merge_request_id", encoded=true) Integer mergeRequestId);
@Override
public Observable<WeixinChoiceListBean> getWeixinChoiceList(int page, int pageStrip, String
dttype, String key) {
return RetrofitCreateHelper.createApi(WeixinApi.class, WeixinApi.HOST).getWeixinChoiceList
(page, pageStrip, dttype, key).compose(RxHelper
.<WeixinChoiceListBean>rxSchedulerHelper());
}
@Ignore("Remove to run test")
@Test
public void cannotPlayAGuessTwice() {
Observable<Output> result = hangman.play(
Observable.fromArray("secret"),
Observable.fromArray("e", "c", "s", "c"));
assertThatThrownBy(() -> result.blockingLast())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Letter c was already played");
}
/**
* 分页查询,并按列排序
*
* @param orderColumn 排序列名
* @param ascending true为升序,false为降序
* @param offset 搜索下标
* @param count 搜索条数
*/
public Observable<List<T>> queryForPagesByOrderWithRx(final String orderColumn, final boolean ascending, final Long offset, final Long count) {
return Observable.create(new ObservableOnSubscribe<List<T>>() {
@Override
public void subscribe(ObservableEmitter<List<T>> e) throws Exception {
e.onNext(queryForPagesByOrder(orderColumn, ascending, offset, count));
e.onComplete();
}
}).compose(RxUtil.<List<T>>applySchedulers());
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_mock);
subscribe = Observable.interval(1, TimeUnit.SECONDS)
.subscribe();
}
public <T> Observable<T> execute(CallClazzProxy<? extends ApiResult<T>, T> proxy) {
return build().generateRequest()
.map(new ApiResultFunc(proxy.getType()))
.compose(isSyncRequest ? RxUtil._main() : RxUtil._io_main())
.compose(rxCache.transformer(cacheMode, proxy.getCallType()))
.retryWhen(new RetryExceptionFunc(retryCount, retryDelay, retryIncreaseDelay))
.compose(new ObservableTransformer() {
@Override
public ObservableSource apply(@NonNull Observable upstream) {
return upstream.map(new CacheResultFunc<T>());
}
});
}
private Observable<SearchBookBean> saveSearchBookBean(List<BookChapterBean> chapterBeanList) {
return Observable.create(e -> {
BookChapterBean chapterBean = chapterBeanList.get(chapterBeanList.size() - 1);
SearchBookBean searchBookBean = DbHelper.getDaoSession().getSearchBookBeanDao().queryBuilder()
.where(SearchBookBeanDao.Properties.NoteUrl.eq(chapterBean.getNoteUrl()))
.unique();
if (searchBookBean != null) {
searchBookBean.setLastChapter(chapterBean.getDurChapterName());
searchBookBean.setAddTime(System.currentTimeMillis());
DbHelper.getDaoSession().getSearchBookBeanDao().insertOrReplace(searchBookBean);
e.onNext(searchBookBean);
}
e.onComplete();
});
}
private void testZip() {
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer a, String b) throws Exception {
return a + b;
}
}).compose(RxUtil.applySchedulers()).subscribe(new Consumer<String>() {
@Override
public void accept(String o) throws Exception {
Log.d(TAG, o);
}
});
}