下面列出了怎么用io.reactivex.FlowableTransformer的API类实例代码及写法,或者点击链接到github查看源代码。
@Test
public void testCompletionThrows() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1) //
.compose(sm) //
.test() //
.assertValues(1) //
.assertError(ThrowingException.class);
}
@Override
public Flowable<SHPage> getAcgNews(int pageIndex) {
return mRepositoryManager.obtainRetrofitService(AcgNewsService.class)
.getISHNews(pageIndex, 15, 3)
.compose(new FlowableTransformer<SHResponse<SHPage>, SHPage>() {
@Override
public Flowable<SHPage> apply(Flowable<SHResponse<SHPage>> httpResponseFlowable) {
return httpResponseFlowable
.flatMap(new Function<SHResponse<SHPage>, Flowable<SHPage>>() {
@Override
public Flowable<SHPage> apply(SHResponse<SHPage> response) {
LogUtil.d(response.toString());
if (!TextUtils.isEmpty(response.getErrMsg())) {
return Flowable.error(new ApiException(response.getErrMsg()));
} else if (response.getData() != null) {
return RxUtil.createData(response.getData());
} else {
return Flowable.error(new ApiException("数据加载失败"));
}
}
});
}
});
}
@Test
public void testOnNextThrowsWithBurstSourceThatTerminatesWithError() {
List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
try {
RxJavaPlugins.setErrorHandler(Consumers.addTo(list));
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.requestBatchSize(10) //
.build();
Burst.item(1).error(new RuntimeException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
assertEquals(1, list.size());
} finally {
RxJavaPlugins.reset();
}
}
@Test
public void errorActionThrows() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.errored(new Errored<String, Integer>() {
@Override
public void accept(String state, Throwable error, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.build();
Flowable.<Integer> error(new RuntimeException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
}
@Test
public void noActionTransition() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
return state;
}
}) //
.build();
Flowable.just(1, 2) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertComplete();
}
public <R> Disposable subscribe(Consumer<R> onNext, Consumer<Throwable> onError, Action onCompleted, FlowableTransformer<T, R> transformer)
{
Flowable flowable = build(false);
if (transformer != null)
flowable = flowable.compose(transformer);
if (onNext == null)
onNext = data -> {};
if (onError == null)
onError = error -> { throw new OnErrorNotImplementedException(error); };
if (onCompleted == null)
onCompleted = () -> {};
Consumer<R> actualOnNext = onNext;
if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled)
actualOnNext = RxBusUtil.wrapQueueConsumer(onNext, mQueuer);
flowable = applySchedular(flowable);
Disposable disposable = flowable.subscribe(actualOnNext, onError, onCompleted);
if (mBoundObject != null)
RxDisposableManager.addDisposable(mBoundObject, disposable);
return disposable;
}
@Test
public void testOnNextThrowsWithBurstSource() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
throw new ThrowingException();
}
}) //
.requestBatchSize(10) //
.build();
Burst.items(1, 2, 3).create() //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
}
public static <R extends Result> FlowableTransformer<R, R> forFlowable() {
return upstream -> upstream.onErrorResumeNext(throwable -> {
if(throwable instanceof StatusException) {
StatusException statusException = (StatusException) throwable;
if(statusException.getStatus().hasResolution()) {
return Flowable.just((R) statusException.getResult());
} else {
return Flowable.error(throwable);
}
} else {
return Flowable.error(throwable);
}
});
}
/**
* 绑定一个 Activity、Fragment 的生命周期,自动释放资源
* <br/>
* 例如:网络请求时绑定{@link ActivityEvent#STOP} 或 {@link FragmentEvent#STOP},
* onStop()时会自动取消网络请求.
*
* @param event 事件类型
* @see ActivityEvent
* @see FragmentEvent
*/
@SuppressWarnings("SpellCheckingInspection")
@Override
public <Type> FlowableTransformer<Type, Type> bindEventWithFlowable(final int event) {
final Flowable<Integer> observable = mBehaviorSubject
.toFlowable(BackpressureStrategy.LATEST)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer == event;
}
})
.take(1);
return new FlowableTransformer<Type, Type>() {
@Override
public Publisher<Type> apply(@io.reactivex.annotations.NonNull Flowable<Type> upstream) {
return upstream.takeUntil(observable);
}
};
}
/**
* 绑定一个 Activity、Fragment 的生命周期,自动释放资源
* <br/>
* 例如:网络请求时绑定{@link ActivityEvent#STOP} 或 {@link FragmentEvent#STOP},
* onStop()时会自动取消网络请求.
*
* @param event 事件类型
* @see ActivityEvent
* @see FragmentEvent
*/
@SuppressWarnings("SpellCheckingInspection")
@Override
public <Type> FlowableTransformer<Type, Type> bindEventWithFlowable(final int event) {
final Flowable<Integer> observable = mBehaviorSubject
.toFlowable(BackpressureStrategy.LATEST)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer == event;
}
})
.take(1);
return new FlowableTransformer<Type, Type>() {
@Override
public Publisher<Type> apply(@io.reactivex.annotations.NonNull Flowable<Type> upstream) {
return upstream.takeUntil(observable);
}
};
}
@Test
public void errorActionPassThrough() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.errored(new Errored<String, Integer>() {
@Override
public void accept(String state, Throwable error, Emitter<Integer> emitter) {
emitter.onError_(error);
}
}) //
.build();
Flowable.<Integer> error(new ThrowingException()) //
.compose(sm) //
.test() //
.assertNoValues() //
.assertError(ThrowingException.class);
}
@Test
public void testPassThroughEmitterOnNextAfterCompletion() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
emitter.onComplete_();
emitter.onNext_(8);
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1, 2, 3, 4, 5, 6) //
.compose(sm) //
.test() //
.assertValues(1, 2, 3, 4, 5, 6) //
.assertComplete();
}
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
final Function<? super T, ? extends R> function) {
return new FlowableTransformer<T, Pair<T, Statistics>>() {
@Override
public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
return source.scan(Pair.create((T) null, Statistics.create()),
new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
@Override
public Pair<T, Statistics> apply(Pair<T, Statistics> pair, T t) throws Exception {
return Pair.create(t, pair.b().add(function.apply(t)));
}
}).skip(1);
}
};
}
@Test
public void testPassThroughEmitterCompletesTwice() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
emitter.onComplete_();
emitter.onComplete_();
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1, 2, 3, 4, 5, 6) //
.compose(sm) //
.test() //
.assertValues(1, 2, 3, 4, 5, 6) //
.assertComplete();
}
@Test
public void testPassThroughWithCustomCompletion() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.completion(new Completion2<String, Integer>() {
@Override
public void accept(String state, Emitter<Integer> emitter) {
emitter.onComplete_();
}
}) //
.requestBatchSize(1) //
.build();
Flowable.just(1, 2, 3, 4, 5, 6) //
.compose(sm) //
.test() //
.assertValues(1, 2, 3, 4, 5, 6) //
.assertComplete();
}
public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
final boolean constrainFirstRequestMin) {
Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> source) {
if (minRequest == maxRequest && constrainFirstRequestMin) {
return source.rebatchRequests(minRequest);
} else {
return source
.compose(Transformers.<T>minRequest(constrainFirstRequestMin ? minRequest : 1, minRequest))
.compose(Transformers.<T>maxRequest(maxRequest));
}
}
};
}
@Test
public void testCancelFromTransition() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(new Transition2<String, Integer, Integer>() {
@Override
public String apply(String state, Integer value, Emitter<Integer> emitter) {
emitter.cancel_();
return state;
}
}) //
.requestBatchSize(10) //
.build();
Burst.items(1, 2, 3).create() //
.compose(sm) //
.test() //
.assertNoValues() //
.assertNotTerminated();
}
public FlowableUseCase(final UseCaseExecutor useCaseExecutor,
final PostExecutionThread postExecutionThread) {
super(useCaseExecutor, postExecutionThread);
schedulersTransformer = new FlowableTransformer<R, R>() {
@Override
public Flowable<R> apply(Flowable<R> rObservable) {
return rObservable.subscribeOn(useCaseExecutor.getScheduler())
.observeOn(postExecutionThread.getScheduler());
}
};
}
public <T> FlowableTransformer<T, CacheResult<T>> transformFlowable(final String key, final Type type, final IFlowableStrategy strategy) {
return new FlowableTransformer<T, CacheResult<T>>() {
@Override
public Publisher<CacheResult<T>> apply(Flowable<T> flowable) {
return strategy.flow(RxCache.this, key, flowable, type);
}
};
}
private static FlowableTransformer<Integer, Integer> passThrough(int batchSize) {
return StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.requestBatchSize(batchSize) //
.build();
}
public static <T> FlowableTransformer<T, T> normalTransformer() {
return observable -> observable.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
// .observeOn(Schedulers.io())
// .onErrorResumeNext(e -> { 可能会卡顿,舍弃了
// Throwable throwable;
// if (NetworkUtils.isNetworkAvailable()) {
// throwable = e;
// } else {
// throwable = new ApiException(CodeConstant.CODE_NO_NET, e.getMessage());
// }
// return Flowable.error(throwable);
// })
.observeOn(AndroidSchedulers.mainThread());
}
/**
* 统一线程切换
*
* @return
*/
public static <T> FlowableTransformer<T, T> getFlowableScheduler() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream
.onErrorResumeNext(new ServerResultErrorFunc2<T>())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> FlowableTransformer<T, T> getFlowableScheduler(final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryWhenHandler) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream
.retryWhen(retryWhenHandler)
.onErrorResumeNext(new ServerResultErrorFunc2<T>())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> FlowableTransformer<T, T> getFlowableScheduler(final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryWhenHandler, final Function<? super Throwable, ? extends Publisher<? extends T>> resumeFunction) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream
.retryWhen(retryWhenHandler)
.onErrorResumeNext(resumeFunction)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T>FlowableTransformer<T,T> flScheduers(){
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidRequestBatchSize() {
FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
.initialState("") //
.transition(PASS_THROUGH_TRANSITION) //
.requestBatchSize(-10) //
.build();
Flowable.just(1).compose(sm).test();
}
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> flowable) {
return flowable.observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
BackpressureStrategy backpressureStrategy, int requestBatchSize) {
return new TransformerStateMachine<State, In, Out>(initialState, transition, completion, backpressureStrategy,
requestBatchSize);
}
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
BackpressureStrategy backpressureStrategy, int requestBatchSize) {
return TransformerStateMachine.create(initialState, transition, completion, backpressureStrategy,
requestBatchSize);
}
public static <T> FlowableTransformer<T, T> mapLast(final Function<? super T, ? extends T> function) {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return new FlowableMapLast<T>(upstream, function);
}
};
}