类io.reactivex.rxjava3.functions.Action源码实例Demo

下面列出了怎么用io.reactivex.rxjava3.functions.Action的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that runs the supplied {@link Action} once for every
 * effect emitted by the upstream effects observable. Each run is wrapped in a {@link Completable}
 * and flattened into the stream; when a {@link Scheduler} is given, that Completable is
 * subscribed on it, otherwise it is subscribed without an explicit scheduler.
 *
 * @param doEffect the {@link Action} to invoke for each incoming effect
 * @param scheduler the {@link Scheduler} to subscribe the action on, or {@code null} to skip
 *     {@code subscribeOn}
 * @param <F> the effect type consumed by this transformer
 * @param <E> the event type; no events are produced from the action itself, but the type must
 *     line up with the rest of the loop
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(
    final Action doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      // Maps each effect to a fresh Completable around the action; the effect value is unused.
      final Function<F, CompletableSource> runAction =
          new Function<F, CompletableSource>() {
            @Override
            public CompletableSource apply(F ignoredEffect) throws Exception {
              final Completable actionRun = Completable.fromAction(doEffect);
              if (scheduler == null) {
                return actionRun;
              }
              return actionRun.subscribeOn(scheduler);
            }
          };

      return effectStream.flatMapCompletable(runAction).toObservable();
    }
  };
}
 
源代码2 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that hands every effect from the upstream effects
 * observable to the supplied {@link Consumer}. Each invocation is wrapped in a {@link
 * Completable} and flattened into the stream; when a {@link Scheduler} is given, that Completable
 * is subscribed on it.
 *
 * @param doEffect the {@link Consumer} that receives each incoming effect
 * @param scheduler the {@link Scheduler} to subscribe the consumer call on, or {@code null} to
 *     skip {@code subscribeOn}
 * @param <F> the effect type consumed by this transformer
 * @param <E> the event type; no events are produced from the consumer itself, but the type must
 *     line up with the rest of the loop
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      final Function<F, CompletableSource> consumeEffect =
          new Function<F, CompletableSource>() {
            @Override
            public CompletableSource apply(final F effect) {
              // Capture the effect so the consumer sees it when the Completable is subscribed.
              final Action invokeConsumer =
                  new Action() {
                    @Override
                    public void run() throws Throwable {
                      doEffect.accept(effect);
                    }
                  };
              final Completable consumerRun = Completable.fromAction(invokeConsumer);
              if (scheduler != null) {
                return consumerRun.subscribeOn(scheduler);
              }
              return consumerRun;
            }
          };

      return effectStream.flatMapCompletable(consumeEffect).toObservable();
    }
  };
}
 
源代码3 项目: RxLifecycle   文件: MainActivity.java
/**
 * Sets the content view and starts a one-second interval stream that is bound to the
 * {@code PAUSE} activity event via {@code bindUntilEvent}.
 */
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    Log.d(TAG, "onCreate()");

    setContentView(R.layout.activity_main);

    // Logged when the interval stream is disposed.
    final Action logDisposal = new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "Unsubscribing subscription from onCreate()");
        }
    };

    // Logged for each tick of the interval.
    final Consumer<Long> logTick = new Consumer<Long>() {
        @Override
        public void accept(Long num) throws Exception {
            Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
        }
    };

    // Explicitly bound to the PAUSE event rather than relying on automatic binding.
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(logDisposal)
        .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
        .subscribe(logTick);
}
 
源代码4 项目: RxLifecycle   文件: MainActivity.java
/**
 * Starts a one-second interval stream that is bound to the activity lifecycle through
 * {@code bindToLifecycle()}.
 */
@Override
protected void onStart() {
    super.onStart();

    Log.d(TAG, "onStart()");

    // Logged when the interval stream is disposed.
    final Action logDisposal = new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "Unsubscribing subscription from onStart()");
        }
    };

    // Logged for each tick of the interval.
    final Consumer<Long> logTick = new Consumer<Long>() {
        @Override
        public void accept(Long num) throws Exception {
            Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
        }
    };

    // With automatic binding the end event is chosen for us; since we subscribe in onStart,
    // the expectation is that the stream ends in onStop (its counterpart).
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(logDisposal)
        .compose(this.<Long>bindToLifecycle())
        .subscribe(logTick);
}
 
源代码5 项目: RxLifecycle   文件: MainActivity.java
/**
 * Starts a one-second interval stream that is bound to the {@code DESTROY} activity event via
 * {@code bindUntilEvent}.
 */
@Override
protected void onResume() {
    super.onResume();

    Log.d(TAG, "onResume()");

    // Logged when the interval stream is disposed.
    final Action logDisposal = new Action() {
        @Override
        public void run() throws Exception {
            Log.i(TAG, "Unsubscribing subscription from onResume()");
        }
    };

    // Logged for each tick of the interval.
    final Consumer<Long> logTick = new Consumer<Long>() {
        @Override
        public void accept(Long num) throws Exception {
            Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
        }
    };

    // The explicit `this.<Long>` type witness is required when compiling on JDK7 or below;
    // on JDK8+ it can safely be dropped.
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(logDisposal)
        .compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
        .subscribe(logTick);
}
 
源代码6 项目: java-specialagent   文件: RxJava3AgentIntercept.java
/**
 * Intercept hook for a subscribe call: wraps the first argument in a tracing decorator where
 * applicable.
 *
 * <p>Returns the {@code NULL} sentinel when {@code arg0} is absent, is an RxJava-internal class,
 * is already a {@link TracingConsumer}, or is neither an {@link Observer} nor a {@link
 * Consumer}. Returns a {@link TracingObserver} when {@code arg0} is an {@link Observer}. For a
 * {@link Consumer}, a {@link TracingConsumer} is built from the first {@code argc} arguments and
 * subscribed on {@code thiz} directly, after which {@code null} is returned.
 *
 * <p>NOTE(review): the distinction between the {@code NULL} sentinel and a literal {@code null}
 * is defined by the caller, which is not visible here; presumably {@code NULL} means "do not
 * intercept" -- confirm before changing either return.
 *
 * @param thiz the object the subscribe call was made on; cast to {@link Observable} in the
 *     consumer path
 * @param argc number of arguments of the intercepted call (1, 2 or 3 are handled)
 * @param arg0 first argument of the intercepted call
 * @param arg1 second argument; used as the error consumer when {@code argc >= 2}
 * @param arg2 third argument; used as the completion action when {@code argc == 3}
 * @return a {@link TracingObserver}, the {@code NULL} sentinel, or {@code null} as described
 *     above
 */
@SuppressWarnings("unchecked")
public static Object enter(final Object thiz, final int argc, final Object arg0, final Object arg1, final Object arg2) {
  if (arg0 == null)
    return NULL;

  // Skip arguments that are already wrapped or that belong to RxJava's own internals.
  if (arg0 instanceof TracingConsumer || arg0.getClass().getName().startsWith("io.reactivex.rxjava3.internal"))
    return NULL;

  // Tracing is switched on the first time this point is reached.
  if (!isTracingEnabled) {
    isTracingEnabled = true;
    TracingRxJava3Utils.enableTracing();
  }

  if (arg0 instanceof Observer)
    return new TracingObserver<>((Observer<?>)arg0, "observer", GlobalTracer.get());

  if (!(arg0 instanceof Consumer))
    return NULL;

  final Consumer<Object> onNext = (Consumer<Object>)arg0;
  final TracingConsumer<Object> tracingConsumer;
  switch (argc) {
    case 1:
      tracingConsumer = new TracingConsumer<>(onNext, "consumer", GlobalTracer.get());
      break;
    case 2:
      tracingConsumer = new TracingConsumer<>(onNext, (Consumer<Throwable>)arg1, "consumer", GlobalTracer.get());
      break;
    case 3:
      tracingConsumer = new TracingConsumer<>(onNext, (Consumer<Throwable>)arg1, (Action)arg2, "consumer", GlobalTracer.get());
      break;
    default:
      // Unsupported arity: nothing is subscribed.
      tracingConsumer = null;
      break;
  }

  if (tracingConsumer != null)
    ((Observable<Object>)thiz).subscribe(tracingConsumer);

  return null;
}
 
源代码7 项目: java-specialagent   文件: RxJava3Test.java
/**
 * Creates a completion {@link Action} for tests: when run, it logs, asserts that the tracer has
 * an active scope, and appends {@code COMPLETED} to the supplied list.
 *
 * @param completeList list that receives the {@code COMPLETED} marker
 * @param tracer tracer whose scope manager is checked for an active scope
 * @return the completion action
 */
private static Action onComplete(final List<String> completeList, final MockTracer tracer) {
  final Action completion = new Action() {
    @Override
    public void run() {
      logger.fine("onComplete()");
      assertNotNull(tracer.scopeManager().active());
      completeList.add(COMPLETED);
    }
  };
  return completion;
}
 
// Verifies that the shared Flowable keeps exactly one upstream subscription while at least one
// downstream subscriber remains, and cancels upstream only after the last one cancels.
@Test public void refCountToUpstream() {
  final AtomicInteger upstreamCount = new AtomicInteger();

  // Bumps the counter for each upstream subscription.
  Consumer<Subscription> trackSubscribe = new Consumer<Subscription>() {
    @Override public void accept(Subscription subscription) {
      upstreamCount.incrementAndGet();
    }
  };
  // Drops the counter for each upstream cancellation.
  Action trackCancel = new Action() {
    @Override public void run() {
      upstreamCount.decrementAndGet();
    }
  };

  PublishProcessor<String> processor = PublishProcessor.create();
  Flowable<String> shared = processor
      .doOnSubscribe(trackSubscribe)
      .doOnCancel(trackCancel)
      .compose(ReplayingShare.<String>instance());

  TestSubscriber<String> first = shared.subscribeWith(new TestSubscriber<String>());
  assertEquals(1, upstreamCount.get());

  TestSubscriber<String> second = shared.subscribeWith(new TestSubscriber<String>());
  assertEquals(1, upstreamCount.get());

  TestSubscriber<String> third = shared.subscribeWith(new TestSubscriber<String>());
  assertEquals(1, upstreamCount.get());

  first.cancel();
  assertEquals(1, upstreamCount.get());

  third.cancel();
  assertEquals(1, upstreamCount.get());

  // Last subscriber gone: upstream must now be cancelled.
  second.cancel();
  assertEquals(0, upstreamCount.get());
}
 
// Verifies that the shared Observable keeps exactly one upstream subscription while at least one
// downstream observer remains, and disposes upstream only after the last one disposes.
@Test public void refCountToUpstream() {
  final AtomicInteger upstreamCount = new AtomicInteger();

  // Bumps the counter for each upstream subscription.
  Consumer<Disposable> trackSubscribe = new Consumer<Disposable>() {
    @Override public void accept(Disposable disposable) throws Exception {
      upstreamCount.incrementAndGet();
    }
  };
  // Drops the counter for each upstream disposal.
  Action trackDispose = new Action() {
    @Override public void run() throws Exception {
      upstreamCount.decrementAndGet();
    }
  };

  PublishSubject<String> source = PublishSubject.create();
  Observable<String> shared = source
      .doOnSubscribe(trackSubscribe)
      .doOnDispose(trackDispose)
      .compose(ReplayingShare.<String>instance());

  Disposable first = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamCount.get());

  Disposable second = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamCount.get());

  Disposable third = shared.subscribeWith(new TestObserver<String>());
  assertEquals(1, upstreamCount.get());

  first.dispose();
  assertEquals(1, upstreamCount.get());

  third.dispose();
  assertEquals(1, upstreamCount.get());

  // Last observer gone: upstream must now be disposed.
  second.dispose();
  assertEquals(0, upstreamCount.get());
}
 
源代码10 项目: mobius   文件: RxConnectables.java
/**
 * Adapts an {@link ObservableTransformer} to a {@link Connectable}.
 *
 * <p>Each call to {@code connect} creates a new {@link PublishSubject}, applies the transformer
 * to it and subscribes: transformed values are forwarded to the supplied output consumer, errors
 * are routed to {@link RxJavaPlugins#onError(Throwable)}, and completion is ignored. Values
 * passed to the returned {@link Connection} are pushed into the subject; disposing the
 * connection disposes that subscription.
 *
 * @param transformer the transformer to adapt; must not be null
 * @param <I> the input type of the transformer
 * @param <O> the output type of the transformer
 * @return a {@link Connectable} backed by the transformer
 */
public static <I, O> Connectable<I, O> fromTransformer(
    @NonNull final ObservableTransformer<I, O> transformer) {
  checkNotNull(transformer);
  return new Connectable<I, O>() {
    @Nonnull
    @Override
    public Connection<I> connect(com.spotify.mobius.functions.Consumer<O> output) {
      final PublishSubject<I> subject = PublishSubject.create();

      final Disposable disposable =
          subject
              .compose(transformer)
              .subscribe(
                  new Consumer<O>() {
                    @Override
                    public void accept(O value) throws Throwable {
                      output.accept(value);
                    }
                  },
                  new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable error) throws Throwable {
                      // Errors are reported through the global plugin hook, not to the output.
                      RxJavaPlugins.onError(error);
                    }
                  },
                  new Action() {
                    @Override
                    public void run() throws Throwable {
                      // Completion of the transformed stream is intentionally a no-op.
                    }
                  });

      return new Connection<I>() {
        @Override
        public void accept(I effect) {
          subject.onNext(effect);
        }

        @Override
        public void dispose() {
          disposable.dispose();
        }
      };
    }
  };
}
 
源代码11 项目: mobius   文件: RxConnectables.java
/**
 * Adapts a {@link Connectable} to an {@link ObservableTransformer}.
 *
 * <p>For each subscriber of the resulting observable, the connectable is connected with the
 * emitter's {@code onNext} as its output, and the upstream is subscribed so that its values are
 * fed into the connection while its error and completion signals are passed to the emitter.
 * Cancelling the emitter disposes the upstream subscription and then the connection.
 *
 * @param connectable the connectable to adapt
 * @param <I> the input type of the connectable
 * @param <O> the output type of the connectable
 * @return an {@link ObservableTransformer} backed by the connectable
 */
@NonNull
public static <I, O> ObservableTransformer<I, O> toTransformer(
    final Connectable<I, O> connectable) {
  return new ObservableTransformer<I, O>() {
    @Override
    public @NonNull ObservableSource<O> apply(@NonNull Observable<I> upstream) {
      final ObservableOnSubscribe<O> bridge =
          new ObservableOnSubscribe<O>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<O> emitter) throws Throwable {
              com.spotify.mobius.functions.Consumer<O> output = emitter::onNext;
              final Connection<I> input = connectable.connect(output);

              // Upstream values go into the connection.
              final Consumer<I> forwardValue =
                  new Consumer<I>() {
                    @Override
                    public void accept(I value) throws Throwable {
                      input.accept(value);
                    }
                  };
              // Upstream terminal signals go straight to the emitter.
              final Consumer<Throwable> forwardError =
                  new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable error) throws Throwable {
                      emitter.onError(error);
                    }
                  };
              final Action forwardComplete =
                  new Action() {
                    @Override
                    public void run() throws Throwable {
                      emitter.onComplete();
                    }
                  };

              final Disposable disposable =
                  upstream.subscribe(forwardValue, forwardError, forwardComplete);

              final Cancellable teardown =
                  new Cancellable() {
                    @Override
                    public void cancel() throws Throwable {
                      disposable.dispose();
                      input.dispose();
                    }
                  };
              emitter.setCancellable(teardown);
            }
          };

      return Observable.create(bridge);
    }
  };
}
 
源代码12 项目: mobius   文件: RxMobius.java
/**
 * Registers an {@link Action} as the handler for effects of the given class. The action is run
 * once for each received effect object that extends that class.
 *
 * <p>Registering handlers for two effect classes where one is a super-class of the other counts
 * as a collision and is rejected, as does registering the same class twice.
 *
 * @param effectClass the class to handle
 * @param action the action that should be invoked for the effect
 * @param <G> the effect class as a type parameter
 * @return this builder
 * @throws IllegalArgumentException if there is a handler collision
 */
public <G extends F> RxMobius.SubtypeEffectHandlerBuilder<F, E> addAction(
    final Class<G> effectClass, final Action action) {
  checkNotNull(effectClass);
  checkNotNull(action);

  // Wrap the action as a transformer and register it like any other handler.
  return addTransformer(effectClass, Transformers.<G, E>fromAction(action));
}
 
源代码13 项目: mobius   文件: RxMobius.java
/**
 * Registers an {@link Action} as the handler for effects of the given class, passing the given
 * scheduler along to the transformer that wraps the action. The action is run once for each
 * received effect object that extends that class.
 *
 * <p>Registering handlers for two effect classes where one is a super-class of the other counts
 * as a collision and is rejected, as does registering the same class twice.
 *
 * @param effectClass the class to handle
 * @param action the action that should be invoked for the effect
 * @param scheduler the scheduler that should be used to invoke the action
 * @param <G> the effect class as a type parameter
 * @return this builder
 * @throws IllegalArgumentException if there is a handler collision
 */
public <G extends F> RxMobius.SubtypeEffectHandlerBuilder<F, E> addAction(
    final Class<G> effectClass, final Action action, Scheduler scheduler) {
  checkNotNull(effectClass);
  checkNotNull(action);

  // Wrap the action (with its scheduler) as a transformer and register it like any other handler.
  return addTransformer(effectClass, Transformers.<G, E>fromAction(action, scheduler));
}
 
源代码14 项目: mobius   文件: Transformers.java
/**
 * Builds an {@link ObservableTransformer} that runs the supplied {@link Action} once for every
 * effect emitted by the upstream effects observable. This is the scheduler-less variant: it
 * delegates to the two-argument overload with a {@code null} scheduler.
 *
 * @param doEffect the {@link Action} to invoke for each incoming effect
 * @param <F> the effect type consumed by this transformer
 * @param <E> the event type; no events are produced from the action itself, but the type must
 *     line up with the rest of the loop
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromAction(@NonNull final Action doEffect) {
  // No scheduler: the two-argument overload then skips subscribeOn.
  final Scheduler noScheduler = null;
  return fromAction(doEffect, noScheduler);
}
 
 类所在包
 类方法
 同包方法