类io.reactivex.FlowableTransformer源码实例Demo

下面列出了 io.reactivex.FlowableTransformer 的 API 使用示例代码及写法,也可以点击链接到 GitHub 查看源代码。

源代码1 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * A completion action that throws must surface as a stream error: the
 * upstream value is still observed, then {@code ThrowingException} arrives.
 */
@Test
public void testCompletionThrows() {
    // Completion action that fails instead of emitting or completing.
    final Completion2<String, Integer> failingCompletion = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            throw new ThrowingException();
        }
    };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(failingCompletion) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1) //
            .compose(sm) //
            .test() //
            .assertValues(1) //
            .assertError(ThrowingException.class);
}
 
源代码2 项目: AcgClub   文件: ISHNewsModel.java
/**
 * Requests one page of news from {@link AcgNewsService} and unwraps the
 * response envelope.
 *
 * @param pageIndex index of the page to request
 * @return a stream that emits the page payload, or fails with an
 *         {@link ApiException} when the response carries an error message
 *         or has no data
 */
@Override
public Flowable<SHPage> getAcgNews(int pageIndex) {
  // Maps the response envelope to its payload, or to an ApiException error.
  final Function<SHResponse<SHPage>, Flowable<SHPage>> unwrap =
      new Function<SHResponse<SHPage>, Flowable<SHPage>>() {
        @Override
        public Flowable<SHPage> apply(SHResponse<SHPage> response) {
          LogUtil.d(response.toString());
          // A non-empty error message takes precedence over any data.
          if (!TextUtils.isEmpty(response.getErrMsg())) {
            return Flowable.error(new ApiException(response.getErrMsg()));
          }
          if (response.getData() == null) {
            return Flowable.error(new ApiException("数据加载失败"));
          }
          return RxUtil.createData(response.getData());
        }
      };

  return mRepositoryManager.obtainRetrofitService(AcgNewsService.class)
      .getISHNews(pageIndex, 15, 3)
      .compose(new FlowableTransformer<SHResponse<SHPage>, SHPage>() {
        @Override
        public Flowable<SHPage> apply(Flowable<SHResponse<SHPage>> upstream) {
          return upstream.flatMap(unwrap);
        }
      });
}
 
源代码3 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * The transition throws on the first item of a burst source that itself ends
 * with an error. The subscriber must see only {@code ThrowingException}, and
 * exactly one throwable must reach the global RxJava error handler.
 */
@Test
public void testOnNextThrowsWithBurstSourceThatTerminatesWithError() {
    // Collects whatever is routed to the RxJavaPlugins error handler.
    final List<Throwable> handled = new CopyOnWriteArrayList<Throwable>();
    // Transition that fails on every item.
    final Transition2<String, Integer, Integer> failingTransition =
            new Transition2<String, Integer, Integer>() {
                @Override
                public String apply(String state, Integer value, Emitter<Integer> emitter) {
                    throw new ThrowingException();
                }
            };
    try {
        RxJavaPlugins.setErrorHandler(Consumers.addTo(handled));
        final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
                .initialState("") //
                .transition(failingTransition) //
                .requestBatchSize(10) //
                .build();
        Burst.item(1).error(new RuntimeException()) //
                .compose(sm) //
                .test() //
                .assertNoValues() //
                .assertError(ThrowingException.class);
        assertEquals(1, handled.size());
    } finally {
        // Always restore the default plugin hooks so other tests are unaffected.
        RxJavaPlugins.reset();
    }
}
 
源代码4 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * When the errored action itself throws, the subscriber receives the thrown
 * {@code ThrowingException} and no values.
 */
@Test
public void errorActionThrows() {
    // Error action that fails instead of forwarding the upstream error.
    final Errored<String, Integer> failingErrored = new Errored<String, Integer>() {
        @Override
        public void accept(String state, Throwable error, Emitter<Integer> emitter) {
            throw new ThrowingException();
        }
    };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .errored(failingErrored) //
            .build();
    Flowable.<Integer> error(new RuntimeException()) //
            .compose(sm) //
            .test() //
            .assertNoValues() //
            .assertError(ThrowingException.class);
}
 
源代码5 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * A transition that never touches the emitter produces no downstream values;
 * the stream still completes.
 */
@Test
public void noActionTransition() {
    // Transition that keeps the state and emits nothing.
    final Transition2<String, Integer, Integer> silentTransition =
            new Transition2<String, Integer, Integer>() {
                @Override
                public String apply(String state, Integer value, Emitter<Integer> emitter) {
                    return state;
                }
            };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(silentTransition) //
            .build();
    Flowable.just(1, 2) //
            .compose(sm) //
            .test() //
            .assertNoValues() //
            .assertComplete();
}
 
源代码6 项目: RxBus2   文件: RxBusBuilder.java
/**
 * Builds the bus flowable, optionally transforms it, and subscribes the
 * given callbacks to it.
 *
 * @param onNext      consumer for events; a no-op is used when {@code null}
 * @param onError     consumer for errors; when {@code null}, errors are rethrown
 *                    wrapped in {@link OnErrorNotImplementedException}
 * @param onCompleted completion action; a no-op is used when {@code null}
 * @param transformer optional transformer applied via {@code compose};
 *                    skipped when {@code null}
 * @return the disposable of the subscription; it is also registered with
 *         {@code RxDisposableManager} when a bound object is set
 */
public <R> Disposable subscribe(Consumer<R> onNext, Consumer<Throwable> onError, Action onCompleted, FlowableTransformer<T, R> transformer)
{
    Flowable flowable = build(false);
    if (transformer != null) {
        flowable = flowable.compose(transformer);
    }

    // Fall back to defaults for every callback the caller left out.
    final Consumer<R> nextOrNoop = (onNext != null) ? onNext : data -> {};
    final Consumer<Throwable> errorOrThrow = (onError != null)
            ? onError
            : error -> { throw new OnErrorNotImplementedException(error); };
    final Action completedOrNoop = (onCompleted != null) ? onCompleted : () -> {};

    // Route events through the queuer only when one is set and the safety check is enabled.
    Consumer<R> delivery = nextOrNoop;
    if (mQueuer != null && mQueueSubscriptionSafetyCheckEnabled) {
        delivery = RxBusUtil.wrapQueueConsumer(nextOrNoop, mQueuer);
    }

    flowable = applySchedular(flowable);
    final Disposable subscription = flowable.subscribe(delivery, errorOrThrow, completedOrNoop);
    if (mBoundObject != null) {
        RxDisposableManager.addDisposable(mBoundObject, subscription);
    }
    return subscription;
}
 
源代码7 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * A transition that throws while processing a burst source must produce no
 * values and terminate the subscriber with {@code ThrowingException}.
 */
@Test
public void testOnNextThrowsWithBurstSource() {
    // Transition that fails on every item.
    final Transition2<String, Integer, Integer> failingTransition =
            new Transition2<String, Integer, Integer>() {
                @Override
                public String apply(String state, Integer value, Emitter<Integer> emitter) {
                    throw new ThrowingException();
                }
            };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(failingTransition) //
            .requestBatchSize(10) //
            .build();
    Burst.items(1, 2, 3).create() //
            .compose(sm) //
            .test() //
            .assertNoValues() //
            .assertError(ThrowingException.class);
}
 
/**
 * Creates a transformer that converts a {@code StatusException} whose status
 * has a resolution into a regular emission of the exception's result. Every
 * other error is passed downstream unchanged.
 *
 * @param <R> the result type flowing through the stream
 * @return the error-resuming transformer
 */
public static <R extends Result> FlowableTransformer<R, R> forFlowable() {
    return upstream -> upstream.onErrorResumeNext(throwable -> {
        // Errors of any other type are not ours to recover from.
        if (!(throwable instanceof StatusException)) {
            return Flowable.error(throwable);
        }

        StatusException statusFailure = (StatusException) throwable;
        if (!statusFailure.getStatus().hasResolution()) {
            return Flowable.error(throwable);
        }

        // The status has a resolution: hand the carried result to the subscriber instead.
        return Flowable.just((R) statusFailure.getResult());
    });
}
 
源代码9 项目: BaseProject   文件: RxAutoCleanDelegate.java
/**
 * Binds a stream to the lifecycle of an Activity or Fragment so that its
 * resources are released automatically.
 * <br/>
 * For example, binding a network request to {@link ActivityEvent#STOP} or
 * {@link FragmentEvent#STOP} cancels the request automatically in onStop().
 *
 * @param event the event type that ends the bound stream
 * @return a transformer that takes from upstream until {@code event} is observed
 * @see ActivityEvent
 * @see FragmentEvent
 */
@SuppressWarnings("SpellCheckingInspection")
@Override
public <Type> FlowableTransformer<Type, Type> bindEventWithFlowable(final int event) {
    // Accepts only the lifecycle event the caller asked for.
    final Predicate<Integer> matchesEvent = new Predicate<Integer>() {
        @Override
        public boolean test(Integer candidate) throws Exception {
            return candidate == event;
        }
    };

    // Emits at most once: the first matching event reported by the subject.
    final Flowable<Integer> stopSignal = mBehaviorSubject
            .toFlowable(BackpressureStrategy.LATEST)
            .filter(matchesEvent)
            .take(1);

    return new FlowableTransformer<Type, Type>() {
        @Override
        public Publisher<Type> apply(@io.reactivex.annotations.NonNull Flowable<Type> source) {
            return source.takeUntil(stopSignal);
        }
    };
}
 
源代码10 项目: BaseProject   文件: RxAutoCleanDelegate.java
/**
 * Binds a stream to the lifecycle of an Activity or Fragment so that its
 * resources are released automatically.
 * <br/>
 * For example, binding a network request to {@link ActivityEvent#STOP} or
 * {@link FragmentEvent#STOP} cancels the request automatically in onStop().
 *
 * @param event the event type that ends the bound stream
 * @return a transformer that takes from upstream until {@code event} is observed
 * @see ActivityEvent
 * @see FragmentEvent
 */
@SuppressWarnings("SpellCheckingInspection")
@Override
public <Type> FlowableTransformer<Type, Type> bindEventWithFlowable(final int event) {
    // Accepts only the lifecycle event the caller asked for.
    final Predicate<Integer> matchesEvent = new Predicate<Integer>() {
        @Override
        public boolean test(Integer candidate) throws Exception {
            return candidate == event;
        }
    };

    // Emits at most once: the first matching event reported by the subject.
    final Flowable<Integer> stopSignal = mBehaviorSubject
            .toFlowable(BackpressureStrategy.LATEST)
            .filter(matchesEvent)
            .take(1);

    return new FlowableTransformer<Type, Type>() {
        @Override
        public Publisher<Type> apply(@io.reactivex.annotations.NonNull Flowable<Type> source) {
            return source.takeUntil(stopSignal);
        }
    };
}
 
源代码11 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * An errored action that forwards the upstream error through the emitter
 * delivers that same error to the subscriber, with no values.
 */
@Test
public void errorActionPassThrough() {
    // Error action that simply re-emits the upstream error.
    final Errored<String, Integer> forwardingErrored = new Errored<String, Integer>() {
        @Override
        public void accept(String state, Throwable error, Emitter<Integer> emitter) {
            emitter.onError_(error);
        }
    };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .errored(forwardingErrored) //
            .build();
    Flowable.<Integer> error(new ThrowingException()) //
            .compose(sm) //
            .test() //
            .assertNoValues() //
            .assertError(ThrowingException.class);
}
 
源代码12 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * Emitting a value after the completion action has already signalled
 * completion must not reach the subscriber: only the six source values and
 * the completion are observed.
 */
@Test
public void testPassThroughEmitterOnNextAfterCompletion() {
    // Completes first, then tries to emit a late value (8).
    final Completion2<String, Integer> emitAfterComplete = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            emitter.onComplete_();
            emitter.onNext_(8);
        }
    };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(emitAfterComplete) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1, 2, 3, 4, 5, 6) //
            .compose(sm) //
            .test() //
            .assertValues(1, 2, 3, 4, 5, 6) //
            .assertComplete();
}
 
源代码13 项目: rxjava2-extras   文件: Transformers.java
/**
 * Creates a transformer that pairs every upstream element with the running
 * {@link Statistics} obtained after adding {@code function}'s value for that
 * element.
 *
 * @param function maps an element to the number fed into the statistics
 * @param <T>      element type
 * @param <R>      numeric type produced by {@code function}
 * @return a transformer emitting one {@code Pair<T, Statistics>} per upstream element
 */
public static <T, R extends Number> FlowableTransformer<T, Pair<T, Statistics>> collectStats(
        final Function<? super T, ? extends R> function) {
    return new FlowableTransformer<T, Pair<T, Statistics>>() {

        @Override
        public Flowable<Pair<T, Statistics>> apply(Flowable<T> source) {
            // Placeholder seed (no element yet); it is removed again by skip(1) below.
            final Pair<T, Statistics> seed = Pair.create((T) null, Statistics.create());
            // Folds the next element's value into the statistics carried by the previous pair.
            final BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>> accumulate =
                    new BiFunction<Pair<T, Statistics>, T, Pair<T, Statistics>>() {
                        @Override
                        public Pair<T, Statistics> apply(Pair<T, Statistics> previous, T item) throws Exception {
                            return Pair.create(item, previous.b().add(function.apply(item)));
                        }
                    };
            return source.scan(seed, accumulate).skip(1);
        }
    };
}
 
源代码14 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * Signalling completion twice from the completion action is harmless: the
 * subscriber still sees the six source values followed by completion.
 */
@Test
public void testPassThroughEmitterCompletesTwice() {
    // Completion action that signals completion two times in a row.
    final Completion2<String, Integer> completeTwice = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            emitter.onComplete_();
            emitter.onComplete_();
        }
    };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(completeTwice) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1, 2, 3, 4, 5, 6) //
            .compose(sm) //
            .test() //
            .assertValues(1, 2, 3, 4, 5, 6) //
            .assertComplete();
}
 
源代码15 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * With a custom completion action that signals completion through the
 * emitter, all six source values pass through and the stream completes.
 */
@Test
public void testPassThroughWithCustomCompletion() {
    // Completion action that just completes the downstream.
    final Completion2<String, Integer> completeOnce = new Completion2<String, Integer>() {
        @Override
        public void accept(String state, Emitter<Integer> emitter) {
            emitter.onComplete_();
        }
    };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .completion(completeOnce) //
            .requestBatchSize(1) //
            .build();
    Flowable.just(1, 2, 3, 4, 5, 6) //
            .compose(sm) //
            .test() //
            .assertValues(1, 2, 3, 4, 5, 6) //
            .assertComplete();
}
 
源代码16 项目: rxjava2-extras   文件: Transformers.java
/**
 * Creates a transformer that constrains upstream requests to lie between
 * {@code minRequest} and {@code maxRequest}.
 *
 * @param minRequest               minimum request size
 * @param maxRequest               maximum request size; must not be less than {@code minRequest}
 * @param constrainFirstRequestMin whether the minimum also applies to the first
 *                                 request (otherwise the first request minimum is 1)
 * @param <T>                      element type
 * @return the request-rebatching transformer
 */
public static <T> FlowableTransformer<T, T> rebatchRequests(final int minRequest, final long maxRequest,
        final boolean constrainFirstRequestMin) {
    Preconditions.checkArgument(minRequest <= maxRequest, "minRequest cannot be greater than maxRequest");
    return new FlowableTransformer<T, T>() {

        @Override
        public Publisher<T> apply(Flowable<T> source) {
            // Min and max coincide and apply from the first request: delegate to Flowable.rebatchRequests.
            final boolean fixedBatch = minRequest == maxRequest && constrainFirstRequestMin;
            if (fixedBatch) {
                return source.rebatchRequests(minRequest);
            }
            final int firstRequestMin = constrainFirstRequestMin ? minRequest : 1;
            return source
                    .compose(Transformers.<T>minRequest(firstRequestMin, minRequest))
                    .compose(Transformers.<T>maxRequest(maxRequest));
        }
    };
}
 
源代码17 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * Cancelling from inside the transition leaves the subscriber with no values
 * and no terminal event.
 */
@Test
public void testCancelFromTransition() {
    // Transition that cancels through the emitter and keeps the state unchanged.
    final Transition2<String, Integer, Integer> cancellingTransition =
            new Transition2<String, Integer, Integer>() {
                @Override
                public String apply(String state, Integer value, Emitter<Integer> emitter) {
                    emitter.cancel_();
                    return state;
                }
            };
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(cancellingTransition) //
            .requestBatchSize(10) //
            .build();
    Burst.items(1, 2, 3).create() //
            .compose(sm) //
            .test() //
            .assertNoValues() //
            .assertNotTerminated();
}
 
源代码18 项目: EasyMVP   文件: FlowableUseCase.java
/**
 * Creates the use case and prepares the transformer that moves subscription
 * onto the executor's scheduler and observation onto the post-execution
 * thread's scheduler.
 *
 * @param useCaseExecutor     supplies the scheduler used for {@code subscribeOn}
 * @param postExecutionThread supplies the scheduler used for {@code observeOn}
 */
public FlowableUseCase(final UseCaseExecutor useCaseExecutor,
                       final PostExecutionThread postExecutionThread) {
    super(useCaseExecutor, postExecutionThread);
    schedulersTransformer = new FlowableTransformer<R, R>() {
        @Override
        public Flowable<R> apply(Flowable<R> upstream) {
            // Schedulers are looked up each time the transformer is applied.
            final Flowable<R> subscribedOnExecutor =
                    upstream.subscribeOn(useCaseExecutor.getScheduler());
            return subscribedOnExecutor.observeOn(postExecutionThread.getScheduler());
        }
    };
}
 
源代码19 项目: RxCache   文件: RxCache.java
/**
 * Creates a transformer that hands the upstream flowable to the given
 * strategy together with this cache, the key and the type.
 *
 * @param key      cache key passed to the strategy
 * @param type     type passed to the strategy
 * @param strategy strategy that produces the resulting stream
 * @param <T>      upstream element type
 * @return a transformer yielding whatever {@code strategy.flow} returns
 */
public <T> FlowableTransformer<T, CacheResult<T>> transformFlowable(final String key, final Type type, final IFlowableStrategy strategy) {
    final RxCache cache = RxCache.this;
    return new FlowableTransformer<T, CacheResult<T>>() {
        @Override
        public Publisher<CacheResult<T>> apply(Flowable<T> upstream) {
            return strategy.flow(cache, key, upstream, type);
        }
    };
}
 
源代码20 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * Builds a state machine transformer that uses {@code PASS_THROUGH_TRANSITION}
 * with the given request batch size.
 *
 * @param batchSize request batch size handed to the builder
 * @return the built transformer
 */
private static FlowableTransformer<Integer, Integer> passThrough(int batchSize) {
    final FlowableTransformer<Integer, Integer> transformer = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .requestBatchSize(batchSize) //
            .build();
    return transformer;
}
 
源代码21 项目: CrazyDaily   文件: RxTransformerUtil.java
/**
 * Creates a transformer that subscribes and unsubscribes on the IO scheduler
 * and observes on the Android main thread.
 *
 * @param <T> element type
 * @return the scheduling transformer
 */
public static <T> FlowableTransformer<T, T> normalTransformer() {
    // An onErrorResumeNext stage that replaced the error with a no-network
    // ApiException when the network was unavailable used to sit before
    // observeOn; it was dropped because it could cause stutter.
    return upstream -> upstream
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
 
源代码22 项目: XDroid-Databinding   文件: XApi.java
/**
 * Unified thread switching: resumes errors through
 * {@code ServerResultErrorFunc2}, subscribes on the IO scheduler and
 * observes on the Android main thread.
 *
 * @param <T> element type
 * @return the scheduling transformer
 */
public static <T> FlowableTransformer<T, T> getFlowableScheduler() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            final Flowable<T> withErrorMapping =
                    source.onErrorResumeNext(new ServerResultErrorFunc2<T>());
            return withErrorMapping
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码23 项目: XDroid-Databinding   文件: XApi.java
/**
 * Same thread switching as the no-argument variant, with a retry stage
 * applied before errors are resumed through {@code ServerResultErrorFunc2}.
 *
 * @param retryWhenHandler handler passed to {@code retryWhen}
 * @param <T>              element type
 * @return the retrying, scheduling transformer
 */
public static <T> FlowableTransformer<T, T> getFlowableScheduler(final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryWhenHandler) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            // Retry first; only errors that survive retrying reach the resume function.
            final Flowable<T> retried = source.retryWhen(retryWhenHandler);
            final Flowable<T> withErrorMapping =
                    retried.onErrorResumeNext(new ServerResultErrorFunc2<T>());
            return withErrorMapping
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码24 项目: XDroid-Databinding   文件: XApi.java
/**
 * Thread-switching transformer with caller-supplied retry and error-resume
 * behaviour: retries via {@code retryWhenHandler}, resumes errors via
 * {@code resumeFunction}, subscribes on the IO scheduler and observes on
 * the Android main thread.
 *
 * @param retryWhenHandler handler passed to {@code retryWhen}
 * @param resumeFunction   function passed to {@code onErrorResumeNext}
 * @param <T>              element type
 * @return the retrying, scheduling transformer
 */
public static <T> FlowableTransformer<T, T> getFlowableScheduler(final Function<? super Flowable<Throwable>, ? extends Publisher<?>> retryWhenHandler, final Function<? super Throwable, ? extends Publisher<? extends T>> resumeFunction) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            // Retry first; only errors that survive retrying reach the resume function.
            final Flowable<T> retried = source.retryWhen(retryWhenHandler);
            final Flowable<T> resumed = retried.onErrorResumeNext(resumeFunction);
            return resumed
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码25 项目: ViewPagerHelper   文件: RxUtils.java
/**
 * Creates a transformer that subscribes on the IO scheduler and observes on
 * the Android main thread.
 *
 * @param <T> element type
 * @return the scheduling transformer
 */
public static <T>FlowableTransformer<T,T> flScheduers(){
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            final Flowable<T> onIo = source.subscribeOn(Schedulers.io());
            return onIo.observeOn(AndroidSchedulers.mainThread());
        }
    };
}
 
源代码26 项目: rxjava2-extras   文件: FlowableStateMachineTest.java
/**
 * Building and using a state machine with a negative request batch size is
 * expected to raise {@link IllegalArgumentException}.
 */
@Test(expected = IllegalArgumentException.class)
public void testInvalidRequestBatchSize() {
    final int invalidBatchSize = -10;
    final FlowableTransformer<Integer, Integer> sm = StateMachine2.builder() //
            .initialState("") //
            .transition(PASS_THROUGH_TRANSITION) //
            .requestBatchSize(invalidBatchSize) //
            .build();
    Flowable.just(1) //
            .compose(sm) //
            .test();
}
 
源代码27 项目: RxJava2-Android-Samples   文件: RxSchedulers.java
/**
 * Creates a transformer that observes the stream on the Android main thread.
 *
 * @param <T> element type
 * @return the main-thread observing transformer
 */
public <T> FlowableTransformer<T, T> applyFlowableMainThread() {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            final Flowable<T> onMainThread = source.observeOn(AndroidSchedulers.mainThread());
            return onMainThread;
        }
    };
}
 
源代码28 项目: rxjava2-extras   文件: TransformerStateMachine.java
/**
 * Static factory for {@code TransformerStateMachine}; passes every argument
 * straight to its constructor.
 *
 * @param initialState         supplies the initial state
 * @param transition           state transition function
 * @param completion           completion predicate
 * @param backpressureStrategy backpressure strategy handed to the state machine
 * @param requestBatchSize     request batch size handed to the state machine
 * @return the new state machine as a {@link FlowableTransformer}
 */
public static <State, In, Out> FlowableTransformer<In, Out> create(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final FlowableTransformer<In, Out> machine = new TransformerStateMachine<State, In, Out>(
            initialState, transition, completion, backpressureStrategy, requestBatchSize);
    return machine;
}
 
源代码29 项目: rxjava2-extras   文件: Transformers.java
/**
 * Convenience entry point that delegates to
 * {@code TransformerStateMachine.create} with the same arguments.
 *
 * @param initialState         supplies the initial state
 * @param transition           state transition function
 * @param completion           completion predicate
 * @param backpressureStrategy backpressure strategy handed to the state machine
 * @param requestBatchSize     request batch size handed to the state machine
 * @return the transformer produced by {@code TransformerStateMachine.create}
 */
public static <State, In, Out> FlowableTransformer<In, Out> stateMachine(Callable<? extends State> initialState,
        Function3<? super State, ? super In, ? super FlowableEmitter<Out>, ? extends State> transition,
        BiPredicate<? super State, ? super FlowableEmitter<Out>> completion,
        BackpressureStrategy backpressureStrategy, int requestBatchSize) {
    final FlowableTransformer<In, Out> machine = TransformerStateMachine.create(
            initialState, transition, completion, backpressureStrategy, requestBatchSize);
    return machine;
}
 
源代码30 项目: rxjava2-extras   文件: Transformers.java
/**
 * Creates a transformer that wraps the upstream in a {@code FlowableMapLast}
 * configured with the given function.
 *
 * @param function function handed to {@code FlowableMapLast}
 * @param <T>      element type
 * @return the wrapping transformer
 */
public static <T> FlowableTransformer<T, T> mapLast(final Function<? super T, ? extends T> function) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> source) {
            final FlowableMapLast<T> mapped = new FlowableMapLast<T>(source, function);
            return mapped;
        }
    };
}
 
 类所在包
 同包方法