类io.reactivex.rxjava3.functions.Consumer源码实例Demo

下面列出了怎么用io.reactivex.rxjava3.functions.Consumer的API类实例代码及写法,或者点击链接到github查看源代码。

源代码1 项目: mobius   文件: RxEventSources.java
/**
 * Create an observable from the given event source.
 *
 * @param eventSource the eventSource you want to convert to an observable
 * @param <E> the event type
 * @return an Observable based on the provided event source
 */
@NonNull
public static <E> Observable<E> toObservable(final EventSource<E> eventSource) {
  return Observable.create(
      new ObservableOnSubscribe<E>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<E> emitter) throws Throwable {
          final com.spotify.mobius.disposables.Disposable disposable =
              eventSource.subscribe(
                  new com.spotify.mobius.functions.Consumer<E>() {
                    @Override
                    public void accept(E value) {
                      emitter.onNext(value);
                    }
                  });
          emitter.setCancellable(
              new Cancellable() {
                @Override
                public void cancel() throws Throwable {
                  disposable.dispose();
                }
              });
        }
      });
}
 
源代码2 项目: mobius   文件: RxMobius.java
/**
 * Optionally set a shared error handler in case a handler throws an uncaught exception.
 *
 * <p>The default is to use {@link RxJavaPlugins#onError(Throwable)}. Note that any exception
 * thrown by a handler is a fatal error and this method doesn't enable safe error handling, only
 * configurable crash reporting.
 *
 * @param function a function that gets told which sub-transformer failed and should return an
 *     appropriate handler for exceptions thrown.
 */
public RxMobius.SubtypeEffectHandlerBuilder<F, E> withFatalErrorHandler(
    final Function<ObservableTransformer<? extends F, E>, Consumer<Throwable>> function) {
  checkNotNull(function);
  this.onErrorFunction =
      new OnErrorFunction<ObservableTransformer<? extends F, E>, Consumer<Throwable>>() {
        @Override
        public Consumer<Throwable> apply(ObservableTransformer<? extends F, E> effectHandler) {
          try {
            return function.apply(effectHandler);
          } catch (Throwable e) {
            throw new RuntimeException(
                "FATAL: fatal error handler threw exception for effect handler: "
                    + effectHandler,
                e);
          }
        }
      };

  return this;
}
 
源代码3 项目: mobius   文件: Transformers.java
/**
 * Creates an {@link ObservableTransformer} that will flatten the provided {@link Consumer} into
 * the stream as a {@link Completable} every time it receives an effect from the upstream effects
 * observable. This will result in calling the consumer on the specified scheduler, and passing it
 * the requested effect object.
 *
 * @param doEffect the {@link Consumer} to be run every time the effect is requested
 * @param scheduler the {@link Scheduler} to be used when invoking the consumer
 * @param <F> the type of Effect this transformer handles
 * @param <E> these transformers are for effects that do not result in any events; however, they
 *     still need to share the same Event type
 * @return an {@link ObservableTransformer} that can be used with a {@link
 *     RxMobius.SubtypeEffectHandlerBuilder}.
 */
static <F, E> ObservableTransformer<F, E> fromConsumer(
    final Consumer<F> doEffect, @Nullable final Scheduler scheduler) {
  return new ObservableTransformer<F, E>() {
    @Override
    public ObservableSource<E> apply(Observable<F> effectStream) {
      return effectStream
          .flatMapCompletable(
              new Function<F, CompletableSource>() {
                @Override
                public CompletableSource apply(final F effect) {
                  Completable completable =
                      Completable.fromAction(
                          new Action() {
                            @Override
                            public void run() throws Throwable {
                              doEffect.accept(effect);
                            }
                          });
                  return scheduler == null ? completable : completable.subscribeOn(scheduler);
                }
              })
          .toObservable();
    }
  };
}
 
源代码4 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, false);
  final List<Integer> result = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(result);

  final Disposable disposable = observable.subscribe(onNext);
  logger.fine(String.valueOf(disposable));

  assertEquals(5, result.size());

  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertNull(tracer.scopeManager().active());
}
 
源代码5 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest2(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, false);
  final List<Integer> result = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(result);
  final List<String> errorList = new ArrayList<>();
  observable.subscribe(onNext, onError(errorList));

  assertEquals(5, result.size());

  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertTrue(errorList.isEmpty());

  assertNull(tracer.scopeManager().active());
}
 
源代码6 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest3(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, false);
  final List<Integer> result = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(result);
  final List<String> completeList = new ArrayList<>();
  final List<String> errorList = new ArrayList<>();
  observable.subscribe(onNext, onError(errorList), onComplete(completeList, tracer));

  assertEquals(5, result.size());
  assertTrue(completeList.contains(COMPLETED));
  assertEquals(1, completeList.size());
  assertTrue(errorList.isEmpty());

  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertNull(tracer.scopeManager().active());
}
 
源代码7 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest3WithError(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, true);
  final List<Integer> result = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(result);
  final List<String> completeList = new ArrayList<>();
  final List<String> errorList = new ArrayList<>();
  observable.subscribe(onNext, onError(errorList), onComplete(completeList, tracer));

  assertEquals(0, result.size());
  assertEquals(0, completeList.size());
  assertEquals(1, errorList.size());

  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertNull(tracer.scopeManager().active());
}
 
源代码8 项目: RxCache   文件: TestWithGuava.java
public static void main(String[] args) {

        ApplicationContext ctx = new AnnotationConfigApplicationContext(ConfigWithGuava.class);

        RxCache rxCache = (RxCache) ctx.getBean("rxCache");

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {
            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码9 项目: RxCache   文件: TestWithSpring.java
public static void main(String[] args) {

        ApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class);

        RxCache rxCache = (RxCache) ctx.getBean("rxCache");

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {

            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码10 项目: RxCache   文件: TestMapDB.java
public static void main(String[] args) {

        RxCache.config(new RxCache.Builder().memory(new MapDBImpl(100)));

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {
            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码11 项目: RxCache   文件: TestCaffeine.java
public static void main(String[] args) {

        RxCache.config(new RxCache.Builder().memory(new CaffeineImpl(100)));

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {
            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码12 项目: RxCache   文件: TestGuavaCache.java
public static void main(String[] args) {

        RxCache.config(new RxCache.Builder().memory(new GuavaCacheImpl(100)));

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {
            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码13 项目: RxCache   文件: TestDiskImplWithMoshi.java
private static void testObject(RxCache rxCache) {

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test", u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {

            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码14 项目: RxCache   文件: BasePersistence.java
public static void testObject(RxCache rxCache) {

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {

            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
源代码15 项目: RxCache   文件: Test.java
public static void main(String[] args) {

        RxCache.config(new RxCache.Builder());

        RxCache rxCache = RxCache.getRxCache();

        User u = new User();
        u.name = "tony";
        u.password = "123456";
        rxCache.save("test",u);

        Observable<Record<User>> observable = rxCache.load2Observable("test", User.class);

        observable.subscribe(new Consumer<Record<User>>() {

            @Override
            public void accept(Record<User> record) throws Exception {

                User user = record.getData();
                System.out.println(user.name);
                System.out.println(user.password);
            }
        });
    }
 
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishProcessor<String> subject = PublishProcessor.create();
  Flowable<String> flowable = subject.compose(ReplayingShare.<String>instance());

  flowable.subscribe();
  subject.onNext("Foo");

  Consumer<String> brokenAction = new Consumer<String>() {
    @Override public void accept(String s) {
      throw new OutOfMemoryError("broken!");
    }
  };
  try {
    flowable.subscribe(brokenAction);
    fail();
  } catch (OutOfMemoryError e) {
    assertEquals("broken!", e.getMessage());
  }
}
 
@SuppressWarnings("CheckReturnValue")
@Test public void fatalExceptionDuringReplayThrown() {
  PublishSubject<String> subject = PublishSubject.create();
  Observable<String> observable = subject.compose(ReplayingShare.<String>instance());

  observable.subscribe();
  subject.onNext("Foo");

  Consumer<String> brokenAction = new Consumer<String>() {
    @Override public void accept(String s) {
      throw new OutOfMemoryError("broken!");
    }
  };
  try {
    observable.subscribe(brokenAction);
    fail();
  } catch (OutOfMemoryError e) {
    assertEquals("broken!", e.getMessage());
  }
}
 
源代码18 项目: RxLifecycle   文件: MainActivity.java
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    Log.d(TAG, "onCreate()");

    setContentView(R.layout.activity_main);

    // Specifically bind this until onPause()
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "Unsubscribing subscription from onCreate()");
            }
        })
        .compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long num) throws Exception {
                Log.i(TAG, "Started in onCreate(), running until onPause(): " + num);
            }
        });
}
 
源代码19 项目: RxLifecycle   文件: MainActivity.java
@Override
protected void onStart() {
    super.onStart();

    Log.d(TAG, "onStart()");

    // Using automatic unsubscription, this should determine that the correct time to
    // unsubscribe is onStop (the opposite of onStart).
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "Unsubscribing subscription from onStart()");
            }
        })
        .compose(this.<Long>bindToLifecycle())
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long num) throws Exception {
                Log.i(TAG, "Started in onStart(), running until in onStop(): " + num);
            }
        });
}
 
源代码20 项目: RxLifecycle   文件: MainActivity.java
@Override
protected void onResume() {
    super.onResume();

    Log.d(TAG, "onResume()");

    // `this.<Long>` is necessary if you're compiling on JDK7 or below.
    //
    // If you're using JDK8+, then you can safely remove it.
    Observable.interval(1, TimeUnit.SECONDS)
        .doOnDispose(new Action() {
            @Override
            public void run() throws Exception {
                Log.i(TAG, "Unsubscribing subscription from onResume()");
            }
        })
        .compose(this.<Long>bindUntilEvent(ActivityEvent.DESTROY))
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long num) throws Exception {
                Log.i(TAG, "Started in onResume(), running until in onDestroy(): " + num);
            }
        });
}
 
源代码21 项目: mobius   文件: RxEventSources.java
/**
 * Create an event source from the given RxJava streams.
 *
 * <p>All streams must be mapped to your event type.
 *
 * @param sources the observables you want to include in this event source
 * @param <E> the event type
 * @return an EventSource based on the provided observables
 */
@SafeVarargs
public static <E> EventSource<E> fromObservables(@NonNull final ObservableSource<E>... sources) {
  final Observable<E> eventSource = Observable.mergeArray(sources);
  return new EventSource<E>() {

    @Nonnull
    @Override
    public com.spotify.mobius.disposables.Disposable subscribe(
        com.spotify.mobius.functions.Consumer<E> eventConsumer) {
      final Disposable disposable =
          eventSource.subscribe(
              new Consumer<E>() {
                @Override
                public void accept(E value) throws Throwable {
                  eventConsumer.accept(value);
                }
              },
              new Consumer<Throwable>() {
                @Override
                public void accept(Throwable error) throws Throwable {
                  RxJavaPlugins.onError(error);
                }
              });
      return new com.spotify.mobius.disposables.Disposable() {
        @Override
        public void dispose() {
          disposable.dispose();
        }
      };
    }
  };
}
 
源代码22 项目: mobius   文件: RxMobius.java
private static <F, E> Consumer<Throwable> defaultOnError(
    final ObservableTransformer<? extends F, E> effectHandler) {
  return new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Throwable {
      RxJavaPlugins.onError(
          new ConnectionException(
              "in effect handler: " + effectHandler.getClass().toString(), throwable));
    }
  };
}
 
源代码23 项目: java-specialagent   文件: RxJava3AgentIntercept.java
@SuppressWarnings("unchecked")
public static Object enter(final Object thiz, final int argc, final Object arg0, final Object arg1, final Object arg2) {
  if (arg0 == null || arg0.getClass().getName().startsWith("io.reactivex.rxjava3.internal") || arg0 instanceof TracingConsumer)
    return NULL;

  if (!isTracingEnabled) {
    isTracingEnabled = true;
    TracingRxJava3Utils.enableTracing();
  }

  if (arg0 instanceof Observer)
    return new TracingObserver<>((Observer<?>)arg0, "observer", GlobalTracer.get());

  if (!(arg0 instanceof Consumer))
    return NULL;

  final TracingConsumer<Object> tracingConsumer;
  if (argc == 1)
    tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, "consumer", GlobalTracer.get());
  else if (argc == 2)
    tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, "consumer", GlobalTracer.get());
  else if (argc == 3)
    tracingConsumer = new TracingConsumer<>((Consumer<Object>)arg0, (Consumer<Throwable>)arg1, (Action)arg2, "consumer", GlobalTracer.get());
  else
    tracingConsumer = null;

  if (tracingConsumer != null)
    ((Observable<Object>)thiz).subscribe(tracingConsumer);

  return null;
}
 
源代码24 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest4(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, false);
  final List<Integer> result = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(result);
  final List<String> subscribeList = new ArrayList<>();
  final String subscribed = "subscribed";
  final Consumer<Object> onSubscribe = new Consumer<Object>() {
    @Override
    public void accept(final Object t) {
      subscribeList.add(subscribed);
    }
  };

  final List<String> completeList = new ArrayList<>();
  final List<String> errorList = new ArrayList<>();
  observable.doOnSubscribe(onSubscribe).subscribe(onNext, onError(errorList), onComplete(completeList, tracer));

  assertEquals(5, result.size());

  assertEquals(1, completeList.size());
  assertTrue(completeList.contains(COMPLETED));

  assertEquals(1, subscribeList.size());
  assertTrue(subscribeList.contains(subscribed));

  assertTrue(errorList.isEmpty());

  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertNull(tracer.scopeManager().active());
}
 
源代码25 项目: java-specialagent   文件: RxJava3Test.java
@Test
public void consumerTest4WithError(final MockTracer tracer) {
  final Observable<Integer> observable = createSequentialObservable(tracer, true);
  final List<Integer> result = new ArrayList<>();
  final Consumer<Integer> onNext = consumer(result);
  final List<String> subscribeList = new ArrayList<>();
  final Consumer<Object> onSubscribe = new Consumer<Object>() {
    @Override
    public void accept(final Object t) {
      subscribeList.add("subscribed");
    }
  };

  final List<String> completeList = new ArrayList<>();
  final List<String> errorList = new ArrayList<>();
  observable.doOnSubscribe(onSubscribe).subscribe(onNext, onError(errorList), onComplete(completeList, tracer));

  assertEquals(0, result.size());
  assertEquals(0, completeList.size());
  assertEquals(1, subscribeList.size());
  assertEquals(1, errorList.size());

  final List<MockSpan> spans = tracer.finishedSpans();
  assertEquals(1, spans.size());

  assertNull(tracer.scopeManager().active());
}
 
源代码26 项目: java-specialagent   文件: RxJava3Test.java
private static <T>Consumer<T> consumer(final List<T> result) {
  return new Consumer<T>() {
    @Override
    public void accept(final T t) {
      logger.fine(String.valueOf(t));
      result.add(t);
    }
  };
}
 
源代码27 项目: java-specialagent   文件: RxJava3Test.java
private static Consumer<Throwable> onError(final List<String> errorList) {
  return new Consumer<Throwable>() {
    @Override
    public void accept(final Throwable t) {
      errorList.add("error");
    }
  };
}
 
源代码28 项目: RxCache   文件: TestDiskImplWithMoshi.java
private static void testList(RxCache rxCache) {

        List<User> list = new ArrayList<>();

        User u1 = new User();
        u1.name = "tonyList1";
        u1.password = "list1123456";
        list.add(u1);

        User u2 = new User();
        u2.name = "tonyList12";
        u2.password = "list12345";
        list.add(u2);
        rxCache.save("list", list);

        Type type = TypeBuilder
                .newInstance(List.class)
                .addTypeParam(User.class)
                .build();

        Observable<Record<List<User>>> observable = rxCache.load2Observable("list", type);

        observable.subscribe(new Consumer<Record<List<User>>>() {

            @Override
            public void accept(Record<List<User>> record) throws Exception {

                List<User> recordDataList = record.getData();

                if (Preconditions.isNotBlank(recordDataList)) {
                    for (User user : recordDataList) {
                        System.out.println(user.name);
                        System.out.println(user.password);
                    }
                }
            }
        });
    }
 
源代码29 项目: RxCache   文件: TestDiskImplWithMoshi.java
private static void testSet(RxCache rxCache) {

        Set<User> set = new HashSet<>();

        User u1 = new User();
        u1.name = "tonySet1";
        u1.password = "set1123456";
        set.add(u1);

        User u2 = new User();
        u2.name = "tonySet12";
        u2.password = "set12345";
        set.add(u2);
        rxCache.save("set", set);

        Type type = TypeBuilder
                .newInstance(Set.class)
                .addTypeParam(User.class)
                .build();

        Observable<Record<Set<User>>> observable = rxCache.load2Observable("set", type);

        observable.subscribe(new Consumer<Record<Set<User>>>() {

            @Override
            public void accept(Record<Set<User>> record) throws Exception {

                Set<User> recordDataList = record.getData();

                if (Preconditions.isNotBlank(recordDataList)) {
                    for (User user : recordDataList) {
                        System.out.println(user.name);
                        System.out.println(user.password);
                    }
                }
            }
        });
    }
 
源代码30 项目: RxCache   文件: BasePersistence.java
public static void testList(RxCache rxCache) {

        List<User> list = new ArrayList<>();

        User u1 = new User();
        u1.name = "tonyList1";
        u1.password = "list1123456";
        list.add(u1);

        User u2 = new User();
        u2.name = "tonyList12";
        u2.password = "list12345";
        list.add(u2);
        rxCache.save("list", list);

        Type type = TypeBuilder
                .newInstance(List.class)
                .addTypeParam(User.class)
                .build();

        Observable<Record<List<User>>> observable = rxCache.load2Observable("list", type);

        observable.subscribe(new Consumer<Record<List<User>>>() {

            @Override
            public void accept(Record<List<User>> record) throws Exception {

                List<User> recordDataList = record.getData();

                if (Preconditions.isNotBlank(recordDataList)) {
                    for (User user : recordDataList) {
                        System.out.println(user.name);
                        System.out.println(user.password);
                    }
                }
            }
        });
    }
 
 类所在包
 同包方法