下面列出了怎么用io.reactivex.rxjava3.functions.Action的API类实例代码及写法,或者点击链接到github查看源代码。
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
* stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This Completable will be subscribed on the specified {@link Scheduler}. This will
* result in calling the provided Action on the specified scheduler every time an effect is
* dispatched to the created effect transformer.
*
* @param doEffect the {@link Action} to be run every time the effect is requested
* @param scheduler the {@link Scheduler} that the action should be run on
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* RxMobius.SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromAction(
final Action doEffect, @Nullable final Scheduler scheduler) {
return new ObservableTransformer<F, E>() {
@Override
public ObservableSource<E> apply(Observable<F> effectStream) {
return effectStream
.flatMapCompletable(
new Function<F, CompletableSource>() {
@Override
public CompletableSource apply(F f) throws Exception {
return scheduler == null
? Completable.fromAction(doEffect)
: Completable.fromAction(doEffect).subscribeOn(scheduler);
}
})
.toObservable();
}
};
}
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Consumer} into
* the stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This will result in calling the consumer on the specified scheduler, and passing it
* the requested effect object.
*
* @param doEffect the {@link Consumer} to be run every time the effect is requested
* @param scheduler the {@link Scheduler} to be used when invoking the consumer
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* RxMobius.SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromConsumer(
final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
return new ObservableTransformer<F, E>() {
@Override
public ObservableSource<E> apply(Observable<F> effectStream) {
return effectStream
.flatMapCompletable(
new Function<F, CompletableSource>() {
@Override
public CompletableSource apply(final F effect) {
Completable completable =
Completable.fromAction(
new Action() {
@Override
public void run() throws Throwable {
doEffect.accept(effect);
}
});
return scheduler == null ? completable : completable.subscribeOn(scheduler);
}
})
.toObservable();
}
};
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Log.d(TAG, "onCreate()");
setContentView(R.layout.activity_main);
// Specifically bind this until onPause()
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onCreate()");
}
})
.compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
}
});
}
@Override
protected void onStart() {
super.onStart();
Log.d(TAG, "onStart()");
// Using automatic unsubscription, this should determine that the correct time to
// unsubscribe is onStop (the opposite of onStart).
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onStart()");
}
})
.compose(this.<Long>bindToLifecycle())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
}
});
}
@Override
protected void onResume() {
super.onResume();
Log.d(TAG, "onResume()");
// `this.<Long>` is necessary if you're compiling on JDK7 or below.
//
// If you're using JDK8+, then you can safely remove it.
Observable.interval(1, TimeUnit.SECONDS)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
Log.i(TAG, "Unsubscribing subscription from onResume()");
}
})
.compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long num) throws Exception {
Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
}
});
}
@SuppressWarnings("unchecked")
public static Object enter(final Object thiz, final int argc, final Object arg0, final Object arg1, final Object arg2) {
if (arg0 == null || arg0.getClass().getName().startsWith("io.reactivex.rxjava3.internal") || arg0 instanceof TracingConsumer)
return NULL;
if (!isTracingEnabled) {
isTracingEnabled = true;
TracingRxJava3Utils.enableTracing();
}
if (arg0 instanceof Observer)
return new TracingObserver<>((Observer<?>)arg0, "observer", GlobalTracer.get());
if (!(arg0 instanceof Consumer))
return NULL;
final TracingConsumer<Object> tracingConsumer;
if (argc == 1)
tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, "consumer", GlobalTracer.get());
else if (argc == 2)
tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, "consumer", GlobalTracer.get());
else if (argc == 3)
tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, (Action)arg2, "consumer", GlobalTracer.get());
else
tracingConsumer = null;
if (tracingConsumer != null)
((Observable<Object>)thiz).subscribe(tracingConsumer);
return null;
}
private static Action onComplete(final List<String> completeList, final MockTracer tracer) {
return new Action() {
@Override
public void run() {
logger.fine("onComplete()");
assertNotNull(tracer.scopeManager().active());
completeList.add(COMPLETED);
}
};
}
@Test public void refCountToUpstream() {
PublishProcessor<String> subject = PublishProcessor.create();
final AtomicInteger count = new AtomicInteger();
Flowable<String> flowable = subject //
.doOnSubscribe(new Consumer<Subscription>() {
@Override public void accept(Subscription subscription) {
count.incrementAndGet();
}
}) //
.doOnCancel(new Action() {
@Override public void run() {
count.decrementAndGet();
}
}) //
.compose(ReplayingShare.<String>instance());
TestSubscriber<String> disposable1 = flowable.subscribeWith(new TestSubscriber<String>());
assertEquals(1, count.get());
TestSubscriber<String> disposable2 = flowable.subscribeWith(new TestSubscriber<String>());
assertEquals(1, count.get());
TestSubscriber<String> disposable3 = flowable.subscribeWith(new TestSubscriber<String>());
assertEquals(1, count.get());
disposable1.cancel();
assertEquals(1, count.get());
disposable3.cancel();
assertEquals(1, count.get());
disposable2.cancel();
assertEquals(0, count.get());
}
@Test public void refCountToUpstream() {
PublishSubject<String> subject = PublishSubject.create();
final AtomicInteger count = new AtomicInteger();
Observable<String> observable = subject //
.doOnSubscribe(new Consumer<Disposable>() {
@Override public void accept(Disposable disposable) throws Exception {
count.incrementAndGet();
}
}) //
.doOnDispose(new Action() {
@Override public void run() throws Exception {
count.decrementAndGet();
}
}) //
.compose(ReplayingShare.<String>instance());
Disposable disposable1 = observable.subscribeWith(new TestObserver<String>());
assertEquals(1, count.get());
Disposable disposable2 = observable.subscribeWith(new TestObserver<String>());
assertEquals(1, count.get());
Disposable disposable3 = observable.subscribeWith(new TestObserver<String>());
assertEquals(1, count.get());
disposable1.dispose();
assertEquals(1, count.get());
disposable3.dispose();
assertEquals(1, count.get());
disposable2.dispose();
assertEquals(0, count.get());
}
public static <I, O> Connectable<I, O> fromTransformer(
@NonNull final ObservableTransformer<I, O> transformer) {
checkNotNull(transformer);
return new Connectable<I, O>() {
@Nonnull
@Override
public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
final PublishSubject<I> subject = PublishSubject.create();
final Disposable disposable =
subject
.compose(transformer)
.subscribe(
new Consumer<O>() {
@Override
public void accept(O value) throws Throwable {
output.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
RxJavaPlugins.onError(error);
}
},
new Action() {
@Override
public void run() throws Throwable {}
});
return new Connection<I>() {
public void accept(I effect) {
subject.onNext(effect);
}
@Override
public void dispose() {
disposable.dispose();
}
};
}
};
}
@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
final Connectable<I, O> connectable) {
return new ObservableTransformer<I, O>() {
@Override
public @NonNull ObservableSource<O> apply(@NonNull Observable<I> upstream) {
return Observable.create(
new ObservableOnSubscribe<O>() {
@Override
public void subscribe(@NonNull ObservableEmitter<O> emitter) throws Throwable {
com.spotify.mobius.functions.Consumer<O> output = emitter::onNext;
final Connection<I> input = connectable.connect(output);
final Disposable disposable =
upstream.subscribe(
new Consumer<I>() {
@Override
public void accept(I value) throws Throwable {
input.accept(value);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable error) throws Throwable {
emitter.onError(error);
}
},
new Action() {
@Override
public void run() throws Throwable {
emitter.onComplete();
}
});
emitter.setCancellable(
new Cancellable() {
@Override
public void cancel() throws Throwable {
disposable.dispose();
input.dispose();
}
});
}
});
}
};
}
/**
* Add an {@link Action} for handling effects of a given type. The action will be invoked once
* for every received effect object that extends the given class.
*
* <p>Adding handlers for two effect classes where one is a super-class of the other is
* considered a collision and is not allowed. Registering the same class twice is also
* considered a collision.
*
* @param effectClass the class to handle
* @param action the action that should be invoked for the effect
* @param <G> the effect class as a type parameter
* @return this builder
* @throws IllegalArgumentException if there is a handler collision
*/
public <G extends F> RxMobius.SubtypeEffectHandlerBuilder<F, E> addAction(
final Class<G> effectClass, final Action action) {
checkNotNull(effectClass);
checkNotNull(action);
return addTransformer(effectClass, Transformers.fromAction(action));
}
/**
* Add an {@link Action} for handling effects of a given type. The action will be invoked once
* for every received effect object that extends the given class.
*
* <p>Adding handlers for two effect classes where one is a super-class of the other is
* considered a collision and is not allowed. Registering the same class twice is also
* considered a collision.
*
* @param effectClass the class to handle
* @param action the action that should be invoked for the effect
* @param scheduler the scheduler that should be used to invoke the action
* @param <G> the effect class as a type parameter
* @return this builder
* @throws IllegalArgumentException if there is a handler collision
*/
public <G extends F> RxMobius.SubtypeEffectHandlerBuilder<F, E> addAction(
final Class<G> effectClass, final Action action, Scheduler scheduler) {
checkNotNull(effectClass);
checkNotNull(action);
return addTransformer(effectClass, Transformers.fromAction(action, scheduler));
}
/**
* Creates an {@link ObservableTransformer} that will flatten the provided {@link Action} into the
* stream as a {@link Completable} every time it receives an effect from the upstream effects
* observable. This will result in calling the provided Action every time an effect is dispatched
* to the created effect transformer.
*
* @param doEffect the {@link Action} to be run every time the effect is requested
* @param <F> the type of Effect this transformer handles
* @param <E> these transformers are for effects that do not result in any events; however, they
* still need to share the same Event type
* @return an {@link ObservableTransformer} that can be used with a {@link
* RxMobius.SubtypeEffectHandlerBuilder}.
*/
static <F, E> ObservableTransformer<F, E> fromAction(@NonNull final Action doEffect) {
return fromAction(doEffect, null);
}